Worker Frameworks¶
Overview¶
We use two different frameworks for scheduled & background tasks: Chancy and APScheduler. Chancy is currently used in the Busy Level Management Service and should be used for new workers. APScheduler is deprecated in our codebase, but is used for some legacy workers. This document will focus on Chancy, since it should be used going forward.
Chancy Overview¶
Chancy was chosen due to its rich feature set, open source model, and PostgreSQL backing store. It was evaluated against Celery & Oban. Celery lacked features like rate limiting by queue, and Oban locked those features behind a paid plan.
Chancy Patterns¶
Refer to the chancy documentation and existing examples for basic usage. This document will focus on patterns that we prefer in the orbital-manager-backend codebase. After reading through these patterns & setting up your service's Chancy code, see Creating Chancy's Database Tables for the final steps.
Folder structure¶
This is the basic structure for workers in a given service. You can reference the busy level management service for an example of the contents of these files.
```text
# Responsible for actually initializing the service's Chancy instance,
# declaring queues and workers, and scheduling cron jobs.
app/<service>/workers/chancy_initialization.py

# Responsible for getting & setting the Chancy instance.
app/<service>/workers/chancy_instance.py

# Job files
app/<service>/workers/<job_name>/worker.py
```
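The get/set pair in `chancy_instance.py` can be a small module-level holder. A minimal sketch, assuming a simple global plus a guard against use before startup (the variable name and error message are illustrative, not the exact production code):

```python
# app/<service>/workers/chancy_instance.py (illustrative sketch)
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from chancy import Chancy

# Module-level holder for the service's single Chancy instance.
_chancy: "Optional[Chancy]" = None


def set_chancy(chancy: "Chancy") -> None:
    """Store the service's Chancy instance (called once during startup)."""
    global _chancy
    _chancy = chancy


def get_chancy() -> "Chancy":
    """Return the Chancy instance set during startup."""
    if _chancy is None:
        raise RuntimeError("Chancy has not been initialized yet")
    return _chancy
```

Keeping the instance behind `get_chancy()` (rather than importing a module global directly) makes the "not yet initialized" failure mode explicit and easy to test.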
One Chancy Instance Per Service¶
Chancy allows you to prefix the tables it uses, such that multiple instances of Chancy can run against the same database while remaining isolated. We take advantage of this and run a separate Chancy instance per service with the service name as a prefix. A single chancy instance may have multiple worker instances (not to be confused with individual jobs/tasks/workers) that each pick up a certain set of queues via Chancy's tagging system. But there should be only one Chancy object per service. This ensures that all jobs which reference a service's code are located within the service.
FastAPI Integration¶
While Chancy could run as its own process, we integrate it with the FastAPI process so that startup & shutdown are handled together. If a service ever has workers that are affecting FastAPI response times, we can split that service's Chancy instance out thanks to the One Chancy Instance Per Service rule.
In order to integrate with FastAPI, we declare initialize_workers() as an async context manager in chancy_initialization.py:
```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from chancy import Chancy, Worker

from .chancy_instance import set_chancy


@asynccontextmanager
async def initialize_workers() -> AsyncIterator[None]:
    """Configures chancy and starts a worker."""
    # Initialize chancy with options/patterns discussed below
    chancy = Chancy(...)
    # Set chancy as global instance (set_chancy defined in chancy_instance.py)
    set_chancy(chancy)
    async with chancy:
        # Declare queues
        ...
        # Schedule cron jobs
        ...
        # Worker instance declaration; there could be multiple.
        # register_signal_handlers=False prevents Chancy from interfering
        # with Uvicorn's shutdown handlers.
        async with Worker(chancy, register_signal_handlers=False) as _worker:
            yield
```
Then, in our service's main.py lifespan function, we use initialize_workers() before yielding to FastAPI:
```python
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Application lifespan context manager.

    On startup, initialize the async Postgres pool.
    On shutdown, clean up the pool.
    """
    # Initialize DB/RabbitMQ/Whatever
    ...
    try:
        async with initialize_workers():
            yield
    finally:
        # Tear down DB/RabbitMQ/Whatever
        ...
```
Connecting to the Session Pooler¶
Chancy does its own transaction management, and so isn't compatible with Supabase's transaction pooler. Ideally we would connect to the DB directly, but Supabase is IPv6 only for direct connection (without a paid addon) and Azure Container Apps only support IPv4. So, we use Supabase's session pooler:
```python
chancy = Chancy(
    get_secret_value("POSTGRES-SUPABASE-SESSION-POOLER-URL"),
    min_connection_pool_size=CHANCY_MIN_POOL_SIZE,  # 5
    max_connection_pool_size=CHANCY_MAX_POOL_SIZE,  # 10
    ...
)
```
Limited Plugins¶
Chancy implements most features as plugins, which lets us opt out of those we don't need. This reduces DB load and unnecessary tasks.
```python
chancy = Chancy(
    ...,
    no_default_plugins=True,  # disabling chancy metrics & workflow until needed
    plugins=[Leadership(), Pruner(), Recovery(), Cron(), SentryPlugin()],
    ...
)
```
Logging¶
The SentryPlugin shown above reports errors to Sentry. We also want regular log output from the workers to go to our normal log stream.
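One way to route worker log output into the normal stream is to attach the service's usual handler to Chancy's logger. A minimal sketch, assuming Chancy emits records through the standard library `logging` module under the `"chancy"` logger name (an assumption to verify against the Chancy docs for your version) and using a plain `StreamHandler` in place of the service's real handler/formatter:

```python
import logging
import sys

# Assumption: Chancy logs via the stdlib logging module under the "chancy"
# logger name; verify this against the Chancy documentation.
chancy_logger = logging.getLogger("chancy")
chancy_logger.setLevel(logging.INFO)

# Attach the same handler/formatter the rest of the service uses, so worker
# logs land in the normal log stream alongside the FastAPI logs.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
chancy_logger.addHandler(handler)
```

Because stdlib loggers form a hierarchy, attaching the handler once at startup (e.g. inside `initialize_workers`) covers records from Chancy's child loggers as well.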
Creating Chancy's Database Tables¶
At the moment, we don't have a proper system for DB migrations. Chancy does have a migrate() function which can run at startup, but until we have a system for general database migrations we'll rely on manually creating Chancy's database tables for each service. You can run Chancy's DB migration when adding a new service via the command line:
- Pull your Chancy declaration to the top level of your service's `chancy_initialization.py` (outside of `initialize_workers`)
- Modify the database URL to point at a local PostgreSQL instance first
- From the `orbital-manager-backend` root, run:
- Verify that the tables were created in your local PostgreSQL DB
- Return the database URL to use the Supabase session pooler, and run the command again to migrate on production
- Revert your changes to `chancy_initialization.py`