
Step 5

Incremental collection · deduplication

Crawling everything every day wastes resources and invites bans. Crawl only what changed.

1. Checkpoint the last success

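CREATE TABLE crawl_checkpoints (
  source VARCHAR PRIMARY KEY,
  last_run_at TIMESTAMPTZ,
  last_successful_id BIGINT
);

from datetime import datetime, timezone

# Record the start time before fetching, so rows that change mid-run are picked up next time.
run_started_at = datetime.now(timezone.utc)

last = await db.fetchrow(
  "SELECT last_run_at FROM crawl_checkpoints WHERE source = $1", "nps"
)
# First run: no checkpoint yet, so fall back to a fixed, timezone-aware epoch (matches TIMESTAMPTZ).
since = last["last_run_at"] if last else datetime(2000, 1, 1, tzinfo=timezone.utc)
new_data = await fetch_since(since)

# Advance the checkpoint only after the fetch has succeeded.
await db.execute(
  "INSERT INTO crawl_checkpoints (source, last_run_at) VALUES ($1, $2) "
  "ON CONFLICT (source) DO UPDATE SET last_run_at = EXCLUDED.last_run_at",
  "nps", run_started_at
)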
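
A minimal sketch of the "advance only on success" rule, with a plain dict standing in for the crawl_checkpoints table. The names run_incremental, EPOCH, and the two fake fetchers are hypothetical and exist only for this illustration.

```python
import asyncio
from datetime import datetime, timezone

EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)

async def run_incremental(checkpoints, source, fetch_since):
    """Fetch items newer than the stored checkpoint, then advance it.

    `checkpoints` is a dict standing in for the crawl_checkpoints table.
    """
    started_at = datetime.now(timezone.utc)  # captured before fetching
    since = checkpoints.get(source, EPOCH)
    items = await fetch_since(since)  # if this raises, the checkpoint is untouched
    checkpoints[source] = started_at
    return items

async def _demo():
    checkpoints = {}

    async def ok(since):
        return ["a", "b"]

    async def boom(since):
        raise RuntimeError("upstream 500")

    await run_incremental(checkpoints, "nps", ok)
    first = checkpoints["nps"]
    try:
        await run_incremental(checkpoints, "nps", boom)
    except RuntimeError:
        pass  # failed run: checkpoint must not move
    return first, checkpoints["nps"]

first, after_failure = asyncio.run(_demo())
```

After the failed run the checkpoint still holds the value from the last successful run, so the next attempt retries the same window instead of skipping it.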

2. Natural-key UNIQUE

CREATE TABLE companies (
  id BIGSERIAL PRIMARY KEY,
  business_no VARCHAR UNIQUE NOT NULL,
  name VARCHAR NOT NULL,
  updated_at TIMESTAMPTZ DEFAULT now()
);

INSERT INTO companies (business_no, name)
VALUES ($1, $2)
ON CONFLICT (business_no) DO UPDATE
  SET name = EXCLUDED.name, updated_at = now();
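
To see the upsert behave idempotently without a Postgres instance, the sketch below runs the same pattern against an in-memory SQLite database. SQLite is only a stand-in here (it uses ? placeholders instead of $1, and needs version 3.24 or newer for ON CONFLICT); the sample business number and names are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE companies ("
    " id INTEGER PRIMARY KEY,"
    " business_no TEXT UNIQUE NOT NULL,"
    " name TEXT NOT NULL)"
)

UPSERT = (
    "INSERT INTO companies (business_no, name) VALUES (?, ?) "
    "ON CONFLICT (business_no) DO UPDATE SET name = excluded.name"
)

# The same natural key arrives twice, the second time with a corrected name.
conn.execute(UPSERT, ("1234567890", "Acme"))
conn.execute(UPSERT, ("1234567890", "Acme Corp"))

rows = conn.execute("SELECT business_no, name FROM companies").fetchall()
```

Two inserts for the same business_no leave a single row carrying the latest name, which is what makes re-running a crawl safe.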

3. Content hash

import hashlib

def content_hash(text):
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

# upsert_article is an illustrative wrapper name; the early return needs a function body.
async def upsert_article(db, url, article):
    new_hash = content_hash(article.body)
    row = await db.fetchrow("SELECT content_hash FROM articles WHERE url = $1", url)
    if row and row["content_hash"] == new_hash:
        return  # body unchanged since the last crawl: skip the write

    await db.execute(
      "INSERT INTO articles (url, body, content_hash) VALUES ($1, $2, $3) "
      "ON CONFLICT (url) DO UPDATE SET body = $2, content_hash = $3, updated_at = now()",
      url, article.body, new_hash
    )

4. ?since= params

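# OpenDART's disclosure list takes a date range (YYYYMMDD) rather than a single "since" value.
resp = await client.get("https://opendart.fss.or.kr/api/list.json", params={
    "crtfc_key": API_KEY,  # your OpenDART API key (placeholder name)
    "corp_code": "...",
    "bgn_de": since.strftime("%Y%m%d"),
    "end_de": today.strftime("%Y%m%d"),
})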

5. Paginate with a cap

async def crawl_all_pages(source):
    page = 1
    while True:
        items = await fetch_page(source, page)
        if not items:
            break
        await save_batch(items)
        page += 1
        if page > 1000:
            # Safety cap: stop instead of looping forever on a misbehaving source.
            logger.warning("exceeded page limit", source=source)
            break

6. Parallel + dedup

seen = set()
unique = []
for item in items:
    if item.id not in seen:
        seen.add(item.id)
        unique.append(item)

Or rely on DB UNIQUE + ON CONFLICT.
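
A rough sketch of how the parallel fetch and the in-memory dedup could fit together. The fetch_page stub, its overlapping sample data, and the concurrency limit of 2 are assumptions for illustration; a real crawler would replace the stub with an HTTP call.

```python
import asyncio

async def fetch_page(page):
    # Hypothetical stand-in for a network call; pages overlap on purpose.
    data = {
        1: [{"id": 1}, {"id": 2}],
        2: [{"id": 2}, {"id": 3}],
        3: [{"id": 3}],
    }
    await asyncio.sleep(0)
    return data[page]

async def fetch_unique(pages, limit=2):
    sem = asyncio.Semaphore(limit)  # cap the number of concurrent requests

    async def bounded(page):
        async with sem:
            return await fetch_page(page)

    # gather returns results in the order the pages were passed in.
    batches = await asyncio.gather(*(bounded(p) for p in pages))

    seen, unique = set(), []
    for batch in batches:
        for item in batch:
            if item["id"] not in seen:
                seen.add(item["id"])
                unique.append(item)
    return unique

unique = asyncio.run(fetch_unique([1, 2, 3]))
```

Deduplicating after gather keeps the set access single-threaded, so no locking is needed; the DB constraint remains the backstop across runs.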

7. Delete detection

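-- One-time migration.
ALTER TABLE companies ADD COLUMN last_seen_at TIMESTAMPTZ;
-- On every crawl, stamp each row the source still returns.
UPDATE companies SET last_seen_at = now() WHERE business_no = $1;
-- Rows not seen for 30 days are deletion candidates.
SELECT * FROM companies WHERE last_seen_at < now() - interval '30 days';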

Prefer soft-flags (is_active) over hard deletes.
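
The soft-flag approach might look like the sketch below, again using in-memory SQLite as a stand-in for Postgres. The is_active column comes from the note above; the table layout, the sample rows, and storing timestamps as ISO strings are simplifications made for this example.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE companies ("
    " business_no TEXT PRIMARY KEY,"
    " last_seen_at TEXT NOT NULL,"
    " is_active INTEGER NOT NULL DEFAULT 1)"
)

now = datetime.now(timezone.utc)
conn.executemany(
    "INSERT INTO companies (business_no, last_seen_at) VALUES (?, ?)",
    [
        ("1111111111", now.isoformat()),                         # seen this run
        ("2222222222", (now - timedelta(days=45)).isoformat()),  # missing for 45 days
    ],
)

# Flag rows missing for more than 30 days instead of deleting them.
cutoff = (now - timedelta(days=30)).isoformat()
conn.execute("UPDATE companies SET is_active = 0 WHERE last_seen_at < ?", (cutoff,))

status = dict(conn.execute("SELECT business_no, is_active FROM companies"))
```

If the source later returns the item again, setting is_active back to 1 restores it with its history intact, which a hard delete cannot do.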

8. Backfill

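from datetime import datetime, timezone

async def crawl(mode="incremental"):
    # Backfill ignores the checkpoint and starts from a fixed, timezone-aware epoch.
    since = datetime(2000, 1, 1, tzinfo=timezone.utc) if mode == "backfill" else await get_last_checkpoint()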

Run a backfill only through an explicit CLI or admin trigger.
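
One possible shape for the CLI trigger, sketched with argparse. The --mode flag name and build_parser are assumptions for this example; they only need to map onto the mode argument of crawl() above.

```python
import argparse

def build_parser():
    parser = argparse.ArgumentParser(description="Run the crawler")
    parser.add_argument(
        "--mode",
        choices=["incremental", "backfill"],
        default="incremental",  # scheduled runs never backfill by accident
        help="backfill re-crawls from the beginning; use deliberately",
    )
    return parser

# Parsing explicit argument lists here so the example runs without a shell.
args = build_parser().parse_args(["--mode", "backfill"])
default_args = build_parser().parse_args([])
```

In a real entry point the parsed value would be passed along, for example asyncio.run(crawl(mode=args.mode)). Any value other than the two choices is rejected by argparse before the crawler starts.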

9. Gotchas

  • Checkpoint not updated after a successful run → the next run re-crawls the full set
  • Missing UNIQUE constraint → duplicate rows
  • Hashes differ only because of whitespace → normalize before hashing
  • Hard-deleting "gone" items → use a soft flag instead
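
For the whitespace gotcha, a small sketch of normalizing before hashing. The normalized_hash name and the choice of NFC plus whitespace collapsing are assumptions; some sources may need stricter or looser normalization.

```python
import hashlib
import unicodedata

def normalized_hash(text: str) -> str:
    # Unify Unicode forms, then collapse every whitespace run to one space
    # and trim the ends, so cosmetic changes do not alter the hash.
    text = unicodedata.normalize("NFC", text)
    text = " ".join(text.split())
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

a = normalized_hash("Annual  report\n2024 ")
b = normalized_hash("Annual report 2024")
c = normalized_hash("Annual report 2025")
```

Here a and b are equal because they differ only in spacing, while c differs because the content changed.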

10. Data quality

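async def validate(row):
    # Raise instead of assert: assertions are stripped when Python runs with -O.
    if not (row.business_no and len(row.business_no) == 10):
        raise ValueError(f"invalid business_no: {row.business_no!r}")
    if not (row.name and len(row.name) < 200):
        raise ValueError(f"invalid name: {row.name!r}")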

Validate at ingest; bad rows never reach storage.
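
A sketch of how validation could gate a batch so that only good rows reach storage, using the same two rules as above. CompanyRow, validation_error, and partition are hypothetical names, and the sample rows are made up.

```python
from dataclasses import dataclass

@dataclass
class CompanyRow:
    business_no: str
    name: str

def validation_error(row):
    """Return a reason string for a bad row, or None if the row is valid."""
    if not row.business_no or len(row.business_no) != 10:
        return "business_no must be 10 characters"
    if not row.name or len(row.name) >= 200:
        return "name must be 1-199 characters"
    return None

def partition(rows):
    """Split rows into (valid, rejected); rejected keeps the reason for logging."""
    valid, rejected = [], []
    for row in rows:
        reason = validation_error(row)
        if reason is None:
            valid.append(row)
        else:
            rejected.append((row, reason))
    return valid, rejected

rows = [
    CompanyRow("1234567890", "Acme"),
    CompanyRow("123", "Short number"),
    CompanyRow("0987654321", ""),
]
valid, rejected = partition(rows)
```

Keeping the rejected rows with their reasons, rather than silently dropping them, makes it possible to notice when a source changes its format.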

Closing

Crawling everything daily is the fastest way to get banned. Adding checkpoints, UNIQUE constraints, and content hashes from day one cuts maintenance later.

Next

  • 06-observability-alerts
