Agent skill
rust-async
Rust async/await patterns with Tokio. Use when writing async code, handling concurrency, managing tasks, working with channels, or debugging async issues.
Stars
163
Forks
31
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/development/rust-async
SKILL.md
Rust Async Patterns with Tokio
Async Closures (Rust 1.85+)
rust
// Async closures - stable since Rust 1.85
let fetch_data = async |url: &str| -> Result<Data, Error> {
let response = reqwest::get(url).await?;
response.json().await
};
// Use with higher-order functions
/// Fetch every URL concurrently and collect one `Result` per input URL.
async fn process_urls(urls: Vec<String>) -> Vec<Result<Data, Error>> {
    // Build the full set of (lazy) futures up front, then await them all
    // concurrently; join_all preserves the input order of the results.
    let requests = urls.into_iter().map(async |url: String| {
        let response = reqwest::get(&url).await?;
        response.json().await
    });
    futures::future::join_all(requests).await
}
// Async closures capture by reference by default (like regular closures)
let data = vec![1, 2, 3];
let process = async || {
// `data` is borrowed here
data.iter().sum::<i32>()
};
Task Management
Spawning Tasks
rust
use tokio::task::JoinSet;
// GOOD - JoinSet for scoped tasks
// Spawns one Tokio task per item and gathers every task's output.
async fn process_items(items: Vec<Item>) -> Vec<Result<Output, Error>> {
    let mut set = JoinSet::new();
    for item in items {
        // Each item is moved into its own task so they run concurrently.
        set.spawn(async move {
            process_item(item).await
        });
    }
    let mut results = Vec::with_capacity(set.len());
    // join_next() yields tasks in COMPLETION order, not spawn order.
    while let Some(result) = set.join_next().await {
        match result {
            // Task ran to completion: push its Result<Output, Error> as-is.
            Ok(output) => results.push(output),
            // Task panicked or was aborted (JoinError): surface it as an Error.
            Err(e) => results.push(Err(e.into())),
        }
    }
    results
}
// For fire-and-forget
tokio::spawn(async move {
// Task runs independently
background_work().await;
});
Cancellation
rust
use tokio::select;
use tokio_util::sync::CancellationToken;
// Races the work against the cancellation token; whichever branch completes
// first wins, and select! drops (thereby cancelling) the other future.
async fn cancellable_operation(cancel: CancellationToken) -> Result<Output, Error> {
    select! {
        result = do_work() => result,
        // cancelled() resolves once the token has been triggered anywhere.
        _ = cancel.cancelled() => Err(Error::Cancelled),
    }
}
// Using timeout
use tokio::time::{timeout, Duration};
// timeout() wraps the future's output in an outer Result: Err(Elapsed) on
// timeout, Ok(inner) otherwise. map_err converts the timeout into our Error,
// then `?` unwraps the outer layer, leaving the inner Result<Output, Error>
// as this function's return value.
async fn with_timeout() -> Result<Output, Error> {
    timeout(Duration::from_secs(30), do_work())
        .await
        .map_err(|_| Error::Timeout)?
}
Graceful Shutdown
rust
use tokio::signal;
use tokio::sync::broadcast;
// Coordinates a worker task with Ctrl+C-driven graceful shutdown via a
// broadcast channel (capacity 1 is enough for a one-shot shutdown signal).
async fn run_with_shutdown() {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);
    // Spawn worker with shutdown receiver
    let mut shutdown_rx = shutdown_tx.subscribe();
    let worker = tokio::spawn(async move {
        loop {
            select! {
                // Keep doing units of work until told to stop.
                _ = do_work() => {}
                // Any broadcast message (or all senders dropping) ends the loop.
                _ = shutdown_rx.recv() => {
                    println!("Shutting down worker");
                    break;
                }
            }
        }
    });
    // Wait for Ctrl+C
    signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
    // Signal shutdown
    let _ = shutdown_tx.send(());
    // Wait for worker to finish
    let _ = worker.await;
}
Channels
Bounded MPSC (Multi-Producer, Single-Consumer)
rust
use tokio::sync::mpsc;
// For work queues - bounded prevents memory exhaustion
let (tx, mut rx) = mpsc::channel::<Work>(100);
// Producer
tokio::spawn(async move {
tx.send(work).await.expect("receiver dropped");
});
// Consumer
while let Some(work) = rx.recv().await {
process(work).await;
}
Broadcast (Multi-Producer, Multi-Consumer)
rust
use tokio::sync::broadcast;
// For events/notifications - stateless
let (tx, _) = broadcast::channel::<Event>(16);
// Multiple subscribers
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
// Publish
tx.send(Event::Updated)?;
// Receive
while let Ok(event) = rx1.recv().await {
handle_event(event);
}
Oneshot (Single Value)
rust
use tokio::sync::oneshot;
// For request-response pattern
// Request/response over a queue: the oneshot sender travels inside the
// request so the handler can reply directly to this caller.
async fn request_with_response() -> Result<Response, Error> {
    let (tx, rx) = oneshot::channel();
    // Send request with response channel
    // NOTE(review): `request_queue` is defined elsewhere — presumably an
    // mpsc::Sender<Request>; confirm against the surrounding module.
    request_queue.send(Request { response: tx }).await?;
    // Wait for response
    // rx resolves to Err if the handler drops tx without ever replying.
    rx.await.map_err(|_| Error::NoResponse)
}
Watch (Single Value, Latest Only)
rust
use tokio::sync::watch;
// For configuration/state that updates
let (tx, rx) = watch::channel(initial_config);
// Update
tx.send(new_config)?;
// Read latest (non-blocking)
let current = rx.borrow().clone();
// Wait for changes
let mut rx = rx.clone();
while rx.changed().await.is_ok() {
let new_value = rx.borrow().clone();
apply_config(new_value);
}
Shared State
RwLock (Prefer Over Mutex)
rust
use tokio::sync::RwLock;
use std::sync::Arc;
// Read-mostly shared state: many concurrent readers, exclusive writers.
struct SharedState {
    // Arc lets the state be shared across tasks; RwLock guards mutation.
    data: Arc<RwLock<Data>>,
}
impl SharedState {
    // Returns a snapshot clone so the read lock is released immediately.
    async fn read(&self) -> Data {
        self.data.read().await.clone()
    }
    // Replaces the data wholesale under an exclusive write lock.
    async fn update(&self, new_data: Data) {
        let mut guard = self.data.write().await;
        *guard = new_data;
    }
}
Downgrade Write Lock to Read Lock
rust
use parking_lot::RwLock;
// parking_lot RwLock supports downgrade (not available in tokio::sync::RwLock)
// Mutates under a write lock, then atomically downgrades to a read lock so
// the returned snapshot is guaranteed to reflect this update.
fn update_and_read(lock: &RwLock<Data>) -> Data {
    let mut write_guard = lock.write();
    // Modify data
    write_guard.value += 1;
    // Atomically downgrade to read lock without releasing
    // Other readers can now access, but we keep our view
    let read_guard = parking_lot::RwLockWriteGuard::downgrade(write_guard);
    // Clones the guarded Data (via auto-deref), assuming Data: Clone.
    read_guard.clone()
}
// Note: std::sync::RwLockWriteGuard::downgrade may not be stable on your toolchain — verify against your project's MSRV before relying on it
// For async code, use parking_lot or clone data before releasing write lock
Avoid Holding Locks Across Await
rust
// BAD - lock held across await point
async fn bad_example(state: Arc<RwLock<Data>>) {
    let guard = state.read().await;
    // Writers are blocked for the entire duration of this await.
    do_async_work(&guard).await; // Lock held here!
}
// GOOD - clone data, release lock
async fn good_example(state: Arc<RwLock<Data>>) {
    // The temporary read guard drops at the end of this statement.
    let data = state.read().await.clone();
    // Lock released
    do_async_work(&data).await;
}
Streams
Processing Streams
rust
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
// Transforms each item concurrently, at most 10 in flight at a time.
async fn process_stream(rx: mpsc::Receiver<Item>) {
    // Adapt the mpsc receiver into a Stream.
    let stream = ReceiverStream::new(rx);
    stream
        .map(|item| async move { transform(item).await })
        // buffer_unordered yields results in COMPLETION order, not input order.
        .buffer_unordered(10) // Concurrent processing
        .for_each(|result| async {
            handle_result(result);
        })
        .await;
}
Buffering and Batching
rust
use futures::StreamExt;
use tokio::time::Duration;
// Groups incoming items into batches of up to 100, flushing a partial batch
// after 100ms of inactivity so items are never stuck waiting for a full batch.
async fn batch_process(rx: mpsc::Receiver<Item>) {
    let stream = ReceiverStream::new(rx);
    // Collect into batches
    // NOTE(review): chunks_timeout is provided by tokio_stream::StreamExt
    // (with the "time" feature), not futures::StreamExt — confirm the
    // imports when lifting this snippet.
    stream
        .chunks_timeout(100, Duration::from_millis(100))
        .for_each(|batch| async move {
            process_batch(batch).await;
        })
        .await;
}
Error Handling in Async
Propagating Errors
rust
/// Chains two fallible async steps, propagating the first error via `?`.
async fn fallible_operation() -> Result<Output, Error> {
    // Fetch, then feed the fetched data straight into the processing step.
    let processed = process(fetch_data().await?).await?;
    Ok(processed)
}
Handling Multiple Futures
rust
use futures::future::try_join_all;
/// Processes all items concurrently; fails fast on the first error.
async fn process_all(items: Vec<Item>) -> Result<Vec<Output>, Error> {
    // One lazy future per item; try_join_all runs them concurrently and
    // short-circuits with Err as soon as any of them fails.
    let tasks: Vec<_> = items
        .into_iter()
        .map(|item| async move { process_item(item).await })
        .collect();
    try_join_all(tasks).await
}
Retry Logic
rust
use tokio::time::{sleep, Duration};
/// Retries `operation` up to `max_retries` additional times with exponential
/// backoff (100ms, 200ms, 400ms, ...) between attempts. Returns the first
/// `Ok`, or the last `Err` once the retry budget is exhausted.
async fn with_retry<T, E, F, Fut>(
    mut operation: F,
    max_retries: u32,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut attempts = 0;
    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            // Retry budget left: back off and try again. The error itself is
            // intentionally discarded (`Err(_)`) — binding it as `e` here
            // triggered an unused-variable warning in the original.
            Err(_) if attempts < max_retries => {
                // Compute the delay BEFORE incrementing so the first retry
                // waits the 100ms base delay (the original waited 200ms).
                // saturating_* avoids an overflow panic for huge retry counts.
                let delay = Duration::from_millis(
                    100u64.saturating_mul(2u64.saturating_pow(attempts)),
                );
                attempts += 1;
                sleep(delay).await;
            }
            // Budget exhausted: surface the final error to the caller.
            Err(e) => return Err(e),
        }
    }
}
Testing Async Code
rust
// Basic test on tokio's default (single-threaded) test runtime.
#[tokio::test]
async fn test_async_function() {
    let result = async_function().await;
    assert!(result.is_ok());
}
// Opt into a real multi-threaded runtime to exercise cross-thread behavior.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent() {
    // Test with multiple worker threads
}
// Time-based testing
#[tokio::test]
async fn test_timeout() {
    tokio::time::pause(); // Control time
    let start = tokio::time::Instant::now();
    // With time paused, advance() moves the clock instantly — no real wait.
    tokio::time::advance(Duration::from_secs(10)).await;
    assert_eq!(start.elapsed(), Duration::from_secs(10));
}
Blocking Work in Async Context
rust
use tokio::task::spawn_blocking;
// Offloads CPU-heavy work so the async worker threads stay responsive.
async fn process_with_cpu_work(data: Data) -> Result<Output, Error> {
    // Don't block the async runtime with CPU-heavy work
    let result = spawn_blocking(move || {
        // CPU-intensive computation runs on blocking thread pool
        expensive_computation(&data)
    })
    .await
    // .await yields Err(JoinError) only if the closure panicked.
    .map_err(|e| Error::TaskPanic(e.to_string()))?;
    Ok(result)
}
// For sync I/O in async context
async fn read_file_blocking(path: PathBuf) -> std::io::Result<String> {
    // NOTE: the original used `.await?`, but tokio's JoinError has no From
    // conversion to std::io::Error, so that does not compile. Map the join
    // failure (task panic or cancellation) into an io::Error explicitly,
    // then `?` unwraps the outer layer, leaving the inner io::Result.
    spawn_blocking(move || std::fs::read_to_string(path))
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
}
Async Anti-patterns and Pitfalls
Common Async Issues
- Blocking in async — never use `std::thread::sleep`, `std::sync::Mutex`, or blocking I/O in async code
- Runtime mixing — don't mix Tokio and async-std; pick one runtime
- Send bound violations - ensure spawned futures are Send
- Task starvation — yield periodically in long computations with `tokio::task::yield_now()`
- Unbounded channel growth — always use bounded channels for backpressure
- Select bias — `select!` polls its branches in random order by default; use `biased;` for deterministic priority order
- Cancellation unsafety — dropped futures may leave resources in an inconsistent state
- Future leak - spawned tasks without JoinHandle tracking can leak
Async Recursion
rust
// BAD - this does not even compile: a recursive `async fn` would need an
// infinitely-sized future type, and rustc rejects it (E0733: recursion in
// an async fn requires boxing).
async fn bad_recursive(n: u32) -> u32 {
    if n == 0 { 0 } else { bad_recursive(n - 1).await }
}
// GOOD - use Box::pin for async recursion
use std::pin::Pin;
use std::future::Future;
/// Async recursion via a boxed, pinned future so the recursive type has a
/// known size. Counts `n` down to the base case and evaluates to 0.
fn good_recursive(n: u32) -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(async move {
        match n {
            0 => 0,
            _ => good_recursive(n - 1).await,
        }
    })
}
Avoiding Blocking
rust
// BAD - blocks the async runtime
async fn bad_blocking() {
    // These park the OS thread, starving every other task scheduled on it.
    std::thread::sleep(Duration::from_secs(1)); // Blocks!
    std::fs::read_to_string("file.txt"); // Blocks!
}
// GOOD - use async alternatives or spawn_blocking
async fn good_non_blocking() {
    // Async sleep yields to the runtime instead of parking the thread.
    tokio::time::sleep(Duration::from_secs(1)).await;
    tokio::fs::read_to_string("file.txt").await;
    // For unavoidable blocking:
    tokio::task::spawn_blocking(|| {
        heavy_cpu_work()
    }).await;
}
Proper Mutex Usage
rust
use tokio::sync::Mutex; // NOT std::sync::Mutex
// GOOD - tokio Mutex for async
// tokio's Mutex guard is safe to hold across .await points.
async fn async_safe(data: Arc<tokio::sync::Mutex<Data>>) {
    let guard = data.lock().await;
    // Use guard
}
// ALSO GOOD - parking_lot for short non-async critical sections
use parking_lot::Mutex;
async fn quick_sync(data: Arc<parking_lot::Mutex<Data>>) {
    // Scope the sync lock so the guard drops BEFORE any .await.
    let result = {
        let guard = data.lock(); // Very brief, OK
        guard.clone()
    }; // Lock released immediately
    do_async_work(result).await;
}
Nebula-Specific Async Patterns
- Default timeout: 30s for operations
- Database operations: 5s timeout
- HTTP calls: 10s timeout
- Always include cancellation in long-running tasks
- Use `JoinSet` for managing concurrent workflow steps
- Prefer bounded channels to prevent memory exhaustion
- Use `select!` for responsive shutdown handling
- Use `spawn_blocking` for CPU-bound work
Didn't find tool you were looking for?