User Guide

Base executors

These methods on the more_executors.Executors class create standalone Executor instances which may serve as the basis of composing executors.

thread_pool()

creates a new ThreadPoolExecutor to execute callables in threads.

process_pool()

creates a new ProcessPoolExecutor to execute callables in processes.

sync()

creates a new SyncExecutor to execute callables in the calling thread.

Example:

from more_executors import Executors
with Executors.thread_pool(name='web-client') as executor:
    future = executor.submit(requests.get, 'https://github.com/rohanpm/more-executors')

Composing executors

Executors produced by this module can be customized by chaining a series of with_* methods. This may be used to compose different behaviors for specific use-cases.

Methods for composition are provided for all implemented executors:

with_map()

transform the output of a future, synchronously

with_flat_map()

transform the output of a future, asynchronously

with_retry()

retry failing futures

with_poll()

resolve futures via a custom poll function

with_timeout()

cancel unresolved futures after a timeout

with_throttle()

limit the number of concurrently executing futures

with_cancel_on_shutdown()

cancel any pending futures on executor shutdown

with_asyncio()

bridge between concurrent.futures and asyncio

Example:

# Run in up to 4 threads, retry on failure, transform output values
executor = Executors.thread_pool(max_workers=4, name='web-client'). \
    with_map(lambda response: response.json()). \
    with_retry()
responses = [executor.submit(requests.get, url)
             for url in urls]

Keep in mind that the order in which executors are composed is significant. For example, these two composition sequences have different effects:

Executors.sync().with_retry().with_throttle(4)

In this example, if 4 callables have failed and retries are currently pending, throttling takes effect, and any additional callables will be enqueued until at least one of the earlier callables has completed (or exhausted all retry attempts).

Executors.sync().with_throttle(4).with_retry()

In this example, an unlimited number of futures may be failed and awaiting retries. The throttling in this example has no effect, since a SyncExecutor is intrinsically throttled to a single pending future.

Naming executors

All executors accept an optional name argument, an arbitrary string. Setting the name when creating an executor has the following effects:

  • If the executor creates any threads, the thread name will include the specified value.

  • The name will be used as executor label on any Prometheus metrics associated with the executor.

When creating chained executors via the with_* methods (see Composing executors), names automatically propagate through the chain:

Executors.thread_pool(name='svc-client').with_retry().with_throttle(4)

In the above example, three executors are created and all of them are given the name svc-client.

Composing futures

A series of functions are provided for creating and composing Future objects. These functions may be used standalone, or in conjunction with the Executor implementations in more-executors.

Function

Signature

Description

f_return()

X

⟶ Future<X>

wrap any value in a future

f_return_error()

n/a

wrap any exception in a future

f_return_cancelled()

n/a

get a cancelled future

f_apply()

Future<fn<A[,B[,…]]⟶R>>, Future<A>[, Future<B>[, …]]

⟶ Future<R>

apply a function in the future

f_or()

Future<A>[, Future<B>[, …]]

⟶ Future<A|B|…>

boolean OR

f_and()

Future<A>[, Future<B>[, …]]

⟶ Future<A|B|…>

boolean AND

f_zip()

Future<A>[, Future<B>[, …]]

⟶ Future<A[, B[, …]]>

combine futures into a tuple

f_map()

Future<A>, fn<A⟶B>

⟶ Future<B>

transform output value of a future via a blocking function

f_flat_map()

Future<A>, fn<A⟶Future<B>>

⟶ Future<B>

transform output value of a future via a non-blocking function

f_traverse()

fn<A⟶Future<B>>, iterable<A>

⟶ Future<list<B>>

run non-blocking function over iterable

f_sequence()

list<Future<X>>

⟶ Future<list<X>>

convert list of futures to a future of list

f_nocancel()

Future<X>

⟶ Future<X>

make a future unable to be cancelled

f_proxy()

Future<X>

⟶ Future<X>

make a future proxy calls to the future’s result

f_timeout()

Future<X>, float

⟶ Future<X>

make a future cancel itself after a timeout has elapsed

Usage of threads

Several executors internally make use of threads. Thus, executors should be considered relatively heavyweight: creating dozens of executors within a process is probably fine, creating thousands is possibly not.

Callbacks added by add_done_callback() may be invoked from any thread and should avoid any slow blocking operations.

All provided executors are thread-safe with the exception of the shutdown() method, which should be called from one thread only.

Executor shutdown

Shutting down an executor will also shut down all wrapped executors.

In the example below, any threads created by the ThreadPoolExecutor, as well as the thread created by the RetryExecutor, will be joined at the end of the with block:

executor = Executors.thread_pool(). \
    with_map(check_result). \
    with_retry()

with executor:
    do_something(executor)
    do_other_thing(executor)

Note this implies that sharing of executors needs to be done carefully. For example, this code is broken:

executor = Executors.thread_pool().with_map(check_result)

# Only need retries on this part
with executor.with_retry() as retry_executor:
    do_flaky_something(retry_executor)

# BUG: don't do this!
# The thread pool executor was already shut down, so this won't work.
with executor:
    do_something(executor)

Generally, shutting down executors is optional and is not necessary to (eventually) reclaim resources.

However, where executors accept caller-provided code (such as the polling function to PollExecutor or the retry policy to RetryExecutor), it is easy to accidentally create a circular reference between the provided code and the executor. When this happens, it will no longer be possible for the garbage collector to clean up the executor’s resources automatically and a thread leak may occur. If in doubt, call shutdown().

Prometheus metrics

This library automatically collects Prometheus metrics if the prometheus_client Python module is available. The feature is disabled when this module is not installed or if the MORE_EXECUTORS_PROMETHEUS environment variable is set to 0.

If you want to ensure that more-executors is installed along with all prometheus dependencies, you may request the ‘prometheus’ extras, as in example:

pip install more-executors[prometheus]

The library only collects metrics; it does not expose them. You must use prometheus_client to expose metrics in the most appropriate manner when integrating this library with your tool or service. Here is a simple example to dump metrics to a file:

import prometheus_client
prometheus_client.write_to_textfile('metrics.txt')

The following metrics are available:

more_executors_exec_inprogress

A gauge for the number of executors currently in use.

“In use” means an executor has been created and shutdown() not yet called. Incorrect usage of shutdown() (e.g. calling more than once) will lead to inaccurate data.

more_executors_exec_total

A counter for the total number of executors created.

more_executors_future_inprogress

A gauge for the number of futures currently in progress.

“In progress” means a future has been created and not yet reached a terminal state.

more_executors_future_total

A counter for the total number of futures created.

more_executors_future_cancel_total

A counter for the total number of futures cancelled.

more_executors_future_error_total

A counter for the total number of futures resolved with an exception.

more_executors_future_time_total

A counter for the total execution time (in seconds) of futures.

The execution time of a future is the period between the creation and resolution of a future.

more_executors_poll_total

A counter for the total number of times a Poll function was invoked.

more_executors_poll_error_total

A counter for the total number of times a Poll function raised an exception.

more_executors_poll_time_total

A counter for the total execution time (in seconds) of Poll function calls.

more_executors_retry_total

A counter for the total number of times a future was retried by RetryExecutor.

more_executors_retry_queue

A gauge for the current queue size of a RetryExecutor (i.e. the number of futures currently waiting to retry).

more_executors_retry_delay_total

A counter for the total time (in seconds) spent waiting to retry futures via RetryExecutor.

more_executors_throttle_queue

A gauge for the current queue size of a ThrottleExecutor (i.e. the number of futures not yet able to start due to throttling).

more_executors_timeout_total

A counter for the total number of futures cancelled due to timeout via TimeoutExecutor or f_timeout().

Only successfully cancelled futures are included.

more_executors_shutdown_cancel_total

A counter for the total number of futures cancelled due to executor shutdown via CancelOnShutdownExecutor.

Only successfully cancelled futures are included.

Metrics include the following labels:

type

The type of executor or future in use; e.g. map, retry, poll.

executor

Name of executor (see Naming executors).

Executors created for internal use by this library are named internal.