Agent skill
production-scheduler
Production scheduling skill with sequencing rules, resource allocation, and schedule optimization.
Install this agent skill to your Project
npx add-skill https://github.com/a5c-ai/babysitter/tree/main/library/specializations/domains/science/industrial-engineering/skills/production-scheduler
Metadata
Additional technical details for this skill
- author
- babysitter-sdk
- version
- 1.0.0
- category
- production-planning
- backlog id
- SK-IE-029
SKILL.md
production-scheduler
You are production-scheduler - a specialized skill for production scheduling including job sequencing, resource allocation, and schedule optimization.
Overview
This skill enables AI-powered production scheduling including:
- Job shop scheduling
- Flow shop scheduling
- Priority dispatch rules (SPT, EDD, CR, SLACK)
- Makespan minimization
- Tardiness minimization
- Resource-constrained scheduling
- Gantt chart generation
- Schedule performance metrics
Capabilities
1. Priority Dispatch Rules
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def apply_dispatch_rules(jobs: pd.DataFrame, rule: str):
"""
Apply priority dispatch rules for job sequencing
jobs: DataFrame with columns ['job_id', 'processing_time', 'due_date', 'arrival_time']
rule: 'SPT', 'LPT', 'EDD', 'CR', 'SLACK', 'FCFS'
"""
jobs = jobs.copy()
current_time = jobs['arrival_time'].min()
if rule == 'SPT': # Shortest Processing Time
jobs['priority'] = jobs['processing_time']
sequence = jobs.sort_values('priority')
elif rule == 'LPT': # Longest Processing Time
jobs['priority'] = -jobs['processing_time']
sequence = jobs.sort_values('priority')
elif rule == 'EDD': # Earliest Due Date
jobs['priority'] = jobs['due_date']
sequence = jobs.sort_values('priority')
elif rule == 'CR': # Critical Ratio
jobs['time_remaining'] = (jobs['due_date'] - current_time).dt.total_seconds() / 3600
jobs['priority'] = jobs['time_remaining'] / jobs['processing_time']
jobs.loc[jobs['priority'] <= 0, 'priority'] = 0.001 # Urgent jobs first
sequence = jobs.sort_values('priority')
elif rule == 'SLACK': # Minimum Slack
jobs['time_remaining'] = (jobs['due_date'] - current_time).dt.total_seconds() / 3600
jobs['priority'] = jobs['time_remaining'] - jobs['processing_time']
sequence = jobs.sort_values('priority')
elif rule == 'FCFS': # First Come First Served
sequence = jobs.sort_values('arrival_time')
return {
"rule": rule,
"sequence": sequence['job_id'].tolist(),
"jobs": sequence
}
2. Schedule Performance Metrics
def calculate_schedule_metrics(schedule: pd.DataFrame):
"""
Calculate comprehensive schedule performance metrics
schedule: DataFrame with ['job_id', 'start_time', 'end_time', 'due_date']
"""
schedule = schedule.copy()
# Completion time (flow time)
schedule['completion_time'] = schedule['end_time']
# Lateness (can be positive or negative)
schedule['lateness'] = (schedule['end_time'] - schedule['due_date']).dt.total_seconds() / 3600
# Tardiness (max of lateness and 0)
schedule['tardiness'] = schedule['lateness'].apply(lambda x: max(0, x))
# Earliness
schedule['earliness'] = schedule['lateness'].apply(lambda x: abs(min(0, x)))
# Binary late indicator
schedule['is_late'] = schedule['lateness'] > 0
metrics = {
'makespan': (schedule['end_time'].max() - schedule['start_time'].min()).total_seconds() / 3600,
'mean_flow_time': schedule['completion_time'].mean(),
'total_tardiness': schedule['tardiness'].sum(),
'max_tardiness': schedule['tardiness'].max(),
'number_tardy': schedule['is_late'].sum(),
'percent_on_time': (1 - schedule['is_late'].mean()) * 100,
'mean_lateness': schedule['lateness'].mean(),
'total_earliness': schedule['earliness'].sum()
}
return {
"metrics": metrics,
"schedule_detail": schedule
}
3. Job Shop Scheduling
from collections import defaultdict
def job_shop_schedule(jobs: list, machines: list):
"""
Job shop scheduling using dispatching rules
jobs: list of {'job_id': str, 'operations': [(machine, processing_time), ...], 'due_date': datetime}
machines: list of machine IDs
"""
# Initialize machine availability
machine_available = {m: 0 for m in machines}
job_completion = defaultdict(lambda: 0)
schedule = []
# Process jobs operation by operation
for job in jobs:
job_id = job['job_id']
prev_end = 0
for op_idx, (machine, proc_time) in enumerate(job['operations']):
# Start time is max of machine availability and job's previous operation end
start_time = max(machine_available[machine], prev_end)
end_time = start_time + proc_time
schedule.append({
'job_id': job_id,
'operation': op_idx + 1,
'machine': machine,
'start_time': start_time,
'end_time': end_time,
'processing_time': proc_time
})
machine_available[machine] = end_time
prev_end = end_time
job_completion[job_id] = prev_end
return {
"schedule": pd.DataFrame(schedule),
"makespan": max(job_completion.values()),
"machine_utilization": calculate_machine_utilization(schedule, machines)
}
def calculate_machine_utilization(schedule: list, machines: list):
"""Calculate utilization for each machine"""
makespan = max(s['end_time'] for s in schedule)
utilization = {}
for machine in machines:
machine_ops = [s for s in schedule if s['machine'] == machine]
busy_time = sum(s['processing_time'] for s in machine_ops)
utilization[machine] = busy_time / makespan * 100 if makespan > 0 else 0
return utilization
4. Flow Shop Scheduling
def johnson_algorithm(jobs: list):
"""
Johnson's algorithm for 2-machine flow shop
Minimizes makespan
jobs: list of {'job_id': str, 'machine1_time': float, 'machine2_time': float}
"""
# Separate into two sets
set_i = [] # Jobs where machine1_time <= machine2_time
set_ii = [] # Jobs where machine1_time > machine2_time
for job in jobs:
if job['machine1_time'] <= job['machine2_time']:
set_i.append(job)
else:
set_ii.append(job)
# Sort set_i by machine1_time ascending
set_i.sort(key=lambda x: x['machine1_time'])
# Sort set_ii by machine2_time descending
set_ii.sort(key=lambda x: x['machine2_time'], reverse=True)
# Optimal sequence is set_i followed by set_ii
optimal_sequence = set_i + set_ii
# Calculate makespan
m1_end = 0
m2_end = 0
schedule = []
for job in optimal_sequence:
# Machine 1 processing
m1_start = m1_end
m1_end = m1_start + job['machine1_time']
# Machine 2 processing (must wait for both machine 1 and previous job on machine 2)
m2_start = max(m1_end, m2_end)
m2_end = m2_start + job['machine2_time']
schedule.append({
'job_id': job['job_id'],
'machine1_start': m1_start,
'machine1_end': m1_end,
'machine2_start': m2_start,
'machine2_end': m2_end
})
return {
"optimal_sequence": [j['job_id'] for j in optimal_sequence],
"makespan": m2_end,
"schedule": schedule
}
5. Resource-Constrained Scheduling
def resource_constrained_schedule(tasks: list, resources: dict, dependencies: dict):
"""
Schedule tasks with resource constraints and dependencies
tasks: list of {'task_id': str, 'duration': float, 'resources': {resource: amount}}
resources: {resource: available_amount}
dependencies: {task_id: [predecessor_task_ids]}
"""
# Track task status
scheduled = {}
resource_timeline = {r: [] for r in resources}
# Get tasks in topological order
remaining = set(t['task_id'] for t in tasks)
task_dict = {t['task_id']: t for t in tasks}
current_time = 0
max_time = 1000 # Safety limit
while remaining and current_time < max_time:
# Find eligible tasks (all predecessors complete)
eligible = []
for task_id in remaining:
predecessors = dependencies.get(task_id, [])
if all(p in scheduled for p in predecessors):
# Check earliest start (after predecessors)
earliest = 0
for p in predecessors:
earliest = max(earliest, scheduled[p]['end_time'])
eligible.append((task_id, earliest))
# Try to schedule eligible tasks
for task_id, earliest in sorted(eligible, key=lambda x: x[1]):
task = task_dict[task_id]
start_time = max(earliest, current_time)
# Check resource availability
can_schedule = True
for resource, amount in task.get('resources', {}).items():
if amount > resources.get(resource, 0):
can_schedule = False
break
if can_schedule:
end_time = start_time + task['duration']
scheduled[task_id] = {
'task_id': task_id,
'start_time': start_time,
'end_time': end_time,
'duration': task['duration']
}
remaining.remove(task_id)
current_time += 1
return {
"schedule": list(scheduled.values()),
"makespan": max(s['end_time'] for s in scheduled.values()) if scheduled else 0,
"unscheduled": list(remaining)
}
6. Gantt Chart Generation
def generate_gantt_data(schedule: pd.DataFrame, group_by: str = 'machine'):
"""
Generate data for Gantt chart visualization
schedule: DataFrame with columns appropriate for group_by
group_by: 'machine' or 'job'
"""
gantt_data = []
for _, row in schedule.iterrows():
gantt_data.append({
'Task': row[group_by] if group_by in row else row.get('job_id', 'Unknown'),
'Start': row['start_time'],
'Finish': row['end_time'],
'Resource': row.get('job_id', row.get('machine', 'Unknown')),
'Duration': row['end_time'] - row['start_time']
})
return {
"gantt_data": gantt_data,
"timeline_start": min(g['Start'] for g in gantt_data),
"timeline_end": max(g['Finish'] for g in gantt_data),
"resources": list(set(g['Resource'] for g in gantt_data))
}
Process Integration
This skill integrates with the following processes:
production-scheduling-optimization.jscapacity-planning-analysis.js
Output Format
{
"schedule": {
"sequence": ["J1", "J3", "J2", "J4"],
"rule_applied": "EDD"
},
"metrics": {
"makespan": 42,
"mean_flow_time": 24.5,
"total_tardiness": 8,
"percent_on_time": 75
},
"gantt_data": [...],
"recommendations": [
"Consider SPT rule to minimize flow time",
"Job J4 is critical path - prioritize"
]
}
Best Practices
- Match rule to objective - SPT for flow time, EDD for due dates
- Consider setup times - Sequence-dependent setups matter
- Build in buffers - Account for variability
- Monitor actual vs. planned - Adjust rules based on performance
- Communicate changes - Schedule visibility is critical
- Review regularly - Reschedule as conditions change
Constraints
- Static scheduling assumes known job arrivals
- Setup times may be sequence-dependent
- Resource constraints add complexity
- Real-time adjustments needed for disruptions
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
gsd-tools
Central utility skill for GSD operations. Provides config parsing, slug generation, timestamps, path operations, and orchestrates calls to other specialized skills. Acts as the unified entry point that the original gsd-tools.cjs provided via its lib/ modules (commands, config, core, init).
model-profile-resolution
Resolve model profile (quality/balanced/budget) at orchestration start and map agents to specific models. Enables cost/quality tradeoffs by selecting appropriate AI models for each agent role.
verification-suite
Plan structure validation, phase completeness checks, reference integrity verification, and artifact existence confirmation. Provides the structured verification layer ensuring GSD artifacts are well-formed and complete.
state-management
STATE.md reading, writing, and field-level updates. Provides cross-session state persistence via .planning/STATE.md with structured fields for current task, completed phases, blockers, decisions, and quick tasks.
git-integration
Git commit patterns, formats, and conventions for GSD methodology. Provides atomic commits per task, structured commit messages, planning file commits, branch management, and milestone tag operations.
frontmatter-parsing
YAML frontmatter parsing and manipulation for .planning/ documents. Provides read, write, update, query, and validation operations on frontmatter blocks in GSD markdown artifacts.
Didn't find tool you were looking for?