
using-message-queues

Async communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.


Install this agent skill into your project:

npx add-skill https://github.com/ancoleman/ai-design-components/tree/main/skills/using-message-queues


Message Queues

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.

When to Use This Skill

Use message queues when:

  • Long-running operations block HTTP requests (report generation, video processing)
  • Service decoupling required (microservices, event-driven architecture)
  • Guaranteed delivery needed (payment processing, order fulfillment)
  • Event streaming for analytics (log aggregation, metrics pipelines)
  • Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
  • Background job processing (email sending, image resizing)

Broker Selection Decision Tree

Choose a message broker based on your primary need:

Event Streaming / Log Aggregation

→ Apache Kafka

  • Throughput: 500K-1M msg/s
  • Replay events (event sourcing)
  • Exactly-once semantics (idempotent producers plus transactions)
  • Long-term retention
  • Use: Analytics pipelines, CQRS, event sourcing
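Replaying events means rebuilding current state by folding over the log from the first event onward. The broker-agnostic sketch below shows that fold; it assumes events are dicts shaped like the event schema later in this document, and the `order.paid.v1` and `order.cancelled.v1` event types are hypothetical. A Kafka consumer reading the topic from the earliest offset would feed the same `replay` function.

```python
from typing import Iterable


def apply_event(state: dict, event: dict) -> dict:
    """Return a new order-state dict with one event applied (pure function)."""
    order_id = event["data"]["order_id"]
    new_state = dict(state)
    if event["event_type"] == "order.created.v1":
        new_state[order_id] = "created"
    elif event["event_type"] == "order.paid.v1":
        new_state[order_id] = "paid"
    elif event["event_type"] == "order.cancelled.v1":
        new_state[order_id] = "cancelled"
    # Unknown event types are ignored so older consumers tolerate newer events
    return new_state


def replay(events: Iterable[dict]) -> dict:
    """Rebuild state from scratch by applying every event in log order."""
    state: dict = {}
    for event in events:
        state = apply_event(state, event)
    return state


log = [
    {"event_type": "order.created.v1", "data": {"order_id": "ord_1"}},
    {"event_type": "order.created.v1", "data": {"order_id": "ord_2"}},
    {"event_type": "order.paid.v1", "data": {"order_id": "ord_1"}},
    {"event_type": "order.cancelled.v1", "data": {"order_id": "ord_2"}},
]
print(replay(log))  # {'ord_1': 'paid', 'ord_2': 'cancelled'}
```

Keeping `apply_event` pure means the same code serves both live consumption and a full rebuild after a bug fix.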

Simple Background Jobs

→ Task Queues

  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • Use: Email sending, report generation, webhooks

Complex Workflows / Sagas

→ Temporal

  • Durable execution (survives restarts)
  • Saga pattern support
  • Human-in-the-loop workflows
  • Use: Order processing, AI agent orchestration

Request-Reply / RPC Patterns

→ NATS

  • Built-in request-reply
  • Sub-millisecond latency
  • Cloud-native, simple operations
  • Use: Microservices RPC, IoT command/control

Complex Message Routing

→ RabbitMQ

  • Exchanges (direct, topic, fanout, headers)
  • Dead letter exchanges
  • Message TTL, priorities
  • Use: Multi-consumer patterns, pub/sub
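Topic exchanges route on dot-separated routing keys: in a binding key, `*` matches exactly one word and `#` matches zero or more words. The plain-Python sketch below mimics that matching rule to show which queues a message would reach. It illustrates the semantics only (it is not RabbitMQ's implementation), and the queue names and binding keys are hypothetical.

```python
from functools import lru_cache


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic binding key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".") if routing_key else []

    @lru_cache(maxsize=None)
    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)  # pattern used up: all words must be consumed
        token = pattern[p]
        if token == "#":
            # '#' consumes zero words, or one word while staying on '#'
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if token == "*" or token == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# Hypothetical bindings: queue name -> binding key
bindings = {"audit": "#", "orders-eu": "order.*.eu", "all-orders": "order.#"}


def route(routing_key: str) -> list:
    """List the queues (sorted) whose binding key matches the routing key."""
    return sorted(q for q, b in bindings.items() if topic_matches(b, routing_key))


print(route("order.created.eu"))
print(route("user.signup"))
```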

Already Using Redis

→ Redis Streams

  • No new infrastructure
  • Simple consumer groups
  • Moderate throughput (100K+ msg/s)
  • Use: Notification queues, simple job queues

Performance Comparison

| Broker | Throughput | Latency (p99) | Best For |
|---|---|---|---|
| Kafka | 500K-1M msg/s | 10-50 ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5 ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20 ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |

Quick Start Examples

Kafka Producer/Consumer (Python)

See examples/kafka-python/ for working code.

python
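from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()  # block until queued messages are delivered

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue  # no message within the 1 s timeout
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # process_order is your own handler; values arrive as bytes
        process_order(msg.value().decode('utf-8'))
finally:
    consumer.close()  # commit final offsets and leave the group cleanly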
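Because the producer sets `key='order_123'`, every event for that order lands on the same partition, which is what gives Kafka its per-key ordering. The sketch below shows the idea with `zlib.crc32` as a stand-in hash and an assumed 6-partition topic. Real clients use their own partitioners (the Java client uses murmur2), so the partition numbers here are illustrative only.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 6  # assumption: topic created with 6 partitions


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition deterministically (stand-in hash, not Kafka's)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(messages: list) -> dict:
    """Group (key, value) pairs by partition, preserving send order within each."""
    partitions = defaultdict(list)
    for key, value in messages:
        partitions[partition_for(key)].append((key, value))
    return dict(partitions)


msgs = [("order_123", "created"), ("order_456", "created"),
        ("order_123", "paid"), ("order_123", "shipped")]
for partition, items in sorted(assign(msgs).items()):
    print(partition, items)
```

Ordering holds only within a partition, so choose a key (here the order ID) whose events must be processed in sequence.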

Celery Background Jobs (Python)

See examples/celery-image-processing/ for full implementation.

python
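from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

class RecoverableError(Exception):
    """Placeholder for transient failures (timeouts, rate limits) worth retrying."""

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        # expensive_image_processing is your own processing function
        return expensive_image_processing(image_url)
    except RecoverableError as e:
        # Re-enqueue in 60 s; once max_retries is exceeded, Celery re-raises e
        raise self.retry(exc=e, countdown=60)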
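The `countdown=60` above retries after a fixed delay. A common alternative is exponential backoff with jitter, so a burst of failed tasks does not retry at the same instant. The sketch below is a hypothetical `backoff_delay` helper whose result you would pass as `countdown`; the base and cap values are illustrative assumptions. Celery can also do this for you through `autoretry_for` with `retry_backoff=True` and `retry_jitter=True`.

```python
import random
from typing import Optional


def backoff_delay(retries: int, base: float = 5.0, cap: float = 300.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter exponential backoff: a random delay in [0, min(cap, base * 2**retries)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retries))
    return rng.uniform(0, ceiling)


# Hypothetical use inside the task's except block:
#     raise self.retry(exc=e, countdown=backoff_delay(self.request.retries))

for attempt in range(5):
    print(attempt, round(backoff_delay(attempt), 1))
```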

BullMQ Job Processing (TypeScript)

See examples/bullmq-webhook-processor/ for full implementation.

typescript
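import { Queue, Worker } from 'bullmq'

const connection = { host: 'localhost', port: 6379 }

const queue = new Queue('webhooks', { connection })

// Enqueue job; retry up to 5 attempts with exponential backoff (1s, 2s, 4s, ...)
await queue.add(
  'send-webhook',
  {
    url: 'https://example.com/webhook',
    payload: { event: 'order.created' }
  },
  { attempts: 5, backoff: { type: 'exponential', delay: 1000 } }
)

// Process jobs
const worker = new Worker('webhooks', async job => {
  const res = await fetch(job.data.url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(job.data.payload)
  })
  // fetch does not reject on HTTP errors; throw so BullMQ marks the attempt failed
  if (!res.ok) throw new Error(`Webhook responded with ${res.status}`)
}, { connection })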

Temporal Workflow Orchestration

See examples/temporal-order-saga/ for saga pattern implementation.

python
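from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

# Activity stubs: replace the bodies with calls to your real services
@activity.defn
async def reserve_inventory(order_id: str) -> str: ...
@activity.defn
async def release_inventory(order_id: str) -> None: ...
@activity.defn
async def charge_payment(order_id: str) -> str: ...

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment (bounded retries so a failure surfaces)
        try:
            await workflow.execute_activity(
                charge_payment,
                order_id,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
        except ActivityError:
            # Compensate: undo step 1, then fail the workflow
            await workflow.execute_activity(
                release_inventory,
                order_id,
                start_to_close_timeout=timedelta(seconds=10),
            )
            raise

        return f"Order {order_id} completed"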
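Temporal makes each step durable, but the compensation logic itself is ordinary control flow. To show the saga idea on its own, here is a plain-Python sketch: each step is paired with a compensating action, and when a later step fails, completed steps are undone in reverse order. The step functions are hypothetical, and unlike Temporal this version does not survive a process restart.

```python
from typing import Callable

Step = tuple  # (name, action, compensation), each callable taking no arguments


def run_saga(steps: list) -> tuple:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    log: list = []
    completed: list = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception as exc:
            log.append(f"{name} failed: {exc}")
            # Undo already-completed steps, most recent first
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"compensated {done_name}")
            return False, log
        completed.append((name, compensate))
        log.append(f"{name} ok")
    return True, log


def reserve_inventory() -> None:
    pass


def release_inventory() -> None:
    pass


def charge_payment() -> None:
    raise RuntimeError("card declined")  # simulate a payment failure


def refund_payment() -> None:
    pass


ok, log = run_saga([
    ("reserve_inventory", reserve_inventory, release_inventory),
    ("charge_payment", charge_payment, refund_payment),
])
print(ok, log)
# False ['reserve_inventory ok', 'charge_payment failed: card declined', 'compensated reserve_inventory']
```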

Core Patterns

Event Naming Convention

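Use: domain.entity.action.version, all lowercase. The entity segment can be omitted when it matches the domain, as in order.created.v1.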

Examples:

  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1
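A validator lets producers reject malformed event names before publishing. The sketch below is a hypothetical `parse_event_name` helper; its rules (lowercase segments, `_` allowed inside a word, two or three name segments, then a `vN` suffix) are assumptions drawn from the examples above.

```python
import re
from typing import NamedTuple, Optional

# Assumed rules: lowercase segments, 2-3 name segments, then a vN version suffix
EVENT_NAME = re.compile(
    r"^(?P<name>[a-z][a-z0-9_]*(?:\.[a-z][a-z0-9_]*){1,2})\.v(?P<version>[1-9][0-9]*)$"
)


class EventName(NamedTuple):
    segments: tuple
    version: int


def parse_event_name(name: str) -> Optional[EventName]:
    """Split 'user.profile.updated.v2' into its segments and version, or return None."""
    m = EVENT_NAME.match(name)
    if m is None:
        return None
    return EventName(tuple(m.group("name").split(".")), int(m.group("version")))


for candidate in ["order.created.v1", "user.profile.updated.v2", "OrderCreated", "order.created"]:
    print(candidate, parse_event_name(candidate))
```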

Event Schema Structure

json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
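Building the envelope in one helper keeps every producer consistent. The sketch below is a hypothetical `make_event` function that fills in the fields from the schema above. Deriving `version` from the `vN` suffix of `event_type` (major version, minor always 0) is an assumption for illustration.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional


def make_event(event_type: str, data: dict, producer: str,
               trace_id: Optional[str] = None,
               correlation_id: Optional[str] = None) -> dict:
    """Wrap a payload in the event envelope (field names from the schema above)."""
    major = event_type.rsplit(".v", 1)[1]  # assumption: 'order.created.v2' -> '2'
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),  # unique per event; consumers can dedupe on it
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": f"{major}.0",
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    }


event = make_event("order.created.v2",
                   {"order_id": "ord_123", "customer_id": "cus_456"},
                   producer="order-service", correlation_id="xyz789")
print(json.dumps(event, indent=2))
```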

Dead Letter Queue Pattern

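Route failed messages to a dead letter queue (DLQ) once retries are exhausted, or immediately when retrying cannot help:

python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3, acks_late=True)
def process_order(self, order_id: str):
    try:
        return perform_processing(order_id)
    except RecoverableError as e:
        if self.request.retries >= self.max_retries:
            send_to_dlq(order_id, str(e))  # retries exhausted: park for inspection
            raise Reject(str(e), requeue=False)
        raise self.retry(exc=e, countdown=30)
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))  # retrying cannot fix this: park immediately
        raise Reject(str(e), requeue=False)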
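Stripped of Celery specifics, the pattern is a bounded retry loop whose last resort is a separate queue. The in-memory sketch below uses `collections.deque` as a stand-in broker; `MAX_RETRIES`, the message shape, and the handler are hypothetical. A real broker would also delay each retry and persist the retry count, typically in message headers.

```python
from collections import deque

MAX_RETRIES = 3  # assumption: same limit as max_retries=3 above


def drain(queue: deque, handler, dlq: list) -> list:
    """Process messages; requeue failures until MAX_RETRIES, then park them in the DLQ."""
    processed = []
    while queue:
        message = queue.popleft()
        try:
            processed.append(handler(message["body"]))
        except Exception as exc:
            message["retries"] += 1
            if message["retries"] > MAX_RETRIES:
                dlq.append({**message, "error": str(exc)})  # keep payload and reason
            else:
                queue.append(message)  # retry later
    return processed


def handler(body: str) -> str:
    if body == "poison":
        raise ValueError("cannot parse payload")
    return body.upper()


q = deque({"body": b, "retries": 0} for b in ["a", "poison", "b"])
dead: list = []
print(drain(q, handler, dead), dead)
```

The poison message gets one initial attempt plus three retries, then lands in `dead` with its error, while healthy messages keep flowing.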

Idempotency for Effectively-Once Processing

python
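import json

from fastapi import FastAPI, Header, HTTPException
import redis

app = FastAPI()
redis_client = redis.Redis(decode_responses=True)

@app.post("/process")
async def process_payment(payment_data: dict, idempotency_key: str = Header(...)):
    key = f"idempotency:{idempotency_key}"
    # SET NX claims the key atomically, so concurrent duplicates cannot both proceed
    if not redis_client.set(key, "in_progress", nx=True, ex=86400):
        cached = redis_client.get(key)
        if cached == "in_progress":
            raise HTTPException(status_code=409, detail="Request already in progress")
        return {"status": "already_processed", "result": json.loads(cached)}
    try:
        result = process_payment_logic(payment_data)
    except Exception:
        redis_client.delete(key)  # release the claim so the client can retry
        raise
    redis_client.set(key, json.dumps(result), ex=86400)
    return {"status": "processed", "result": result}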
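When a caller cannot supply an idempotency key (for example, a consumer re-reading a message), a deterministic key can be derived from the payload instead. The sketch below is a hypothetical `idempotency_key` helper that hashes canonical JSON (sorted keys, no whitespace) with SHA-256 and reuses the `idempotency:` prefix from the Redis key above. Prefer an event's own `event_id` when it has one, since two legitimately distinct requests can carry identical payloads.

```python
import hashlib
import json


def idempotency_key(payload: dict, namespace: str = "payment") -> str:
    """Derive a stable key: identical payloads map to the same key regardless of dict order."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"idempotency:{namespace}:{digest}"


a = idempotency_key({"amount": 100, "currency": "USD", "order_id": "ord_123"})
b = idempotency_key({"order_id": "ord_123", "currency": "USD", "amount": 100})
print(a == b)  # True
```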

Frontend Integration

Job Status Updates via SSE

python
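import asyncio
import json

from sse_starlette.sse import EventSourceResponse

# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            # JSON-encode data so the client can JSON.parse(e.data)
            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": json.dumps(task.info.get('progress', 0))}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": json.dumps(task.result)}
                break
            elif task.state == 'FAILURE':
                # Without this branch a failed task would stream forever
                yield {"event": "failed", "data": json.dumps(str(task.result))}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())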
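`EventSourceResponse` serializes each yielded dict into the `text/event-stream` format that the browser's `EventSource` parses. To show what actually goes over the wire, here is a hand-rolled formatter following the Server-Sent Events field rules: an optional `event:` line, one `data:` line per line of payload, and a blank line ending the event. It is for illustration only, and `format_sse` is a hypothetical name.

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one Server-Sent Event frame as text/event-stream."""
    lines = [f"event: {event}"] if event is not None else []
    # The SSE format requires one 'data:' field per line of payload
    lines += [f"data: {line}" for line in (data.splitlines() or [""])]
    return "\n".join(lines) + "\n\n"  # blank line terminates the event


print(repr(format_sse(json.dumps(42), event="progress")))
# 'event: progress\ndata: 42\n\n'
```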

React Component

typescript
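import { useEffect, useState } from 'react'
// toast and ProgressBar are assumed to come from your own UI component library

export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    // Assumes the FastAPI app is served under the /api prefix
    const eventSource = new EventSource(`/api/status/${jobId}`)

    // Custom event names are typed as plain Event, so cast to read .data
    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse((e as MessageEvent).data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: (e as MessageEvent).data })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}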

Detailed Guides

For comprehensive documentation, see reference files:

Broker-Specific Guides

  • Kafka: See references/kafka.md for partitioning, consumer groups, exactly-once semantics
  • RabbitMQ: See references/rabbitmq.md for exchanges, bindings, routing patterns
  • NATS: See references/nats.md for JetStream, request-reply patterns
  • Redis Streams: See references/redis-streams.md for consumer groups, acknowledgments

Task Queue Guides

  • Celery: See references/celery.md for periodic tasks, canvas (workflows), monitoring
  • BullMQ: See references/bullmq.md for job prioritization, flows, Bull Board monitoring
  • Temporal: See references/temporal-workflows.md for saga patterns, signals, queries

Pattern Guides

  • Event Patterns: See references/event-patterns.md for event sourcing, CQRS, outbox pattern

Common Anti-Patterns to Avoid

1. Synchronous API for Long Operations

python
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}

2. Non-Idempotent Consumers

python
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
    # SET NX is atomic, so two concurrent deliveries cannot both claim the key
    if not redis_client.set(f"sent:{idempotency_key}", "1", nx=True, ex=86400):
        return "already_sent"
    try:
        send_email_service(email)
    except Exception:
        redis_client.delete(f"sent:{idempotency_key}")  # allow a retry to resend
        raise

3. Ignoring Dead Letter Queues

python
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # max_retries alone never retries; on failure the data is gone

# ✅ GOOD: Retry, then send to a DLQ for manual inspection
@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            send_to_dlq(data, str(e))  # retries exhausted
            raise
        raise self.retry(exc=e, countdown=60)

4. Using Kafka for Request-Reply

python
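# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # No reply address or correlation: which response belongs to this call?

# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
async def get_user_profile(nc, user_id: str) -> bytes:
    # nc: a connected nats-py client, e.g. nc = await nats.connect()
    msg = await nc.request("user.profile", user_id.encode(), timeout=1.0)
    return msg.data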
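Request-reply needs two things a plain topic does not provide: a reply address per request and a way to match each reply to the caller waiting for it. The in-process `asyncio` sketch below shows that mechanism with a hypothetical `Bus` class, where every request gets a unique correlation ID mapped to an `asyncio.Future`. NATS achieves the same effect by giving each request a unique reply-inbox subject.

```python
import asyncio
import uuid


class Bus:
    """Toy in-process request-reply: correlation IDs map replies to waiting futures."""

    def __init__(self) -> None:
        self.requests: asyncio.Queue = asyncio.Queue()
        self.pending: dict = {}

    async def request(self, payload: str, timeout: float = 1.0) -> str:
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        await self.requests.put((correlation_id, payload))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(correlation_id, None)  # no leak if the call times out

    def reply(self, correlation_id: str, result: str) -> None:
        future = self.pending.get(correlation_id)
        if future is not None and not future.done():
            future.set_result(result)  # late replies after a timeout are dropped


async def responder(bus: Bus) -> None:
    """Hypothetical user-profile service answering requests."""
    while True:
        correlation_id, user_id = await bus.requests.get()
        bus.reply(correlation_id, f"profile:{user_id}")


async def main() -> list:
    bus = Bus()
    worker = asyncio.create_task(responder(bus))
    results = await asyncio.gather(bus.request("u1"), bus.request("u2"))
    worker.cancel()
    return results


print(asyncio.run(main()))  # ['profile:u1', 'profile:u2']
```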

Library Recommendations

Context7 Research

Confluent Kafka (Python)

  • Context7 ID: /confluentinc/confluent-kafka-python
  • Trust Score: 68.8/100
  • Code Snippets: 192+
  • Production-ready Python Kafka client

Temporal

  • Context7 ID: /websites/temporal_io
  • Trust Score: 80.9/100
  • Code Snippets: 3,769+
  • Workflow orchestration for durable execution

Installation

Python:

bash
pip install confluent-kafka "celery[redis]" temporalio aio-pika redis

TypeScript/Node.js:

bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis

Rust:

bash
cargo add rdkafka lapin async-nats redis

Go:

bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk

Utilities

Use scripts for setup automation:

  • Kafka setup: Run python scripts/kafka_producer_consumer.py for test utilities
  • Schema validation: Run python scripts/validate_message_schema.py to validate event schemas

Related Skills

  • api-patterns: API design for async job submission
  • realtime-sync: WebSocket/SSE for job status updates
  • feedback: Toast notifications for job completion
  • databases-*: Persistent storage for event logs
  • observability: Tracing and metrics for queue operations
