Commit 11270df4 authored by Marco Schmiedel's avatar Marco Schmiedel

fix

parent b33ee70f
......@@ -2,11 +2,11 @@
"fileId": "38da158f-ad91-433f-8b7b-60ff4949d7ff",
"originalPath": "work/config/_CronConfig.txt",
"currentPath": "work/config/_CronConfig.txt",
"hash": "174ef9fe04e4d6aebb38573991945d535ec049a7e3069a8c033cd8e7ee30820e",
"hash": "6163e59558a5880e7708c36a39ef2d5bc25a1b24b9be547bdc4c654f2c2cd495",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [],
"lastCheckedTimestamp": 1749816820741,
"lastFileModificationTimestamp": 1749816813585.9177,
"lastCheckedTimestamp": 1753176964166,
"lastFileModificationTimestamp": 1753176714171.083,
"hash_version": 2
}
......@@ -5,7 +5,13 @@
"hash": "35d56b9bc420e57388faa4e15e12cde381048f016b40307c4ed7f829e9aac7e4",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [],
"comments": [
{
"commentId": "48748f93-8a7a-4f42-b5cb-71e8007dccf1",
"text": "For information about this script, refer to the base tutorial series in the README file.",
"timestamp": 1753170811938
}
],
"lastCheckedTimestamp": 1747070815750,
"lastFileModificationTimestamp": 1747070802673.0312,
"hash_version": 2
......
{
"fileId": "5a3f6886-edd3-48d5-935d-f15a42e82bac",
"originalPath": "work/commands/downloadDataFromMauiPartnercard.py",
"currentPath": "work/commands/downloadDataFromMauiPartnercard.py",
"hash": "dcf9d6bada1a0e0b7e1d9608174b5c5e3ae06efdc15f32219e599b266f71f74f",
"hash_version": 2,
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [
{
"commentId": "e85986cf-a5ed-4d22-a9e5-69ad29f26082",
"text": "https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2025-07-22.m4v",
"timestamp": 1753177338921
}
],
"lastCheckedTimestamp": 1753177340827,
"lastFileModificationTimestamp": 1753176234527.168
}
......@@ -5,7 +5,13 @@
"hash": "3e1df1c401fbd00595912583ee4d4d2bb45364f81daca29113db874515eee0a0",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [],
"comments": [
{
"commentId": "9b3b2e00-18f1-44cb-a752-0b74eeb492d8",
"text": "For information about this script, refer to the base tutorial series in the README file.",
"timestamp": 1753170808598
}
],
"lastCheckedTimestamp": 1750323681845,
"lastFileModificationTimestamp": 1750323366660.076,
"hash_version": 2
......
......@@ -2,11 +2,22 @@
"fileId": "62aea232-2549-437e-b5a9-72cb2aa92d16",
"originalPath": "work/commands/calculateTarifDetailsWithGpt.py",
"currentPath": "work/commands/calculateTarifDetailsWithGpt.py",
"hash": "6de592dae63250612a453932e1e344699a550e4438c16813d54ba4bf2a13c785",
"hash": "9a99857070c6c9066089985619e08d943c5c3ec785113239e48ff83d3e352792",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [],
"lastCheckedTimestamp": 1747071244862,
"lastFileModificationTimestamp": 1747071237273.2832,
"comments": [
{
"commentId": "4face178-ab9f-4870-bbf9-73befd035a80",
"text": "For information about this script, refer to the base tutorial series in the README file.",
"timestamp": 1753170818842
},
{
"commentId": "22763243-229a-48ca-980b-8eb27433ba6e",
"text": "I've added a static field \"is_partnercard\" based on the partnercard.csv that is created by downloadDataFromMauiPartnercard.py.",
"timestamp": 1753177018105
}
],
"lastCheckedTimestamp": 1753176977286,
"lastFileModificationTimestamp": 1753169638397.8057,
"hash_version": 2
}
......@@ -5,7 +5,13 @@
"hash": "4c972fa8de12b095edb942fc260533235001e5b7a508a4e058e28f45340ebc59",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [],
"comments": [
{
"commentId": "02d64edc-151e-46cd-ad31-7855739ed216",
"text": "For information about this script, refer to the base tutorial series in the README file.",
"timestamp": 1753170814981
}
],
"lastCheckedTimestamp": 1750663401713,
"lastFileModificationTimestamp": 1750662226645.7258,
"hash_version": 2
......
......@@ -2,7 +2,7 @@
"fileId": "986eeb57-8634-4f40-a4ea-a2eae9d87e71",
"originalPath": "work/readme.md",
"currentPath": "work/README.md",
"hash": "4f572de1efd35b429b45d9932e8ffa93153c9b0a421c7afec99b4af109aa87d1",
"hash": "756bb90539f71f054db700f99b053cf8e8b94a2d17499ce170cc1ba5db7276b3",
"docContent": "<p><br></p>",
"checkedStatus": "done",
"comments": [
......@@ -17,7 +17,7 @@
"timestamp": 1747069658074
}
],
"lastCheckedTimestamp": 1750683280697,
"lastFileModificationTimestamp": 1750683272808.5942,
"lastCheckedTimestamp": 1753177346533,
"lastFileModificationTimestamp": 1753177332762.9363,
"hash_version": 2
}
......@@ -2,17 +2,22 @@
"fileId": "e3281330-5559-49da-9434-bf3cccd4ddae",
"originalPath": "work/commands/calculateAiPrices.py",
"currentPath": "work/commands/calculateAiPrices.py",
"hash": "343345985f2e8153b4e9e6a7efa0479c3938e68080388f23e537609f14cafded",
"hash": "5853eddf6c87959b28413a7d63edaef015cde95c13b6b28d13a0d24dfdb30af2",
"hash_version": 2,
"docContent": "<p><br></p>",
"checkedStatus": "todo",
"checkedStatus": "done",
"comments": [
{
"commentId": "1d5f60af-fe8c-46f1-8a99-806b835d8ed6",
"text": "Maybe not stable... Need to run this a few weeks on prod...",
"timestamp": 1750682733287
},
{
"commentId": "1180a758-e39f-429f-ac9b-415747f29e56",
"text": "https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2025-06-23.m4v",
"timestamp": 1753170856993
}
],
"lastCheckedTimestamp": 1750682714405,
"lastFileModificationTimestamp": 1750681385737.0137
"lastCheckedTimestamp": 1753177024777,
"lastFileModificationTimestamp": 1753169895196.4453
}
......@@ -12,13 +12,27 @@ from models.deal_deal import DealDeal
from models.option_opti import OptionOpti
from models.aiprice_aipr import AipriceAipr
# Konfiguriert ein einfaches Logging, um den Skriptverlauf zu sehen.
# This configures basic logging to monitor the script's execution flow.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
def get_validated_response(gpt_manager: OpenAiManager, name: str) -> dict | None:
"""Sendet eine Anfrage an GPT und validiert die JSON-Antwort auf Konsistenz."""
"""
Sends a request to an LLM to extract structured pricing data from a name string.
This function employs a consistency-checking mechanism to ensure a high-quality
response. It queries the LLM repeatedly (up to 10 times) until it receives
two consecutive, identical, and structurally valid JSON responses. This
mitigates the risk of model hallucinations or inconsistent outputs.
Args:
gpt_manager: An instance of the OpenAiManager to handle the API call.
name: The name string of the mobile plan option to be analyzed.
Returns:
A dictionary containing the structured pricing data if a consistent
response is obtained, otherwise None.
"""
prompt = """
# Teil 1 – Aufgabe
......@@ -333,13 +347,13 @@ def get_validated_response(gpt_manager: OpenAiManager, name: str) -> dict | None
last_response = None
# GEÄNDERT: Schleife für bis zu 10 Versuche, um eine konsistente Antwort zu erhalten
# This loop runs up to 10 times to get a consistent and valid response.
for attempt in range(1, 11):
logging.info(f"-> GPT-Versuch {attempt}/10 für '{name}'...")
try:
raw_response = gpt_manager.chat(prompt, model="gpt-4.1")
# Bereinige und parse die JSON-Antwort
# This block cleans and parses the JSON response from the LLM.
if raw_response.strip().startswith("```"):
cleaned_response = re.sub(r"```[\w]*", "", raw_response).strip()
else:
......@@ -347,35 +361,42 @@ def get_validated_response(gpt_manager: OpenAiManager, name: str) -> dict | None
data = json.loads(cleaned_response)
# Prüfe, ob die Struktur der Antwort korrekt ist
# This block validates the structure of the JSON response.
if isinstance(data, dict) and all(key in data for key in expected_keys):
# Prüfe, ob die Antwort mit der letzten übereinstimmt
# This condition checks if the response is identical to the previous valid one.
if data == last_response:
logging.info(f"-> Konsistente Antwort für '{name}' in Versuch {attempt} erhalten. Daten sind gültig.")
return data
else:
# Speichere die erste gültige Antwort und fordere eine zweite zur Bestätigung an
# This line stores the first valid response to verify it against the next one.
last_response = data
logging.warning(f"-> Gültige, aber noch nicht bestätigte Antwort in Versuch {attempt} für '{name}'. Nächster Versuch zur Verifizierung.")
else:
logging.warning(f"-> Ungültige Datenstruktur in Versuch {attempt} für '{name}'.")
last_response = None # Setze zurück, da die Kette unterbrochen ist
last_response = None # This line resets consistency check if the chain of valid responses is broken.
except Exception as e:
logging.error(f"-> Fehler in Versuch {attempt} für '{name}': {e}")
last_response = None # Setze bei Fehler zurück
last_response = None # This line resets consistency check on error.
logging.error(f"-> Konnte nach 10 Versuchen keine zwei aufeinanderfolgenden, identischen Antworten für '{name}' erhalten.")
return None
def sync_names_to_aiprice(session: Session):
"""PROZESS 1: Synchronisiert eindeutige Namen in die aiprice_aipr Tabelle."""
"""
Process 1: Synchronizes new, relevant names into the aiprice_aipr table.
This function scans the deal_deal and option_opti tables for entries that
likely contain pricing information, based on a list of keywords. It then
inserts any unique names not already present in aiprice_aipr, preparing
them for analysis in the next step.
"""
logging.info("Starte Prozess 1: Synchronisiere Namen.")
# Lade existierende Keys, um Duplikate zu vermeiden
# This line loads all existing primary keys to avoid inserting duplicates.
existing_keys = {key for key, in session.query(AipriceAipr.key_aipr)}
# Schlüsselwörter, die für den Import berücksichtigt werden sollen
# This list defines keywords to identify deals and options that likely contain pricing information.
include_keywords = [
"%Sonderbonus%",
"%Vergütungsverzicht%",
......@@ -387,11 +408,11 @@ def sync_names_to_aiprice(session: Session):
"%Monatsgrundpreis%",
]
# Erstelle eine Liste von 'like'-Bedingungen für die Filterung
# This line creates a list of 'like' conditions for SQLAlchemy filtering.
deal_conditions = [DealDeal.name_deal.like(keyword) for keyword in include_keywords]
option_conditions = [OptionOpti.name_opti.like(keyword) for keyword in include_keywords]
# Lade eindeutige Namen aus den Quelltabellen und wende die Einschlussfilter an
# These queries fetch distinct, non-empty names from source tables based on the keyword filters.
deals = session.query(DealDeal.name_deal).distinct().filter(
DealDeal.name_deal != "",
or_(*deal_conditions)
......@@ -403,12 +424,12 @@ def sync_names_to_aiprice(session: Session):
or_(*option_conditions)
).all()
# Kombiniere und erstelle neue Einträge
# This block combines names from both sources and prepares new entries.
new_entries = []
for name, category in [(d[0], 'deal') for d in deals] + [(o[0], 'opti') for o in options]:
if name not in existing_keys:
new_entries.append(AipriceAipr(key_aipr=name, category_aipr=category))
existing_keys.add(name) # Verhindert doppeltes Hinzufügen im selben Lauf
existing_keys.add(name) # This line prevents adding the same key twice in one run.
if new_entries:
session.add_all(new_entries)
......@@ -420,11 +441,19 @@ def sync_names_to_aiprice(session: Session):
def enrich_aiprice_with_gpt(session: Session):
"""PROZESS 2: Reichert Einträge ohne response_aipr sofort einzeln an."""
"""
Process 2: Enriches records in aiprice_aipr with structured data from an LLM.
This function retrieves all records from the aiprice_aipr table that have
not yet been processed (i.e., `response_aipr` is NULL). It sends each
record's name to the LLM via `get_validated_response` and saves the
resulting structured JSON data back to the database, committing each
record individually to ensure progress is saved.
"""
logging.info("Starte Prozess 2: Reichere Daten mit GPT an.")
gpt_manager = OpenAiManager()
# Verarbeite nur Einträge, bei denen die Antwort noch fehlt
# This query selects only records from the aiprice table that have not yet been processed.
items_to_process = session.query(AipriceAipr).filter(AipriceAipr.response_aipr.is_(None)).all()
if not items_to_process:
......@@ -433,13 +462,15 @@ def enrich_aiprice_with_gpt(session: Session):
logging.info(f"{len(items_to_process)} Einträge werden verarbeitet.")
# This loop processes each item individually.
for item in items_to_process:
validated_data = get_validated_response(gpt_manager, item.key_aipr)
# Speichere sofort, wenn die Daten gültig sind
# This block saves the result if the response data is valid and consistent.
if validated_data:
item.response_aipr = validated_data
try:
# This line commits the change for the current item immediately.
session.commit()
logging.info(f"Eintrag für '{item.key_aipr}' erfolgreich gespeichert.")
except Exception as e:
......@@ -448,11 +479,14 @@ def enrich_aiprice_with_gpt(session: Session):
logging.info("Prozess 2 abgeschlossen.")
# --- Hauptausführung ---
# This block is the main entry point for the script.
if __name__ == "__main__":
logging.info("=== Starte kombinierte Ausführung: sync & enrich ===")
# This line initializes a new database session.
db_session = MysqlManager().getSession()
# This line executes the first process to sync new names.
sync_names_to_aiprice(db_session)
enrich_aiprice_with_gpt(db_session)
# This line executes the second process to enrich the new names with data.
enrich_aiprice_with_gpt(db_session)
\ No newline at end of file
......@@ -3,6 +3,7 @@ import os
import re
import ast
import json
import csv
import datetime as _dt
import traceback
from typing import Any, Dict, List, Tuple
......@@ -215,6 +216,24 @@ if __name__ == "__main__":
if not os.path.isdir(cacheDir):
print("FEHLER: Cache-Verzeichnis nicht gefunden.")
sys.exit(1)
# Load all partner card IDs from the CSV for quick lookup
partnercardsCsvPath = os.path.join(cacheDir, "partnercards.csv")
partnercardIds = set()
if os.path.exists(partnercardsCsvPath):
print(f"INFO: Lese Partnercard-IDs aus '{partnercardsCsvPath}'...")
try:
with open(partnercardsCsvPath, mode='r', newline='', encoding='utf-8') as f:
reader = csv.reader(f, delimiter=';')
header = next(reader) # Skip header
for row in reader:
if row:
partnercardIds.add(row[0])
print(f"INFO: {len(partnercardIds)} Partnercard-IDs geladen.")
except Exception as e:
print(f"WARNUNG: Fehler beim Lesen der Partnercard-CSV: {e}")
else:
print("INFO: Datei 'partnercards.csv' nicht gefunden. 'is_partnercard' wird immer false sein.")
# This list comprehension gathers all files with .pdf extension
pdfFiles = [f for f in os.listdir(cacheDir) if f.lower().endswith(".pdf")]
......@@ -311,6 +330,11 @@ if __name__ == "__main__":
if not validatedData:
print("FEHLER: Drei ungültige Antworten – übersprungen.")
continue
# Check if the current tariff ID is a partner card and add the flag to the data.
isPartnercard = tariffId in partnercardIds
validatedData['is_partnercard'] = isPartnercard
print(f"INFO: Feld 'is_partnercard' auf '{isPartnercard}' gesetzt.")
# This loop updates the details_base for each relevant BaseBase record
for br in baseRecords:
......@@ -326,4 +350,4 @@ if __name__ == "__main__":
# This line closes the database session after processing
dbSession.close()
print("INFO: Verarbeitung abgeschlossen.")
print("INFO: Verarbeitung abgeschlossen.")
\ No newline at end of file
import sys; sys.path.append("..")
import os
import csv
import datetime
import time
import pyotp
import re
import base64
from decimal import Decimal
import traceback
import shutil
import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import TimeoutException
from selenium.common.exceptions import NoSuchElementException
from manager.SeleniumManager import SeleniumManager
from config.MauiConfig import MAUI_USERNAME, MAUI_PASSWORD, MAUI_AUTHCODE
from manager.MysqlManager import MysqlManager
from models._system import Base
from models.base_base import BaseBase
from models.deal_deal import DealDeal
from models.option_opti import OptionOpti
# Module-level set of category IDs collected so far; used to skip duplicate
# category rows. Presumably pre-loaded from categorys.csv in scrapeData() —
# the load happens just past this chunk, so confirm against the full file.
uniqueCategorySet = set()
# This function downloads a PDF file as Base64 from the Selenium context and saves it.
def downloadPdfSelenium(seleniumDriver, pdfUrl, downloadFolder, fileName):
    """Download a PDF through the browser session and store it on disk.

    An asynchronous JavaScript snippet is executed inside the Selenium
    context: it fetches the URL via XHR, Base64-encodes the binary body
    and hands it back through the async-script callback. The decoded
    bytes are then written to ``downloadFolder/fileName``.

    Raises:
        Exception: if the in-browser download returned no data.
    """
    # Browser-side script: fetch the PDF and return it Base64-encoded.
    downloadScript = """
    var callback = arguments[arguments.length - 1];
    var xhr = new XMLHttpRequest();
    xhr.open('GET', arguments[0], true);
    xhr.responseType = 'arraybuffer';
    xhr.onload = function() {
        var uInt8Array = new Uint8Array(xhr.response);
        var binaryString = '';
        for (var i = 0; i < uInt8Array.length; i++){
            binaryString += String.fromCharCode(uInt8Array[i]);
        }
        var base64 = window.btoa(binaryString);
        callback(base64);
    };
    xhr.onerror = function() { callback(null); };
    xhr.send();
    """
    encodedPdf = seleniumDriver.execute_async_script(downloadScript, pdfUrl)
    if not encodedPdf:
        # No payload came back from the browser — treat as a failed download.
        raise Exception("Der PDF-Download per Selenium ist fehlgeschlagen.")
    # Decode the Base64 payload and persist it at the target location.
    targetPath = os.path.join(downloadFolder, fileName)
    with open(targetPath, "wb") as outFile:
        outFile.write(base64.b64decode(encodedPdf))
# This function performs the login process using Selenium.
def login(seleniumManager, userName, userPassword, rawToken):
    """Log into the MAUI portal, including the TOTP-based second factor.

    Mirrors the manual login flow: fill user name and password, click the
    primary "Anmelden" button, then enter the current TOTP code in the 2FA
    dialog and confirm it via a JavaScript click (the modal button may be
    covered by an overlay).
    """
    driver = seleniumManager.simpleRequest("https://maui.md.de")
    waiter = WebDriverWait(driver, 10)
    # Credentials form: the Angular Material inputs are addressed by ID.
    waiter.until(EC.presence_of_element_located((By.ID, "mat-input-0"))).send_keys(userName)
    waiter.until(EC.presence_of_element_located((By.ID, "mat-input-1"))).send_keys(userPassword)
    # Short pause so the form is fully interactive before submitting.
    time.sleep(1)
    submitButton = waiter.until(
        EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(),'Anmelden')]]"))
    )
    submitButton.click()
    # Second factor: generate the current TOTP code from the shared secret.
    otpCode = pyotp.TOTP(rawToken).now()
    waiter.until(EC.presence_of_element_located((By.ID, "mat-input-2"))).send_keys(otpCode)
    confirmButton = waiter.until(
        EC.element_to_be_clickable((By.XPATH, "//mat-dialog-actions//button[span[contains(text(),'Anmelden')]]"))
    )
    # Click via JavaScript in case the button is obscured by the dialog overlay.
    driver.execute_script("arguments[0].click();", confirmButton)
# This function navigates to the Partnercard page and fills the initial form.
def openAndPreparePartnercard(seleniumManager):
    """Navigate to the Partnercard page and submit the initial lookup form.

    Fills in the area code, phone number and postal code that identify the
    partner account, clicks "Daten übernehmen" and waits until the loading
    overlay disappears so the subsequent scraping can proceed.
    """
    driver = seleniumManager.driver
    waiter = WebDriverWait(driver, 20)
    # Follow the "Partnercard" navigation link via its href attribute.
    navLink = waiter.until(EC.presence_of_element_located((By.XPATH, "//a[contains(text(),'Partnercard')]")))
    driver.get(navLink.get_attribute("href"))
    # Wait for the lookup form to be present before interacting with it.
    waiter.until(EC.presence_of_element_located((By.NAME, "vorwahl_option")))
    # 1) area code, 2) phone number, 3) postal code of the partner account.
    Select(driver.find_element(By.NAME, "vorwahl_option")).select_by_value("0163")
    driver.find_element(By.NAME, "durchwahl").send_keys("2877784")
    driver.find_element(By.NAME, "partner_plz").send_keys("80335")
    # 4) submit via the "Daten übernehmen" action link.
    waiter.until(EC.element_to_be_clickable((By.XPATH, "//a[contains(@href, 'get_customer')]"))).click()
    # Block until the loading overlay is gone (server-side lookup may be slow).
    WebDriverWait(driver, 60).until(
        EC.invisibility_of_element_located((By.ID, "bg_layer"))
    )
    print("INFO: Partnercard-Daten übernommen, fahre mit Scraping fort.")
# This function checks if a dropdown is ready by waiting for overlays to disappear.
def waitForDropdownReady(seleniumDriver, wait, maxRetries=3, retryDelay=5):
    """Wait until the tariff dropdown is usable, retrying while overlays persist.

    The page shows a wait-iframe and a background layer while loading; both
    must become invisible before the "tarif_id" select can be interacted with.

    Returns:
        True once the dropdown is ready, False after all retries failed.
    """
    for attempt in range(1, maxRetries + 1):
        try:
            print(f"DEBUG: Warte auf Dropdown-Bereitschaft (Versuch {attempt}/{maxRetries})...")
            # Both blocking overlays have to disappear first.
            WebDriverWait(seleniumDriver, 60).until(
                EC.invisibility_of_element_located((By.XPATH, "//iframe[contains(@src, 'wait.html')]"))
            )
            WebDriverWait(seleniumDriver, 60).until(
                EC.invisibility_of_element_located((By.ID, "bg_layer"))
            )
            # Finally the dropdown itself must be present in the DOM.
            wait.until(EC.presence_of_element_located((By.NAME, "tarif_id")))
            print(f"DEBUG: Dropdown ist bereit (Versuch {attempt}).")
            return True
        except TimeoutException:
            # An overlay stayed visible too long for this attempt.
            print(f"DEBUG: Warnung: Timeout beim Warten auf Dropdown-Bereitschaft (Versuch {attempt}/{maxRetries}).")
            if attempt < maxRetries:
                time.sleep(retryDelay)
            else:
                print(f"DEBUG: FEHLER: Konnte nach {maxRetries} Versuchen nicht auf Dropdown-Bereitschaft warten.")
                break
        except Exception as exception:
            # Anything else (stale elements, driver hiccups) also triggers a retry.
            print(f"DEBUG: Unerwarteter Fehler beim Warten auf Dropdown (Versuch {attempt}/{maxRetries}): {exception}")
            if attempt < maxRetries:
                time.sleep(retryDelay)
            else:
                print(f"DEBUG: FEHLER: Konnte nach {maxRetries} Versuchen wegen unerwartetem Fehler nicht auf Dropdown warten.")
                break
    # All retries exhausted without success.
    return False
# This function fetches the currently displayed tariff price and calculates the net price.
def parsePlanPrice(seleniumDriver):
    """Read the displayed gross tariff price and convert it to a net price.

    The gross amount is scraped from the "preis_anzeige_tarif" element,
    parsed from German number formatting ("1.234,56 EUR") and divided by
    1.19 (19% VAT). Returns 0.0 when the price cannot be extracted.
    """
    try:
        displayText = seleniumDriver.find_element(By.ID, "preis_anzeige_tarif").text
        match = re.search(r'([\d\.,]+)\s*EUR', displayText)
        if match:
            # "1.234,56" -> "1234.56": drop thousand separators, decimal comma to point.
            normalized = match.group(1).replace(".", "").replace(",", ".")
            return round(float(normalized) / 1.19, 5)
    except Exception as exception:
        print(f"DEBUG: Warnung: Konnte Tarifpreis nicht extrahieren: {exception}")
    # No match or an error occurred — fall back to a zero net price.
    return 0.0
# This function parses available campaigns from the corresponding dropdown.
def parseCampaigns(seleniumDriver):
    """Collect the available campaigns from the "am_aktion_select" dropdown.

    Each option value has the form "<id>|..."; the visible text is usually
    "<prefix> - <name>". Placeholder entries (empty or "-1"/"0" ids) are
    skipped.

    Args:
        seleniumDriver: The Selenium driver whose current page holds the
            campaign select element.

    Returns:
        A list of (campaignId, campaignName) tuples; empty on any error.
    """
    campaignsList = []
    # Option values that represent "no campaign" placeholders.
    # NOTE: the original list contained "-1|" twice and paid O(n) list
    # membership per option; a set is both deduplicated and O(1).
    placeholderValues = {" |", "-1|", "|", "0|"}
    try:
        campaignSelect = seleniumDriver.find_element(By.NAME, "am_aktion_select")
        for option in campaignSelect.find_elements(By.TAG_NAME, "option"):
            val = option.get_attribute("value")
            txt = option.text.strip()
            # Skip empty and placeholder entries.
            if not val or val in placeholderValues:
                continue
            campaignId = val.split("|")[0].strip()
            if not campaignId:
                continue
            # Display text is "<something> - <campaign name>"; fall back to the full text.
            if "-" in txt:
                campaignName = txt.split("-", 1)[1].strip()
            else:
                campaignName = txt
            campaignsList.append((campaignId, campaignName))
    except Exception as exception:
        print(f"DEBUG: Warnung: Konnte Kampagnen nicht extrahieren: {exception}")
    return campaignsList
# This function scrapes the main data and writes it to various CSV files.
def scrapeData(seleniumManager):
    """Scrape tariffs, campaigns, options, and categories into CSV files.

    Walks every combination of tariff world, network, and tariff offered by
    the selection form, appends one row per tariff to plans.csv and
    partnercards.csv, records its campaigns, downloads the flyer/PIB PDFs,
    and delegates option scraping to scrapeOption().

    Args:
        seleniumManager: wrapper object exposing the WebDriver as ``.driver``.

    Side effects:
        Creates ``../cache`` if missing, appends to the CSV files inside it,
        downloads PDFs via the driver, and extends the module-level
        ``uniqueCategorySet``.
    """
    # This variable holds the Selenium driver instance.
    seleniumDriver = seleniumManager.driver
    # This variable is a WebDriverWait object with a 60-second timeout.
    wait = WebDriverWait(seleniumDriver, 60)
    # This variable defines the path for the cache directory.
    cacheDir = "../cache"
    # This try-structure ensures that the cache directory is created.
    try:
        os.makedirs(cacheDir, exist_ok=True)
        print(f"Info: Cache-Verzeichnis '{cacheDir}' sichergestellt.")
    except OSError as exception:
        print(f"Fehler beim Erstellen von Verzeichnis {cacheDir}: {exception}")
    # This variable stores the path for the plans.csv file.
    plansCsvFilePath = os.path.join(cacheDir, "plans.csv")
    # This variable stores the path for the campaigns.csv file.
    campaignsCsvFilePath = os.path.join(cacheDir, "campaigns.csv")
    # This variable stores the path for the options.csv file.
    optionsCsvFilePath = os.path.join(cacheDir, "options.csv")
    # This variable stores the path for the categorys.csv file.
    categorysCsvFilePath = os.path.join(cacheDir, "categorys.csv")
    # This variable stores the path for the new partnercards.csv file.
    partnercardsCsvFilePath = os.path.join(cacheDir, "partnercards.csv")
    # Check if headers are needed for each file (only if file doesn't exist or is empty)
    plans_needs_header = not os.path.exists(plansCsvFilePath) or os.path.getsize(plansCsvFilePath) == 0
    campaigns_needs_header = not os.path.exists(campaignsCsvFilePath) or os.path.getsize(campaignsCsvFilePath) == 0
    options_needs_header = not os.path.exists(optionsCsvFilePath) or os.path.getsize(optionsCsvFilePath) == 0
    categorys_needs_header = not os.path.exists(categorysCsvFilePath) or os.path.getsize(categorysCsvFilePath) == 0
    partnercards_needs_header = not os.path.exists(partnercardsCsvFilePath) or os.path.getsize(partnercardsCsvFilePath) == 0
    # Pre-load existing category IDs to maintain uniqueness across runs.
    if os.path.exists(categorysCsvFilePath):
        print(f"INFO: Lese bestehende Kategorien aus '{categorysCsvFilePath}'...")
        try:
            with open(categorysCsvFilePath, mode='r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f, delimiter=';')
                header = next(reader) # Skip header
                for row in reader:
                    if row: # Make sure row is not empty
                        uniqueCategorySet.add(row[0]) # Add the ID from the first column
            print(f"INFO: {len(uniqueCategorySet)} bestehende Kategorien geladen.")
        # next() on an empty file raises StopIteration; a short row raises IndexError.
        except (StopIteration, IndexError):
            print(f"WARNUNG: '{categorysCsvFilePath}' ist leer oder fehlerhaft. Set wird nicht vorab gefüllt.")
        except Exception as e:
            print(f"FEHLER beim Lesen von '{categorysCsvFilePath}': {e}")
    # This variable is a set used to track which tariffs have been written to avoid duplicates.
    writtenPlanIdSet = set()
    # This with-structure opens all CSV files (append mode) and prepares the writers.
    with open(plansCsvFilePath, mode="a", newline="", encoding="utf-8") as plansFile, \
            open(campaignsCsvFilePath, mode="a", newline="", encoding="utf-8") as campaignsFile, \
            open(optionsCsvFilePath, mode="a", newline="", encoding="utf-8") as optionsFile, \
            open(categorysCsvFilePath, mode="a", newline="", encoding="utf-8") as categorysFile, \
            open(partnercardsCsvFilePath, mode="a", newline="", encoding="utf-8") as partnercardsFile:
        # This variable is the CSV writer for plans.csv.
        plansWriter = csv.writer(plansFile, delimiter=";")
        # This variable is the CSV writer for campaigns.csv.
        campaignsWriter = csv.writer(campaignsFile, delimiter=";")
        # This variable is the CSV writer for options.csv.
        optionsWriter = csv.writer(optionsFile, delimiter=";")
        # This variable is the CSV writer for categorys.csv.
        categorysWriter = csv.writer(categorysFile, delimiter=";")
        # This variable is the CSV writer for partnercards.csv.
        partnercardsWriter = csv.writer(partnercardsFile, delimiter=";")
        # This line writes the header row for plans.csv.
        if plans_needs_header:
            plansWriter.writerow(["id", "provider", "network", "name", "price", "rahmen"])
        # This line writes the header row for campaigns.csv.
        if campaigns_needs_header:
            campaignsWriter.writerow(["id", "plan", "name"])
        # This line writes the header row for options.csv.
        if options_needs_header:
            optionsWriter.writerow(["id", "category", "plan", "name", "price"])
        # This line writes the header row for categorys.csv.
        if categorys_needs_header:
            categorysWriter.writerow(["id", "name"])
        # This line writes the header row for partnercards.csv.
        if partnercards_needs_header:
            partnercardsWriter.writerow(["id"])
        # This function call adds a delay of 5 seconds before further steps.
        time.sleep(5)
        # This variable stores the list of available tariff worlds by reading the elements.
        tarifWeltElements = seleniumDriver.find_elements(By.NAME, "tarif_welt")
        tarifWelten = [elem.get_attribute("value") for elem in tarifWeltElements if elem.get_attribute("value")]
        # This variable stores the list of available networks by reading the elements.
        netzElements = seleniumDriver.find_elements(By.NAME, "netz")
        netzList = [elem.get_attribute("value") for elem in netzElements if elem.get_attribute("value")]
        # This try-structure attempts to select the product category 'A'.
        try:
            productCategoryElement = wait.until(EC.element_to_be_clickable((By.XPATH, '//input[@name="sel_produkt_kategorie" and @value="A"]')))
            # Click via JS to avoid interception by overlays.
            seleniumDriver.execute_script("arguments[0].click();", productCategoryElement)
        except Exception as exception:
            print(f"DEBUG: Konnte die Produktkategorie 'A' nicht auswählen: {exception}")
            traceback.print_exc()
            return # Exit if this fails
        # This for-structure iterates over each available tariff world.
        for tarifWelt in tarifWelten:
            # This if-structure checks if the dropdown is ready.
            if not waitForDropdownReady(seleniumDriver, wait):
                print(f"DEBUG: Überspringe Tarifwelt {tarifWelt}, da die Seite nicht rechtzeitig bereit war.")
                continue
            # This try-structure attempts to click the radio button for the current tariff world.
            try:
                tarifWeltRadio = wait.until(EC.element_to_be_clickable((By.XPATH, f'//input[@name="tarif_welt" and @value="{tarifWelt}"]')))
                seleniumDriver.execute_script("arguments[0].click();", tarifWeltRadio)
            except Exception as exception:
                print(f"DEBUG: FEHLER beim Auswählen der Tarifwelt {tarifWelt}: {exception}. Überspringe...")
                traceback.print_exc()
                continue
            # This for-structure iterates over each possible network.
            for net in netzList:
                # This if-structure checks if the dropdown is still ready.
                if not waitForDropdownReady(seleniumDriver, wait):
                    print(f"DEBUG: Überspringe Netz {net} in Tarifwelt {tarifWelt}, da die Seite nicht rechtzeitig bereit war.")
                    continue
                # This try-structure attempts to click the radio button for the network.
                try:
                    netRadio = wait.until(EC.element_to_be_clickable((By.XPATH, f'//input[@name="netz" and @value="{net}"]')))
                    seleniumDriver.execute_script("arguments[0].click();", netRadio)
                except Exception as exception:
                    print(f"DEBUG: FEHLER beim Auswählen des Netzes {net} für Tarifwelt {tarifWelt}: {exception}. Überspringe...")
                    traceback.print_exc()
                    continue
                # This if-structure checks again if the tariff dropdown is ready.
                if not waitForDropdownReady(seleniumDriver, wait):
                    print(f"DEBUG: Überspringe Netz {net} in Tarifwelt {tarifWelt}, da Tarif-Dropdown nicht bereit war.")
                    continue
                # Snapshot the tariff options now: the DOM is re-rendered after each
                # selection, so the option elements themselves would go stale.
                try:
                    dropdown = wait.until(EC.presence_of_element_located((By.NAME, "tarif_id")))
                    selectObj = Select(dropdown)
                    optionsToProcess = [(opt.get_attribute("value"), opt.text.strip()) for opt in selectObj.options]
                except Exception as exception:
                    print(f"DEBUG: FEHLER beim Sammeln der Tarifoptionen für Netz {net}, Tarifwelt {tarifWelt}: {exception}. Überspringe...")
                    traceback.print_exc()
                    continue
                # This for-structure iterates over each tariff in the dropdown.
                for tariffId, optText in optionsToProcess:
                    # This if-structure skips placeholder text and empty IDs.
                    if optText in ["Bitte wählen Sie aus...", ""] or not tariffId:
                        continue
                    # This if-structure checks if the tariff is already written for the current framework.
                    if tariffId in writtenPlanIdSet:
                        print(f"DEBUG: Tarif {tariffId} bereits in CSV, überspringe.")
                        continue
                    print(f"DEBUG: Verarbeite: {tariffId} - {net} - {optText}")
                    # This if-structure checks the dropdown readiness again before proceeding.
                    if not waitForDropdownReady(seleniumDriver, wait):
                        print(f"DEBUG: Überspringe Tarif {tariffId} ({optText}), da die Seite nicht rechtzeitig bereit war.")
                        continue
                    # Re-locate the dropdown (it may have been re-rendered) and select the tariff.
                    try:
                        currentDropdown = wait.until(EC.presence_of_element_located((By.NAME, "tarif_id")))
                        currentSelectObj = Select(currentDropdown)
                        currentOptionsValues = [o.get_attribute("value") for o in currentSelectObj.options]
                        if tariffId in currentOptionsValues:
                            currentSelectObj.select_by_value(tariffId)
                        else:
                            print(f"DEBUG: Warnung: Option mit Wert '{tariffId}' ({optText}) nicht mehr im Dropdown gefunden. Überspringe...")
                            continue
                    except Exception as exception:
                        print(f"DEBUG: Fehler beim Auswählen von Option '{optText}' für Tarif {tariffId}: {exception}")
                        traceback.print_exc()
                        continue
                    # This try-structure waits for the overlay to become invisible after the selection.
                    try:
                        WebDriverWait(seleniumDriver, timeout=60).until(EC.invisibility_of_element_located((By.ID, "bg_layer")))
                    except TimeoutException:
                        print(f"DEBUG: FEHLER: Timeout beim Warten auf bg_layer nach Auswahl von Tarif {tariffId}. Überspringe...")
                        continue
                    # This line waits briefly to stabilize the price display.
                    time.sleep(1.5)
                    # This variable holds the net price of the currently selected tariff.
                    planPriceNet = parsePlanPrice(seleniumDriver)
                    # This variable stores the list of all available campaigns for the tariff.
                    campaigns = parseCampaigns(seleniumDriver)
                    # Write the tariff record into plans.csv; "rahmen" is left empty here.
                    plansWriter.writerow([
                        tariffId,
                        tarifWelt,
                        net,
                        optText,
                        planPriceNet,
                        ""
                    ])
                    # This line writes the tariff ID to the partnercards.csv file.
                    partnercardsWriter.writerow([tariffId])
                    # This line marks the tariff as written to avoid duplication.
                    writtenPlanIdSet.add(tariffId)
                    # This for-structure writes each campaign to the campaigns.csv file.
                    for (campId, campName) in campaigns:
                        campaignsWriter.writerow([
                            campId,
                            tariffId,
                            campName
                        ])
                    # This variable constructs the URL for the flyer PDF.
                    flyerPdfUrl = f"https://maui.mobilcom.de/vertragserfassung/show_pib_flyer.php?variant_id={tariffId}"
                    # This variable constructs the URL for the PIB PDF.
                    pibPdfUrl = flyerPdfUrl + "&pib"
                    # PDF download failures are non-fatal; the tariff row is already written.
                    try:
                        downloadPdfSelenium(seleniumDriver, flyerPdfUrl, cacheDir, f"{tariffId}_flyer.pdf")
                        downloadPdfSelenium(seleniumDriver, pibPdfUrl, cacheDir, f"{tariffId}_pib.pdf")
                    except Exception as exception:
                        print(f"DEBUG: Fehler beim PDF-Download für Tarif {tariffId}: {exception}")
                    # This variable will store whether navigation to the options page succeeded.
                    navigationToOptionsSuccessful = False
                    # Navigate to the options page via the site's own send_form() JS helper.
                    try:
                        print(f"DEBUG: Versuche zur Optionsseite zu navigieren für Tarif {tariffId}...")
                        wait.until(EC.presence_of_element_located((By.NAME, "mobildaten")))
                        seleniumDriver.execute_script("send_form(document.mobildaten, 'sim')")
                        navigationToOptionsSuccessful = True
                        print(f"DEBUG: Navigation zur Optionsseite vermutlich erfolgreich für Tarif {tariffId}.")
                        time.sleep(2)
                    except Exception as exception:
                        print(f"DEBUG: Fehler beim Navigieren zur Optionsseite für Tarif {tariffId}: {exception}")
                        traceback.print_exc()
                        continue
                    # This if-structure proceeds only if navigation to the options page was successful.
                    if navigationToOptionsSuccessful:
                        try:
                            print(f"DEBUG: Rufe scrapeOption für Tarif {tariffId} auf.")
                            scrapeOption(
                                seleniumManager,
                                tariffId,
                                optionsWriter,
                                categorysWriter
                            )
                        except Exception as exception:
                            print(f"DEBUG: Fehler während scrapeOption für Tarif {tariffId}: {exception}")
                            traceback.print_exc()
                    # Flush after every tariff so a crash loses at most one record.
                    print(f"DEBUG: Flushe CSV-Dateien nach Verarbeitung von Tarif {tariffId}.")
                    plansFile.flush()
                    campaignsFile.flush()
                    optionsFile.flush()
                    categorysFile.flush()
                    partnercardsFile.flush()
# This function scrapes the options for a given tariff on the options page.
def scrapeOption(seleniumManager, tariffId, optionsWriter, categorysWriter):
    """Scrape option and category data for one tariff from the options page.

    Waits for the options page to load, parses it with BeautifulSoup via
    parseOptions(), writes new categories and all options (with 19% VAT
    removed from the gross price) to the given CSV writers, and finally
    navigates back to the main tariff page — the back-navigation runs in a
    ``finally`` block so it is attempted even when parsing fails.

    Args:
        seleniumManager: wrapper exposing the WebDriver as ``.driver``.
        tariffId: ID of the tariff whose options page is currently open.
        optionsWriter: csv.writer for options.csv.
        categorysWriter: csv.writer for categorys.csv.

    Side effects: writes CSV rows and extends the module-level
    ``uniqueCategorySet``.
    """
    # This print call is a debug message indicating the start of scraping options.
    print(f"DEBUG: scrapeOption gestartet für Tarif {tariffId}.")
    # This variable references the Selenium driver from the SeleniumManager.
    seleniumDriver = seleniumManager.driver
    # This variable is a WebDriverWait with a 20-second timeout.
    wait = WebDriverWait(seleniumDriver, 20)
    # This try-structure attempts to parse the options page content.
    try:
        # Wait for the loading overlay to disappear before touching the DOM.
        print(f"DEBUG: Warte auf Unsichtbarkeit von bg_layer für Tarif {tariffId}.")
        WebDriverWait(seleniumDriver, timeout=60).until(EC.invisibility_of_element_located((By.ID, "bg_layer")))
        # This print call indicates waiting for the 'tarifoptionen' form to be in the DOM.
        print(f"DEBUG: Warte auf Formular 'tarifoptionen' für Tarif {tariffId}.")
        wait.until(EC.presence_of_element_located((By.NAME, "tarifoptionen")))
        # This print call indicates waiting for at least one table with class 'tb_back' to appear.
        print(f"DEBUG: Warte auf Klasse 'tb_back' für Tarif {tariffId}.")
        wait.until(EC.presence_of_element_located((By.CLASS_NAME, "tb_back")))
        print(f"DEBUG: Optionsseite für Tarif {tariffId} scheint geladen zu sein.")
        # This variable stores the entire HTML content of the current page.
        htmlContent = seleniumDriver.page_source
        # This variable is a BeautifulSoup object for parsing the page content.
        soupObj = BeautifulSoup(htmlContent, "html.parser")
        print(f"DEBUG: Rufe parse_options für Tarif {tariffId} auf.")
        optionsData, categoryData = parseOptions(soupObj)
        print(f"DEBUG: parse_options fand {len(optionsData)} Optionen und {len(categoryData)} Kategorien für Tarif {tariffId}.")
        # This variable tracks how many new categories are written to the file.
        catsWritten = 0
        # Each category line has the form "<id>;<name>".
        for catLine in categoryData:
            parts = catLine.split(";", 1)
            if len(parts) == 2:
                catId = parts[0]
                catName = parts[1]
                # Declares the module-level dedupe set; placed here (inside the
                # loop) in the original — legal, though conventionally it would
                # sit at the top of the function.
                global uniqueCategorySet
                if catId not in uniqueCategorySet:
                    # This line writes the category data to categorys.csv.
                    categorysWriter.writerow([catId, catName])
                    uniqueCategorySet.add(catId)
                    catsWritten += 1
        print(f"DEBUG: {catsWritten} neue Kategorien in CSV geschrieben für Tarif {tariffId}.")
        # This variable tracks how many options are written to the file.
        optsWritten = 0
        # Each option line has the form "<categoryId>;<itemId>;<name>;<grossPrice>".
        for line in optionsData:
            parts = line.split(";", 3)
            if len(parts) == 4:
                categoryRefId = parts[0]
                itemId = parts[1]
                itemName = parts[2]
                priceStr = parts[3]
                # Unparseable prices fall back to 0.0 instead of aborting the row.
                try:
                    grossPrice = float(priceStr)
                except ValueError:
                    grossPrice = 0.0
                # This variable calculates the net price from the gross price using 19% tax.
                netPrice = round(grossPrice / 1.19, 5)
                # This line writes the option data to the options.csv file.
                optionsWriter.writerow([
                    itemId,
                    categoryRefId,
                    tariffId,
                    itemName,
                    netPrice
                ])
                optsWritten += 1
        print(f"DEBUG: {optsWritten} Optionen in CSV geschrieben für Tarif {tariffId}.")
    # This except-structure logs any errors during parsing or writing.
    except Exception as exception:
        print(f"FEHLER in scrapeOption (Parsing/Writing) für Tarif {tariffId}: {exception}")
        traceback.print_exc()
    # Always attempt to navigate back so the outer loop can continue.
    finally:
        # This variable sets how many navigation retries are allowed.
        maxRetriesNav = 2
        # This variable sets the delay between navigation retries (seconds).
        retryDelayNav = 3
        # This variable indicates whether the return navigation was successful.
        backNavSuccessful = False
        # This for-structure makes several attempts to return to the main page.
        for attempt in range(maxRetriesNav):
            try:
                print(f"DEBUG: Versuche zurückzunavigieren von Optionsseite für Tarif {tariffId} (Versuch {attempt + 1}/{maxRetriesNav}).")
                time.sleep(0.5)
                WebDriverWait(seleniumDriver, 15).until(EC.presence_of_element_located((By.NAME, "tarifoptionen")))
                # Uses the site's own jump_2_container() JS helper to go back.
                seleniumDriver.execute_script("jump_2_container('Mobildaten')")
                time.sleep(1.5)
                # The tariff dropdown reappearing signals the main page is back.
                WebDriverWait(seleniumDriver, 15).until(EC.presence_of_element_located((By.NAME, "tarif_id")))
                backNavSuccessful = True
                print(f"DEBUG: Rücknavigation von Optionsseite erfolgreich für Tarif {tariffId} (Versuch {attempt + 1}).")
                break
            except Exception as exception:
                print(f"DEBUG: Warnung: Versuch {attempt + 1}/{maxRetriesNav} der Rücknavigation fehlgeschlagen für Tarif {tariffId}: {exception}")
                if attempt < maxRetriesNav - 1:
                    time.sleep(retryDelayNav)
                else:
                    print(f"DEBUG: Endgültige Warnung: Konnte nach {maxRetriesNav} Versuchen nicht von Tarif {tariffId} zurücknavigieren.")
        # This if-structure logs a message if navigation back to the main page was not successful.
        if not backNavSuccessful:
            print(f"DEBUG: Rücknavigation von Tarif {tariffId} war nicht erfolgreich. Fortsetzung kann instabil sein.")
# This function parses the HTML for options and categories on the options page.
def parseOptions(soupObj):
    """Extract option and category records from the options-page HTML.

    Scans every table with class "tb_back" for a category header, derives
    the category ID from check/hidden/radio inputs named
    ``service_code[G<n>...]``, then walks the sub-tables collecting option
    items (IDs starting with "G" or "O") and the entries of group
    sub-selects.

    Args:
        soupObj: BeautifulSoup document of the options page.

    Returns:
        tuple[list[str], list[str]]: (options, categories), each entry a
        semicolon-joined string — options as
        "<categoryId>;<itemId>;<name>;<price>", categories as "<id>;<name>".
        Both lists are deduplicated via set(), so their order is arbitrary.
    """
    # This variable is a list for storing all discovered options.
    optionsResults = []
    # This variable is a list for storing all discovered categories.
    categoryResults = []
    # This variable is a set to track category IDs that have already been added.
    collectedCategoryIds = set()
    # This variable is a regex for identifying group check inputs.
    categoryCheckPattern = re.compile(r'service_code\[(G\d+)_check\]')
    # NOTE(review): this "hidden" pattern is identical to the check pattern
    # above — possibly intentional (hidden inputs share the _check name),
    # but worth confirming against the live page markup.
    categoryHiddenPattern = re.compile(r'service_code\[(G\d+)_check\]')
    # This variable is a regex for identifying radio inputs in group fields.
    categoryRadioPattern = re.compile(r'service_code\[(G\d+)\]')
    # This variable is a regex for matching item IDs starting with G or O.
    itemValuePattern = re.compile(r'^(G\d+|O\d+)$')
    # This variable is a regex for matching monthly prices in the text.
    pricePattern = re.compile(r'/\s*€\s*([\d.,]+)\s*monatlich', re.IGNORECASE)
    # This variable is a regex for sub-select fields within a group.
    subSelectPattern = re.compile(r"service_code\[(G\d+)_S\d+\]")
    # This variable finds all main tables with class 'tb_back'.
    allPotentialMainTables = soupObj.find_all("table", class_="tb_back")
    print(f"DEBUG: parse_options: {len(allPotentialMainTables)} potenzielle Haupttabellen (tb_back) gefunden.")
    # This for-structure processes each found main table.
    for tbl in allPotentialMainTables:
        catNameEl = tbl.find("td", class_="tb_head")
        if not catNameEl:
            continue
        catTextRaw = catNameEl.get_text(strip=True)
        # Normalize non-breaking spaces and collapse whitespace runs.
        catText = re.sub(r'\s+', ' ', catTextRaw.replace('\xa0', ' ')).strip()
        if not catText:
            continue
        categoryId = None
        catInputCheck = tbl.find("input", attrs={"name": categoryCheckPattern})
        catInputHidden = tbl.find("input", type="hidden", attrs={"name": categoryHiddenPattern})
        catInputRadio = tbl.find("input", type="radio", attrs={"name": categoryRadioPattern})
        # Try each input kind in priority order: check, hidden, radio.
        if catInputCheck:
            matchCheck = categoryCheckPattern.search(catInputCheck.get("name", ""))
            if matchCheck:
                categoryId = matchCheck.group(1)
        elif catInputHidden:
            matchHidden = categoryHiddenPattern.search(catInputHidden.get("name", ""))
            if matchHidden:
                categoryId = matchHidden.group(1)
        elif catInputRadio:
            matchRadio = categoryRadioPattern.search(catInputRadio.get("name", ""))
            if matchRadio:
                categoryId = matchRadio.group(1)
        # This if-structure excludes irrelevant or unknown groups.
        if not categoryId or catText in ["Sonstige Angaben", "Pflicht-Angaben"]:
            continue
        print(f"DEBUG: Verarbeite Optionsgruppe: {categoryId} - {catText}")
        # This if-structure adds a new category to categoryResults if it has not been added yet.
        if categoryId not in collectedCategoryIds:
            categoryResults.append(f"{categoryId};{catText}")
            collectedCategoryIds.add(categoryId)
        # This variable finds potential sub-tables within the main table.
        subTables = tbl.find_all("table", {"border": "0", "width": "520", "cellspacing": "0", "cellpadding": "4"})
        if not subTables:
            subTables = [tbl]
        # Carries the most recent group ID from one sub-table to the next so a
        # following sub-select can be attributed to it; order-sensitive.
        lastGId = None
        # This for-structure processes each sub-table to find inputs and sub-selects.
        for subTbl in subTables:
            inp = subTbl.find("input", attrs={"value": itemValuePattern})
            subSelect = subTbl.find("select", attrs={"name": subSelectPattern})
            # This if-structure checks if a matching input field was found.
            if inp:
                itemId = inp.get("value", "").strip()
                if not itemId:
                    continue
                # The item label is an anchor with id or name "err_<itemId>".
                itemLabelTag = subTbl.find("a", attrs={"id": f"err_{itemId}"})
                if not itemLabelTag:
                    itemLabelTag = subTbl.find("a", attrs={"name": f"err_{itemId}"})
                itemName = "Unbekannt"
                # This if-structure attempts to extract the label text if it exists.
                if itemLabelTag and itemLabelTag.text.strip():
                    itemName = re.sub(r'\s+', ' ', itemLabelTag.text.strip())
                else:
                    # Fallback: look for a link inside the "<itemId>_block" div.
                    divBlock = subTbl.find("div", {"name": f"{itemId}_block"})
                    if divBlock:
                        linkInDiv = divBlock.find("a")
                        if linkInDiv and linkInDiv.text.strip():
                            itemName = re.sub(r'\s+', ' ', linkInDiv.text.strip())
                # Items without a resolvable name are skipped entirely.
                if itemName == "Unbekannt":
                    continue
                combinedText = subTbl.get_text(" ", strip=True)
                mPrice = pricePattern.search(combinedText)
                priceStr = "0.0"
                # This if-structure checks if a price was found in the text.
                if mPrice:
                    rawPrice = mPrice.group(1)
                    # German number format: "." is thousands sep, "," is decimal.
                    normalized = rawPrice.replace(".", "").replace(",", ".")
                    try:
                        priceVal = float(normalized)
                        priceStr = f"{priceVal}"
                    except ValueError:
                        priceStr = "0.0"
                # Group items ("G...") also become categories; "O..." items do not.
                if itemId.startswith("G"):
                    optionsResults.append(f"{categoryId};{itemId};{itemName};{priceStr}")
                    lastGId = itemId
                    if itemId not in collectedCategoryIds:
                        categoryResults.append(f"{itemId};{itemName}")
                        collectedCategoryIds.add(itemId)
                elif itemId.startswith("O"):
                    optionsResults.append(f"{categoryId};{itemId};{itemName};{priceStr}")
                    lastGId = None
            # This if-structure handles sub-select elements if we have a stored group ID.
            if subSelect and lastGId:
                optionTags = subSelect.find_all("option", attrs={"value": re.compile(r"^O\d+$")})
                for optTag in optionTags:
                    optId = optTag.get("value", "").strip()
                    if not optId:
                        continue
                    optText = optTag.get_text(strip=True)
                    if not optText or optText == "Bitte wählen Sie aus...":
                        continue
                    optPriceMatch = pricePattern.search(optText)
                    optPriceStr = "0.0"
                    if optPriceMatch:
                        rawPrice = optPriceMatch.group(1)
                        normalized = rawPrice.replace(".", "").replace(",", ".")
                        try:
                            priceVal = float(normalized)
                            optPriceStr = f"{priceVal}"
                        except ValueError:
                            optPriceStr = "0.0"
                    # Strip the price text and any trailing " /" from the name.
                    optText = pricePattern.sub('', optText).strip()
                    optText = re.sub(r'\s+/\s*$', '', optText).strip()
                    optionsResults.append(f"{lastGId};{optId};{optText};{optPriceStr}")
                # Consume the group ID so it is not reused by later sub-selects.
                lastGId = None
    # This variable deduplicates the options list (order is not preserved).
    uniqueOptions = list(set(optionsResults))
    # This variable deduplicates the categories list (order is not preserved).
    uniqueCategoriesList = list(set(categoryResults))
    print(f"DEBUG: parse_options: Gibt {len(uniqueOptions)} eindeutige Optionen und {len(uniqueCategoriesList)} eindeutige Kategorien zurück.")
    return uniqueOptions, uniqueCategoriesList
# This function checks if a given group ID has a sub-select in the provided list of selects.
def hasSubSelectForId(gId, subSelects):
    """Return True if any select element's name attribute contains the group ID.

    Args:
        gId: group identifier string such as "G12".
        subSelects: iterable of elements supporting ``.get("name", "")``
            (e.g. BeautifulSoup tags).

    Returns:
        bool: True when at least one element's name contains ``gId``.
    """
    # any() with a generator short-circuits exactly like the manual loop did.
    return any(gId in s.get("name", "") for s in subSelects)
# This variable initializes the SeleniumManager that owns the WebDriver session.
# NOTE(review): the original comment claimed a visible browser and a specific
# GeckoDriver path, but no arguments are passed here — confirm the defaults
# inside SeleniumManager.
seleniumManager = SeleniumManager()
# This line logs in using the global credentials.
login(seleniumManager, MAUI_USERNAME, MAUI_PASSWORD, MAUI_AUTHCODE)
# This line opens the Partnercard page and fills the initial form.
openAndPreparePartnercard(seleniumManager)
# This line adds a short delay before scraping begins.
time.sleep(5)
# This line scrapes all data and writes to CSV files.
scrapeData(seleniumManager)
# This line waits 10 seconds before closing the driver.
time.sleep(10)
# This line closes the WebDriver at the end of the process.
seleniumManager.closeDriver()
# This print call indicates that scraping is complete.
print("Scraping abgeschlossen.")
\ No newline at end of file
......@@ -4,7 +4,7 @@ MYSQL_PASSWORD = "floz09sx3dTyx144gy"
MYSQL_DATABASE = "itmax_tarifs"
MYSQL_PORT = 3306
USE_SSH_TUNNEL = True
USE_SSH_TUNNEL = False
SSH_HOST = "jumphost.bugsmasher.online"
SSH_PORT = 22
SSH_USERNAME = "root"
......
# STEP 1 - IMPORT TARIFS FROM MAUI
0 3 * * * /maui/cron.sh downloadDataFromMaui.py
20 4 * * * /maui/cron.sh downloadDataFromMauiPartnercard.py
# STEP 2 - IMPORT DUMP TO DATABASE
30 4 * * * /maui/cron.sh importCacheToDatabase.py
40 4 * * * /maui/cron.sh importCacheToDatabase.py
# STEP 3 - UPLOAD FLYER TO AWS S3
0 5 * * * /maui/cron.sh uploadCacheToAwsS3.py
5 5 * * * /maui/cron.sh uploadCacheToAwsS3.py
# STEP 4 - GENERATE TARIF-DETAILS WITH GPT
0 5 * * * /maui/cron.sh calculateTarifDetailsWithGpt.py
5 5 * * * /maui/cron.sh calculateTarifDetailsWithGpt.py
# STEP 5 - GENERATE AI PRICES WITH GPT
30 5 * * * /maui/cron.sh calculateAiPrices.py
......@@ -9,6 +9,7 @@ https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/
https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2024-05-13.m4v
https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2025-06-03.m4v
https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2025-06-23.m4v
https://s3.eu-central-1.amazonaws.com/monosnap.bugsmasher.online/marcoschmiedel/2025-07-22.m4v
## JupyterLab
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment