Full Pipeline¶
This tutorial demonstrates a complete ETL pipeline: read data, clean nulls, filter, join, aggregate, cast to output schema, and write results.
Runnable example

The complete code is in `examples/full_pipeline.py`.
Define schemas¶
```python
import colnade as cn


class Users(cn.Schema):
    id: cn.Column[cn.UInt64]
    name: cn.Column[cn.Utf8]
    age: cn.Column[cn.UInt64]
    score: cn.Column[cn.Float64 | None]


class Orders(cn.Schema):
    id: cn.Column[cn.UInt64]
    user_id: cn.Column[cn.UInt64]
    amount: cn.Column[cn.Float64]


class UserOrders(cn.Schema):
    """Intermediate schema after joining users with orders."""

    user_name: cn.Column[cn.Utf8] = cn.mapped_from(Users.name)
    user_id: cn.Column[cn.UInt64] = cn.mapped_from(Users.id)
    amount: cn.Column[cn.Float64] = cn.mapped_from(Orders.amount)


class UserRevenue(cn.Schema):
    """Output schema: per-user revenue summary."""

    user_name: cn.Column[cn.Utf8]
    user_id: cn.Column[cn.UInt64]
    total_amount: cn.Column[cn.Float64]
```
Step 1: Read typed data¶
```python
from colnade_polars import read_parquet

users = read_parquet("users.parquet", Users)
orders = read_parquet("orders.parquet", Orders)
```
Both `read_parquet` calls return typed DataFrames with the Polars backend attached. When validation is enabled, they also check the loaded data against their schemas.
Step 2: Clean nulls¶
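The only nullable column is `Users.score`, so this step fills its missing values. A minimal sketch, assuming colnade mirrors the Polars expression API with `with_columns` on frames and `fill_null` on column expressions (both method names are assumptions, not confirmed by this tutorial):

```python
# Replace missing scores with 0.0 so every row has a concrete value.
# Assumes a Polars-style with_columns / fill_null API on typed columns.
users = users.with_columns(Users.score.fill_null(0.0))
```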
Step 3: Filter¶
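Next, the pipeline keeps only users with `age >= 25` (see the pipeline summary at the end of this page). A sketch, assuming a Polars-style `filter` that accepts a typed column comparison:

```python
# Users.age >= 25 builds a typed predicate over the age column;
# filter keeps only the matching rows.
users = users.filter(Users.age >= 25)
```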
Step 4: Join¶
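The join matches `Users.id` against `Orders.user_id`, producing one row per (user, order) pair. A sketch, assuming `join` accepts a typed equality predicate (the `on` keyword is an assumption):

```python
# Join users to their orders on the typed key equality.
joined = users.join(orders, on=Users.id == Orders.user_id)
```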
Step 5: Cast to intermediate schema¶
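Casting the join result to `UserOrders` selects and renames columns according to the schema's `mapped_from` declarations. A sketch, where `joined` stands for the Step 4 join result:

```python
# cast_schema resolves each mapped_from declaration against the
# columns of the join result and drops everything else.
user_orders = joined.cast_schema(UserOrders)
```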
`mapped_from` resolves `user_name`, `user_id`, and `amount` automatically from the join result.
Step 6: Aggregate¶
```python
revenue = (
    user_orders.group_by(UserOrders.user_name, UserOrders.user_id)
    .agg(UserOrders.amount.sum().alias(UserRevenue.total_amount))
    .cast_schema(UserRevenue)
)
```
`group_by().agg()` computes the per-user totals, and the result is cast to `UserRevenue`.
Step 7: Sort and write¶
```python
from colnade_polars import write_parquet

result = revenue.sort(UserRevenue.total_amount.desc())
write_parquet(result, "user_revenue.parquet")
```
Step 8: Verify¶
```python
restored = read_parquet("user_revenue.parquet", UserRevenue)
restored.validate()  # confirms the data matches the schema
```
Pipeline summary¶
```text
read_parquet(Users)  → fill_null → filter(age >= 25)
                                        ↓
read_parquet(Orders) ──────→ join(Users.id == Orders.user_id)
                                        ↓
                             cast_schema(UserOrders)
                                        ↓
                      group_by().agg(sum) → cast_schema(UserRevenue)
                                        ↓
                             sort(total_amount desc)
                                        ↓
                             write_parquet(result)
```
Every step in this pipeline is type-checked: the type checker verifies that the column references exist, that schema types match at each boundary, and that `mapped_from` assignments are type-compatible.