
Full Pipeline

This tutorial demonstrates a complete ETL pipeline: read data, clean nulls, filter, join, aggregate, cast to output schema, and write results.

Runnable example

The complete code is in examples/full_pipeline.py.

Define schemas

import colnade as cn

class Users(cn.Schema):
    id: cn.Column[cn.UInt64]
    name: cn.Column[cn.Utf8]
    age: cn.Column[cn.UInt64]
    score: cn.Column[cn.Float64 | None]

class Orders(cn.Schema):
    id: cn.Column[cn.UInt64]
    user_id: cn.Column[cn.UInt64]
    amount: cn.Column[cn.Float64]

class UserOrders(cn.Schema):
    """Intermediate schema after joining users with orders."""
    user_name: cn.Column[cn.Utf8] = cn.mapped_from(Users.name)
    user_id: cn.Column[cn.UInt64] = cn.mapped_from(Users.id)
    amount: cn.Column[cn.Float64] = cn.mapped_from(Orders.amount)

class UserRevenue(cn.Schema):
    """Output schema: per-user revenue summary."""
    user_name: cn.Column[cn.Utf8]
    user_id: cn.Column[cn.UInt64]
    total_amount: cn.Column[cn.Float64]

Step 1: Read typed data

from colnade_polars import read_parquet

users = read_parquet("users.parquet", Users)
orders = read_parquet("orders.parquet", Orders)

Both read_parquet calls return typed DataFrames with the Polars backend attached. When validation is enabled, they also validate the loaded data against the declared schemas.
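Conceptually, a typed read is "load the columns, then check each one against its declared type". A minimal pure-Python sketch of that check, with plain dicts standing in for DataFrames (`validate_columns` and the `EXPECTED` table are illustrative, not colnade API):

```python
# Sketch of schema validation: every declared column must exist, and
# every value must match the declared type. Names here are illustrative.
EXPECTED = {"id": int, "name": str, "age": int}

def validate_columns(data: dict[str, list], expected: dict[str, type]) -> None:
    missing = expected.keys() - data.keys()
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    for col, typ in expected.items():
        for v in data[col]:
            if not isinstance(v, typ):
                raise TypeError(f"{col}: expected {typ.__name__}, got {v!r}")

users_data = {"id": [1, 2], "name": ["ada", "bob"], "age": [30, 27]}
validate_columns(users_data, EXPECTED)  # passes silently on conforming data
```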

Step 2: Clean nulls

users_clean = users.with_columns(
    Users.score.fill_null(0.0).alias(Users.score)
)
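In plain Python terms, fill_null is a per-value substitution: every missing value becomes the fill value, everything else passes through unchanged (a sketch on a plain list, not colnade code):

```python
# What fill_null(0.0) does to a column, expressed on a plain list:
scores = [88.5, None, 92.0, None]
filled = [0.0 if s is None else s for s in scores]
# filled == [88.5, 0.0, 92.0, 0.0]
```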

Step 3: Filter

active_users = users_clean.filter(Users.age >= 25)

Step 4: Join

joined = active_users.join(orders, on=Users.id == Orders.user_id)
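The join matches each order's user_id against the users' id. The same inner join can be sketched as a hash join in plain Python, with dict rows standing in for DataFrames (illustrative only):

```python
# Inner join on users.id == orders.user_id, as a hash join:
# index the left side by key, then probe it for each right-side row.
users_rows = [
    {"id": 1, "name": "ada", "age": 30},
    {"id": 2, "name": "bob", "age": 27},
]
orders_rows = [
    {"id": 10, "user_id": 1, "amount": 25.0},
    {"id": 11, "user_id": 1, "amount": 5.0},
    {"id": 12, "user_id": 2, "amount": 12.5},
]

by_id = {u["id"]: u for u in users_rows}   # build phase
joined_rows = [
    # Note: both sides have an "id" key, and the order's id wins in this
    # naive merge; real join implementations disambiguate with suffixes.
    {**by_id[o["user_id"]], **o}           # probe phase
    for o in orders_rows
    if o["user_id"] in by_id
]
```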

Step 5: Cast to intermediate schema

user_orders = joined.cast_schema(UserOrders)

mapped_from resolves user_name, user_id, and amount automatically from the join result.
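That resolution is essentially a select-and-rename: for each target column, pull the mapped source column out of the joined row. A sketch on a single dict row (the MAPPING table and the joined-row column names are assumptions for illustration, not colnade internals):

```python
# mapped_from resolution as select-and-rename. Assume the joined row
# kept the user's id as "id" and the order's id as "order_id".
MAPPING = {"user_name": "name", "user_id": "id", "amount": "amount"}

joined_row = {"id": 1, "name": "ada", "age": 30, "order_id": 10, "amount": 25.0}
user_order = {target: joined_row[source] for target, source in MAPPING.items()}
# user_order == {"user_name": "ada", "user_id": 1, "amount": 25.0}
```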

Step 6: Aggregate

revenue = (
    user_orders.group_by(UserOrders.user_name, UserOrders.user_id)
    .agg(UserOrders.amount.sum().alias(UserRevenue.total_amount))
    .cast_schema(UserRevenue)
)

group_by().agg() computes per-user totals; the summed amount is aliased to total_amount, and the result is cast to UserRevenue.
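For comparison, grouped summation in plain Python is a dict accumulation keyed on the grouping columns (dict rows stand in for the typed frame):

```python
# group_by(user_name, user_id).agg(sum(amount)) as a dict accumulation:
# one running total per (user_name, user_id) key.
rows = [
    {"user_name": "ada", "user_id": 1, "amount": 25.0},
    {"user_name": "ada", "user_id": 1, "amount": 5.0},
    {"user_name": "bob", "user_id": 2, "amount": 12.5},
]

totals: dict[tuple, float] = {}
for r in rows:
    key = (r["user_name"], r["user_id"])
    totals[key] = totals.get(key, 0.0) + r["amount"]

revenue_rows = [
    {"user_name": n, "user_id": i, "total_amount": t}
    for (n, i), t in totals.items()
]
# revenue_rows == [
#     {"user_name": "ada", "user_id": 1, "total_amount": 30.0},
#     {"user_name": "bob", "user_id": 2, "total_amount": 12.5},
# ]
```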

Step 7: Sort and write

from colnade_polars import write_parquet

result = revenue.sort(UserRevenue.total_amount.desc())
write_parquet(result, "user_revenue.parquet")

Step 8: Verify

restored = read_parquet("user_revenue.parquet", UserRevenue)
restored.validate()  # confirms data matches schema

Pipeline summary

read_parquet(Users) → fill_null → filter(age >= 25)
read_parquet(Orders) ──────────→ join(Users.id == Orders.user_id)
                              cast_schema(UserOrders)
                              group_by().agg(sum) → cast_schema(UserRevenue)
                              sort(total_amount desc)
                              write_parquet(result)
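The whole flow can be traced end to end on toy data. A pure-Python walkthrough of the same steps, with plain dicts and lists in place of typed frames (a sketch of the logic, not colnade code):

```python
# End-to-end on toy data: clean -> filter -> join -> project -> aggregate -> sort.
users = [
    {"id": 1, "name": "ada", "age": 30, "score": None},
    {"id": 2, "name": "bob", "age": 27, "score": 91.0},
    {"id": 3, "name": "cyd", "age": 22, "score": 75.0},  # dropped: age < 25
]
orders = [
    {"id": 10, "user_id": 1, "amount": 25.0},
    {"id": 11, "user_id": 1, "amount": 5.0},
    {"id": 12, "user_id": 2, "amount": 12.5},
    {"id": 13, "user_id": 3, "amount": 99.0},            # its user is dropped
]

# Steps 2-3: fill null scores, keep users aged 25 or older.
active = [
    {**u, "score": 0.0 if u["score"] is None else u["score"]}
    for u in users
    if u["age"] >= 25
]

# Steps 4-5: join on id == user_id, project to the intermediate shape.
by_id = {u["id"]: u for u in active}
user_orders = [
    {"user_name": by_id[o["user_id"]]["name"],
     "user_id": o["user_id"],
     "amount": o["amount"]}
    for o in orders
    if o["user_id"] in by_id
]

# Step 6: sum amounts per user.
totals: dict[tuple, float] = {}
for r in user_orders:
    key = (r["user_name"], r["user_id"])
    totals[key] = totals.get(key, 0.0) + r["amount"]

# Step 7: sort descending by total.
result = sorted(
    ({"user_name": n, "user_id": i, "total_amount": t}
     for (n, i), t in totals.items()),
    key=lambda r: r["total_amount"],
    reverse=True,
)
# result == [
#     {"user_name": "ada", "user_id": 1, "total_amount": 30.0},
#     {"user_name": "bob", "user_id": 2, "total_amount": 12.5},
# ]
```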

Every step in this pipeline is type-checked. The type checker verifies that column references exist, schema types match at boundaries, and mapped_from assignments are type-compatible.