ING/app.py
2026-06-25 17:29:25 +02:00

1037 lines
No EOL
21 KiB
Python

from flask import Flask, render_template, request
import subprocess
import sys
import json
from flask import redirect, send_file
from urllib.request import urlopen
from pathlib import Path
from influxdb_client import InfluxDBClient
#from datetime import datetime, timedelta
from collections import defaultdict
from datetime import datetime
import shutil
from shutil import copy2
import logging
import platform
import os
# --------------------------------------------------
# Logging
# --------------------------------------------------
if platform.system() == "Windows":
log_dir = Path(__file__).parent / "logs"
else:
log_dir = Path.home() / "logs"
log_dir.mkdir(exist_ok=True)
log_file = log_dir / "ing.log"
# Hauptlogger für Datei
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# Eigener Logger für Kategorien
category_logger = logging.getLogger("categories")
# --------------------------------------------------
# Flask/Werkzeug nur auf Konsole ausgeben
# --------------------------------------------------
werkzeug_logger = logging.getLogger("werkzeug")
# Keine Weitergabe an den Root-Logger
werkzeug_logger.propagate = False
# Vorhandene Handler entfernen
werkzeug_logger.handlers.clear()
# Ausgabe nur auf Konsole
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s"
)
console_handler.setFormatter(console_formatter)
werkzeug_logger.addHandler(console_handler)
werkzeug_logger.setLevel(logging.INFO)
# --------------------------------------------------
# Startmeldung
# --------------------------------------------------
logging.info(
"[app.py] === Scriptstart ==="
)
app = Flask(__name__)
BASE_DIR = Path(__file__).parent
CATEGORIES_FILE = BASE_DIR / "Kategorien.json"
config = {}
config_file = Path.home() / ".ing.conf"
if not config_file.exists():
config_file = Path(__file__).parent / ".ing.conf"
with open(config_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
config[key] = value
IOBROKER_HOST = config["IOBROKER_HOST"]
IOBROKER_PORT = config["IOBROKER_PORT"]
IOBROKER_DP = config["IOBROKER_DP"]
INFLUX_URL = config.get("INFLUX_URL")
INFLUX_ORG = config.get("INFLUX_ORG")
INFLUX_BUCKET = config.get("INFLUX_BUCKET")
INFLUX_TOKEN = config.get("INFLUX_TOKEN")
BALANCE_YELLOW_DAY = int(
config["BALANCE_YELLOW_DAY"]
)
BALANCE_YELLOW_LIMIT = float(
config["BALANCE_YELLOW_LIMIT"]
)
BALANCE_RED_DAY = int(
config["BALANCE_RED_DAY"]
)
BALANCE_RED_LIMIT = float(
config["BALANCE_RED_LIMIT"]
)
def get_current_week_data():
year, week, _ = datetime.now().isocalendar()
summary_file = (
BASE_DIR
/ "Transaktionen"
/ str(year)
/ "summary"
/ f"category_summary_{year}_KW{week:02d}.json"
)
transactions_file = (
BASE_DIR
/ "Transaktionen"
/ str(year)
/ "categorized_json"
/ f"categorized_transactions_{year}_KW{week:02d}.json"
)
summary = None
transactions = []
unknown_transactions = []
total_income = 0
total_expenses = 0
total_saldo = 0
if summary_file.exists():
with open(
summary_file,
encoding="utf-8"
) as f:
summary = json.load(f)
for values in summary.values():
total_income += values["income"]
total_expenses += values["expenses"]
total_saldo += values["saldo"]
if transactions_file.exists():
with open(
transactions_file,
encoding="utf-8"
) as f:
transactions = json.load(f)
for transaction in transactions:
if (
transaction.get("category")
== "Unbekannt"
):
unknown_transactions.append(
transaction
)
return (
year,
week,
summary,
transactions,
unknown_transactions,
total_income,
total_expenses,
total_saldo
)
def get_balance():
try:
url = (
f"http://{IOBROKER_HOST}:{IOBROKER_PORT}"
f"/getPlainValue/{IOBROKER_DP}"
)
value = (
urlopen(url)
.read()
.decode("utf-8")
.strip()
)
return float(value)
except Exception as e:
print(
f"Kontostand konnte nicht gelesen werden: {e}"
)
return None
from datetime import datetime
def get_balance_color(balance):
if balance is None:
return "green"
today = datetime.now().day
if today >= BALANCE_RED_DAY:
if balance < BALANCE_RED_LIMIT:
return "red"
if balance < BALANCE_YELLOW_LIMIT:
return "orange"
return "green"
if today >= BALANCE_YELLOW_DAY:
if balance < BALANCE_YELLOW_LIMIT:
return "orange"
return "green"
def get_cron_jobs():
jobs = {
"balance": "",
"transactions": "",
"categorize": ""
}
if platform.system() != "Linux":
jobs["balance"] = "nur unter Linux verfügbar"
jobs["transactions"] = "nur unter Linux verfügbar"
jobs["categorize"] = "nur unter Linux verfügbar"
return jobs
result = subprocess.run(
["crontab", "-l"],
capture_output=True,
text=True
)
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("#"):
continue
cron = " ".join(line.split()[:5])
if line.endswith("categorize_transactions.py"):
jobs["categorize"] = cron
elif line.endswith("transactions.py"):
jobs["transactions"] = cron
elif line.endswith("balance.py"):
jobs["balance"] = cron
return jobs
def get_balance_comparison():
if not all([
INFLUX_URL,
INFLUX_ORG,
INFLUX_BUCKET,
INFLUX_TOKEN
]):
return {}
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -180d)
|> filter(fn: (r) => r._measurement == "Kontostand")
|> filter(fn: (r) => r._field == "value")
'''
result = client.query_api().query(query)
raw_data = []
for table in result:
for record in table.records:
raw_data.append({
"time": record.get_time(),
"value": round(record.get_value(), 2)
})
raw_data.sort(
key=lambda x: x["time"]
)
periods = defaultdict(dict)
for item in raw_data:
dt = item["time"]
if dt.day >= 15:
period_start = datetime(
dt.year,
dt.month,
15
)
else:
if dt.month == 1:
period_start = datetime(
dt.year - 1,
12,
15
)
else:
period_start = datetime(
dt.year,
dt.month - 1,
15
)
day_index = (
dt.date() -
period_start.date()
).days + 1
periods[period_start][day_index] = item["value"]
sorted_periods = sorted(
periods.keys(),
reverse=True
)
result_data = {}
names = [
"current",
"period_1",
"period_2",
"period_3",
"period_4"
]
for idx, period in enumerate(sorted_periods[:5]):
data = []
days = periods[period]
last_value = None
for day in range(1, 32):
if day in days:
last_value = days[day]
if last_value is not None:
data.append({
"day": day,
"value": last_value
})
result_data[names[idx]] = data
return result_data
@app.context_processor
def inject_css_version():
return {
"css_version": int(
os.path.getmtime(
BASE_DIR / "static" / "style.css"
)
)
}
@app.route("/")
def index():
css_version = int(
os.path.getmtime(
BASE_DIR / "static" / "style.css"
)
)
(
year,
week,
summary,
transactions,
unknown_transactions,
total_income,
total_expenses,
total_saldo
) = get_current_week_data()
balance = get_balance()
balance_color = get_balance_color(
balance
)
balance_history = get_balance_comparison()
#print(balance_history)
today = datetime.now()
if today.day >= 15:
current_day = today.day - 14
else:
current_day = today.day + 17
return render_template(
"index.html",
year=year,
week=week,
summary=summary,
transactions=transactions,
unknown_transactions=unknown_transactions,
total_income=total_income,
total_expenses=total_expenses,
total_saldo=total_saldo,
balance=balance,
css_version=css_version,
balance_color=balance_color,
current_day=current_day,
balance_history=balance_history
)
@app.route("/test-influx")
def test_influx():
data = get_balance_history()
return {
"count": len(data),
"first": data[0] if data else None,
"last": data[-1] if data else None
}
@app.route("/log/download")
def download_log():
return send_file(
log_file,
as_attachment=True,
download_name="ing.log"
)
@app.route("/maintenance/run/<script>")
def run_script(script):
if platform.system() != "Linux":
return redirect(
"/maintenance?message=Nur unter Linux verfügbar"
)
scripts = {
"balance":
"/home/banking/ING/balance.py",
"transactions":
"/home/banking/ING/transactions.py",
"categorize":
"/home/banking/ING/categorize_transactions.py"
}
if script not in scripts:
return redirect("/maintenance")
subprocess.Popen(
["python3", scripts[script]]
)
return redirect(
"/maintenance?message=Script gestartet"
)
@app.route("/log/delete")
def delete_log():
if log_file.exists():
backup_file = (
log_file.parent
/ "ing.log.bak"
)
copy2(
log_file,
backup_file
)
with open(
log_file,
"w",
encoding="utf-8"
) as f:
f.write(
f"{datetime.now()} "
"[INFO] Logdatei gelöscht, Backup erstellt.\n"
)
return redirect("/log")
@app.route("/run/balance", methods=["POST"])
def run_balance():
subprocess.run(
[sys.executable, str(BASE_DIR / "balance.py")]
)
return redirect(
"/maintenance?message=Kontostand erfolgreich aktualisiert"
)
@app.route("/run/transactions", methods=["POST"])
def run_transactions():
year = request.form.get("year", "").strip()
week = request.form.get("week", "").strip()
command = [
sys.executable,
str(BASE_DIR / "transactions.py")
]
if year and week:
command.extend([year, week])
result = subprocess.run(
command,
capture_output=True,
text=True
)
transaction_count = 0
try:
transactions_file = (
BASE_DIR
/ "Transaktionen"
/ str(year)
/ "json"
/ f"transactions_{year}_KW{int(week):02d}.json"
)
if transactions_file.exists():
with open(
transactions_file,
encoding="utf-8"
) as f:
transaction_count = len(
json.load(f)
)
except Exception:
pass
return redirect(
f"/maintenance?message={transaction_count} Transaktionen geladen"
)
@app.route("/run/categorize", methods=["POST"])
def run_categorize():
year = request.form.get("year", "").strip()
week = request.form.get("week", "").strip()
command = [
sys.executable,
str(BASE_DIR / "categorize_transactions.py")
]
if year and week:
command.extend([year, week])
result = subprocess.run(
command,
capture_output=True,
text=True
)
categorized_count = 0
try:
categorized_file = (
BASE_DIR
/ "Transaktionen"
/ str(year)
/ "categorized_json"
/ f"categorized_transactions_{year}_KW{int(week):02d}.json"
)
if categorized_file.exists():
with open(
categorized_file,
encoding="utf-8"
) as f:
categorized_count = len(
json.load(f)
)
except Exception:
pass
return redirect(
f"/maintenance?message={categorized_count} Buchungen kategorisiert"
)
@app.route("/run/weekly_update", methods=["POST"])
def run_weekly_update():
year = request.form.get("year", "").strip()
week = request.form.get("week", "").strip()
transaction_command = [
sys.executable,
str(BASE_DIR / "transactions.py")
]
categorize_command = [
sys.executable,
str(BASE_DIR / "categorize_transactions.py")
]
if year and week:
transaction_command.extend(
[year, week]
)
categorize_command.extend(
[year, week]
)
subprocess.run(
transaction_command
)
subprocess.run(
categorize_command
)
transaction_count = 0
try:
transactions_file = (
BASE_DIR
/ "Transaktionen"
/ str(year)
/ "json"
/ f"transactions_{year}_KW{int(week):02d}.json"
)
if transactions_file.exists():
with open(
transactions_file,
encoding="utf-8"
) as f:
transaction_count = len(
json.load(f)
)
except Exception:
pass
message = (
f"{transaction_count} Transaktionen verarbeitet - "
f"KW {week}/{year}"
)
return redirect(
f"/maintenance?message={message}"
)
@app.route("/categories")
def categories():
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
#print(categories)
return render_template(
"categories.html",
categories=categories
)
@app.route("/categories/delete/<category>")
def delete_category(category):
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
if category in categories:
del categories[category]
with open(
CATEGORIES_FILE,
"w",
encoding="utf-8"
) as f:
if CATEGORIES_FILE.exists():
shutil.copy2(
CATEGORIES_FILE,
str(CATEGORIES_FILE) + ".bak"
)
logging.warning(
f"Kategorien werden gespeichert. "
f"Anzahl Kategorien={len(categories)} "
f"Keys={list(categories.keys())}"
)
json.dump(
categories,
f,
ensure_ascii=False,
indent=2
)
return redirect("/categories")
@app.route(
"/categories/add",
methods=["POST"]
)
def add_category():
category = request.form["category"].strip()
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
if category not in categories:
categories[category] = []
with open(
CATEGORIES_FILE,
"w",
encoding="utf-8"
) as f:
if CATEGORIES_FILE.exists():
shutil.copy2(
CATEGORIES_FILE,
str(CATEGORIES_FILE) + ".bak"
)
logging.warning(
f"Kategorien werden gespeichert. "
f"Anzahl Kategorien={len(categories)} "
f"Keys={list(categories.keys())}"
)
json.dump(
categories,
f,
ensure_ascii=False,
indent=2
)
return redirect("/categories")
@app.route("/categories/edit/<category>")
def edit_category(category):
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
if category not in categories:
return redirect("/categories")
return render_template(
"edit_category.html",
category=category,
words=sorted(categories[category])
)
@app.route(
"/categories/add_word/<category>",
methods=["POST"]
)
def add_word(category):
word = request.form["word"].strip()
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
if (
category in categories
and word
and word not in categories[category]
):
categories[category].append(word)
with open(
CATEGORIES_FILE,
"w",
encoding="utf-8"
) as f:
if CATEGORIES_FILE.exists():
shutil.copy2(
CATEGORIES_FILE,
str(CATEGORIES_FILE) + ".bak"
)
logging.warning(
f"Kategorien werden gespeichert. "
f"Anzahl Kategorien={len(categories)} "
f"Keys={list(categories.keys())}"
)
json.dump(
categories,
f,
ensure_ascii=False,
indent=2
)
return redirect(
f"/categories/edit/{category}"
)
@app.route(
"/categories/delete_word/<category>/<word>"
)
def delete_word(category, word):
with open(
CATEGORIES_FILE,
encoding="utf-8"
) as f:
categories = json.load(f)
if (
category in categories
and word in categories[category]
):
categories[category].remove(word)
with open(
CATEGORIES_FILE,
"w",
encoding="utf-8"
) as f:
if CATEGORIES_FILE.exists():
shutil.copy2(
CATEGORIES_FILE,
str(CATEGORIES_FILE) + ".bak"
)
logging.warning(
f"Kategorien werden gespeichert. "
f"Anzahl Kategorien={len(categories)} "
f"Keys={list(categories.keys())}"
)
json.dump(
categories,
f,
ensure_ascii=False,
indent=2
)
return redirect(
f"/categories/edit/{category}"
)
def parse_cron_jobs(cron_jobs):
schedules = {}
for name, cron in cron_jobs.items():
schedules[name] = {
"cron": cron,
"minute": "",
"hour": "",
"interval": "",
"days": []
}
if (
not cron
or "Linux" in cron
):
continue
try:
minute, hour, day, month, weekday = (
cron.split()
)
schedules[name]["minute"] = minute
if hour.startswith("*/"):
schedules[name]["interval"] = (
hour.replace(
"*/",
""
)
)
else:
schedules[name]["hour"] = hour
if "-" in weekday:
start, end = weekday.split("-")
schedules[name]["days"] = list(
range(
int(start),
int(end) + 1
)
)
else:
schedules[name]["days"] = [
int(weekday)
]
except Exception:
pass
return schedules
@app.route("/maintenance")
def maintenance():
year, week, _ = datetime.now().isocalendar()
message = request.args.get(
"message",
""
)
cron_jobs = get_cron_jobs()
schedules = parse_cron_jobs(
cron_jobs
)
print(schedules)
return render_template(
"maintenance.html",
year=year,
week=week,
cron_jobs=cron_jobs,
schedules=schedules,
message=message
)
@app.route("/log")
def show_log():
if sys.platform.startswith("win"):
log_file = BASE_DIR / "logs" / "ing.log"
else:
log_file = Path.home() / "logs" / "ing.log"
if not log_file.exists():
return render_template(
"log.html",
log_content="Logdatei nicht gefunden."
)
with open(
log_file,
encoding="utf-8",
errors="replace"
) as f:
lines = f.readlines()
log_content = "".join(lines[-200:])
return render_template(
"log.html",
log_content=log_content
)
if __name__ == "__main__":
app.run(
host="0.0.0.0",
port=5000,
debug=True
)