Yields a temporary asynchronous dask client; this is useful
for parallelizing operations on dask collections,
such as a `dask.DataFrame` or `dask.Bag`.

Without invoking this, workers do not automatically get a client to connect
to the full cluster. Therefore, work will be attempted serially within the
worker itself, potentially overwhelming that single worker.
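For instance, inside an async task running on a Dask worker, the difference looks roughly like this (a minimal sketch; `compute_summary` is a hypothetical task body, not part of this API):

```python
import dask

from prefect_dask import get_async_dask_client

async def compute_summary():
    # hypothetical task body, shown only to illustrate the contrast above
    df = dask.datasets.timeseries("2000", "2001")

    # Without a client, df.describe().compute() would execute the whole
    # graph serially inside this single worker.
    async with get_async_dask_client() as client:
        # With the temporary client, the graph is submitted to the full
        # cluster and runs in parallel across all workers.
        return await client.compute(df.describe())
```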
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `Optional[Union[int, float, str, timedelta]]` | Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value. | `None` |
| `client_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `distributed.Client`; these overwrite keyword arguments inherited from the task runner, if any. | `{}` |
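For example, extra keyword arguments are forwarded straight to `distributed.Client`; a minimal sketch, assuming standard `distributed.Client` parameters such as `name` and `set_as_default` (the surrounding function is hypothetical):

```python
from prefect_dask import get_async_dask_client

async def use_named_client():
    # extra kwargs are forwarded to distributed.Client, overriding any
    # inherited from the task runner
    async with get_async_dask_client(
        timeout="30s",
        name="temporary-client",  # an identifying name for the client
        set_as_default=False,     # do not register as the default client
    ) as client:
        return client.scheduler_info()
```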
Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[Client, None]` | A temporary asynchronous dask client. |
Examples:
Use `get_async_dask_client` to distribute work across workers.
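```python
import asyncio

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()

asyncio.run(dask_flow())
```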
````python
@asynccontextmanager
async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[Client, None]:
    """
    Yields a temporary asynchronous dask client; this is useful
    for parallelizing operations on dask collections,
    such as a `dask.DataFrame` or `dask.Bag`.

    Without invoking this, workers do not automatically get a client to connect
    to the full cluster. Therefore, work will be attempted serially within the
    worker itself, potentially overwhelming that single worker.

    Args:
        timeout: Timeout after which to error out; has no effect in
            flow run contexts because the client has already started;
            Defaults to the `distributed.comm.timeouts.connect`
            configuration value.
        client_kwargs: Additional keyword arguments to pass to
            `distributed.Client`; these overwrite keyword arguments inherited
            from the task runner, if any.

    Yields:
        A temporary asynchronous dask client.

    Examples:
        Use `get_async_dask_client` to distribute work across workers.
        ```python
        import asyncio

        import dask
        from prefect import flow, task
        from prefect_dask import DaskTaskRunner, get_async_dask_client

        @task
        async def compute_task():
            async with get_async_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                summary_df = await client.compute(df.describe())
            return summary_df

        @flow(task_runner=DaskTaskRunner())
        async def dask_flow():
            prefect_future = await compute_task.submit()
            return await prefect_future.result()

        asyncio.run(dask_flow())
        ```
    """
    client_kwargs = _generate_client_kwargs(
        async_client=True, timeout=timeout, **client_kwargs
    )
    async with Client(**client_kwargs) as client:
        yield client
````
Yields a temporary synchronous dask client; this is useful
for parallelizing operations on dask collections,
such as a `dask.DataFrame` or `dask.Bag`.

Without invoking this, workers do not automatically get a client to connect
to the full cluster. Therefore, work will be attempted serially within the
worker itself, potentially overwhelming that single worker.

When in an async context, we recommend using `get_async_dask_client` instead.
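A minimal synchronous sketch (the `sync_summary` function is hypothetical); the call style differs from the async variant only in blocking on `.result()` instead of awaiting:

```python
import dask

from prefect_dask import get_dask_client

def sync_summary():
    # hypothetical task body illustrating the synchronous call style
    df = dask.datasets.timeseries("2000", "2001")
    with get_dask_client() as client:
        # synchronous client: block on the distributed future
        return client.compute(df.describe()).result()
```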
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `Optional[Union[int, float, str, timedelta]]` | Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value. | `None` |
| `client_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `distributed.Client`; these overwrite keyword arguments inherited from the task runner, if any. | `{}` |
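As the signature suggests, `timeout` accepts seconds as an `int` or `float`, a duration string like `"30s"`, or a `timedelta`; a minimal, hypothetical sketch:

```python
from datetime import timedelta

from prefect_dask import get_dask_client

def quick_check():
    # equivalent timeout forms: 30, 30.0, "30s", timedelta(seconds=30)
    with get_dask_client(timeout=timedelta(seconds=30)) as client:
        future = client.submit(sum, [1, 2, 3])  # submit a simple callable
        return future.result()
```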
Yields:

| Type | Description |
| --- | --- |
| `Client` | A temporary synchronous dask client. |
Examples:
Use `get_dask_client` to distribute work across workers.
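```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

dask_flow()
```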
````python
@contextmanager
def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[Client, None, None]:
    """
    Yields a temporary synchronous dask client; this is useful
    for parallelizing operations on dask collections,
    such as a `dask.DataFrame` or `dask.Bag`.

    Without invoking this, workers do not automatically get a client to connect
    to the full cluster. Therefore, work will be attempted serially within the
    worker itself, potentially overwhelming that single worker.

    When in an async context, we recommend using `get_async_dask_client` instead.

    Args:
        timeout: Timeout after which to error out; has no effect in
            flow run contexts because the client has already started;
            Defaults to the `distributed.comm.timeouts.connect`
            configuration value.
        client_kwargs: Additional keyword arguments to pass to
            `distributed.Client`; these overwrite keyword arguments inherited
            from the task runner, if any.

    Yields:
        A temporary synchronous dask client.

    Examples:
        Use `get_dask_client` to distribute work across workers.
        ```python
        import dask
        from prefect import flow, task
        from prefect_dask import DaskTaskRunner, get_dask_client

        @task
        def compute_task():
            with get_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                summary_df = client.compute(df.describe()).result()
            return summary_df

        @flow(task_runner=DaskTaskRunner())
        def dask_flow():
            prefect_future = compute_task.submit()
            return prefect_future.result()

        dask_flow()
        ```
    """
    client_kwargs = _generate_client_kwargs(
        async_client=False, timeout=timeout, **client_kwargs
    )
    with Client(**client_kwargs) as client:
        yield client
````