codingstairs
노트에듀라이프연락
⌕검색⌘K
koen

Navigation

  • Intro
  • Blog
  • Life

연락하기

로그인 없이도 보낼 수 있어요. 답변이 필요하면 이메일을 함께 적어 주세요.

  • 익명 폼으로 의견 남기기 →
  • ✉ warragon112@gmail.com
  • 카카오톡 오픈채팅 ↗

© 2026 codingstairs

  • 노트
  • 에듀
  • 검색
  • 라이프
  • 연락
  • 약관
  • RSS
  • GitHub
에듀›Python · FastAPI · 데이터 파이프라인›6단계

6단계

6단계 — 데이터 파이프라인

0회 조회

6단계 — 데이터 파이프라인

"외부 → 가공 → DB" 흐름이 한 줄로 이어지면 그게 곧 파이프라인 이에요. ETL 패턴이 표준.

ETL — Extract, Transform, Load

[외부 API/CSV/HTML]
    ↓ Extract (받아오기)
[원본 raw 데이터]
    ↓ Transform (정제)
[정형 데이터 (스키마 일치)]
    ↓ Load (저장)
[PostgreSQL row]

각 단계를 함수 로 분리해 두면 테스트와 재사용이 쉬워져요.

첫 파이프라인

# services/external_sync_service.py
from utils.http_client import RateLimitedClient
from db.connection import get_conn

client = RateLimitedClient("https://api.example.com", requests_per_second=2)

def extract():
    """외부 API 에서 원본 가져오기"""
    return client.get("/items").json()

def transform(raw: list[dict]) -> list[dict]:
    """필드 정제·타입 변환"""
    return [
        {
            "external_id": item["id"],
            "name": item["name"].strip(),
            "price": int(item["price"]),
            "synced_at": datetime.now(tz=UTC),
        }
        for item in raw
        if item.get("name")  # 빈 이름 필터
    ]

def load(rows: list[dict]):
    """DB UPSERT"""
    with get_conn() as conn, conn.cursor() as cur:
        for row in rows:
            cur.execute("""
                INSERT INTO external_items (external_id, name, price, synced_at)
                VALUES (%(external_id)s, %(name)s, %(price)s, %(synced_at)s)
                ON CONFLICT (external_id) DO UPDATE
                  SET name = EXCLUDED.name,
                      price = EXCLUDED.price,
                      synced_at = EXCLUDED.synced_at
            """, row)

def sync():
    """전체 흐름"""
    raw = extract()
    rows = transform(raw)
    load(rows)
    return len(rows)

세 함수가 각자의 책임 만 가져요 — 테스트하기도 쉬워요.

부분 실패 대응

10,000건 동기화 중 1건 실패하면 9,999건도 롤백할까? 경우에 따라.

  • 소량·강한 정합성: 전체 트랜잭션 → 1건 실패 시 모두 롤백
  • 대량·약한 정합성: 배치 단위 (예: 500개씩) 트랜잭션 → 부분 성공 허용
BATCH = 500

def load(rows: list[dict]):
    for i in range(0, len(rows), BATCH):
        batch = rows[i:i + BATCH]
        try:
            with get_conn() as conn, conn.cursor() as cur:
                # … batch INSERT
                pass
        except Exception as e:
            logger.error(f"batch {i}~{i + BATCH} failed: {e}")
            continue  # 다음 배치 진행

APScheduler 와 결합

scheduler.add_job(sync, IntervalTrigger(minutes=10))

10분마다 자동으로 외부 → DB 동기화. 4단계의 멱등성 + 5단계의 윤리가 모두 적용됨.

직접 해 보기

JSONPlaceholder 의 /posts 를 가져와 external_posts 테이블에 UPSERT 하는 파이프라인을 만들어 보세요. 10분 간격으로 자동 실행.

더 깊이

  • 데이터 파이프라인 노트
  • 백업·복원 노트

다음 단계

마지막 7단계에서는 이 모든 게 살아 있는지 자동으로 확인하는 헬스체크와 관측을 배워요.

← 5단계

5단계 — 외부 API · 크롤러 윤리

7단계 →

7단계 — 헬스체크·관측