전자공시 대량 수집은 왜 느리고 어디서 막히나
Quick Summary

OpenDART와 EDGAR에서 수천 종목 데이터를 대량 수집할 때 부딪히는 rate limit, 네트워크 장애, 재시도 전략을 정리한다. 단순 반복 호출이 아닌 실전 파이프라인 설계 기준을 다룬다.

전자공시 대량 수집은 왜 느리고 어디서 막히나

종목 하나의 재무제표를 가져오는 건 쉽다. API를 호출하고 JSON을 파싱하면 끝이다. 문제는 2,700개 종목을 전부 수집하려 할 때 시작된다. OpenDART는 하루 호출 횟수에 벽이 있고, EDGAR는 초당 속도에 벽이 있다. 벽에 부딪히면 수집이 멈추는 것이 아니라 차단을 당한다. 차단 해제에 며칠이 걸릴 수 있다.

대량 수집은 단순 반복 호출이 아니다. rate limit을 인식하고, 실패를 분류하고, 중단된 지점에서 이어받고, 수집한 데이터를 검증하는 파이프라인을 설계해야 한다. 이 글은 OpenDART와 EDGAR 양쪽을 대상으로, 수천 종목 규모의 수집에서 실제로 부딪히는 병목과 그에 대응하는 실전 전략을 정리한다.

전자공시 대량 수집 파이프라인 전체 구조


단건 조회와 대량 수집은 왜 근본적으로 다른가

API 문서만 보면, 대량 수집은 단건 조회를 N번 반복하면 될 것 같다. 실제로는 전혀 다른 문제를 풀어야 한다.

단건 조회는 요청 → 응답 → 끝이다. 실패하면 한 번 더 시도하면 된다. 대량 수집에서는 다음과 같은 문제가 동시에 터진다.

  • 할당량 소진: 2,700개 종목 × 4개 API = 10,800건. OpenDART 일일 한도 10,000건을 초과한다. 하루에 끝나지 않는다.
  • 속도 제한: EDGAR에서 초당 11건을 보내면 IP가 차단된다. 한 번 차단되면 SEC에 이메일을 보내고 며칠을 기다려야 한다.
  • 중간 실패: 1,847번째 종목에서 네트워크가 끊기면, 앞서 수집한 1,846건은 어디에 있는가? 체크포인트가 없으면 처음부터 다시 시작해야 한다.
  • 데이터 정합성: 수집 도중 원본 데이터가 갱신되면, 앞서 받은 데이터와 나중에 받은 데이터의 시점이 다르다.
  • 저장소 관리: 수천 개 파일을 어떻게 분류하고, 업데이트가 필요한 파일만 골라내는가?

이 문제들은 단건 조회 로직을 for문으로 감싸는 것만으로는 해결되지 않는다. 수집, 검증, 저장, 복구를 분리한 파이프라인 아키텍처가 필요하다.


OpenDART와 EDGAR의 rate limit 정책은 근본적으로 다르다

같은 “공시 API”지만 DART와 EDGAR의 제한 방식은 완전히 다르다. 하나의 수집 코드로 양쪽을 처리하면 반드시 한쪽에서 문제가 생긴다.

OpenDART vs EDGAR rate limit 정책 비교

OpenDART: 일일 총량이 병목

OpenDART API는 API key 하나당 하루 10,000건의 요청을 허용한다. DART 전자공시 관련 모든 API가 이 한도를 공유한다. 무료 키(미등록)는 하루 1,000건으로 더 제한적이다.

핵심적인 차이는 초과 시 응답 방식이다. OpenDART는 HTTP 429를 반환하지 않는다. 대신 HTTP 200에 JSON 본문의 status 필드로 에러를 알려준다. 일반적인 HTTP 에러 핸들링으로는 잡을 수 없다.

import httpx

DART_API_KEY = "your_api_key"
BASE_URL = "https://opendart.fss.or.kr/api"

def fetch_dart(endpoint: str, params: dict) -> dict | None:
    """OpenDART API 호출 — 에러코드 기반 핸들링"""
    params["crtfc_key"] = DART_API_KEY
    response = httpx.get(f"{BASE_URL}/{endpoint}.json", params=params)
    data = response.json()

    status = data.get("status")
    if status == "000":
        return data  # 정상
    if status == "020":
        # 일일 한도 초과 — 재시도 무의미, 다음 날까지 대기
        return None
    if status == "013":
        # 조회된 데이터 없음 — 정상적 빈 응답
        return {"list": []}
    return None

status == "020"이 뜨면 오늘은 끝이다. 같은 key로 몇 번을 재시도해도 같은 응답이 돌아온다. 이 시점에서 체크포인트를 저장하고 다음 날 이어받아야 한다.

분당 제한은 공식적으로 공개되어 있지 않지만, 실측 기준 분당 600건을 넘기면 응답 속도가 눈에 띄게 느려진다. 분당 300건 수준(요청당 0.2초 간격)이면 안정적이다.

EDGAR: 초당 속도가 병목

SEC EDGAR는 일일 제한이 없는 대신 초당 10건을 넘기면 즉시 HTTP 429를 반환한다. 반복적으로 위반하면 IP 자체를 차단한다. 차단이 걸리면 SEC 웹마스터(webmaster@sec.gov)에 이메일로 해제를 요청해야 하는데, 1~3 영업일이 걸린다.

EDGAR는 반드시 User-Agent 헤더에 조직명과 연락처 이메일을 포함해야 한다. 이 헤더가 없거나 브라우저를 흉내낸 값이면 요청 자체가 403으로 거부된다.

import httpx
import asyncio

EDGAR_HEADERS = {
    "User-Agent": "MyCompany admin@mycompany.com",
    "Accept-Encoding": "gzip, deflate",
}

async def fetch_edgar(client: httpx.AsyncClient, url: str) -> bytes | None:
    """EDGAR 요청 — User-Agent 필수, 429 핸들링"""
    response = await client.get(url, headers=EDGAR_HEADERS)

    if response.status_code == 200:
        return response.content
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", "10"))
        await asyncio.sleep(retry_after)
        return None
    if response.status_code == 403:
        # User-Agent 누락 또는 IP 차단
        return None
    return None

비교 테이블: OpenDART vs EDGAR

항목OpenDARTEDGAR
일일 한도10,000건 (등록 key) / 1,000건 (미등록)없음
초당 한도공식 없음 (체감 분당 600건)10건/초 (공식)
초과 시 응답HTTP 200 + JSON status “020”HTTP 429 + Retry-After 헤더
인증 방식API key (쿼리 파라미터)User-Agent 헤더 (조직명 + 이메일)
차단 해제다음 날 자동 리셋SEC 이메일 요청 (1~3일)
병렬 수집 효과낮음 (총량 병목)높음 (속도 병목)
주요 에러코드“020” 한도초과, “013” 데이터없음, “800” 시스템오류429 속도초과, 403 인증실패, 503 서버과부하
데이터 형식JSON (API 응답)SGML/HTML/XBRL (filing 원문)

이 차이 때문에 DART 수집기와 EDGAR 수집기는 별도로 설계해야 한다. DART는 순차 처리 + 일일 할당량 관리가 핵심이고, EDGAR는 병렬 처리 + 초당 속도 제어가 핵심이다.


HTTP 429 핸들링과 지수 백오프 재시도

API가 “느려지라”는 신호를 보냈는데 같은 속도로 재시도하면 상황만 악화된다. 지수 백오프(exponential backoff)는 실패할 때마다 대기 시간을 2배로 늘려서 서버에 가하는 압력을 점진적으로 줄이는 전략이다.

네트워크 장애 대응과 재시도 전략

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retry(
    fn: Callable[..., T],
    *args,
    max_retries: int = 5,
    base_wait: float = 1.0,
    max_wait: float = 60.0,
) -> T | None:
    """지수 백오프 + jitter 재시도 래퍼"""
    for attempt in range(max_retries + 1):
        try:
            result = fn(*args)
            if result is not None:
                return result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = e.response.headers.get("Retry-After")
                if retry_after:
                    wait = int(retry_after)
                else:
                    wait = min(
                        base_wait * (2 ** attempt) + random.uniform(0, 1),
                        max_wait,
                    )
                time.sleep(wait)
                continue
            raise
        except (httpx.ConnectError, httpx.ReadTimeout):
            wait = min(
                base_wait * (2 ** attempt) + random.uniform(0, 1),
                max_wait,
            )
            time.sleep(wait)
            continue
    return None

jitter(무작위 지연)가 핵심이다. 10개 클라이언트가 동시에 실패하면, jitter 없이는 모두 정확히 2초 후에 재시도를 보낸다. random.uniform(0, 1)이면 재시도 시점이 흩어져서 서버에 가해지는 순간 부하가 줄어든다.

Retry-After 헤더가 있으면 백오프보다 우선한다

EDGAR는 429 응답에 Retry-After 헤더를 포함한다. 이 값이 있으면 계산된 백오프 시간을 무시하고 서버가 지정한 시간만큼 대기해야 한다. OpenDART는 429를 반환하지 않으므로 이 헤더 자체가 없다. OpenDART에서의 재시도 판단은 JSON 본문의 status 코드에 의존한다.

네트워크 장애 유형별 대응

대량 수집에서 만나는 장애는 rate limit만이 아니다.

장애 유형증상대응 전략
연결 타임아웃ConnectTimeout지수 백오프 재시도 (네트워크 일시 불안정)
읽기 타임아웃ReadTimeout타임아웃 값 늘리고 재시도
부분 다운로드응답 body가 잘림Content-Length 대비 실제 크기 검증 후 재시도
DNS 실패ConnectError잠시 대기 후 재시도 (DNS 서버 일시 장애)
SSL 핸드셰이크 실패SSLError재시도 (서버측 인증서 갱신 중) 또는 환경 점검
서버 503Service Unavailable장시간 대기 후 재시도 (서버 점검)

부분 다운로드는 특히 EDGAR에서 자주 발생한다. filing 파일이 수십 MB에 달하기 때문이다. 응답의 Content-Length 헤더와 실제 수신 바이트 수를 비교하는 검증이 필요하다.

def validate_download(response: httpx.Response) -> bool:
    """다운로드 완전성 검증"""
    content_length = response.headers.get("Content-Length")
    if content_length is None:
        return True  # 서버가 길이를 안 알려주면 검증 불가
    expected = int(content_length)
    actual = len(response.content)
    return actual == expected

GitHub Releases를 데이터 배포 레이어로 활용하기

대량 수집의 결과물을 어떻게 배포하는가도 중요한 문제다. 수천 개의 parquet 파일을 사용자에게 전달하려면 별도의 파일 서버나 CDN이 필요하다. dartlab은 GitHub Releases를 데이터 배포 레이어로 사용한다.

GitHub Releases 기반 데이터 분산 저장 구조

GitHub Releases의 제약과 분할 전략

GitHub Release 하나의 태그에는 최대 1,000개 에셋을 첨부할 수 있다. 이것은 GitHub 플랫폼의 하드 제한이다. 종목 수가 1,000개를 넘으면 하나의 태그에 모든 파일을 올릴 수 없다.

dartlab의 분할 전략은 다음과 같다.

  • docs: 단일 태그 data-docs (272개 파일, 1,000개 이하이므로 분할 불필요)
  • finance: 종목코드 범위별 4개 shard로 분할
    • data-finance-1: 종목코드 000000~049999 (928개)
    • data-finance-2: 종목코드 050000~099999 (496개)
    • data-finance-3: 종목코드 100000~199999 (378개)
    • data-finance-4: 종목코드 200000~999999 + 비숫자 코드 (941개)

종목코드를 정수로 변환해서 범위에 맞는 태그에 업로드한다. 비숫자 종목코드(예: 0004V0)는 마지막 shard에 포함된다.

def finance_tag(stock_code: str) -> str:
    """종목코드 → GitHub Release 태그 결정"""
    try:
        code_int = int(stock_code)
    except ValueError:
        return "data-finance-4"  # 비숫자 코드

    if code_int < 50_000:
        return "data-finance-1"
    if code_int < 100_000:
        return "data-finance-2"
    if code_int < 200_000:
        return "data-finance-3"
    return "data-finance-4"

이 분할 로직은 중앙 config(dataConfig.py)에서 관리한다. shard를 추가하거나 범위를 바꿀 때 이 config 한 곳만 수정하면 업로드, 다운로드, 문서가 전부 반영된다.

에셋 업로드와 다운로드

GitHub Release 에셋은 gh release upload CLI로 올리고, HTTP GET으로 다운로드한다. 대량 업로드 시에도 GitHub API의 rate limit(5,000 req/hour for authenticated)을 고려해야 한다.

import subprocess

def upload_to_release(tag: str, file_path: str) -> bool:
    """GitHub Release에 parquet 파일 업로드"""
    result = subprocess.run(
        ["gh", "release", "upload", tag, file_path, "--clobber"],
        capture_output=True,
        text=True,
    )
    return result.returncode == 0

--clobber 옵션은 같은 이름의 에셋이 이미 있으면 덮어쓴다. 증분 업데이트 시 유용하다.

왜 GitHub Releases인가

전용 파일 서버, S3, GCS 같은 클라우드 스토리지도 선택지다. GitHub Releases를 쓰는 이유는 세 가지다.

  1. 무료: 오픈소스 프로젝트에서 추가 비용이 없다.
  2. CDN 내장: GitHub의 글로벌 CDN을 통해 다운로드가 분산된다.
  3. 버전 관리: 태그별로 에셋이 묶이므로, 과거 버전의 데이터도 보존된다.

단점은 에셋당 2GB 제한과 태그당 1,000개 에셋 제한이다. 재무 데이터처럼 종목별 수 KB~수 MB인 경우에는 문제가 되지 않지만, 대용량 원문 파일을 통째로 올리기에는 부적합하다.


증분 업데이트 vs 전체 갱신: 언제 무엇을 선택하나

초기 전수 수집이 끝나면, 이후에는 변경분만 가져와야 효율적이다. 하지만 증분 업데이트만으로는 데이터 정합성을 보장할 수 없는 경우도 있다.

증분 업데이트 vs 전체 갱신 판단 흐름

mtime 비교 방식

dartlab의 downloadAll(forceUpdate=True) 함수는 로컬 파일의 수정 시각(mtime)과 GitHub Release 에셋의 updated_at 타임스탬프를 비교한다. 로컬이 더 오래됐으면 재다운로드한다.

import os
from datetime import datetime, timezone
from pathlib import Path

def needs_update(local_path: Path, remote_updated_at: str) -> bool:
    """로컬 파일이 원격보다 오래됐는지 판단"""
    if not local_path.exists():
        return True  # 로컬에 없으면 무조건 다운로드

    local_mtime = datetime.fromtimestamp(
        os.path.getmtime(local_path), tz=timezone.utc
    )
    remote_time = datetime.fromisoformat(
        remote_updated_at.replace("Z", "+00:00")
    )
    return local_mtime < remote_time

이 방식은 대부분의 경우 충분하다. 하지만 다음 상황에서는 전체 갱신이 필요하다.

전체 갱신이 필요한 경우

상황이유대응
매핑 로직 변경과거 파일의 계정 매핑이 달라짐전체 재파싱 + 재업로드
스키마 변경parquet 컬럼 구조가 바뀜전체 재생성
원본 데이터 소급 수정DART/EDGAR가 과거 filing을 수정변경 감지 어려움, 주기적 전체 갱신 필요
shard 재분배종목코드 범위가 바뀜전체 재업로드

실전에서는 일상적으로 증분, 분기마다 전체 갱신이 현실적인 전략이다. 증분으로 놓친 변경이 있어도 분기 전체 갱신에서 복구된다.

forceUpdate 플래그

def download_all(
    data_dir: Path,
    force_update: bool = False,
) -> dict:
    """전체 데이터 다운로드 — 증분 또는 전체 갱신"""
    assets = list_release_assets()  # GitHub API로 에셋 목록 조회
    downloaded = 0
    skipped = 0

    for asset in assets:
        local_path = data_dir / asset["name"]

        if not force_update and not needs_update(local_path, asset["updated_at"]):
            skipped += 1
            continue

        content = download_asset(asset["url"])
        if content and validate_parquet_bytes(content):
            local_path.write_bytes(content)
            downloaded += 1

    return {"downloaded": downloaded, "skipped": skipped}

force_update=False(기본값)이면 mtime 비교로 필요한 것만 받고, True이면 전부 다시 받는다. 매핑이나 스키마를 바꿨을 때 True로 돌리면 된다.


병렬 처리 전략: async vs threading vs 순차

“병렬이 빠르니까 무조건 병렬”은 대량 수집에서 위험한 생각이다. rate limit이 있는 API에서 병렬 요청은 한도를 순식간에 소진시킨다.

순차가 맞는 경우: DART 일일 수집

OpenDART의 병목은 초당 속도가 아니라 일일 총량이다. 10개를 동시에 보내든 1개씩 보내든 하루 10,000건에서 멈춘다. 병렬로 보내면 할당량을 10배 빨리 소진할 뿐이고, 중간 에러 추적도 어려워진다.

def collect_sequential(
    stock_codes: list[str],
    checkpoint: dict,
    delay: float = 0.2,
) -> None:
    """순차 수집 — DART에 적합"""
    completed = set(checkpoint.get("completed", []))
    remaining = [c for c in stock_codes if c not in completed]

    with httpx.Client(timeout=30.0) as client:
        for code in remaining:
            result = fetch_dart_with_retry(client, code)
            if result is not None:
                save_parquet(code, result)
                completed.add(code)
                update_checkpoint(checkpoint, completed)
            time.sleep(delay)

병렬이 맞는 경우: EDGAR 파일 다운로드

EDGAR는 일일 제한이 없고 초당 10건만 지키면 된다. 대용량 filing 파일은 다운로드에 수 초가 걸리므로, 순차로 처리하면 네트워크 대기 시간이 전부 직렬화된다. asyncio.Semaphore로 동시 요청 수를 제한하면 rate limit 안에서 처리량을 극대화할 수 있다.

import asyncio
import httpx

async def collect_parallel(
    urls: list[str],
    max_concurrent: int = 8,
    delay: float = 0.12,
) -> list[bytes]:
    """병렬 수집 — EDGAR에 적합 (초당 ~8건 유지)"""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def fetch_one(url: str) -> bytes | None:
        async with semaphore:
            async with httpx.AsyncClient(headers=EDGAR_HEADERS) as client:
                result = await fetch_edgar(client, url)
                await asyncio.sleep(delay)
                return result

    tasks = [fetch_one(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if isinstance(r, bytes)]

max_concurrent=8delay=0.12면 초당 약 8건이다. EDGAR의 10 req/sec 한도에서 20% 안전 마진을 둔 수치다.

세션 재사용과 연결 풀링

2,700건을 순차로 가져오면 HTTP 연결을 2,700번 열고 닫는다. TCP 핸드셰이크와 TLS 협상이 매번 반복되면서 요청당 수백 밀리초가 낭비된다.

httpx.Client(동기)나 httpx.AsyncClient(비동기)로 연결 풀을 유지하면 이 오버헤드가 사라진다. 한 세션으로 전체 수집을 돌리면 네트워크 레벨에서 20~30% 빨라진다. HTTPS가 기본인 OpenDART와 EDGAR에서 TLS 재사용 효과가 특히 크다.

with httpx.Client(
    timeout=httpx.Timeout(30.0, connect=10.0),
    limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
) as client:
    for code in stock_codes:
        # 같은 client 인스턴스로 모든 요청
        response = client.get(url, params=params)

비교 체크리스트: 병렬 vs 순차

기준순차 수집async 병렬 수집
적합 대상일일 총량 병목 (DART)초당 속도 병목 (EDGAR)
구현 복잡도낮음중간 (Semaphore, 에러 수집)
디버깅쉬움 (순서 보장)어려움 (비동기 스택 트레이스)
rate limit 관리time.sleep() 한 줄Semaphore + per-request delay
체크포인트동기적 즉시 저장비동기 콜백 또는 완료 후 일괄
처리량 (EDGAR 기준)~2건/초 (네트워크 대기 포함)~8건/초
처리량 (DART 기준)~5건/초 (API 응답 빠름)같음 (총량 병목이므로 차이 없음)

체크포인트와 중단 복구

2,743개 종목을 수집하다가 중간에 멈추면 어떻게 될까? 체크포인트가 없으면 처음부터 다시 시작해야 한다. 이미 소비한 API 호출 횟수는 돌아오지 않는다.

건별 즉시 기록

체크포인트의 핵심은 건별 즉시 기록이다. 100건 모아서 한 번에 쓰면 99건째에서 중단됐을 때 100건을 다시 해야 한다.

import json
from pathlib import Path

CHECKPOINT_PATH = Path("checkpoint.json")

def load_checkpoint() -> dict:
    if CHECKPOINT_PATH.exists():
        return json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8"))
    return {
        "completed": [],
        "failed": [],
        "api_calls_today": 0,
        "last_date": None,
    }

def update_checkpoint(checkpoint: dict, completed: set) -> None:
    """원자적 체크포인트 갱신"""
    from datetime import datetime

    checkpoint["completed"] = sorted(completed)
    checkpoint["last_updated"] = datetime.now().isoformat()

    # tmp 파일에 쓰고 rename — 쓰는 도중 중단돼도 원본이 안 깨진다
    tmp_path = CHECKPOINT_PATH.with_suffix(".tmp")
    tmp_path.write_text(
        json.dumps(checkpoint, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    tmp_path.rename(CHECKPOINT_PATH)

원자적 쓰기(atomic write)가 중요하다. checkpoint.json에 직접 쓰다가 프로세스가 죽으면 JSON이 깨진다. tmp 파일에 완전히 쓴 다음 rename하면 파일시스템 레벨에서 원자적으로 교체된다.

실패 건 분류

실패한 종목은 원인별로 분류해야 한다. “일일 한도 초과”로 실패한 건은 다음 날 재시도하면 되지만, “존재하지 않는 종목코드”로 실패한 건은 재시도해도 소용없다.

def categorize_failure(code: str, status: str, checkpoint: dict) -> None:
    """실패 건을 원인별로 분류"""
    if status == "020":  # DART 일일 한도
        checkpoint.setdefault("quota_exceeded", []).append(code)
    elif status == "013":  # 데이터 없음
        checkpoint.setdefault("no_data", []).append(code)
    else:  # 네트워크 오류 등
        checkpoint.setdefault("transient_failure", []).append(code)

quota_exceeded는 다음 날 자동 재시도, no_data는 재시도 대상에서 제외, transient_failure는 즉시 또는 잠시 후 재시도 대상이다.


저장 포맷: 왜 Parquet인가

수집한 재무 데이터를 어떤 형식으로 저장할지는 이후 분석 성능에 직결된다. dartlab은 Parquet을 사용한다.

Parquet의 장점

특성ParquetCSVJSON
압축률높음 (열 단위 압축)없음없음
파일 크기 (동일 데이터)1x5~10x8~15x
읽기 속도빠름 (필요한 컬럼만)느림 (전체 파싱)매우 느림
타입 보존정수, 실수, 문자열, 날짜 등 원본 그대로전부 문자열제한적
스키마 내장포함없음 (첫 행 추론)없음
부분 읽기가능 (row group + 컬럼 필터)불가 (전체 로드)불가

재무 데이터는 컬럼이 많고(계정 ID, 계정명, 재무제표 구분, 사업연도, 보고서 구분, 금액 등), 같은 종목의 여러 기간 데이터가 반복 구조를 가진다. Parquet의 열 단위 압축이 이 패턴에서 높은 압축률을 보인다.

종목별 분리 저장

dartlab은 종목코드별로 개별 parquet 파일을 유지한다. 하나의 거대한 파일 대신 종목별로 분리하면 다음 이점이 있다.

  1. 증분 업데이트: 변경된 종목 파일만 교체하면 된다.
  2. 병렬 접근: 여러 종목을 동시에 읽어도 파일 락이 없다.
  3. 장애 격리: 한 파일이 깨져도 다른 종목에 영향이 없다.
  4. 배포 효율: GitHub Release 에셋 단위로 개별 업로드/다운로드 가능.
import polars as pl
from pathlib import Path

DATA_DIR = Path("data/finance")

def save_parquet(stock_code: str, data: dict) -> None:
    """종목별 parquet 저장 — 기존 데이터와 병합"""
    file_path = DATA_DIR / f"{stock_code}.parquet"
    new_df = pl.DataFrame(data["list"])

    if file_path.exists():
        existing = pl.read_parquet(file_path)
        merged = pl.concat([existing, new_df]).unique(
            subset=["bsns_year", "reprt_code", "sj_div", "account_id"]
        )
    else:
        merged = new_df

    merged.write_parquet(file_path)

저장 후에는 반드시 read-back 검증을 한다. 디스크 공간 부족이나 권한 문제로 파일이 깨질 수 있다.

def verify_parquet(file_path: Path) -> bool:
    """parquet 무결성 — read-back 테스트"""
    try:
        df = pl.read_parquet(file_path)
        return len(df) > 0
    except (pl.exceptions.ComputeError, FileNotFoundError, OSError):
        return False

실전 파이프라인 아키텍처: download → validate → store → index

지금까지 다룬 개별 패턴을 하나의 파이프라인으로 조합하면 다음과 같은 구조가 된다.

1. Download    API 호출 + 재시도 + rate limit 준수

2. Validate    응답 검증 (스키마, 필수 필드, 데이터 타입)

3. Store       종목별 parquet 저장 + read-back 검증

4. Index       메타데이터 갱신 (종목 목록, 수집 날짜, 파일 크기)

각 단계가 분리되어 있으므로, 한 단계에서 실패해도 이전 단계의 결과가 보존된다.

일일 수집 파이프라인 전체 코드

import time
from datetime import date
from pathlib import Path

def run_daily_pipeline(
    stock_codes: list[str],
    data_dir: Path,
) -> dict:
    """일일 수집 파이프라인 — download → validate → store → index"""
    checkpoint = load_checkpoint()
    quota = QuotaTracker(daily_limit=10_000, buffer=500)

    # 오늘 이미 쓴 할당량 반영
    if checkpoint.get("last_date") == str(date.today()):
        quota.calls_today = checkpoint.get("api_calls_today", 0)

    completed = set(checkpoint.get("completed", []))

    with httpx.Client(timeout=30.0) as client:
        for code in stock_codes:
            if code in completed:
                continue
            if not quota.can_call():
                break  # 할당량 소진 — 내일 이어받기

            # 1. Download
            result = with_retry(fetch_dart, client, code)
            quota.record_call()

            if result is None:
                categorize_failure(code, "network", checkpoint)
                continue

            # 2. Validate
            if not validate_response(result):
                categorize_failure(code, "invalid", checkpoint)
                continue

            # 3. Store
            save_parquet(code, result)
            if not verify_parquet(data_dir / f"{code}.parquet"):
                categorize_failure(code, "storage", checkpoint)
                continue

            # 4. Index (체크포인트 = 인덱스)
            completed.add(code)
            checkpoint["completed"] = sorted(completed)
            checkpoint["api_calls_today"] = quota.calls_today
            checkpoint["last_date"] = str(date.today())
            update_checkpoint(checkpoint, completed)

            time.sleep(0.2)

    return {
        "completed": len(completed),
        "remaining": len(stock_codes) - len(completed),
        "quota_remaining": quota.remaining,
    }

이 파이프라인의 특징은 세 가지다.

  1. 건별 체크포인트 — 1건 처리마다 진행 상황을 저장한다. 어디서 중단돼도 이어받는다.
  2. 할당량 인식 — 남은 호출 횟수를 추적해서 한도 전에 스스로 멈춘다. 다음 날 자동으로 이어진다.
  3. 4단계 분리 — download, validate, store, index 실패를 구분해서 원인별로 다른 재시도 전략을 적용한다.

할당량 추적 클래스

from datetime import date

class QuotaTracker:
    """DART API 일일 할당량 추적"""

    def __init__(self, daily_limit: int = 10_000, buffer: int = 500):
        self.daily_limit = daily_limit
        self.buffer = buffer
        self.calls_today = 0
        self.current_date = date.today()

    def can_call(self) -> bool:
        if date.today() != self.current_date:
            self.calls_today = 0
            self.current_date = date.today()
        return self.calls_today < (self.daily_limit - self.buffer)

    def record_call(self) -> None:
        self.calls_today += 1

    @property
    def remaining(self) -> int:
        return max(0, self.daily_limit - self.buffer - self.calls_today)

buffer=500은 안전 마진이다. 카운트가 정확하지 않을 수 있고, 다른 프로세스가 같은 key를 쓸 수도 있으므로 9,500에서 멈추고 다음 날 이어받는 것이 안전하다.


FAQ

OpenDART 미등록 key와 등록 key의 차이는 무엇인가요?

미등록 key(공시정보활용마당에 앱 등록을 하지 않은 상태)는 일일 1,000건으로 제한된다. 앱 등록을 하면 10,000건으로 늘어난다. 앱 등록은 공시정보활용마당에서 인증 후 무료로 할 수 있다. 대량 수집을 계획한다면 등록 key는 필수다.

EDGAR에서 IP가 차단되면 어떻게 해야 하나요?

SEC 웹마스터(webmaster@sec.gov)에 이메일로 차단 해제를 요청해야 한다. IP 주소, 사용한 User-Agent, 사용 목적을 설명하고, 향후 10 req/sec 제한을 준수하겠다는 약속을 포함하면 보통 1~3 영업일 내에 해제된다. 차단 중에는 해당 IP에서 EDGAR 전체 접근이 불가능하므로, VPN이나 프록시로 우회하는 것은 추가 차단 사유가 될 수 있어 권장하지 않는다.

Semaphore 대신 토큰 버킷을 써야 하나요?

Semaphore는 “동시에 N개”를 제한하지만, “초당 정확히 N건”을 보장하지는 않는다. 정밀한 속도 제어가 필요하면 토큰 버킷(token bucket)이 더 적합하다. 하지만 실전에서는 Semaphore(8) + sleep(0.12)만으로도 EDGAR의 10 req/sec을 안정적으로 지킬 수 있다. 토큰 버킷은 동시 요청이 수백 개 이상인 대규모 시스템에서 의미가 있고, 종목 수천 개 수준에서는 과잉 설계다.

증분 업데이트만으로 데이터 정합성이 보장되나요?

보장되지 않는다. XBRL 파싱과 매핑에서 다뤘듯이 매핑 로직이 바뀌면 과거에 매핑한 결과도 달라진다. 또한 DART/EDGAR가 과거 filing을 소급 수정하는 경우도 드물지만 있다. 일상적으로 증분 업데이트를 하되, 분기마다 한 번씩 전체 갱신(forceUpdate=True)을 돌리는 것이 현실적인 전략이다.

수집한 데이터의 품질을 어떻게 검증하나요?

데이터 품질 검증 글에서 상세히 다루겠지만, 최소한 다음 세 가지는 수집 직후에 확인해야 한다. (1) parquet read-back이 성공하는지, (2) 필수 컬럼(bsns_year, sj_div, account_id, 금액 컬럼)이 있는지, (3) 행 수가 0이 아닌지. 이 세 가지만 통과해도 깨진 파일이 파이프라인을 오염시키는 것을 막을 수 있다.


비교 체크리스트

대량 수집 파이프라인을 설계할 때 점검해야 할 항목을 정리한다.

점검 항목DART 수집기EDGAR 수집기
rate limit 인식일일 10,000건 카운터초당 10건 Semaphore
에러 감지JSON status 필드 파싱HTTP status code
재시도 전략에러코드별 분기 (“020”이면 중단)지수 백오프 + Retry-After
인증API key (쿼리 파라미터)User-Agent 헤더
병렬 여부순차 (총량 병목)async 병렬 (속도 병목)
체크포인트건별 즉시 JSON 저장건별 즉시 JSON 저장
저장 형식종목별 parquetfiling별 파일 또는 종목별 parquet
증분 기준마지막 수집일 이후 공시 목록분기별 FULL-INDEX 신규 filing
배포 방식GitHub Release shard (4개)GitHub Release 또는 직접 서빙
무결성 검증parquet read-back + 행 수Content-Length 비교 + 파일 파싱

출처


한 줄 정리

대량 수집은 “빠르게”가 아니라 “안전하게, 이어서”가 본질이다. rate limit을 인식하고, 실패를 분류하고, 건별 체크포인트를 기록하면 며칠에 걸쳐서라도 수천 종목 데이터를 빠짐없이 수집할 수 있다. 이 파이프라인이 있어야 XBRL 매핑이나 sections 수평화가 비로소 동작한다.

같은 시리즈에서 이어 읽기
DartLab

같은 카테고리에서 더 읽기

DartLab은 데이터 자동화 카테고리 안에서 글이 서로 이어지도록 설계합니다. 다음 글로 넘어가며 구조와 맥락을 같이 쌓는 방식입니다.

DartLab Product

이 글의 판단을 실제 데이터 흐름으로 옮기기

DartLab은 전자공시를 읽는 법을 코드와 데이터로 연결하기 위해 만든 제품입니다. 사업보고서 텍스트, 재무 시계열, 정기보고서 데이터를 한 흐름에서 다루도록 설계했습니다.