class PrefectClient:
    """
    An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).

    Args:
        api: the REST API URL or FastAPI application to connect to
        api_key: An optional API key for authentication.
        api_version: The API version this client is compatible with.
        httpx_settings: An optional dictionary of settings to pass to the underlying
            `httpx.AsyncClient`

    Examples:

        Say hello to a Prefect REST API

        <div class="terminal">
        ```
        >>> async with get_client() as client:
        >>>     response = await client.hello()
        >>>
        >>> print(response.json())
        👋
        ```
        </div>
    """

    def __init__(
        self,
        api: Union[str, ASGIApp],
        *,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: Optional[Dict[str, Any]] = None,
    ) -> None:
        httpx_settings = httpx_settings.copy() if httpx_settings else {}
        httpx_settings.setdefault("headers", {})

        if PREFECT_API_TLS_INSECURE_SKIP_VERIFY:
            httpx_settings.setdefault("verify", False)
        else:
            cert_file = PREFECT_API_SSL_CERT_FILE.value()
            if not cert_file:
                cert_file = certifi.where()
            httpx_settings.setdefault("verify", cert_file)

        if api_version is None:
            api_version = SERVER_API_VERSION
        httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version)
        if api_key:
            httpx_settings["headers"].setdefault("Authorization", f"Bearer {api_key}")

        # Context management
        self._exit_stack = AsyncExitStack()
        self._ephemeral_app: Optional[ASGIApp] = None
        self.manage_lifespan = True
        self.server_type: ServerType

        # Only set if this client started the lifespan of the application
        self._ephemeral_lifespan: Optional[LifespanManager] = None

        self._closed = False
        self._started = False

        # Connect to an external application
        if isinstance(api, str):
            if httpx_settings.get("app"):
                raise ValueError(
                    "Invalid httpx settings: `app` cannot be set when providing an "
                    "api url. `app` is only for use with ephemeral instances. Provide "
                    "it as the `api` parameter instead."
                )
            httpx_settings.setdefault("base_url", api)

            # See https://www.python-httpx.org/advanced/#pool-limit-configuration
            httpx_settings.setdefault(
                "limits",
                httpx.Limits(
                    # We see instability when allowing the client to open many connections at once.
                    # Limiting concurrency results in more stable performance.
                    max_connections=16,
                    max_keepalive_connections=8,
                    # The Prefect Cloud LB will keep connections alive for 30s.
                    # Only allow the client to keep connections alive for 25s.
                    keepalive_expiry=25,
                ),
            )

            # See https://www.python-httpx.org/http2/
            # Enabling HTTP/2 support on the client does not necessarily mean that your requests
            # and responses will be transported over HTTP/2, since both the client and the server
            # need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the
            # client will use a standard HTTP/1.1 connection instead.
            httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value())

            self.server_type = (
                ServerType.CLOUD
                if api.startswith(PREFECT_CLOUD_API_URL.value())
                else ServerType.SERVER
            )

        # Connect to an in-process application
        elif isinstance(api, ASGIApp):
            self._ephemeral_app = api
            self.server_type = ServerType.EPHEMERAL
            # When using an ephemeral server, server-side exceptions can be raised
            # client-side breaking all of our response error code handling. To work
            # around this, we create an ASGI transport with application exceptions
            # disabled instead of using the application directly.
            # refs:
            # - https://github.com/PrefectHQ/prefect/pull/9637
            # - https://github.com/encode/starlette/blob/d3a11205ed35f8e5a58a711db0ff59c86fa7bb31/starlette/middleware/errors.py#L184
            # - https://github.com/tiangolo/fastapi/blob/8cc967a7605d3883bd04ceb5d25cc94ae079612f/fastapi/applications.py#L163-L164
            httpx_settings.setdefault(
                "transport",
                httpx.ASGITransport(
                    app=self._ephemeral_app, raise_app_exceptions=False
                ),
            )
            httpx_settings.setdefault("base_url", "http://ephemeral-prefect/api")

        else:
            raise TypeError(
                f"Unexpected type {type(api).__name__!r} for argument `api`. Expected"
                " 'str' or 'ASGIApp/FastAPI'"
            )

        # See https://www.python-httpx.org/advanced/#timeout-configuration
        httpx_settings.setdefault(
            "timeout",
            httpx.Timeout(
                connect=PREFECT_API_REQUEST_TIMEOUT.value(),
                read=PREFECT_API_REQUEST_TIMEOUT.value(),
                write=PREFECT_API_REQUEST_TIMEOUT.value(),
                pool=PREFECT_API_REQUEST_TIMEOUT.value(),
            ),
        )

        if not PREFECT_UNIT_TEST_MODE:
            httpx_settings.setdefault("follow_redirects", True)

        enable_csrf_support = (
            self.server_type != ServerType.CLOUD
            and PREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value()
        )

        self._client = PrefectHttpxAsyncClient(
            **httpx_settings, enable_csrf_support=enable_csrf_support
        )
        self._loop = None

        # See https://www.python-httpx.org/advanced/#custom-transports
        #
        # If we're using an HTTP/S client (not the ephemeral client), adjust the
        # transport to add retries _after_ it is instantiated. If we alter the transport
        # before instantiation, the transport will not be aware of proxies unless we
        # reproduce all of the logic to make it so.
        #
        # Only alter the transport to set our default of 3 retries, don't modify any
        # transport a user may have provided via httpx_settings.
        #
        # Making liberal use of getattr and isinstance checks here to avoid any
        # surprises if the internals of httpx or httpcore change on us
        if isinstance(api, str) and not httpx_settings.get("transport"):
            transport_for_url = getattr(self._client, "_transport_for_url", None)
            if callable(transport_for_url):
                server_transport = transport_for_url(httpx.URL(api))
                if isinstance(server_transport, httpx.AsyncHTTPTransport):
                    pool = getattr(server_transport, "_pool", None)
                    if isinstance(pool, httpcore.AsyncConnectionPool):
                        pool._retries = 3

        self.logger = get_logger("client")

    @property
    def api_url(self) -> httpx.URL:
        """
        Get the base URL for the API.
        """
        return self._client.base_url

    # API methods ----------------------------------------------------------------------

    async def api_healthcheck(self) -> Optional[Exception]:
        """
        Attempts to connect to the API and returns the encountered exception if not
        successful.

        If successful, returns `None`.
        """
        try:
            await self._client.get("/health")
            return None
        except Exception as exc:
            return exc

    async def hello(self) -> httpx.Response:
        """
        Send a GET request to /hello for testing purposes.
        """
        return await self._client.get("/hello")

    async def create_flow(self, flow: "FlowObject") -> UUID:
        """
        Create a flow in the Prefect API.

        Args:
            flow: a [Flow][prefect.flows.Flow] object

        Raises:
            httpx.RequestError: if a flow was not created for any reason

        Returns:
            the ID of the flow in the backend
        """
        return await self.create_flow_from_name(flow.name)
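    # Illustrative usage sketch (not part of the client): probing connectivity
    # before doing real work. Assumes `get_client()` from
    # `prefect.client.orchestration` as the usual factory for this class.
    #
    #     async with get_client() as client:
    #         error = await client.api_healthcheck()
    #         if error is not None:
    #             raise RuntimeError(f"Prefect API unreachable: {error!r}")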
    async def create_flow_from_name(self, flow_name: str) -> UUID:
        """
        Create a flow in the Prefect API.

        Args:
            flow_name: the name of the new flow

        Raises:
            httpx.RequestError: if a flow was not created for any reason

        Returns:
            the ID of the flow in the backend
        """
        flow_data = FlowCreate(name=flow_name)
        response = await self._client.post(
            "/flows/", json=flow_data.dict(json_compatible=True)
        )

        flow_id = response.json().get("id")
        if not flow_id:
            raise httpx.RequestError(f"Malformed response: {response}")

        # Return the id of the created flow
        return UUID(flow_id)

    async def read_flow(self, flow_id: UUID) -> Flow:
        """
        Query the Prefect API for a flow by id.

        Args:
            flow_id: the flow ID of interest

        Returns:
            a [Flow model][prefect.client.schemas.objects.Flow] representation of the flow
        """
        response = await self._client.get(f"/flows/{flow_id}")
        return Flow.parse_obj(response.json())

    async def read_flows(
        self,
        *,
        flow_filter: FlowFilter = None,
        flow_run_filter: FlowRunFilter = None,
        task_run_filter: TaskRunFilter = None,
        deployment_filter: DeploymentFilter = None,
        work_pool_filter: WorkPoolFilter = None,
        work_queue_filter: WorkQueueFilter = None,
        sort: FlowSort = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[Flow]:
        """
        Query the Prefect API for flows. Only flows matching all criteria will
        be returned.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            work_pool_filter: filter criteria for work pools
            work_queue_filter: filter criteria for work pool queues
            sort: sort criteria for the flows
            limit: limit for the flow query
            offset: offset for the flow query

        Returns:
            a list of Flow model representations of the flows
        """
        body = {
            "flows": flow_filter.dict(json_compatible=True) if flow_filter else None,
            "flow_runs": (
                flow_run_filter.dict(json_compatible=True, exclude_unset=True)
                if flow_run_filter
                else None
            ),
            "task_runs": (
                task_run_filter.dict(json_compatible=True) if task_run_filter else None
            ),
            "deployments": (
                deployment_filter.dict(json_compatible=True)
                if deployment_filter
                else None
            ),
            "work_pools": (
                work_pool_filter.dict(json_compatible=True)
                if work_pool_filter
                else None
            ),
            "work_queues": (
                work_queue_filter.dict(json_compatible=True)
                if work_queue_filter
                else None
            ),
            "sort": sort,
            "limit": limit,
            "offset": offset,
        }
        response = await self._client.post("/flows/filter", json=body)
        return pydantic.parse_obj_as(List[Flow], response.json())

    async def read_flow_by_name(
        self,
        flow_name: str,
    ) -> Flow:
        """
        Query the Prefect API for a flow by name.

        Args:
            flow_name: the name of a flow

        Returns:
            a fully hydrated Flow model
        """
        response = await self._client.get(f"/flows/name/{flow_name}")
        return Flow.parse_obj(response.json())
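    # Illustrative sketch: combining a filter with `read_flows`. Assumes the
    # filter schemas (`FlowFilter`, `FlowFilterName`) are imported alongside the
    # types referenced above; the flow name "etl" is a placeholder.
    #
    #     async with get_client() as client:
    #         flows = await client.read_flows(
    #             flow_filter=FlowFilter(name=FlowFilterName(any_=["etl"])),
    #             limit=10,
    #         )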
    async def create_flow_run_from_deployment(
        self,
        deployment_id: UUID,
        *,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        state: prefect.states.State = None,
        name: str = None,
        tags: Iterable[str] = None,
        idempotency_key: str = None,
        parent_task_run_id: UUID = None,
        work_queue_name: str = None,
        job_variables: Optional[Dict[str, Any]] = None,
    ) -> FlowRun:
        """
        Create a flow run for a deployment.

        Args:
            deployment_id: The deployment ID to create the flow run from
            parameters: Parameter overrides for this flow run. Merged with the
                deployment defaults
            context: Optional run context data
            state: The initial state for the run. If not provided, defaults to
                `Scheduled` for now. Should always be a `Scheduled` type.
            name: An optional name for the flow run. If not provided, the server will
                generate a name.
            tags: An optional iterable of tags to apply to the flow run; these tags
                are merged with the deployment's tags.
            idempotency_key: Optional idempotency key for creation of the flow run.
                If the key matches the key of an existing flow run, the existing run
                will be returned instead of creating a new one.
            parent_task_run_id: if a subflow run is being created, the placeholder task
                run identifier in the parent flow
            work_queue_name: An optional work queue name to add this run to. If not
                provided, will default to the deployment's set work queue. If one is
                provided that does not exist, a new work queue will be created within
                the deployment's work pool.
            job_variables: Optional variables that will be supplied to the flow run job.

        Raises:
            httpx.RequestError: if the Prefect API does not successfully create a run
                for any reason

        Returns:
            The flow run model
        """
        parameters = parameters or {}
        context = context or {}
        state = state or prefect.states.Scheduled()
        tags = tags or []

        flow_run_create = DeploymentFlowRunCreate(
            parameters=parameters,
            context=context,
            state=state.to_state_create(),
            tags=tags,
            name=name,
            idempotency_key=idempotency_key,
            parent_task_run_id=parent_task_run_id,
            job_variables=job_variables,
        )

        # done separately to avoid including this field in payloads sent to older API versions
        if work_queue_name:
            flow_run_create.work_queue_name = work_queue_name

        response = await self._client.post(
            f"/deployments/{deployment_id}/create_flow_run",
            json=flow_run_create.dict(json_compatible=True, exclude_unset=True),
        )
        return FlowRun.parse_obj(response.json())

    async def create_flow_run(
        self,
        flow: "FlowObject",
        name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        parent_task_run_id: Optional[UUID] = None,
        state: Optional["prefect.states.State"] = None,
    ) -> FlowRun:
        """
        Create a flow run for a flow.

        Args:
            flow: The flow model to create the flow run for
            name: An optional name for the flow run
            parameters: Parameter overrides for this flow run.
            context: Optional run context data
            tags: a list of tags to apply to this flow run
            parent_task_run_id: if a subflow run is being created, the placeholder task
                run identifier in the parent flow
            state: The initial state for the run. If not provided, defaults to
                `Scheduled` for now. Should always be a `Scheduled` type.

        Raises:
            httpx.RequestError: if the Prefect API does not successfully create a run
                for any reason

        Returns:
            The flow run model
        """
        parameters = parameters or {}
        context = context or {}

        if state is None:
            state = prefect.states.Pending()

        # Retrieve the flow id
        flow_id = await self.create_flow(flow)

        flow_run_create = FlowRunCreate(
            flow_id=flow_id,
            flow_version=flow.version,
            name=name,
            parameters=parameters,
            context=context,
            tags=list(tags or []),
            parent_task_run_id=parent_task_run_id,
            state=state.to_state_create(),
            empirical_policy=FlowRunPolicy(
                retries=flow.retries,
                retry_delay=flow.retry_delay_seconds,
            ),
        )

        flow_run_create_json = flow_run_create.dict(json_compatible=True)
        response = await self._client.post("/flow_runs/", json=flow_run_create_json)
        flow_run = FlowRun.parse_obj(response.json())

        # Restore the parameters to the local objects to retain expectations about
        # Python objects
        flow_run.parameters = parameters

        return flow_run
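    # Illustrative sketch: triggering an ad-hoc run of an existing deployment.
    # The deployment name "my-flow/my-deployment" and the parameter override are
    # placeholders.
    #
    #     async with get_client() as client:
    #         deployment = await client.read_deployment_by_name("my-flow/my-deployment")
    #         flow_run = await client.create_flow_run_from_deployment(
    #             deployment.id,
    #             parameters={"x": 1},
    #             tags=["ad-hoc"],
    #         )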
    async def update_flow_run(
        self,
        flow_run_id: UUID,
        flow_version: Optional[str] = None,
        parameters: Optional[dict] = None,
        name: Optional[str] = None,
        tags: Optional[Iterable[str]] = None,
        empirical_policy: Optional[FlowRunPolicy] = None,
        infrastructure_pid: Optional[str] = None,
        job_variables: Optional[dict] = None,
    ) -> httpx.Response:
        """
        Update a flow run's details.

        Args:
            flow_run_id: The identifier for the flow run to update.
            flow_version: A new version string for the flow run.
            parameters: A dictionary of parameter values for the flow run. This will
                not be merged with any existing parameters.
            name: A new name for the flow run.
            empirical_policy: A new flow run orchestration policy. This will not be
                merged with any existing policy.
            tags: An iterable of new tags for the flow run. These will not be merged
                with any existing tags.
            infrastructure_pid: The id of flow run as returned by an
                infrastructure block.

        Returns:
            an `httpx.Response` object from the PATCH request
        """
        params = {}
        if flow_version is not None:
            params["flow_version"] = flow_version
        if parameters is not None:
            params["parameters"] = parameters
        if name is not None:
            params["name"] = name
        if tags is not None:
            params["tags"] = tags
        if empirical_policy is not None:
            params["empirical_policy"] = empirical_policy
        if infrastructure_pid:
            params["infrastructure_pid"] = infrastructure_pid
        if job_variables is not None:
            params["job_variables"] = job_variables

        flow_run_data = FlowRunUpdate(**params)

        return await self._client.patch(
            f"/flow_runs/{flow_run_id}",
            json=flow_run_data.dict(json_compatible=True, exclude_unset=True),
        )

    async def delete_flow_run(
        self,
        flow_run_id: UUID,
    ) -> None:
        """
        Delete a flow run by UUID.

        Args:
            flow_run_id: The flow run UUID of interest.

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self._client.delete(f"/flow_runs/{flow_run_id}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> UUID:
        """
        Create a tag concurrency limit in the Prefect API. These limits govern
        concurrently running tasks.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a given
                tag

        Raises:
            httpx.RequestError: if the concurrency limit was not created for any reason

        Returns:
            the ID of the concurrency limit in the backend
        """
        concurrency_limit_create = ConcurrencyLimitCreate(
            tag=tag,
            concurrency_limit=concurrency_limit,
        )
        response = await self._client.post(
            "/concurrency_limits/",
            json=concurrency_limit_create.dict(json_compatible=True),
        )

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise httpx.RequestError(f"Malformed response: {response}")

        return UUID(concurrency_limit_id)

    async def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ):
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: if the concurrency limit could not be read for any
                reason

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = await self._client.get(
                f"/concurrency_limits/tag/{tag}",
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise httpx.RequestError(f"Malformed response: {response}")

        concurrency_limit = ConcurrencyLimit.parse_obj(response.json())
        return concurrency_limit
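    # Illustrative sketch: capping concurrent task runs for a tag, then reading
    # the limit back. The tag "database" and the limit value are placeholders.
    #
    #     async with get_client() as client:
    #         await client.create_concurrency_limit(tag="database", concurrency_limit=5)
    #         limit = await client.read_concurrency_limit_by_tag("database")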
    async def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ):
        """
        Lists concurrency limits set on task run tags.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """
        body = {
            "limit": limit,
            "offset": offset,
        }

        response = await self._client.post("/concurrency_limits/filter", json=body)
        return pydantic.parse_obj_as(List[ConcurrencyLimit], response.json())

    async def reset_concurrency_limit_by_tag(
        self,
        tag: str,
        slot_override: Optional[List[Union[UUID, str]]] = None,
    ):
        """
        Resets the concurrency limit slots set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to
            slot_override: a list of task run IDs that are currently using a
                concurrency slot. Ensure that any task run IDs included in
                `slot_override` are currently running; otherwise, those concurrency
                slots will never be released.

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        if slot_override is not None:
            slot_override = [str(slot) for slot in slot_override]

        try:
            await self._client.post(
                f"/concurrency_limits/tag/{tag}/reset",
                json=dict(slot_override=slot_override),
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ):
        """
        Delete the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self._client.delete(
                f"/concurrency_limits/tag/{tag}",
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
    async def create_work_queue(
        self,
        name: str,
        tags: Optional[List[str]] = None,
        description: Optional[str] = None,
        is_paused: Optional[bool] = None,
        concurrency_limit: Optional[int] = None,
        priority: Optional[int] = None,
        work_pool_name: Optional[str] = None,
    ) -> WorkQueue:
        """
        Create a work queue.

        Args:
            name: a unique name for the work queue
            tags: DEPRECATED: an optional list of tags to filter on; only work scheduled
                with these tags will be included in the queue. This option will be
                removed on 2023-02-23.
            description: An optional description for the work queue.
            is_paused: Whether or not the work queue is paused.
            concurrency_limit: An optional concurrency limit for the work queue.
            priority: The queue's priority. Lower values are higher priority
                (1 is the highest).
            work_pool_name: The name of the work pool to use for this queue.

        Raises:
            prefect.exceptions.ObjectAlreadyExists: If request returns 409
            httpx.RequestError: If request fails

        Returns:
            The created work queue
        """
        if tags:
            warnings.warn(
                (
                    "The use of tags for creating work queue filters is deprecated."
                    " This option will be removed on 2023-02-23."
                ),
                DeprecationWarning,
            )
            filter = QueueFilter(tags=tags)
        else:
            filter = None
        create_model = WorkQueueCreate(name=name, filter=filter)
        if description is not None:
            create_model.description = description
        if is_paused is not None:
            create_model.is_paused = is_paused
        if concurrency_limit is not None:
            create_model.concurrency_limit = concurrency_limit
        if priority is not None:
            create_model.priority = priority

        data = create_model.dict(json_compatible=True)
        try:
            if work_pool_name is not None:
                response = await self._client.post(
                    f"/work_pools/{work_pool_name}/queues", json=data
                )
            else:
                response = await self._client.post("/work_queues/", json=data)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_409_CONFLICT:
                raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
            elif e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return WorkQueue.parse_obj(response.json())

    async def read_work_queue_by_name(
        self,
        name: str,
        work_pool_name: Optional[str] = None,
    ) -> WorkQueue:
        """
        Read a work queue by name.

        Args:
            name (str): a unique name for the work queue
            work_pool_name (str, optional): the name of the work pool
                the queue belongs to.

        Raises:
            prefect.exceptions.ObjectNotFound: if no work queue is found
            httpx.HTTPStatusError: other status errors

        Returns:
            WorkQueue: a work queue API object
        """
        try:
            if work_pool_name is not None:
                response = await self._client.get(
                    f"/work_pools/{work_pool_name}/queues/{name}"
                )
            else:
                response = await self._client.get(f"/work_queues/name/{name}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

        return WorkQueue.parse_obj(response.json())

    async def update_work_queue(self, id: UUID, **kwargs):
        """
        Update properties of a work queue.

        Args:
            id: the ID of the work queue to update
            **kwargs: the fields to update

        Raises:
            ValueError: if no kwargs are provided
            prefect.exceptions.ObjectNotFound: if request returns 404
            httpx.RequestError: if the request fails
        """
        if not kwargs:
            raise ValueError("No fields provided to update.")

        data = WorkQueueUpdate(**kwargs).dict(json_compatible=True, exclude_unset=True)
        try:
            await self._client.patch(f"/work_queues/{id}", json=data)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
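    # Illustrative sketch: creating a queue inside a work pool and then updating
    # it. The names are placeholders; `update_work_queue` accepts any
    # `WorkQueueUpdate` field as a keyword argument.
    #
    #     async with get_client() as client:
    #         queue = await client.create_work_queue(
    #             name="priority", work_pool_name="my-pool"
    #         )
    #         await client.update_work_queue(queue.id, concurrency_limit=2)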
    async def get_runs_in_work_queue(
        self,
        id: UUID,
        limit: int = 10,
        scheduled_before: datetime.datetime = None,
    ) -> List[FlowRun]:
        """
        Read flow runs off a work queue.

        Args:
            id: the id of the work queue to read from
            limit: a limit on the number of runs to return
            scheduled_before: a timestamp; only runs scheduled before this time will be
                returned. Defaults to now.

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        Returns:
            List[FlowRun]: a list of FlowRun objects read from the queue
        """
        if scheduled_before is None:
            scheduled_before = pendulum.now("UTC")

        try:
            response = await self._client.post(
                f"/work_queues/{id}/get_runs",
                json={
                    "limit": limit,
                    "scheduled_before": scheduled_before.isoformat(),
                },
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return pydantic.parse_obj_as(List[FlowRun], response.json())

    async def read_work_queue(
        self,
        id: UUID,
    ) -> WorkQueue:
        """
        Read a work queue.

        Args:
            id: the id of the work queue to load

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        Returns:
            WorkQueue: an instantiated WorkQueue object
        """
        try:
            response = await self._client.get(f"/work_queues/{id}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return WorkQueue.parse_obj(response.json())

    async def read_work_queue_status(
        self,
        id: UUID,
    ) -> WorkQueueStatusDetail:
        """
        Read a work queue status.

        Args:
            id: the id of the work queue to load

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        Returns:
            WorkQueueStatusDetail: an instantiated WorkQueueStatusDetail object
        """
        try:
            response = await self._client.get(f"/work_queues/{id}/status")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return WorkQueueStatusDetail.parse_obj(response.json())

    async def match_work_queues(
        self,
        prefixes: List[str],
        work_pool_name: Optional[str] = None,
    ) -> List[WorkQueue]:
        """
        Query the Prefect API for work queues whose names start with a specific prefix.

        Args:
            prefixes: a list of strings used to match work queue name prefixes
            work_pool_name: an optional work pool name to scope the query to

        Returns:
            a list of WorkQueue model representations of the work queues
        """
        page_length = 100
        current_page = 0
        work_queues = []

        while True:
            new_queues = await self.read_work_queues(
                work_pool_name=work_pool_name,
                offset=current_page * page_length,
                limit=page_length,
                work_queue_filter=WorkQueueFilter(
                    name=WorkQueueFilterName(startswith_=prefixes)
                ),
            )
            if not new_queues:
                break
            work_queues += new_queues
            current_page += 1

        return work_queues

    async def delete_work_queue_by_id(
        self,
        id: UUID,
    ):
        """
        Delete a work queue by its ID.

        Args:
            id: the id of the work queue to delete

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self._client.delete(
                f"/work_queues/{id}",
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def create_block_type(self, block_type: BlockTypeCreate) -> BlockType:
        """
        Create a block type in the Prefect API.
        """
        try:
            response = await self._client.post(
                "/block_types/",
                json=block_type.dict(
                    json_compatible=True, exclude_unset=True, exclude={"id"}
                ),
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_409_CONFLICT:
                raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
            else:
                raise
        return BlockType.parse_obj(response.json())
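    # Illustrative sketch: draining scheduled runs from a queue the way a simple
    # agent loop might. The queue name is a placeholder.
    #
    #     async with get_client() as client:
    #         queue = await client.read_work_queue_by_name("priority")
    #         runs = await client.get_runs_in_work_queue(queue.id, limit=5)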
"""try:response=awaitself._client.post("/block_schemas/",json=block_schema.dict(json_compatible=True,exclude_unset=True,exclude={"id","block_type","checksum"},),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_409_CONFLICT:raiseprefect.exceptions.ObjectAlreadyExists(http_exc=e)fromeelse:raisereturnBlockSchema.parse_obj(response.json())asyncdefcreate_block_document(self,block_document:Union[BlockDocument,BlockDocumentCreate],include_secrets:bool=True,)->BlockDocument:""" Create a block document in the Prefect API. This data is used to configure a corresponding Block. Args: include_secrets (bool): whether to include secret values on the stored Block, corresponding to Pydantic's `SecretStr` and `SecretBytes` fields. Note Blocks may not work as expected if this is set to `False`. """ifisinstance(block_document,BlockDocument):block_document=BlockDocumentCreate.parse_obj(block_document.dict(json_compatible=True,include_secrets=include_secrets,exclude_unset=True,exclude={"id","block_schema","block_type"},),)try:response=awaitself._client.post("/block_documents/",json=block_document.dict(json_compatible=True,include_secrets=include_secrets,exclude_unset=True,exclude={"id","block_schema","block_type"},),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_409_CONFLICT:raiseprefect.exceptions.ObjectAlreadyExists(http_exc=e)fromeelse:raisereturnBlockDocument.parse_obj(response.json())asyncdefupdate_block_document(self,block_document_id:UUID,block_document:BlockDocumentUpdate,):""" Update a block document in the Prefect API. """try:awaitself._client.patch(f"/block_documents/{block_document_id}",json=block_document.dict(json_compatible=True,exclude_unset=True,include={"data","merge_existing_data","block_schema_id"},include_secrets=True,),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefdelete_block_document(self,block_document_id:UUID):""" Delete a block document. """try:awaitself._client.delete(f"/block_documents/{block_document_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_block_type_by_slug(self,slug:str)->BlockType:""" Read a block type by its slug. """try:response=awaitself._client.get(f"/block_types/slug/{slug}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnBlockType.parse_obj(response.json())asyncdefread_block_schema_by_checksum(self,checksum:str,version:Optional[str]=None)->BlockSchema:""" Look up a block schema checksum """try:url=f"/block_schemas/checksum/{checksum}"ifversionisnotNone:url=f"{url}?version={version}"response=awaitself._client.get(url)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnBlockSchema.parse_obj(response.json())asyncdefupdate_block_type(self,block_type_id:UUID,block_type:BlockTypeUpdate):""" Update a block document in the Prefect API. 
"""try:awaitself._client.patch(f"/block_types/{block_type_id}",json=block_type.dict(json_compatible=True,exclude_unset=True,include=BlockTypeUpdate.updatable_fields(),include_secrets=True,),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefdelete_block_type(self,block_type_id:UUID):""" Delete a block type. """try:awaitself._client.delete(f"/block_types/{block_type_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelif(e.response.status_code==status.HTTP_403_FORBIDDENande.response.json()["detail"]=="protected block types cannot be deleted."):raiseprefect.exceptions.ProtectedBlockError("Protected block types cannot be deleted.")fromeelse:raiseasyncdefread_block_types(self)->List[BlockType]:""" Read all block types Raises: httpx.RequestError: if the block types were not found Returns: List of BlockTypes. """response=awaitself._client.post("/block_types/filter",json={})returnpydantic.parse_obj_as(List[BlockType],response.json())asyncdefread_block_schemas(self)->List[BlockSchema]:""" Read all block schemas Raises: httpx.RequestError: if a valid block schema was not found Returns: A BlockSchema. """response=awaitself._client.post("/block_schemas/filter",json={})returnpydantic.parse_obj_as(List[BlockSchema],response.json())asyncdefget_most_recent_block_schema_for_block_type(self,block_type_id:UUID,)->Optional[BlockSchema]:""" Fetches the most recent block schema for a specified block type ID. Args: block_type_id: The ID of the block type. Raises: httpx.RequestError: If the request fails for any reason. Returns: The most recent block schema or None. """try:response=awaitself._client.post("/block_schemas/filter",json={"block_schemas":{"block_type_id":{"any_":[str(block_type_id)]}},"limit":1,},)excepthttpx.HTTPStatusError:raisereturnBlockSchema.parse_obj(response.json()[0])ifresponse.json()elseNoneasyncdefread_block_document(self,block_document_id:UUID,include_secrets:bool=True,):""" Read the block document with the specified ID. Args: block_document_id: the block document id include_secrets (bool): whether to include secret values on the Block, corresponding to Pydantic's `SecretStr` and `SecretBytes` fields. These fields are automatically obfuscated by Pydantic, but users can additionally choose not to receive their values from the API. Note that any business logic on the Block may not work if this is `False`. Raises: httpx.RequestError: if the block document was not found for any reason Returns: A block document or None. """assert(block_document_idisnotNone),"Unexpected ID on block document. Was it persisted?"try:response=awaitself._client.get(f"/block_documents/{block_document_id}",params=dict(include_secrets=include_secrets),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnBlockDocument.parse_obj(response.json())asyncdefread_block_document_by_name(self,name:str,block_type_slug:str,include_secrets:bool=True,)->BlockDocument:""" Read the block document with the specified name that corresponds to a specific block type name. Args: name: The block document name. block_type_slug: The block type slug. include_secrets (bool): whether to include secret values on the Block, corresponding to Pydantic's `SecretStr` and `SecretBytes` fields. 
    async def read_block_document_by_name(
        self,
        name: str,
        block_type_slug: str,
        include_secrets: bool = True,
    ) -> BlockDocument:
        """
        Read the block document with the specified name that corresponds to a
        specific block type name.

        Args:
            name: The block document name.
            block_type_slug: The block type slug.
            include_secrets (bool): whether to include secret values
                on the Block, corresponding to Pydantic's `SecretStr` and
                `SecretBytes` fields. These fields are automatically obfuscated
                by Pydantic, but users can additionally choose not to receive
                their values from the API. Note that any business logic on the
                Block may not work if this is `False`.

        Raises:
            httpx.RequestError: if the block document was not found for any reason

        Returns:
            A block document or None.
        """
        try:
            response = await self._client.get(
                f"/block_types/slug/{block_type_slug}/block_documents/name/{name}",
                params=dict(include_secrets=include_secrets),
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return BlockDocument.parse_obj(response.json())

    async def read_block_documents(
        self,
        block_schema_type: Optional[str] = None,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        include_secrets: bool = True,
    ):
        """
        Read block documents

        Args:
            block_schema_type: an optional block schema type
            offset: an offset
            limit: the number of blocks to return
            include_secrets (bool): whether to include secret values
                on the Block, corresponding to Pydantic's `SecretStr` and
                `SecretBytes` fields. These fields are automatically obfuscated
                by Pydantic, but users can additionally choose not to receive
                their values from the API. Note that any business logic on the
                Block may not work if this is `False`.

        Returns:
            A list of block documents
        """
        response = await self._client.post(
            "/block_documents/filter",
            json=dict(
                block_schema_type=block_schema_type,
                offset=offset,
                limit=limit,
                include_secrets=include_secrets,
            ),
        )
        return pydantic.parse_obj_as(List[BlockDocument], response.json())

    async def read_block_documents_by_type(
        self,
        block_type_slug: str,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        include_secrets: bool = True,
    ) -> List[BlockDocument]:
        """Retrieve block documents by block type slug.

        Args:
            block_type_slug: The block type slug.
            offset: an offset
            limit: the number of blocks to return
            include_secrets: whether to include secret values

        Returns:
            A list of block documents
        """
        response = await self._client.get(
            f"/block_types/slug/{block_type_slug}/block_documents",
            params=dict(
                offset=offset,
                limit=limit,
                include_secrets=include_secrets,
            ),
        )
        return pydantic.parse_obj_as(List[BlockDocument], response.json())
    async def create_deployment(
        self,
        flow_id: UUID,
        name: str,
        version: str = None,
        schedule: SCHEDULE_TYPES = None,
        schedules: List[DeploymentScheduleCreate] = None,
        parameters: Optional[Dict[str, Any]] = None,
        description: str = None,
        work_queue_name: str = None,
        work_pool_name: str = None,
        tags: List[str] = None,
        storage_document_id: UUID = None,
        manifest_path: str = None,
        path: str = None,
        entrypoint: str = None,
        infrastructure_document_id: UUID = None,
        infra_overrides: Optional[Dict[str, Any]] = None,  # for backwards compat
        parameter_openapi_schema: Optional[Dict[str, Any]] = None,
        is_schedule_active: Optional[bool] = None,
        paused: Optional[bool] = None,
        pull_steps: Optional[List[dict]] = None,
        enforce_parameter_schema: Optional[bool] = None,
        job_variables: Optional[Dict[str, Any]] = None,
    ) -> UUID:
        """
        Create a deployment.

        Args:
            flow_id: the flow ID to create a deployment for
            name: the name of the deployment
            version: an optional version string for the deployment
            schedule: an optional schedule to apply to the deployment
            tags: an optional list of tags to apply to the deployment
            storage_document_id: a reference to the storage block document
                used for the deployed flow
            infrastructure_document_id: a reference to the infrastructure block document
                to use for this deployment
            job_variables: A dictionary of dot delimited infrastructure overrides that
                will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
                `namespace='prefect'`. This argument was previously named
                `infra_overrides`. Both arguments are supported for backwards
                compatibility.

        Raises:
            httpx.RequestError: if the deployment was not created for any reason

        Returns:
            the ID of the deployment in the backend
        """
        jv = handle_deprecated_infra_overrides_parameter(job_variables, infra_overrides)

        deployment_create = DeploymentCreate(
            flow_id=flow_id,
            name=name,
            version=version,
            parameters=dict(parameters or {}),
            tags=list(tags or []),
            work_queue_name=work_queue_name,
            description=description,
            storage_document_id=storage_document_id,
            path=path,
            entrypoint=entrypoint,
            manifest_path=manifest_path,  # for backwards compat
            infrastructure_document_id=infrastructure_document_id,
            job_variables=jv,
            parameter_openapi_schema=parameter_openapi_schema,
            is_schedule_active=is_schedule_active,
            paused=paused,
            schedule=schedule,
            schedules=schedules or [],
            pull_steps=pull_steps,
            enforce_parameter_schema=enforce_parameter_schema,
        )

        if work_pool_name is not None:
            deployment_create.work_pool_name = work_pool_name

        # Exclude newer fields that are not set to avoid compatibility issues
        exclude = {
            field
            for field in ["work_pool_name", "work_queue_name"]
            if field not in deployment_create.__fields_set__
        }

        if deployment_create.is_schedule_active is None:
            exclude.add("is_schedule_active")

        if deployment_create.paused is None:
            exclude.add("paused")

        if deployment_create.pull_steps is None:
            exclude.add("pull_steps")

        if deployment_create.enforce_parameter_schema is None:
            exclude.add("enforce_parameter_schema")

        json = deployment_create.dict(json_compatible=True, exclude=exclude)
        response = await self._client.post(
            "/deployments/",
            json=json,
        )
        deployment_id = response.json().get("id")
        if not deployment_id:
            raise httpx.RequestError(f"Malformed response: {response}")

        return UUID(deployment_id)

    async def update_schedule(self, deployment_id: UUID, active: bool = True):
        path = "set_schedule_active" if active else "set_schedule_inactive"
        await self._client.post(
            f"/deployments/{deployment_id}/{path}",
        )

    async def set_deployment_paused_state(self, deployment_id: UUID, paused: bool):
        await self._client.patch(
            f"/deployments/{deployment_id}", json={"paused": paused}
        )

    async def update_deployment(
        self,
        deployment: Deployment,
        schedule: SCHEDULE_TYPES = None,
        is_schedule_active: bool = None,
    ):
        deployment_update = DeploymentUpdate(
            version=deployment.version,
            schedule=schedule if schedule is not None else deployment.schedule,
            is_schedule_active=(
                is_schedule_active
                if is_schedule_active is not None
                else deployment.is_schedule_active
            ),
            description=deployment.description,
            work_queue_name=deployment.work_queue_name,
            tags=deployment.tags,
            manifest_path=deployment.manifest_path,
            path=deployment.path,
            entrypoint=deployment.entrypoint,
            parameters=deployment.parameters,
            storage_document_id=deployment.storage_document_id,
            infrastructure_document_id=deployment.infrastructure_document_id,
            job_variables=deployment.job_variables,
            enforce_parameter_schema=deployment.enforce_parameter_schema,
        )

        if getattr(deployment, "work_pool_name", None) is not None:
            deployment_update.work_pool_name = deployment.work_pool_name

        exclude = set()
        if deployment.enforce_parameter_schema is None:
            exclude.add("enforce_parameter_schema")

        await self._client.patch(
            f"/deployments/{deployment.id}",
            json=deployment_update.dict(json_compatible=True, exclude=exclude),
        )
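    # Illustrative sketch: registering a flow and a deployment for it. The
    # entrypoint and names are placeholders.
    #
    #     async with get_client() as client:
    #         flow_id = await client.create_flow_from_name("my-flow")
    #         deployment_id = await client.create_deployment(
    #             flow_id=flow_id,
    #             name="my-deployment",
    #             work_pool_name="my-pool",
    #             entrypoint="flows.py:my_flow",
    #         )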
"""# TODO: We are likely to remove this method once we have considered the# packaging interface for deployments further.response=awaitself._client.post("/deployments/",json=schema.dict(json_compatible=True))deployment_id=response.json().get("id")ifnotdeployment_id:raisehttpx.RequestError(f"Malformed response: {response}")returnUUID(deployment_id)asyncdefread_deployment(self,deployment_id:UUID,)->DeploymentResponse:""" Query the Prefect API for a deployment by id. Args: deployment_id: the deployment ID of interest Returns: a [Deployment model][prefect.client.schemas.objects.Deployment] representation of the deployment """try:response=awaitself._client.get(f"/deployments/{deployment_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnDeploymentResponse.parse_obj(response.json())asyncdefread_deployment_by_name(self,name:str,)->DeploymentResponse:""" Query the Prefect API for a deployment by name. Args: name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME> Raises: prefect.exceptions.ObjectNotFound: If request returns 404 httpx.RequestError: If request fails Returns: a Deployment model representation of the deployment """try:response=awaitself._client.get(f"/deployments/name/{name}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnDeploymentResponse.parse_obj(response.json())asyncdefread_deployments(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,work_pool_filter:WorkPoolFilter=None,work_queue_filter:WorkQueueFilter=None,limit:int=None,sort:DeploymentSort=None,offset:int=0,)->List[DeploymentResponse]:""" Query the Prefect API for deployments. Only deployments matching all the provided criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments work_pool_filter: filter criteria for work pools work_queue_filter: filter criteria for work pool queues limit: a limit for the deployment query offset: an offset for the deployment query Returns: a list of Deployment model representations of the deployments """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),"work_pool_queues":(work_queue_filter.dict(json_compatible=True)ifwork_queue_filterelseNone),"limit":limit,"offset":offset,"sort":sort,}response=awaitself._client.post("/deployments/filter",json=body)returnpydantic.parse_obj_as(List[DeploymentResponse],response.json())asyncdefdelete_deployment(self,deployment_id:UUID,):""" Delete deployment by id. Args: deployment_id: The deployment id of interest. 
    async def delete_deployment(
        self,
        deployment_id: UUID,
    ):
        """
        Delete deployment by id.

        Args:
            deployment_id: The deployment id of interest.

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self._client.delete(f"/deployments/{deployment_id}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def create_deployment_schedules(
        self,
        deployment_id: UUID,
        schedules: List[Tuple[SCHEDULE_TYPES, bool]],
    ) -> List[DeploymentSchedule]:
        """
        Create deployment schedules.

        Args:
            deployment_id: the deployment ID
            schedules: a list of tuples containing the schedule to create
                and whether or not it should be active.

        Raises:
            httpx.RequestError: if the schedules were not created for any reason

        Returns:
            the list of schedules created in the backend
        """
        deployment_schedule_create = [
            DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1])
            for schedule in schedules
        ]

        json = [
            deployment_schedule_create.dict(json_compatible=True)
            for deployment_schedule_create in deployment_schedule_create
        ]
        response = await self._client.post(
            f"/deployments/{deployment_id}/schedules", json=json
        )
        return pydantic.parse_obj_as(List[DeploymentSchedule], response.json())

    async def read_deployment_schedules(
        self,
        deployment_id: UUID,
    ) -> List[DeploymentSchedule]:
        """
        Query the Prefect API for a deployment's schedules.

        Args:
            deployment_id: the deployment ID

        Returns:
            a list of DeploymentSchedule model representations of the deployment
            schedules
        """
        try:
            response = await self._client.get(f"/deployments/{deployment_id}/schedules")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return pydantic.parse_obj_as(List[DeploymentSchedule], response.json())

    async def update_deployment_schedule(
        self,
        deployment_id: UUID,
        schedule_id: UUID,
        active: Optional[bool] = None,
        schedule: Optional[SCHEDULE_TYPES] = None,
    ):
        """
        Update a deployment schedule by ID.

        Args:
            deployment_id: the deployment ID
            schedule_id: the deployment schedule ID of interest
            active: whether or not the schedule should be active
            schedule: the cron, rrule, or interval schedule this deployment schedule
                should use
        """
        kwargs = {}
        if active is not None:
            kwargs["active"] = active
        if schedule is not None:
            kwargs["schedule"] = schedule

        deployment_schedule_update = DeploymentScheduleUpdate(**kwargs)
        json = deployment_schedule_update.dict(json_compatible=True, exclude_unset=True)

        try:
            await self._client.patch(
                f"/deployments/{deployment_id}/schedules/{schedule_id}", json=json
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def delete_deployment_schedule(
        self,
        deployment_id: UUID,
        schedule_id: UUID,
    ) -> None:
        """
        Delete a deployment schedule.

        Args:
            deployment_id: the deployment ID
            schedule_id: the ID of the deployment schedule to delete.

        Raises:
            httpx.RequestError: if the schedules were not deleted for any reason
        """
        try:
            await self._client.delete(
                f"/deployments/{deployment_id}/schedules/{schedule_id}"
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
    async def read_flow_run(self, flow_run_id: UUID) -> FlowRun:
        """
        Query the Prefect API for a flow run by id.

        Args:
            flow_run_id: the flow run ID of interest

        Returns:
            a Flow Run model representation of the flow run
        """
        try:
            response = await self._client.get(f"/flow_runs/{flow_run_id}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return FlowRun.parse_obj(response.json())

    async def resume_flow_run(
        self, flow_run_id: UUID, run_input: Optional[Dict] = None
    ) -> OrchestrationResult:
        """
        Resumes a paused flow run.

        Args:
            flow_run_id: the flow run ID of interest
            run_input: the input to resume the flow run with

        Returns:
            an OrchestrationResult model representation of state orchestration output
        """
        try:
            response = await self._client.post(
                f"/flow_runs/{flow_run_id}/resume", json={"run_input": run_input}
            )
        except httpx.HTTPStatusError:
            raise

        return OrchestrationResult.parse_obj(response.json())

    async def read_flow_runs(
        self,
        *,
        flow_filter: FlowFilter = None,
        flow_run_filter: FlowRunFilter = None,
        task_run_filter: TaskRunFilter = None,
        deployment_filter: DeploymentFilter = None,
        work_pool_filter: WorkPoolFilter = None,
        work_queue_filter: WorkQueueFilter = None,
        sort: FlowRunSort = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[FlowRun]:
        """
        Query the Prefect API for flow runs. Only flow runs matching all criteria will
        be returned.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            work_pool_filter: filter criteria for work pools
            work_queue_filter: filter criteria for work pool queues
            sort: sort criteria for the flow runs
            limit: limit for the flow run query
            offset: offset for the flow run query

        Returns:
            a list of Flow Run model representations
                of the flow runs
        """
        body = {
            "flows": flow_filter.dict(json_compatible=True) if flow_filter else None,
            "flow_runs": (
                flow_run_filter.dict(json_compatible=True, exclude_unset=True)
                if flow_run_filter
                else None
            ),
            "task_runs": (
                task_run_filter.dict(json_compatible=True) if task_run_filter else None
            ),
            "deployments": (
                deployment_filter.dict(json_compatible=True)
                if deployment_filter
                else None
            ),
            "work_pools": (
                work_pool_filter.dict(json_compatible=True)
                if work_pool_filter
                else None
            ),
            "work_pool_queues": (
                work_queue_filter.dict(json_compatible=True)
                if work_queue_filter
                else None
            ),
            "sort": sort,
            "limit": limit,
            "offset": offset,
        }

        response = await self._client.post("/flow_runs/filter", json=body)
        return pydantic.parse_obj_as(List[FlowRun], response.json())
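    # Illustrative sketch: listing recent failed flow runs. Assumes the filter
    # schemas (`FlowRunFilter`, `FlowRunFilterState`, `FlowRunFilterStateType`,
    # `StateType`) are imported alongside the others referenced above.
    #
    #     async with get_client() as client:
    #         failed = await client.read_flow_runs(
    #             flow_run_filter=FlowRunFilter(
    #                 state=FlowRunFilterState(
    #                     type=FlowRunFilterStateType(any_=[StateType.FAILED])
    #                 )
    #             ),
    #             limit=20,
    #         )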
    async def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: "prefect.states.State",
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a flow run.

        Args:
            flow_run_id: the id of the flow run
            state: the state to set
            force: if True, disregard orchestration logic when setting the state,
                forcing the Prefect API to accept the state

        Returns:
            an OrchestrationResult model representation of state orchestration output
        """
        state_create = state.to_state_create()
        state_create.state_details.flow_run_id = flow_run_id
        state_create.state_details.transition_id = uuid4()
        try:
            response = await self._client.post(
                f"/flow_runs/{flow_run_id}/set_state",
                json=dict(state=state_create.dict(json_compatible=True), force=force),
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

        return OrchestrationResult.parse_obj(response.json())

    async def read_flow_run_states(
        self, flow_run_id: UUID
    ) -> List[prefect.states.State]:
        """
        Query for the states of a flow run

        Args:
            flow_run_id: the id of the flow run

        Returns:
            a list of State model representations
                of the flow run states
        """
        response = await self._client.get(
            "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id))
        )
        return pydantic.parse_obj_as(List[prefect.states.State], response.json())

    async def set_task_run_name(self, task_run_id: UUID, name: str):
        task_run_data = TaskRunUpdate(name=name)
        return await self._client.patch(
            f"/task_runs/{task_run_id}",
            json=task_run_data.dict(json_compatible=True, exclude_unset=True),
        )

    async def create_task_run(
        self,
        task: "TaskObject[P, R]",
        flow_run_id: Optional[UUID],
        dynamic_key: str,
        name: Optional[str] = None,
        extra_tags: Optional[Iterable[str]] = None,
        state: Optional[prefect.states.State[R]] = None,
        task_inputs: Optional[
            Dict[
                str,
                List[
                    Union[
                        TaskRunResult,
                        Parameter,
                        Constant,
                    ]
                ],
            ]
        ] = None,
    ) -> TaskRun:
        """
        Create a task run

        Args:
            task: The Task to run
            flow_run_id: The flow run id with which to associate the task run
            dynamic_key: A key unique to this particular run of a Task within the flow
            name: An optional name for the task run
            extra_tags: an optional list of extra tags to apply to the task run in
                addition to `task.tags`
            state: The initial state for the run. If not provided, defaults to
                `Pending` for now. Should always be a `Scheduled` type.
            task_inputs: the set of inputs passed to the task

        Returns:
            The created task run.
        """
        tags = set(task.tags).union(extra_tags or [])

        if state is None:
            state = prefect.states.Pending()

        task_run_data = TaskRunCreate(
            name=name,
            flow_run_id=flow_run_id,
            task_key=task.task_key,
            dynamic_key=dynamic_key,
            tags=list(tags),
            task_version=task.version,
            empirical_policy=TaskRunPolicy(
                retries=task.retries,
                retry_delay=task.retry_delay_seconds,
                retry_jitter_factor=task.retry_jitter_factor,
            ),
            state=state.to_state_create(),
            task_inputs=task_inputs or {},
        )

        response = await self._client.post(
            "/task_runs/", json=task_run_data.dict(json_compatible=True)
        )
        return TaskRun.parse_obj(response.json())

    async def read_task_run(self, task_run_id: UUID) -> TaskRun:
        """
        Query the Prefect API for a task run by id.

        Args:
            task_run_id: the task run ID of interest

        Returns:
            a Task Run model representation of the task run
        """
        response = await self._client.get(f"/task_runs/{task_run_id}")
        return TaskRun.parse_obj(response.json())
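    # Illustrative sketch: forcing a run into a terminal state, e.g. when
    # cleaning up a crashed run. `flow_run_id` is a placeholder.
    #
    #     async with get_client() as client:
    #         result = await client.set_flow_run_state(
    #             flow_run_id, state=prefect.states.Cancelled(), force=True
    #         )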
    async def read_task_runs(
        self,
        *,
        flow_filter: FlowFilter = None,
        flow_run_filter: FlowRunFilter = None,
        task_run_filter: TaskRunFilter = None,
        deployment_filter: DeploymentFilter = None,
        sort: TaskRunSort = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[TaskRun]:
        """
        Query the Prefect API for task runs. Only task runs matching all criteria will
        be returned.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            sort: sort criteria for the task runs
            limit: a limit for the task run query
            offset: an offset for the task run query

        Returns:
            a list of Task Run model representations
                of the task runs
        """
        body = {
            "flows": flow_filter.dict(json_compatible=True) if flow_filter else None,
            "flow_runs": (
                flow_run_filter.dict(json_compatible=True, exclude_unset=True)
                if flow_run_filter
                else None
            ),
            "task_runs": (
                task_run_filter.dict(json_compatible=True) if task_run_filter else None
            ),
            "deployments": (
                deployment_filter.dict(json_compatible=True)
                if deployment_filter
                else None
            ),
            "sort": sort,
            "limit": limit,
            "offset": offset,
        }
        response = await self._client.post("/task_runs/filter", json=body)
        return pydantic.parse_obj_as(List[TaskRun], response.json())

    async def delete_task_run(self, task_run_id: UUID) -> None:
        """
        Delete a task run by id.

        Args:
            task_run_id: the task run ID of interest

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self._client.delete(f"/task_runs/{task_run_id}")
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def set_task_run_state(
        self,
        task_run_id: UUID,
        state: prefect.states.State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a task run.

        Args:
            task_run_id: the id of the task run
            state: the state to set
            force: if True, disregard orchestration logic when setting the state,
                forcing the Prefect API to accept the state

        Returns:
            an OrchestrationResult model representation of state orchestration output
        """
        state_create = state.to_state_create()
        state_create.state_details.task_run_id = task_run_id
        response = await self._client.post(
            f"/task_runs/{task_run_id}/set_state",
            json=dict(state=state_create.dict(json_compatible=True), force=force),
        )
        return OrchestrationResult.parse_obj(response.json())

    async def read_task_run_states(
        self, task_run_id: UUID
    ) -> List[prefect.states.State]:
        """
        Query for the states of a task run

        Args:
            task_run_id: the id of the task run

        Returns:
            a list of State model representations of the task run states
        """
        response = await self._client.get(
            "/task_run_states/", params=dict(task_run_id=str(task_run_id))
        )
        return pydantic.parse_obj_as(List[prefect.states.State], response.json())

    async def create_logs(self, logs: Iterable[Union[LogCreate, dict]]) -> None:
        """
        Create logs for a flow or task run

        Args:
            logs: An iterable of `LogCreate` objects or already json-compatible dicts
        """
        serialized_logs = [
            log.dict(json_compatible=True) if isinstance(log, LogCreate) else log
            for log in logs
        ]
        await self._client.post("/logs/", json=serialized_logs)
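    # Illustrative sketch: shipping a single log line for a run. The payload is
    # a json-compatible dict matching `LogCreate`; level 20 is stdlib INFO, and
    # `flow_run_id` is a placeholder.
    #
    #     async with get_client() as client:
    #         await client.create_logs([
    #             {
    #                 "name": "prefect.example",
    #                 "level": 20,
    #                 "message": "hello from the client",
    #                 "timestamp": pendulum.now("UTC").isoformat(),
    #                 "flow_run_id": str(flow_run_id),
    #             }
    #         ])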
"""iftagsisNone:tags=[]ifstate_namesisNone:state_names=[]policy=FlowRunNotificationPolicyCreate(block_document_id=block_document_id,is_active=is_active,tags=tags,state_names=state_names,message_template=message_template,)response=awaitself._client.post("/flow_run_notification_policies/",json=policy.dict(json_compatible=True),)policy_id=response.json().get("id")ifnotpolicy_id:raisehttpx.RequestError(f"Malformed response: {response}")returnUUID(policy_id)asyncdefdelete_flow_run_notification_policy(self,id:UUID,)->None:""" Delete a flow run notification policy by id. Args: id: UUID of the flow run notification policy to delete. Raises: prefect.exceptions.ObjectNotFound: If request returns 404 httpx.RequestError: If requests fails """try:awaitself._client.delete(f"/flow_run_notification_policies/{id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefupdate_flow_run_notification_policy(self,id:UUID,block_document_id:Optional[UUID]=None,is_active:Optional[bool]=None,tags:Optional[List[str]]=None,state_names:Optional[List[str]]=None,message_template:Optional[str]=None,)->None:""" Update a notification policy for flow runs Args: id: UUID of the notification policy block_document_id: The block document UUID is_active: Whether the notification policy is active tags: List of flow tags state_names: List of state names message_template: Notification message template Raises: prefect.exceptions.ObjectNotFound: If request returns 404 httpx.RequestError: If requests fails """params={}ifblock_document_idisnotNone:params["block_document_id"]=block_document_idifis_activeisnotNone:params["is_active"]=is_activeiftagsisnotNone:params["tags"]=tagsifstate_namesisnotNone:params["state_names"]=state_namesifmessage_templateisnotNone:params["message_template"]=message_templatepolicy=FlowRunNotificationPolicyUpdate(**params)try:awaitself._client.patch(f"/flow_run_notification_policies/{id}",json=policy.dict(json_compatible=True,exclude_unset=True),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_flow_run_notification_policies(self,flow_run_notification_policy_filter:FlowRunNotificationPolicyFilter,limit:Optional[int]=None,offset:int=0,)->List[FlowRunNotificationPolicy]:""" Query the Prefect API for flow run notification policies. Only policies matching all criteria will be returned. Args: flow_run_notification_policy_filter: filter criteria for notification policies limit: a limit for the notification policies query offset: an offset for the notification policies query Returns: a list of FlowRunNotificationPolicy model representations of the notification policies """body={"flow_run_notification_policy_filter":(flow_run_notification_policy_filter.dict(json_compatible=True)ifflow_run_notification_policy_filterelseNone),"limit":limit,"offset":offset,}response=awaitself._client.post("/flow_run_notification_policies/filter",json=body)returnpydantic.parse_obj_as(List[FlowRunNotificationPolicy],response.json())asyncdefread_logs(self,log_filter:LogFilter=None,limit:int=None,offset:int=None,sort:LogSort=LogSort.TIMESTAMP_ASC,)->List[Log]:""" Read flow and task run logs. 
"""body={"logs":log_filter.dict(json_compatible=True)iflog_filterelseNone,"limit":limit,"offset":offset,"sort":sort,}response=awaitself._client.post("/logs/filter",json=body)returnpydantic.parse_obj_as(List[Log],response.json())asyncdefresolve_datadoc(self,datadoc:DataDocument)->Any:""" Recursively decode possibly nested data documents. "server" encoded documents will be retrieved from the server. Args: datadoc: The data document to resolve Returns: a decoded object, the innermost data """ifnotisinstance(datadoc,DataDocument):raiseTypeError(f"`resolve_datadoc` received invalid type {type(datadoc).__name__}")asyncdefresolve_inner(data):ifisinstance(data,bytes):try:data=DataDocument.parse_raw(data)exceptpydantic.ValidationError:returndataifisinstance(data,DataDocument):returnawaitresolve_inner(data.decode())returndatareturnawaitresolve_inner(datadoc)asyncdefsend_worker_heartbeat(self,work_pool_name:str,worker_name:str,heartbeat_interval_seconds:Optional[float]=None,):""" Sends a worker heartbeat for a given work pool. Args: work_pool_name: The name of the work pool to heartbeat against. worker_name: The name of the worker sending the heartbeat. """awaitself._client.post(f"/work_pools/{work_pool_name}/workers/heartbeat",json={"name":worker_name,"heartbeat_interval_seconds":heartbeat_interval_seconds,},)asyncdefread_workers_for_work_pool(self,work_pool_name:str,worker_filter:Optional[WorkerFilter]=None,offset:Optional[int]=None,limit:Optional[int]=None,)->List[Worker]:""" Reads workers for a given work pool. Args: work_pool_name: The name of the work pool for which to get member workers. worker_filter: Criteria by which to filter workers. limit: Limit for the worker query. offset: Limit for the worker query. """response=awaitself._client.post(f"/work_pools/{work_pool_name}/workers/filter",json={"worker_filter":(worker_filter.dict(json_compatible=True,exclude_unset=True)ifworker_filterelseNone),"offset":offset,"limit":limit,},)returnpydantic.parse_obj_as(List[Worker],response.json())asyncdefread_work_pool(self,work_pool_name:str)->WorkPool:""" Reads information for a given work pool Args: work_pool_name: The name of the work pool to for which to get information. Returns: Information about the requested work pool. """try:response=awaitself._client.get(f"/work_pools/{work_pool_name}")returnpydantic.parse_obj_as(WorkPool,response.json())excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_work_pools(self,limit:Optional[int]=None,offset:int=0,work_pool_filter:Optional[WorkPoolFilter]=None,)->List[WorkPool]:""" Reads work pools. Args: limit: Limit for the work pool query. offset: Offset for the work pool query. work_pool_filter: Criteria by which to filter work pools. Returns: A list of work pools. """body={"limit":limit,"offset":offset,"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),}response=awaitself._client.post("/work_pools/filter",json=body)returnpydantic.parse_obj_as(List[WorkPool],response.json())asyncdefcreate_work_pool(self,work_pool:WorkPoolCreate,)->WorkPool:""" Creates a work pool with the provided configuration. Args: work_pool: Desired configuration for the new work pool. Returns: Information about the newly created work pool. 
"""try:response=awaitself._client.post("/work_pools/",json=work_pool.dict(json_compatible=True,exclude_unset=True),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_409_CONFLICT:raiseprefect.exceptions.ObjectAlreadyExists(http_exc=e)fromeelse:raisereturnpydantic.parse_obj_as(WorkPool,response.json())asyncdefupdate_work_pool(self,work_pool_name:str,work_pool:WorkPoolUpdate,):""" Updates a work pool. Args: work_pool_name: Name of the work pool to update. work_pool: Fields to update in the work pool. """try:awaitself._client.patch(f"/work_pools/{work_pool_name}",json=work_pool.dict(json_compatible=True,exclude_unset=True),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefdelete_work_pool(self,work_pool_name:str,):""" Deletes a work pool. Args: work_pool_name: Name of the work pool to delete. """try:awaitself._client.delete(f"/work_pools/{work_pool_name}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_work_queues(self,work_pool_name:Optional[str]=None,work_queue_filter:Optional[WorkQueueFilter]=None,limit:Optional[int]=None,offset:Optional[int]=None,)->List[WorkQueue]:""" Retrieves queues for a work pool. Args: work_pool_name: Name of the work pool for which to get queues. work_queue_filter: Criteria by which to filter queues. limit: Limit for the queue query. offset: Limit for the queue query. Returns: List of queues for the specified work pool. """json={"work_queues":(work_queue_filter.dict(json_compatible=True,exclude_unset=True)ifwork_queue_filterelseNone),"limit":limit,"offset":offset,}ifwork_pool_name:try:response=awaitself._client.post(f"/work_pools/{work_pool_name}/queues/filter",json=json,)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseelse:response=awaitself._client.post("/work_queues/filter",json=json)returnpydantic.parse_obj_as(List[WorkQueue],response.json())asyncdefget_scheduled_flow_runs_for_deployments(self,deployment_ids:List[UUID],scheduled_before:Optional[datetime.datetime]=None,limit:Optional[int]=None,):body:Dict[str,Any]=dict(deployment_ids=[str(id)foridindeployment_ids])ifscheduled_before:body["scheduled_before"]=str(scheduled_before)iflimit:body["limit"]=limitresponse=awaitself._client.post("/deployments/get_scheduled_flow_runs",json=body,)returnpydantic.parse_obj_as(List[FlowRunResponse],response.json())asyncdefget_scheduled_flow_runs_for_work_pool(self,work_pool_name:str,work_queue_names:Optional[List[str]]=None,scheduled_before:Optional[datetime.datetime]=None,)->List[WorkerFlowRunResponse]:""" Retrieves scheduled flow runs for the provided set of work pool queues. Args: work_pool_name: The name of the work pool that the work pool queues are associated with. work_queue_names: The names of the work pool queues from which to get scheduled flow runs. scheduled_before: Datetime used to filter returned flow runs. Flow runs scheduled for after the given datetime string will not be returned. Returns: A list of worker flow run responses containing information about the retrieved flow runs. 
"""body:Dict[str,Any]={}ifwork_queue_namesisnotNone:body["work_queue_names"]=list(work_queue_names)ifscheduled_before:body["scheduled_before"]=str(scheduled_before)response=awaitself._client.post(f"/work_pools/{work_pool_name}/get_scheduled_flow_runs",json=body,)returnpydantic.parse_obj_as(List[WorkerFlowRunResponse],response.json())asyncdefcreate_artifact(self,artifact:ArtifactCreate,)->Artifact:""" Creates an artifact with the provided configuration. Args: artifact: Desired configuration for the new artifact. Returns: Information about the newly created artifact. """response=awaitself._client.post("/artifacts/",json=artifact.dict(json_compatible=True,exclude_unset=True),)returnpydantic.parse_obj_as(Artifact,response.json())asyncdefread_artifacts(self,*,artifact_filter:ArtifactFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,sort:ArtifactSort=None,limit:int=None,offset:int=0,)->List[Artifact]:""" Query the Prefect API for artifacts. Only artifacts matching all criteria will be returned. Args: artifact_filter: filter criteria for artifacts flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs sort: sort criteria for the artifacts limit: limit for the artifact query offset: offset for the artifact query Returns: a list of Artifact model representations of the artifacts """body={"artifacts":(artifact_filter.dict(json_compatible=True)ifartifact_filterelseNone),"flow_runs":(flow_run_filter.dict(json_compatible=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/artifacts/filter",json=body)returnpydantic.parse_obj_as(List[Artifact],response.json())asyncdefread_latest_artifacts(self,*,artifact_filter:ArtifactCollectionFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,sort:ArtifactCollectionSort=None,limit:int=None,offset:int=0,)->List[ArtifactCollection]:""" Query the Prefect API for artifacts. Only artifacts matching all criteria will be returned. Args: artifact_filter: filter criteria for artifacts flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs sort: sort criteria for the artifacts limit: limit for the artifact query offset: offset for the artifact query Returns: a list of Artifact model representations of the artifacts """body={"artifacts":(artifact_filter.dict(json_compatible=True)ifartifact_filterelseNone),"flow_runs":(flow_run_filter.dict(json_compatible=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/artifacts/latest/filter",json=body)returnpydantic.parse_obj_as(List[ArtifactCollection],response.json())asyncdefdelete_artifact(self,artifact_id:UUID)->None:""" Deletes an artifact with the provided id. Args: artifact_id: The id of the artifact to delete. """try:awaitself._client.delete(f"/artifacts/{artifact_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefcreate_variable(self,variable:VariableCreate)->Variable:""" Creates an variable with the provided configuration. Args: variable: Desired configuration for the new variable. Returns: Information about the newly created variable. 
"""response=awaitself._client.post("/variables/",json=variable.dict(json_compatible=True,exclude_unset=True),)returnVariable(**response.json())asyncdefupdate_variable(self,variable:VariableUpdate)->None:""" Updates a variable with the provided configuration. Args: variable: Desired configuration for the updated variable. Returns: Information about the updated variable. """awaitself._client.patch(f"/variables/name/{variable.name}",json=variable.dict(json_compatible=True,exclude_unset=True),)asyncdefread_variable_by_name(self,name:str)->Optional[Variable]:"""Reads a variable by name. Returns None if no variable is found."""try:response=awaitself._client.get(f"/variables/name/{name}")returnVariable(**response.json())excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:returnNoneelse:raiseasyncdefdelete_variable_by_name(self,name:str):"""Deletes a variable by name."""try:awaitself._client.delete(f"/variables/name/{name}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_variables(self,limit:int=None)->List[Variable]:"""Reads all variables."""response=awaitself._client.post("/variables/filter",json={"limit":limit})returnpydantic.parse_obj_as(List[Variable],response.json())asyncdefread_worker_metadata(self)->Dict[str,Any]:"""Reads worker metadata stored in Prefect collection registry."""response=awaitself._client.get("collections/views/aggregate-worker-metadata")response.raise_for_status()returnresponse.json()asyncdefincrement_concurrency_slots(self,names:List[str],slots:int,mode:str)->httpx.Response:returnawaitself._client.post("/v2/concurrency_limits/increment",json={"names":names,"slots":slots,"mode":mode},)asyncdefrelease_concurrency_slots(self,names:List[str],slots:int,occupancy_seconds:float)->httpx.Response:returnawaitself._client.post("/v2/concurrency_limits/decrement",json={"names":names,"slots":slots,"occupancy_seconds":occupancy_seconds,},)asyncdefcreate_global_concurrency_limit(self,concurrency_limit:GlobalConcurrencyLimitCreate)->UUID:response=awaitself._client.post("/v2/concurrency_limits/",json=concurrency_limit.dict(json_compatible=True,exclude_unset=True),)returnUUID(response.json()["id"])asyncdefupdate_global_concurrency_limit(self,name:str,concurrency_limit:GlobalConcurrencyLimitUpdate)->httpx.Response:try:response=awaitself._client.patch(f"/v2/concurrency_limits/{name}",json=concurrency_limit.dict(json_compatible=True,exclude_unset=True),)returnresponseexcepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefdelete_global_concurrency_limit_by_name(self,name:str)->httpx.Response:try:response=awaitself._client.delete(f"/v2/concurrency_limits/{name}")returnresponseexcepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_global_concurrency_limit_by_name(self,name:str)->GlobalConcurrencyLimitResponse:try:response=awaitself._client.get(f"/v2/concurrency_limits/{name}")returnGlobalConcurrencyLimitResponse.parse_obj(response.json())excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseasyncdefread_global_concurrency_limits(self,limit:int=10,offset:int=0)->List[GlobalConcurrencyLimitResponse]:response=awaitself._client.post("/v2/concurrency_limits/filter",json={"limit":limit,"o
ffset":offset,},)returnpydantic.parse_obj_as(List[GlobalConcurrencyLimitResponse],response.json())asyncdefcreate_flow_run_input(self,flow_run_id:UUID,key:str,value:str,sender:Optional[str]=None):""" Creates a flow run input. Args: flow_run_id: The flow run id. key: The input key. value: The input value. sender: The sender of the input. """# Initialize the input to ensure that the key is valid.FlowRunInput(flow_run_id=flow_run_id,key=key,value=value)response=awaitself._client.post(f"/flow_runs/{flow_run_id}/input",json={"key":key,"value":value,"sender":sender},)response.raise_for_status()asyncdeffilter_flow_run_input(self,flow_run_id:UUID,key_prefix:str,limit:int,exclude_keys:Set[str])->List[FlowRunInput]:response=awaitself._client.post(f"/flow_runs/{flow_run_id}/input/filter",json={"prefix":key_prefix,"limit":limit,"exclude_keys":list(exclude_keys),},)response.raise_for_status()returnpydantic.parse_obj_as(List[FlowRunInput],response.json())asyncdefread_flow_run_input(self,flow_run_id:UUID,key:str)->str:""" Reads a flow run input. Args: flow_run_id: The flow run id. key: The input key. """response=awaitself._client.get(f"/flow_runs/{flow_run_id}/input/{key}")response.raise_for_status()returnresponse.content.decode()asyncdefdelete_flow_run_input(self,flow_run_id:UUID,key:str):""" Deletes a flow run input. Args: flow_run_id: The flow run id. key: The input key. """response=awaitself._client.delete(f"/flow_runs/{flow_run_id}/input/{key}")response.raise_for_status()def_raise_for_unsupported_automations(self)->NoReturn:ifnotPREFECT_EXPERIMENTAL_EVENTS:raiseRuntimeError("The current server and client configuration does not support ""events. Enable experimental events support with the ""PREFECT_EXPERIMENTAL_EVENTS setting.")else:raiseRuntimeError("The current server and client configuration does not support ""automations. 
Enable experimental automations with the ""PREFECT_API_SERVICES_TRIGGERS_ENABLED setting.")asyncdefcreate_automation(self,automation:AutomationCore)->UUID:"""Creates an automation in Prefect Cloud."""ifnotself.server_type.supports_automations():self._raise_for_unsupported_automations()response=awaitself._client.post("/automations/",json=automation.dict(json_compatible=True),)returnUUID(response.json()["id"])asyncdefupdate_automation(self,automation_id:UUID,automation:AutomationCore):"""Updates an automation in Prefect Cloud."""ifnotself.server_type.supports_automations():self._raise_for_unsupported_automations()response=awaitself._client.put(f"/automations/{automation_id}",json=automation.dict(json_compatible=True,exclude_unset=True),)response.raise_for_statusasyncdefread_automations(self)->List[Automation]:ifnotself.server_type.supports_automations():self._raise_for_unsupported_automations()response=awaitself._client.post("/automations/filter")response.raise_for_status()returnpydantic.parse_obj_as(List[Automation],response.json())asyncdeffind_automation(self,id_or_name:Union[str,UUID],exit_if_not_found:bool=True)->Optional[Automation]:ifisinstance(id_or_name,str):try:id=UUID(id_or_name)exceptValueError:id=Noneelifisinstance(id_or_name,UUID):id=id_or_nameifid:try:automation=awaitself.read_automation(id)returnautomationexceptprefect.exceptions.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeautomations=awaitself.read_automations()# Look for it by an exact nameforautomationinautomations:ifautomation.name==id_or_name:returnautomation# Look for it by a case-insensitive nameforautomationinautomations:ifautomation.name.lower()==id_or_name.lower():returnautomationreturnNoneasyncdefread_automation(self,automation_id:UUID)->Optional[Automation]:ifnotself.server_type.supports_automations():self._raise_for_unsupported_automations()response=awaitself._client.get(f"/automations/{automation_id}")ifresponse.status_code==404:returnNoneresponse.raise_for_status()returnAutomation.parse_obj(response.json())asyncdefread_automations_by_name(self,name:str)->List[Automation]:""" Query the Prefect API for an automation by name. Only automations matching the provided name will be returned. 
    Args:
        name: the name of the automation to query

    Returns:
        a list of Automation model representations of the automations
    """
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    automation_filter = filters.AutomationFilter(name=dict(any_=[name]))
    response = await self._client.post(
        "/automations/filter",
        json={
            "sort": sorting.AutomationSort.UPDATED_DESC,
            "automations": automation_filter.dict(json_compatible=True)
            if automation_filter
            else None,
        },
    )
    response.raise_for_status()
    return pydantic.parse_obj_as(List[Automation], response.json())

async def pause_automation(self, automation_id: UUID):
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.patch(
        f"/automations/{automation_id}", json={"enabled": False}
    )
    response.raise_for_status()

async def resume_automation(self, automation_id: UUID):
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.patch(
        f"/automations/{automation_id}", json={"enabled": True}
    )
    response.raise_for_status()

async def delete_automation(self, automation_id: UUID):
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.delete(f"/automations/{automation_id}")
    if response.status_code == 404:
        return
    response.raise_for_status()

async def read_resource_related_automations(self, resource_id: str) -> List[Automation]:
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.get(f"/automations/related-to/{resource_id}")
    response.raise_for_status()
    return pydantic.parse_obj_as(List[Automation], response.json())

async def delete_resource_owned_automations(self, resource_id: str):
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    await self._client.delete(f"/automations/owned-by/{resource_id}")

async def __aenter__(self):
    """
    Start the client.

    If the client is already started, this will raise an exception.

    If the client is already closed, this will raise an exception. Use a new client
    instance instead.
    """
    if self._closed:
        # httpx.AsyncClient does not allow reuse so we will not either.
        raise RuntimeError(
            "The client cannot be started again after closing. "
            "Retrieve a new client with `get_client()` instead."
        )
    if self._started:
        # httpx.AsyncClient does not allow reentrancy so we will not either.
        raise RuntimeError("The client cannot be started more than once.")
    self._loop = asyncio.get_running_loop()
    await self._exit_stack.__aenter__()
    # Enter a lifespan context if using an ephemeral application.
    # See https://github.com/encode/httpx/issues/350
    if self._ephemeral_app and self.manage_lifespan:
        self._ephemeral_lifespan = await self._exit_stack.enter_async_context(
            app_lifespan_context(self._ephemeral_app)
        )
    if self._ephemeral_app:
        self.logger.debug(
            "Using ephemeral application with database at "
            f"{PREFECT_API_DATABASE_CONNECTION_URL.value()}"
        )
    else:
        self.logger.debug(f"Connecting to API at {self.api_url}")
    # Enter the httpx client's context
    await self._exit_stack.enter_async_context(self._client)
    self._started = True
    return self

async def __aexit__(self, *exc_info):
    """
    Shutdown the client.
    """
    self._closed = True
    return await self._exit_stack.__aexit__(*exc_info)

def __enter__(self):
    raise RuntimeError(
        "The `PrefectClient` must be entered with an async context. Use 'async "
        "with PrefectClient(...)' not 'with PrefectClient(...)'"
    )

def __exit__(self, *_):
    assert False, "This should never be called but must be defined for __enter__"
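The automation helpers above compose naturally: `find_automation` resolves an ID or a name (exact match first, then case-insensitive) and the pause/resume methods toggle the `enabled` flag. A minimal sketch, assuming a started client from `get_client()` and a hypothetical automation named "nightly-alert":

```python
import asyncio

from prefect import get_client


async def pause_by_name(name: str) -> None:
    async with get_client() as client:
        # Returns None when no automation matches the given ID or name.
        automation = await client.find_automation(name)
        if automation is not None:
            await client.pause_automation(automation.id)


# "nightly-alert" is a hypothetical automation name.
asyncio.run(pause_by_name("nightly-alert"))
```

Note that these calls require a server configuration that supports automations; otherwise `_raise_for_unsupported_automations` raises a `RuntimeError` as shown above.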
async def api_healthcheck(self) -> Optional[Exception]:
    """
    Attempts to connect to the API and returns the encountered exception if not
    successful.

    If successful, returns `None`.
    """
    try:
        await self._client.get("/health")
        return None
    except Exception as exc:
        return exc
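Because the healthcheck returns the exception instead of raising it, callers decide how to react. A minimal usage sketch, assuming `get_client()` yields a started `PrefectClient`:

```python
import asyncio

from prefect import get_client


async def assert_api_reachable() -> None:
    async with get_client() as client:
        exc = await client.api_healthcheck()
        if exc is not None:
            # The returned exception carries the underlying connection error.
            raise RuntimeError(f"Prefect API is unreachable: {exc!r}")


asyncio.run(assert_api_reachable())
```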
async def create_flow(self, flow: "FlowObject") -> UUID:
    """
    Create a flow in the Prefect API.

    Args:
        flow: a [Flow][prefect.flows.Flow] object

    Raises:
        httpx.RequestError: if a flow was not created for any reason

    Returns:
        the ID of the flow in the backend
    """
    return await self.create_flow_from_name(flow.name)
async def create_flow_from_name(self, flow_name: str) -> UUID:
    """
    Create a flow in the Prefect API.

    Args:
        flow_name: the name of the new flow

    Raises:
        httpx.RequestError: if a flow was not created for any reason

    Returns:
        the ID of the flow in the backend
    """
    flow_data = FlowCreate(name=flow_name)
    response = await self._client.post(
        "/flows/", json=flow_data.dict(json_compatible=True)
    )
    flow_id = response.json().get("id")
    if not flow_id:
        raise httpx.RequestError(f"Malformed response: {response}")
    # Return the id of the created flow
    return UUID(flow_id)
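A short usage sketch, assuming a started client and a hypothetical flow name; the returned UUID identifies the flow record in the backend:

```python
import asyncio

from prefect import get_client


async def register_flow_name() -> None:
    async with get_client() as client:
        # "my-etl" is a hypothetical flow name.
        flow_id = await client.create_flow_from_name("my-etl")
        print(f"flow id: {flow_id}")


asyncio.run(register_flow_name())
```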
async def read_flow(self, flow_id: UUID) -> Flow:
    """
    Query the Prefect API for a flow by id.

    Args:
        flow_id: the flow ID of interest

    Returns:
        a [Flow model][prefect.client.schemas.objects.Flow] representation of the flow
    """
    response = await self._client.get(f"/flows/{flow_id}")
    return Flow.parse_obj(response.json())
async def read_flows(
    self,
    *,
    flow_filter: FlowFilter = None,
    flow_run_filter: FlowRunFilter = None,
    task_run_filter: TaskRunFilter = None,
    deployment_filter: DeploymentFilter = None,
    work_pool_filter: WorkPoolFilter = None,
    work_queue_filter: WorkQueueFilter = None,
    sort: FlowSort = None,
    limit: int = None,
    offset: int = 0,
) -> List[Flow]:
    """
    Query the Prefect API for flows. Only flows matching all criteria will
    be returned.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        work_pool_filter: filter criteria for work pools
        work_queue_filter: filter criteria for work pool queues
        sort: sort criteria for the flows
        limit: limit for the flow query
        offset: offset for the flow query

    Returns:
        a list of Flow model representations of the flows
    """
    body = {
        "flows": flow_filter.dict(json_compatible=True) if flow_filter else None,
        "flow_runs": (
            flow_run_filter.dict(json_compatible=True, exclude_unset=True)
            if flow_run_filter
            else None
        ),
        "task_runs": (
            task_run_filter.dict(json_compatible=True) if task_run_filter else None
        ),
        "deployments": (
            deployment_filter.dict(json_compatible=True) if deployment_filter else None
        ),
        "work_pools": (
            work_pool_filter.dict(json_compatible=True) if work_pool_filter else None
        ),
        "work_queues": (
            work_queue_filter.dict(json_compatible=True) if work_queue_filter else None
        ),
        "sort": sort,
        "limit": limit,
        "offset": offset,
    }
    response = await self._client.post("/flows/filter", json=body)
    return pydantic.parse_obj_as(List[Flow], response.json())
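Filter criteria are ANDed together, so passing a single filter narrows results by that dimension alone. A minimal sketch of a name-scoped query, assuming the filter models live in `prefect.client.schemas.filters` and "my-etl" is a hypothetical flow name:

```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName


async def list_flows_by_name() -> None:
    async with get_client() as client:
        flows = await client.read_flows(
            flow_filter=FlowFilter(name=FlowFilterName(any_=["my-etl"])),
            limit=10,
        )
        for flow in flows:
            print(flow.id, flow.name)


asyncio.run(list_flows_by_name())
```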
async def read_flow_by_name(self, flow_name: str) -> Flow:
    """
    Query the Prefect API for a flow by name.

    Args:
        flow_name: the name of a flow

    Returns:
        a fully hydrated Flow model
    """
    response = await self._client.get(f"/flows/name/{flow_name}")
    return Flow.parse_obj(response.json())
async def create_flow_run_from_deployment(
    self,
    deployment_id: UUID,
    *,
    parameters: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
    state: prefect.states.State = None,
    name: str = None,
    tags: Iterable[str] = None,
    idempotency_key: str = None,
    parent_task_run_id: UUID = None,
    work_queue_name: str = None,
    job_variables: Optional[Dict[str, Any]] = None,
) -> FlowRun:
    """
    Create a flow run for a deployment.

    Args:
        deployment_id: The deployment ID to create the flow run from
        parameters: Parameter overrides for this flow run. Merged with the
            deployment defaults
        context: Optional run context data
        state: The initial state for the run. If not provided, defaults to
            `Scheduled` for now. Should always be a `Scheduled` type.
        name: An optional name for the flow run. If not provided, the server will
            generate a name.
        tags: An optional iterable of tags to apply to the flow run; these tags
            are merged with the deployment's tags.
        idempotency_key: Optional idempotency key for creation of the flow run.
            If the key matches the key of an existing flow run, the existing run will
            be returned instead of creating a new one.
        parent_task_run_id: if a subflow run is being created, the placeholder task
            run identifier in the parent flow
        work_queue_name: An optional work queue name to add this run to. If not provided,
            will default to the deployment's set work queue. If one is provided that does not
            exist, a new work queue will be created within the deployment's work pool.
        job_variables: Optional variables that will be supplied to the flow run job.

    Raises:
        httpx.RequestError: if the Prefect API does not successfully create a run
            for any reason

    Returns:
        The flow run model
    """
    parameters = parameters or {}
    context = context or {}
    state = state or prefect.states.Scheduled()
    tags = tags or []
    flow_run_create = DeploymentFlowRunCreate(
        parameters=parameters,
        context=context,
        state=state.to_state_create(),
        tags=tags,
        name=name,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        job_variables=job_variables,
    )
    # done separately to avoid including this field in payloads sent to older API versions
    if work_queue_name:
        flow_run_create.work_queue_name = work_queue_name
    response = await self._client.post(
        f"/deployments/{deployment_id}/create_flow_run",
        json=flow_run_create.dict(json_compatible=True, exclude_unset=True),
    )
    return FlowRun.parse_obj(response.json())
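A usage sketch for triggering an ad-hoc run, assuming a started client and a hypothetical deployment ID; the idempotency key makes retried calls return the same run instead of creating duplicates:

```python
import asyncio
from uuid import UUID

from prefect import get_client


async def trigger(deployment_id: UUID) -> None:
    async with get_client() as client:
        run = await client.create_flow_run_from_deployment(
            deployment_id,
            parameters={"n": 5},  # merged with the deployment's default parameters
            tags=["ad-hoc"],      # merged with the deployment's tags
            idempotency_key="backfill-2024-01-01",  # hypothetical key
        )
        print(run.id, run.state.type if run.state else None)


# Hypothetical deployment ID; substitute a real one.
asyncio.run(trigger(UUID("00000000-0000-0000-0000-000000000000")))
```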
async def create_flow_run(
    self,
    flow: "FlowObject",
    name: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
    tags: Optional[Iterable[str]] = None,
    parent_task_run_id: Optional[UUID] = None,
    state: Optional["prefect.states.State"] = None,
) -> FlowRun:
    """
    Create a flow run for a flow.

    Args:
        flow: The flow model to create the flow run for
        name: An optional name for the flow run
        parameters: Parameter overrides for this flow run.
        context: Optional run context data
        tags: a list of tags to apply to this flow run
        parent_task_run_id: if a subflow run is being created, the placeholder task
            run identifier in the parent flow
        state: The initial state for the run. If not provided, defaults to
            `Scheduled` for now. Should always be a `Scheduled` type.

    Raises:
        httpx.RequestError: if the Prefect API does not successfully create a run
            for any reason

    Returns:
        The flow run model
    """
    parameters = parameters or {}
    context = context or {}
    if state is None:
        state = prefect.states.Pending()
    # Retrieve the flow id
    flow_id = await self.create_flow(flow)
    flow_run_create = FlowRunCreate(
        flow_id=flow_id,
        flow_version=flow.version,
        name=name,
        parameters=parameters,
        context=context,
        tags=list(tags or []),
        parent_task_run_id=parent_task_run_id,
        state=state.to_state_create(),
        empirical_policy=FlowRunPolicy(
            retries=flow.retries,
            retry_delay=flow.retry_delay_seconds,
        ),
    )
    flow_run_create_json = flow_run_create.dict(json_compatible=True)
    response = await self._client.post("/flow_runs/", json=flow_run_create_json)
    flow_run = FlowRun.parse_obj(response.json())
    # Restore the parameters to the local objects to retain expectations about
    # Python objects
    flow_run.parameters = parameters
    return flow_run
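Unlike the deployment-based variant, this method takes a flow object directly and registers the flow (by name) before creating the run. A minimal sketch; note that creating the run record does not execute the flow:

```python
import asyncio

from prefect import flow, get_client


@flow
def add(x: int, y: int) -> int:
    return x + y


async def create_pending_run() -> None:
    async with get_client() as client:
        # Creates (or reuses) the flow record, then a Pending flow run for it.
        run = await client.create_flow_run(add, parameters={"x": 1, "y": 2})
        print(run.id)


asyncio.run(create_pending_run())
```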
async def update_flow_run(
    self,
    flow_run_id: UUID,
    flow_version: Optional[str] = None,
    parameters: Optional[dict] = None,
    name: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    empirical_policy: Optional[FlowRunPolicy] = None,
    infrastructure_pid: Optional[str] = None,
    job_variables: Optional[dict] = None,
) -> httpx.Response:
    """
    Update a flow run's details.

    Args:
        flow_run_id: The identifier for the flow run to update.
        flow_version: A new version string for the flow run.
        parameters: A dictionary of parameter values for the flow run. This will not
            be merged with any existing parameters.
        name: A new name for the flow run.
        empirical_policy: A new flow run orchestration policy. This will not be
            merged with any existing policy.
        tags: An iterable of new tags for the flow run. These will not be merged with
            any existing tags.
        infrastructure_pid: The id of flow run as returned by an
            infrastructure block.
        job_variables: New job variables for the flow run.

    Returns:
        an `httpx.Response` object from the PATCH request
    """
    params = {}
    if flow_version is not None:
        params["flow_version"] = flow_version
    if parameters is not None:
        params["parameters"] = parameters
    if name is not None:
        params["name"] = name
    if tags is not None:
        params["tags"] = tags
    if empirical_policy is not None:
        params["empirical_policy"] = empirical_policy
    if infrastructure_pid:
        params["infrastructure_pid"] = infrastructure_pid
    if job_variables is not None:
        params["job_variables"] = job_variables
    flow_run_data = FlowRunUpdate(**params)
    return await self._client.patch(
        f"/flow_runs/{flow_run_id}",
        json=flow_run_data.dict(json_compatible=True, exclude_unset=True),
    )
async def delete_flow_run(self, flow_run_id: UUID) -> None:
    """
    Delete a flow run by UUID.

    Args:
        flow_run_id: The flow run UUID of interest.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If the request fails
    """
    try:
        await self._client.delete(f"/flow_runs/{flow_run_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def create_concurrency_limit(self, tag: str, concurrency_limit: int) -> UUID:
    """
    Create a tag concurrency limit in the Prefect API. These limits govern
    concurrently running tasks.

    Args:
        tag: a tag the concurrency limit is applied to
        concurrency_limit: the maximum number of concurrent task runs for a given tag

    Raises:
        httpx.RequestError: if the concurrency limit was not created for any reason

    Returns:
        the ID of the concurrency limit in the backend
    """
    concurrency_limit_create = ConcurrencyLimitCreate(
        tag=tag,
        concurrency_limit=concurrency_limit,
    )
    response = await self._client.post(
        "/concurrency_limits/",
        json=concurrency_limit_create.dict(json_compatible=True),
    )
    concurrency_limit_id = response.json().get("id")
    if not concurrency_limit_id:
        raise httpx.RequestError(f"Malformed response: {response}")
    return UUID(concurrency_limit_id)
async def read_concurrency_limit_by_tag(self, tag: str):
    """
    Read the concurrency limit set on a specific tag.

    Args:
        tag: a tag the concurrency limit is applied to

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: if the concurrency limit was not created for any reason

    Returns:
        the concurrency limit set on a specific tag
    """
    try:
        response = await self._client.get(f"/concurrency_limits/tag/{tag}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    concurrency_limit_id = response.json().get("id")
    if not concurrency_limit_id:
        raise httpx.RequestError(f"Malformed response: {response}")
    concurrency_limit = ConcurrencyLimit.parse_obj(response.json())
    return concurrency_limit
async def read_concurrency_limits(self, limit: int, offset: int):
    """
    Lists concurrency limits set on task run tags.

    Args:
        limit: the maximum number of concurrency limits returned
        offset: the concurrency limit query offset

    Returns:
        a list of concurrency limits
    """
    body = {
        "limit": limit,
        "offset": offset,
    }
    response = await self._client.post("/concurrency_limits/filter", json=body)
    return pydantic.parse_obj_as(List[ConcurrencyLimit], response.json())
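A short sketch of creating and then inspecting a tag concurrency limit, assuming a started client, a hypothetical "database" tag, and that the returned model exposes an `active_slots` field listing the task runs currently holding slots:

```python
import asyncio

from prefect import get_client


async def cap_database_tasks() -> None:
    async with get_client() as client:
        limit_id = await client.create_concurrency_limit(
            tag="database",       # hypothetical tag
            concurrency_limit=10,  # at most 10 concurrent task runs with this tag
        )
        cl = await client.read_concurrency_limit_by_tag("database")
        print(limit_id, cl.concurrency_limit, len(cl.active_slots))


asyncio.run(cap_database_tasks())
```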
async def reset_concurrency_limit_by_tag(
    self,
    tag: str,
    slot_override: Optional[List[Union[UUID, str]]] = None,
):
    """
    Resets the concurrency limit slots set on a specific tag.

    Args:
        tag: a tag the concurrency limit is applied to
        slot_override: a list of task run IDs that are currently using a
            concurrency slot; check that any task run IDs included in
            `slot_override` are currently running, otherwise those concurrency
            slots will never be released.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    if slot_override is not None:
        slot_override = [str(slot) for slot in slot_override]
    try:
        await self._client.post(
            f"/concurrency_limits/tag/{tag}/reset",
            json=dict(slot_override=slot_override),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def delete_concurrency_limit_by_tag(self, tag: str):
    """
    Delete the concurrency limit set on a specific tag.

    Args:
        tag: a tag the concurrency limit is applied to

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    try:
        await self._client.delete(f"/concurrency_limits/tag/{tag}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def create_work_queue(
    self,
    name: str,
    tags: Optional[List[str]] = None,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
    work_pool_name: Optional[str] = None,
) -> WorkQueue:
    """
    Create a work queue.

    Args:
        name: a unique name for the work queue
        tags: DEPRECATED: an optional list of tags to filter on; only work scheduled
            with these tags will be included in the queue. This option will be
            removed on 2023-02-23.
        description: An optional description for the work queue.
        is_paused: Whether or not the work queue is paused.
        concurrency_limit: An optional concurrency limit for the work queue.
        priority: The queue's priority. Lower values are higher priority (1 is the highest).
        work_pool_name: The name of the work pool to use for this queue.

    Raises:
        prefect.exceptions.ObjectAlreadyExists: If request returns 409
        httpx.RequestError: If request fails

    Returns:
        The created work queue
    """
    if tags:
        warnings.warn(
            (
                "The use of tags for creating work queue filters is deprecated."
                " This option will be removed on 2023-02-23."
            ),
            DeprecationWarning,
        )
        filter = QueueFilter(tags=tags)
    else:
        filter = None
    create_model = WorkQueueCreate(name=name, filter=filter)
    if description is not None:
        create_model.description = description
    if is_paused is not None:
        create_model.is_paused = is_paused
    if concurrency_limit is not None:
        create_model.concurrency_limit = concurrency_limit
    if priority is not None:
        create_model.priority = priority
    data = create_model.dict(json_compatible=True)
    try:
        if work_pool_name is not None:
            response = await self._client.post(
                f"/work_pools/{work_pool_name}/queues", json=data
            )
        else:
            response = await self._client.post("/work_queues/", json=data)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_409_CONFLICT:
            raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
        elif e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return WorkQueue.parse_obj(response.json())
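Since creation raises `ObjectAlreadyExists` on a 409, an idempotent "ensure" helper can fall back to reading the existing queue. A sketch under hypothetical queue and pool names:

```python
import prefect.exceptions
from prefect import get_client


async def ensure_queue(name: str, pool: str):
    async with get_client() as client:
        try:
            return await client.create_work_queue(
                name=name,
                work_pool_name=pool,
                priority=1,          # highest priority
                concurrency_limit=5,
            )
        except prefect.exceptions.ObjectAlreadyExists:
            # The queue is already there; return it instead of failing.
            return await client.read_work_queue_by_name(name, work_pool_name=pool)
```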
async def read_work_queue_by_name(
    self,
    name: str,
    work_pool_name: Optional[str] = None,
) -> WorkQueue:
    """
    Read a work queue by name.

    Args:
        name (str): a unique name for the work queue
        work_pool_name (str, optional): the name of the work pool
            the queue belongs to.

    Raises:
        prefect.exceptions.ObjectNotFound: if no work queue is found
        httpx.HTTPStatusError: other status errors

    Returns:
        WorkQueue: a work queue API object
    """
    try:
        if work_pool_name is not None:
            response = await self._client.get(
                f"/work_pools/{work_pool_name}/queues/{name}"
            )
        else:
            response = await self._client.get(f"/work_queues/name/{name}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return WorkQueue.parse_obj(response.json())
async def update_work_queue(self, id: UUID, **kwargs):
    """
    Update properties of a work queue.

    Args:
        id: the ID of the work queue to update
        **kwargs: the fields to update

    Raises:
        ValueError: if no kwargs are provided
        prefect.exceptions.ObjectNotFound: if request returns 404
        httpx.RequestError: if the request fails
    """
    if not kwargs:
        raise ValueError("No fields provided to update.")
    data = WorkQueueUpdate(**kwargs).dict(json_compatible=True, exclude_unset=True)
    try:
        await self._client.patch(f"/work_queues/{id}", json=data)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def get_runs_in_work_queue(
    self,
    id: UUID,
    limit: int = 10,
    scheduled_before: datetime.datetime = None,
) -> List[FlowRun]:
    """
    Read flow runs off a work queue.

    Args:
        id: the id of the work queue to read from
        limit: a limit on the number of runs to return
        scheduled_before: a timestamp; only runs scheduled before this time will be
            returned. Defaults to now.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        List[FlowRun]: a list of FlowRun objects read from the queue
    """
    if scheduled_before is None:
        scheduled_before = pendulum.now("UTC")
    try:
        response = await self._client.post(
            f"/work_queues/{id}/get_runs",
            json={
                "limit": limit,
                "scheduled_before": scheduled_before.isoformat(),
            },
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return pydantic.parse_obj_as(List[FlowRun], response.json())
async def read_work_queue(self, id: UUID) -> WorkQueue:
    """
    Read a work queue.

    Args:
        id: the id of the work queue to load

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        WorkQueue: an instantiated WorkQueue object
    """
    try:
        response = await self._client.get(f"/work_queues/{id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return WorkQueue.parse_obj(response.json())
async def read_work_queue_status(self, id: UUID) -> WorkQueueStatusDetail:
    """
    Read a work queue status.

    Args:
        id: the id of the work queue to load

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        WorkQueueStatusDetail: an instantiated WorkQueueStatusDetail object
    """
    try:
        response = await self._client.get(f"/work_queues/{id}/status")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return WorkQueueStatusDetail.parse_obj(response.json())
async def match_work_queues(
    self,
    prefixes: List[str],
    work_pool_name: Optional[str] = None,
) -> List[WorkQueue]:
    """
    Query the Prefect API for work queues with names with a specific prefix.

    Args:
        prefixes: a list of strings used to match work queue name prefixes
        work_pool_name: an optional work pool name to scope the query to

    Returns:
        a list of WorkQueue model representations of the work queues
    """
    page_length = 100
    current_page = 0
    work_queues = []
    while True:
        new_queues = await self.read_work_queues(
            work_pool_name=work_pool_name,
            offset=current_page * page_length,
            limit=page_length,
            work_queue_filter=WorkQueueFilter(
                name=WorkQueueFilterName(startswith_=prefixes)
            ),
        )
        if not new_queues:
            break
        work_queues += new_queues
        current_page += 1
    return work_queues
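This helper pages through `read_work_queues` 100 queues at a time until a page comes back empty, so callers get all matches without handling pagination themselves. A sketch with hypothetical prefixes and pool name:

```python
import asyncio

from prefect import get_client


async def list_team_queues() -> None:
    async with get_client() as client:
        queues = await client.match_work_queues(
            ["team-a-", "team-b-"],           # hypothetical name prefixes
            work_pool_name="default-pool",    # hypothetical pool
        )
        print([q.name for q in queues])


asyncio.run(list_team_queues())
```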
async def delete_work_queue_by_id(self, id: UUID):
    """
    Delete a work queue by its ID.

    Args:
        id: the id of the work queue to delete

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If the request fails
    """
    try:
        await self._client.delete(f"/work_queues/{id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def create_block_type(self, block_type: BlockTypeCreate) -> BlockType:
    """
    Create a block type in the Prefect API.
    """
    try:
        response = await self._client.post(
            "/block_types/",
            json=block_type.dict(
                json_compatible=True, exclude_unset=True, exclude={"id"}
            ),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_409_CONFLICT:
            raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
        else:
            raise
    return BlockType.parse_obj(response.json())
async def create_block_schema(self, block_schema: BlockSchemaCreate) -> BlockSchema:
    """
    Create a block schema in the Prefect API.
    """
    try:
        response = await self._client.post(
            "/block_schemas/",
            json=block_schema.dict(
                json_compatible=True,
                exclude_unset=True,
                exclude={"id", "block_type", "checksum"},
            ),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_409_CONFLICT:
            raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
        else:
            raise
    return BlockSchema.parse_obj(response.json())
async def create_block_document(
    self,
    block_document: Union[BlockDocument, BlockDocumentCreate],
    include_secrets: bool = True,
) -> BlockDocument:
    """
    Create a block document in the Prefect API. This data is used to configure a
    corresponding Block.

    Args:
        include_secrets (bool): whether to include secret values
            on the stored Block, corresponding to Pydantic's `SecretStr` and
            `SecretBytes` fields. Note Blocks may not work as expected if
            this is set to `False`.
    """
    if isinstance(block_document, BlockDocument):
        block_document = BlockDocumentCreate.parse_obj(
            block_document.dict(
                json_compatible=True,
                include_secrets=include_secrets,
                exclude_unset=True,
                exclude={"id", "block_schema", "block_type"},
            ),
        )
    try:
        response = await self._client.post(
            "/block_documents/",
            json=block_document.dict(
                json_compatible=True,
                include_secrets=include_secrets,
                exclude_unset=True,
                exclude={"id", "block_schema", "block_type"},
            ),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_409_CONFLICT:
            raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
        else:
            raise
    return BlockDocument.parse_obj(response.json())
async def update_block_document(
    self,
    block_document_id: UUID,
    block_document: BlockDocumentUpdate,
):
    """
    Update a block document in the Prefect API.
    """
    try:
        await self._client.patch(
            f"/block_documents/{block_document_id}",
            json=block_document.dict(
                json_compatible=True,
                exclude_unset=True,
                include={"data", "merge_existing_data", "block_schema_id"},
                include_secrets=True,
            ),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def read_block_type_by_slug(self, slug: str) -> BlockType:
    """
    Read a block type by its slug.
    """
    try:
        response = await self._client.get(f"/block_types/slug/{slug}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return BlockType.parse_obj(response.json())
async def read_block_schema_by_checksum(
    self, checksum: str, version: Optional[str] = None
) -> BlockSchema:
    """
    Look up a block schema by checksum
    """
    try:
        url = f"/block_schemas/checksum/{checksum}"
        if version is not None:
            url = f"{url}?version={version}"
        response = await self._client.get(url)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return BlockSchema.parse_obj(response.json())
async def update_block_type(self, block_type_id: UUID, block_type: BlockTypeUpdate):
    """
    Update a block type in the Prefect API.
    """
    try:
        await self._client.patch(
            f"/block_types/{block_type_id}",
            json=block_type.dict(
                json_compatible=True,
                exclude_unset=True,
                include=BlockTypeUpdate.updatable_fields(),
                include_secrets=True,
            ),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
async def read_block_types(self) -> List[BlockType]:
    """
    Read all block types

    Raises:
        httpx.RequestError: if the block types were not found

    Returns:
        List of BlockTypes.
    """
    response = await self._client.post("/block_types/filter", json={})
    return pydantic.parse_obj_as(List[BlockType], response.json())
async def read_block_schemas(self) -> List[BlockSchema]:
    """
    Read all block schemas

    Raises:
        httpx.RequestError: if a valid block schema was not found

    Returns:
        A list of BlockSchemas.
    """
    response = await self._client.post("/block_schemas/filter", json={})
    return pydantic.parse_obj_as(List[BlockSchema], response.json())
async def get_most_recent_block_schema_for_block_type(
    self,
    block_type_id: UUID,
) -> Optional[BlockSchema]:
    """
    Fetches the most recent block schema for a specified block type ID.

    Args:
        block_type_id: The ID of the block type.

    Raises:
        httpx.RequestError: If the request fails for any reason.

    Returns:
        The most recent block schema or None.
    """
    try:
        response = await self._client.post(
            "/block_schemas/filter",
            json={
                "block_schemas": {"block_type_id": {"any_": [str(block_type_id)]}},
                "limit": 1,
            },
        )
    except httpx.HTTPStatusError:
        raise
    return BlockSchema.parse_obj(response.json()[0]) if response.json() else None
async def read_block_document(
    self,
    block_document_id: UUID,
    include_secrets: bool = True,
):
    """
    Read the block document with the specified ID.

    Args:
        block_document_id: the block document id
        include_secrets (bool): whether to include secret values
            on the Block, corresponding to Pydantic's `SecretStr` and
            `SecretBytes` fields. These fields are automatically obfuscated
            by Pydantic, but users can additionally choose not to receive
            their values from the API. Note that any business logic on the
            Block may not work if this is `False`.

    Raises:
        httpx.RequestError: if the block document was not found for any reason

    Returns:
        A block document or None.
    """
    assert (
        block_document_id is not None
    ), "Unexpected ID on block document. Was it persisted?"
    try:
        response = await self._client.get(
            f"/block_documents/{block_document_id}",
            params=dict(include_secrets=include_secrets),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return BlockDocument.parse_obj(response.json())
async def read_block_document_by_name(
    self,
    name: str,
    block_type_slug: str,
    include_secrets: bool = True,
) -> BlockDocument:
    """
    Read the block document with the specified name that corresponds to a
    specific block type name.

    Args:
        name: The block document name.
        block_type_slug: The block type slug.
        include_secrets (bool): whether to include secret values
            on the Block, corresponding to Pydantic's `SecretStr` and
            `SecretBytes` fields. These fields are automatically obfuscated
            by Pydantic, but users can additionally choose not to receive
            their values from the API. Note that any business logic on the
            Block may not work if this is `False`.

    Raises:
        httpx.RequestError: if the block document was not found for any reason

    Returns:
        A block document or None.
    """
    try:
        response = await self._client.get(
            f"/block_types/slug/{block_type_slug}/block_documents/name/{name}",
            params=dict(include_secrets=include_secrets),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return BlockDocument.parse_obj(response.json())
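A sketch of a lookup that tolerates a missing document and declines secret values; the document name and block type slug here are hypothetical:

```python
import prefect.exceptions
from prefect import get_client


async def read_obfuscated_document():
    async with get_client() as client:
        try:
            doc = await client.read_block_document_by_name(
                name="prod-credentials",   # hypothetical document name
                block_type_slug="secret",  # hypothetical block type slug
                include_secrets=False,     # keep SecretStr/SecretBytes obfuscated
            )
            return doc.data
        except prefect.exceptions.ObjectNotFound:
            return None
```

Setting `include_secrets=False` is useful for display purposes, but as the docstring warns, a Block hydrated from such a document may not function.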
async def read_block_documents(
    self,
    block_schema_type: Optional[str] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    include_secrets: bool = True,
):
    """
    Read block documents

    Args:
        block_schema_type: an optional block schema type
        offset: an offset
        limit: the number of blocks to return
        include_secrets (bool): whether to include secret values
            on the Block, corresponding to Pydantic's `SecretStr` and
            `SecretBytes` fields. These fields are automatically obfuscated
            by Pydantic, but users can additionally choose not to receive
            their values from the API. Note that any business logic on the
            Block may not work if this is `False`.

    Returns:
        A list of block documents
    """
    response = await self._client.post(
        "/block_documents/filter",
        json=dict(
            block_schema_type=block_schema_type,
            offset=offset,
            limit=limit,
            include_secrets=include_secrets,
        ),
    )
    return pydantic.parse_obj_as(List[BlockDocument], response.json())
async def read_block_documents_by_type(
    self,
    block_type_slug: str,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    include_secrets: bool = True,
) -> List[BlockDocument]:
    """Retrieve block documents by block type slug.

    Args:
        block_type_slug: The block type slug.
        offset: an offset
        limit: the number of blocks to return
        include_secrets: whether to include secret values

    Returns:
        A list of block documents
    """
    response = await self._client.get(
        f"/block_types/slug/{block_type_slug}/block_documents",
        params=dict(
            offset=offset,
            limit=limit,
            include_secrets=include_secrets,
        ),
    )
    return pydantic.parse_obj_as(List[BlockDocument], response.json())
async def create_deployment(
    self,
    flow_id: UUID,
    name: str,
    version: str = None,
    schedule: SCHEDULE_TYPES = None,
    schedules: List[DeploymentScheduleCreate] = None,
    parameters: Optional[Dict[str, Any]] = None,
    description: str = None,
    work_queue_name: str = None,
    work_pool_name: str = None,
    tags: List[str] = None,
    storage_document_id: UUID = None,
    manifest_path: str = None,
    path: str = None,
    entrypoint: str = None,
    infrastructure_document_id: UUID = None,
    infra_overrides: Optional[Dict[str, Any]] = None,  # for backwards compat
    parameter_openapi_schema: Optional[Dict[str, Any]] = None,
    is_schedule_active: Optional[bool] = None,
    paused: Optional[bool] = None,
    pull_steps: Optional[List[dict]] = None,
    enforce_parameter_schema: Optional[bool] = None,
    job_variables: Optional[Dict[str, Any]] = None,
) -> UUID:
    """
    Create a deployment.

    Args:
        flow_id: the flow ID to create a deployment for
        name: the name of the deployment
        version: an optional version string for the deployment
        schedule: an optional schedule to apply to the deployment
        tags: an optional list of tags to apply to the deployment
        storage_document_id: a reference to the storage block document
            used for the deployed flow
        infrastructure_document_id: a reference to the infrastructure block document
            to use for this deployment
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`. This argument was previously named `infra_overrides`.
            Both arguments are supported for backwards compatibility.

    Raises:
        httpx.RequestError: if the deployment was not created for any reason

    Returns:
        the ID of the deployment in the backend
    """
    jv = handle_deprecated_infra_overrides_parameter(job_variables, infra_overrides)
    deployment_create = DeploymentCreate(
        flow_id=flow_id,
        name=name,
        version=version,
        parameters=dict(parameters or {}),
        tags=list(tags or []),
        work_queue_name=work_queue_name,
        description=description,
        storage_document_id=storage_document_id,
        path=path,
        entrypoint=entrypoint,
        manifest_path=manifest_path,  # for backwards compat
        infrastructure_document_id=infrastructure_document_id,
        job_variables=jv,
        parameter_openapi_schema=parameter_openapi_schema,
        is_schedule_active=is_schedule_active,
        paused=paused,
        schedule=schedule,
        schedules=schedules or [],
        pull_steps=pull_steps,
        enforce_parameter_schema=enforce_parameter_schema,
    )
    if work_pool_name is not None:
        deployment_create.work_pool_name = work_pool_name
    # Exclude newer fields that are not set to avoid compatibility issues
    exclude = {
        field
        for field in ["work_pool_name", "work_queue_name"]
        if field not in deployment_create.__fields_set__
    }
    if deployment_create.is_schedule_active is None:
        exclude.add("is_schedule_active")
    if deployment_create.paused is None:
        exclude.add("paused")
    if deployment_create.pull_steps is None:
        exclude.add("pull_steps")
    if deployment_create.enforce_parameter_schema is None:
        exclude.add("enforce_parameter_schema")
    json = deployment_create.dict(json_compatible=True, exclude=exclude)
    response = await self._client.post(
        "/deployments/",
        json=json,
    )
    deployment_id = response.json().get("id")
    if not deployment_id:
        raise httpx.RequestError(f"Malformed response: {response}")
    return UUID(deployment_id)
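A minimal deployment-creation sketch that first resolves a flow ID by name; the flow name, pool name, and entrypoint below are hypothetical placeholders:

```python
import asyncio

from prefect import get_client


async def deploy() -> None:
    async with get_client() as client:
        flow_id = await client.create_flow_from_name("my-etl")  # hypothetical name
        deployment_id = await client.create_deployment(
            flow_id=flow_id,
            name="nightly",
            work_pool_name="default-pool",     # hypothetical work pool
            entrypoint="flows/etl.py:my_etl",  # hypothetical entrypoint
            parameters={"n": 5},
            tags=["etl"],
        )
        print(deployment_id)


asyncio.run(deploy())
```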
```python
async def read_deployment(
    self,
    deployment_id: UUID,
) -> DeploymentResponse:
    """
    Query the Prefect API for a deployment by id.

    Args:
        deployment_id: the deployment ID of interest

    Returns:
        a [Deployment model][prefect.client.schemas.objects.Deployment]
            representation of the deployment
    """
    try:
        response = await self._client.get(f"/deployments/{deployment_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return DeploymentResponse.parse_obj(response.json())
```
```python
async def read_deployment_by_name(
    self,
    name: str,
) -> DeploymentResponse:
    """
    Query the Prefect API for a deployment by name.

    Args:
        name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        a Deployment model representation of the deployment
    """
    try:
        response = await self._client.get(f"/deployments/name/{name}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return DeploymentResponse.parse_obj(response.json())
```
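A deployment can then be looked up by its composite name; `"etl/etl-hourly"`
is the placeholder flow/deployment pair created in the earlier sketch:

```python
# Inside the same `async with get_client() as client:` block as above.
deployment = await client.read_deployment_by_name("etl/etl-hourly")
print(deployment.id)
```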
asyncdefread_deployments(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,work_pool_filter:WorkPoolFilter=None,work_queue_filter:WorkQueueFilter=None,limit:int=None,sort:DeploymentSort=None,offset:int=0,)->List[DeploymentResponse]:""" Query the Prefect API for deployments. Only deployments matching all the provided criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments work_pool_filter: filter criteria for work pools work_queue_filter: filter criteria for work pool queues limit: a limit for the deployment query offset: an offset for the deployment query Returns: a list of Deployment model representations of the deployments """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),"work_pool_queues":(work_queue_filter.dict(json_compatible=True)ifwork_queue_filterelseNone),"limit":limit,"offset":offset,"sort":sort,}response=awaitself._client.post("/deployments/filter",json=body)returnpydantic.parse_obj_as(List[DeploymentResponse],response.json())
```python
async def delete_deployment(
    self,
    deployment_id: UUID,
):
    """
    Delete deployment by id.

    Args:
        deployment_id: The deployment id of interest.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    try:
        await self._client.delete(f"/deployments/{deployment_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
```python
async def create_deployment_schedules(
    self,
    deployment_id: UUID,
    schedules: List[Tuple[SCHEDULE_TYPES, bool]],
) -> List[DeploymentSchedule]:
    """
    Create deployment schedules.

    Args:
        deployment_id: the deployment ID
        schedules: a list of tuples containing the schedule to create
            and whether or not it should be active.

    Raises:
        httpx.RequestError: if the schedules were not created for any reason

    Returns:
        the list of schedules created in the backend
    """
    deployment_schedule_create = [
        DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1])
        for schedule in schedules
    ]
    json = [
        deployment_schedule_create.dict(json_compatible=True)
        for deployment_schedule_create in deployment_schedule_create
    ]
    response = await self._client.post(
        f"/deployments/{deployment_id}/schedules", json=json
    )
    return pydantic.parse_obj_as(List[DeploymentSchedule], response.json())
```
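A sketch of attaching two schedules at once, one active and one paused;
`deployment_id` is the placeholder from the earlier example and the schedule
classes are assumed to come from `prefect.client.schemas.schedules`:

```python
from datetime import timedelta

from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule

# Each tuple is (schedule, active).
created = await client.create_deployment_schedules(
    deployment_id,
    schedules=[
        (CronSchedule(cron="0 9 * * MON-FRI"), True),
        (IntervalSchedule(interval=timedelta(hours=6)), False),
    ],
)
```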
asyncdefread_deployment_schedules(self,deployment_id:UUID,)->List[DeploymentSchedule]:""" Query the Prefect API for a deployment's schedules. Args: deployment_id: the deployment ID Returns: a list of DeploymentSchedule model representations of the deployment schedules """try:response=awaitself._client.get(f"/deployments/{deployment_id}/schedules")excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnpydantic.parse_obj_as(List[DeploymentSchedule],response.json())
asyncdefupdate_deployment_schedule(self,deployment_id:UUID,schedule_id:UUID,active:Optional[bool]=None,schedule:Optional[SCHEDULE_TYPES]=None,):""" Update a deployment schedule by ID. Args: deployment_id: the deployment ID schedule_id: the deployment schedule ID of interest active: whether or not the schedule should be active schedule: the cron, rrule, or interval schedule this deployment schedule should use """kwargs={}ifactiveisnotNone:kwargs["active"]=activeifscheduleisnotNone:kwargs["schedule"]=scheduledeployment_schedule_update=DeploymentScheduleUpdate(**kwargs)json=deployment_schedule_update.dict(json_compatible=True,exclude_unset=True)try:awaitself._client.patch(f"/deployments/{deployment_id}/schedules/{schedule_id}",json=json)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raise
asyncdefdelete_deployment_schedule(self,deployment_id:UUID,schedule_id:UUID,)->None:""" Delete a deployment schedule. Args: deployment_id: the deployment ID schedule_id: the ID of the deployment schedule to delete. Raises: httpx.RequestError: if the schedules were not deleted for any reason """try:awaitself._client.delete(f"/deployments/{deployment_id}/schedules/{schedule_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raise
```python
async def read_flow_run(self, flow_run_id: UUID) -> FlowRun:
    """
    Query the Prefect API for a flow run by id.

    Args:
        flow_run_id: the flow run ID of interest

    Returns:
        a Flow Run model representation of the flow run
    """
    try:
        response = await self._client.get(f"/flow_runs/{flow_run_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return FlowRun.parse_obj(response.json())
```
asyncdefresume_flow_run(self,flow_run_id:UUID,run_input:Optional[Dict]=None)->OrchestrationResult:""" Resumes a paused flow run. Args: flow_run_id: the flow run ID of interest run_input: the input to resume the flow run with Returns: an OrchestrationResult model representation of state orchestration output """try:response=awaitself._client.post(f"/flow_runs/{flow_run_id}/resume",json={"run_input":run_input})excepthttpx.HTTPStatusError:raisereturnOrchestrationResult.parse_obj(response.json())
asyncdefread_flow_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,work_pool_filter:WorkPoolFilter=None,work_queue_filter:WorkQueueFilter=None,sort:FlowRunSort=None,limit:int=None,offset:int=0,)->List[FlowRun]:""" Query the Prefect API for flow runs. Only flow runs matching all criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments work_pool_filter: filter criteria for work pools work_queue_filter: filter criteria for work pool queues sort: sort criteria for the flow runs limit: limit for the flow run query offset: offset for the flow run query Returns: a list of Flow Run model representations of the flow runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),"work_pool_queues":(work_queue_filter.dict(json_compatible=True)ifwork_queue_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/flow_runs/filter",json=body)returnpydantic.parse_obj_as(List[FlowRun],response.json())
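For instance, a sketch of listing recent failed runs; the filter and sort
names below follow the Prefect 2.x schemas in `prefect.client.schemas.filters`
and `prefect.client.schemas.sorting`:

```python
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.client.schemas.sorting import FlowRunSort

runs = await client.read_flow_runs(
    flow_run_filter=FlowRunFilter(
        state=FlowRunFilterState(name=FlowRunFilterStateName(any_=["Failed"]))
    ),
    sort=FlowRunSort.START_TIME_DESC,
    limit=10,
)
for run in runs:
    print(run.name, run.state_name)
```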
asyncdefset_flow_run_state(self,flow_run_id:UUID,state:"prefect.states.State",force:bool=False,)->OrchestrationResult:""" Set the state of a flow run. Args: flow_run_id: the id of the flow run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.flow_run_id=flow_run_idstate_create.state_details.transition_id=uuid4()try:response=awaitself._client.post(f"/flow_runs/{flow_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnOrchestrationResult.parse_obj(response.json())
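A sketch of cancelling a run through orchestration; the returned
`OrchestrationResult.status` indicates whether the transition was accepted,
and `force=True` should be reserved for deliberately bypassing orchestration
rules:

```python
import prefect.states

result = await client.set_flow_run_state(
    flow_run_id=flow_run_id,  # placeholder UUID from an earlier call
    state=prefect.states.Cancelled(message="Cancelled by operator"),
)
print(result.status)
```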
```python
async def read_flow_run_states(
    self, flow_run_id: UUID
) -> List[prefect.states.State]:
    """
    Query for the states of a flow run

    Args:
        flow_run_id: the id of the flow run

    Returns:
        a list of State model representations of the flow run states
    """
    response = await self._client.get(
        "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id))
    )
    return pydantic.parse_obj_as(List[prefect.states.State], response.json())
```
asyncdefcreate_task_run(self,task:"TaskObject[P, R]",flow_run_id:Optional[UUID],dynamic_key:str,name:Optional[str]=None,extra_tags:Optional[Iterable[str]]=None,state:Optional[prefect.states.State[R]]=None,task_inputs:Optional[Dict[str,List[Union[TaskRunResult,Parameter,Constant,]],]]=None,)->TaskRun:""" Create a task run Args: task: The Task to run flow_run_id: The flow run id with which to associate the task run dynamic_key: A key unique to this particular run of a Task within the flow name: An optional name for the task run extra_tags: an optional list of extra tags to apply to the task run in addition to `task.tags` state: The initial state for the run. If not provided, defaults to `Pending` for now. Should always be a `Scheduled` type. task_inputs: the set of inputs passed to the task Returns: The created task run. """tags=set(task.tags).union(extra_tagsor[])ifstateisNone:state=prefect.states.Pending()task_run_data=TaskRunCreate(name=name,flow_run_id=flow_run_id,task_key=task.task_key,dynamic_key=dynamic_key,tags=list(tags),task_version=task.version,empirical_policy=TaskRunPolicy(retries=task.retries,retry_delay=task.retry_delay_seconds,retry_jitter_factor=task.retry_jitter_factor,),state=state.to_state_create(),task_inputs=task_inputsor{},)response=awaitself._client.post("/task_runs/",json=task_run_data.dict(json_compatible=True))returnTaskRun.parse_obj(response.json())
```python
async def read_task_run(self, task_run_id: UUID) -> TaskRun:
    """
    Query the Prefect API for a task run by id.

    Args:
        task_run_id: the task run ID of interest

    Returns:
        a Task Run model representation of the task run
    """
    response = await self._client.get(f"/task_runs/{task_run_id}")
    return TaskRun.parse_obj(response.json())
```
asyncdefread_task_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,sort:TaskRunSort=None,limit:int=None,offset:int=0,)->List[TaskRun]:""" Query the Prefect API for task runs. Only task runs matching all criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments sort: sort criteria for the task runs limit: a limit for the task run query offset: an offset for the task run query Returns: a list of Task Run model representations of the task runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/task_runs/filter",json=body)returnpydantic.parse_obj_as(List[TaskRun],response.json())
```python
async def delete_task_run(self, task_run_id: UUID) -> None:
    """
    Delete a task run by id.

    Args:
        task_run_id: the task run ID of interest

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    try:
        await self._client.delete(f"/task_runs/{task_run_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
asyncdefset_task_run_state(self,task_run_id:UUID,state:prefect.states.State,force:bool=False,)->OrchestrationResult:""" Set the state of a task run. Args: task_run_id: the id of the task run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.task_run_id=task_run_idresponse=awaitself._client.post(f"/task_runs/{task_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)returnOrchestrationResult.parse_obj(response.json())
```python
async def read_task_run_states(
    self, task_run_id: UUID
) -> List[prefect.states.State]:
    """
    Query for the states of a task run

    Args:
        task_run_id: the id of the task run

    Returns:
        a list of State model representations of the task run states
    """
    response = await self._client.get(
        "/task_run_states/", params=dict(task_run_id=str(task_run_id))
    )
    return pydantic.parse_obj_as(List[prefect.states.State], response.json())
```
```python
async def create_logs(self, logs: Iterable[Union[LogCreate, dict]]) -> None:
    """
    Create logs for a flow or task run

    Args:
        logs: An iterable of `LogCreate` objects or already json-compatible dicts
    """
    serialized_logs = [
        log.dict(json_compatible=True) if isinstance(log, LogCreate) else log
        for log in logs
    ]
    await self._client.post("/logs/", json=serialized_logs)
```
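A sketch of shipping a single log line; the `LogCreate` fields shown follow
the Prefect 2.x actions schema, `pendulum` is already a Prefect dependency,
and `flow_run_id` is a placeholder:

```python
import pendulum
from prefect.client.schemas.actions import LogCreate

await client.create_logs(
    [
        LogCreate(
            name="my.custom.logger",
            level=20,  # logging.INFO
            message="hello from the client",
            timestamp=pendulum.now("utc"),
            flow_run_id=flow_run_id,
        )
    ]
)
```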
asyncdefcreate_flow_run_notification_policy(self,block_document_id:UUID,is_active:bool=True,tags:List[str]=None,state_names:List[str]=None,message_template:Optional[str]=None,)->UUID:""" Create a notification policy for flow runs Args: block_document_id: The block document UUID is_active: Whether the notification policy is active tags: List of flow tags state_names: List of state names message_template: Notification message template """iftagsisNone:tags=[]ifstate_namesisNone:state_names=[]policy=FlowRunNotificationPolicyCreate(block_document_id=block_document_id,is_active=is_active,tags=tags,state_names=state_names,message_template=message_template,)response=awaitself._client.post("/flow_run_notification_policies/",json=policy.dict(json_compatible=True),)policy_id=response.json().get("id")ifnotpolicy_id:raisehttpx.RequestError(f"Malformed response: {response}")returnUUID(policy_id)
```python
async def delete_flow_run_notification_policy(
    self,
    id: UUID,
) -> None:
    """
    Delete a flow run notification policy by id.

    Args:
        id: UUID of the flow run notification policy to delete.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    try:
        await self._client.delete(f"/flow_run_notification_policies/{id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
asyncdefupdate_flow_run_notification_policy(self,id:UUID,block_document_id:Optional[UUID]=None,is_active:Optional[bool]=None,tags:Optional[List[str]]=None,state_names:Optional[List[str]]=None,message_template:Optional[str]=None,)->None:""" Update a notification policy for flow runs Args: id: UUID of the notification policy block_document_id: The block document UUID is_active: Whether the notification policy is active tags: List of flow tags state_names: List of state names message_template: Notification message template Raises: prefect.exceptions.ObjectNotFound: If request returns 404 httpx.RequestError: If requests fails """params={}ifblock_document_idisnotNone:params["block_document_id"]=block_document_idifis_activeisnotNone:params["is_active"]=is_activeiftagsisnotNone:params["tags"]=tagsifstate_namesisnotNone:params["state_names"]=state_namesifmessage_templateisnotNone:params["message_template"]=message_templatepolicy=FlowRunNotificationPolicyUpdate(**params)try:awaitself._client.patch(f"/flow_run_notification_policies/{id}",json=policy.dict(json_compatible=True,exclude_unset=True),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raise
asyncdefread_flow_run_notification_policies(self,flow_run_notification_policy_filter:FlowRunNotificationPolicyFilter,limit:Optional[int]=None,offset:int=0,)->List[FlowRunNotificationPolicy]:""" Query the Prefect API for flow run notification policies. Only policies matching all criteria will be returned. Args: flow_run_notification_policy_filter: filter criteria for notification policies limit: a limit for the notification policies query offset: an offset for the notification policies query Returns: a list of FlowRunNotificationPolicy model representations of the notification policies """body={"flow_run_notification_policy_filter":(flow_run_notification_policy_filter.dict(json_compatible=True)ifflow_run_notification_policy_filterelseNone),"limit":limit,"offset":offset,}response=awaitself._client.post("/flow_run_notification_policies/filter",json=body)returnpydantic.parse_obj_as(List[FlowRunNotificationPolicy],response.json())
```python
async def read_logs(
    self,
    log_filter: LogFilter = None,
    limit: int = None,
    offset: int = None,
    sort: LogSort = LogSort.TIMESTAMP_ASC,
) -> List[Log]:
    """
    Read flow and task run logs.
    """
    body = {
        "logs": log_filter.dict(json_compatible=True) if log_filter else None,
        "limit": limit,
        "offset": offset,
        "sort": sort,
    }
    response = await self._client.post("/logs/filter", json=body)
    return pydantic.parse_obj_as(List[Log], response.json())
```
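For example, a sketch of reading one run's logs oldest first, assuming the
`LogFilterFlowRunId` field from the Prefect 2.x filter schemas:

```python
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.client.schemas.sorting import LogSort

logs = await client.read_logs(
    log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id])),
    sort=LogSort.TIMESTAMP_ASC,
)
for log in logs:
    print(log.timestamp, log.message)
```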
asyncdefresolve_datadoc(self,datadoc:DataDocument)->Any:""" Recursively decode possibly nested data documents. "server" encoded documents will be retrieved from the server. Args: datadoc: The data document to resolve Returns: a decoded object, the innermost data """ifnotisinstance(datadoc,DataDocument):raiseTypeError(f"`resolve_datadoc` received invalid type {type(datadoc).__name__}")asyncdefresolve_inner(data):ifisinstance(data,bytes):try:data=DataDocument.parse_raw(data)exceptpydantic.ValidationError:returndataifisinstance(data,DataDocument):returnawaitresolve_inner(data.decode())returndatareturnawaitresolve_inner(datadoc)
```python
async def send_worker_heartbeat(
    self,
    work_pool_name: str,
    worker_name: str,
    heartbeat_interval_seconds: Optional[float] = None,
):
    """
    Sends a worker heartbeat for a given work pool.

    Args:
        work_pool_name: The name of the work pool to heartbeat against.
        worker_name: The name of the worker sending the heartbeat.
        heartbeat_interval_seconds: The interval, in seconds, at which the
            worker sends heartbeats.
    """
    await self._client.post(
        f"/work_pools/{work_pool_name}/workers/heartbeat",
        json={
            "name": worker_name,
            "heartbeat_interval_seconds": heartbeat_interval_seconds,
        },
    )
```
```python
async def read_workers_for_work_pool(
    self,
    work_pool_name: str,
    worker_filter: Optional[WorkerFilter] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[Worker]:
    """
    Reads workers for a given work pool.

    Args:
        work_pool_name: The name of the work pool for which to get
            member workers.
        worker_filter: Criteria by which to filter workers.
        limit: Limit for the worker query.
        offset: Offset for the worker query.
    """
    response = await self._client.post(
        f"/work_pools/{work_pool_name}/workers/filter",
        json={
            "worker_filter": (
                worker_filter.dict(json_compatible=True, exclude_unset=True)
                if worker_filter
                else None
            ),
            "offset": offset,
            "limit": limit,
        },
    )
    return pydantic.parse_obj_as(List[Worker], response.json())
```
```python
async def read_work_pool(self, work_pool_name: str) -> WorkPool:
    """
    Reads information for a given work pool

    Args:
        work_pool_name: The name of the work pool for which to get information.

    Returns:
        Information about the requested work pool.
    """
    try:
        response = await self._client.get(f"/work_pools/{work_pool_name}")
        return pydantic.parse_obj_as(WorkPool, response.json())
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
asyncdefread_work_pools(self,limit:Optional[int]=None,offset:int=0,work_pool_filter:Optional[WorkPoolFilter]=None,)->List[WorkPool]:""" Reads work pools. Args: limit: Limit for the work pool query. offset: Offset for the work pool query. work_pool_filter: Criteria by which to filter work pools. Returns: A list of work pools. """body={"limit":limit,"offset":offset,"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),}response=awaitself._client.post("/work_pools/filter",json=body)returnpydantic.parse_obj_as(List[WorkPool],response.json())
```python
async def create_work_pool(
    self,
    work_pool: WorkPoolCreate,
) -> WorkPool:
    """
    Creates a work pool with the provided configuration.

    Args:
        work_pool: Desired configuration for the new work pool.

    Returns:
        Information about the newly created work pool.
    """
    try:
        response = await self._client.post(
            "/work_pools/",
            json=work_pool.dict(json_compatible=True, exclude_unset=True),
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_409_CONFLICT:
            raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
        else:
            raise
    return pydantic.parse_obj_as(WorkPool, response.json())
```
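A sketch of creating a pool, assuming `"process"` is one of the work pool
types available on the server:

```python
from prefect.client.schemas.actions import WorkPoolCreate

work_pool = await client.create_work_pool(
    work_pool=WorkPoolCreate(name="my-pool", type="process")
)
print(work_pool.id)
```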
asyncdefupdate_work_pool(self,work_pool_name:str,work_pool:WorkPoolUpdate,):""" Updates a work pool. Args: work_pool_name: Name of the work pool to update. work_pool: Fields to update in the work pool. """try:awaitself._client.patch(f"/work_pools/{work_pool_name}",json=work_pool.dict(json_compatible=True,exclude_unset=True),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raise
```python
async def delete_work_pool(
    self,
    work_pool_name: str,
):
    """
    Deletes a work pool.

    Args:
        work_pool_name: Name of the work pool to delete.
    """
    try:
        await self._client.delete(f"/work_pools/{work_pool_name}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
asyncdefread_work_queues(self,work_pool_name:Optional[str]=None,work_queue_filter:Optional[WorkQueueFilter]=None,limit:Optional[int]=None,offset:Optional[int]=None,)->List[WorkQueue]:""" Retrieves queues for a work pool. Args: work_pool_name: Name of the work pool for which to get queues. work_queue_filter: Criteria by which to filter queues. limit: Limit for the queue query. offset: Limit for the queue query. Returns: List of queues for the specified work pool. """json={"work_queues":(work_queue_filter.dict(json_compatible=True,exclude_unset=True)ifwork_queue_filterelseNone),"limit":limit,"offset":offset,}ifwork_pool_name:try:response=awaitself._client.post(f"/work_pools/{work_pool_name}/queues/filter",json=json,)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raiseelse:response=awaitself._client.post("/work_queues/filter",json=json)returnpydantic.parse_obj_as(List[WorkQueue],response.json())
asyncdefget_scheduled_flow_runs_for_work_pool(self,work_pool_name:str,work_queue_names:Optional[List[str]]=None,scheduled_before:Optional[datetime.datetime]=None,)->List[WorkerFlowRunResponse]:""" Retrieves scheduled flow runs for the provided set of work pool queues. Args: work_pool_name: The name of the work pool that the work pool queues are associated with. work_queue_names: The names of the work pool queues from which to get scheduled flow runs. scheduled_before: Datetime used to filter returned flow runs. Flow runs scheduled for after the given datetime string will not be returned. Returns: A list of worker flow run responses containing information about the retrieved flow runs. """body:Dict[str,Any]={}ifwork_queue_namesisnotNone:body["work_queue_names"]=list(work_queue_names)ifscheduled_before:body["scheduled_before"]=str(scheduled_before)response=awaitself._client.post(f"/work_pools/{work_pool_name}/get_scheduled_flow_runs",json=body,)returnpydantic.parse_obj_as(List[WorkerFlowRunResponse],response.json())
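A worker-style polling sketch; the attribute names on the response objects
(`flow_run`, `work_queue_id`) are assumptions based on the
`WorkerFlowRunResponse` schema:

```python
import pendulum

responses = await client.get_scheduled_flow_runs_for_work_pool(
    work_pool_name="my-pool",
    work_queue_names=["default", "high-priority"],
    scheduled_before=pendulum.now("utc").add(seconds=10),
)
for r in responses:
    print(r.flow_run.id, r.work_queue_id)
```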
```python
async def create_artifact(
    self,
    artifact: ArtifactCreate,
) -> Artifact:
    """
    Creates an artifact with the provided configuration.

    Args:
        artifact: Desired configuration for the new artifact.

    Returns:
        Information about the newly created artifact.
    """
    response = await self._client.post(
        "/artifacts/",
        json=artifact.dict(json_compatible=True, exclude_unset=True),
    )
    return pydantic.parse_obj_as(Artifact, response.json())
```
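For example, a sketch of publishing a markdown artifact attached to a run;
the field names follow the `ArtifactCreate` actions schema and `flow_run_id`
is a placeholder:

```python
from prefect.client.schemas.actions import ArtifactCreate

artifact = await client.create_artifact(
    artifact=ArtifactCreate(
        key="daily-report",
        type="markdown",
        description="Daily summary",
        data="# All systems nominal",
        flow_run_id=flow_run_id,
    )
)
```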
asyncdefread_artifacts(self,*,artifact_filter:ArtifactFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,sort:ArtifactSort=None,limit:int=None,offset:int=0,)->List[Artifact]:""" Query the Prefect API for artifacts. Only artifacts matching all criteria will be returned. Args: artifact_filter: filter criteria for artifacts flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs sort: sort criteria for the artifacts limit: limit for the artifact query offset: offset for the artifact query Returns: a list of Artifact model representations of the artifacts """body={"artifacts":(artifact_filter.dict(json_compatible=True)ifartifact_filterelseNone),"flow_runs":(flow_run_filter.dict(json_compatible=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/artifacts/filter",json=body)returnpydantic.parse_obj_as(List[Artifact],response.json())
asyncdefread_latest_artifacts(self,*,artifact_filter:ArtifactCollectionFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,sort:ArtifactCollectionSort=None,limit:int=None,offset:int=0,)->List[ArtifactCollection]:""" Query the Prefect API for artifacts. Only artifacts matching all criteria will be returned. Args: artifact_filter: filter criteria for artifacts flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs sort: sort criteria for the artifacts limit: limit for the artifact query offset: offset for the artifact query Returns: a list of Artifact model representations of the artifacts """body={"artifacts":(artifact_filter.dict(json_compatible=True)ifartifact_filterelseNone),"flow_runs":(flow_run_filter.dict(json_compatible=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=awaitself._client.post("/artifacts/latest/filter",json=body)returnpydantic.parse_obj_as(List[ArtifactCollection],response.json())
```python
async def delete_artifact(self, artifact_id: UUID) -> None:
    """
    Deletes an artifact with the provided id.

    Args:
        artifact_id: The id of the artifact to delete.
    """
    try:
        await self._client.delete(f"/artifacts/{artifact_id}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
```python
async def create_variable(self, variable: VariableCreate) -> Variable:
    """
    Creates a variable with the provided configuration.

    Args:
        variable: Desired configuration for the new variable.

    Returns:
        Information about the newly created variable.
    """
    response = await self._client.post(
        "/variables/",
        json=variable.dict(json_compatible=True, exclude_unset=True),
    )
    return Variable(**response.json())
```
```python
async def update_variable(self, variable: VariableUpdate) -> None:
    """
    Updates a variable with the provided configuration.

    Args:
        variable: Desired configuration for the updated variable.
    """
    await self._client.patch(
        f"/variables/name/{variable.name}",
        json=variable.dict(json_compatible=True, exclude_unset=True),
    )
```
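Taken together with `read_variable_by_name` below, the variable methods
support a simple round trip; a sketch, with placeholder names and values:

```python
from prefect.client.schemas.actions import VariableCreate, VariableUpdate

await client.create_variable(VariableCreate(name="region", value="us-east-1"))
await client.update_variable(VariableUpdate(name="region", value="eu-west-1"))
variable = await client.read_variable_by_name("region")
print(variable.value if variable else "not found")
```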
```python
async def read_variable_by_name(self, name: str) -> Optional[Variable]:
    """Reads a variable by name. Returns None if no variable is found."""
    try:
        response = await self._client.get(f"/variables/name/{name}")
        return Variable(**response.json())
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            return None
        else:
            raise
```
```python
async def delete_variable_by_name(self, name: str):
    """Deletes a variable by name."""
    try:
        await self._client.delete(f"/variables/name/{name}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
```
```python
async def read_variables(self, limit: int = None) -> List[Variable]:
    """Reads all variables."""
    response = await self._client.post("/variables/filter", json={"limit": limit})
    return pydantic.parse_obj_as(List[Variable], response.json())
```
```python
async def create_flow_run_input(
    self, flow_run_id: UUID, key: str, value: str, sender: Optional[str] = None
):
    """
    Creates a flow run input.

    Args:
        flow_run_id: The flow run id.
        key: The input key.
        value: The input value.
        sender: The sender of the input.
    """
    # Initialize the input to ensure that the key is valid.
    FlowRunInput(flow_run_id=flow_run_id, key=key, value=value)
    response = await self._client.post(
        f"/flow_runs/{flow_run_id}/input",
        json={"key": key, "value": value, "sender": sender},
    )
    response.raise_for_status()
```
```python
async def read_flow_run_input(self, flow_run_id: UUID, key: str) -> str:
    """
    Reads a flow run input.

    Args:
        flow_run_id: The flow run id.
        key: The input key.
    """
    response = await self._client.get(f"/flow_runs/{flow_run_id}/input/{key}")
    response.raise_for_status()
    return response.content.decode()
```
```python
async def delete_flow_run_input(self, flow_run_id: UUID, key: str):
    """
    Deletes a flow run input.

    Args:
        flow_run_id: The flow run id.
        key: The input key.
    """
    response = await self._client.delete(f"/flow_runs/{flow_run_id}/input/{key}")
    response.raise_for_status()
```
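The three flow run input methods compose into a keyed round trip; values are
stored as strings, so structured data should be JSON-encoded, as in this
sketch:

```python
import json

await client.create_flow_run_input(
    flow_run_id=flow_run_id, key="approval", value=json.dumps({"approved": True})
)
raw = await client.read_flow_run_input(flow_run_id=flow_run_id, key="approval")
print(json.loads(raw))
await client.delete_flow_run_input(flow_run_id=flow_run_id, key="approval")
```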
```python
async def create_automation(self, automation: AutomationCore) -> UUID:
    """Creates an automation in Prefect Cloud."""
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.post(
        "/automations/",
        json=automation.dict(json_compatible=True),
    )
    return UUID(response.json()["id"])
```
```python
async def update_automation(self, automation_id: UUID, automation: AutomationCore):
    """Updates an automation in Prefect Cloud."""
    if not self.server_type.supports_automations():
        self._raise_for_unsupported_automations()
    response = await self._client.put(
        f"/automations/{automation_id}",
        json=automation.dict(json_compatible=True, exclude_unset=True),
    )
    response.raise_for_status()
```
asyncdefread_automations_by_name(self,name:str)->List[Automation]:""" Query the Prefect API for an automation by name. Only automations matching the provided name will be returned. Args: name: the name of the automation to query Returns: a list of Automation model representations of the automations """ifnotself.server_type.supports_automations():self._raise_for_unsupported_automations()automation_filter=filters.AutomationFilter(name=dict(any_=[name]))response=awaitself._client.post("/automations/filter",json={"sort":sorting.AutomationSort.UPDATED_DESC,"automations":automation_filter.dict(json_compatible=True)ifautomation_filterelseNone,},)response.raise_for_status()returnpydantic.parse_obj_as(List[Automation],response.json())
classSyncPrefectClient:""" A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). Args: api: the REST API URL or FastAPI application to connect to api_key: An optional API key for authentication. api_version: The API version this client is compatible with. httpx_settings: An optional dictionary of settings to pass to the underlying `httpx.Client` Examples: Say hello to a Prefect REST API <div class="terminal"> ``` >>> with get_client(sync_client=True) as client: >>> response = client.hello() >>> >>> print(response.json()) 👋 ``` </div> """def__init__(self,api:Union[str,ASGIApp],*,api_key:str=None,api_version:str=None,httpx_settings:Optional[Dict[str,Any]]=None,)->None:httpx_settings=httpx_settings.copy()ifhttpx_settingselse{}httpx_settings.setdefault("headers",{})ifPREFECT_API_TLS_INSECURE_SKIP_VERIFY:httpx_settings.setdefault("verify",False)else:cert_file=PREFECT_API_SSL_CERT_FILE.value()ifnotcert_file:cert_file=certifi.where()httpx_settings.setdefault("verify",cert_file)ifapi_versionisNone:api_version=SERVER_API_VERSIONhttpx_settings["headers"].setdefault("X-PREFECT-API-VERSION",api_version)ifapi_key:httpx_settings["headers"].setdefault("Authorization",f"Bearer {api_key}")# Context managementself._ephemeral_app:Optional[ASGIApp]=Noneself.manage_lifespan=Trueself.server_type:ServerTypeself._closed=Falseself._started=False# Connect to an external applicationifisinstance(api,str):ifhttpx_settings.get("app"):raiseValueError("Invalid httpx settings: `app` cannot be set when providing an ""api url. `app` is only for use with ephemeral instances. Provide ""it as the `api` parameter instead.")httpx_settings.setdefault("base_url",api)# See https://www.python-httpx.org/advanced/#pool-limit-configurationhttpx_settings.setdefault("limits",httpx.Limits(# We see instability when allowing the client to open many connections at once.# Limiting concurrency results in more stable performance.max_connections=16,max_keepalive_connections=8,# The Prefect Cloud LB will keep connections alive for 30s.# Only allow the client to keep connections alive for 25s.keepalive_expiry=25,),)# See https://www.python-httpx.org/http2/# Enabling HTTP/2 support on the client does not necessarily mean that your requests# and responses will be transported over HTTP/2, since both the client and the server# need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the# client will use a standard HTTP/1.1 connection instead.httpx_settings.setdefault("http2",PREFECT_API_ENABLE_HTTP2.value())self.server_type=(ServerType.CLOUDifapi.startswith(PREFECT_CLOUD_API_URL.value())elseServerType.SERVER)# Connect to an in-process applicationelifisinstance(api,ASGIApp):self._ephemeral_app=apiself.server_type=ServerType.EPHEMERALelse:raiseTypeError(f"Unexpected type {type(api).__name__!r} for argument `api`. 
Expected"" 'str' or 'ASGIApp/FastAPI'")# See https://www.python-httpx.org/advanced/#timeout-configurationhttpx_settings.setdefault("timeout",httpx.Timeout(connect=PREFECT_API_REQUEST_TIMEOUT.value(),read=PREFECT_API_REQUEST_TIMEOUT.value(),write=PREFECT_API_REQUEST_TIMEOUT.value(),pool=PREFECT_API_REQUEST_TIMEOUT.value(),),)ifnotPREFECT_UNIT_TEST_MODE:httpx_settings.setdefault("follow_redirects",True)enable_csrf_support=(self.server_type!=ServerType.CLOUDandPREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value())ifself.server_type==ServerType.EPHEMERAL:self._client=PrefectHttpxSyncEphemeralClient(api,base_url="http://ephemeral-prefect/api")else:self._client=PrefectHttpxSyncClient(**httpx_settings,enable_csrf_support=enable_csrf_support)# See https://www.python-httpx.org/advanced/#custom-transports## If we're using an HTTP/S client (not the ephemeral client), adjust the# transport to add retries _after_ it is instantiated. If we alter the transport# before instantiation, the transport will not be aware of proxies unless we# reproduce all of the logic to make it so.## Only alter the transport to set our default of 3 retries, don't modify any# transport a user may have provided via httpx_settings.## Making liberal use of getattr and isinstance checks here to avoid any# surprises if the internals of httpx or httpcore change on usifisinstance(api,str)andnothttpx_settings.get("transport"):transport_for_url=getattr(self._client,"_transport_for_url",None)ifcallable(transport_for_url):server_transport=transport_for_url(httpx.URL(api))ifisinstance(server_transport,httpx.HTTPTransport):pool=getattr(server_transport,"_pool",None)ifisinstance(pool,httpcore.ConnectionPool):pool._retries=3self.logger=get_logger("client")@propertydefapi_url(self)->httpx.URL:""" Get the base URL for the API. """returnself._client.base_url# Context management ----------------------------------------------------------------def__enter__(self)->"SyncPrefectClient":""" Start the client. If the client is already started, this will raise an exception. If the client is already closed, this will raise an exception. Use a new client instance instead. """ifself._closed:# httpx.Client does not allow reuse so we will not either.raiseRuntimeError("The client cannot be started again after closing. ""Retrieve a new client with `get_client()` instead.")ifself._started:# httpx.Client does not allow reentrancy so we will not either.raiseRuntimeError("The client cannot be started more than once.")self._client.__enter__()self._started=Truereturnselfdef__exit__(self,*exc_info)->None:""" Shutdown the client. """self._closed=Trueself._client.__exit__(*exc_info)# API methods ----------------------------------------------------------------------defapi_healthcheck(self)->Optional[Exception]:""" Attempts to connect to the API and returns the encountered exception if not successful. If successful, returns `None`. """try:self._client.get("/health")returnNoneexceptExceptionasexc:returnexcdefhello(self)->httpx.Response:""" Send a GET request to /hello for testing purposes. """returnself._client.get("/hello")defcreate_flow(self,flow:"FlowObject")->UUID:""" Create a flow in the Prefect API. Args: flow: a [Flow][prefect.flows.Flow] object Raises: httpx.RequestError: if a flow was not created for any reason Returns: the ID of the flow in the backend """returnself.create_flow_from_name(flow.name)defcreate_flow_from_name(self,flow_name:str)->UUID:""" Create a flow in the Prefect API. 
Args: flow_name: the name of the new flow Raises: httpx.RequestError: if a flow was not created for any reason Returns: the ID of the flow in the backend """flow_data=FlowCreate(name=flow_name)response=self._client.post("/flows/",json=flow_data.dict(json_compatible=True))flow_id=response.json().get("id")ifnotflow_id:raisehttpx.RequestError(f"Malformed response: {response}")# Return the id of the created flowreturnUUID(flow_id)defcreate_flow_run(self,flow:"FlowObject",name:Optional[str]=None,parameters:Optional[Dict[str,Any]]=None,context:Optional[Dict[str,Any]]=None,tags:Optional[Iterable[str]]=None,parent_task_run_id:Optional[UUID]=None,state:Optional["prefect.states.State"]=None,)->FlowRun:""" Create a flow run for a flow. Args: flow: The flow model to create the flow run for name: An optional name for the flow run parameters: Parameter overrides for this flow run. context: Optional run context data tags: a list of tags to apply to this flow run parent_task_run_id: if a subflow run is being created, the placeholder task run identifier in the parent flow state: The initial state for the run. If not provided, defaults to `Scheduled` for now. Should always be a `Scheduled` type. Raises: httpx.RequestError: if the Prefect API does not successfully create a run for any reason Returns: The flow run model """parameters=parametersor{}context=contextor{}ifstateisNone:state=prefect.states.Pending()# Retrieve the flow idflow_id=self.create_flow(flow)flow_run_create=FlowRunCreate(flow_id=flow_id,flow_version=flow.version,name=name,parameters=parameters,context=context,tags=list(tagsor[]),parent_task_run_id=parent_task_run_id,state=state.to_state_create(),empirical_policy=FlowRunPolicy(retries=flow.retries,retry_delay=flow.retry_delay_seconds,),)flow_run_create_json=flow_run_create.dict(json_compatible=True)response=self._client.post("/flow_runs/",json=flow_run_create_json)flow_run=FlowRun.parse_obj(response.json())# Restore the parameters to the local objects to retain expectations about# Python objectsflow_run.parameters=parametersreturnflow_rundefread_flow_run(self,flow_run_id:UUID)->FlowRun:""" Query the Prefect API for a flow run by id. Args: flow_run_id: the flow run ID of interest Returns: a Flow Run model representation of the flow run """try:response=self._client.get(f"/flow_runs/{flow_run_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnFlowRun.parse_obj(response.json())defread_flow_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,work_pool_filter:WorkPoolFilter=None,work_queue_filter:WorkQueueFilter=None,sort:FlowRunSort=None,limit:int=None,offset:int=0,)->List[FlowRun]:""" Query the Prefect API for flow runs. Only flow runs matching all criteria will be returned. 
Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments work_pool_filter: filter criteria for work pools work_queue_filter: filter criteria for work pool queues sort: sort criteria for the flow runs limit: limit for the flow run query offset: offset for the flow run query Returns: a list of Flow Run model representations of the flow runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),"work_pool_queues":(work_queue_filter.dict(json_compatible=True)ifwork_queue_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=self._client.post("/flow_runs/filter",json=body)returnpydantic.parse_obj_as(List[FlowRun],response.json())defset_flow_run_state(self,flow_run_id:UUID,state:"prefect.states.State",force:bool=False,)->OrchestrationResult:""" Set the state of a flow run. Args: flow_run_id: the id of the flow run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.flow_run_id=flow_run_idstate_create.state_details.transition_id=uuid4()try:response=self._client.post(f"/flow_runs/{flow_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnOrchestrationResult.parse_obj(response.json())defcreate_task_run(self,task:"TaskObject[P, R]",flow_run_id:Optional[UUID],dynamic_key:str,name:Optional[str]=None,extra_tags:Optional[Iterable[str]]=None,state:Optional[prefect.states.State[R]]=None,task_inputs:Optional[Dict[str,List[Union[TaskRunResult,Parameter,Constant,]],]]=None,)->TaskRun:""" Create a task run Args: task: The Task to run flow_run_id: The flow run id with which to associate the task run dynamic_key: A key unique to this particular run of a Task within the flow name: An optional name for the task run extra_tags: an optional list of extra tags to apply to the task run in addition to `task.tags` state: The initial state for the run. If not provided, defaults to `Pending` for now. Should always be a `Scheduled` type. task_inputs: the set of inputs passed to the task Returns: The created task run. """tags=set(task.tags).union(extra_tagsor[])ifstateisNone:state=prefect.states.Pending()task_run_data=TaskRunCreate(name=name,flow_run_id=flow_run_id,task_key=task.task_key,dynamic_key=dynamic_key,tags=list(tags),task_version=task.version,empirical_policy=TaskRunPolicy(retries=task.retries,retry_delay=task.retry_delay_seconds,retry_jitter_factor=task.retry_jitter_factor,),state=state.to_state_create(),task_inputs=task_inputsor{},)response=self._client.post("/task_runs/",json=task_run_data.dict(json_compatible=True))returnTaskRun.parse_obj(response.json())defread_task_run(self,task_run_id:UUID)->TaskRun:""" Query the Prefect API for a task run by id. 
Args: task_run_id: the task run ID of interest Returns: a Task Run model representation of the task run """response=self._client.get(f"/task_runs/{task_run_id}")returnTaskRun.parse_obj(response.json())defread_task_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,sort:TaskRunSort=None,limit:int=None,offset:int=0,)->List[TaskRun]:""" Query the Prefect API for task runs. Only task runs matching all criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments sort: sort criteria for the task runs limit: a limit for the task run query offset: an offset for the task run query Returns: a list of Task Run model representations of the task runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=self._client.post("/task_runs/filter",json=body)returnpydantic.parse_obj_as(List[TaskRun],response.json())defset_task_run_state(self,task_run_id:UUID,state:prefect.states.State,force:bool=False,)->OrchestrationResult:""" Set the state of a task run. Args: task_run_id: the id of the task run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.task_run_id=task_run_idresponse=self._client.post(f"/task_runs/{task_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)returnOrchestrationResult.parse_obj(response.json())defread_task_run_states(self,task_run_id:UUID)->List[prefect.states.State]:""" Query for the states of a task run Args: task_run_id: the id of the task run Returns: a list of State model representations of the task run states """response=self._client.get("/task_run_states/",params=dict(task_run_id=str(task_run_id)))returnpydantic.parse_obj_as(List[prefect.states.State],response.json())
```python
def api_healthcheck(self) -> Optional[Exception]:
    """
    Attempts to connect to the API and returns the encountered exception if not
    successful.

    If successful, returns `None`.
    """
    try:
        self._client.get("/health")
        return None
    except Exception as exc:
        return exc
```
```python
def create_flow(self, flow: "FlowObject") -> UUID:
    """
    Create a flow in the Prefect API.

    Args:
        flow: a [Flow][prefect.flows.Flow] object

    Raises:
        httpx.RequestError: if a flow was not created for any reason

    Returns:
        the ID of the flow in the backend
    """
    return self.create_flow_from_name(flow.name)
```
```python
def create_flow_from_name(self, flow_name: str) -> UUID:
    """
    Create a flow in the Prefect API.

    Args:
        flow_name: the name of the new flow

    Raises:
        httpx.RequestError: if a flow was not created for any reason

    Returns:
        the ID of the flow in the backend
    """
    flow_data = FlowCreate(name=flow_name)
    response = self._client.post("/flows/", json=flow_data.dict(json_compatible=True))
    flow_id = response.json().get("id")
    if not flow_id:
        raise httpx.RequestError(f"Malformed response: {response}")
    # Return the id of the created flow
    return UUID(flow_id)
```
defcreate_flow_run(self,flow:"FlowObject",name:Optional[str]=None,parameters:Optional[Dict[str,Any]]=None,context:Optional[Dict[str,Any]]=None,tags:Optional[Iterable[str]]=None,parent_task_run_id:Optional[UUID]=None,state:Optional["prefect.states.State"]=None,)->FlowRun:""" Create a flow run for a flow. Args: flow: The flow model to create the flow run for name: An optional name for the flow run parameters: Parameter overrides for this flow run. context: Optional run context data tags: a list of tags to apply to this flow run parent_task_run_id: if a subflow run is being created, the placeholder task run identifier in the parent flow state: The initial state for the run. If not provided, defaults to `Scheduled` for now. Should always be a `Scheduled` type. Raises: httpx.RequestError: if the Prefect API does not successfully create a run for any reason Returns: The flow run model """parameters=parametersor{}context=contextor{}ifstateisNone:state=prefect.states.Pending()# Retrieve the flow idflow_id=self.create_flow(flow)flow_run_create=FlowRunCreate(flow_id=flow_id,flow_version=flow.version,name=name,parameters=parameters,context=context,tags=list(tagsor[]),parent_task_run_id=parent_task_run_id,state=state.to_state_create(),empirical_policy=FlowRunPolicy(retries=flow.retries,retry_delay=flow.retry_delay_seconds,),)flow_run_create_json=flow_run_create.dict(json_compatible=True)response=self._client.post("/flow_runs/",json=flow_run_create_json)flow_run=FlowRun.parse_obj(response.json())# Restore the parameters to the local objects to retain expectations about# Python objectsflow_run.parameters=parametersreturnflow_run
defread_flow_run(self,flow_run_id:UUID)->FlowRun:""" Query the Prefect API for a flow run by id. Args: flow_run_id: the flow run ID of interest Returns: a Flow Run model representation of the flow run """try:response=self._client.get(f"/flow_runs/{flow_run_id}")excepthttpx.HTTPStatusErrorase:ife.response.status_code==404:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnFlowRun.parse_obj(response.json())
defread_flow_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,work_pool_filter:WorkPoolFilter=None,work_queue_filter:WorkQueueFilter=None,sort:FlowRunSort=None,limit:int=None,offset:int=0,)->List[FlowRun]:""" Query the Prefect API for flow runs. Only flow runs matching all criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments work_pool_filter: filter criteria for work pools work_queue_filter: filter criteria for work pool queues sort: sort criteria for the flow runs limit: limit for the flow run query offset: offset for the flow run query Returns: a list of Flow Run model representations of the flow runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"work_pools":(work_pool_filter.dict(json_compatible=True)ifwork_pool_filterelseNone),"work_pool_queues":(work_queue_filter.dict(json_compatible=True)ifwork_queue_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=self._client.post("/flow_runs/filter",json=body)returnpydantic.parse_obj_as(List[FlowRun],response.json())
defset_flow_run_state(self,flow_run_id:UUID,state:"prefect.states.State",force:bool=False,)->OrchestrationResult:""" Set the state of a flow run. Args: flow_run_id: the id of the flow run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.flow_run_id=flow_run_idstate_create.state_details.transition_id=uuid4()try:response=self._client.post(f"/flow_runs/{flow_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)excepthttpx.HTTPStatusErrorase:ife.response.status_code==status.HTTP_404_NOT_FOUND:raiseprefect.exceptions.ObjectNotFound(http_exc=e)fromeelse:raisereturnOrchestrationResult.parse_obj(response.json())
defcreate_task_run(self,task:"TaskObject[P, R]",flow_run_id:Optional[UUID],dynamic_key:str,name:Optional[str]=None,extra_tags:Optional[Iterable[str]]=None,state:Optional[prefect.states.State[R]]=None,task_inputs:Optional[Dict[str,List[Union[TaskRunResult,Parameter,Constant,]],]]=None,)->TaskRun:""" Create a task run Args: task: The Task to run flow_run_id: The flow run id with which to associate the task run dynamic_key: A key unique to this particular run of a Task within the flow name: An optional name for the task run extra_tags: an optional list of extra tags to apply to the task run in addition to `task.tags` state: The initial state for the run. If not provided, defaults to `Pending` for now. Should always be a `Scheduled` type. task_inputs: the set of inputs passed to the task Returns: The created task run. """tags=set(task.tags).union(extra_tagsor[])ifstateisNone:state=prefect.states.Pending()task_run_data=TaskRunCreate(name=name,flow_run_id=flow_run_id,task_key=task.task_key,dynamic_key=dynamic_key,tags=list(tags),task_version=task.version,empirical_policy=TaskRunPolicy(retries=task.retries,retry_delay=task.retry_delay_seconds,retry_jitter_factor=task.retry_jitter_factor,),state=state.to_state_create(),task_inputs=task_inputsor{},)response=self._client.post("/task_runs/",json=task_run_data.dict(json_compatible=True))returnTaskRun.parse_obj(response.json())
```python
def read_task_run(self, task_run_id: UUID) -> TaskRun:
    """
    Query the Prefect API for a task run by id.

    Args:
        task_run_id: the task run ID of interest

    Returns:
        a Task Run model representation of the task run
    """
    response = self._client.get(f"/task_runs/{task_run_id}")
    return TaskRun.parse_obj(response.json())
```
defread_task_runs(self,*,flow_filter:FlowFilter=None,flow_run_filter:FlowRunFilter=None,task_run_filter:TaskRunFilter=None,deployment_filter:DeploymentFilter=None,sort:TaskRunSort=None,limit:int=None,offset:int=0,)->List[TaskRun]:""" Query the Prefect API for task runs. Only task runs matching all criteria will be returned. Args: flow_filter: filter criteria for flows flow_run_filter: filter criteria for flow runs task_run_filter: filter criteria for task runs deployment_filter: filter criteria for deployments sort: sort criteria for the task runs limit: a limit for the task run query offset: an offset for the task run query Returns: a list of Task Run model representations of the task runs """body={"flows":flow_filter.dict(json_compatible=True)ifflow_filterelseNone,"flow_runs":(flow_run_filter.dict(json_compatible=True,exclude_unset=True)ifflow_run_filterelseNone),"task_runs":(task_run_filter.dict(json_compatible=True)iftask_run_filterelseNone),"deployments":(deployment_filter.dict(json_compatible=True)ifdeployment_filterelseNone),"sort":sort,"limit":limit,"offset":offset,}response=self._client.post("/task_runs/filter",json=body)returnpydantic.parse_obj_as(List[TaskRun],response.json())
defset_task_run_state(self,task_run_id:UUID,state:prefect.states.State,force:bool=False,)->OrchestrationResult:""" Set the state of a task run. Args: task_run_id: the id of the task run state: the state to set force: if True, disregard orchestration logic when setting the state, forcing the Prefect API to accept the state Returns: an OrchestrationResult model representation of state orchestration output """state_create=state.to_state_create()state_create.state_details.task_run_id=task_run_idresponse=self._client.post(f"/task_runs/{task_run_id}/set_state",json=dict(state=state_create.dict(json_compatible=True),force=force),)returnOrchestrationResult.parse_obj(response.json())
```python
def read_task_run_states(self, task_run_id: UUID) -> List[prefect.states.State]:
    """
    Query for the states of a task run

    Args:
        task_run_id: the id of the task run

    Returns:
        a list of State model representations of the task run states
    """
    response = self._client.get(
        "/task_run_states/", params=dict(task_run_id=str(task_run_id))
    )
    return pydantic.parse_obj_as(List[prefect.states.State], response.json())
```
defget_client(httpx_settings:Optional[Dict[str,Any]]=None,sync_client:bool=False)->Union["PrefectClient","SyncPrefectClient"]:""" Retrieve a HTTP client for communicating with the Prefect REST API. The client must be context managed; for example: ```python async with get_client() as client: await client.hello() ``` To return a synchronous client, pass sync_client=True: ```python with get_client(sync_client=True) as client: client.hello() ``` """ctx=prefect.context.get_settings_context()api=PREFECT_API_URL.value()ifnotapi:# create an ephemeral API if none was providedfromprefect.server.api.serverimportcreate_appapi=create_app(ctx.settings,ephemeral=True)ifsync_client:returnSyncPrefectClient(api,api_key=PREFECT_API_KEY.value(),httpx_settings=httpx_settings,)else:returnPrefectClient(api,api_key=PREFECT_API_KEY.value(),httpx_settings=httpx_settings,)
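Because `api_healthcheck` returns the encountered exception rather than
raising it, `get_client` pairs naturally with a quick connectivity probe; a
sketch using the synchronous client:

```python
from prefect.client.orchestration import get_client

with get_client(sync_client=True) as client:
    error = client.api_healthcheck()
    if error is None:
        print("API reachable")
    else:
        print(f"API unreachable: {error!r}")
```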