Skip to content

MQTT-Based Event Notification System for Order Processing

System Overview

The MQTT-based event notification system allows Waldur to communicate changes to resources, orders, and user roles to the waldur-site-agent that runs on a remote cluster. This eliminates the need for constant polling and enables immediate reactions to events.

The key components include:

  1. MQTT Publisher (Waldur side): Located in the waldur_mastermind/marketplace_slurm_remote/handlers.py and utils.py files, this component publishes messages to MQTT topics when specific events occur.

  2. Event Subscription Service: Manages subscriptions to events by creating unique topics for each type of notification.

  3. MQTT Consumer (Agent side): The waldur-site-agent running on the resource provider's infrastructure that subscribes to these topics and processes incoming messages.

Event Flow

  1. An event occurs in Waldur (e.g., a new order is created, a user role changes, or a resource is updated)
  2. Waldur publishes a message to the appropriate MQTT topic
  3. The site agent receives the message and processes it based on the event type
  4. The agent communicates with the backend (e.g., SLURM) to execute the necessary actions

Message Types

The system handles three primary types of events:

  1. Order Messages: Notifications about marketplace orders (create, update, terminate)
  2. User Role Messages: Changes to user permissions in projects
  3. Resource Messages: Updates to resource configuration or status

Implementation Details

Publishing Messages (Waldur Side)

When events like order creation occur, Waldur prepares and publishes MQTT messages:

1
2
3
4
5
def prepare_mqtt_messages(offering, payload, affected_object):
    """...prepares MQTT messages for marketplace events..."""
    # Determines which users should receive the notification
    # Creates appropriate topic names
    # Returns a list of messages to be sent

These messages are then sent via:

1
logging_tasks.publish_mqtt_messages.delay(messages)

Subscription Management (Agent Side)

The EventSubscriptionManager class handles creation of event subscriptions and setup of MQTT consumers:

1
2
3
def create_event_subscription(self):
    """Create event subscription for specific object types"""
    # ...
1
2
3
4
5
def start_mqtt_consumer(self, event_subscription):
    """Start MQTT consumer for the subscription"""
    # Setup MQTT client
    # Connect to broker
    # Subscribe to topic

Message Processing (Agent Side)

When a message arrives, it's routed to the appropriate handler based on the event type:

1
2
3
def on_order_message(client, userdata, msg):
    # Process order messages
    # Create or update resources on backend
1
2
3
def on_user_role_message(client, userdata, msg):
    # Process user role changes
    # Update access permissions on backend
1
2
3
def on_resource_message(client, userdata, msg):
    # Process resource updates
    # Update resource configuration on backend

Technical Components

  1. WebSocket Transport: The system uses MQTT over WebSockets for communication
  2. TLS Security: Connections can be secured with TLS
  3. User Authentication: Each subscription has its own credentials
  4. Topic Structure: Topics follow the pattern subscription/{sub_uuid}/offering/{offering_uuid}/{affected_object}

Error Handling and Resilience

The system includes:

  • Graceful connection handling
  • Signal handlers for proper shutdown
  • Retry mechanisms for order processing
  • Error logging and optional Sentry integration

Benefits of the MQTT Approach

  1. Real-time Processing: Actions are triggered immediately when events occur
  2. Reduced Network Traffic: No constant polling needed
  3. Decoupling: The agent doesn't need direct access to Waldur's database
  4. Scalability: Multiple agents can subscribe to different events
  5. Reliability: The MQTT protocol provides QoS options to ensure message delivery

This event-driven architecture significantly improves the responsiveness and efficiency of the order processing system compared to traditional polling approaches.