User Guide¶
Base executors¶
These methods on the more_executors.Executors
class
create standalone Executor
instances which may serve as the basis of composing executors.
thread_pool()
creates a new
ThreadPoolExecutor
to execute callables in threads.process_pool()
creates a new
ProcessPoolExecutor
to execute callables in processes.sync()
creates a new
SyncExecutor
to execute callables in the calling thread.
Example:
from more_executors import Executors
with Executors.thread_pool(name='web-client') as executor:
future = executor.submit(requests.get, 'https://github.com/rohanpm/more-executors')
Composing executors¶
Executors produced by this module can be customized by chaining a series of with_* methods. This may be used to compose different behaviors for specific use-cases.
Methods for composition are provided for all implemented executors:
with_map()
transform the output of a future, synchronously
with_flat_map()
transform the output of a future, asynchronously
with_retry()
retry failing futures
with_poll()
resolve futures via a custom poll function
with_timeout()
cancel unresolved futures after a timeout
with_throttle()
limit the number of concurrently executing futures
with_cancel_on_shutdown()
cancel any pending futures on executor shutdown
with_asyncio()
bridge between
concurrent.futures
andasyncio
Example:
# Run in up to 4 threads, retry on failure, transform output values
executor = Executors.thread_pool(max_workers=4, name='web-client'). \
with_map(lambda response: response.json()). \
with_retry()
responses = [executor.submit(requests.get, url)
for url in urls]
Keep in mind that the order in which executors are composed is significant. For example, these two composition sequences have different effects:
Executors.sync().with_retry().with_throttle(4)
In this example, if 4 callables have failed and retries are currently pending, throttling takes effect, and any additional callables will be enqueued until at least one of the earlier callables has completed (or exhausted all retry attempts).
Executors.sync().with_throttle(4).with_retry()
In this example, an unlimited number of futures may be failed and awaiting
retries. The throttling in this example has no effect, since a
SyncExecutor
is intrinsically throttled to
a single pending future.
Naming executors¶
All executors accept an optional name
argument, an arbitrary string.
Setting the name
when creating an executor has the following effects:
If the executor creates any threads, the thread name will include the specified value.
The name will be used as
executor
label on any Prometheus metrics associated with the executor.
When creating chained executors via the with_*
methods
(see Composing executors), names automatically propagate through
the chain:
Executors.thread_pool(name='svc-client').with_retry().with_throttle(4)
In the above example, three executors are created and all of them are
given the name svc-client
.
Composing futures¶
A series of functions are provided for creating and composing
Future
objects. These functions
may be used standalone, or in conjunction with the Executor
implementations in more-executors
.
Function |
Signature |
Description |
---|---|---|
|
wrap any value in a future |
|
n/a |
wrap any exception in a future |
|
n/a |
get a cancelled future |
|
|
apply a function in the future |
|
|
boolean |
|
|
boolean |
|
|
combine futures into a tuple |
|
|
transform output value of a future via a blocking function |
|
|
transform output value of a future via a non-blocking function |
|
|
run non-blocking function over iterable |
|
|
convert list of futures to a future of list |
|
|
make a future unable to be cancelled |
|
|
make a future proxy calls to the future’s result |
|
|
make a future cancel itself after a timeout has elapsed |
Usage of threads¶
Several executors internally make use of threads. Thus, executors should be considered relatively heavyweight: creating dozens of executors within a process is probably fine, creating thousands is possibly not.
Callbacks added by add_done_callback()
may
be invoked from any thread and should avoid any slow blocking operations.
All provided executors are thread-safe with the exception of the
shutdown()
method, which should be called
from one thread only.
Executor shutdown¶
Shutting down an executor will also shut down all wrapped executors.
In the example below, any threads created by the
ThreadPoolExecutor
, as well as the thread
created by the RetryExecutor
, will be joined
at the end of the with block:
executor = Executors.thread_pool(). \
with_map(check_result). \
with_retry()
with executor:
do_something(executor)
do_other_thing(executor)
Note this implies that sharing of executors needs to be done carefully. For example, this code is broken:
executor = Executors.thread_pool().with_map(check_result)
# Only need retries on this part
with executor.with_retry() as retry_executor:
do_flaky_something(retry_executor)
# BUG: don't do this!
# The thread pool executor was already shut down, so this won't work.
with executor:
do_something(executor)
Generally, shutting down executors is optional and is not necessary to (eventually) reclaim resources.
However, where executors accept caller-provided code (such as the polling
function to PollExecutor
or the retry
policy to RetryExecutor
), it is easy to
accidentally create a circular reference between the provided code and the
executor. When this happens, it will no longer be possible for the garbage
collector to clean up the executor’s resources automatically and a thread
leak may occur. If in doubt, call
shutdown()
.
Prometheus metrics¶
This library automatically collects Prometheus
metrics if the prometheus_client
Python module is available.
The feature is disabled when this module is not installed or if the
MORE_EXECUTORS_PROMETHEUS
environment variable is set to 0
.
If you want to ensure that more-executors
is installed along with
all prometheus dependencies, you may request the ‘prometheus’ extras,
as in example:
pip install more-executors[prometheus]
The library only collects metrics; it does not expose them.
You must use prometheus_client
to expose metrics in the most
appropriate manner when integrating this library with your tool or service.
Here is a simple example to dump metrics to a file:
import prometheus_client
prometheus_client.write_to_textfile('metrics.txt')
The following metrics are available:
more_executors_exec_inprogress
A gauge for the number of executors currently in use.
“In use” means an executor has been created and
shutdown()
not yet called. Incorrect usage ofshutdown()
(e.g. calling more than once) will lead to inaccurate data.more_executors_exec_total
A counter for the total number of executors created.
more_executors_future_inprogress
A gauge for the number of futures currently in progress.
“In progress” means a future has been created and not yet reached a terminal state.
more_executors_future_total
A counter for the total number of futures created.
more_executors_future_cancel_total
A counter for the total number of futures cancelled.
more_executors_future_error_total
A counter for the total number of futures resolved with an exception.
more_executors_future_time_total
A counter for the total execution time (in seconds) of futures.
The execution time of a future is the period between the creation and resolution of a future.
more_executors_poll_total
A counter for the total number of times a Poll function was invoked.
more_executors_poll_error_total
A counter for the total number of times a Poll function raised an exception.
more_executors_poll_time_total
A counter for the total execution time (in seconds) of Poll function calls.
more_executors_retry_total
A counter for the total number of times a future was retried by
RetryExecutor
.more_executors_retry_queue
A gauge for the current queue size of a
RetryExecutor
(i.e. the number of futures currently waiting to retry).more_executors_retry_delay_total
A counter for the total time (in seconds) spent waiting to retry futures via
RetryExecutor
.more_executors_throttle_queue
A gauge for the current queue size of a
ThrottleExecutor
(i.e. the number of futures not yet able to start due to throttling).more_executors_timeout_total
A counter for the total number of futures cancelled due to timeout via
TimeoutExecutor
orf_timeout()
.Only successfully cancelled futures are included.
more_executors_shutdown_cancel_total
A counter for the total number of futures cancelled due to executor shutdown via
CancelOnShutdownExecutor
.Only successfully cancelled futures are included.
Metrics include the following labels:
type
The type of executor or future in use; e.g.
map
,retry
,poll
.executor
Name of executor (see Naming executors).
Executors created for internal use by this library are named
internal
.