Prioritizing Celery tasks with different queues
Celery supports prioritization within each queue, but what if you want to reserve premium resources for some tasks? You can route that traffic to a separate queue that is served by a worker with better resources.
Example
Let's say we have two functions, a and b. We can give them the default queues public and private, respectively, through the task decorator.
@app.task(queue="public")
def a(x, y):
    print(f"Printing from fn 'a': {x + y}")

@app.task(queue="private")
def b(x, y):
    print(f"Printing from fn 'b': {x * y}")
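As a side note, default queues can also be kept in one place instead of on each decorator. The sketch below uses Celery's router-function hook (the task_routes setting). The DEFAULT_QUEUES mapping and the route_task name are illustrative, and it assumes the tasks are declared with a plain @app.task, without a queue argument.

```python
# Hypothetical mapping from task name to default queue.
# The task names mirror the ones used in this post (tasks.a, tasks.b).
DEFAULT_QUEUES = {
    "tasks.a": "public",
    "tasks.b": "private",
}


def route_task(name, args, kwargs, options, task=None, **kw):
    """Router function: return routing options for a task, or None."""
    queue = DEFAULT_QUEUES.get(name)
    if queue is None:
        # No opinion about this task: let Celery use its default routing.
        return None
    return {"queue": queue}


# Registration, assuming `app` is the Celery application from tasks.py:
# app.conf.task_routes = (route_task,)
```

With this approach the queue layout lives in configuration, and a queue passed explicitly to apply_async still takes precedence for a single call.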
We can run two workers, one per queue, on different machines:
celery -A tasks.app worker -Q public -l INFO # machine 1
celery -A tasks.app worker -Q private -l INFO # machine 2
Then schedule tasks by invoking the functions with delay or apply_async, like this:
a.delay(1, 2) # Printing from fn 'a': 3 | public worker
b.delay(2, 3) # Printing from fn 'b': 6 | private worker
a.apply_async(args=[3, 4], queue="private") # Printing from fn 'a': 7 | private worker
b.apply_async(args=[4, 5], queue="public") # Printing from fn 'b': 20 | public worker
Tasks will be processed by the workers noted in the comments. This lets you run the same task code on different workers, so a higher-priority task can be executed on a worker with better resources. Example: processing registered-user data vs. anonymous-user data.
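The registered vs. anonymous case could look roughly like the sketch below. The helpers queue_for and enqueue are hypothetical names, not part of Celery. The only Celery call used is apply_async with its queue argument, as in the example above.

```python
def queue_for(is_registered: bool) -> str:
    """Pick a queue name by user type (hypothetical policy)."""
    # Registered users go to the better-resourced "private" worker.
    return "private" if is_registered else "public"


def enqueue(task, args, is_registered):
    """Send `task` to the queue that matches the user type.

    `task` is any Celery task, e.g. `a` or `b` from the example above.
    """
    return task.apply_async(args=args, queue=queue_for(is_registered))


# Usage, assuming `a` is the task defined earlier:
# enqueue(a, [1, 2], is_registered=True)   # handled by the private worker
# enqueue(a, [1, 2], is_registered=False)  # handled by the public worker
```

Keeping the decision in one small function means the policy can change later without touching the task code.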
Here is the docker compose and flower output of the whole POC example:
...
public-worker-1 | [2022-11-27 18:19:56,784: INFO/MainProcess] Task tasks.a[25277467-1d4f-4b95-b915-d0ac7261a14a] received
public-worker-1 | [2022-11-27 18:19:56,786: WARNING/ForkPoolWorker-1] Printing from fn 'a': 3
private-worker-1 | [2022-11-27 18:19:56,786: INFO/MainProcess] Task tasks.b[32fa661f-706b-4139-a1cb-44d1e2c477a4] received
public-worker-1 | [2022-11-27 18:19:56,787: INFO/ForkPoolWorker-1] Task tasks.a[25277467-1d4f-4b95-b915-d0ac7261a14a] succeeded in 0.0016659999964758754s: None
private-worker-1 | [2022-11-27 18:19:56,788: INFO/MainProcess] Task tasks.a[fb61b60e-90e0-45cb-8fc6-edde1a431ffa] received
private-worker-1 | [2022-11-27 18:19:56,789: WARNING/ForkPoolWorker-3] Printing from fn 'a': 7
private-worker-1 | [2022-11-27 18:19:56,789: WARNING/ForkPoolWorker-1] Printing from fn 'b': 6
private-worker-1 | [2022-11-27 18:19:56,789: INFO/ForkPoolWorker-3] Task tasks.a[fb61b60e-90e0-45cb-8fc6-edde1a431ffa] succeeded in 0.0008720419136807323s: None
private-worker-1 | [2022-11-27 18:19:56,789: INFO/ForkPoolWorker-1] Task tasks.b[32fa661f-706b-4139-a1cb-44d1e2c477a4] succeeded in 0.0009083750192075968s: None
app-1 | Enqueing with default queue choice 'a' -> public
app-1 | Enqueing with default queue choice 'b' -> private
app-1 | Enqueing with overriden queue choice 'a' -> private
app-1 | Enqueing with overriden queue choice 'b' -> public
public-worker-1 | [2022-11-27 18:19:56,791: INFO/MainProcess] Task tasks.b[279c3ee7-b217-41e3-bb16-c4679cceabd2] received
public-worker-1 | [2022-11-27 18:19:56,791: WARNING/ForkPoolWorker-1] Printing from fn 'b': 20
public-worker-1 | [2022-11-27 18:19:56,791: INFO/ForkPoolWorker-1] Task tasks.b[279c3ee7-b217-41e3-bb16-c4679cceabd2] succeeded in 0.00010766694322228432s: None
app-1 exited with code 0