ING/categorize_transactions.py

553 lines
No EOL
12 KiB
Python

#!/usr/bin/env python3
#2
#git -C ~/ING pull
from pathlib import Path
from datetime import datetime
import json
import csv
import sys
from openpyxl import Workbook, load_workbook
import subprocess
from shutil import which, copy2
import platform
import logging
from openpyxl.styles import Font
# --------------------------------------------------
# Logging
# --------------------------------------------------
# On Windows the log directory sits next to this script; on every other
# platform it is ~/logs. Only the last path component is created here
# (no parents=True), exactly as before.
log_dir = (
    Path(__file__).parent
    if platform.system() == "Windows"
    else Path.home()
) / "logs"
log_dir.mkdir(exist_ok=True)
log_file = log_dir / "ing.log"

# All messages of level INFO and above go to the log file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    filename=log_file,
)
logging.info("[categorize] === Scriptstart ===")
# --------------------------------------------------
# Determine the calendar week
# --------------------------------------------------
# Usage: categorize_transactions.py [YEAR WEEK]
# With exactly two arguments they are taken as year and calendar week.
# With any other argument count the current ISO year/week is used.
if len(sys.argv) == 3:
    try:
        year = int(sys.argv[1])
        week = int(sys.argv[2])
    except ValueError:
        # Without this the traceback would only reach stderr and never
        # the log file.
        logging.error(
            "[categorize] Ungültige Argumente, erwartet: JAHR KW "
            f"(Ganzzahlen), erhalten: {sys.argv[1:]}"
        )
        sys.exit(1)
    # ISO weeks run from 1 to 53; anything else can never match a file.
    if not (1 <= year <= 9999 and 1 <= week <= 53):
        logging.error(
            "[categorize] Jahr oder Kalenderwoche außerhalb des "
            f"gültigen Bereichs: Jahr={year}, KW={week}"
        )
        sys.exit(1)
else:
    year, week, _ = datetime.now().isocalendar()
# --------------------------------------------------
# Load categories
# --------------------------------------------------
# Kategorien.json lives next to this script. It is consumed further down
# as a mapping of category name -> list of keywords.
categories_file = Path(__file__).parent / "Kategorien.json"
try:
    with open(categories_file, encoding="utf-8") as f:
        categories = json.load(f)
except FileNotFoundError:
    # Same handling as for a missing transactions file: log, then exit.
    logging.error(
        f"[categorize] Kategoriendatei nicht gefunden: {categories_file}"
    )
    sys.exit(1)
except json.JSONDecodeError as e:
    logging.error(
        "[categorize] Kategoriendatei ist kein gültiges JSON: "
        f"{categories_file} ({e})"
    )
    sys.exit(1)
# --------------------------------------------------
# Determine directories
# --------------------------------------------------
# Layout below the script directory:
#   Transaktionen/<year>/json/                 weekly input files
#   Transaktionen/<year>/categorized_csv/      CSV output
#   Transaktionen/<year>/categorized_json/     JSON output
#   Transaktionen/<year>/summary/              per-category totals
#   Transaktionen/<year>/jahresauswertung/     yearly Excel workbook
BASE_DIR = Path(__file__).resolve().parent
base_dir = BASE_DIR / "Transaktionen"
year_dir = base_dir / str(year)
week_tag = f"{year}_KW{week:02d}"

source_json = year_dir / "json" / f"transactions_{week_tag}.json"

categorized_csv_dir = year_dir / "categorized_csv"
categorized_json_dir = year_dir / "categorized_json"
summary_dir = year_dir / "summary"
jahresauswertung_dir = year_dir / "jahresauswertung"

csv_file = categorized_csv_dir / f"categorized_transactions_{week_tag}.csv"
json_file = categorized_json_dir / f"categorized_transactions_{week_tag}.json"
summary_file = summary_dir / f"category_summary_{week_tag}.json"
excel_file = jahresauswertung_dir / f"Kontobewegungen_{year}.xlsx"
# --------------------------------------------------
# Check that the source file exists
# --------------------------------------------------
# This runs before any output directory is created, so a wrong year or
# week does not leave an empty directory tree behind.
if not source_json.is_file():
    logging.error(
        f"[categorize] Transaktionsdatei nicht gefunden: {source_json}"
    )
    sys.exit(1)

for output_dir in (
    categorized_csv_dir,
    categorized_json_dir,
    summary_dir,
    jahresauswertung_dir,
):
    output_dir.mkdir(parents=True, exist_ok=True)
# --------------------------------------------------
# Load transactions
# --------------------------------------------------
# Read the weekly input file as UTF-8 and parse it as JSON.
transactions = json.loads(source_json.read_text(encoding="utf-8"))
# --------------------------------------------------
# Prepare categories
# --------------------------------------------------
# One zeroed accumulator per configured category, plus "Unbekannt" as the
# bucket for transactions that match no keyword. Every category gets its
# own dict so the totals are never shared between categories.
category_totals = {
    name: {"income": 0.0, "expenses": 0.0, "saldo": 0.0}
    for name in [*categories, "Unbekannt"]
}

# Filled by the categorization step: one copy per transaction with an
# added "category" key.
categorized_transactions = []
# --------------------------------------------------
# Categorize transactions
# --------------------------------------------------
# Lower-case every keyword once instead of once per transaction. Empty
# keywords are dropped: "" is a substring of every text and would make
# its category swallow all transactions.
keywords_by_category = {
    category: [keyword.lower() for keyword in keywords if keyword]
    for category, keywords in categories.items()
}

for transaction in transactions:
    # `or ""` also covers fields that are present but null; formatting
    # None directly would put the text "none" into the search string and
    # produce false keyword hits.
    search_text = " ".join(
        str(transaction.get(field) or "")
        for field in ("applicant_name", "purpose", "posting_text")
    ).lower()

    # The first category (in the order of the categories mapping) with a
    # keyword contained in the search text wins; otherwise "Unbekannt".
    category_found = next(
        (
            category
            for category, keywords in keywords_by_category.items()
            if any(keyword in search_text for keyword in keywords)
        ),
        "Unbekannt",
    )

    categorized_transaction = transaction.copy()
    categorized_transaction["category"] = category_found
    categorized_transactions.append(categorized_transaction)

    # A missing or null amount counts as 0 instead of raising TypeError.
    amount = float(transaction.get("amount") or 0)
    totals = category_totals[category_found]
    if amount >= 0:
        totals["income"] += amount
    else:
        # Expenses are accumulated as positive numbers.
        totals["expenses"] += abs(amount)
    totals["saldo"] += amount
# --------------------------------------------------
# Round categories
# --------------------------------------------------
# Round the three accumulated figures of every category to two decimals,
# in place.
for totals in category_totals.values():
    for figure in ("income", "expenses", "saldo"):
        totals[figure] = round(totals[figure], 2)
# --------------------------------------------------
# Export CSV
# --------------------------------------------------
# Semicolon-separated file with a fixed set of columns. Amounts are
# written with two decimals and a decimal comma.
csv_fieldnames = [
    "date",
    "amount",
    "category",
    "posting_text",
    "applicant_name",
    "purpose",
]
with open(csv_file, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.DictWriter(
        csvfile,
        fieldnames=csv_fieldnames,
        delimiter=";",
        # Transactions are copied wholesale from the input, so they may
        # carry keys beyond the exported columns. The default "raise"
        # would abort the export with ValueError on the first such row.
        extrasaction="ignore",
    )
    csv_export = []
    for row in categorized_transactions:
        csv_row = row.copy()
        # Same tolerance as the categorization step: a missing or null
        # amount is treated as 0.
        csv_row["amount"] = (
            f"{float(row.get('amount') or 0):.2f}"
            .replace(".", ",")
        )
        csv_export.append(csv_row)
    writer.writeheader()
    writer.writerows(csv_export)
# --------------------------------------------------
# Export JSON
# --------------------------------------------------
# Dump the categorized transactions as pretty-printed UTF-8 JSON, with
# non-ASCII characters written as-is.
with json_file.open("w", encoding="utf-8") as json_handle:
    json.dump(
        categorized_transactions,
        json_handle,
        indent=2,
        ensure_ascii=False,
    )
# --------------------------------------------------
# Update the yearly Excel workbook
# --------------------------------------------------
# Reuse the existing workbook of the year, or start a new one without
# the sheet named "Sheet".
if not excel_file.exists():
    wb = Workbook()
    if "Sheet" in wb.sheetnames:
        wb.remove(wb["Sheet"])
else:
    wb = load_workbook(excel_file)

sheet_name = f"KW{week:02d}"

# Replace an existing sheet of the same week so a re-run starts clean.
stale_sheet = wb[sheet_name] if sheet_name in wb.sheetnames else None
if stale_sheet is not None:
    wb.remove(stale_sheet)
ws = wb.create_sheet(sheet_name)
# --------------------------------------------------
# Key figures
# --------------------------------------------------
# Totals over all categories. Summing floats can leave binary noise
# (e.g. 30.299999999999997), so the totals are rounded to two decimals
# like the per-category values.
total_income = round(
    sum(values["income"] for values in category_totals.values()),
    2,
)
total_expenses = round(
    sum(values["expenses"] for values in category_totals.values()),
    2,
)
total_saldo = round(
    sum(values["saldo"] for values in category_totals.values()),
    2,
)

# NOTE: the row order is significant. The overview sheet reads income,
# expenses and saldo from the fixed cells B3, B4 and B5 of each KW sheet.
ws.append(["Kennzahl", "Wert"])
ws.append(["Transaktionen", len(categorized_transactions)])
ws.append(["Einnahmen", total_income])
ws.append(["Ausgaben", total_expenses])
ws.append(["Saldo", total_saldo])
ws.append([])
# --------------------------------------------------
# Categories
# --------------------------------------------------
# Header row followed by one row per category, sorted by category name.
ws.append(["Kategorie", "Einnahmen", "Ausgaben", "Saldo"])
for name in sorted(category_totals):
    totals = category_totals[name]
    ws.append(
        [name, totals["income"], totals["expenses"], totals["saldo"]]
    )

# Two empty rows separate this table from the transaction list.
for _ in range(2):
    ws.append([])
# --------------------------------------------------
# Account movements
# --------------------------------------------------
# Title row, header row, then one row per categorized transaction.
ws.append(["Kontobewegungen"])
ws.append([
    "Datum",
    "Betrag",
    "Kategorie",
    "Name",
    "Verwendungszweck"
])
for transaction in categorized_transactions:
    ws.append([
        # The categorization step treats these fields as optional, so a
        # missing one must not abort the workbook update with KeyError.
        transaction.get("date"),
        # Written as a number so Excel can sum the column even when the
        # input stores the amount as a string.
        float(transaction.get("amount") or 0),
        transaction["category"],
        transaction.get("applicant_name"),
        transaction.get("purpose"),
    ])
# --------------------------------------------------
# Formatting of the KW sheet
# --------------------------------------------------
bold_font = Font(bold=True)

# Header of the key-figure table (always the first row).
for cell in ws[1]:
    cell.font = bold_font

# Headers of the category table and of the transaction list: for each
# label, the first row whose first cell holds it is set to bold.
for header_label in ("Kategorie", "Datum"):
    header_row = next(
        (row for row in ws.iter_rows() if row[0].value == header_label),
        None,
    )
    if header_row is not None:
        for cell in header_row:
            cell.font = bold_font
# --------------------------------------------------
# Update the overview
# --------------------------------------------------
# The overview is rebuilt from scratch on every run and placed first.
if "Übersicht" in wb.sheetnames:
    wb.remove(wb["Übersicht"])
overview = wb.create_sheet("Übersicht", 0)

overview.append(["KW", "Einnahmen", "Ausgaben", "Saldo"])

# Bold header row.
for cell in overview[1]:
    cell.font = Font(bold=True)

# One row per weekly sheet, in sorted sheet-name order; the three
# figures come from the fixed cells B3, B4 and B5 of each sheet.
week_sheet_names = sorted(
    name for name in wb.sheetnames if name.startswith("KW")
)
for sheet in week_sheet_names:
    ws_kw = wb[sheet]
    overview.append(
        [sheet, *(ws_kw[ref].value for ref in ("B3", "B4", "B5"))]
    )
# --------------------------------------------------
# Adjust column widths automatically
# --------------------------------------------------
# Each column becomes as wide as its longest cell text plus 2, capped
# at 60.
for worksheet in wb.worksheets:
    for column in worksheet.columns:
        column_letter = column[0].column_letter
        # Only truly empty cells are skipped; a value of 0 is content
        # and counts towards the width. str()/len() cannot fail here, so
        # no blanket exception handler is needed.
        max_length = max(
            (
                len(str(cell.value))
                for cell in column
                if cell.value is not None
            ),
            default=0,
        )
        worksheet.column_dimensions[
            column_letter
        ].width = min(max_length + 2, 60)
# --------------------------------------------------
# Save the Excel workbook
# --------------------------------------------------
# A failed save (for example, the file may be locked by another program)
# would otherwise leave only a traceback on stderr. Record it in the log
# file and stop with a non-zero exit code.
try:
    wb.save(excel_file)
except OSError as e:
    logging.error(
        "[categorize] Excel-Datei konnte nicht gespeichert werden: "
        f"{excel_file} ({e})"
    )
    sys.exit(1)
# --------------------------------------------------
# Export category totals
# --------------------------------------------------
# Write the per-category income/expenses/saldo figures as pretty-printed
# UTF-8 JSON.
with summary_file.open("w", encoding="utf-8") as summary_handle:
    json.dump(
        category_totals,
        summary_handle,
        indent=2,
        ensure_ascii=False,
    )
# --------------------------------------------------
# Output
# --------------------------------------------------
# Run summary for the log file. The message texts are unchanged; the
# values are passed as logging arguments instead of f-strings.
logging.info("[categorize] Jahr : %s", year)
logging.info("[categorize] Kalenderwoche : %s", week)
logging.info(
    "[categorize] Transaktionen : %s",
    len(categorized_transactions),
)

separator = "----------------------------------------"
logging.info(separator)
logging.info("Summen pro Kategorie")
logging.info(separator)

# One line per category, sorted by category name.
for name in sorted(category_totals):
    totals = category_totals[name]
    logging.info(
        "[categorize] Kategorie=%s; "
        "[categorize] Einnahmen=%.2f; "
        "[categorize] Ausgaben=%.2f; "
        "[categorize] Saldo=%.2f",
        name,
        totals["income"],
        totals["expenses"],
        totals["saldo"],
    )

# Locations of the files produced by this run.
logging.info("[categorize] CSV : %s", csv_file)
logging.info("[categorize] JSON : %s", json_file)
logging.info("[categorize] Summary : %s", summary_file)
logging.info("[categorize] Excel : %s", excel_file)
# --------------------------------------------------
# Synchronisation
# --------------------------------------------------
# Best effort: failures are logged and never abort the script.
#   1. rclone on PATH -> copy the workbook to nextcloud:Bank/<year>
#   2. otherwise, on Windows -> copy it into ~/Nextcloud/Bank/<year>
#   3. otherwise -> log a warning
if which("rclone"):
    rclone_command = [
        "rclone",
        "copy",
        str(excel_file),
        f"nextcloud:Bank/{year}",
    ]
    try:
        # check=True raises when rclone exits with a non-zero status.
        subprocess.run(rclone_command, check=True)
        logging.info("[categorize] Excel nach Nextcloud synchronisiert.")
    except Exception as error:
        logging.error(f"[categorize] rclone Fehler: {error}")
elif platform.system() == "Windows":
    try:
        target_dir = Path.home().joinpath("Nextcloud", "Bank", str(year))
        target_dir.mkdir(parents=True, exist_ok=True)
        target_file = target_dir / excel_file.name
        copy2(excel_file, target_file)
        logging.info(
            f"[categorize] Nextcloud-Synchronisation erfolgreich: {target_file}"
        )
    except Exception as error:
        logging.error(
            f"[categorize] Nextcloud-Kopie fehlgeschlagen: {error}"
        )
else:
    logging.warning(
        "[categorize] Keine Synchronisationsmethode gefunden."
    )

logging.info("[categorize] === Scriptende ===")