Polling: PollExecutor

PollExecutor produces futures which are resolved via a custom poll function.

Poll function

The poll function has the following semantics:

  • It’s called with a single argument, a list of zero or more PollDescriptor objects.

  • If the poll function can determine that a particular future should be completed, either successfully or in error, it should call the yield_result() or yield_exception() methods on the appropriate PollDescriptor.

  • If the poll function raises an exception, all futures depending on that poll will fail with that exception. The poll function will be retried later.

  • If the poll function returns an int or float, it is used as the delay in seconds until the next poll.
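
The contract above can be sketched without a running executor. In this minimal example, FakeDescriptor is a hypothetical stand-in for PollDescriptor (it is not part of more_executors), and the "id", "done" and "failed" keys are invented for illustration. It only shows the general shape of a poll function which resolves some descriptors and asks for a shorter delay while work is still pending.

```python
class FakeDescriptor:
    """Hypothetical stand-in for PollDescriptor, for illustration only."""

    def __init__(self, result):
        self.result = result   # value returned by the delegate future
        self.outcome = None    # records how the poll function resolved us

    def yield_result(self, result):
        self.outcome = ("result", result)

    def yield_exception(self, exception, traceback=None):
        self.outcome = ("exception", exception)


def poll_fn(descriptors):
    pending = 0
    for d in descriptors:
        if d.result.get("failed"):
            d.yield_exception(RuntimeError("task %s failed" % d.result["id"]))
        elif d.result.get("done"):
            d.yield_result(d.result["id"])
        else:
            # Not resolved yet: leave the descriptor untouched
            pending += 1
    # Returning a number requests the delay (in seconds) before the next
    # poll. Returning None is assumed here to leave the executor's default
    # interval in effect.
    return 0.5 if pending else None
```

A descriptor which is neither yielded a result nor an exception simply stays unresolved and is expected to be passed to a later poll.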

Warning

The poll function should avoid holding a reference to the corresponding PollExecutor. If it holds a reference to the executor, invoking shutdown() becomes mandatory in order to stop the polling thread.

Cancel function

The cancel function has the following semantics:

  • It’s called when cancel is requested on a future returned by this executor, if and only if the future is currently in the list of futures being polled, i.e. it will not be called if the delegate future has not yet completed.

  • It’s called with a single argument: the value returned by the delegate future.

  • It should return True if the future can be cancelled.

  • If the cancel function raises an exception, the future’s cancel method will return False and a message will be logged.
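
As a rough illustration of these semantics, the sketch below implements a cancel function against an in-memory table. REMOTE_TASKS and its state names are hypothetical stand-ins for a real remote service and are not part of more_executors.

```python
# Hypothetical in-memory stand-in for a remote service's task table
REMOTE_TASKS = {123: "running", 456: "finished"}


def cancel_fn(delegate_result):
    """Called with the value returned by the delegate future."""
    task_id = delegate_result["task_id"]
    if REMOTE_TASKS.get(task_id) == "running":
        REMOTE_TASKS[task_id] = "cancelled"
        return True    # the future can be cancelled
    return False       # too late: the task already finished or is unknown
```

If delegate_result had no "task_id" key, the resulting KeyError would fall under the last rule above: the future's cancel method would return False and a message would be logged.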

Example

Consider a web service which executes tasks asynchronously, with an API like this:

  • To create a task: POST https://myservice/object/:id/publish

    • Returns an identifier for a task, such as {"task_id": 123}

  • To get status of a single task: GET https://myservice/tasks/:id

    • Returns task status, such as {"task_id": 123, "state": "finished"}

  • To cancel a single task: DELETE https://myservice/tasks/:id

  • To search for status of multiple tasks: POST https://myservice/tasks/search {"task_id": [123, 456, …]}

    • Returns array of task statuses, such as:

    [{"task_id": 123, "state": "finished"},
     {"task_id": 456, "state": "error", "error": "..."}]
    

Imagine we want to run many tasks concurrently and monitor their status. Two obvious approaches include:

  • We could make a function which starts a task and polls that task until it completes, and have several threads run that function for different tasks.

    • This could be easily encapsulated as a future using only the executors from concurrent.futures, but is very inefficient: each task in progress consumes a thread for polling.

  • We could maintain a list of all outstanding tasks and implement a special-purpose function to wait for all of them to complete.

    • This is efficient as only one request is needed per poll interval, but since tasks aren’t represented as futures, useful features from concurrent.futures cannot be used and different types of asynchronous work will be handled inconsistently.

PollExecutor bridges the gap between the two approaches. By providing a custom poll function, the tasks can be monitored by one thread efficiently, while at the same time allowing full usage of the API provided by Future objects.

In this example, the poll function would look similar to this:

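import requests


# Not defined by the service or the library: a minimal exception
# type is assumed here so that the example is complete.
class TaskFailed(Exception):
    """Raised by futures whose remote task ended in error."""


def poll_tasks(poll_descriptors):
    # Collect the IDs of the tasks we'll need to search for.
    # d.result is the value returned by the submitted callable,
    # such as {"task_id": 123}
    task_ids = [d.result['task_id']
                for d in poll_descriptors]

    # Search for status of these tasks
    search = {"task_id": task_ids}
    response = requests.post(
        'https://myservice/tasks/search', json=search)

    # If request failed then all outstanding futures fail
    response.raise_for_status()

    # Request passed, resolve futures accordingly.
    # Index the returned task statuses by task ID.
    tasks = {task['task_id']: task
             for task in response.json()}
    for d in poll_descriptors:
        task = tasks.get(d.result['task_id'], {})
        state = task.get('state')
        if state == 'finished':
            # The future's result is the final task status
            d.yield_result(task)
        elif state == 'error':
            d.yield_exception(TaskFailed(task.get('error')))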

To ensure that calling future.cancel() also cancels tasks on the remote service, we can also provide a cancel function:

def cancel_task(task):
    task_id = task['task_id']

    # Attempt to DELETE this task
    response = requests.delete(
        'https://myservice/tasks/%s' % task_id)

    # Succeeded only if response was OK.
    # Otherwise, we may have been too late to cancel.
    return response.ok

With the poll and cancel functions in place, futures could be created like this:

def publish(object_id):
    response = requests.post(
        'https://myservice/object/%s/publish' % object_id)
    response.raise_for_status()
    return response.json()

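from concurrent.futures import as_completed

from more_executors import Executors

# Submit up to 4 HTTP requests at a time,
# and poll for the returned tasks using our poll function.
executor = Executors.\
    threadpool(max_workers=4).\
    with_poll(poll_tasks, cancel_task)
futures = [executor.submit(publish, x)
           for x in (10, 20, 30)]

# Now we can use concurrent.futures API to get tasks as
# the poll function determines they've completed
for completed in as_completed(futures):
    # result() returns the value given to yield_result(), or raises
    # the exception given to yield_exception()
    print(completed.result())
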
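
Since polled futures support the regular Future API, failures surface when result() is called on each completed future. The following self-contained sketch shows one way to collect outcomes per object. It uses a plain ThreadPoolExecutor as a stand-in for the polling executor, and TaskFailed and fake_publish are hypothetical names introduced only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class TaskFailed(Exception):
    """Hypothetical exception type for a task which ended in error."""


def fake_publish(object_id):
    # Stand-in for a polled task: object 20 is made to fail
    if object_id == 20:
        raise TaskFailed("object %s could not be published" % object_id)
    return {"task_id": object_id, "state": "finished"}


outcomes = {}
with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the object it was submitted for
    futures = {executor.submit(fake_publish, x): x for x in (10, 20, 30)}
    for completed in as_completed(futures):
        object_id = futures[completed]
        try:
            outcomes[object_id] = completed.result()["state"]
        except TaskFailed as error:
            outcomes[object_id] = "error: %s" % error
```

Keeping a dict from future to input makes it possible to tell which object failed, because as_completed() yields futures in completion order rather than submission order.
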
class more_executors.PollExecutor(delegate, poll_fn, cancel_fn=None, default_interval=5.0, logger=None, name='default')

Instances of PollExecutor submit callables to a delegate executor and resolve the returned futures via a provided poll function.

A cancel function may also be provided to perform additional processing when a returned future is cancelled.

Parameters
  • delegate (Executor) – executor to which callables will be submitted

  • poll_fn (callable) – a poll function used to decide when futures should be resolved

  • cancel_fn (callable) – a cancel function invoked when cancellation of a future is requested

  • default_interval (float) – default interval between polls (in seconds)

  • logger (Logger) – a logger used for messages from this executor

  • name (str) – a name for this executor

Changed in version 2.7.0: Introduced name.

notify()

Request the executor to re-run the polling function as soon as possible.

This method may be used to perform the next poll earlier than it would otherwise run.

This is useful for cases where futures may be resolved via a mixture of polling and external events.

New in version 2.2.0.
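
One way to picture the effect of notify() is a poll loop which sleeps on an event rather than for a fixed time. The sketch below is a conceptual illustration only: TinyPoller is a hypothetical class and does not reflect how PollExecutor is actually implemented.

```python
import threading


class TinyPoller:
    """Hypothetical, simplified poll loop; not the more_executors code."""

    def __init__(self, poll_fn, interval):
        self._poll_fn = poll_fn
        self._interval = interval
        self._wakeup = threading.Event()
        self._stop = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def notify(self):
        # Wake the loop now instead of waiting out the full interval
        self._wakeup.set()

    def shutdown(self):
        self._stop = True
        self._wakeup.set()
        self._thread.join()

    def _run(self):
        while not self._stop:
            self._poll_fn()
            # Sleep until the interval elapses or notify() is called
            self._wakeup.wait(self._interval)
            self._wakeup.clear()
```

With a loop of this kind, code reacting to an external event (for example, a message announcing that a task has finished) could call notify() so that the next poll happens promptly, while regular polling continues to cover tasks for which no event arrives.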

classmethod Executors.with_poll(executor, poll_fn, cancel_fn=None, default_interval=5.0, logger=None)
Returns

a new executor which produces polled futures.

Submitted callables will have their output passed into a poll function.

Return type

PollExecutor

class more_executors.PollDescriptor

Represents an unresolved Future.

The poll function used by PollExecutor will be invoked with a list of PollDescriptor objects.

property result

The result from the delegate executor’s future, which should be used to drive the poll.

yield_exception(exception, traceback=None)

The poll function can call this function to make the future raise the given exception.

Parameters
  • exception (Exception) – An exception to be returned or raised by the future associated with this descriptor

  • traceback (traceback) – An optional associated traceback. This argument only has an effect on Python 2.x, where exception objects do not include a traceback.

yield_result(result)

The poll function can call this function to make the future yield the given result.

Parameters
  • result (object) – a result to be returned by the future associated with this descriptor