Step 6 — Data pipeline


A pipeline pulls data from an external source, transforms it, and writes it to a database. ETL (Extract, Transform, Load) is the standard pattern for this.

ETL — Extract, Transform, Load

[external API/CSV/HTML]
    ↓ Extract
[raw]
    ↓ Transform (clean, normalize)
[shaped rows]
    ↓ Load
[Postgres rows]

Each stage becomes one function with one responsibility, so each can be tested and reused on its own.

First pipeline

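from datetime import UTC, datetime  # the UTC alias needs Python 3.11+

# `client` (an HTTP client with a base URL) and `get_conn` (a Postgres
# connection factory) are assumed to be the ones set up in earlier steps.

def extract():
    # Extract: fetch the raw payload and fail loudly on an HTTP error
    # (raise_for_status exists on both httpx and requests responses).
    response = client.get("/items")
    response.raise_for_status()
    return response.json()

def transform(raw):
    # Transform: keep named items only, then clean and normalize each field.
    return [
        {
            "external_id": item["id"],
            "name": item["name"].strip(),
            "price": int(item["price"]),
            "synced_at": datetime.now(tz=UTC),
        }
        for item in raw
        if item.get("name")
    ]

def load(rows):
    # Load: UPSERT keyed on external_id, so re-running the sync updates
    # existing rows instead of inserting duplicates.
    with get_conn() as conn, conn.cursor() as cur:
        for row in rows:
            cur.execute("""
                INSERT INTO external_items (external_id, name, price, synced_at)
                VALUES (%(external_id)s, %(name)s, %(price)s, %(synced_at)s)
                ON CONFLICT (external_id) DO UPDATE
                  SET name = EXCLUDED.name,
                      price = EXCLUDED.price,
                      synced_at = EXCLUDED.synced_at
            """, row)

def sync():
    # Run the three stages in order and report how many rows were processed.
    raw = extract()
    rows = transform(raw)
    load(rows)
    return len(rows)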
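
Because transform touches neither the network nor the database, it can be tested with plain data. The sketch below repeats transform so it runs on its own, and adds a variant of sync that takes its extract and load steps as arguments so both can be replaced by fakes. The SAMPLE payload, the test names, and the injectable sync signature are assumptions made for illustration, not part of the pipeline above.

```python
from datetime import datetime, timezone

def transform(raw):
    """Copy of the transform step, repeated so this sketch is self-contained."""
    return [
        {
            "external_id": item["id"],
            "name": item["name"].strip(),
            "price": int(item["price"]),
            "synced_at": datetime.now(tz=timezone.utc),
        }
        for item in raw
        if item.get("name")
    ]

def sync(extract, load):
    """Variant of sync() with injectable extract/load steps (an assumption)."""
    rows = transform(extract())
    load(rows)
    return len(rows)

# Hypothetical payload; the real API's fields may differ.
SAMPLE = [
    {"id": 1, "name": "  Widget ", "price": "1200"},
    {"id": 2, "name": "", "price": "500"},  # dropped: empty name
    {"id": 3, "name": "Gadget", "price": 300},
]

def test_transform_cleans_and_filters():
    rows = transform(SAMPLE)
    assert [r["external_id"] for r in rows] == [1, 3]
    assert rows[0]["name"] == "Widget"
    assert rows[0]["price"] == 1200

def test_sync_with_fakes():
    loaded = []  # stands in for the database
    count = sync(extract=lambda: SAMPLE, load=loaded.extend)
    assert count == 2
    assert len(loaded) == 2
```

Both tests run under pytest or as plain function calls, with no API or Postgres instance available.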

Partial-failure strategy

If 1 row out of 10,000 fails, should the whole load roll back? It depends on the data.

  • Small and strict: one transaction; any failure rolls back everything
  • Large and loose: commit in 500-row batches and allow partial success
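
The "large and loose" option can be sketched as a small driver that splits rows into batches and keeps going when one batch fails. This is a minimal sketch, not a definitive implementation: load_in_batches and batched are hypothetical helper names, and load_batch is assumed to be a callable that writes one batch inside its own transaction and raises on failure, so a bad batch rolls back alone.

```python
from itertools import islice

def batched(rows, size):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk

def load_in_batches(rows, load_batch, batch_size=500):
    """Load rows batch by batch; a failing batch is recorded and skipped.

    `load_batch` is a hypothetical callable: one batch, one transaction,
    raising on failure.
    """
    loaded, failed = 0, []
    for index, chunk in enumerate(batched(rows, batch_size)):
        try:
            load_batch(chunk)
        except Exception as exc:  # broad on purpose: report, then continue
            failed.append((index, repr(exc)))
        else:
            loaded += len(chunk)
    return loaded, failed
```

The return value makes partial success visible: the caller gets the number of rows written plus the index and error of every failed batch, which can be logged or retried. The "small and strict" option needs no driver at all; it is the single-transaction load from the first pipeline.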

Combine with APScheduler

from apscheduler.triggers.interval import IntervalTrigger
scheduler.add_job(sync, IntervalTrigger(minutes=10))

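Everything from step 4 (idempotency) and step 5 (crawler ethics) still applies to the scheduled job: the UPSERT keeps repeated runs from creating duplicates, and the 10-minute interval should respect the source's rate limits.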
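
A slightly fuller version of the scheduling call might look like the sketch below, assuming APScheduler 3.x (the 4.x API differs). run_safely, build_scheduler, the logger name, and the job id "sync" are hypothetical names chosen for illustration. The wrapper logs the row count on success and the traceback on failure, so each 10-minute run leaves a trace.

```python
import logging

logger = logging.getLogger("pipeline")

def run_safely(job):
    """Wrap a job so each run logs its outcome and a failure does not propagate."""
    def wrapper():
        try:
            count = job()
        except Exception:
            logger.exception("sync failed; the next interval will try again")
            return None
        logger.info("sync finished: %s rows", count)
        return count
    return wrapper

def build_scheduler(job):
    # Imported here so the wrapper above stays usable without APScheduler installed.
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        run_safely(job),
        IntervalTrigger(minutes=10),
        id="sync",              # hypothetical job id
        max_instances=1,        # never run two syncs at the same time
        coalesce=True,          # collapse a backlog of missed runs into one
        replace_existing=True,  # re-registering on restart does not duplicate the job
    )
    return scheduler
```

Usage would be build_scheduler(sync).start() at application startup. Keeping max_instances at 1 matters when a sync occasionally takes longer than the interval: the overlapping run is skipped instead of hitting the external API twice at once.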

Try it

Pull /posts from JSONPlaceholder and UPSERT the results into an external_posts table every 10 minutes.
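
One possible starting point for the transform and load halves of the exercise is sketched below. To stay runnable without a database server it swaps in an in-memory SQLite database for Postgres; SQLite supports the same ON CONFLICT ... DO UPDATE form but uses :name placeholders where psycopg uses %(name)s. The column names, SAMPLE_POSTS, and the helper names are assumptions; the sample only mimics the userId, id, title, and body fields that /posts returns.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical sample shaped like JSONPlaceholder's /posts items.
SAMPLE_POSTS = [
    {"userId": 1, "id": 1, "title": " first post ", "body": "hello"},
    {"userId": 1, "id": 2, "title": "second post", "body": "world"},
]

def make_db():
    """In-memory SQLite stand-in for the Postgres table (an assumption)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE external_posts (
            external_id INTEGER PRIMARY KEY,
            user_id INTEGER NOT NULL,
            title TEXT NOT NULL,
            body TEXT NOT NULL,
            synced_at TEXT NOT NULL
        )
        """
    )
    return conn

def transform_posts(raw):
    now = datetime.now(tz=timezone.utc).isoformat()
    return [
        {
            "external_id": post["id"],
            "user_id": post["userId"],
            "title": post["title"].strip(),
            "body": post["body"],
            "synced_at": now,
        }
        for post in raw
        if post.get("title")
    ]

def load_posts(conn, rows):
    with conn:  # sqlite3: commit on success, roll back on error
        conn.executemany(
            """
            INSERT INTO external_posts (external_id, user_id, title, body, synced_at)
            VALUES (:external_id, :user_id, :title, :body, :synced_at)
            ON CONFLICT (external_id) DO UPDATE
              SET user_id = excluded.user_id,
                  title = excluded.title,
                  body = excluded.body,
                  synced_at = excluded.synced_at
            """,
            rows,
        )
```

Loading the same rows twice should leave the row count unchanged, which is a quick way to check the idempotency requirement before wiring in the real extract step and the scheduler.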

Next

Step 7 makes the service observable.
