Source code for finamt.agents.agent

"""
finamt.agents.agent
~~~~~~~~~~~~~~~
Main entry point for receipt processing.

Pipeline (see agents/pipeline.py for details):
  1. OCR text extraction (PaddleOCR)
  2. Duplicate detection (SHA-256 content hash)
  3. Agent 1 — receipt number, date, category
  4. Agent 2 — counterparty (vendor or client)
  5. Agent 3 — amounts (total, VAT %, VAT amount)
  6. Agent 4 — line items
  7. Python merge of the 4 results → ReceiptData
  8. Validate + auto-save to SQLite

All 4 agents use the same local LLM (configured via FINAMT_AGENT_MODEL).
They run sequentially — not in parallel — for local model compatibility.
Debug output is saved to ~/.finamt/debug/<receipt_id>/.
"""

from __future__ import annotations

import logging
import shutil
import time
from pathlib import Path

from finamt import progress as _progress

from ..exceptions import InvalidReceiptError, OCRProcessingError
from ..models import ExtractionResult, ReceiptData, _content_hash
from ..ocr_processor import OCRProcessor
from ..storage.project import ProjectLayout, layout_from_db_path, resolve_project
from ..storage.sqlite import SQLiteRepository
from .config import AgentsConfig, Config
from .pipeline import run_pipeline

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

_UNSET = object()  # sentinel: distinguish "not passed" from None


[docs] class FinanceAgent: """ Orchestrates OCR + multi-agent extraction and auto-persists every result. Args: config: Optional Config instance (reads .env by default). project: Project name — determines ~/.finamt/<project>/ layout. Default: "default" (or FINAMT_PROJECT env var). db_path: Explicit SQLite path — overrides project layout. Pass None to disable persistence entirely. agents_cfg: Optional AgentsConfig — controls which models are used. """ def __init__( self, config: Config | None = None, project: str | None = None, db_path: str | Path | None = _UNSET, agents_cfg: AgentsConfig | None = None, ) -> None: self.config = config or Config() self.agents_cfg = agents_cfg or AgentsConfig() self.ocr = OCRProcessor(self.config) # Resolve project layout if db_path is not _UNSET and db_path is not None: # Explicit path takes precedence — infer layout from it self._layout: ProjectLayout | None = layout_from_db_path(Path(db_path)) self._db_path: Path | None = Path(db_path) elif db_path is None: # Explicitly disabled — no persistence self._layout = resolve_project(project) if project else None self._db_path = None else: # Normal case — derive everything from project name self._layout = resolve_project(project) self._db_path = self._layout.db_path # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def process_receipt( self, pdf_path: str | Path | bytes, receipt_type: str = "purchase", taxpayer_info: dict | None = None, ) -> ExtractionResult: """ Process a receipt or invoice PDF through the full pipeline. Args: pdf_path: Filesystem path or raw PDF bytes. receipt_type: "purchase" (default) or "sale". taxpayer_info: Optional dict with taxpayer's own data so Agent 2 does not confuse the taxpayer with the counterparty. Keys: name, vat_id, tax_number, address. Returns ExtractionResult — always populated, success flag indicates whether the result passed validation and was saved. """ start = time.monotonic() try: # 1 — OCR ------------------------------------------------------- name = Path(pdf_path).name if isinstance(pdf_path, (str, Path)) else "<bytes>" _progress.emit(f"[finamt] {name}") _progress.emit(f" {time.strftime('[%H:%M:%S]')} → PyMuPDF ...") raw_text = self.ocr.extract_text_from_pdf(pdf_path) if not raw_text.strip(): return ExtractionResult( success=False, error_message="No text could be extracted from the PDF.", processing_time=time.monotonic() - start, ) # 2 — Duplicate check ------------------------------------------- content_id = _content_hash(raw_text) if self._db_path: with SQLiteRepository(self._db_path) as repo: if repo.exists(content_id): existing = repo.get(content_id) logger.info("Duplicate detected: %s", content_id) return ExtractionResult( success=True, data=existing, duplicate=True, existing_id=content_id, processing_time=time.monotonic() - start, ) # 3-7 — Multi-agent extraction ---------------------------------- _progress.emit(f" {time.strftime('[%H:%M:%S]')} → Extraction pipeline ...") pdf_file_path: Path | None = ( Path(pdf_path) if isinstance(pdf_path, (str, Path)) else None ) receipt_data: ReceiptData = run_pipeline( raw_text=raw_text, pdf_path=pdf_file_path, receipt_type=receipt_type, cfg=self.agents_cfg, receipt_id=content_id, debug_root=self._layout.debug_dir if self._layout else None, taxpayer_info=taxpayer_info, ) # 8 — Validate (collects warnings; never blocks) ----------------- receipt_data.validate() # populates receipt_data.validation_warnings # Receipts with warnings are saved and shown with a warning banner # in the UI — the user decides to correct or delete them. # 9 — Save + PDF copy ------------------------------------------- if self._db_path: with SQLiteRepository(self._db_path) as repo: repo.save(receipt_data) if pdf_file_path: self._store_pdf(pdf_file_path, receipt_data.id) return ExtractionResult( success=True, data=receipt_data, processing_time=time.monotonic() - start, ) except (OCRProcessingError, InvalidReceiptError) as exc: logger.error("%s: %s", type(exc).__name__, exc) return ExtractionResult( success=False, error_message=str(exc), processing_time=time.monotonic() - start, ) except Exception as exc: # noqa: BLE001 logger.exception("Unexpected error processing receipt.") return ExtractionResult( success=False, error_message=f"Unexpected error: {exc}", processing_time=time.monotonic() - start, )
[docs] def batch_process( self, pdf_paths: list[str | Path], receipt_type: str = "purchase", ) -> dict[str, ExtractionResult]: """Process multiple receipts sequentially.""" return {str(p): self.process_receipt(p, receipt_type=receipt_type) for p in pdf_paths}
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _store_pdf(self, src: Path, receipt_id: str) -> None: """Copy the original file to <project>/pdfs/<id>.<ext>, preserving the source extension.""" if not src.exists(): return try: pdf_dir = self._layout.pdfs_dir if self._layout else self._db_path.parent / "pdfs" pdf_dir.mkdir(parents=True, exist_ok=True) # Preserve original extension so images stay as .png / .jpg etc. ext = src.suffix.lower() or ".pdf" dest = pdf_dir / f"{receipt_id}{ext}" if not dest.exists(): shutil.copy2(src, dest) logger.info("File stored: %s", dest) except Exception as exc: # noqa: BLE001 logger.warning("Could not store file copy: %s", exc)