# Source code for finamt.utils

"""
finamt.utils
~~~~~~~~~~~~~~~
Heuristic rule-based extraction utilities used as a fallback when the LLM
is unavailable or returns incomplete data.

These functions are intentionally simple and conservative — they prefer
returning ``None`` over returning plausibly wrong values.
"""

from __future__ import annotations

import json
import logging
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Any

# Category keywords aligned with RECEIPT_CATEGORIES in prompts.py.
# The LLM and the rule-based fallback must agree on category names.

# Module-level logger named after this module. Attaching a NullHandler keeps
# the library from emitting anything through logging's last-resort handler
# when the host application has not configured logging itself.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Receipt category → lower-case keywords searched for (as substrings) in an
# item description.  The vocabulary is predominantly German, with a handful
# of English loan words and brand names.  Insertion order matters: lookups
# walk the categories top to bottom and stop at the first hit.
_CATEGORY_KEYWORDS: dict[str, list[str]] = {
    "material": [
        "papier",
        "rohstoff",
        "verbrauch",
        "büromaterial",
        "druckerpapier",
    ],
    "equipment": [
        "gerät",
        "drucker",
        "monitor",
        "tastatur",
        "maus",
        "server",
        "hardware",
        "maschine",
    ],
    "software": [
        "software",
        "lizenz",
        "abo",
        "subscription",
        "app",
        "cloud",
        "saas",
    ],
    "internet": [
        "internet",
        "dsl",
        "glasfaser",
        "breitband",
        "hosting",
        "domain",
    ],
    "telecommunication": [
        "telefon",
        "handy",
        "mobilfunk",
        "sim",
        "telekom",
        "vodafone",
        "o2",
        "mobilität",
    ],
    "travel": [
        "hotel",
        "flug",
        "bahn",
        "taxi",
        "mietwagen",
        "reise",
        "übernachtung",
    ],
    "education": [
        "kurs",
        "seminar",
        "buch",
        "schulung",
        "weiterbildung",
        "studium",
        "zertifikat",
    ],
    "utilities": [
        "strom",
        "gas",
        "wasser",
        "heizung",
        "nebenkosten",
        "entsorgung",
    ],
    "insurance": [
        "versicherung",
        "haftpflicht",
        "police",
        "prämie",
    ],
    "taxes": [
        "steuer",
        "finanzamt",
        "steuerberater",
        "gebühr",
        "abgabe",
    ],
    "public_fees": [
        "rundfunkbeitrag",
        "ard",
        "zdf",
        "gez",
        "ihk",
        "hwk",
        "berufsgenossenschaft",
        "pflichtbeitrag",
        "kammerbeitrag",
    ],
}

# Lower-case markers that flag a receipt line as carrying the grand total.
# Lines containing one of these are preferred over the max() fallback.
_TOTAL_KEYWORDS = [
    "gesamt", "gesamtbetrag", "total", "summe",
    "endbetrag", "brutto", "rechnungsbetrag",
]

# Lower-case month name → month number.  Both the German and the English
# spelling are listed (plus "marz" for text that lost its umlaut).
_MONTH_MAP: dict[str, int] = {
    "januar": 1, "january": 1,
    "februar": 2, "february": 2,
    "märz": 3, "marz": 3, "march": 3,
    "april": 4,
    "mai": 5, "may": 5,
    "juni": 6, "june": 6,
    "juli": 7, "july": 7,
    "august": 8,
    "september": 9,
    "oktober": 10, "october": 10,
    "november": 11,
    "dezember": 12, "december": 12,
}

# Date regexes as (pattern, order) pairs, listed in the order they are tried.
# order: "dmy" | "ymd" tells the consumer how to unpack the three groups, so
# the unpacking is unambiguous.
_DATE_PATTERNS: list[tuple[str, str]] = [
    (r"\b(\d{1,2})\.(\d{1,2})\.(\d{4})\b", "dmy"),  # DD.MM.YYYY
    (r"\b(\d{1,2})\.(\d{1,2})\.(\d{2})\b", "dmy"),  # DD.MM.YY
    (r"\b(\d{4})-(\d{2})-(\d{2})\b", "ymd"),  # YYYY-MM-DD
    (r"\b(\d{1,2})/(\d{1,2})/(\d{4})\b", "dmy"),  # DD/MM/YYYY
    # Day, month name, year.  The dot after the day is optional so that the
    # usual German long form "12. Januar 2023" is recognised as well as
    # "12 Januar 2023".
    (r"\b(\d{1,2})\.?\s+([A-Za-zÄÖÜäöü]+)\s+(\d{4})\b", "dmy"),
]

# Amount regexes — German locale (period = thousands sep, comma = decimal).
#
# One amount is either properly grouped ("1.234,56") or written without any
# thousands separator ("1234,56").  The look-arounds stop a match from
# starting or ending in the middle of a longer digit run; without them
# "1234,56 €" would be read as 234,56 and "€ 12,345" as 12,34.
# Every pattern exposes the amount as its single capture group.
_AMOUNT_RE = r"(?<!\d)((?:\d{1,3}(?:\.\d{3})+|\d+),\d{2})(?!\d)"

_AMOUNT_PATTERNS = [
    _AMOUNT_RE + r"\s*€",  # 12,50 €
    r"€\s*" + _AMOUNT_RE,  # € 12,50
    r"EUR\s*" + _AMOUNT_RE,  # EUR 12,50
    _AMOUNT_RE + r"\s*EUR",  # 12,50 EUR
]

# VAT line regexes.  Each pattern yields two groups: (rate, amount).
#
# The rate is one or two digits with an optional decimal part; the
# look-behind keeps "119 %" from being read as 19 %.  The amount accepts both
# "1.234,56" and "1234,56" and may not start or end inside a longer digit
# run, so "1234,56" is no longer truncated to 234,56.
_VAT_RATE_RE = r"(?<![\d,])(\d{1,2}(?:,\d{1,2})?)"
_VAT_AMOUNT_RE = r"(?<!\d)((?:\d{1,3}(?:\.\d{3})+|\d+),\d{2})(?!\d)"

_VAT_PATTERNS = [
    _VAT_RATE_RE + r"\s*%.*?" + _VAT_AMOUNT_RE + r"\s*€",
    r"MwSt\.?\s*" + _VAT_RATE_RE + r"\s*%.*?" + _VAT_AMOUNT_RE,
    r"VAT\s*" + _VAT_RATE_RE + r"\s*%.*?" + _VAT_AMOUNT_RE,
]

# Item line regexes — ordered most-specific first so the quantity×price
# pattern is tried before the generic description+price fallback.
#
# Group layout per pattern:
#   1. "<qty> x <description> <total>"        → (qty, description, total)
#   2. "<description> @ <unit> = <total>"     → (description, unit, total)
#   3. "<description> <total>"                → (description, total)
#
# A price is either grouped ("1.234,56") or written without a thousands
# separator ("1234,56"), matching the amount grammar used elsewhere in this
# module.
_ITEM_PRICE_RE = r"((?:\d{1,3}(?:\.\d{3})+|\d+),\d{2})"

_ITEM_PATTERNS = [
    r"^(\d+(?:,\d+)?)\s*[xX]\s*(.+?)\s+" + _ITEM_PRICE_RE + r"\s*€?\s*$",
    r"^(.+?)\s*@\s*" + _ITEM_PRICE_RE + r"\s*=\s*" + _ITEM_PRICE_RE + r"\s*€?\s*$",
    r"^(.+?)\s+" + _ITEM_PRICE_RE + r"\s*€?\s*$",
]

# Lower-case words that mark a header line as receipt boilerplate
# ("Rechnung", "Kassenbon", …) rather than a company name.
_SKIP_HEADER_WORDS = frozenset(
    {
        "receipt", "rechnung", "kassenbon",
        "beleg", "quittung",
        "datum", "uhrzeit",
        "kasse", "bon",
    }
)


# ---------------------------------------------------------------------------
# Helper: German amount string → Decimal
# ---------------------------------------------------------------------------


def _parse_german_amount(s: str) -> Decimal | None:
    """
    Turn a German-formatted amount such as ``'1.234,56'`` into a ``Decimal``.

    Thousands dots are dropped and the decimal comma becomes a dot before the
    value is handed to ``Decimal``.  Returns ``None`` when the result is not a
    valid decimal literal.
    """
    # Single pass: "." → removed, "," → "."
    plain = s.translate(str.maketrans({".": None, ",": "."}))
    try:
        value = Decimal(plain)
    except (InvalidOperation, ValueError):
        return None
    return value


# ---------------------------------------------------------------------------
# DataExtractor
# ---------------------------------------------------------------------------


class DataExtractor:
    """
    Heuristic, regex-based field extraction for receipt text.

    Every method is a ``@staticmethod``: call them on the class
    (``DataExtractor.extract_date(text)``) or on an instance.
    """

    # ------------------------------------------------------------------
    # Company / vendor
    # ------------------------------------------------------------------

    @staticmethod
    def extract_company_name(text: str) -> str | None:
        """
        Return the first non-trivial line among the first eight lines.

        A line is skipped when it is blank, starts with a digit (dates,
        amounts), is shorter than three characters, or contains one of the
        ``_SKIP_HEADER_WORDS``.  The boilerplate check is a plain substring
        test on the lower-cased line, so a name that merely contains such a
        word (e.g. "kasse" inside "Sparkasse") is skipped as well.

        Returns ``None`` when no line qualifies.
        """
        for line in text.splitlines()[:8]:
            line = line.strip()
            if not line:
                continue
            if re.match(r"^\d", line):
                continue
            if len(line) < 3:
                continue
            if any(w in line.lower() for w in _SKIP_HEADER_WORDS):
                continue
            return line
        return None

    # ------------------------------------------------------------------
    # Date
    # ------------------------------------------------------------------

    @staticmethod
    def extract_date(text: str) -> datetime | None:
        """
        Return the first parseable date found in the text.

        The regexes in ``_DATE_PATTERNS`` are tried in list order and, for
        each, the matches in text order; the first combination that forms a
        valid calendar date wins.  Month names are resolved through
        ``_MONTH_MAP``.  Two-digit years are interpreted as 2000+ if < 50,
        else 1900+.

        Returns ``None`` when nothing parses.
        """
        for pattern, order in _DATE_PATTERNS:
            for groups in re.findall(pattern, text):
                try:
                    a, b, c = groups
                    if order == "ymd":
                        year, month_raw, day = a, b, c
                    else:
                        day, month_raw, year = a, b, c

                    # Resolve year
                    year = int(year)
                    if year < 100:
                        year = 2000 + year if year < 50 else 1900 + year

                    # Resolve month (numeric or month name)
                    if month_raw.isdigit():
                        month = int(month_raw)
                    else:
                        month = _MONTH_MAP.get(month_raw.lower())
                        if month is None:
                            continue

                    # datetime() raises ValueError for impossible dates
                    return datetime(year, month, int(day))
                except (ValueError, TypeError):
                    continue
        return None

    # ------------------------------------------------------------------
    # Amounts
    # ------------------------------------------------------------------

    @staticmethod
    def extract_amounts(text: str) -> dict[str, Any]:
        """
        Extract monetary amounts from text.

        Strategy:
        1. Scan line by line.  The first amount on the first line that
           contains a total keyword (``_TOTAL_KEYWORDS``) — and is not a
           subtotal / net line — becomes the grand total.
        2. Fall back to the largest amount found in the document.

        An amount that is matched by more than one currency pattern (e.g.
        the middle of ``"10,00 € 5,00 €"``) is recorded only once.

        Returns ``{"total": Decimal | None, "all": [Decimal, ...]}``.
        """
        # "summe" / "total" / "gesamt" also occur inside these words, which
        # label intermediate or net sums rather than the amount payable.
        subtotal_markers = ("zwischensumme", "subtotal", "netto")

        all_amounts: list[Decimal] = []
        total_amount: Decimal | None = None

        for line in text.splitlines():
            line_lower = line.lower()
            is_total_line = any(kw in line_lower for kw in _TOTAL_KEYWORDS) and not any(
                marker in line_lower for marker in subtotal_markers
            )

            # Character spans already counted on this line, so overlapping
            # patterns cannot add the same number twice.
            seen_spans: set[tuple[int, int]] = set()

            for pattern in _AMOUNT_PATTERNS:
                for m in re.finditer(pattern, line):
                    span = m.span(1)
                    if span in seen_spans:
                        continue
                    seen_spans.add(span)

                    amount = _parse_german_amount(m.group(1))
                    if amount is None:
                        continue
                    all_amounts.append(amount)

                    # Prefer a total-keyword-anchored amount
                    if is_total_line and total_amount is None:
                        total_amount = amount

        if total_amount is None and all_amounts:
            total_amount = max(all_amounts)  # last-resort fallback

        return {"total": total_amount, "all": all_amounts}

    # ------------------------------------------------------------------
    # VAT
    # ------------------------------------------------------------------

    @staticmethod
    def extract_vat_info(text: str) -> dict[str, Decimal | None]:
        """
        Extract the first VAT percentage + absolute amount found.

        Patterns from ``_VAT_PATTERNS`` are matched case-insensitively.  A
        match is only accepted when both the rate and the amount are
        non-zero; otherwise the search continues.

        Returns ``{"vat_percentage": ..., "vat_amount": ...}`` with both
        values ``None`` when nothing is found.
        """
        for pattern in _VAT_PATTERNS:
            for match in re.findall(pattern, text, re.IGNORECASE):
                try:
                    vat_pct = Decimal(match[0].replace(",", "."))
                    vat_amt = _parse_german_amount(match[1])
                    if vat_pct and vat_amt:
                        return {"vat_percentage": vat_pct, "vat_amount": vat_amt}
                except (InvalidOperation, IndexError):
                    continue
        return {"vat_percentage": None, "vat_amount": None}

    # ------------------------------------------------------------------
    # Line items
    # ------------------------------------------------------------------

    @staticmethod
    def extract_items(text: str) -> list[dict[str, Any]]:
        """
        Parse individual receipt line items.

        Each non-blank line is tested against ``_ITEM_PATTERNS`` in order;
        the first pattern that both matches and yields parseable numbers
        produces the item for that line.

        Returns a list of dicts with the keys ``description``, ``quantity``,
        ``unit_price``, ``total_price``, ``category`` and ``vat_rate``
        (numeric values as ``float`` or ``None``).
        """
        items: list[dict[str, Any]] = []

        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue

            for pattern in _ITEM_PATTERNS:
                m = re.match(pattern, line)
                if not m:
                    continue
                item = DataExtractor._item_from_match(line, m)
                if item is None:
                    continue  # numbers unusable under this layout — try the next one
                items.append(item)
                break  # matched a pattern — don't try the others

        return items

    @staticmethod
    def _item_from_match(line: str, m: re.Match[str]) -> dict[str, Any] | None:
        """
        Build one item dict from a successful ``_ITEM_PATTERNS`` match.

        Three layouts are told apart:

        * two groups — ``(description, total)``
        * three groups separated by ``@`` after the first group —
          ``(description, unit_price, total)``; the quantity is derived as
          ``total / unit_price``
        * any other three groups — ``(quantity, description, total)``; the
          unit price is derived as ``total / quantity``

        Returns ``None`` when a numeric field cannot be parsed.
        """
        groups = m.groups()
        quantity: Decimal | None = None
        unit_price: Decimal | None = None

        if len(groups) == 2:
            description, total_raw = groups
        elif len(groups) == 3:
            # The text between group 1 and group 2 is the literal separator
            # the pattern matched ("x" or "@"); it identifies the layout
            # without depending on the position of the pattern in the list.
            separator = line[m.end(1):m.start(2)].strip()
            if separator == "@":
                description, unit_raw, total_raw = groups
                unit_price = _parse_german_amount(unit_raw)
                if unit_price is None:
                    return None
            else:
                qty_raw, description, total_raw = groups
                try:
                    quantity = Decimal(qty_raw.replace(",", "."))
                except InvalidOperation:
                    return None
        else:
            return None

        total_price = _parse_german_amount(total_raw)
        if total_price is None:
            return None

        # Derive whichever of quantity / unit price the line did not state.
        if quantity is not None:
            if quantity > 0:
                unit_price = total_price / quantity
        elif unit_price is not None:
            if unit_price > 0:
                quantity = total_price / unit_price

        description = description.strip()
        return {
            "description": description,
            "quantity": float(quantity) if quantity is not None else None,
            # "is not None" rather than truthiness: a unit price of 0 is a value
            "unit_price": float(unit_price) if unit_price is not None else None,
            "total_price": float(total_price),
            "category": DataExtractor._categorize_item(description),
            "vat_rate": None,
        }

    @staticmethod
    def _categorize_item(description: str) -> str:
        """
        Map an item description to a receipt category.

        The lower-cased description is searched for each keyword of
        ``_CATEGORY_KEYWORDS`` as a substring; the first category (in dict
        order) with a hit is returned, ``"other"`` when none matches.
        """
        lower = description.lower()
        for category, keywords in _CATEGORY_KEYWORDS.items():
            if any(kw in lower for kw in keywords):
                return category
        return "other"
# ---------------------------------------------------------------------------
# JSON cleaning
# ---------------------------------------------------------------------------
def clean_json_response(response: str) -> str:
    """
    Extract and sanitise a JSON object from an LLM response string.

    The response is processed in escalating steps, and the first step that
    yields valid JSON wins, so text that is already valid is never rewritten:

    1. Strip Markdown code fences (```json … ```).
    2. Decode the object starting at the first ``{`` exactly as written.
       Prose after the object — even prose containing braces — is ignored.
    3. Remove trailing commas before ``}`` / ``]`` in the span from the first
       ``{`` to the last ``}`` and try again.
    4. Additionally quote bare object keys and try again.

    Steps 3 and 4 are regex repairs that cannot tell string contents from
    structure; they only run on input that is not valid JSON to begin with.

    Args:
        response: Raw text returned by the LLM.

    Returns:
        A string that ``json.loads()`` accepts and that decodes to an object;
        ``"{}"`` when no object can be recovered.
    """
    # Strip markdown fences
    text = re.sub(r"```(?:json)?\s*", "", response)
    text = re.sub(r"```\s*$", "", text, flags=re.MULTILINE)
    text = text.strip()

    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        logger.warning("No JSON object found in LLM response.")
        return "{}"

    # Already valid?  Return the object untouched so that commas, colons or
    # brackets inside string values (URLs, "a, ]") cannot be corrupted.
    try:
        _, stop = json.JSONDecoder().raw_decode(text, start)
    except json.JSONDecodeError:
        pass
    else:
        return text[start:stop]

    # Malformed from here on: work on the outermost {...} span.
    candidate = text[start : end + 1]

    # Remove trailing commas before } or ]
    repaired = re.sub(r",\s*([}\]])", r"\1", candidate)
    try:
        json.loads(repaired)
        return repaired
    except json.JSONDecodeError:
        pass

    # Quote unquoted object keys: a bare word that follows { or , (optional
    # whitespace in between) and is itself followed by a colon.
    fixed = re.sub(
        r"([{,]\s*)([A-Za-z_]\w*)\s*:",
        lambda m: f'{m.group(1)}"{m.group(2)}":',
        repaired,
    )
    try:
        json.loads(fixed)
        return fixed
    except json.JSONDecodeError as exc:
        logger.warning("Could not produce valid JSON after cleaning: %s", exc)
        return "{}"
# ---------------------------------------------------------------------------
# Shared parse helpers (used by agent.py)
# ---------------------------------------------------------------------------
def parse_decimal(value: Any) -> Decimal | None:
    """
    Coerce an arbitrary value to ``Decimal`` without raising.

    The value is converted through ``str()`` first.  ``None`` and anything
    whose string form is not a valid decimal literal yield ``None``.
    """
    result: Decimal | None = None
    if value is not None:
        try:
            result = Decimal(str(value))
        except (InvalidOperation, ValueError):
            result = None
    return result
# Upper-case German month token → two-digit month number.
_DE_MONTH: dict[str, str] = {
    # three-letter abbreviations
    "JAN": "01",
    "FEB": "02",
    "MRZ": "03",
    "MAR": "03",
    "APR": "04",
    "MAI": "05",
    "JUN": "06",
    "JUL": "07",
    "AUG": "08",
    "SEP": "09",
    "OKT": "10",
    "NOV": "11",
    "DEZ": "12",
    # full German names
    "JANUAR": "01",
    "FEBRUAR": "02",
    "MÄRZ": "03",
    "MAERZ": "03",
    "APRIL": "04",
    "JUNI": "06",
    "JULI": "07",
    "AUGUST": "08",
    "SEPTEMBER": "09",
    "OKTOBER": "10",
    "NOVEMBER": "11",
    "DEZEMBER": "12",
}


def _normalise_date_str(date_str: str) -> str:
    """
    Swap month names/abbreviations in *date_str* for their two-digit number.

    Every run of letters is looked up case-insensitively in ``_DE_MONTH``;
    words that are not listed there are left exactly as they were.
    """
    return re.sub(
        r"[A-ZÄÖÜa-zäöü]+",
        lambda word: _DE_MONTH.get(word.group(0).upper(), word.group(0)),
        date_str,
    )
def parse_date(date_str: str) -> datetime | None:
    """
    Parse a date string to ``datetime``.

    ISO ``YYYY-MM-DD`` is tried first, followed by common European layouts
    (``DD.MM.YYYY``, ``DD/MM/YYYY``, ``YYYY/MM/DD``, ``DD-MM-YYYY``) and the
    month-name layouts ``DD-Mon-YYYY`` / ``DD-Month-YYYY``.

    The stripped input is tried as-is and then — if it differs — in a second
    form where German month names/abbreviations (OKT, MRZ, JANUAR, OKTOBER …)
    have been replaced by digits via ``_normalise_date_str``.  The ``%b`` /
    ``%B`` layouts cover month names on the as-is form; how those are
    interpreted depends on the active locale.

    When no layout fits, the original string is handed to
    ``DataExtractor.extract_date`` as a last resort.

    Returns ``None`` for empty input or when nothing parses.
    """
    if not date_str:
        return None

    raw = date_str.strip()
    numeric = _normalise_date_str(raw)
    attempts = [raw] if numeric == raw else [raw, numeric]

    layouts = (
        "%Y-%m-%d",
        "%d.%m.%Y",
        "%d/%m/%Y",
        "%Y/%m/%d",
        "%d-%m-%Y",
        "%d-%b-%Y",  # e.g. 30-JUL-2025
        "%d-%B-%Y",  # e.g. 30-July-2025
    )

    for attempt in attempts:
        for layout in layouts:
            try:
                parsed = datetime.strptime(attempt, layout)
            except ValueError:
                continue
            return parsed

    # Last resort: delegate to DataExtractor's regex-based search
    return DataExtractor.extract_date(date_str)