codingstairs
© 2026 codingstairs


Step 4

APScheduler + KST schedules


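APScheduler is a widely used Python scheduling library. Because the schedules live in the repository next to the code, they are easier to version and review than system cron entries.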

1. Install

uv add apscheduler pytz

2. Basic async scheduler

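from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz

KST = pytz.timezone("Asia/Seoul")
scheduler = AsyncIOScheduler(timezone=KST)

# A trigger instance does not inherit the scheduler's timezone, so pass KST explicitly.
@scheduler.scheduled_job(CronTrigger(hour=3, minute=0, timezone=KST), id="daily-nps-crawl")
async def daily_crawl():
    await crawl_nps()  # crawler coroutine, assumed to be defined elsewhere

# Call start() from inside a running event loop (for example, the FastAPI lifespan in section 5).
scheduler.start()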
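
To see why the timezone matters, here is a small standard-library sketch that computes the next 03:00 KST after a given moment and prints the same instant in UTC. It does not use APScheduler. The helper name next_daily_run is hypothetical, and KST is modeled as a fixed UTC+9 offset, which assumes Korea's current no-DST rule.

```python
from datetime import datetime, timedelta, timezone

# Assumption: KST modeled as a fixed UTC+9 offset (Korea observes no DST).
KST = timezone(timedelta(hours=9), "KST")


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Return the next hour:minute KST strictly after `now` (hypothetical helper)."""
    local = now.astimezone(KST)
    candidate = local.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local:
        # Today's slot has already passed, so move to tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 5, 10, 12, 0, tzinfo=timezone.utc)  # 21:00 KST
run = next_daily_run(now, hour=3)
print(run.isoformat())                           # KST wall-clock time
print(run.astimezone(timezone.utc).isoformat())  # same instant in UTC
```

The 03:00 KST run lands at 18:00 UTC on the previous calendar day. A job that was accidentally scheduled for hour=3 in UTC would instead fire at 12:00 KST.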

3. Triggers

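from datetime import datetime
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger

CronTrigger(hour=3, minute=0, timezone=KST)                      # every day at 03:00 KST
IntervalTrigger(hours=1)                                         # every hour
DateTrigger(run_date=datetime(2026, 5, 10, 9, 0), timezone=KST)  # once, at 2026-05-10 09:00 KST
CronTrigger(day_of_week="mon,wed,fri", hour=3, timezone=KST)     # Mon/Wed/Fri at 03:00 KST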

4. Idempotency options

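# job_defaults recognizes these three keys.
JOB_DEFAULTS = {
    "max_instances": 1,         # never run the same job concurrently
    "coalesce": True,           # collapse a backlog of missed runs into one
    "misfire_grace_time": 300,  # still run if up to 300 seconds late
}

scheduler = AsyncIOScheduler(timezone=KST, job_defaults=JOB_DEFAULTS)

# replace_existing is an add_job() argument, not a job default.
# The scheduled_job decorator always registers with replace_existing=True.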
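
How coalesce and misfire_grace_time interact is easier to see in a toy model. The sketch below is a simplified pure-Python approximation of the decision, not APScheduler's actual code; the function name runs_to_execute and the sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta


def runs_to_execute(due, now, coalesce, misfire_grace_time):
    """Simplified model: pick which overdue run times still execute.

    due: scheduled run times that have already passed, oldest first.
    misfire_grace_time: allowed lateness in seconds, or None for no limit.
    """
    # coalesce collapses the backlog to the most recent run time only.
    candidates = due[-1:] if coalesce else list(due)
    if misfire_grace_time is None:
        return candidates
    grace = timedelta(seconds=misfire_grace_time)
    # A run that is later than the grace period counts as a misfire and is skipped.
    return [t for t in candidates if now - t <= grace]


now = datetime(2026, 5, 10, 3, 4)
backlog = [
    datetime(2026, 5, 10, 1, 0),
    datetime(2026, 5, 10, 2, 0),
    datetime(2026, 5, 10, 3, 0),
]

print(runs_to_execute(backlog, now, coalesce=True, misfire_grace_time=300))
print(runs_to_execute(backlog, now, coalesce=False, misfire_grace_time=None))
print(runs_to_execute(backlog, now, coalesce=True, misfire_grace_time=1))
```

With a 300-second grace period the 03:00 run still executes four minutes late, and coalescing drops the two older runs. With a 1-second grace period the same delay causes the run to be skipped entirely.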

5. Wire to FastAPI lifespan

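from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()  # runs inside the app's event loop
    try:
        yield
    finally:
        scheduler.shutdown(wait=True)  # stop the scheduler on app shutdown

app = FastAPI(lifespan=lifespan)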

6. Manual trigger

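from datetime import datetime
from fastapi import HTTPException

@app.post("/admin/jobs/{job_id}/run")
async def trigger(job_id: str):
    job = scheduler.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="job not found")
    # Pull the next run forward to now; the trigger then computes later runs as usual.
    job.modify(next_run_time=datetime.now(KST))
    return {"ok": True}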

7. Multi-instance — distributed lock

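# redis_lock is a helper that is not shown here: an async context manager that
# holds a Redis lock so only one instance runs the crawl at a time.
async def crawl_with_lock():
    async with redis_lock("lock:daily-nps-crawl", ttl=3600):
        await crawl_nps()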
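
One possible shape for such a lock is sketched below. It is not the redis_lock helper used above: the name try_lock, its signature, and the choice to yield a boolean are all assumptions for illustration. The client is assumed to expose redis.asyncio.Redis-style set(..., nx=True, ex=...) and eval(...) methods, and InMemoryClient is only a stand-in so the sketch runs without a Redis server.

```python
import asyncio
import uuid
from contextlib import asynccontextmanager

# Delete the key only if it still holds our token, so a lock that expired and
# was re-acquired by another instance is never released by mistake.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


@asynccontextmanager
async def try_lock(client, key: str, ttl: int):
    """Yield True if the lock was acquired, False if someone else holds it."""
    token = uuid.uuid4().hex
    # SET key token NX EX ttl: succeeds only when the key does not exist yet.
    acquired = bool(await client.set(key, token, nx=True, ex=ttl))
    try:
        yield acquired
    finally:
        if acquired:
            await client.eval(RELEASE_SCRIPT, 1, key, token)


class InMemoryClient:
    """Stand-in for a Redis client (no expiry handling) so the sketch is runnable."""

    def __init__(self):
        self.data = {}

    async def set(self, key, value, nx=False, ex=None):
        if nx and key in self.data:
            return None
        self.data[key] = value
        return True

    async def eval(self, script, numkeys, key, token):
        # Mimics RELEASE_SCRIPT: compare the token, then delete.
        if self.data.get(key) == token:
            del self.data[key]
            return 1
        return 0


async def demo():
    client = InMemoryClient()
    results = []
    async with try_lock(client, "lock:daily-nps-crawl", ttl=3600) as first:
        results.append(first)
        # A second instance arriving while the lock is held is turned away.
        async with try_lock(client, "lock:daily-nps-crawl", ttl=3600) as second:
            results.append(second)
    # After release the lock can be taken again.
    async with try_lock(client, "lock:daily-nps-crawl", ttl=3600) as third:
        results.append(third)
    return results


print(asyncio.run(demo()))
```

A job would check the yielded flag and skip the crawl when it is False. The ttl should comfortably exceed the longest expected crawl, since the lock expires on its own if the holder crashes.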

8. Persistent jobstore

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

scheduler = AsyncIOScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="postgresql://...")},
    timezone=KST,
)

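This is often overkill. When jobs are registered with decorators at import time, every process rebuilds the same schedule on startup, so there is usually nothing that needs to persist.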

9. Record outcomes

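import time

# db is an async database handle (asyncpg-style $n placeholders), assumed to be defined elsewhere.
@scheduler.scheduled_job(CronTrigger(hour=3, timezone=KST), id="nps")
async def nps_job():
    start = time.perf_counter()  # monotonic clock for measuring duration
    try:
        rows = await crawl_nps()
        await db.execute(
            "INSERT INTO crawl_runs (source, status, rows, duration_ms) VALUES ($1, $2, $3, $4)",
            "nps", "ok", rows, int((time.perf_counter() - start) * 1000),
        )
    except Exception as e:
        await db.execute(
            "INSERT INTO crawl_runs (source, status, error) VALUES ($1, $2, $3)",
            "nps", "fail", str(e),
        )
        raise  # re-raise so the scheduler also sees the failure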
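
When several crawlers need the same bookkeeping, the pattern can be factored into a decorator. The following is a minimal sketch under stated assumptions: the names recorded and record are hypothetical, and an in-memory list stands in for the crawl_runs table so the example runs without a database.

```python
import asyncio
import functools
import time


def recorded(source, record):
    """Wrap an async job so every run reports status and duration to `record`."""
    def decorator(job):
        @functools.wraps(job)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()  # monotonic, unaffected by clock changes
            try:
                rows = await job(*args, **kwargs)
            except Exception as exc:
                elapsed_ms = int((time.perf_counter() - start) * 1000)
                await record(source=source, status="fail", rows=None,
                             duration_ms=elapsed_ms, error=str(exc))
                raise  # let the caller (the scheduler) see the failure too
            elapsed_ms = int((time.perf_counter() - start) * 1000)
            await record(source=source, status="ok", rows=rows,
                         duration_ms=elapsed_ms, error=None)
            return rows
        return wrapper
    return decorator


runs = []  # stand-in for the crawl_runs table


async def record(**row):
    runs.append(row)


@recorded("nps", record)
async def good_job():
    return 42


@recorded("nps", record)
async def bad_job():
    raise RuntimeError("upstream timeout")


async def demo():
    await good_job()
    try:
        await bad_job()
    except RuntimeError:
        pass
    return [(r["status"], r["rows"], r["error"]) for r in runs]


print(asyncio.run(demo()))
```

Unlike the inline version, this variant records the duration for failed runs as well, which helps distinguish immediate failures from timeouts.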

10. Gotchas

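  • Missing timezone → jobs fire in the server's local time (often UTC) instead of KST. A trigger instance such as CronTrigger(...) does not inherit the scheduler's timezone, so pass timezone=KST to it as well.
  • Default misfire_grace_time is 1 second → a job that starts even slightly late is skipped as a misfire
  • Multiple instances without a lock → duplicate runs
  • Async jobs inside a sync scheduler → a thread-based scheduler such as BackgroundScheduler calls the coroutine function but never awaits it, so the job body does not run. Use AsyncIOScheduler for async jobs.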
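
The last gotcha can be reproduced with the standard library alone. This sketch does not use APScheduler; it only shows what happens when an async function is called like a plain function, which is in effect what a thread-based executor does.

```python
import asyncio
import inspect

ran = []


async def job():
    ran.append("ran")


# Calling an async function without awaiting it only creates a coroutine object.
result = job()
print(inspect.iscoroutine(result), ran)
result.close()  # discard it explicitly to avoid a "never awaited" warning

# The body runs only once the coroutine is driven by an event loop.
asyncio.run(job())
print(ran)
```

The first call leaves the list empty, which is why an async job under a sync scheduler appears to succeed while doing nothing.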

Closing

APScheduler with KST schedules and the duplicate-run safeguards from section 4 (max_instances, coalesce, misfire_grace_time, replace_existing) is a solid default for Python backend scheduling.

Next

  • 05-incremental-dedup
