Agent skill
upstash-workflow
Upstash Workflow implementation guide. Use when creating async workflows with QStash, implementing fan-out patterns, or building 3-layer workflow architecture (process → paginate → execute).
Install this agent skill to your Project
npx add-skill https://github.com/lobehub/lobehub/tree/canary/.agents/skills/upstash-workflow
SKILL.md
Upstash Workflow Implementation Guide
This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
🎯 The Three Core Patterns
All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:
- 🔍 Dry-Run Mode - Get statistics without triggering actual execution
- 🌟 Fan-Out Pattern - Split large batches into smaller chunks for parallel processing
- 🎯 Single Task Execution - Each workflow execution processes ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
Table of Contents
- Architecture Overview
- Core Patterns
- File Structure
- Implementation Patterns
- Best Practices
- Examples
Architecture Overview
Standard 3-Layer Pattern
All workflows follow a standard 3-layer architecture:
Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed
Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item
Layer 3: Single Task Execution (execute-*/generate-*)
└─ Performs actual business logic for ONE item
Examples: welcome-placeholder, agent-welcome
Core Patterns
1. Dry-Run Mode
Purpose: Get statistics without triggering actual execution
Pattern:
// Layer 1: Entry Point
if (dryRun) {
console.log('[workflow:process] Dry run mode, returning statistics only');
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
};
}
Use Case: Check how many items will be processed before committing to execution
Response:
{
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true,
message: "[DryRun] Would process 80 items"
}
2. Fan-Out Pattern
Purpose: Split large batches into smaller chunks for parallel processing
Pattern:
// Layer 2: Pagination
const CHUNK_SIZE = 20;
if (itemIds.length > CHUNK_SIZE) {
// Fan-out to smaller chunks
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[workflow:paginate] Fan-out mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
}
Use Case: Avoid hitting workflow step limits by splitting large batches
Configuration:
PAGE_SIZE = 50- Items per pagination pageCHUNK_SIZE = 20- Items per fan-out chunk- If batch > CHUNK_SIZE, split into chunks and recursively trigger pagination
3. Single Task Execution
Purpose: Execute business logic for ONE item at a time
Pattern:
// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
// Get item
const item = await context.run('workflow:get-item', async () => {
return getItem(itemId);
});
// Execute business logic for THIS item only
const result = await context.run('workflow:execute', async () => {
return processItem(item);
});
// Save result for THIS item
await context.run('workflow:save', async () => {
return saveResult(itemId, result);
});
return { success: true, itemId, result };
},
{
flowControl: {
key: 'workflow.execute',
parallelism: 10,
ratePerSecond: 5,
},
},
);
Key Principles:
- Each workflow execution handles exactly ONE item
- Parallelism controlled by
flowControlconfig - Multiple items processed via Layer 2 triggering multiple Layer 3 executions
File Structure
Directory Layout
src/
├── app/(backend)/api/workflows/
│ └── {workflow-name}/
│ ├── process-{entities}/route.ts # Layer 1
│ ├── paginate-{entities}/route.ts # Layer 2
│ └── execute-{entity}/route.ts # Layer 3
│
└── server/workflows/
└── {workflowName}/
└── index.ts # Workflow class
Cloud Project Configuration
For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:
📄 Cloud Configuration Guide
Implementation Patterns
1. Workflow Class
Location: src/server/workflows/{workflowName}/index.ts
import { Client } from '@upstash/workflow';
import debug from 'debug';
const log = debug('lobe-server:workflows:{workflow-name}');
// Workflow paths
const WORKFLOW_PATHS = {
processItems: '/api/workflows/{workflow-name}/process-items',
paginateItems: '/api/workflows/{workflow-name}/paginate-items',
executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;
// Payload types
export interface ProcessItemsPayload {
dryRun?: boolean;
force?: boolean;
}
export interface PaginateItemsPayload {
cursor?: string;
itemIds?: string[]; // For fanout chunks
}
export interface ExecuteItemPayload {
itemId: string;
}
/**
* Get workflow URL using APP_URL
*/
const getWorkflowUrl = (path: string): string => {
const baseUrl = process.env.APP_URL;
if (!baseUrl) throw new Error('APP_URL is required to trigger workflows');
return new URL(path, baseUrl).toString();
};
/**
* Get workflow client
*/
const getWorkflowClient = (): Client => {
const token = process.env.QSTASH_TOKEN;
if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');
const config: ConstructorParameters<typeof Client>[0] = { token };
if (process.env.QSTASH_URL) {
(config as Record<string, unknown>).url = process.env.QSTASH_URL;
}
return new Client(config);
};
/**
* {Workflow Name} Workflow
*/
export class {WorkflowName}Workflow {
private static client: Client;
private static getClient(): Client {
if (!this.client) {
this.client = getWorkflowClient();
}
return this.client;
}
/**
* Trigger workflow to process items (entry point)
*/
static triggerProcessItems(payload: ProcessItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
log('Triggering process-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
* Trigger workflow to paginate items
*/
static triggerPaginateItems(payload: PaginateItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
log('Triggering paginate-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
* Trigger workflow to execute a single item
*/
static triggerExecuteItem(payload: ExecuteItemPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
log('Triggering execute-item workflow: %s', payload.itemId);
return this.getClient().trigger({ body: payload, url });
}
/**
* Filter items that need processing (e.g., check Redis cache, database state)
*/
static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
if (itemIds.length === 0) return [];
// Check existing state (Redis, database, etc.)
// Return items that need processing
return itemIds;
}
}
2. Layer 1: Entry Point (process-*)
Purpose: Validates prerequisites, calculates statistics, supports dryRun mode
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';
/**
* Entry workflow for {workflow description}
* 1. Get all eligible items
* 2. Filter items that already have results
* 3. If dryRun, return statistics only
* 4. If no items need processing, return early
* 5. Trigger paginate workflow
*/
export const { POST } = serve<ProcessPayload>(
async (context) => {
const { dryRun, force } = context.requestPayload ?? {};
console.log('[{workflow}:process] Starting with payload:', { dryRun, force });
// Get all eligible items
const allItemIds = await context.run('{workflow}:get-all-items', async () => {
const db = await getServerDB();
// Query database for eligible items
return items.map((item) => item.id);
});
console.log('[{workflow}:process] Total eligible items:', allItemIds.length);
if (allItemIds.length === 0) {
return {
success: true,
totalEligible: 0,
message: 'No eligible items found',
};
}
// Filter items that need processing
const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
WorkflowClass.filterItemsNeedingProcessing(allItemIds),
);
const result = {
success: true,
totalEligible: allItemIds.length,
toProcess: itemsNeedingProcessing.length,
alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
};
console.log('[{workflow}:process] Check result:', result);
// If dryRun mode, return statistics only
if (dryRun) {
console.log('[{workflow}:process] Dry run mode, returning statistics only');
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
};
}
// If no items need processing, return early
if (itemsNeedingProcessing.length === 0) {
console.log('[{workflow}:process] All items already processed');
return {
...result,
message: 'All items already processed',
};
}
// Trigger paginate workflow
console.log('[{workflow}:process] Triggering paginate workflow');
await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));
return {
...result,
message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
};
},
{
flowControl: {
key: '{workflow}.process',
parallelism: 1,
ratePerSecond: 1,
},
},
);
3. Layer 2: Pagination (paginate-*)
Purpose: Handles cursor-based pagination, implements fanout for large batches
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';
const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;
/**
* Paginate items workflow - handles pagination and fanout
* 1. If specific itemIds provided (from fanout), process them directly
* 2. Otherwise, paginate through all items using cursor
* 3. Filter items that need processing
* 4. If batch > CHUNK_SIZE, fanout to smaller chunks
* 5. Trigger execute workflow for each item
* 6. Schedule next page if cursor exists
*/
export const { POST } = serve<PaginatePayload>(
async (context) => {
const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};
console.log('[{workflow}:paginate] Starting with payload:', {
cursor,
itemIdsCount: payloadItemIds?.length ?? 0,
});
// If specific itemIds are provided, process them directly (from fanout)
if (payloadItemIds && payloadItemIds.length > 0) {
console.log('[{workflow}:paginate] Processing specific itemIds:', {
count: payloadItemIds.length,
});
await Promise.all(
payloadItemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
return {
success: true,
processedItems: payloadItemIds.length,
};
}
// Paginate through all items
const itemBatch = await context.run('{workflow}:get-batch', async () => {
const db = await getServerDB();
// Query database with cursor and PAGE_SIZE
const items = await db.query(...);
if (!items.length) return { ids: [] };
const last = items.at(-1);
return {
ids: items.map(item => item.id),
cursor: last ? last.id : undefined,
};
});
const batchItemIds = itemBatch.ids;
const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;
console.log('[{workflow}:paginate] Got batch:', {
batchSize: batchItemIds.length,
nextCursor,
});
if (batchItemIds.length === 0) {
console.log('[{workflow}:paginate] No more items, pagination complete');
return { success: true, message: 'Pagination complete' };
}
// Filter items that need processing
const itemIds = await context.run('{workflow}:filter-existing', () =>
WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
);
console.log('[{workflow}:paginate] After filtering:', {
needProcessing: itemIds.length,
skipped: batchItemIds.length - itemIds.length,
});
// Process items if any need processing
if (itemIds.length > 0) {
if (itemIds.length > CHUNK_SIZE) {
// Fanout to smaller chunks
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[{workflow}:paginate] Fanout mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
} else {
// Process directly
console.log('[{workflow}:paginate] Processing items directly:', {
count: itemIds.length,
});
await Promise.all(
itemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
}
}
// Schedule next page
if (nextCursor) {
console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
await context.run('{workflow}:next-page', () =>
WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
);
} else {
console.log('[{workflow}:paginate] No more pages');
}
return {
success: true,
processedItems: itemIds.length,
skippedItems: batchItemIds.length - itemIds.length,
nextCursor: nextCursor ?? null,
};
},
{
flowControl: {
key: '{workflow}.paginate',
parallelism: 20,
ratePerSecond: 5,
},
},
);
4. Layer 3: Execution (execute-/generate-)
Purpose: Performs actual business logic
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';
/**
* Execute item workflow - performs actual business logic
* 1. Get item data
* 2. Perform business logic (AI generation, data processing, etc.)
* 3. Save results
*/
export const { POST } = serve<ExecutePayload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
console.log('[{workflow}:execute] Starting:', { itemId });
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
const db = await getServerDB();
// Get item data
const item = await context.run('{workflow}:get-item', async () => {
// Query database for item
return item;
});
if (!item) {
return { success: false, error: 'Item not found' };
}
// Perform business logic
const result = await context.run('{workflow}:process-item', async () => {
const workflow = new WorkflowClass(db, itemId);
return workflow.generate(); // or process(), execute(), etc.
});
// Save results
await context.run('{workflow}:save-result', async () => {
const workflow = new WorkflowClass(db, itemId);
return workflow.saveToRedis(result); // or saveToDatabase(), etc.
});
console.log('[{workflow}:execute] Completed:', { itemId });
return {
success: true,
itemId,
result,
};
},
{
flowControl: {
key: '{workflow}.execute',
parallelism: 10,
ratePerSecond: 5,
},
},
);
Best Practices
1. Error Handling
export const { POST } = serve<Payload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
// Validate required parameters
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
try {
// Perform work
const result = await context.run('step-name', () => doWork(itemId));
return { success: true, itemId, result };
} catch (error) {
console.error('[workflow:error]', error);
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
},
{ flowControl: { ... } },
);
2. Logging
Use consistent log prefixes and structured logging:
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
3. Return Values
Return consistent response shapes:
// Success response
return {
success: true,
itemId,
result,
message: 'Optional success message',
};
// Error response
return {
success: false,
error: 'Error description',
itemId, // Include context if available
};
// Statistics response (for entry point)
return {
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true, // If applicable
message: 'Summary message',
};
4. flowControl Configuration
Purpose: Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
// Layer 1: Entry point - single instance only
flowControl: {
key: '{workflow}.process',
parallelism: 1, // Only 1 process workflow at a time
ratePerSecond: 1, // 1 execution per second
}
// Layer 2: Pagination - moderate concurrency
flowControl: {
key: '{workflow}.paginate',
parallelism: 20, // Up to 20 pagination workflows in parallel
ratePerSecond: 5, // 5 new executions per second
}
// Layer 3: Single task execution - high concurrency
flowControl: {
key: '{workflow}.execute',
parallelism: 10, // Up to 10 items processed in parallel
ratePerSecond: 5, // 5 new items per second
}
Guidelines:
- Layer 1: Always use
parallelism: 1to avoid duplicate processing - Layer 2: Moderate concurrency for pagination (typically 10-20)
- Layer 3: Higher concurrency for parallel item processing (typically 5-10)
- Adjust
ratePerSecondbased on external API rate limits or resource constraints
5. context.run() Best Practices
- Use descriptive step names with prefixes:
{workflow}:step-name - Each step should be idempotent (safe to retry)
- Don't nest context.run() calls - keep them flat
- Use unique step names when processing multiple items:
// Good: Unique step names
await Promise.all(
items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);
// Bad: Same step name for all items
await Promise.all(
items.map((item) =>
context.run(`{workflow}:execute`, () =>
// ❌ Not unique
processItem(item),
),
),
);
6. Payload Validation
Always validate required parameters at the start:
export const { POST } = serve<Payload>(
async (context) => {
const { itemId, configId } = context.requestPayload ?? {};
// Validate at the start
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
if (!configId) {
return { success: false, error: 'Missing configId in payload' };
}
// Proceed with work...
},
{ flowControl: { ... } },
);
7. Database Connection
Get database connection once per workflow:
export const { POST } = serve<Payload>(
async (context) => {
const db = await getServerDB(); // Get once
// Use in multiple steps
const item = await context.run('get-item', async () => {
return itemModel.findById(db, itemId);
});
const result = await context.run('save-result', async () => {
return resultModel.create(db, result);
});
},
{ flowControl: { ... } },
);
8. Testing
Create integration tests for workflows:
describe('WorkflowName', () => {
it('should process items successfully', async () => {
// Setup test data
const items = await createTestItems();
// Trigger workflow
await WorkflowClass.triggerProcessItems({ dryRun: false });
// Wait for completion (use polling or webhook)
await waitForCompletion();
// Verify results
const results = await getResults();
expect(results).toHaveLength(items.length);
});
it('should support dryRun mode', async () => {
const result = await WorkflowClass.triggerProcessItems({ dryRun: true });
expect(result).toMatchObject({
success: true,
dryRun: true,
totalEligible: expect.any(Number),
toProcess: expect.any(Number),
});
});
});
Examples
Example 1: Welcome Placeholder
Use Case: Generate AI-powered welcome placeholders for users
Structure:
- Layer 1:
process-users- Entry point, checks eligible users - Layer 2:
paginate-users- Paginates through active users - Layer 3:
generate-user- Generates placeholders for ONE user
Core Patterns Demonstrated:
- Dry-Run Mode:
// Layer 1: process-users
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
};
}
- Fan-Out Pattern:
// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
const chunks = chunk(userIds, CHUNK_SIZE);
await Promise.all(
chunks.map((ids, idx) =>
context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
),
),
);
}
- Single Task Execution:
// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
const { userId } = context.requestPayload ?? {};
// Execute for ONE user only
const workflow = new WelcomePlaceholderWorkflow(db, userId);
const placeholders = await context.run('generate', () => workflow.generate());
return { success: true, userId, placeholdersCount: placeholders.length };
});
Key Features:
- ✅ Filters users who already have cached placeholders in Redis
- ✅ Supports
paidOnlyflag to process only subscribed users - ✅ Supports
dryRunmode for statistics - ✅ Uses fan-out for large user batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE user
Files:
/api/workflows/welcome-placeholder/process-users/route.ts/api/workflows/welcome-placeholder/paginate-users/route.ts/api/workflows/welcome-placeholder/generate-user/route.ts/server/workflows/welcomePlaceholder/index.ts
Example 2: Agent Welcome
Use Case: Generate welcome messages and open questions for AI agents
Structure:
- Layer 1:
process-agents- Entry point, checks eligible agents - Layer 2:
paginate-agents- Paginates through active agents - Layer 3:
generate-agent- Generates welcome data for ONE agent
Core Patterns Demonstrated:
- Dry-Run Mode:
// Layer 1: process-agents
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
};
}
-
Fan-Out Pattern: Same as welcome-placeholder
-
Single Task Execution:
// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
const { agentId } = context.requestPayload ?? {};
// Execute for ONE agent only
const workflow = new AgentWelcomeWorkflow(db, agentId);
const data = await context.run('generate', () => workflow.generate());
return { success: true, agentId, data };
});
Key Features:
- ✅ Filters agents who already have cached data in Redis
- ✅ Supports
paidOnlyflag for subscribed users' agents only - ✅ Supports
dryRunmode for statistics - ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE agent
Files:
/api/workflows/agent-welcome/process-agents/route.ts/api/workflows/agent-welcome/paginate-agents/route.ts/api/workflows/agent-welcome/generate-agent/route.ts/server/workflows/agentWelcome/index.ts
Key Takeaways from Examples
Both workflows follow the exact same pattern:
-
Layer 1 (Entry Point):
- Calculate statistics
- Filter existing items
- Support dry-run mode
- Trigger pagination only if needed
-
Layer 2 (Pagination):
- Paginate with cursor (PAGE_SIZE=50)
- Fan-out large batches (CHUNK_SIZE=20)
- Trigger Layer 3 for each item
- Recursively process all pages
-
Layer 3 (Execution):
- Process ONE item per execution
- Perform business logic
- Save results
- Return success/failure
The only differences are:
- Entity type (users vs agents)
- Business logic (placeholder generation vs welcome generation)
- Data source (different database queries)
Common Pitfalls
❌ Don't: Use context.run() without unique names
// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));
// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
❌ Don't: Forget to validate payload parameters
// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
const result = await process(itemId); // May fail with undefined
});
// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
const result = await process(itemId);
});
❌ Don't: Skip filtering existing items
// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
❌ Don't: Use inconsistent logging
// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });
Environment Variables Required
# Required for all workflows
APP_URL=https://your-app.com # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx # QStash authentication token
# Optional (for custom QStash URL)
QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint
Checklist for New Workflows
Planning Phase
- Identify entity to process (users, agents, items, etc.)
- Define business logic for single item execution
- Determine filtering logic (Redis cache, database state, etc.)
Implementation Phase
- Define payload types with proper TypeScript interfaces
- Create workflow class with static trigger methods
- Layer 1: Implement entry point with dry-run support
- Layer 1: Add filtering logic to avoid duplicate work
- Layer 2: Implement pagination with fan-out logic
- Layer 3: Implement single task execution (ONE item per run)
- Configure appropriate flowControl for each layer
- Add consistent logging with workflow prefixes
- Validate all required payload parameters
- Use unique context.run() step names
Quality & Deployment
- Return consistent response shapes
- Configure cloud deployment (see Cloud Guide if using lobehub-cloud)
- Write integration tests
- Test with dry-run mode first
- Test with small batch before full rollout
Additional Resources
- Upstash Workflow Documentation
- QStash Documentation
- Example Workflows in Codebase
- Workflow Classes
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
task
i18n
Internationalization guide using react-i18next. Use when adding translations, creating i18n keys, or working with localized text in React components (.tsx files). Triggers on translation tasks, locale management, or i18n implementation.
data-fetching
Data fetching architecture guide using Service layer + Zustand Store + SWR. Use when implementing data fetching, creating services, working with store hooks, or migrating from useEffect. Triggers on data loading, API calls, service creation, or store data fetching tasks.
recent-data
Guide for using Recent Data (topics, resources, pages). Use when working with recently accessed items, implementing recent lists, or accessing session store recent data. Triggers on recent data usage or implementation tasks.
hotkey
Guide for adding keyboard shortcuts. Use when implementing new hotkeys, registering shortcuts, or working with keyboard interactions. Triggers on hotkey implementation or keyboard shortcut tasks.
debug
Debug package usage guide. Use when adding debug logging, understanding log namespaces, or implementing debugging features. Triggers on debug logging requests or logging implementation.
Didn't find tool you were looking for?