NATS Transport
Real-time subscriptions and event streaming for DGP
Enable AI agents to subscribe to entity changes in real-time using NATS pub/sub, with request/reply for queries and mutations.
Overview
The NATS transport extends DGP with real-time capabilities, allowing agents to receive entity change events as they happen. This is essential for reactive agent architectures that need to respond to business events without polling.
pip install dgp-python[nats]
Components
AsyncNATSAdapter
Execute DGP queries and mutations over NATS using the request/reply pattern. Drop-in replacement for HTTP transport.
SubscriptionManager
Subscribe to entity change events with filtering. Supports ephemeral and durable (JetStream) subscriptions.
EventPublisher
Gateway-side component that publishes events after mutations. Integrates with the mutation executor.
DGPEvent
Standard event envelope with entity, actor, changes, and metadata. Follows the DGP NATS Transport Specification.
Subject Patterns
NATS subjects follow a hierarchical pattern for easy filtering and routing:
| Pattern | Purpose | Example |
|---|---|---|
dgp.{org}.query |
Query requests | dgp.commerce.query |
dgp.{org}.mutation |
Mutation requests | dgp.commerce.mutation |
dgp.{org}.subscription.{entity}.{event_type} |
All events of type for entity class | dgp.commerce.subscription.Order.created |
dgp.{org}.subscription.{entity}.{event_type}.{entity_id} |
Entity-specific events | dgp.commerce.subscription.Order.updated.ord_123 |
dgp.{org}.subscription.{entity}.* |
Wildcard (all event types, broadcast) | dgp.commerce.subscription.Order.* |
Subscribing to Events
Agents can subscribe to entity changes with optional filtering:
from dgp.subscriptions import ( SubscriptionManager, SubscriptionRequest, DGPEvent, ) # Create subscription manager manager = SubscriptionManager(nats_client, organization="commerce") # Define event handler async def handle_order_event(event: DGPEvent): print(f"Order {event.entity_id} {event.event_type.value}") if event.changes: print(f" Changes: {event.changes}") # Subscribe to Order events subscription = await manager.subscribe( SubscriptionRequest( entity="Order", events=["created", "updated", "state_changed"], where={"status": {"in": ["pending", "confirmed"]}} ), callback=handle_order_event, durable="my-agent-orders" # Optional: survives reconnects ) # Later: unsubscribe await subscription.unsubscribe()
Subscription Filtering
The where clause supports client-side filtering with these operators:
eq,ne- Equality / inequalitygt,gte,lt,lte- Comparisonsin- Value in listcontains- String contains
Publishing Events
The gateway publishes events after successful mutations:
from dgp.subscriptions import ( EventPublisher, Actor, ActorType, ) # Create publisher publisher = EventPublisher(nats_client, organization="commerce") # Publish after creating an order result = await publisher.publish_created( entity="Order", entity_id="ord_123", actor=Actor(type=ActorType.AGENT, id="agent_xyz"), data={ "id": "ord_123", "status": "pending", "total": 99.99 } ) # Publish after updating result = await publisher.publish_updated( entity="Order", entity_id="ord_123", actor=Actor(type=ActorType.USER, id="user_456"), changes={"status": {"from": "pending", "to": "confirmed"}}, data={"id": "ord_123", "status": "confirmed"} ) # Publish state transition result = await publisher.publish_state_changed( entity="Order", entity_id="ord_123", actor=Actor(type=ActorType.SYSTEM, id="fulfillment-service"), from_state="confirmed", to_state="shipped" )
Event Envelope
All events follow a standard envelope format:
{
"dgp_version": "0.9.0",
"event_id": "evt_abc123def456",
"timestamp": "2026-01-06T12:34:56.789Z",
"domain": "commerce",
"entity": "Order",
"entity_id": "ord_123",
"event_type": "updated",
"actor": {
"type": "agent",
"id": "agent_xyz",
"name": "Order Processing Agent"
},
"changes": {
"status": {"from": "pending", "to": "confirmed"}
},
"data": {
"id": "ord_123",
"status": "confirmed",
"total": 99.99
}
}
Query/Mutation over NATS
Use AsyncNATSAdapter as an alternative to HTTP transport:
from dgp.adapters.nats import AsyncNATSAdapter, NATSConfig from dgp.query import query # Configure adapter config = NATSConfig( servers=["nats://localhost:4222"], organization="commerce", request_timeout=30.0 ) adapter = AsyncNATSAdapter(config) await adapter.connect() # Execute query customer_query = query("Customer").where("id", eq="cust_123").build() result = await adapter.execute(customer_query) # Execute mutation mutation_result = await adapter.execute_mutation( operation="create_order", entity="Order", input_data={"customer_id": "cust_123", "items": [...]} ) await adapter.disconnect()
JetStream for Durability
For production use, enable JetStream for durable subscriptions that survive agent restarts and network interruptions:
- Durable subscriptions - Resume from last acknowledged message
- At-least-once delivery - Events are redelivered until acknowledged
- Replay capability - Replay events from a specific time or sequence
- Consumer groups - Load balance events across multiple agent instances
Event Types
| Event Type | Description | Includes Changes |
|---|---|---|
created |
Entity was created | No (full data included) |
updated |
Entity attributes changed | Yes |
deleted |
Entity was deleted | No |
state_changed |
State machine transition | Yes (from/to state) |
relationship_added |
Relationship created | Related entity info |
relationship_removed |
Relationship removed | Related entity info |