MCP server for Celery Flower — monitor workers, manage tasks and queues from any AI assistant
🌸 celery-flower-mcp
Give your AI assistant full control over Celery — monitor workers, manage tasks, inspect queues.
What is this?
celery-flower-mcp is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:
- Monitor workers, tasks, and queues in real time
- Control worker pools — grow, shrink, autoscale, restart, shut down
- Manage tasks — apply, revoke, abort, set timeouts and rate limits
- Inspect queues — check depths, add/remove consumers
All 21 Flower API endpoints are covered.
Features
- Full API coverage — every Flower REST endpoint exposed as an MCP tool
- Dependency injection via dishka — clean, testable architecture
- Pydantic Settings: typed configuration with .env file support
- Async throughout: built on httpx + FastMCP
- 65 tests: 49 unit tests (99% coverage) + 16 integration tests against real Flower
- Strict typing — mypy strict mode, fully annotated
Quick Start
Install via uvx
FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp
Install from source
git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main
Claude Desktop
Add to ~/Library/Application Support/Claude/claude_desktop_config.json:
{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555"
      }
    }
  }
}
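If the Claude Desktop config file already lists other MCP servers, pasting the snippet by hand risks overwriting them. The sketch below merges the `celery-flower` entry into whatever is already there. The helper name `add_server` is hypothetical and not part of this project; it only manipulates JSON text and leaves reading and writing the file to the caller.

```python
import json


def add_server(config_text: str, flower_url: str = "http://localhost:5555") -> str:
    """Return config JSON with the celery-flower entry added or updated.

    Hypothetical helper for illustration; existing servers are preserved.
    """
    # An empty or missing file is treated as an empty config object.
    config = json.loads(config_text) if config_text.strip() else {}
    servers = config.setdefault("mcpServers", {})
    servers["celery-flower"] = {
        "command": "uvx",
        "args": ["celery-flower-mcp"],
        "env": {"FLOWER_URL": flower_url},
    }
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    # Prints the merged config for a file that already has one other server.
    print(add_server('{"mcpServers": {"other": {"command": "x"}}}'))
```

To apply it, read the existing file's text, pass it through `add_server`, and write the result back to the same path.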
Configuration
Configuration is read from environment variables or a .env file in the project root. Copy .env.example to get started:
cp .env.example .env
| Variable | Default | Description |
|---|---|---|
| FLOWER_URL | http://localhost:5555 | Base URL of your Flower instance |
| FLOWER_USERNAME | — | Basic auth username |
| FLOWER_PASSWORD | — | Basic auth password |
| FLOWER_API_TOKEN | — | Bearer token (takes priority over basic auth) |
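The precedence rule in the table (bearer token over basic auth) can be sketched with the standard library alone. The project itself uses Pydantic Settings, so the `FlowerConfig` dataclass and `auth_headers` helper below are hypothetical stand-ins rather than the project's code, and the use of a standard `Authorization: Bearer` header is an assumption. This sketch reads a plain mapping and does not parse `.env` files.

```python
import base64
import os
from dataclasses import dataclass
from typing import Mapping, Optional

DEFAULT_URL = "http://localhost:5555"


@dataclass(frozen=True)
class FlowerConfig:
    """Hypothetical stand-in for the project's Pydantic Settings class."""

    url: str = DEFAULT_URL
    username: Optional[str] = None
    password: Optional[str] = None
    api_token: Optional[str] = None

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "FlowerConfig":
        # Empty strings are treated the same as unset variables.
        return cls(
            url=env.get("FLOWER_URL") or DEFAULT_URL,
            username=env.get("FLOWER_USERNAME") or None,
            password=env.get("FLOWER_PASSWORD") or None,
            api_token=env.get("FLOWER_API_TOKEN") or None,
        )


def auth_headers(config: FlowerConfig) -> dict:
    """Build the Authorization header; the token wins over basic auth."""
    if config.api_token:
        return {"Authorization": f"Bearer {config.api_token}"}
    if config.username and config.password:
        raw = f"{config.username}:{config.password}".encode()
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    return {}  # no credentials configured
```

With all three credentials set, only the bearer header is produced; with none set, requests go out unauthenticated.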
Available Tools
Workers (8 tools)
| Tool | Description |
|---|---|
| list_workers | List all workers — optionally filter by name, refresh live stats, or get status only |
| shutdown_worker | Gracefully shut down a worker |
| restart_worker_pool | Restart a worker's process pool |
| grow_worker_pool | Add N processes to a worker's pool |
| shrink_worker_pool | Remove N processes from a worker's pool |
| autoscale_worker_pool | Configure autoscale min/max bounds |
| add_queue_consumer | Make a worker start consuming from a queue |
| cancel_queue_consumer | Make a worker stop consuming from a queue |
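To make the table concrete, here is a rough sketch of how the worker control tools could translate into Flower REST calls. The paths follow Flower's documented worker API; the pairing of each path with this project's tool names, and the `worker_request` helper itself, are assumptions made for illustration. The function only builds the request and sends nothing.

```python
from urllib.parse import quote, urlencode

# Tool name -> Flower REST path prefix. The paths come from Flower's API docs;
# mapping them to these tool names is an assumption, not the project's code.
WORKER_ENDPOINTS = {
    "shutdown_worker": "/api/worker/shutdown/",
    "restart_worker_pool": "/api/worker/pool/restart/",
    "grow_worker_pool": "/api/worker/pool/grow/",
    "shrink_worker_pool": "/api/worker/pool/shrink/",
    "autoscale_worker_pool": "/api/worker/pool/autoscale/",
    "add_queue_consumer": "/api/worker/queue/add-consumer/",
    "cancel_queue_consumer": "/api/worker/queue/cancel-consumer/",
}


def worker_request(base_url: str, tool: str, worker: str, **params) -> tuple:
    """Return (method, url) for a worker control tool (hypothetical helper)."""
    # Keep "@" readable in names such as celery@worker1; escape everything else.
    path = WORKER_ENDPOINTS[tool] + quote(worker, safe="@")
    query = urlencode(params)
    url = base_url.rstrip("/") + path + (f"?{query}" if query else "")
    return "POST", url


if __name__ == "__main__":
    print(worker_request("http://localhost:5555", "grow_worker_pool", "celery@worker1", n=2))
```

All seven control actions are POST requests keyed by worker name; `list_workers` is the one read-only call and is left out of the mapping.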
Tasks (11 tools)
| Tool | Description |
|---|---|
| list_tasks | List tasks with filters: state, worker, name, date range, search, pagination |
| list_task_types | List all registered task types across workers |
| get_task_info | Get full details for a task by UUID |
| get_task_result | Retrieve a task's result (with optional timeout) |
| apply_task | Execute a task synchronously and wait for the result |
| async_apply_task | Dispatch a task asynchronously, returns task UUID |
| send_task | Send a task by name — no registration required on worker side |
| abort_task | Abort a running task |
| revoke_task | Revoke a task; optionally terminate with a signal |
| set_task_timeout | Set soft and/or hard time limits for a task on a worker |
| set_task_rate_limit | Set rate limit for a task on a worker (e.g. 100/m) |
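The `list_tasks` filters are the part of this table most easily misread, so here is a small sketch of how they might become a query string. The parameter names (`state`, `workername`, `taskname`, `limit`, `offset`, `received_start`, `received_end`, `search`) are those of Flower's `GET /api/tasks` endpoint; the `list_tasks_url` helper is hypothetical, and whether the tool exposes the filters under these exact names is an assumption.

```python
from typing import Optional
from urllib.parse import urlencode


def list_tasks_url(
    base_url: str,
    *,
    state: Optional[str] = None,
    workername: Optional[str] = None,
    taskname: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    received_start: Optional[str] = None,
    received_end: Optional[str] = None,
    search: Optional[str] = None,
) -> str:
    """Build a GET /api/tasks URL, leaving out filters that were not set."""
    filters = {
        "state": state,
        "workername": workername,
        "taskname": taskname,
        "limit": limit,
        "offset": offset,
        "received_start": received_start,
        "received_end": received_end,
        "search": search,
    }
    # Only filters the caller actually supplied end up in the query string.
    query = urlencode({k: v for k, v in filters.items() if v is not None})
    return f"{base_url.rstrip('/')}/api/tasks" + (f"?{query}" if query else "")


if __name__ == "__main__":
    print(list_tasks_url("http://localhost:5555", state="FAILURE", limit=10))
```

Dropping unset filters keeps the request minimal and leaves defaults such as page size to Flower.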
Queues & Health (2 tools)
| Tool | Description |
|---|---|
| get_queue_lengths | Get the current depth of all configured queues |
| healthcheck | Check whether the Flower instance is reachable and healthy |
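As an illustration of what an assistant can do with `get_queue_lengths`, the sketch below turns a queue-length payload into a name-to-depth mapping and flags backlogged queues. The payload shape (`active_queues` entries with `name` and `messages`) is assumed from Flower's `GET /api/queues/length` response; the sample data and both helper names are made up for the example.

```python
import json

# Illustrative payload only; real queue names and depths will differ.
SAMPLE = """
{"active_queues": [
    {"name": "celery", "messages": 0},
    {"name": "emails", "messages": 42}
]}
"""


def queue_depths(payload: dict) -> dict:
    """Map queue name -> number of waiting messages."""
    return {q["name"]: int(q["messages"]) for q in payload.get("active_queues", [])}


def backlogged(depths: dict, threshold: int) -> list:
    """Return the names of queues holding more than `threshold` messages."""
    return sorted(name for name, count in depths.items() if count > threshold)


if __name__ == "__main__":
    depths = queue_depths(json.loads(SAMPLE))
    print(depths)
    print(backlogged(depths, threshold=10))
```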
Architecture
source/
├── main.py # FastMCP server entry point + dishka container wiring
├── settings.py # Pydantic Settings — typed config from env / .env
├── client.py # Async HTTP client wrapping Flower REST API
├── providers.py # dishka Provider — manages FlowerClient lifecycle
└── tools/
    ├── workers.py # 8 worker management tools
    ├── tasks.py # 11 task management tools
    └── queues.py # 2 queue / health tools
dishka manages the FlowerClient lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.
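The "async generator provider" pattern can be shown without dishka: code before the `yield` runs at startup, and code after it runs at shutdown. The sketch below uses only the standard library. `FakeFlowerClient` and `provide_client` are hypothetical stand-ins, and dishka's own `Provider` and decorator API is deliberately not shown.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeFlowerClient:
    """Hypothetical stand-in for the project's FlowerClient."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def provide_client() -> AsyncIterator[FakeFlowerClient]:
    client = FakeFlowerClient()  # startup: create the client once
    try:
        yield client  # the application uses the client here
    finally:
        await client.aclose()  # shutdown: always release resources


async def main() -> FakeFlowerClient:
    # asynccontextmanager drives the generator the way a DI container would:
    # enter -> run up to the yield, exit -> run the cleanup after it.
    async with asynccontextmanager(provide_client)() as client:
        assert not client.closed
    return client


if __name__ == "__main__":
    print(asyncio.run(main()).closed)
```

The `try`/`finally` around the `yield` is what guarantees the client is closed even if the server exits with an error.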
Development
make fmt # auto-format with ruff
make lint # lint with ruff
make typecheck # type-check with mypy (strict)
make test # run 49 unit tests
make cov # unit tests + coverage report
make all # fmt + lint + typecheck
Testing
The test suite is split into two layers:
Unit tests (tests/) — fast, no external dependencies, use pytest-httpx to mock HTTP calls:
make test
# or
uv run pytest tests/ -m "not integration"
Integration tests (tests/integration/) — run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:
make integration
This command:
- Builds and starts the Docker Compose stack (docker-compose.test.yml): Redis → Celery worker → Flower
- Waits for Flower's /healthcheck endpoint to return OK
- Runs the 16 integration tests against http://localhost:5555
- Tears down the stack when done
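The "wait for /healthcheck" step is a polling loop with a deadline. How `make integration` implements it is not stated here, so the following is only a sketch of the idea using the standard library. `flower_is_healthy` and `wait_until` are hypothetical names, and the timeout and interval defaults are arbitrary.

```python
import time
import urllib.request
from typing import Callable


def flower_is_healthy(base_url: str = "http://localhost:5555") -> bool:
    """Return True if GET /healthcheck answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/healthcheck", timeout=2) as resp:
            return resp.status == 200
    except OSError:
        # Covers refused connections, timeouts, and urllib's URLError/HTTPError.
        return False


def wait_until(
    probe: Callable[[], bool],
    timeout: float = 60.0,
    interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call `probe` until it returns True or `timeout` seconds have passed."""
    deadline = clock() + timeout
    while True:
        if probe():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


if __name__ == "__main__":
    # Requires a running Flower instance; prints False after 60s otherwise.
    print(wait_until(flower_is_healthy))
```

Passing the probe, sleep, and clock in as parameters keeps the loop testable without a running stack.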
The stack is defined in docker-compose.test.yml. The worker and Flower images are built from tests/integration/Dockerfile.worker and tests/integration/Dockerfile.flower.
To start the stack manually for exploratory testing:
docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down # stop + remove volumes
Integration tests use pytest.mark.asyncio(loop_scope="session") so all tests share one event loop — this avoids RuntimeError: Event loop is closed when httpx transports are cleaned up across test boundaries on Python 3.14.
See CONTRIBUTING.md for details on adding new tools or submitting a PR.
Changelog
See CHANGELOG.md.