Agent skill
data-engineering
ETL pipelines, Apache Spark, data warehousing, and big data processing. Use for building data pipelines, processing large datasets, or data infrastructure.
Stars
4
Forks
1
Install this agent skill to your Project
npx add-skill https://github.com/pluginagentmarketplace/custom-plugin-ai-data-scientist/tree/main/skills/data-engineering
SKILL.md
Data Engineering
Build scalable data pipelines and infrastructure for big data processing.
Quick Start with Apache Spark
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count
# Initialize Spark
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy evaluation: nothing executes until the write below).
# "date" is part of the grouping key so that it survives the aggregation;
# the write step partitions by it, and partitionBy() fails on a column that
# is not present in the DataFrame being written.
# Assumes the input has a "date" column -- confirm against the source schema.
df_clean = (
    df
    .filter(col("value") > 0)
    .groupBy("date", "category")
    .agg(
        # sum/avg/count are the pyspark.sql.functions versions imported
        # above, not the Python builtins.
        sum("sales").alias("total_sales"),
        avg("price").alias("avg_price"),
        count("*").alias("count"),
    )
    .orderBy(col("total_sales").desc())
)

# Write results, partitioned by the "date" column kept in the aggregation
(
    df_clean.write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("s3://bucket/output/")
)
ETL Pipeline with Apache Airflow
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Arguments handed to the DAG as defaults for its tasks
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Daily ETL DAG; catchup is disabled
dag = DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)
def extract(**context):
    """Fetch the raw data and publish it to XCom under the key ``raw_data``.

    ``fetch_api_data`` is not defined in this snippet and must be supplied
    by the surrounding project.
    """
    # Extract data from source
    raw = fetch_api_data()
    ti = context['task_instance']
    ti.xcom_push(key='raw_data', value=raw)
def transform(**context):
    """Clean the raw data from the ``extract`` task and publish the result.

    Reads the XCom stored under ``raw_data`` by the task with id
    ``extract``, passes it through ``clean_and_transform`` (not defined in
    this snippet), and pushes the outcome to XCom under ``clean_data``.
    """
    ti = context['task_instance']
    # Name the producing task explicitly. Without task_ids the lookup relies
    # on Airflow's default for that argument, which is not the same in every
    # release; naming the producer keeps the pull unambiguous.
    data = ti.xcom_pull(task_ids='extract', key='raw_data')
    cleaned = clean_and_transform(data)
    ti.xcom_push(key='clean_data', value=cleaned)
def load(**context):
    """Load the cleaned data from the ``transform`` task into the warehouse.

    Reads the XCom stored under ``clean_data`` by the task with id
    ``transform`` and hands it to ``load_to_warehouse`` (not defined in this
    snippet).
    """
    ti = context['task_instance']
    # Name the producing task explicitly so the pull does not depend on
    # Airflow's default for task_ids, which is not the same in every release.
    data = ti.xcom_pull(task_ids='transform', key='clean_data')
    load_to_warehouse(data)
# One PythonOperator per pipeline stage, all attached to the same DAG
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# Run order: extract, then transform, then load
extract_task >> transform_task >> load_task
Data Warehousing
Star Schema Design
sql
-- Dimension Table
-- Created before the fact table: a REFERENCES clause needs its target
-- table to exist already.
CREATE TABLE dim_product (
    product_key  INT PRIMARY KEY,
    product_id   VARCHAR(50),
    product_name VARCHAR(200),
    category     VARCHAR(100),
    brand        VARCHAR(100)
);

-- Fact Table
-- dim_date and dim_customer are not defined in this snippet; they must also
-- be created (with date_key / customer_key as PRIMARY KEY or UNIQUE) before
-- this statement runs.
CREATE TABLE fact_sales (
    sale_id      SERIAL PRIMARY KEY,
    date_key     INT REFERENCES dim_date(date_key),
    product_key  INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity     INT,
    revenue      DECIMAL(10,2),
    cost         DECIMAL(10,2)
);
Snowflake Data Warehouse
sql
-- Create warehouse
CREATE WAREHOUSE compute_wh
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300        -- seconds of inactivity before suspending
    AUTO_RESUME = TRUE;

-- Load data from S3
-- The target is the same "sales" table that is clustered and queried below.
-- MATCH_BY_COLUMN_NAME maps Parquet fields onto the table's columns by name;
-- without it (or a SELECT transformation) Parquet data can only be loaded
-- into a single VARIANT column.
-- NOTE(review): a private bucket also needs a storage integration or
-- credentials, which are not shown here.
COPY INTO sales
    FROM 's3://bucket/data/'
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'CONTINUE';

-- Clustering
ALTER TABLE sales CLUSTER BY (date, region);

-- Time travel
SELECT * FROM sales AT (OFFSET => -3600); -- 1 hour ago
Big Data Processing
Spark SQL
python
# Expose the DataFrame to SQL under the view name "sales"
df.createOrReplaceTempView("sales")

# Per-category totals since 2024-01-01, keeping categories above 10000 in sales
category_totals_sql = """
SELECT
category,
SUM(sales) as total_sales,
AVG(price) as avg_price
FROM sales
WHERE date >= '2024-01-01'
GROUP BY category
HAVING SUM(sales) > 10000
ORDER BY total_sales DESC
"""
result = spark.sql(category_totals_sql)
result.show()
Spark Optimization
python
# Cache in memory
df.cache()

# Repartition
# repartition() returns a new DataFrame rather than changing df in place,
# so the result must be kept or the call has no effect.
df = df.repartition(200)

# Broadcast small tables
# large_df and small_df are placeholders that are not defined in this snippet.
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Persist
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
Stream Processing with Kafka
python
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
# Values are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('topic-name', {'key': 'value'})
# send() only queues the record for a background sender; flush() blocks
# until queued records have been sent, so nothing is lost if the process
# exits right after this point.
producer.flush()
# Consumer
def _decode_json(raw):
    """Decode a UTF-8 encoded JSON payload into a Python object."""
    return json.loads(raw.decode('utf-8'))


consumer = KafkaConsumer(
    'topic-name',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=_decode_json,
    group_id='my-group',
    auto_offset_reset='earliest',
)

# process_message is not defined in this snippet; it receives each decoded value.
for message in consumer:
    process_message(message.value)
Data Quality Validation
python
import great_expectations as ge
# Load data
# NOTE(review): ge.read_csv looks like Great Expectations' legacy dataset
# API -- confirm the installed version still provides it.
df = ge.read_csv('data.csv')

# Define expectations
EMAIL_PATTERN = r'^[\w\.-]+@[\w\.-]+\.\w+$'

df.expect_column_values_to_not_be_null('user_id')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_be_between('age', 0, 120)
df.expect_column_values_to_match_regex('email', EMAIL_PATTERN)

# Validate the expectations declared above and print the outcome
results = df.validate()
print(results)
Delta Lake (Data Lakehouse)
python
from delta.tables import DeltaTable
# Write to Delta
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")

# Read from Delta
df = spark.read.format("delta").load("/path/to/delta-table")

# Handle on the existing Delta table, used for the merge below
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

# Upsert (merge): update "value" on rows whose id matches, insert the rest.
# "updates" is the incoming DataFrame; it is not defined in this snippet.
(
    deltaTable.alias("target")
    .merge(updates.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(set={"value": "source.value"})
    .whenNotMatchedInsert(values={"id": "source.id", "value": "source.value"})
    .execute()
)

# Time travel: read the table as of version 10
df = (
    spark.read.format("delta")
    .option("versionAsOf", 10)
    .load("/path/to/delta-table")
)
Best Practices
- Incremental processing: Process only new data
- Idempotency: Re-running a pipeline on the same input leaves the same result, with no duplicated or partial output
- Data validation: Check quality at every stage
- Monitoring: Track pipeline health and performance
- Error handling: Retry logic, dead letter queues
- Partitioning: Partition large datasets by date/category
- Compression: Use Parquet, ORC for storage efficiency
Didn't find the tool you were looking for?