User Guide
Base executors
These methods on the more_executors.Executors class create standalone Executor instances which may serve as the basis of composing executors.
thread_pool() creates a new ThreadPoolExecutor to execute callables in threads.
process_pool() creates a new ProcessPoolExecutor to execute callables in processes.
sync() creates a new SyncExecutor to execute callables in the calling thread.
Example:
import requests
from more_executors import Executors

with Executors.thread_pool(name='web-client') as executor:
    future = executor.submit(requests.get, 'https://github.com/rohanpm/more-executors')
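To make the idea of executing callables "in the calling thread" concrete, here is a minimal sketch using only the standard library. `InlineExecutor` is a hypothetical name chosen for illustration; it is an assumption about the general technique, not the library's SyncExecutor implementation.

```python
from concurrent.futures import Executor, Future
import threading


class InlineExecutor(Executor):
    """Hypothetical executor: runs each callable immediately in the caller's thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Failures are reported through the future rather than raised here.
            future.set_exception(exc)
        return future


def current_thread_name():
    return threading.current_thread().name


caller_name = threading.current_thread().name

with InlineExecutor() as executor:
    inline_future = executor.submit(current_thread_name)
    failed_future = executor.submit(int, "not a number")
```

Because the callable has already run by the time submit() returns, `inline_future` is resolved immediately, and the failure of `int("not a number")` surfaces only when the future's result or exception is inspected.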
Composing executors
Executors produced by this module can be customized by chaining a series of with_* methods. This may be used to compose different behaviors for specific use-cases.
Methods for composition are provided for all implemented executors:
with_map(): transform the output of a future, synchronously
with_flat_map(): transform the output of a future, asynchronously
with_retry(): retry failing futures
with_poll(): resolve futures via a custom poll function
with_timeout(): cancel unresolved futures after a timeout
with_throttle(): limit the number of concurrently executing futures
with_cancel_on_shutdown(): cancel any pending futures on executor shutdown
with_asyncio(): bridge between concurrent.futures and asyncio
Example:
# Run in up to 4 threads, retry on failure, transform output values
executor = Executors.thread_pool(max_workers=4, name='web-client'). \
    with_map(lambda response: response.json()). \
    with_retry()
responses = [executor.submit(requests.get, url)
             for url in urls]
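The effect of transforming a future's output can be pictured with the standard library alone. The sketch below is only an illustration of the technique under that assumption; `map_future` is a hypothetical helper, not part of more-executors.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def map_future(source, fn):
    """Hypothetical helper: return a new future holding fn(source.result())."""
    mapped = Future()

    def on_done(completed):
        try:
            mapped.set_result(fn(completed.result()))
        except Exception as exc:
            # An exception from the source future or from fn fails the mapped future.
            mapped.set_exception(exc)

    source.add_done_callback(on_done)
    return mapped


with ThreadPoolExecutor(max_workers=4) as pool:
    lengths = [map_future(pool.submit(str.upper, word), len)
               for word in ["retry", "map", "throttle"]]
    results = [f.result() for f in lengths]
```

Here `fn` runs inside the done-callback, so it blocks whichever thread completes the source future. That is the sense in which such a transformation is synchronous.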
Keep in mind that the order in which executors are composed is significant. For example, these two composition sequences have different effects:
Executors.sync().with_retry().with_throttle(4)
In this example, if 4 callables have failed and retries are currently pending, throttling takes effect, and any additional callables will be enqueued until at least one of the earlier callables has completed (or exhausted all retry attempts).
Executors.sync().with_throttle(4).with_retry()
In this example, an unlimited number of futures may be failed and awaiting
retries. The throttling in this example has no effect, since a
SyncExecutor is intrinsically throttled to
a single pending future.
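Why the order matters can be seen in a simplified, single-threaded sketch that wraps plain functions instead of executors. The names `retrying`, `throttled` and `make_flaky` are hypothetical and exist only for this illustration; the sketch counts how many times a throttle slot is acquired under each ordering.

```python
import threading


def retrying(fn, attempts):
    """Hypothetical wrapper: call fn up to `attempts` times."""
    def wrapper():
        for attempt in range(1, attempts + 1):
            try:
                return fn()
            except RuntimeError:
                if attempt == attempts:
                    raise
    return wrapper


def throttled(fn, slots, acquisitions):
    """Hypothetical wrapper: hold one slot for the duration of each call."""
    def wrapper():
        with slots:
            acquisitions.append(1)
            return fn()
    return wrapper


def make_flaky(failures):
    """Return a callable that fails `failures` times, then succeeds."""
    state = {"remaining": failures}

    def flaky():
        if state["remaining"] > 0:
            state["remaining"] -= 1
            raise RuntimeError("transient failure")
        return "ok"
    return flaky


slots = threading.Semaphore(4)

# Analogue of with_retry().with_throttle(4): the throttle is outermost,
# so a single slot is held across every attempt.
outer = []
result_outer = throttled(retrying(make_flaky(2), 3), slots, outer)()

# Analogue of with_throttle(4).with_retry(): the retry is outermost,
# so the slot is released and re-acquired for each attempt.
inner = []
result_inner = retrying(throttled(make_flaky(2), slots, inner), 3)()
```

In the first ordering the callable occupies its slot until it succeeds or runs out of attempts; in the second, the slot is free between attempts, so failed work awaiting a retry does not count against the limit.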
Naming executors
All executors accept an optional name argument, an arbitrary string.
Setting the name when creating an executor has the following effects:
If the executor creates any threads, the thread name will include the specified value.
The name will be used as the executor label on any Prometheus metrics associated with the executor.
When creating chained executors via the with_* methods
(see Composing executors), names automatically propagate through
the chain:
Executors.thread_pool(name='svc-client').with_retry().with_throttle(4)
In the above example, three executors are created and all of them are
given the name svc-client.
Composing futures
A series of functions are provided for creating and composing
Future objects. These functions
may be used standalone, or in conjunction with the Executor
implementations in more-executors.
| Function | Signature | Description |
|---|---|---|
| | | wrap any value in a future |
| | n/a | wrap any exception in a future |
| | n/a | get a cancelled future |
| | | apply a function in the future |
| | | boolean |
| | | boolean |
| | | combine futures into a tuple |
| | | transform output value of a future via a blocking function |
| | | transform output value of a future via a non-blocking function |
| | | run non-blocking function over iterable |
| | | convert list of futures to a future of list |
| | | make a future unable to be cancelled |
| | | make a future proxy calls to the future’s result |
| | | make a future cancel itself after a timeout has elapsed |
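As an illustration of one of the operations described above, converting a list of futures to a future of a list, here is a minimal standard-library sketch. `sequence` is a hypothetical helper written for this example, not the library's function, and it deliberately ignores cancellation of the input futures.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import threading


def sequence(futures):
    """Hypothetical helper: list of futures -> future of list (order preserved)."""
    futures = list(futures)
    combined = Future()
    if not futures:
        combined.set_result([])
        return combined

    lock = threading.Lock()
    remaining = [len(futures)]

    def on_done(completed):
        with lock:
            if combined.done():
                return
            if completed.exception() is not None:
                # The first failure fails the combined future.
                combined.set_exception(completed.exception())
                return
            remaining[0] -= 1
            if remaining[0] == 0:
                combined.set_result([f.result() for f in futures])

    for future in futures:
        future.add_done_callback(on_done)
    return combined


with ThreadPoolExecutor(max_workers=2) as pool:
    squares = sequence([pool.submit(pow, n, 2) for n in range(4)])
    values = squares.result()
```

The combined future resolves only once every input has resolved, and the output list follows the order of the inputs rather than the order of completion.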
Usage of threads
Several executors internally make use of threads. Thus, executors should be considered relatively heavyweight: creating dozens of executors within a process is probably fine, but creating thousands may not be.
Callbacks added by add_done_callback() may
be invoked from any thread and should avoid any slow blocking operations.
All provided executors are thread-safe with the exception of the
shutdown() method, which should be called
from one thread only.
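The point about callbacks running on any thread also holds for the standard library's futures, which makes it easy to demonstrate. In this sketch the `recorder` helper and the `seen` dictionary are hypothetical names; an Event keeps the task blocked so that the first callback is certain to be registered before the future resolves.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

seen = {}
caller_name = threading.current_thread().name


def recorder(label):
    def callback(future):
        # Keep callbacks short: they run on whichever thread resolves the future.
        seen[label] = threading.current_thread().name
    return callback


release = threading.Event()

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    pending = pool.submit(release.wait)

    # Registered while the task is still blocked: runs on the worker thread.
    pending.add_done_callback(recorder("registered_before"))
    release.set()
    pending.result()

    # Registered after completion: runs immediately on the calling thread.
    pending.add_done_callback(recorder("registered_after"))
```

After the with block, `seen["registered_before"]` holds a worker thread name and `seen["registered_after"]` holds the caller's thread name, so a callback cannot assume which thread it will run on.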
Executor shutdown
Shutting down an executor will also shut down all wrapped executors.
In the example below, any threads created by the
ThreadPoolExecutor, as well as the thread
created by the RetryExecutor, will be joined
at the end of the with block:
executor = Executors.thread_pool(). \
    with_map(check_result). \
    with_retry()
with executor:
    do_something(executor)
    do_other_thing(executor)
Note this implies that sharing of executors needs to be done carefully. For example, this code is broken:
executor = Executors.thread_pool().with_map(check_result)
# Only need retries on this part
with executor.with_retry() as retry_executor:
    do_flaky_something(retry_executor)
# BUG: don't do this!
# The thread pool executor was already shut down, so this won't work.
with executor:
    do_something(executor)
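The underlying failure can be reproduced with the standard library alone. The sketch below uses a plain ThreadPoolExecutor rather than a composed executor, so it shows only the final step of the bug: submitting to a pool that has already been shut down.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

with pool:
    first = pool.submit(sum, [1, 2, 3]).result()

# The pool was shut down when the with block exited, so a new submit() fails.
try:
    pool.submit(sum, [4, 5, 6])
    error = None
except RuntimeError as exc:
    error = exc
```

When an executor is shared between several wrappers, one way to avoid this is to enter a single with block around the outermost point where any of them is used.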
Generally, shutting down executors is optional and is not necessary to (eventually) reclaim resources.
However, where executors accept caller-provided code (such as the polling
function to PollExecutor or the retry
policy to RetryExecutor), it is easy to
accidentally create a circular reference between the provided code and the
executor. When this happens, it will no longer be possible for the garbage
collector to clean up the executor’s resources automatically and a thread
leak may occur. If in doubt, call
shutdown().
Prometheus metrics
This library automatically collects Prometheus
metrics if the prometheus_client Python module is available.
The feature is disabled when this module is not installed or if the
MORE_EXECUTORS_PROMETHEUS environment variable is set to 0.
If you want to ensure that more-executors is installed along with
all Prometheus dependencies, request the ‘prometheus’ extras,
for example:
pip install more-executors[prometheus]
The library only collects metrics; it does not expose them.
You must use prometheus_client to expose metrics in the most
appropriate manner when integrating this library with your tool or service.
Here is a simple example to dump metrics to a file:
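import prometheus_client
# write_to_textfile() requires a registry argument; this assumes the metrics
# are registered in prometheus_client's default registry.
prometheus_client.write_to_textfile('metrics.txt', prometheus_client.REGISTRY)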
The following metrics are available:
more_executors_exec_inprogress
A gauge for the number of executors currently in use. “In use” means an executor has been created and shutdown() not yet called. Incorrect usage of shutdown() (e.g. calling more than once) will lead to inaccurate data.
more_executors_exec_total
A counter for the total number of executors created.
more_executors_future_inprogress
A gauge for the number of futures currently in progress. “In progress” means a future has been created and not yet reached a terminal state.
more_executors_future_total
A counter for the total number of futures created.
more_executors_future_cancel_total
A counter for the total number of futures cancelled.
more_executors_future_error_total
A counter for the total number of futures resolved with an exception.
more_executors_future_time_total
A counter for the total execution time (in seconds) of futures. The execution time of a future is the period between the creation and resolution of a future.
more_executors_poll_total
A counter for the total number of times a Poll function was invoked.
more_executors_poll_error_total
A counter for the total number of times a Poll function raised an exception.
more_executors_poll_time_total
A counter for the total execution time (in seconds) of Poll function calls.
more_executors_retry_total
A counter for the total number of times a future was retried by RetryExecutor.
more_executors_retry_queue
A gauge for the current queue size of a RetryExecutor (i.e. the number of futures currently waiting to retry).
more_executors_retry_delay_total
A counter for the total time (in seconds) spent waiting to retry futures via RetryExecutor.
more_executors_throttle_queue
A gauge for the current queue size of a ThrottleExecutor (i.e. the number of futures not yet able to start due to throttling).
more_executors_timeout_total
A counter for the total number of futures cancelled due to timeout via TimeoutExecutor or f_timeout(). Only successfully cancelled futures are included.
more_executors_shutdown_cancel_total
A counter for the total number of futures cancelled due to executor shutdown via CancelOnShutdownExecutor. Only successfully cancelled futures are included.
Metrics include the following labels:
type
The type of executor or future in use; e.g. map, retry, poll.
executor
Name of executor (see Naming executors). Executors created for internal use by this library are named internal.