codingstairs
노트에듀라이프연락
⌕검색⌘K
koen

Navigation

  • Intro
  • Blog
  • Life

연락하기

로그인 없이도 보낼 수 있어요. 답변이 필요하면 이메일을 함께 적어 주세요.

  • 익명 폼으로 의견 남기기 →
  • ✉ warragon112@gmail.com
  • 카카오톡 오픈채팅 ↗

© 2026 codingstairs

  • 노트
  • 에듀
  • 검색
  • 라이프
  • 연락
  • 약관
  • RSS
  • GitHub
에듀›공공데이터 크롤러 만들기›5단계

5단계

증분 수집 · 중복 해소

0회 조회

증분 수집 · 중복 해소

매일 전량 크롤링은 낭비 · 차단 유발. 바뀐 것만 수집.

1. 체크포인트 — 마지막 성공 시점

CREATE TABLE crawl_checkpoints (
  source VARCHAR PRIMARY KEY,
  last_run_at TIMESTAMPTZ,
  last_successful_id BIGINT
);
last = await db.fetchrow(
    "SELECT last_run_at FROM crawl_checkpoints WHERE source = $1", "nps"
)
since = last["last_run_at"] if last else datetime(2000, 1, 1)

new_data = await fetch_since(since)

await db.execute(
    "INSERT INTO crawl_checkpoints (source, last_run_at) VALUES ($1, now()) "
    "ON CONFLICT (source) DO UPDATE SET last_run_at = EXCLUDED.last_run_at",
    "nps"
)

2. 자연 키 UNIQUE

수집 데이터의 외부 ID (사업자번호 · 병원 ykiho · 문서 ID 등) 를 UNIQUE 컬럼.

CREATE TABLE companies (
  id BIGSERIAL PRIMARY KEY,
  business_no VARCHAR UNIQUE NOT NULL,
  name VARCHAR NOT NULL,
  updated_at TIMESTAMPTZ DEFAULT now()
);

INSERT INTO companies (business_no, name)
VALUES ($1, $2)
ON CONFLICT (business_no) DO UPDATE SET name = EXCLUDED.name, updated_at = now();
  • 중복 삽입 자동 방어
  • 변경 감지 (이름 바뀌면 updated_at 갱신)

3. content hash — 본문 변경 감지

텍스트 긴 경우 전체 비교는 비쌈. hash 로 빠르게.

import hashlib

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

new_hash = content_hash(article.body)
row = await db.fetchrow("SELECT content_hash FROM articles WHERE url = $1", url)

if row and row["content_hash"] == new_hash:
    return  # 변경 없음 skip

await db.execute(
    "INSERT INTO articles (url, body, content_hash) VALUES ($1, $2, $3) "
    "ON CONFLICT (url) DO UPDATE SET body = $2, content_hash = $3, updated_at = now()",
    url, article.body, new_hash
)

문자 한 개 변경도 hash 다름. 정밀 감지.

4. API 의 ?since= 파라미터

많은 공공 API 가 증분 지원.

resp = await client.get("https://opendart.fss.or.kr/list", params={
    "corp_code": "...",
    "bgn_de": since.strftime("%Y%m%d"),   # 시작일
    "end_de": today.strftime("%Y%m%d"),
})

크롤링 자체 대신 API 가 증분 제공하면 그쪽.

5. 페이지네이션 — 안 끝나는 크롤

async def crawl_all_pages(source: str):
    page = 1
    while True:
        items = await fetch_page(source, page)
        if not items:
            break
        await save_batch(items)
        page += 1
        if page > 1000:
            logger.warning("exceeded page limit", source=source)
            break

실수로 무한 루프 방지 — 항상 상한.

6. 병렬 수집 + 중복 해소

여러 워커가 동시 크롤링 시 race condition.

# 방법 A — 배치 내 중복 제거
items = [...]   # 크롤 결과
seen = set()
unique = []
for item in items:
    if item.id not in seen:
        seen.add(item.id)
        unique.append(item)

# 방법 B — DB UNIQUE + ON CONFLICT 에 위임
# (동시 INSERT 여도 DB 가 해결)

7. 삭제 감지

원본에서 사라진 데이터 (예: 폐업한 업체) 처리.

ALTER TABLE companies ADD COLUMN last_seen_at TIMESTAMPTZ;

-- 수집 시 모두 last_seen_at 갱신
UPDATE companies SET last_seen_at = now() WHERE business_no = $1;

-- 한 달 이상 미관측 → 폐업 의심
SELECT * FROM companies WHERE last_seen_at < now() - interval '30 days';

즉시 DELETE 는 위험. soft flag (is_active) 로.

8. 백필 (backfill)

기존 데이터가 일부만 있을 때 전체 재수집.

# 플래그로 구분
async def crawl(mode: str = "incremental"):
    if mode == "backfill":
        since = datetime(2000, 1, 1)
    else:
        since = await get_last_checkpoint()
    # ...

운영자가 CLI · 관리 UI 에서 mode=backfill 명시적 실행.

9. 자주 걸리는 자리

  • 체크포인트 갱신 누락 — 다음 run 에서 전량 재수집
  • UNIQUE 컬럼 누락 — 중복 행 수만 증가
  • hash 만 비교 — 의미상 동일하지만 공백 차이로 hash 다름. 정규화 필요
  • 폐업 detection 없이 soft-delete 안 하고 행 삭제 — 과거 이력 소실

10. 데이터 품질 검증

async def validate(row):
    assert row.business_no and len(row.business_no) == 10
    assert row.name and len(row.name) < 200
    # 예상 범위 체크

수집 시 즉시 검증. 잘못된 데이터 저장 방지.

하고픈 말

"전량 매일" 은 1 년 후 차단당할 길. 증분 수집은 예의 + 성능 + 안정성 3 가지 모두 얻습니다. 첫 주에 체크포인트 · UNIQUE · hash 를 세팅하면 이후 관리 비용 급감.

Next

  • 06-observability-alerts

← 4단계

APScheduler + KST 스케줄

6단계 →

관측 · 알림