This module is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by the Vertex AI worker, which offers enhanced functionality and better performance.
For upgrade instructions, see https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.
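As a rough illustration of the migration path (the block name below is a placeholder, and the `publish_as_work_pool` helper assumes a Prefect version that includes the agents-to-workers migration utilities described in the upgrade guide), an existing saved block can be republished as an equivalent `vertex-ai` work pool:

```python
from prefect_gcp.aiplatform import VertexAICustomTrainingJob

# Hypothetical block name -- substitute the name of your saved block.
vertex_job = VertexAICustomTrainingJob.load("my-vertex-job")

# Create a `vertex-ai` work pool carrying the same configuration so the
# Vertex AI worker can pick up flow runs in place of an agent.
vertex_job.publish_as_work_pool()
```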
Source code in prefect_gcp/aiplatform.py:

```python
@deprecated_class(
    start_date="Mar 2024",
    help=(
        "Use the Vertex AI worker instead."
        " Refer to the upgrade guide for more information:"
        " https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/."
    ),
)
class VertexAICustomTrainingJob(Infrastructure):
    """
    Infrastructure block used to run Vertex AI custom training jobs.
    """

    _block_type_name = "Vertex AI Custom Training Job"
    _block_type_slug = "vertex-ai-custom-training-job"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/10424e311932e31c477ac2b9ef3d53cefbaad708-250x250.png"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-gcp/aiplatform/#prefect_gcp.aiplatform.VertexAICustomTrainingJob"  # noqa: E501

    type: Literal["vertex-ai-custom-training-job"] = Field(
        "vertex-ai-custom-training-job", description="The slug for this task type."
    )
    gcp_credentials: GcpCredentials = Field(
        default_factory=GcpCredentials,
        description=(
            "GCP credentials to use when running the configured Vertex AI custom "
            "training job. If not provided, credentials will be inferred from the "
            "environment. See `GcpCredentials` for details."
        ),
    )
    region: str = Field(
        default=...,
        description="The region where the Vertex AI custom training job resides.",
    )
    image: str = Field(
        default=...,
        title="Image Name",
        description=(
            "The image to use for a new Vertex AI custom training job. This value must "
            "refer to an image within either Google Container Registry "
            "or Google Artifact Registry, like `gcr.io/<project_name>/<repo>/`."
        ),
    )
    env: Dict[str, str] = Field(
        default_factory=dict,
        title="Environment Variables",
        description="Environment variables to be passed to your Vertex AI custom training job.",
    )
    machine_type: str = Field(
        default="n1-standard-4",
        description="The machine type to use for the run, which controls the available "
        "CPU and memory.",
    )
    accelerator_type: Optional[str] = Field(
        default=None, description="The type of accelerator to attach to the machine."
    )
    accelerator_count: Optional[int] = Field(
        default=None, description="The number of accelerators to attach to the machine."
    )
    boot_disk_type: str = Field(
        default="pd-ssd",
        title="Boot Disk Type",
        description="The type of boot disk to attach to the machine.",
    )
    boot_disk_size_gb: int = Field(
        default=100,
        title="Boot Disk Size",
        description="The size of the boot disk to attach to the machine, in gigabytes.",
    )
    maximum_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(days=7), description="The maximum job running time."
    )
    network: Optional[str] = Field(
        default=None,
        description="The full name of the Compute Engine network "
        "to which the Job should be peered. Private services access must "
        "already be configured for the network. If left unspecified, the job "
        "is not peered with any network.",
    )
    reserved_ip_ranges: Optional[List[str]] = Field(
        default=None,
        description="A list of names for the reserved ip ranges under the VPC "
        "network that can be used for this job. If set, we will deploy the job "
        "within the provided ip ranges. Otherwise, the job will be deployed to "
        "any ip ranges under the provided VPC network.",
    )
    service_account: Optional[str] = Field(
        default=None,
        description=(
            "Specifies the service account to use "
            "as the run-as account in Vertex AI. The agent submitting jobs must have "
            "act-as permission on this run-as account. If unspecified, the AI "
            "Platform Custom Code Service Agent for the CustomJob's project is "
            "used. Takes precedence over the service account found in gcp_credentials, "
            "and required if a service account cannot be detected in gcp_credentials."
        ),
    )
    job_watch_poll_interval: float = Field(
        default=5.0,
        description=(
            "The amount of time to wait between GCP API calls while monitoring the "
            "state of a Vertex AI Job."
        ),
    )

    @property
    def job_name(self):
        """
        The name can be up to 128 characters long and can consist of any UTF-8
        characters. Reference:
        https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.CustomJob#google_cloud_aiplatform_CustomJob_display_name
        """  # noqa
        try:
            base_name = self.name or self.image.split("/")[2]
            return f"{base_name}-{uuid4().hex}"
        except IndexError:
            raise ValueError(
                "The provided image must be from either Google Container Registry "
                "or Google Artifact Registry"
            )

    def _get_compatible_labels(self) -> Dict[str, str]:
        """
        Ensures labels are compatible with GCP label requirements.
        https://cloud.google.com/resource-manager/docs/creating-managing-labels

        Ex: the Prefect provided key of prefect.io/flow-name -> prefect-io_flow-name
        """
        compatible_labels = {}
        for key, val in self.labels.items():
            new_key = slugify(
                key,
                lowercase=True,
                replacements=[("/", "_"), (".", "-")],
                max_length=63,
                regex_pattern=_DISALLOWED_GCP_LABEL_CHARACTERS,
            )
            compatible_labels[new_key] = slugify(
                val,
                lowercase=True,
                replacements=[("/", "_"), (".", "-")],
                max_length=63,
                regex_pattern=_DISALLOWED_GCP_LABEL_CHARACTERS,
            )
        return compatible_labels

    def preview(self) -> str:
        """Generate a preview of the job definition that will be sent to GCP."""
        job_spec = self._build_job_spec()
        custom_job = CustomJob(
            display_name=self.job_name,
            job_spec=job_spec,
            labels=self._get_compatible_labels(),
        )
        return str(custom_job)  # outputs a json string

    def get_corresponding_worker_type(self) -> str:
        """Return the corresponding worker type for this infrastructure block."""
        return "vertex-ai"

    async def generate_work_pool_base_job_template(self) -> dict:
        """
        Generate a base job template for a `Vertex AI` work pool with the same
        configuration as this block.

        Returns:
            - dict: a base job template for a `Vertex AI` work pool
        """
        base_job_template = await get_default_base_job_template_for_infrastructure_type(
            self.get_corresponding_worker_type(),
        )
        assert (
            base_job_template is not None
        ), "Failed to generate default base job template for Vertex AI worker."
        for key, value in self.dict(exclude_unset=True, exclude_defaults=True).items():
            if key == "command":
                base_job_template["variables"]["properties"]["command"][
                    "default"
                ] = shlex.join(value)
            elif key in [
                "type",
                "block_type_slug",
                "_block_document_id",
                "_block_document_name",
                "_is_anonymous",
            ]:
                continue
            elif key == "gcp_credentials":
                if not self.gcp_credentials._block_document_id:
                    raise BlockNotSavedError(
                        "It looks like you are trying to use a block that"
                        " has not been saved. Please call `.save` on your block"
                        " before publishing it as a work pool."
                    )
                base_job_template["variables"]["properties"]["credentials"][
                    "default"
                ] = {
                    "$ref": {
                        "block_document_id": str(
                            self.gcp_credentials._block_document_id
                        )
                    }
                }
            elif key == "maximum_run_time":
                base_job_template["variables"]["properties"]["maximum_run_time_hours"][
                    "default"
                ] = round(value.total_seconds() / 3600)
            elif key == "service_account":
                base_job_template["variables"]["properties"]["service_account_name"][
                    "default"
                ] = value
            elif key in base_job_template["variables"]["properties"]:
                base_job_template["variables"]["properties"][key]["default"] = value
            else:
                self.logger.warning(
                    f"Variable {key!r} is not supported by `Vertex AI` work pools."
                    " Skipping."
                )

        return base_job_template

    def _build_job_spec(self) -> "CustomJobSpec":
        """
        Builds a job spec by gathering details.
        """
        # gather worker pool spec
        env_list = [
            {"name": name, "value": value}
            for name, value in {
                **self._base_environment(),
                **self.env,
            }.items()
        ]
        container_spec = ContainerSpec(
            image_uri=self.image, command=self.command, args=[], env=env_list
        )
        machine_spec = MachineSpec(
            machine_type=self.machine_type,
            accelerator_type=self.accelerator_type,
            accelerator_count=self.accelerator_count,
        )
        worker_pool_spec = WorkerPoolSpec(
            container_spec=container_spec,
            machine_spec=machine_spec,
            replica_count=1,
            disk_spec=DiskSpec(
                boot_disk_type=self.boot_disk_type,
                boot_disk_size_gb=self.boot_disk_size_gb,
            ),
        )
        # look for service account
        service_account = (
            self.service_account or self.gcp_credentials._service_account_email
        )
        if service_account is None:
            raise ValueError(
                "A service account is required for the Vertex job. "
                "A service account could not be detected in the attached credentials; "
                "please set a service account explicitly, e.g. "
                '`VertexAICustomTrainingJob(service_account="...")`'
            )

        # build custom job specs
        timeout = Duration().FromTimedelta(td=self.maximum_run_time)
        scheduling = Scheduling(timeout=timeout)
        job_spec = CustomJobSpec(
            worker_pool_specs=[worker_pool_spec],
            service_account=service_account,
            scheduling=scheduling,
            network=self.network,
            reserved_ip_ranges=self.reserved_ip_ranges,
        )
        return job_spec

    async def _create_and_begin_job(
        self,
        job_spec: "CustomJobSpec",
        job_service_async_client: "JobServiceAsyncClient",
    ) -> "CustomJob":
        """
        Builds a custom job and begins running it.
        """
        # create custom job
        custom_job = CustomJob(
            display_name=self.job_name,
            job_spec=job_spec,
            labels=self._get_compatible_labels(),
        )

        # run job
        self.logger.info(
            f"{self._log_prefix}: Creating job {self.job_name!r} "
            f"with command {' '.join(self.command)!r} in region "
            f"{self.region!r} using image {self.image!r}"
        )

        project = self.gcp_credentials.project
        resource_name = f"projects/{project}/locations/{self.region}"

        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(3), wait=wait_fixed(1) + wait_random(0, 3)
        ):
            with attempt:
                custom_job_run = await job_service_async_client.create_custom_job(
                    parent=resource_name,
                    custom_job=custom_job,
                )

        self.logger.info(
            f"{self._log_prefix}: Job {self.job_name!r} created. "
            f"The full job name is {custom_job_run.name!r}"
        )

        return custom_job_run

    async def _watch_job_run(
        self,
        full_job_name: str,  # different from self.job_name
        job_service_async_client: "JobServiceAsyncClient",
        current_state: "JobState",
        until_states: Tuple["JobState"],
        timeout: int = None,
    ) -> "CustomJob":
        """
        Polls job run to see if status changed.

        State changes reported by the Vertex AI API may sometimes be inaccurate
        immediately upon startup, but should eventually report a correct running
        and then terminal state. The minimum training duration for a custom job
        is 30 seconds, so short-lived jobs may be marked as successful some time
        after a flow run has completed.
        """
        state = JobState.JOB_STATE_UNSPECIFIED
        last_state = current_state
        t0 = time.time()

        while state not in until_states:
            job_run = await job_service_async_client.get_custom_job(
                name=full_job_name,
            )
            state = job_run.state
            if state != last_state:
                state_label = (
                    state.name.replace("_", " ")
                    .lower()
                    .replace("state", "state is now:")
                )
                # results in "New job state is now: succeeded"
                self.logger.debug(
                    f"{self._log_prefix}: {self.job_name} has new {state_label}"
                )
                last_state = state
            else:
                # Intermittently, the job will not be described. We want to respect the
                # watch timeout though.
                self.logger.debug(f"{self._log_prefix}: Job not found.")

            elapsed_time = time.time() - t0
            if timeout is not None and elapsed_time > timeout:
                raise RuntimeError(
                    f"Timed out after {elapsed_time}s while watching job for states "
                    f"{until_states!r}"
                )
            await asyncio.sleep(self.job_watch_poll_interval)

        return job_run

    @sync_compatible
    async def run(
        self, task_status: Optional["TaskStatus"] = None
    ) -> VertexAICustomTrainingJobResult:
        """
        Run the configured task on Vertex AI.

        Args:
            task_status: An optional `TaskStatus` to update when the container starts.

        Returns:
            The `VertexAICustomTrainingJobResult`.
        """
        client_options = ClientOptions(
            api_endpoint=f"{self.region}-aiplatform.googleapis.com"
        )

        job_spec = self._build_job_spec()
        job_service_async_client = self.gcp_credentials.get_job_service_async_client(
            client_options=client_options
        )
        job_run = await self._create_and_begin_job(
            job_spec,
            job_service_async_client,
        )

        if task_status:
            task_status.started(self.job_name)

        final_job_run = await self._watch_job_run(
            full_job_name=job_run.name,
            job_service_async_client=job_service_async_client,
            current_state=job_run.state,
            until_states=(
                JobState.JOB_STATE_SUCCEEDED,
                JobState.JOB_STATE_FAILED,
                JobState.JOB_STATE_CANCELLED,
                JobState.JOB_STATE_EXPIRED,
            ),
            timeout=self.maximum_run_time.total_seconds(),
        )

        error_msg = final_job_run.error.message
        if error_msg:
            raise RuntimeError(f"{self._log_prefix}: {error_msg}")

        status_code = 0 if final_job_run.state == JobState.JOB_STATE_SUCCEEDED else 1

        return VertexAICustomTrainingJobResult(
            identifier=final_job_run.display_name, status_code=status_code
        )

    @sync_compatible
    async def kill(self, identifier: str, grace_seconds: int = 30) -> None:
        """
        Kill a running Vertex AI custom training job.

        Args:
            identifier: The Vertex AI full job name, formatted like
                "projects/{project}/locations/{location}/customJobs/{custom_job}".
        """
        client_options = ClientOptions(
            api_endpoint=f"{self.region}-aiplatform.googleapis.com"
        )

        job_service_async_client = self.gcp_credentials.get_job_service_async_client(
            client_options=client_options
        )
        await self._kill_job(
            job_service_async_client=job_service_async_client,
            full_job_name=identifier,
        )
        self.logger.info(f"Requested to cancel {identifier}...")

    async def _kill_job(
        self, job_service_async_client: "JobServiceAsyncClient", full_job_name: str
    ) -> None:
        """
        Thin wrapper around `cancel_custom_job`, wrapping a try/except since the
        job service client is an independent class that doesn't have knowledge
        of this block and its associated logic.
        """
        cancel_custom_job_request = CancelCustomJobRequest(name=full_job_name)
        try:
            await job_service_async_client.cancel_custom_job(
                request=cancel_custom_job_request,
            )
        except Exception as exc:
            if "does not exist" in str(exc):
                raise InfrastructureNotFound(
                    f"Cannot stop Vertex AI job; the job name {full_job_name!r} "
                    "could not be found."
                ) from exc
            raise

    @property
    def _log_prefix(self) -> str:
        """
        Internal property for generating a prefix for logs where `name` may be null
        """
        if self.name is not None:
            return f"VertexAICustomTrainingJob {self.name!r}"
        else:
            return "VertexAICustomTrainingJob"
```
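For reference while the block remains available, here is a minimal usage sketch. The credentials block name, region, image, and command are placeholders; `run()` is wrapped with `@sync_compatible`, so it can be called synchronously outside of an async context, and `preview()` requires that a service account can be resolved from the block or its credentials.

```python
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.aiplatform import VertexAICustomTrainingJob

# Placeholder values -- substitute your own saved credentials block,
# region, container image, and command.
gcp_credentials = GcpCredentials.load("my-gcp-credentials")

vertex_job = VertexAICustomTrainingJob(
    gcp_credentials=gcp_credentials,
    region="us-central1",
    image="gcr.io/my-project/my-repo/my-image:latest",
    command=["python", "train.py"],
    machine_type="n1-standard-4",
)

# Inspect the CustomJob definition that would be submitted to Vertex AI.
print(vertex_job.preview())

# Submit the job and wait for it to reach a terminal state.
result = vertex_job.run()
print(result.identifier, result.status_code)
```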