Agent skill

anthropic-streaming-patterns

Use when integrating Claude API with streaming responses, implementing tool execution in streams, tracking API costs, or encountering streaming errors - provides Anthropic SDK 0.30.1+ patterns with mandatory cost monitoring

Stars 163
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/development/anthropic-streaming-patterns-krzemienski-claude-mobile-expo

SKILL.md

Anthropic Claude API Streaming Patterns

Overview

Claude API integration with streaming, tool execution, and cost tracking using Anthropic SDK.

Core principle: Stream (don't buffer). Track costs. Handle tools correctly.

Announce at start: "I'm using the anthropic-streaming-patterns skill for Claude API integration."

When to Use

  • Implementing Claude API service (Task 3.4)
  • Implementing streaming responses
  • Implementing tool execution within streams
  • Tracking API costs
  • Debugging streaming issues

Quick Reference

Pattern SDK Method Purpose
Initialize messages.stream() Start streaming
Text deltas stream.on('text') Receive text chunks
Tool start stream.on('contentBlockStart') Tool use begins
Tool input stream.on('contentBlockDelta') Accumulate params
Tool complete stream.on('contentBlockStop') Execute tool
Stream end stream.on('message') Calculate costs
Errors stream.on('error') Handle failures

Streaming Pattern (Complete)

typescript
const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 8192,
  messages: messageHistory,
  tools: toolDefinitions,
});

let currentToolUse = null;
let accumulatedInput = '';

// Text deltas → forward to client
stream.on('text', (text) => {
  sendToClient({type: 'content_delta', delta: text});
});

// Tool use started
stream.on('contentBlockStart', (block) => {
  if (block.type === 'tool_use') {
    currentToolUse = {name: block.name, id: block.id};
    accumulatedInput = '';
    sendToClient({type: 'tool_execution', tool: block.name});
  }
});

// Tool input accumulation
stream.on('contentBlockDelta', (delta) => {
  if (delta.type === 'input_json_delta' && currentToolUse) {
    accumulatedInput = delta.partial_json;
  }
});

// Tool execution
stream.on('contentBlockStop', async () => {
  if (currentToolUse) {
    const input = JSON.parse(accumulatedInput);
    const result = await executeTool(currentToolUse.name, input);
    
    sendToClient({type: 'tool_result', result});
    currentToolUse = null;
  }
});

// Stream complete with usage
stream.on('message', (message) => {
  if (message.usage) {
    const inputCost = (message.usage.input_tokens / 1000) * 0.003;
    const outputCost = (message.usage.output_tokens / 1000) * 0.015;
    
    saveSessionCost(sessionId, {
      inputTokens: message.usage.input_tokens,
      outputTokens: message.usage.output_tokens,
      cost: inputCost + outputCost
    });
  }
});

stream.on('error', (error) => {
  logger.error('Streaming error:', error);
  sendToClient({type: 'error', error: error.message});
});

await stream.finalMessage(); // Wait for completion

Cost Tracking (MANDATORY)

typescript
const PRICING = {
  input: 0.003,  // $0.003 per 1k tokens
  output: 0.015, // $0.015 per 1k output tokens
};

// Calculate per message
const cost = {
  input: (inputTokens / 1000) * PRICING.input,
  output: (outputTokens / 1000) * PRICING.output,
  total: inputCost + outputCost
};

// Aggregate per session
sessionCosts.push(cost);
const sessionTotal = sessionCosts.reduce((sum, c) => sum + c.total, 0);

Error Handling

typescript
try {
  const stream = await client.messages.stream({...});
} catch (error) {
  if (error.status === 429) {
    // Rate limit - wait and retry
    await delay(60000);
    return retry();
  } else if (error.status === 401) {
    // Auth error
    throw new Error('Invalid API key');
  } else {
    logger.error(error);
    throw error;
  }
}

Common Mistakes

Mistake Reality
"Buffering is simpler" WRONG. Streaming provides real-time UX. Required.
"Cost tracking is optional" WRONG. Users need visibility. Prevents surprise bills.
"I can figure out SDK" WRONG. Event handling is subtle. Use proven patterns.
"Error handling later" WRONG. Streams fail. Handle from start.

❌ WRONG: Buffering

typescript
const response = await client.messages.create({...}); // Buffering
const fullText = response.content[0].text;
sendToClient(fullText);

✅ CORRECT: Streaming

typescript
const stream = await client.messages.stream({...});
stream.on('text', (delta) => sendToClient({type: 'content_delta', delta}));

Red Flags

  • "Buffering is easier" → WRONG. Stream for real-time.
  • "Cost tracking is overhead" → WRONG. Mandatory feature.
  • "Skip error handling" → WRONG. Streams fail often.

Integration

  • Use FOR: Task 3.4 (claude.service.ts)
  • Use WITH: @claude-mobile-cost-tracking
  • Integrate: Task 3.11 (cost.service.ts)

Didn't find tool you were looking for?

Be as detailed as possible for better results