Event Subscription Queues
This guide explains the EventSubscriptionQueue system for managing RabbitMQ queues
used by event subscriptions, including queue lifecycle management and cleanup mechanisms.
Overview
The EventSubscriptionQueue model tracks RabbitMQ queues that site agents create to
receive marketplace events. This explicit queue registration prevents race conditions
between STOMP subscribers and publishers that would otherwise cause precondition_failed
errors in RabbitMQ.
Problem Solved
Without explicit queue management, a race condition occurs:
sequenceDiagram
participant Agent as Site Agent
participant RMQ as RabbitMQ
participant Waldur as Waldur Mastermind
Agent->>RMQ: STOMP SUBSCRIBE to queue
Note over RMQ: Queue auto-created<br/>WITHOUT special arguments
Waldur->>RMQ: Publish message with<br/>x-dead-letter-exchange header
RMQ-->>Waldur: PRECONDITION_FAILED
Note over RMQ: Queue arguments mismatch!
The solution requires agents to create queues via API before subscribing:
sequenceDiagram
participant Agent as Site Agent
participant Waldur as Waldur Mastermind
participant RMQ as RabbitMQ
Agent->>Waldur: POST /create_queue/
Waldur->>RMQ: Create queue with correct arguments
RMQ-->>Waldur: Queue created
Waldur-->>Agent: 201 Created (queue_name, vhost)
Agent->>RMQ: STOMP SUBSCRIBE to pre-created queue
Note over RMQ: Queue already exists<br/>with correct arguments
Waldur->>RMQ: Publish message with headers
RMQ-->>Agent: Message delivered
Architecture
Components
| Component | Location | Purpose |
|---|---|---|
| EventSubscriptionQueue model | waldur_core/logging/models.py | Tracks queue registrations |
| create_queue API action | marketplace_site_agent/views.py | Creates queues via API |
| RabbitMQManagementBackend.create_queue() | waldur_core/logging/backend.py | RabbitMQ Management API calls |
| prepare_messages() queue check | marketplace/utils.py | Skips unregistered queues |
| pre_delete signal handler | waldur_core/logging/handlers.py | Cleans up RabbitMQ on deletion |
| cleanup_orphan_subscription_queues task | waldur_core/logging/tasks.py | Removes orphaned queues |
Queue Naming Convention
Queue names follow the pattern:
subscription_{subscription_uuid}_offering_{offering_uuid}_{object_type}
Example: subscription_a1b2c3d4_offering_e5f6g7h8_resource
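The naming convention can be illustrated with a small helper. This is a minimal sketch, assuming the name is composed of a subscription identifier, an offering identifier, and the object type, as the example suggests; the function names build_queue_name and parse_queue_name are hypothetical and are not part of Waldur.

```python
def build_queue_name(subscription_id: str, offering_id: str, object_type: str) -> str:
    """Compose a queue name in the shape suggested by the documented example."""
    return f"subscription_{subscription_id}_offering_{offering_id}_{object_type}"


def parse_queue_name(queue_name: str) -> dict:
    """Split a queue name back into its parts.

    The object type may itself contain underscores (for example user_role),
    so only the first four underscores are treated as separators. This
    assumes the identifiers themselves contain no underscores.
    """
    parts = queue_name.split("_", 4)
    if len(parts) != 5 or parts[0] != "subscription" or parts[2] != "offering":
        raise ValueError(f"Not a subscription queue name: {queue_name!r}")
    return {
        "subscription_id": parts[1],
        "offering_id": parts[3],
        "object_type": parts[4],
    }
```

Limiting the split keeps multi-word object types such as service_account intact when a name is parsed back.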
Queue Arguments
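All subscription queues are created with the same fixed set of RabbitMQ arguments, defined in waldur_core/logging/backend.py. A queue declared with different arguments, such as one auto-created by a STOMP SUBSCRIBE, triggers the PRECONDITION_FAILED error described above.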
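To show how arguments are attached when a queue is declared through the RabbitMQ Management HTTP API, here is a minimal sketch that only builds the PUT request and does not send it. The argument values in EXAMPLE_ARGUMENTS, the durable and auto_delete settings, and the function name are assumptions for illustration; the actual values live in waldur_core/logging/backend.py.

```python
import json
from urllib.parse import quote

# Hypothetical arguments for illustration only. The guide mentions a
# dead-letter exchange; the exchange name and TTL here are made up.
EXAMPLE_ARGUMENTS = {
    "x-dead-letter-exchange": "example-dlx",
    "x-message-ttl": 86_400_000,  # milliseconds
}


def build_create_queue_request(base_url, vhost, queue_name, arguments):
    """Return (method, url, body) for PUT /api/queues/{vhost}/{name}."""
    # The vhost and queue name are path segments and must be
    # percent-encoded, so the default vhost "/" becomes "%2F".
    url = "{}/api/queues/{}/{}".format(
        base_url.rstrip("/"), quote(vhost, safe=""), quote(queue_name, safe="")
    )
    body = {"durable": True, "auto_delete": False, "arguments": arguments}
    return "PUT", url, json.dumps(body)
```

Because RabbitMQ compares arguments when a queue is redeclared, the subscriber and the publisher both need to see exactly the set used at creation time.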
Queue Lifecycle
Creation Flow
sequenceDiagram
participant Agent as Site Agent
participant API as Waldur API
participant DB as PostgreSQL
participant RMQ as RabbitMQ
Agent->>API: POST /agent-identities/{uuid}/create_queue/
Note over Agent,API: {offering_uuid, object_type}
API->>API: Validate offering access
API->>DB: Check if queue exists
alt Queue exists
API->>RMQ: Ensure queue exists (idempotent)
API-->>Agent: 200 OK (existing queue)
else Queue doesn't exist
API->>RMQ: PUT /api/queues/{vhost}/{name}
RMQ-->>API: 201 Created
API->>DB: INSERT EventSubscriptionQueue
API-->>Agent: 201 Created (new queue)
end
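The branching in the creation flow can be sketched without Django or RabbitMQ. The following is an illustration under stated assumptions: a dict stands in for the EventSubscriptionQueue table, FakeRabbit stands in for the management backend, and create_queue here is a hypothetical function, not the actual view.

```python
class FakeRabbit:
    """Stand-in for the RabbitMQ backend that records declare calls."""

    def __init__(self):
        self.declared = []

    def ensure_queue(self, vhost, name):
        # Declaring a queue with identical arguments is idempotent in RabbitMQ.
        self.declared.append((vhost, name))


def create_queue(registry, rabbit, vhost, name):
    """Return (http_status, record), following the documented flow."""
    if name in registry:
        # Queue already registered: re-ensure it exists, answer 200.
        rabbit.ensure_queue(vhost, name)
        return 200, registry[name]
    # New queue: create it in RabbitMQ first, then store the record.
    rabbit.ensure_queue(vhost, name)
    record = {"queue_name": name, "vhost": vhost}
    registry[name] = record
    return 201, record
```

Creating the RabbitMQ queue before inserting the record means a failed broker call leaves no database row behind that points at a missing queue.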
Deletion Flow (Signal-Based)
When an EventSubscriptionQueue record is deleted (directly or via cascade),
a pre_delete signal automatically removes the RabbitMQ queue:
sequenceDiagram
participant Client as API Client
participant Django as Django ORM
participant Signal as pre_delete Signal
participant RMQ as RabbitMQ
Client->>Django: Delete EventSubscription
Django->>Django: CASCADE to EventSubscriptionQueue
loop For each queue record
Django->>Signal: pre_delete triggered
Signal->>RMQ: DELETE /api/queues/{vhost}/{name}
RMQ-->>Signal: 204 No Content
end
Django->>Django: Delete DB records
Django-->>Client: Success
Orphan Queue Cleanup
A periodic task runs every 6 hours to find and remove orphaned queues (RabbitMQ queues without matching DB records):
sequenceDiagram
participant Celery as Celery Beat
participant Task as cleanup_orphan_subscription_queues
participant RMQ as RabbitMQ
participant DB as PostgreSQL
Celery->>Task: Execute task (every 6 hours)
Task->>RMQ: List all subscription_* queues
RMQ-->>Task: Queue list per vhost
loop For each queue
Task->>DB: Check EventSubscriptionQueue exists
alt No matching record
Task->>RMQ: DELETE queue
Note over Task: Log: "Deleted orphan queue"
end
end
Cleanup Mechanisms
1. Signal-Based Cleanup (Real-Time)
Trigger: EventSubscriptionQueue record deletion
Handler: cleanup_rabbitmq_queue_on_delete in handlers.py
Behavior:
- Fires on pre_delete signal
- Calls RabbitMQManagementBackend.delete_queue()
- Logs warning on failure but doesn't block deletion
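The "log but do not block" behavior can be sketched as follows. This is not the actual handler: the Django signal wiring is omitted, the function name cleanup_rabbitmq_queue is hypothetical, and the delete_queue(vhost, queue_name) signature is an assumption.

```python
import logging

logger = logging.getLogger(__name__)


def cleanup_rabbitmq_queue(backend, vhost, queue_name):
    """Try to delete the queue; never raise, so record deletion proceeds.

    Returns True when the queue was deleted and False when the backend
    call failed and a warning was logged instead.
    """
    try:
        backend.delete_queue(vhost, queue_name)
    except Exception as exc:  # deliberately broad: cleanup is best-effort
        logger.warning(
            "Failed to delete RabbitMQ queue %s in vhost %s: %s",
            queue_name, vhost, exc,
        )
        return False
    return True


class UnreachableBackend:
    """Stand-in backend that simulates a RabbitMQ outage."""

    def delete_queue(self, vhost, queue_name):
        raise ConnectionError("RabbitMQ management API unreachable")
```

Any queue left behind by a failed delete has no matching record afterwards, which is the case the periodic orphan cleanup covers.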
2. Orphan Queue Cleanup (Periodic)
Task: cleanup_orphan_subscription_queues
Schedule: Every 6 hours (configurable in celery beat)
Behavior:
- Lists all subscription_* queues from RabbitMQ
- Compares against EventSubscriptionQueue records
- Deletes queues with no matching DB record
- Continues processing even if individual deletes fail
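The comparison step can be sketched as a pure function. This is a sketch under assumptions: the queue listing and the registered set are passed in as plain Python values, and find_and_delete_orphans and its delete_queue callable are hypothetical names, not the task's real internals.

```python
import logging

logger = logging.getLogger(__name__)


def find_and_delete_orphans(rabbit_queues, registered, delete_queue):
    """Delete subscription_* queues that have no matching record.

    rabbit_queues: iterable of (vhost, queue_name) pairs found in RabbitMQ.
    registered:    set of (vhost, queue_name) pairs known to the database.
    delete_queue:  callable(vhost, queue_name) performing the deletion.
    Returns (deleted, failed) lists of (vhost, queue_name) pairs.
    """
    deleted, failed = [], []
    for vhost, name in rabbit_queues:
        if not name.startswith("subscription_"):
            continue  # not managed by this mechanism
        if (vhost, name) in registered:
            continue  # still backed by a record
        try:
            delete_queue(vhost, name)
        except Exception as exc:
            # One failed delete must not stop the rest of the run.
            logger.warning("Could not delete orphan queue %s: %s", name, exc)
            failed.append((vhost, name))
            continue
        deleted.append((vhost, name))
    return deleted, failed
```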
3. Stale Subscription Cleanup (Existing)
Task: delete_stale_event_subscriptions
Schedule: Every 24 hours
Behavior:
- Removes subscriptions for users with expired tokens
- CASCADE deletes EventSubscriptionQueue records
- Signal handler cleans up RabbitMQ queues
API Reference
Create Queue
POST /agent-identities/{uuid}/create_queue/
Request: JSON body containing offering_uuid and object_type.
Response (201 Created): JSON object describing the new queue, including queue_name and vhost.
Response (200 OK): Same format, returned when queue already exists.
Valid object_type values:
- resource
- order
- user_role
- service_account
- course_account
- importable_resources
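A client-side call could be prepared along these lines. This sketch builds the request with the standard library and does not send it. The API root, the token-based Authorization header, and the function name are assumptions; only the path, the two body fields, and the object_type values come from this guide.

```python
import json
from urllib.request import Request

VALID_OBJECT_TYPES = frozenset({
    "resource", "order", "user_role",
    "service_account", "course_account", "importable_resources",
})


def build_create_queue_call(api_root, agent_identity_uuid, offering_uuid,
                            object_type, token):
    """Prepare (but do not send) the create_queue POST request."""
    if object_type not in VALID_OBJECT_TYPES:
        raise ValueError(f"Unsupported object_type: {object_type!r}")
    url = "{}/agent-identities/{}/create_queue/".format(
        api_root.rstrip("/"), agent_identity_uuid
    )
    payload = json.dumps(
        {"offering_uuid": offering_uuid, "object_type": object_type}
    ).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Token {token}",  # assumed auth scheme
    }
    return Request(url, data=payload, headers=headers, method="POST")
```

Passing the result to urllib.request.urlopen would perform the call; a 200 or 201 status then indicates whether the queue already existed.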
Monitoring
Check Queue Status
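List the subscription_* queues in RabbitMQ and confirm that each exists with the expected arguments and has a consumer attached.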
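One way to inspect queue status is to fetch the queue listing from the RabbitMQ Management HTTP API (GET /api/queues) and filter it. The sketch below only shows the filtering step on already-parsed JSON; the function name is hypothetical, and it relies on the name and consumers fields that the management API includes for each queue.

```python
def idle_subscription_queues(queue_listing):
    """Return sorted names of subscription_* queues with no consumers.

    queue_listing is the parsed JSON list returned by the management API,
    where each entry is a dict describing one queue.
    """
    return sorted(
        q["name"]
        for q in queue_listing
        if q["name"].startswith("subscription_") and q.get("consumers", 0) == 0
    )
```

A queue that stays in this list across several checks is a candidate for the orphan cleanup described earlier.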
Watch for Errors
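Watch the Waldur logs for "Queue not registered... Skipping" messages and the RabbitMQ logs for PRECONDITION_FAILED errors.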
Django Shell Queries
From the Django shell, query EventSubscriptionQueue records to see which queues are registered.
Troubleshooting
Queue Creation Fails
Symptom: API returns 400/500 on create_queue
Check:
1. RabbitMQ is running and accessible
2. User has valid EventSubscription
3. Offering UUID exists and user has access
Messages Not Delivered
Symptom: Events published but agent doesn't receive them
Check:
1. Queue exists in RabbitMQ with correct arguments
2. EventSubscriptionQueue record exists in DB
3. Waldur logs for "Queue not registered... Skipping"
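The third check relates to the publisher-side guard that skips unregistered queues. A minimal sketch of such a guard is shown below, assuming each message is a dict with a queue key; the function name and the log wording are illustrative and do not reproduce the actual prepare_messages() code.

```python
import logging

logger = logging.getLogger(__name__)


def drop_unregistered(messages, registered_queue_names):
    """Keep only messages whose target queue has been registered."""
    deliverable = []
    for message in messages:
        if message["queue"] not in registered_queue_names:
            # Publishing here could auto-create a queue with wrong arguments.
            logger.info("Queue %s is not registered, skipping", message["queue"])
            continue
        deliverable.append(message)
    return deliverable
```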
Orphan Queues Accumulating
Symptom: RabbitMQ has subscription queues with no consumers
Fix: Run the cleanup_orphan_subscription_queues task (waldur_core/logging/tasks.py) manually.
precondition_failed Errors
Symptom: RabbitMQ logs show PRECONDITION_FAILED - inequivalent arg
Cause: Queue was created by STOMP subscriber before API call
Fix:
1. Delete the misconfigured queue from RabbitMQ
2. Ensure agent calls create_queue API before STOMP subscribe
3. Restart agent to recreate queue correctly
Configuration
Celery Beat Schedule
The cleanup tasks are registered in marketplace_site_agent/extension.py. The orphan queue cleanup is scheduled to run every 6 hours.
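For orientation, a Celery beat entry for the 6-hour cleanup generally has the shape below. This is a sketch in standard Celery beat_schedule format; the entry key and the dotted task name are hypothetical, and the real registration in extension.py may be structured differently.

```python
from datetime import timedelta

# Hypothetical schedule entry; the task path is an assumption.
CLEANUP_SCHEDULE_SKETCH = {
    "cleanup-orphan-subscription-queues": {
        "task": "waldur_core.logging.cleanup_orphan_subscription_queues",
        "schedule": timedelta(hours=6),
        "args": (),
    },
}
```

Changing the timedelta value is how the interval would be tuned in a schedule of this form.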
Queue Arguments
Queue arguments are defined in waldur_core/logging/backend.py.