# Hier wird der Suchpfad um das übergeordnete Verzeichnis erweitert, damit lokale Module gefunden werden.
importsys
sys.path.append("..")
#
# Hier werden Funktionen des Betriebssystems eingebunden.
importsys;sys.path.append("..")
importos
#
# Hier werden reguläre Ausdrücke zur Textbearbeitung eingebunden.
importre
#
# Hier wird das ast-Modul eingebunden, um Python-ähnliche Literale zu parsen.
importast
#
# Hier wird das json-Modul eingebunden, um JSON-Daten zu verarbeiten.
importjson
#
# Hier wird das datetime-Modul unter dem Alias _dt eingebunden, um Zeitstempel zu erzeugen.
importdatetimeas_dt
#
# Hier wird das traceback-Modul eingebunden, um Fehlermeldungen formatiert auszugeben.
importtraceback
#
# Hier werden Typ-Alias-Definitionen aus dem typing-Modul eingebunden, um den Code klarer zu gestalten.
fromtypingimportAny,Dict,List,Tuple
#
# Hier wird die pypdf-Bibliothek eingebunden, um Textinhalte aus PDF-Dateien zu extrahieren.
frompypdfimportPdfReader
#
# Hier wird eine spezifische Exception aus pypdf eingebunden, um Leseprobleme differenziert zu behandeln.
frompypdf.errorsimportPdfReadError
#
# Hier wird der OpenAI-Manager eingebunden, um Chat-Nachrichten an GPT-Modelle zu senden.
frommanager.OpenAiManagerimportOpenAiManager
#
# Hier wird der MySQL-Manager eingebunden, um Datenbank-Sessions zu erzeugen.
frommanager.MysqlManagerimportMysqlManager
#
# Hier werden die SQLAlchemy-Basisklassen eingebunden, damit alle Models korrekt referenziert werden.
frommodels._systemimportBase
#
# Hier wird das Model BaseBase eingebunden, das die Haupttabelle für Tarife repräsentiert.
frommodels.base_baseimportBaseBase
#
# Hier wird das Model DealDeal eingebunden, das zugehörige Deal-Einträge verwaltet.
frommodels.deal_dealimportDealDeal
#
# Hier wird das Model OptionOpti eingebunden, das optionale Tarif-Bausteine abbildet.
frommodels.option_optiimportOptionOpti
#
# Hier wird der vollständige Prompt als mehrzeiliger String definiert, der alle Extraktionsregeln beinhaltet.
# This variable holds the complete multi-line prompt that includes all extraction rules
promptTemplate:str=(
"""
Du bist eine hochpräzise API zur Extraktion spezifischer Mobilfunktarif-Merkmale aus Dokumentenpaaren. Deine Eingabe besteht immer aus dem extrahierten Text von zwei PDF-Dateien: einem **Produktdetailblatt/Flyer** und einem **Produktinformationsblatt (PIB)**, die gemeinsam *einen* spezifischen Tarif beschreiben.
...
...
@@ -114,8 +61,7 @@ Numerische Werte als Number belassen, Netto stets auf 4 Nachkommastellen runden.
"""
)
#
# Hier wird die Liste der Schlüssel definiert, die im GPT-Ergebnis zwingend vorhanden sein müssen.
# This variable holds the list of keys that must be present in the GPT response
expectedKeys:List[str]=[
"tariff_name",
"marketing_start_date",
...
...
@@ -148,61 +94,84 @@ expectedKeys: List[str] = [
]
#
# Diese Funktion extrahiert den kompletten Text einer PDF-Datei und gibt ihn als String zurück.
# This function extracts the complete text from a PDF file and returns it as a string
defextractTextFromPdf(pdfPath:str)->str|None:
# This condition checks if the file does not exist, returning None if missing
ifnotos.path.exists(pdfPath):
print(f"INFO: Datei nicht gefunden: {os.path.basename(pdfPath)}")
returnNone
# This variable holds all page texts extracted from the PDF
pageTexts:List[str]=[]
# This block attempts to open and read the PDF file
try:
withopen(pdfPath,"rb")asfileHandle:
# This line initializes the PDF reader to parse the file
reader=PdfReader(fileHandle)
# This loop iterates through the pages of the PDF to extract text
forpageinreader.pages:
txt=page.extract_text()
# This condition checks if text was actually extracted from the page
iftxt:
pageTexts.append(txt)
# This condition checks if no text was extracted from the PDF
ifnotpageTexts:
print(f"INFO: Kein Text in {os.path.basename(pdfPath)}")
returnNone
# This line returns the joined text from all pages
return"\n".join(pageTexts).strip()
# This block handles a specific PDF reading error from pypdf
exceptPdfReadErrorasexc:
print(f"WARNUNG: pypdf-Lesefehler bei '{os.path.basename(pdfPath)}': {exc}")
returnNone
# This block handles any other unexpected errors
exceptException:
print(f"FEHLER: Unerwarteter Fehler bei '{os.path.basename(pdfPath)}':")
traceback.print_exc(limit=1)
returnNone
#
# Diese Funktion entfernt eingehende Code-Fences, um reines JSON zu erhalten.
# This function removes incoming code fences to provide pure JSON
defstripCodeFence(raw:str)->str:
# This condition checks if the string starts with triple backticks to remove them
ifraw.strip().startswith("```"):
returnre.sub(r"```[\w]*","",raw).strip()
# This line returns the raw string if no code fences were found
returnraw
#
# Diese Funktion entfernt überflüssige Kommas vor schließenden Klammern aus JSON-Strings.
# This function removes unnecessary commas before closing brackets in JSON strings
defremoveTrailingCommas(js:str)->str:
# This line substitutes commas that appear right before closing braces or brackets
returnre.sub(r",(\s*[}\]])",r"\1",js)
#
# Diese Funktion versucht, einen String in ein Dictionary umzuwandeln und nutzt mehrere Reparatur-Ansätze.
# This function tries to parse a string as JSON using multiple repair approaches
defloadJsonSafe(raw:str)->Dict[str,Any]|None:
# This variable holds the cleaned string without carriage returns
cleaned=stripCodeFence(raw).replace("\r","")
# This loop attempts different variants of the cleaned string for JSON decoding
# This script scans a local cache directory for PDF files belonging to base tariffs, uploads the PDFs to S3, writes the resulting URLs back into the MySQL database, and logs progress as well as errors to stdout.
importsys;sys.path.append("..")
importos
importdatetime
frommanager.S3ManagerimportS3Manager
...
...
@@ -12,98 +9,111 @@ from models.deal_deal import DealDeal
frommodels.option_optiimportOptionOpti
frommodels.provisiongroup_pgroimportProvisiongroupPgro# zwingend, um Abhängigkeits-Mapping zu initialisieren
#
# Dieses Verzeichnis enthält sämtliche PDF-Dateien für den Upload.
# The variable "cacheDir" stores the file‑system path that contains the PDF files waiting for upload.
cacheDir="../cache"
#
# Dieses Objekt übernimmt das Hochladen der Dateien in den S3-Bucket und liefert die endgültige URL.
# The variable "s3Manager" holds an instance that encapsulates S3 upload functionality.
s3Manager=S3Manager()
#
# Diese Datenbank-Session ermöglicht Abfragen und Aktualisierungen innerhalb der MySQL-Datenbank.
# The variable "dbSession" stores a SQLAlchemy session used to query and update the MySQL database.
dbSession=MysqlManager().getSession()
#
# Diese Liste sammelt alle PDF-Dateinamen im Cache-Verzeichnis.
# The variable "pdfFiles" gathers all file names inside the cache directory whose names end with the ".pdf" extension, case‑insensitive.
# Dieser Wrapper wechselt ins Verzeichnis /maui/commands und startet das
# gewünschte Python-Skript (mit python3), sofern nicht bereits eine Instanz
# dieses Skripts läuft. Gleichzeitig werden alle Ausgaben in zwei getrennten
# Logfiles im Verzeichnis /maui/logs abgelegt, wobei jedes Skript einen
# eigenen Unterordner erhält (benannt nach dem Skriptnamen ohne Erweiterung)
# und die Logfiles die Namen im Format
# - L_yyyymmdd-hhiiss.txt für die Standardausgabe,
# - E_yyyymmdd-hhiiss.err für die Fehlermeldung
# tragen. Logfiles, die älter als 24 Stunden (1440 Minuten) sind, werden
# automatisch gelöscht.
#--- Parameterprüfung ---
# In dieser Abfrage wird überprüft, ob mindestens ein Parameter übergeben wurde.
# This script guarantees that only one instance of a specified Python job runs simultaneously, captures its standard and error output in timestamped log files, and notifies a monitoring endpoint when errors occur.
# The following conditional branch checks whether at least one positional argument has been provided; if not, usage information is printed and the script terminates with exit status 1.
if["$#"-lt 1 ];then
# Hier wird ein Hinweis ausgegeben, wie dieses Skript zu nutzen ist, wenn nicht
# genügend Parameter übergeben wurden.
# The echo command prints usage instructions when no job file is supplied.
echo"Usage: $0 <jobfilename> [arguments...]"
# Dieser Befehl beendet das Skript mit einem Fehlercode.
# The script terminates with exit status 1 when the required argument is missing.
exit 1
fi
# Diese Variable speichert den ersten übergebenen Parameter als Namen des
# Python-Skripts.
# The variable “jobname” stores the first positional argument as the Python job file name.
jobname="$1"
# Dieser Befehl entfernt den ersten Parameter aus der Parameterliste, damit
# weitere Argumente optional weiterverarbeitet werden können.
# The shift statement removes the first positional argument so that any additional arguments remain accessible.
shift
#--- Arbeitsverzeichnis und Log-Verzeichnis festlegen ---
# Diese Variable legt das Arbeitsverzeichnis fest, in dem sich die
# Python-Skripte befinden.
# The variable “WORKDIR” defines the directory where the Python job files reside.
WORKDIR="/maui/commands"
# Diese Variable legt das Hauptverzeichnis für die Logdateien fest.
# The variable “LOG_ROOT” defines the root directory where log folders will be created.
LOG_ROOT="/maui/logs"
# Diese Variable ermittelt aus dem übergebenen Skriptnamen
# (z. B. rawFromBloomberg.py) den Basisteil (rawFromBloomberg).
# The variable “job_base” extracts the base name of the job without its extension.
job_base=$(basename"$jobname" .py)
# Diese Variable bildet den Pfad für das individuellen Logverzeichnis, basierend
# auf dem Basisteil des Skriptnamens.
# The variable “LOG_DIR” composes the path to the job-specific log directory.
LOG_DIR="$LOG_ROOT/$job_base"
# Dieser Befehl stellt sicher, dass das Haupt-Logverzeichnis und das Verzeichnis
# für das aktuelle Skript existieren, und legt sie gegebenenfalls an.
# The mkdir command ensures that the root log directory and the job-specific directory exist, creating them if necessary.
mkdir-p"$LOG_DIR"
# Dieser Befehl findet und löscht alle Logdateien im spezifischen Verzeichnis,
# die älter als 24 Stunden (1440 Minuten) sind.
# The find command removes log files older than twenty-four hours (1 440 minutes) from the job-specific directory.
find "$LOG_DIR"-type f -mmin +1440 -delete
#--- Prozessüberprüfung ---
# Diese Variable speichert die Prozess-ID des aktuell ausgeführten Skripts,
# damit es sich nicht selbst erkennt.
# The variable “current_pid” stores the process identifier of the currently running wrapper instance.
current_pid=$$
# Diese Variable hält den Namen dieses Wrapperskripts (cron.sh), um ihn ebenfalls
# von der Prozessliste auszuschließen.
# The variable “wrapper_name” stores the file name of this wrapper to exclude it from the process search.
wrapper_name=$(basename"$0")
# Diese Variable legt fest, nach welchem exakten Aufrufmuster
# (python3 <jobname>) in der Prozessliste gesucht werden soll.
# The variable “pattern” stores the exact command signature that identifies a running job process.
pattern="python3 $jobname"
# In dieser Variable werden alle zum Muster passenden Prozess-IDs gespeichert,
# wobei Zeilen des Wrappers und greps ausgeschlossen werden.
# The variable “running” captures the process identifiers that match the command signature while excluding grep and wrapper processes.
# Diese Abfrage prüft, ob ein passender Prozess bereits läuft.
# The following conditional branch checks whether at least one matching process identifier was found; if a job is already running, the script informs the user and terminates with exit status 0.
if[-n"$running"];then
# Hier wird der Nutzer informiert, dass der entsprechende Job bereits ausgeführt
# wird, und ein erneuter Start verhindert.
# The echo command informs the user that the requested job is already running.
echo"Job '$jobname' läuft bereits (PID(s): $running). Abbruch."
# Das Skript wird hier mit Exit-Code 0 (ohne Fehler) beendet, um keine neue
# Instanz zu starten.
# The script terminates with exit status 0 to prevent a second instance from starting.
exit 0
fi
#--- Logging vorbereiten und Job starten ---
# Diese Variable erzeugt einen Zeitstempel im Format yyyymmdd-hhiiss
# (z. B. 20250413-114530), um eindeutige Logdateien zu erstellen.
# The variable “timestamp” records the current date and time in YYYYMMDD-HHMMSS format.
timestamp=$(date"+%Y%m%d-%H%M%S")
# Diese Variable bildet den vollständigen Pfad zur Logdatei für die Standardausgabe.
# The variable “STDOUT_LOG” composes the full path to the log file for standard output.
STDOUT_LOG="$LOG_DIR/L_${timestamp}.txt"
# Diese Variable bildet den vollständigen Pfad zur Logdatei für die Fehlermeldungen.
# The variable “ERROR_LOG” composes the full path to the log file for error output.
ERROR_LOG="$LOG_DIR/E_${timestamp}.err"
# Dieser Befehl wechselt in das festgelegte Arbeitsverzeichnis oder bricht mit
# Fehlermeldung ab, falls es nicht erreichbar ist.
# The cd command changes into the working directory or aborts with an error message if the directory is inaccessible.
cd"$WORKDIR"||{echo"Arbeitsverzeichnis $WORKDIR nicht erreichbar.">&2;exit 1;}
# Dieser Befehl führt das Python-Skript aus und leitet stdout in das L_-Logfile
# und stderr in das E_-Logfile um.
# The python3 command executes the specified job, redirecting standard output and error output to dedicated log files.
# Fehler senden bei Inhalt: Jobname, Zeilenumbruch, Fehler
# The following conditional branch checks whether the error log file contains data; if errors exist, the job name and error content are posted to the monitoring endpoint.
if[-s"$ERROR_LOG"];then
payload="$jobname
$(<"$ERROR_LOG")"
# The variable “payload” concatenates the job name and error log content for notification.
payload="$jobname : $(<"$ERROR_LOG")"
# The curl command posts the payload to the ntfy.sh endpoint for error reporting.
curl -s-X POST https://ntfy.sh/itmaxDebug -d"$payload"
# Diese Klasse verwaltet die MySQL‑Verbindung und nutzt optional einen SSH‑Tunnel.
# Die Konfiguration stammt weiterhin aus config.MysqlConfig; es wird lediglich
# der erweiterte Engine‑Teil (QueuePool, Timeouts u. a.) integriert.
# This class manages the MySQL connection, optionally establishes an SSH tunnel, and exposes a ready-to-use SQLAlchemy session.
classMysqlManager:
# The constructor loads configuration, conditionally creates an SSH tunnel, builds an SQLAlchemy engine with a queue pool, and instantiates the first session.
# This conditional branch builds an SSH tunnel when the configuration flag USE_SSH_TUNNEL is True; otherwise, it uses the direct database host and port.
ifgetattr(DatabaseConfig,"USE_SSH_TUNNEL",False):
# The variable “sshTunnel” opens a forwarder that connects the local port to the remote MySQL host through SSH.
To further develop or test this project use jupyter lab. Take care that "notebooks" are only concepts with your pc as python environment. Production code runs only inside the docker environment (stored in e.g. "commands" or "manager" folder).
...
...
@@ -8,31 +13,18 @@ To further develop or test this project use jupyter lab. Take care that "noteboo
jupyter lab
```
## Docker & ECR
## Docker
Use Docker to deploy this package in a production environment. Log in to Amazon ECR with the AWS CLI: