This document describes the current stable version of Celery (4.3).

celery.worker.consumer

Worker consumer.

class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]

Consumer blueprint.

class Blueprint(steps=None, name=None, on_start=None, on_close=None, on_stopped=None)[source]

Consumer blueprint.

default_steps = ['celery.worker.consumer.connection:Connection', 'celery.worker.consumer.mingle:Mingle', 'celery.worker.consumer.events:Events', 'celery.worker.consumer.gossip:Gossip', 'celery.worker.consumer.heart:Heart', 'celery.worker.consumer.control:Control', 'celery.worker.consumer.tasks:Tasks', 'celery.worker.consumer.consumer:Evloop', 'celery.worker.consumer.agent:Agent']
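The requires attributes listed for the individual steps on this page imply a start order. The following is a minimal sketch of how such declarations can be turned into an ordering, assuming a plain depth-first topological sort. It is not Celery's own blueprint code. The REQUIRES table is transcribed from the requires entries on this page, with short names used for readability; Evloop is left out because its requirements are not listed here. The function name start_order is hypothetical.

```python
# Direct requirements, transcribed from the `requires` attributes on this page.
# Short names are used instead of the fully qualified step names.
REQUIRES = {
    "Connection": [],
    "Agent": ["Connection"],
    "Events": ["Connection"],
    "Mingle": ["Events"],
    "Gossip": ["Mingle"],
    "Heart": ["Events"],
    "Tasks": ["Mingle"],
    "Control": ["Tasks"],
}


def start_order(requires):
    """Return step names so that every step comes after its requirements."""
    order, done = [], set()

    def visit(name, path=()):
        if name in path:
            raise ValueError("dependency cycle: " + " -> ".join(path + (name,)))
        if name in done:
            return
        for dep in requires[name]:
            visit(dep, path + (name,))
        done.add(name)
        order.append(name)

    for name in requires:
        visit(name)
    return order


ORDER = start_order(REQUIRES)
```

Any ordering that respects the declared requirements is valid; the sketch returns one of them.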
name = 'Consumer'
shutdown(parent)[source]
Strategies

alias of builtins.dict

add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]
apply_eta_task(task)[source]

Method called by the timer to apply a task with an ETA/countdown.

bucket_for_task(type)[source]
call_soon(p, *args, **kwargs)[source]
cancel_task_queue(queue)[source]
connect()[source]

Establish the broker connection used for consuming tasks.

Retries establishing the connection if the broker_connection_retry setting is enabled.
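The retry behaviour described above can be illustrated with a small, library-independent sketch. It is not the Consumer.connect() implementation. The helper connect_with_retry and its parameters retry_enabled, max_retries, interval and sleep are hypothetical stand-ins; retry_enabled plays the role of the broker_connection_retry setting.

```python
import time


def connect_with_retry(open_connection, retry_enabled=True, max_retries=3,
                       interval=1.0, sleep=time.sleep):
    """Call open_connection(); on ConnectionError, retry if retries are enabled."""
    attempt = 0
    while True:
        try:
            return open_connection()
        except ConnectionError:
            attempt += 1
            # Give up immediately when retries are disabled,
            # or once the retry budget is used up.
            if not retry_enabled or attempt > max_retries:
                raise
            # Wait a little longer after each failed attempt.
            sleep(interval * attempt)
```

Passing a sleep function as an argument keeps the sketch easy to exercise without real delays.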

connection_for_read(heartbeat=None)[source]
connection_for_write(heartbeat=None)[source]
create_task_handler(promise=<class 'vine.promises.promise'>)[source]
ensure_connected(conn)[source]
init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

loop_args()[source]
on_close()[source]
on_connection_error_after_connected(exc)[source]
on_connection_error_before_connected(exc)[source]
on_decode_error(message, exc)[source]

Callback called if an error occurs while decoding a message.

Logs the error and acknowledges the message so that it does not enter a redelivery loop.

Parameters:
  • message (kombu.Message) – The message received.
  • exc (Exception) – The exception being handled.
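A minimal sketch of the log-and-acknowledge pattern described above. StubMessage is a hypothetical stand-in for kombu.Message that models only a body and an ack() method, and handle_decode_error is an illustrative function, not the method's actual source.

```python
import logging

logger = logging.getLogger("consumer_sketch")


class StubMessage:
    """Hypothetical stand-in for a broker message."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def handle_decode_error(message, exc):
    """Log the failure, then acknowledge so the broker drops the message."""
    logger.critical("Can't decode message body: %r (body: %r)", exc, message.body)
    message.ack()
```

Acknowledging an undecodable message discards it; the log entry is the only remaining record of it.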
on_invalid_task(body, message, exc)[source]
on_ready()[source]
on_send_event_buffered()[source]
on_unknown_message(body, message)[source]
on_unknown_task(body, message, exc)[source]
perform_pending_operations()[source]
pool = None

The current worker pool instance.

register_with_event_loop(hub)[source]
reset_rate_limits()[source]
restart_count = -1
shutdown()[source]
start()[source]
stop()[source]
timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

update_strategies()[source]
class celery.worker.consumer.Agent(c, **kwargs)[source]

Agent starts cell actors.

conditional = True
create(c)[source]

Create the step.

name = 'celery.worker.consumer.agent.Agent'
requires = (step:celery.worker.consumer.connection.Connection{()},)
class celery.worker.consumer.Connection(c, **kwargs)[source]

Service managing the consumer broker connection.

info(c)[source]
name = 'celery.worker.consumer.connection.Connection'
shutdown(c)[source]
start(c)[source]
class celery.worker.consumer.Control(c, **kwargs)[source]

Remote control command service.

include_if(c)[source]

Return True if the bootstep should be included.

You can define this as an optional predicate that decides whether this step should be created.
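The predicate pattern can be sketched without Celery as follows. The Step class here is a simplified stand-in, not celery.bootsteps.Step, and the remote_control_enabled attribute on the parent object is hypothetical.

```python
from types import SimpleNamespace


class Step:
    """Simplified stand-in for a bootstep with an inclusion predicate."""

    enabled = True

    def include_if(self, parent):
        # Default predicate: include the step whenever it is enabled.
        return self.enabled

    def create(self, parent):
        return "{0} created".format(type(self).__name__)

    def include(self, parent):
        # Only create the step when the predicate allows it.
        return self.create(parent) if self.include_if(parent) else None


class RemoteControlStep(Step):
    def include_if(self, parent):
        # `remote_control_enabled` is a hypothetical attribute of the parent.
        return parent.remote_control_enabled


enabled_parent = SimpleNamespace(remote_control_enabled=True)
disabled_parent = SimpleNamespace(remote_control_enabled=False)
created = RemoteControlStep().include(enabled_parent)
skipped = RemoteControlStep().include(disabled_parent)
```

Here created holds the result of create(), while skipped is None because the predicate returned a false value.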

name = 'celery.worker.consumer.control.Control'
requires = (step:celery.worker.consumer.tasks.Tasks{(step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)},)
class celery.worker.consumer.Events(c, task_events=True, without_heartbeat=False, without_gossip=False, **kwargs)[source]

Service used for sending monitoring events.

name = 'celery.worker.consumer.events.Events'
requires = (step:celery.worker.consumer.connection.Connection{()},)
shutdown(c)[source]
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, heartbeat_interval=2.0, **kwargs)[source]

Bootstep consuming events from other workers.

This keeps the logical clock value up to date.
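As an illustration of what keeping a logical clock up to date involves, here is a minimal Lamport-style clock. This is a sketch under the assumption that the clock follows the usual Lamport rules; the class name LogicalClock is hypothetical and the code is not taken from Celery or Kombu.

```python
import threading


class LogicalClock:
    """Minimal Lamport-style logical clock."""

    def __init__(self, initial=0):
        self.value = initial
        self._lock = threading.Lock()

    def forward(self):
        """Advance the clock for a local event, such as sending a message."""
        with self._lock:
            self.value += 1
            return self.value

    def adjust(self, other):
        """Merge a clock value received from another node."""
        with self._lock:
            self.value = max(self.value, other) + 1
            return self.value
```

Calling adjust() with the clock value carried by each incoming event guarantees that the local value stays ahead of every value the node has seen so far.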

call_task(task)[source]
compatible_transport(app)[source]
compatible_transports = {'amqp', 'redis'}
election(id, topic, action=None)[source]
get_consumers(channel)[source]
label = 'Gossip'
name = 'celery.worker.consumer.gossip.Gossip'
on_elect(event)[source]
on_elect_ack(event)[source]
on_message(prepare, message)[source]
on_node_join(worker)[source]
on_node_leave(worker)[source]
on_node_lost(worker)[source]
periodic()[source]
register_timer()[source]
requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
start(c)[source]
class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)[source]

Bootstep sending event heartbeats.

This service sends a worker-heartbeat message every n seconds, where n is the heartbeat_interval.

Note

Not to be confused with AMQP protocol level heartbeats.
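A periodic sender of this kind can be sketched with the standard library sched module and a simulated clock, so that the example runs instantly. The function run_heartbeats, its duration parameter and the event fields are assumptions made for illustration; the actual service is driven by the worker's timer.

```python
import sched


def run_heartbeats(send, interval, duration):
    """Call send() with a heartbeat event every `interval` simulated seconds."""
    now = [0.0]

    def clock():
        return now[0]

    def delay(seconds):
        # Advance simulated time instead of sleeping.
        now[0] += seconds

    scheduler = sched.scheduler(clock, delay)

    def beat():
        send({"type": "worker-heartbeat", "timestamp": clock()})
        # Reschedule while the next beat still falls within the duration.
        if clock() + interval <= duration:
            scheduler.enter(interval, 1, beat)

    scheduler.enter(0, 1, beat)
    scheduler.run()
```

Replacing the simulated clock and delay with time.monotonic and time.sleep would make the same loop run in real time.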

name = 'celery.worker.consumer.heart.Heart'
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
shutdown(c)
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)[source]

Bootstep syncing state with neighbor workers.

At startup, or upon consumer restart, this will:

  • Sync logical clocks.
  • Sync revoked tasks.
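The two sync operations above can be sketched as a merge over the replies received from other nodes. The function merge_replies is hypothetical. The reply keys clock and revoked are borrowed from the sync_with_node(c, clock=None, revoked=None, **kwargs) signature on this page, and the clock rule assumes Lamport-style adjustment.

```python
def merge_replies(local_clock, local_revoked, replies):
    """Merge clock values and revoked task ids from neighbor replies.

    `replies` maps node names to dicts with optional "clock" and "revoked"
    keys; a node that did not answer may map to None.
    """
    clock = local_clock
    revoked = set(local_revoked)
    for nodename, reply in replies.items():
        if not reply:
            continue  # no usable reply from this node
        remote_clock = reply.get("clock")
        if remote_clock:
            # Lamport-style adjustment: move past the larger of the two values.
            clock = max(clock, remote_clock) + 1
        # Revoked task ids are merged as a set union.
        revoked.update(reply.get("revoked") or ())
    return clock, revoked
```

Because the revoked ids are merged with a set union, receiving the same id from several nodes has no additional effect.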
compatible_transport(app)[source]
compatible_transports = {'amqp', 'redis'}
label = 'Mingle'
name = 'celery.worker.consumer.mingle.Mingle'
on_clock_event(c, clock)[source]
on_node_reply(c, nodename, reply)[source]
on_revoked_received(c, revoked)[source]
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
send_hello(c)[source]
start(c)[source]
sync(c)[source]
sync_with_node(c, clock=None, revoked=None, **kwargs)[source]
class celery.worker.consumer.Tasks(c, **kwargs)[source]

Bootstep starting the task message consumer.

info(c)[source]

Return task consumer info.

name = 'celery.worker.consumer.tasks.Tasks'
requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
shutdown(c)[source]

Shutdown task consumer.

start(c)[source]

Start task consumer.

stop(c)[source]

Stop task consumer.