# Source code for scripts.merge_entities (extracted from generated docs)
"""Merge the 8 ENTITIES_part_NN.json files into a consolidated ENTITIES.md.
Deduplicates entities by (name_en, name_fr) and aggregates source_acts.
Groups by category, sorts by frequency.
"""
from __future__ import annotations
import glob
import json
import os
from collections import defaultdict
# Directory holding the partial inventories produced by the sub-agents.
CHUNKS_DIR = "scripts/chunks"
# Consolidated Markdown report written by main().
OUTPUT = "ENTITIES.md"

# Maps machine-readable category ids to human-readable section headings.
CATEGORY_LABELS = dict(
    financial_institution="Financial Institutions",
    fund="Funds & Collective Investment",
    market_infrastructure="Market Infrastructure",
    investor="Investors & Clients",
    issuer="Issuers & Listed Entities",
    authority="Authorities & Regulators",
    service_provider="Service Providers",
    employee_role="Roles & Personnel",
    other="Other",
)
def normalize(s: str | None) -> str:
    """Normalize an entity label for case-insensitive deduplication.

    Strips surrounding whitespace and lowercases; ``None`` maps to ``""``.
    """
    return (s or "").strip().lower()
def make_key(entity: dict) -> str:
    """Return the dedup key for *entity*: normalized EN name, FR fallback.

    Returns ``""`` when the entity has neither name, which callers treat
    as "skip this entity".
    """
    en = normalize(entity.get("name_en"))
    fr = normalize(entity.get("name_fr"))
    return en or fr
def main():
    """Merge partial entity inventories into a consolidated Markdown report.

    Reads every ``ENTITIES_part_*.json`` file in ``CHUNKS_DIR``, deduplicates
    entities (key from :func:`make_key`), groups them by category, and writes
    one frequency-sorted Markdown table per category to ``OUTPUT``.
    """
    files = sorted(glob.glob(os.path.join(CHUNKS_DIR, "ENTITIES_part_*.json")))
    if not files:
        print(f"No ENTITIES_part_*.json files found in {CHUNKS_DIR}/")
        return
    print(f"Merging {len(files)} chunk files…")
    merged = _merge_chunks(files)
    by_cat = _group_by_category(merged)
    _write_report(files, merged, by_cat)
    print(f"Wrote {OUTPUT} with {len(merged)} unique entity types across {len(by_cat)} categories.")


def _merge_chunks(files: list[str]) -> dict[str, dict]:
    """Load each chunk file and merge its entities into one dict keyed by make_key."""
    merged: dict[str, dict] = {}
    for path in files:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        for ent in data.get("entities", []):
            key = make_key(ent)
            if not key:
                continue  # neither EN nor FR name — nothing to dedup on
            if key not in merged:
                merged[key] = {
                    "name_en": ent.get("name_en"),
                    "name_fr": ent.get("name_fr"),
                    "definition": ent.get("definition", ""),
                    "category": ent.get("category", "other"),
                    "source_acts": set(ent.get("source_acts", [])),
                }
                continue
            m = merged[key]
            # Prefer the first non-null name seen for each language.
            if not m["name_en"] and ent.get("name_en"):
                m["name_en"] = ent["name_en"]
            if not m["name_fr"] and ent.get("name_fr"):
                m["name_fr"] = ent["name_fr"]
            # Keep the longest definition encountered across chunks.
            new_def = ent.get("definition", "")
            if len(new_def) > len(m["definition"]):
                m["definition"] = new_def
            m["source_acts"].update(ent.get("source_acts", []))
    return merged


def _group_by_category(merged: dict[str, dict]) -> dict[str, list[dict]]:
    """Bucket merged entities by category, most frequent first within each bucket.

    Mutates each entity in place: adds an ``"occurrences"`` count and converts
    ``"source_acts"`` from a set to a sorted list for deterministic output.
    """
    by_cat: dict[str, list[dict]] = defaultdict(list)
    for ent in merged.values():
        ent["occurrences"] = len(ent["source_acts"])
        ent["source_acts"] = sorted(ent["source_acts"])
        by_cat[ent["category"]].append(ent)
    for lst in by_cat.values():
        lst.sort(key=lambda e: -e["occurrences"])
    return by_cat


def _write_report(files: list[str], merged: dict[str, dict], by_cat: dict[str, list[dict]]) -> None:
    """Render the grouped entities as Markdown tables, one section per category."""
    with open(OUTPUT, "w", encoding="utf-8") as f:
        f.write("# Entity Types — EUR-Lex Scope Analysis\n\n")
        f.write(f"Consolidated from {len(files)} sub-agent outputs. Total unique entity types: **{len(merged)}**\n\n")
        f.write(
            "Sorted by category, then by frequency (number of distinct "
            "EUR-Lex acts where the entity type appears).\n\n---\n\n"
        )
        # Categories ordered by total occurrences, busiest first.
        cat_order = sorted(
            by_cat.keys(),
            key=lambda c: -sum(e["occurrences"] for e in by_cat[c]),
        )
        for cat in cat_order:
            label = CATEGORY_LABELS.get(cat, cat.title())
            entities = by_cat[cat]
            f.write(f"## {label} ({len(entities)})\n\n")
            f.write("| # | EN | FR | Definition | Occ. | Example acts |\n")
            f.write("|---|----|----|-----------|------|--------------|\n")
            for i, ent in enumerate(entities, 1):
                en = ent["name_en"] or "—"
                fr = ent["name_fr"] or "—"
                # Flatten newlines and escape pipes so the Markdown table stays intact.
                defn = (ent["definition"] or "—").replace("\n", " ").replace("|", "\\|")
                if len(defn) > 150:
                    defn = defn[:150] + "…"
                occ = ent["occurrences"]
                examples = ", ".join(ent["source_acts"][:3])
                f.write(f"| {i} | {en} | {fr} | {defn} | {occ} | {examples} |\n")
            f.write("\n")
# Script entry point: run the merge when executed directly (not on import).
if __name__ == "__main__":
    main()