
STOMP-Based Event Notification System

System Overview

The STOMP-based event notification system lets Waldur notify external systems, via message queues, about changes to resources, orders, user roles, and other objects. This removes the need for constant polling and lets the components of a distributed architecture react to events immediately.

The key components include:

  1. STOMP Publisher (Waldur side): Located in waldur_core/logging/utils.py, this component publishes messages to STOMP queues when specific events occur.

  2. Event Subscription Service: Manages event subscriptions, creating a dedicated queue for each type of notification. Subscriptions are managed through the API defined in waldur_core/logging/views.py.

  3. STOMP Consumer (External System): Any external system that subscribes to these queues and processes incoming messages. This can be:

      • The waldur-site-agent running on resource provider infrastructure
      • Custom integration services (e.g., SharePoint integration, external notification systems)
      • Third-party systems that need to react to Waldur events

Event Flow

  1. An event occurs in Waldur (e.g., a new order is created, a user role changes, or a resource is updated)
  2. Waldur publishes a message to the appropriate STOMP queue(s)
  3. External systems (agents, integrations, or third-party services) receive the message and process it based on the event type
  4. The consuming system executes the necessary actions based on the message content

Queue Naming Strategy

The system follows an object-based naming convention for STOMP queues rather than event-based naming. This design choice provides several benefits:

  • Simplified Client Configuration: Clients subscribe to object types (e.g., resource_periodic_limits) rather than specific event types
  • Action Flexibility: Specific actions (e.g., apply_periodic_settings, update_limits) are stored in the message payload
  • Easier Maintenance: Adding new actions doesn't require queue reconfiguration
  • Future Migration Path: Lays the foundation for an eventual migration to event-based naming without requiring immediate client changes

Current Approach:

  • Queue: resource_periodic_limits
  • Payload: {"action": "apply_periodic_settings", "settings": {...}}

Alternative Event-Based Approach (for future consideration):

  • Queue: resource_periodic_limits_update
  • More specific but requires client reconfiguration for each new event type
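The payload-driven approach can be illustrated with a small handler registry on the consumer side. This is a minimal sketch, not code from Waldur or waldur-site-agent: on_action, dispatch, and HANDLERS are hypothetical names, and only the queue name resource_periodic_limits and the action apply_periodic_settings come from this page.

```python
import json
from typing import Callable, Dict, Tuple

# Hypothetical registry: handlers are keyed by (object type, action).
# Supporting a new action means registering one more handler; the queue
# the client subscribes to (e.g. resource_periodic_limits) stays the same.
Handler = Callable[[dict], str]
HANDLERS: Dict[Tuple[str, str], Handler] = {}


def on_action(object_type: str, action: str) -> Callable[[Handler], Handler]:
    """Decorator that registers a handler for one action on one object type."""
    def register(func: Handler) -> Handler:
        HANDLERS[(object_type, action)] = func
        return func
    return register


@on_action("resource_periodic_limits", "apply_periodic_settings")
def apply_periodic_settings(payload: dict) -> str:
    # A real handler would push these settings to the backend.
    return f"fairshare={payload['settings']['fairshare']}"


def dispatch(object_type: str, body: str) -> str:
    """Pick the handler from the queue's object type plus the payload's action."""
    payload = json.loads(body)
    handler = HANDLERS.get((object_type, payload.get("action", "")))
    if handler is None:
        return "unhandled"
    return handler(payload)


body = json.dumps({"action": "apply_periodic_settings", "settings": {"fairshare": 333}})
print(dispatch("resource_periodic_limits", body))  # fairshare=333
```

Adding support for update_limits would mean registering one more function with @on_action("resource_periodic_limits", "update_limits"); the subscription and queue stay unchanged.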

Message Types

The system handles several types of events:

  1. Order Messages (order): Notifications about marketplace orders (create, update, terminate)
  2. User Role Messages (user_role): Changes to user permissions in projects
  3. Resource Messages (resource): Updates to resource configuration or status
  4. Resource Periodic Limits (resource_periodic_limits): SLURM periodic usage policy updates with allocation and limit settings
  5. Offering User Messages (offering_user): Creation, updates, and deletion of offering users
  6. Service Account Messages (service_account): Service account lifecycle events
  7. Course Account Messages (course_account): Course account management events
  8. Importable Resources Messages (importable_resources): Backend resource discovery events

Implementation Details

Publishing Messages (Waldur Side)

Events are published through a standardized mechanism in Waldur:

  1. Event Detection: Events are triggered by Django signal handlers throughout the system
  2. Message Preparation: Event data is serialized into JSON format with standardized payload structure
  3. Queue Publishing: Messages are sent to appropriate queues using the publish_messages Celery task

The core publishing function is located in src/waldur_core/logging/tasks.py:118 and utilizes the publish_stomp_messages utility in src/waldur_core/logging/utils.py:93.

Offering User Event Messages

Offering user events are published when offering users are created, updated, or deleted. These handlers are located in waldur_mastermind/marketplace/handlers.py:

  • send_offering_user_created_message - Runs when an OfferingUser is created
  • send_offering_user_updated_message - Runs when an OfferingUser is updated
  • send_offering_user_deleted_message - Runs when an OfferingUser is deleted

Message Payload Structure for OfferingUser Events:

{
  "offering_user_uuid": "uuid-hex-string",
  "user_uuid": "user-uuid-hex-string",
  "username": "generated-username",
  "state": "OK|Requested|Creating|Pending account linking|Pending additional validation|Requested deletion|Deleting|Deleted|Error creating|Error deleting",
  "action": "create|update|delete",
  "offering_uuid": "offering-uuid",
  "changed_fields": ["field1", "field2"]
}

The changed_fields key is present only in update messages.

Event Triggers:

  • Create: When a new offering user account is created for a user in an offering
  • Update: When any field of an existing offering user is modified (username, state, etc.)
  • Delete: When an offering user account is removed from an offering
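A consumer can turn the offering user payload into a typed object before acting on it. The sketch below assumes only the fields listed above; OfferingUserEvent and from_json are hypothetical names, and treating a missing changed_fields as an empty list is a design choice of this sketch, not documented Waldur behaviour.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class OfferingUserEvent:
    """Hypothetical typed view of an offering_user message payload."""
    offering_user_uuid: str
    user_uuid: str
    username: str
    state: str
    action: str
    offering_uuid: str
    changed_fields: List[str] = field(default_factory=list)

    @classmethod
    def from_json(cls, body: str) -> "OfferingUserEvent":
        data = json.loads(body)
        # Reject actions other than the three documented ones.
        if data.get("action") not in {"create", "update", "delete"}:
            raise ValueError(f"unexpected action: {data.get('action')!r}")
        return cls(
            offering_user_uuid=data["offering_user_uuid"],
            user_uuid=data["user_uuid"],
            username=data["username"],
            state=data["state"],
            action=data["action"],
            offering_uuid=data["offering_uuid"],
            # changed_fields is only sent with updates; default to empty.
            changed_fields=data.get("changed_fields", []),
        )


event = OfferingUserEvent.from_json(json.dumps({
    "offering_user_uuid": "ou1", "user_uuid": "u1", "username": "jdoe",
    "state": "OK", "action": "update", "offering_uuid": "o1",
    "changed_fields": ["username"],
}))
if event.action == "update" and "username" in event.changed_fields:
    print(f"rename account to {event.username}")  # rename account to jdoe
```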

Resource Periodic Limits Event Messages

Resource periodic limits events are published when SLURM periodic usage policies are applied to resources. These messages contain calculated SLURM settings including allocation limits, fairshare values, and QoS thresholds. The handler is located in waldur_mastermind/policy/models.py.

Message Payload Structure for Resource Periodic Limits:

{
  "resource_uuid": "resource-uuid-hex-string",
  "backend_id": "slurm-account-name",
  "offering_uuid": "offering-uuid-hex-string",
  "action": "apply_periodic_settings",
  "timestamp": "2024-01-01T00:00:00.000000",
  "settings": {
    "fairshare": 333,
    "limit_type": "GrpTRESMins",
    "grp_tres_mins": {
      "billing": 119640
    },
    "qos_threshold": {
      "billing": 119640
    },
    "grace_limit": {
      "billing": 143568
    },
    "carryover_details": {
      "carryover_applied": true,
      "previous_period": "2023-Q4",
      "previous_usage": 750.0,
      "decay_factor": 0.015625,
      "effective_previous_usage": 11.7,
      "unused_allocation": 988.3,
      "base_allocation": 1000.0,
      "total_allocation": 1988.3
    }
  }
}

Event Triggers:

  • Policy Application: When a SLURM periodic usage policy calculates new allocation limits and sends them to the site agent
  • Carryover Calculation: When unused allocation from previous periods is calculated with decay factors
  • Limit Updates: When fairshare values, TRES limits, or QoS thresholds need to be updated on the SLURM backend
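The carryover figures in the sample payload are internally consistent with the arithmetic below. This is a reading of the example values, not Waldur's policy implementation. The sample does not show how decay_factor is derived or how total_allocation is converted into the grp_tres_mins value, so those steps are not reproduced.

```python
# Figures copied from the sample resource_periodic_limits payload.
base_allocation = 1000.0
previous_usage = 750.0
decay_factor = 0.015625  # = 1 / 64

# Assumed relationships, inferred from the sample values only.
effective_previous_usage = previous_usage * decay_factor
unused_allocation = base_allocation - effective_previous_usage
total_allocation = base_allocation + unused_allocation

print(round(effective_previous_usage, 1))  # 11.7
print(round(unused_allocation, 1))         # 988.3
print(round(total_allocation, 1))          # 1988.3

# In the same sample, the grace limit is 120% of the GrpTRESMins limit.
grp_tres_mins_billing = 119640
grace_limit_billing = 143568
print(grace_limit_billing / grp_tres_mins_billing)  # 1.2
```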

Subscription Management (Consumer Side)

External systems consuming events can be implemented with different levels of sophistication:

1. Simple Event Subscription (Basic Integration)

For basic integrations, implement a direct subscription pattern:

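import json

import stomp
from waldur_api_client import AuthenticatedClient
from waldur_api_client.models import ObservableObjectTypeEnum

# Deployment-specific placeholders
STOMP_HOST = "stomp.example.com"
STOMP_PORT = 443
API_TOKEN = "your-token"
OFFERING_UUID = "offering-uuid-hex-string"

# Create event subscription. create_event_subscription() is not part of
# waldur_api_client: supply your own helper (e.g. one that calls
# POST /api/event-subscriptions/) that returns the created subscription.
client = AuthenticatedClient(base_url="https://api.waldur.com", token=API_TOKEN)
subscription = create_event_subscription(
    client,
    ObservableObjectTypeEnum.ORDER,  # or other types
)


# Implement message listener
class EventListener(stomp.ConnectionListener):
    def on_message(self, frame):
        message_data = json.loads(frame.body)
        # Process message based on action and content (handle_event is your code)
        handle_event(message_data)


# Set up the STOMP-over-WebSocket connection; each subscription has its own vhost
connection = stomp.WSStompConnection(
    host_and_ports=[(STOMP_HOST, STOMP_PORT)],
    vhost=subscription.user_uuid.hex,
)
connection.set_listener("events", EventListener())
# Assumption: the RabbitMQ username is the subscription's user UUID and the
# password is the API token; adjust to your deployment's credentials.
connection.connect(subscription.user_uuid.hex, API_TOKEN, wait=True)
connection.subscribe(
    destination=f"/queue/subscription_{subscription.uuid.hex}_offering_{OFFERING_UUID}_order",
    id="1",
    ack="auto",
)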

2. Structured Agent Pattern (Advanced Integration)

For more complex systems that need structured management and monitoring, use the AgentIdentity framework pattern from waldur-site-agent:

import datetime

from waldur_api_client import AuthenticatedClient
from waldur_api_client.models import AgentIdentityRequest, AgentServiceCreateRequest, AgentProcessorCreateRequest
from waldur_api_client.api.marketplace_site_agent_identities import (
    marketplace_site_agent_identities_create,
    marketplace_site_agent_identities_register_service,
)
from waldur_api_client.api.marketplace_site_agent_services import (
    marketplace_site_agent_services_register_processor,
)

# Deployment-specific placeholders
waldur_rest_client = AuthenticatedClient(base_url="https://api.waldur.com", token="your-token")
offering_uuid = "offering-uuid-hex-string"  # UUID of the offering this agent serves
observable_object_type = "order"  # object type this service will process

# Register agent identity
agent_identity_data = AgentIdentityRequest(
    offering=offering_uuid,
    name="my-integration-agent",
    version="1.0.0",
    dependencies=["stomp", "requests"],
    last_restarted=datetime.datetime.now(datetime.timezone.utc),
    config_file_path="/etc/my-agent/config.yaml",
    config_file_content="# agent configuration"
)

agent_identity = marketplace_site_agent_identities_create.sync(
    body=agent_identity_data,
    client=waldur_rest_client
)

# Register agent service for event processing
service_name = f"event_process-{observable_object_type}"
agent_service = marketplace_site_agent_identities_register_service.sync(
    uuid=agent_identity.uuid.hex,
    body=AgentServiceCreateRequest(
        name=service_name,
        mode="event_process"
    ),
    client=waldur_rest_client
)

# Register processors within the service
processor = marketplace_site_agent_services_register_processor.sync(
    uuid=agent_service.uuid.hex,
    body=AgentProcessorCreateRequest(
        name="order-processor",
        backend_type="CUSTOM_BACKEND",
        backend_version="2.0"
    ),
    client=waldur_rest_client
)

Benefits of AgentIdentity Pattern:

  • Monitoring: Track agent health, version, and dependencies in Waldur
  • Service Management: Organize multiple services within a single agent
  • Processor Tracking: Monitor individual processors and their backend versions
  • Configuration Management: Store and version configuration files
  • Statistics: Collect and report agent performance metrics

Message Processing (Consumer Side)

When a message arrives, it should be routed to appropriate handlers based on the event type and action. The message structure includes:

  • Event Type: Determined by the observable object type (order, user_role, resource, etc.)
  • Action: Specific operation to perform (create, update, delete, apply_periodic_settings, etc.)
  • Payload: Event-specific data needed to process the action

Message Processing Patterns:

The system supports different message processing approaches based on complexity:

import json
import logging
import re

import stomp
from stomp.constants import HDR_DESTINATION

logger = logging.getLogger(__name__)

# Queue names follow subscription_<uuid>_offering_<uuid>_<object_type>, and
# object types may contain underscores (user_role, resource_periodic_limits),
# so take everything after the offering UUID, not the last "_" segment.
QUEUE_NAME_RE = re.compile(
    r"^(?:/queue/)?subscription_[^_]+_offering_[^_]+_(?P<object_type>.+)$"
)


def get_object_type(destination):
    """Return the observable object type encoded in a queue name, or ""."""
    match = QUEUE_NAME_RE.match(destination or "")
    return match.group("object_type") if match else ""


# 1. Simple message processing (lightweight integration pattern)
class SimpleEventListener(stomp.ConnectionListener):
    def on_message(self, frame):
        try:
            message_data = json.loads(frame.body)
            message_type = get_object_type(frame.headers.get(HDR_DESTINATION))

            # handle_order / handle_user_role are methods you implement
            if message_type == 'order':
                self.handle_order(message_data)
            elif message_type == 'user_role':
                self.handle_user_role(message_data)

        except Exception:
            logger.exception("Error processing message")


# 2. Structured agent processing (waldur-site-agent pattern);
# the handle_*_stomp functions are defined elsewhere in the agent.
OBJECT_TYPE_TO_HANDLER = {
    "order": handle_order_message_stomp,
    "user_role": handle_user_role_message_stomp,
    "resource": handle_resource_message_stomp,
    "resource_periodic_limits": handle_resource_periodic_limits_stomp,
    "service_account": handle_account_message_stomp,
    "course_account": handle_account_message_stomp,
    "importable_resources": handle_importable_resources_message_stomp,
}


def route_message(frame, offering, user_agent):
    """Route message to appropriate handler based on destination."""
    object_type = get_object_type(frame.headers.get(HDR_DESTINATION, ""))

    handler = OBJECT_TYPE_TO_HANDLER.get(object_type)
    if handler:
        handler(frame, offering, user_agent)
    else:
        logger.warning("No handler found for object type: %s", object_type)

API Endpoints

The event notification system provides REST API endpoints for managing event-based functionality (verified from OpenAPI specification):

Event Subscriptions

  • GET /api/event-subscriptions/ - List event subscriptions
  • POST /api/event-subscriptions/ - Create new event subscription
  • GET /api/event-subscriptions/{uuid}/ - Retrieve specific subscription
  • PATCH /api/event-subscriptions/{uuid}/ - Update subscription settings
  • DELETE /api/event-subscriptions/{uuid}/ - Delete subscription
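The subscription endpoints can be called with any HTTP client. Below is a minimal standard-library sketch; the base URL and token are placeholders, and the "Authorization: Token <token>" header format is an assumption to check against your deployment. Only request construction runs here; send() performs the actual network call.

```python
import json
import urllib.request
from typing import Any, Optional

API_URL = "https://waldur.example.com/api"  # placeholder deployment URL
API_TOKEN = "your-token"  # placeholder API token


def event_subscriptions_request(
    method: str = "GET",
    uuid: Optional[str] = None,
    body: Optional[dict] = None,
) -> urllib.request.Request:
    """Build a request for /api/event-subscriptions/ or /api/event-subscriptions/{uuid}/."""
    path = "/event-subscriptions/" if uuid is None else f"/event-subscriptions/{uuid}/"
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(API_URL + path, data=data, method=method)
    # Assumption: token authentication via "Authorization: Token <token>".
    request.add_header("Authorization", f"Token {API_TOKEN}")
    request.add_header("Accept", "application/json")
    if data is not None:
        request.add_header("Content-Type", "application/json")
    return request


def send(request: urllib.request.Request) -> Any:
    """Perform the HTTP call and decode the JSON response (None if empty)."""
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = response.read()
    return json.loads(payload) if payload else None


# Example (performs network I/O, so it is left commented out):
# subscriptions = send(event_subscriptions_request())
# send(event_subscriptions_request("DELETE", uuid="subscription-uuid"))
```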

Agent Identity Management

  • GET /api/marketplace-site-agent-identities/ - List agent identities
  • POST /api/marketplace-site-agent-identities/ - Register new agent identity
  • GET /api/marketplace-site-agent-identities/{uuid}/ - Retrieve agent identity
  • PATCH /api/marketplace-site-agent-identities/{uuid}/ - Update agent identity
  • DELETE /api/marketplace-site-agent-identities/{uuid}/ - Delete agent identity
  • POST /api/marketplace-site-agent-identities/{uuid}/register_service/ - Register service within agent
  • POST /api/marketplace-site-agent-identities/{uuid}/register_event_subscription/ - Register event subscription for agent

Agent Services

  • GET /api/marketplace-site-agent-services/ - List agent services
  • GET /api/marketplace-site-agent-services/{uuid}/ - Retrieve service details
  • PATCH /api/marketplace-site-agent-services/{uuid}/ - Update service
  • DELETE /api/marketplace-site-agent-services/{uuid}/ - Delete service
  • POST /api/marketplace-site-agent-services/{uuid}/register_processor/ - Register processor within service
  • POST /api/marketplace-site-agent-services/{uuid}/set_statistics/ - Update service statistics

Agent Processors

  • GET /api/marketplace-site-agent-processors/ - List agent processors
  • GET /api/marketplace-site-agent-processors/{uuid}/ - Retrieve processor details
  • PATCH /api/marketplace-site-agent-processors/{uuid}/ - Update processor
  • DELETE /api/marketplace-site-agent-processors/{uuid}/ - Delete processor

Monitoring & Statistics

  • GET /api/rabbitmq-vhost-stats/ - Get RabbitMQ virtual host statistics
  • GET /api/rabbitmq-user-stats/ - Get RabbitMQ user statistics

Utility Endpoints

  • POST /api/projects/{uuid}/sync_user_roles/ - Trigger user role synchronization for specific project

Technical Components

  1. WebSocket Transport: The system uses STOMP over WebSockets for communication
  2. TLS Security: Connections can be secured with TLS
  3. User Authentication: Each subscription has its own credentials and permissions in RabbitMQ
  4. Queue Structure: Queue names follow the pattern /queue/subscription_{subscription_uuid}_offering_{offering_uuid}_{observable_object_type}

Example queue names:

  • /queue/subscription_abc123_offering_def456_order
  • /queue/subscription_abc123_offering_def456_user_role
  • /queue/subscription_abc123_offering_def456_resource_periodic_limits
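Queue names can be assembled from their three parts. The helper below is a hypothetical sketch; its list of object types is taken from the Message Types list above, whereas the authoritative list is whatever ObservableObjectTypeEnum in waldur_api_client defines.

```python
# Object types from the Message Types list; ObservableObjectTypeEnum in
# waldur_api_client is the authoritative source.
OBJECT_TYPES = frozenset({
    "order",
    "user_role",
    "resource",
    "resource_periodic_limits",
    "offering_user",
    "service_account",
    "course_account",
    "importable_resources",
})


def queue_name(subscription_uuid: str, offering_uuid: str, object_type: str) -> str:
    """Build /queue/subscription_{subscription_uuid}_offering_{offering_uuid}_{object_type}."""
    if object_type not in OBJECT_TYPES:
        raise ValueError(f"unknown observable object type: {object_type!r}")
    return f"/queue/subscription_{subscription_uuid}_offering_{offering_uuid}_{object_type}"


for object_type in ("order", "user_role", "resource_periodic_limits"):
    print(queue_name("abc123", "def456", object_type))
# /queue/subscription_abc123_offering_def456_order
# /queue/subscription_abc123_offering_def456_user_role
# /queue/subscription_abc123_offering_def456_resource_periodic_limits
```

Because several object types contain underscores (user_role, resource_periodic_limits), a consumer cannot recover the type by splitting the name on its last underscore; it has to take everything after the offering UUID.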

Error Handling and Resilience

The system includes:

  • Graceful connection handling
  • Signal handlers for proper shutdown
  • Retry mechanisms for order processing
  • Error logging and optional Sentry integration
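A common way to implement retries, whether reconnecting to the broker or reprocessing an order, is capped exponential backoff with jitter. This is a generic sketch, not the waldur-site-agent implementation; retry_with_backoff and its parameters are hypothetical.

```python
import logging
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def retry_with_backoff(
    operation: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(); on failure wait base_delay * 2**(n-1) (capped, with jitter) and retry."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay *= random.uniform(0.5, 1.0)  # jitter avoids synchronized retries
            logger.warning(
                "Attempt %d/%d failed (%s); retrying in %.1fs",
                attempt, attempts, exc, delay,
            )
            sleep(delay)
    raise AssertionError("unreachable")


# Usage: a connection attempt that fails twice before succeeding.
# sleep is replaced with a no-op so the demo returns immediately.
failures = iter([ConnectionError("broker unavailable")] * 2)


def connect_once() -> str:
    error = next(failures, None)
    if error is not None:
        raise error
    return "connected"


print(retry_with_backoff(connect_once, sleep=lambda _: None))  # connected
```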

Integration Examples

Real-world Implementations

  1. Waldur Site Agent: Full-featured agent for SLURM/HPC resource management
      • Manages compute allocations, user accounts, and resource limits
      • Implements structured AgentIdentity pattern with services and processors
      • Handles complex periodic usage policies and carryover calculations

  2. External Billing Systems: Automated billing updates
      • Subscribes to resource usage and order events
      • Updates external accounting systems in real-time
      • Reduces manual billing reconciliation

  3. Custom Integration Services: Lightweight integration patterns
      • Process marketplace orders to create external resources
      • Use simple subscription patterns for specific event types
      • Demonstrate flexible integration approaches