# Source code for scripts.extract_scope

"""Fast extraction of scope articles (Article 1-3) from EUR-Lex acts.

Strategy: single bulk SELECT joining acts + article subdivisions + their subtrees.
Caps per-article content at ~2500 chars (scope info is always in first paragraphs).
"""

from __future__ import annotations

import os
import sys
import time

import psycopg2
from psycopg2.extras import RealDictCursor

# Connection string for the source PostgreSQL database; override via DATABASE_URL.
DATABASE_URL = os.environ.get("DATABASE_URL", "postgres://dev:dev@127.0.0.1:5432/document")
# Destination Markdown digest file; override via SCOPE_OUTPUT.
OUTPUT = os.environ.get("SCOPE_OUTPUT", "SCOPE.md")
# Per-article character cap — scope information is expected in the first paragraphs.
MAX_ARTICLE_CHARS = 2500


# One bulk query instead of per-act round-trips:
#   * `scope_articles` picks, per (act_id, article number 1-3), a single
#     subdivision row — DISTINCT ON keeps the first row of each group, and the
#     ORDER BY prefers the row with the longest content (ties broken by id);
#   * the outer SELECT joins back every subdivision in that article's subtree:
#     hierarchy_path equal to the article's own path, or prefixed by it + '/'.
# The final ORDER BY guarantees rows arrive grouped by act then article, which
# the streaming consumer in main() relies on.
#
# NOTE(review): this query is executed WITHOUT parameters, so psycopg2 leaves
# '%%' verbatim in the SQL; inside a LIKE pattern '%%' is simply two wildcards,
# equivalent to '%', so the result is unchanged — but the doubled escapes are
# only needed if parameters are ever passed to execute().
BULK_QUERY = """
WITH scope_articles AS (
    SELECT DISTINCT ON (s.act_id, s.number)
        s.id, s.act_id, s.number, s.hierarchy_path
    FROM subdivisions s
    JOIN acts a ON a.id = s.act_id
    WHERE a.celex NOT LIKE 'LEGI%%'
      AND s.subdivision_type = 'article'
      AND s.number IN ('1', '2', '3')
    ORDER BY s.act_id, s.number, LENGTH(COALESCE(s.content, '')) DESC, s.id
)
SELECT
    a.celex, a.title AS act_title, a.language, a.url_eurlex,
    sa.number AS article_num,
    sa.hierarchy_path AS article_path,
    s.hierarchy_path AS sub_path,
    s.content AS sub_content,
    s.title AS sub_title
FROM scope_articles sa
JOIN acts a ON a.id = sa.act_id
JOIN subdivisions s ON s.act_id = sa.act_id
    AND (s.hierarchy_path = sa.hierarchy_path
         OR s.hierarchy_path LIKE sa.hierarchy_path || '/%%')
ORDER BY a.celex, sa.number, s.hierarchy_path;
"""


def main():
    """Extract the scope articles (Article 1-3) for each act into a Markdown digest.

    Streams the bulk query through a server-side cursor and groups rows by
    (celex, article number) on the fly — the query's ORDER BY guarantees that
    all rows belonging to one act/article arrive contiguously.  Progress goes
    to stderr; the digest is written to OUTPUT.

    Fixes vs. previous revision:
      * the connection and server-side cursor are now closed via try/finally
        (they previously leaked on both success and failure);
      * the act header now separates the CELEX id from the title (they were
        fused together as "## {celex}{title}").
    """
    t0 = time.time()
    conn = psycopg2.connect(DATABASE_URL)
    # Named cursor => server-side cursor: rows are fetched in batches of
    # `itersize` instead of being materialized client-side all at once.
    cur = conn.cursor(name="scope_bulk", cursor_factory=RealDictCursor)
    cur.itersize = 5000
    try:
        print("[extract_scope] running bulk query…", file=sys.stderr)
        cur.execute(BULK_QUERY)
        print(
            f"[extract_scope] query done in {time.time() - t0:.1f}s, streaming rows",
            file=sys.stderr,
        )

        # Group by (celex, article_num) in streaming fashion
        # (the query's ORDER BY guarantees locality).
        acts_written = 0
        articles_written = 0
        cur_celex = None
        cur_article_num = None
        cur_article_buf: list[str] = []
        cur_article_chars = 0
        cur_act_meta: dict = {}

        with open(OUTPUT, "w", encoding="utf-8") as f:
            f.write("# EUR-Lex Scope Articles\n\n")
            f.write(
                f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Source: EUR-Lex acts, Article 1-3 with direct subtree (capped)\n\n---\n\n"
            )

            def flush_article():
                # Emit the buffered article (if any) and reset the buffer/cap counter.
                nonlocal articles_written, cur_article_buf, cur_article_chars
                if cur_article_num is None or not cur_article_buf:
                    cur_article_buf = []
                    cur_article_chars = 0
                    return
                f.write(f"### Article {cur_article_num}\n\n")
                f.write("".join(cur_article_buf).rstrip() + "\n\n")
                articles_written += 1
                cur_article_buf = []
                cur_article_chars = 0

            def flush_act():
                # Close out the current act: flush its last article, write a divider.
                nonlocal acts_written
                flush_article()
                if cur_act_meta:
                    f.write("---\n\n")
                    acts_written += 1

            def open_act(meta):
                # Write the act header: CELEX id, truncated title, metadata.
                # FIX: separate celex from the title (was fused: "{celex}{title}").
                f.write(f"## {meta['celex']} — {meta['act_title'][:200]}\n\n")
                f.write(f"**Language:** {meta['language']}\n")
                if meta.get("url_eurlex"):
                    f.write(f"**URL:** {meta['url_eurlex']}\n")
                f.write("\n")

            n = 0
            for row in cur:
                n += 1
                if n % 20000 == 0:
                    print(
                        f"[extract_scope] {n} rows, {acts_written} acts, "
                        f"{articles_written} articles, {time.time() - t0:.0f}s",
                        file=sys.stderr,
                    )

                celex = row["celex"]
                article_num = row["article_num"]

                # Act transition
                if celex != cur_celex:
                    flush_act()
                    cur_act_meta = {
                        "celex": celex,
                        "act_title": row["act_title"],
                        "language": row["language"],
                        "url_eurlex": row["url_eurlex"],
                    }
                    open_act(cur_act_meta)
                    cur_celex = celex
                    cur_article_num = None

                # Article transition
                if article_num != cur_article_num:
                    flush_article()
                    cur_article_num = article_num

                # Skip if we've already hit the cap for this article
                if cur_article_chars >= MAX_ARTICLE_CHARS:
                    continue

                content = (row["sub_content"] or "").strip()
                title = (row["sub_title"] or "").strip()
                if not content and not title:
                    continue

                # Compute indentation from depth relative to the article root
                rel_depth = max(
                    0,
                    row["sub_path"].count("/") - row["article_path"].count("/"),
                )
                prefix = " " * rel_depth

                piece = []
                if title and title not in content[:200]:
                    piece.append(f"{prefix}**{title}**\n")
                if content:
                    # Truncate so this piece lands within the per-article cap.
                    remaining = MAX_ARTICLE_CHARS - cur_article_chars
                    if len(content) > remaining:
                        content = content[: max(0, remaining - 20)] + "…"
                    for line in content.split("\n"):
                        piece.append(f"{prefix}{line}\n")
                piece.append("\n")

                blob = "".join(piece)
                cur_article_buf.append(blob)
                cur_article_chars += len(blob)

            # Flush the final act (the loop only flushes on transitions).
            flush_act()
    finally:
        # FIX: previously neither the server-side cursor nor the connection
        # was ever closed, leaking the connection on success and failure.
        cur.close()
        conn.close()

    elapsed = time.time() - t0
    print(
        f"[extract_scope] DONE: {acts_written} acts, {articles_written} articles "
        f"in {elapsed:.0f}s → {OUTPUT}",
        file=sys.stderr,
    )
# Script entry point: only run the extraction when executed directly,
# never on import.
if __name__ == "__main__":
    main()