#!/usr/bin/env python3
#2
#git -C ~/ING pull
"""Categorize one calendar week of bank transactions and export the results.

Usage: run without arguments to process the current ISO calendar week, or
pass ``<year> <week>`` as the two command-line arguments.

The script reads ``Transaktionen/<year>/json/transactions_<year>_KW<ww>.json``
next to this file, assigns each transaction the first category from
``Kategorien.json`` that has a keyword occurring (case-insensitively) in the
transaction's applicant name, purpose or posting text, and writes:

* a categorized CSV and JSON file for the week,
* a per-category summary JSON file,
* a ``KW<ww>`` sheet plus a rebuilt overview sheet in the yearly workbook.

Transactions that match no keyword are filed under ``Unbekannt``.
"""

import csv
import json
import logging
import platform
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from shutil import copy2, which

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font

# --------------------------------------------------
# Logging
# --------------------------------------------------

# On Windows the log directory sits next to the script, otherwise in the
# user's home directory.
if platform.system() == "Windows":
    log_dir = Path(__file__).parent / "logs"
else:
    log_dir = Path.home() / "logs"

log_dir.mkdir(exist_ok=True)

log_file = log_dir / "ing.log"

logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

logging.info("[categorize] === Scriptstart ===")

# --------------------------------------------------
# Determine calendar week
# --------------------------------------------------

# Exactly two arguments select year and week explicitly; any other argument
# count falls back to the current ISO year/week.
if len(sys.argv) == 3:
    year = int(sys.argv[1])
    week = int(sys.argv[2])
else:
    year, week, _ = datetime.now().isocalendar()

# --------------------------------------------------
# Load categories
# --------------------------------------------------

categories_file = Path(__file__).parent / "Kategorien.json"

with open(categories_file, encoding="utf-8") as f:
    categories = json.load(f)

# --------------------------------------------------
# Determine directories
# --------------------------------------------------

BASE_DIR = Path(__file__).resolve().parent

base_dir = BASE_DIR / "Transaktionen"
year_dir = base_dir / str(year)

source_json = year_dir / "json" / f"transactions_{year}_KW{week:02d}.json"

categorized_csv_dir = year_dir / "categorized_csv"
categorized_json_dir = year_dir / "categorized_json"
summary_dir = year_dir / "summary"
jahresauswertung_dir = year_dir / "jahresauswertung"

for output_dir in (
    categorized_csv_dir,
    categorized_json_dir,
    summary_dir,
    jahresauswertung_dir,
):
    output_dir.mkdir(parents=True, exist_ok=True)

csv_file = (
    categorized_csv_dir / f"categorized_transactions_{year}_KW{week:02d}.csv"
)
json_file = (
    categorized_json_dir / f"categorized_transactions_{year}_KW{week:02d}.json"
)
summary_file = summary_dir / f"category_summary_{year}_KW{week:02d}.json"
excel_file = jahresauswertung_dir / f"Kontobewegungen_{year}.xlsx"

# --------------------------------------------------
# Check that the source file exists
# --------------------------------------------------

if not source_json.exists():
    logging.error(
        f"[categorize] Transaktionsdatei nicht gefunden: {source_json}"
    )
    sys.exit(1)

# --------------------------------------------------
# Load transactions
# --------------------------------------------------

with open(source_json, encoding="utf-8") as f:
    transactions = json.load(f)

# --------------------------------------------------
# Prepare categories
# --------------------------------------------------


def _amount_of(transaction):
    """Return the transaction amount as float; a missing or null amount is 0."""
    return float(transaction.get("amount") or 0)


def _text_of(transaction, key):
    """Return a text field of the transaction; missing or null becomes ''."""
    value = transaction.get(key)
    return "" if value is None else str(value)


# One totals record per configured category plus the fallback "Unbekannt".
category_totals = {
    category: {"income": 0.0, "expenses": 0.0, "saldo": 0.0}
    for category in [*categories, "Unbekannt"]
}

categorized_transactions = []

# --------------------------------------------------
# Categorize transactions
# --------------------------------------------------

for transaction in transactions:
    # Null fields must not leak into the search text as the word "None".
    search_text = " ".join(
        _text_of(transaction, key)
        for key in ("applicant_name", "purpose", "posting_text")
    ).lower()

    # First category (in file order) with any matching keyword wins.
    category_found = next(
        (
            category
            for category, keywords in categories.items()
            if any(keyword.lower() in search_text for keyword in keywords)
        ),
        "Unbekannt",
    )

    categorized_transaction = transaction.copy()
    categorized_transaction["category"] = category_found
    categorized_transactions.append(categorized_transaction)

    amount = _amount_of(transaction)
    totals = category_totals[category_found]

    if amount >= 0:
        totals["income"] += amount
    else:
        totals["expenses"] += abs(amount)

    totals["saldo"] += amount

# --------------------------------------------------
# Round categories
# --------------------------------------------------

for totals in category_totals.values():
    for key in ("income", "expenses", "saldo"):
        totals[key] = round(totals[key], 2)

# --------------------------------------------------
# Export CSV
# --------------------------------------------------

# Amounts are written with a decimal comma; the delimiter is a semicolon.
csv_export = [
    {**row, "amount": f"{_amount_of(row):.2f}".replace(".", ",")}
    for row in categorized_transactions
]

with open(csv_file, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.DictWriter(
        csvfile,
        fieldnames=[
            "date",
            "amount",
            "category",
            "posting_text",
            "applicant_name",
            "purpose",
        ],
        delimiter=";",
        # Source records may carry fields that are not CSV columns; without
        # this, DictWriter raises ValueError on the first such record.
        extrasaction="ignore",
    )
    writer.writeheader()
    writer.writerows(csv_export)

# --------------------------------------------------
# Export JSON
# --------------------------------------------------

with open(json_file, "w", encoding="utf-8") as outfile:
    json.dump(
        categorized_transactions,
        outfile,
        ensure_ascii=False,
        indent=2,
    )

# --------------------------------------------------
# Update yearly Excel workbook
# --------------------------------------------------

if excel_file.exists():
    wb = load_workbook(excel_file)
else:
    wb = Workbook()

    # A fresh workbook comes with a default sheet that is not wanted here.
    if "Sheet" in wb.sheetnames:
        wb.remove(wb["Sheet"])

sheet_name = f"KW{week:02d}"

# Replace an existing sheet for this week
if sheet_name in wb.sheetnames:
    wb.remove(wb[sheet_name])

ws = wb.create_sheet(sheet_name)

# --------------------------------------------------
# Key figures
# --------------------------------------------------

# Rounded again because summing already-rounded floats can reintroduce
# binary floating-point artefacts.
total_income = round(
    sum(values["income"] for values in category_totals.values()), 2
)
total_expenses = round(
    sum(values["expenses"] for values in category_totals.values()), 2
)
total_saldo = round(
    sum(values["saldo"] for values in category_totals.values()), 2
)

# The overview sheet below reads cells B3, B4 and B5 of every KW sheet, so
# the order of these rows must not change.
ws.append(["Kennzahl", "Wert"])
ws.append(["Transaktionen", len(categorized_transactions)])
ws.append(["Einnahmen", total_income])
ws.append(["Ausgaben", total_expenses])
ws.append(["Saldo", total_saldo])

ws.append([])

# --------------------------------------------------
# Categories
# --------------------------------------------------

ws.append(["Kategorie", "Einnahmen", "Ausgaben", "Saldo"])
category_header_row = ws.max_row

for category, values in sorted(category_totals.items()):
    ws.append([
        category,
        values["income"],
        values["expenses"],
        values["saldo"],
    ])

ws.append([])
ws.append([])

# --------------------------------------------------
# Account movements
# --------------------------------------------------

ws.append(["Kontobewegungen"])

ws.append(["Datum", "Betrag", "Kategorie", "Name", "Verwendungszweck"])
transaction_header_row = ws.max_row

for transaction in categorized_transactions:
    # .get() keeps a record with a missing field from aborting the export
    # after the CSV and JSON files have already been written.
    ws.append([
        transaction.get("date"),
        transaction.get("amount"),
        transaction["category"],
        transaction.get("applicant_name"),
        transaction.get("purpose"),
    ])

# --------------------------------------------------
# Formatting of the KW sheet
# --------------------------------------------------

# Bold the three header rows. Their positions were recorded while writing,
# so a category that happens to be named like a header cannot be mistaken
# for one.
for header_row in (1, category_header_row, transaction_header_row):
    for cell in ws[header_row]:
        cell.font = Font(bold=True)

# --------------------------------------------------
# Update overview
# --------------------------------------------------

# The overview is rebuilt from scratch from all KW sheets on every run.
if "Übersicht" in wb.sheetnames:
    wb.remove(wb["Übersicht"])

overview = wb.create_sheet("Übersicht", 0)

overview.append(["KW", "Einnahmen", "Ausgaben", "Saldo"])

# Bold header
for cell in overview[1]:
    cell.font = Font(bold=True)

for sheet in sorted(s for s in wb.sheetnames if s.startswith("KW")):
    ws_kw = wb[sheet]

    overview.append([
        sheet,
        ws_kw["B3"].value,
        ws_kw["B4"].value,
        ws_kw["B5"].value,
    ])

# --------------------------------------------------
# Adjust column widths automatically
# --------------------------------------------------

for worksheet in wb.worksheets:
    for column in worksheet.columns:
        # Longest rendered cell text in this column; numeric zero counts too.
        max_length = max(
            (
                len(str(cell.value))
                for cell in column
                if cell.value is not None
            ),
            default=0,
        )

        column_letter = column[0].column_letter
        worksheet.column_dimensions[column_letter].width = min(
            max_length + 2, 60
        )

# --------------------------------------------------
# Save Excel workbook
# --------------------------------------------------

wb.save(excel_file)

# --------------------------------------------------
# Export category totals
# --------------------------------------------------

with open(summary_file, "w", encoding="utf-8") as outfile:
    json.dump(
        category_totals,
        outfile,
        ensure_ascii=False,
        indent=2,
    )

# --------------------------------------------------
# Output
# --------------------------------------------------

logging.info(f"[categorize] Jahr : {year}")
logging.info(f"[categorize] Kalenderwoche : {week}")
logging.info(
    f"[categorize] Transaktionen : {len(categorized_transactions)}"
)

logging.info("----------------------------------------")
logging.info("Summen pro Kategorie")
logging.info("----------------------------------------")

for category, values in sorted(category_totals.items()):
    logging.info(
        f"[categorize] Kategorie={category}; "
        f"[categorize] Einnahmen={values['income']:.2f}; "
        f"[categorize] Ausgaben={values['expenses']:.2f}; "
        f"[categorize] Saldo={values['saldo']:.2f}"
    )

logging.info(f"[categorize] CSV : {csv_file}")
logging.info(f"[categorize] JSON : {json_file}")
logging.info(f"[categorize] Summary : {summary_file}")
logging.info(f"[categorize] Excel : {excel_file}")

# --------------------------------------------------
# Synchronization
# --------------------------------------------------

# Preferred route: upload the workbook through rclone when the binary is on
# PATH. Fallback (Windows only): copy it into the Nextcloud folder below the
# user's home directory. Both routes are best-effort: a failure is logged and
# the script carries on to its final log line.
rclone_path = which("rclone")

if rclone_path:
    rclone_command = [
        "rclone",
        "copy",
        str(excel_file),
        f"nextcloud:Bank/{year}",
    ]

    try:
        subprocess.run(rclone_command, check=True)
    except Exception as error:
        logging.error(f"[categorize] rclone Fehler: {error}")
    else:
        logging.info("[categorize] Excel nach Nextcloud synchronisiert.")

elif platform.system() == "Windows":
    try:
        target_dir = Path.home().joinpath("Nextcloud", "Bank", str(year))
        target_dir.mkdir(parents=True, exist_ok=True)

        target_file = target_dir / excel_file.name
        copy2(excel_file, target_file)

        logging.info(
            f"[categorize] Nextcloud-Synchronisation erfolgreich: {target_file}"
        )
    except Exception as error:
        logging.error(
            f"[categorize] Nextcloud-Kopie fehlgeschlagen: {error}"
        )

else:
    logging.warning(
        "[categorize] Keine Synchronisationsmethode gefunden."
    )

logging.info("[categorize] === Scriptende ===")