NATS Transport

Real-time subscriptions and event streaming for DGP

Enable AI agents to subscribe to entity changes in real time using NATS pub/sub, with request/reply for queries and mutations.

Overview

The NATS transport extends DGP with real-time capabilities, allowing agents to receive entity change events as they happen. This is essential for reactive agent architectures that need to respond to business events without polling.

[Diagram: the agent sends a Query to the gateway and receives a Response (request/reply). The agent also subscribes (pub/sub) and receives events such as Order.created, Order.updated, and Order.state_changed.]

Installation

pip install dgp-python[nats]

Components

AsyncNATSAdapter

Executes DGP queries and mutations over NATS using the request/reply pattern. Drop-in replacement for the HTTP transport.

SubscriptionManager

Subscribe to entity change events with filtering. Supports ephemeral and durable (JetStream) subscriptions.

EventPublisher

Gateway-side component that publishes events after mutations. Integrates with the mutation executor.

DGPEvent

Standard event envelope with entity, actor, changes, and metadata. Follows the DGP NATS Transport Specification.

Subject Patterns

NATS subjects follow a hierarchical pattern for easy filtering and routing:

| Pattern | Purpose | Example |
|---|---|---|
| dgp.{org}.query | Query requests | dgp.commerce.query |
| dgp.{org}.mutation | Mutation requests | dgp.commerce.mutation |
| dgp.{org}.subscription.{entity}.{event_type} | All events of a given type for an entity class | dgp.commerce.subscription.Order.created |
| dgp.{org}.subscription.{entity}.{event_type}.{entity_id} | Entity-specific events | dgp.commerce.subscription.Order.updated.ord_123 |
| dgp.{org}.subscription.{entity}.* | Wildcard (all event types, broadcast) | dgp.commerce.subscription.Order.* |
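
To make the subject hierarchy concrete, here is a minimal sketch of building subscription subjects and checking them against NATS wildcards. The helpers `build_subject` and `subject_matches` are hypothetical and not part of dgp-python; the matcher only mirrors standard NATS rules, where `*` matches exactly one token and `>` matches one or more trailing tokens.

```python
from typing import Optional


def build_subject(org: str, entity: str, event_type: str,
                  entity_id: Optional[str] = None) -> str:
    """Build a subscription subject following the patterns listed above."""
    tokens = ["dgp", org, "subscription", entity, event_type]
    if entity_id is not None:
        tokens.append(entity_id)  # entity-specific subject
    return ".".join(tokens)


def subject_matches(pattern: str, subject: str) -> bool:
    """Apply NATS wildcard rules to a dot-separated subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)
```

One consequence worth noting: because `*` matches a single token, `dgp.commerce.subscription.Order.*` matches `dgp.commerce.subscription.Order.created` but not the entity-specific subject `dgp.commerce.subscription.Order.updated.ord_123`. A pattern ending in `>` would cover both.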

Subscribing to Events

Agents can subscribe to entity changes with optional filtering:

from dgp.subscriptions import (
    SubscriptionManager,
    SubscriptionRequest,
    DGPEvent,
)

# Create subscription manager
# (nats_client is assumed to be an already-connected NATS client)
manager = SubscriptionManager(nats_client, organization="commerce")

# Define event handler
async def handle_order_event(event: DGPEvent):
    print(f"Order {event.entity_id} {event.event_type.value}")
    if event.changes:
        print(f"  Changes: {event.changes}")

# Subscribe to Order events
subscription = await manager.subscribe(
    SubscriptionRequest(
        entity="Order",
        events=["created", "updated", "state_changed"],
        where={"status": {"in": ["pending", "confirmed"]}}
    ),
    callback=handle_order_event,
    durable="my-agent-orders"  # Optional: survives reconnects
)

# Later: unsubscribe
await subscription.unsubscribe()

Subscription Filtering

The where clause supports client-side filtering with these operators:

  • eq, ne - Equality / inequality
  • gt, gte, lt, lte - Comparisons
  • in - Value in list
  • contains - String contains

Publishing Events

The gateway publishes events after successful mutations:

from dgp.subscriptions import (
    EventPublisher,
    Actor,
    ActorType,
)

# Create publisher
# (nats_client is assumed to be an already-connected NATS client)
publisher = EventPublisher(nats_client, organization="commerce")

# Publish after creating an order
result = await publisher.publish_created(
    entity="Order",
    entity_id="ord_123",
    actor=Actor(type=ActorType.AGENT, id="agent_xyz"),
    data={
        "id": "ord_123",
        "status": "pending",
        "total": 99.99
    }
)

# Publish after updating
result = await publisher.publish_updated(
    entity="Order",
    entity_id="ord_123",
    actor=Actor(type=ActorType.USER, id="user_456"),
    changes={"status": {"from": "pending", "to": "confirmed"}},
    data={"id": "ord_123", "status": "confirmed"}
)

# Publish state transition
result = await publisher.publish_state_changed(
    entity="Order",
    entity_id="ord_123",
    actor=Actor(type=ActorType.SYSTEM, id="fulfillment-service"),
    from_state="confirmed",
    to_state="shipped"
)
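
The `changes` argument passed to `publish_updated` uses a `{"field": {"from": old, "to": new}}` shape. One way to derive it, sketched here under the assumption that before and after snapshots of the entity are available as plain dicts, is a simple field-by-field diff. `compute_changes` is a hypothetical helper, not part of dgp-python.

```python
from typing import Any, Dict


def compute_changes(before: Dict[str, Any],
                    after: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Diff two entity snapshots into {"field": {"from": old, "to": new}}."""
    changes: Dict[str, Dict[str, Any]] = {}
    for field_name in sorted(set(before) | set(after)):
        # Assumption: a field absent from one snapshot is reported as None
        old, new = before.get(field_name), after.get(field_name)
        if old != new:
            changes[field_name] = {"from": old, "to": new}
    return changes
```

For the order above, diffing `{"id": "ord_123", "status": "pending"}` against `{"id": "ord_123", "status": "confirmed"}` yields the same `changes` value used in the `publish_updated` call.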

Event Envelope

All events follow a standard envelope format:

{
    "dgp_version": "0.9.0",
    "event_id": "evt_abc123def456",
    "timestamp": "2026-01-06T12:34:56.789Z",
    "domain": "commerce",
    "entity": "Order",
    "entity_id": "ord_123",
    "event_type": "updated",
    "actor": {
        "type": "agent",
        "id": "agent_xyz",
        "name": "Order Processing Agent"
    },
    "changes": {
        "status": {"from": "pending", "to": "confirmed"}
    },
    "data": {
        "id": "ord_123",
        "status": "confirmed",
        "total": 99.99
    }
}
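
For consumers that handle raw NATS payloads, the envelope can be decoded with the standard library alone. The sketch below is a hypothetical stand-in for the library's `DGPEvent`, whose actual fields and constructor are not shown in this document; it assumes `changes` and `data` may be absent and treats the timestamp as ISO 8601 in UTC.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class Envelope:
    """Hypothetical, simplified view of the event envelope shown above."""
    event_id: str
    timestamp: datetime
    entity: str
    entity_id: str
    event_type: str
    actor: Dict[str, Any]
    changes: Optional[Dict[str, Any]] = None
    data: Dict[str, Any] = field(default_factory=dict)


def parse_envelope(raw: bytes) -> Envelope:
    """Decode a JSON envelope; raises KeyError if a required key is missing."""
    payload = json.loads(raw)
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11
    timestamp = datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
    return Envelope(
        event_id=payload["event_id"],
        timestamp=timestamp,
        entity=payload["entity"],
        entity_id=payload["entity_id"],
        event_type=payload["event_type"],
        actor=payload["actor"],
        changes=payload.get("changes"),  # assumed optional (e.g. for created)
        data=payload.get("data", {}),
    )
```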

Query/Mutation over NATS

Use AsyncNATSAdapter as an alternative to the HTTP transport:

from dgp.adapters.nats import AsyncNATSAdapter, NATSConfig
from dgp.query import query

# Configure adapter
config = NATSConfig(
    servers=["nats://localhost:4222"],
    organization="commerce",
    request_timeout=30.0
)

adapter = AsyncNATSAdapter(config)
await adapter.connect()

# Execute query
customer_query = query("Customer").where("id", eq="cust_123").build()
result = await adapter.execute(customer_query)

# Execute mutation
mutation_result = await adapter.execute_mutation(
    operation="create_order",
    entity="Order",
    input_data={"customer_id": "cust_123", "items": [...]}
)

await adapter.disconnect()

JetStream for Durability

For production use, enable JetStream for durable subscriptions that survive agent restarts and network interruptions:

  • Durable subscriptions - Resume from last acknowledged message
  • At-least-once delivery - Events are redelivered until acknowledged
  • Replay capability - Replay events from a specific time or sequence
  • Consumer groups - Load balance events across multiple agent instances
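
At-least-once delivery means a handler can see the same event more than once, for example when an acknowledgement is lost. A common way to cope is to make handlers idempotent by remembering the `event_id` values already processed. The wrapper below is a minimal in-memory sketch of that idea, operating on plain dict envelopes; `deduplicated` is a hypothetical helper, not a dgp-python API, and its memory does not survive a restart, so a production agent would likely need a persistent store.

```python
import asyncio
from collections import OrderedDict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


def deduplicated(handler: Handler, max_remembered: int = 10_000) -> Handler:
    """Wrap a handler so a redelivered event_id is processed only once."""
    seen: "OrderedDict[str, None]" = OrderedDict()

    async def wrapper(event: Dict[str, Any]) -> None:
        event_id = event["event_id"]
        if event_id in seen:
            return  # duplicate delivery: already handled
        await handler(event)
        # Remember the id only after success, so a failed attempt can be retried
        seen[event_id] = None
        if len(seen) > max_remembered:
            seen.popitem(last=False)  # evict the oldest remembered id

    return wrapper


async def demo() -> List[str]:
    """Simulate a redelivery of evt_1 and return the ids actually handled."""
    handled: List[str] = []

    async def record(event: Dict[str, Any]) -> None:
        handled.append(event["event_id"])

    safe = deduplicated(record)
    for event_id in ["evt_1", "evt_1", "evt_2"]:
        await safe({"event_id": event_id})
    return handled
```

Running `asyncio.run(demo())` handles `evt_1` once even though it is delivered twice.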

Event Types

| Event Type | Description | Includes Changes |
|---|---|---|
| created | Entity was created | No (full data included) |
| updated | Entity attributes changed | Yes |
| deleted | Entity was deleted | No |
| state_changed | State machine transition | Yes (from/to state) |
| relationship_added | Relationship created | Related entity info |
| relationship_removed | Relationship removed | Related entity info |