Lotto2PY/6aus49APP/app.py
2025-10-19 17:29:26 +02:00

291 lines
No EOL
10 KiB
Python

# /opt/lotto/app.py
import os
from datetime import date, datetime
from typing import List, Optional
from fastapi import FastAPI, Query, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, Row
from sqlalchemy.pool import NullPool
from dotenv import load_dotenv
# --------------------------------------------------------
# 0) Konfiguration laden
# --------------------------------------------------------
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
raise RuntimeError("DATABASE_URL nicht gesetzt (.env prüfen)")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "10"))
# --------------------------------------------------------
# 1) App und DB initialisieren
# --------------------------------------------------------
engine: Engine = create_engine(DATABASE_URL, poolclass=NullPool, future=True)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")
TEMPLATE_DIR = os.path.join(BASE_DIR, "templates")
app = FastAPI(title="Lotto Ziehungen API")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
templates = Jinja2Templates(directory=TEMPLATE_DIR)
# --------------------------------------------------------
# 2) Modelle (Pydantic)
# --------------------------------------------------------
class Draw(BaseModel):
datum: Optional[date]
z1: Optional[int]
z2: Optional[int]
z3: Optional[int]
z4: Optional[int]
z5: Optional[int]
z6: Optional[int]
sz: Optional[int]
sz1: Optional[int]
sz2: Optional[int]
super6: Optional[str]
spiel77: Optional[str]
class DrawList(BaseModel):
total: int
items: List[Draw]
# --------------------------------------------------------
# 3) Hilfsfunktionen
# --------------------------------------------------------
def normalize_date_sql(column: str) -> str:
return (
f"COALESCE("
f" (CASE WHEN {column} REGEXP '^[0-9]{{4}}-[0-9]{{2}}-[0-9]{{2}}$' THEN DATE({column}) END),"
f" STR_TO_DATE({column}, '%d.%m.%Y'),"
f" STR_TO_DATE(SUBSTRING_INDEX({column}, '/', -1), '%d.%m.%y')"
f")"
)
def row_to_draw(row: Row) -> Draw:
return Draw(**row)
def _to_date(s: Optional[str]) -> Optional[date]:
if not s:
return None
t = s.strip()
if not t:
return None
try:
return date.fromisoformat(t)
except ValueError:
pass
for fmt in ("%d.%m.%Y", "%d.%m.%y"):
try:
return datetime.strptime(t, fmt).date()
except ValueError:
continue
return None
# --------------------------------------------------------
# 4) Routen
# --------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse(
"index.html", {"request": request, "page_size": PAGE_SIZE}
)
# -------------------- API: Ziehungen (JSON) ---------------------
@app.get("/api/draws", response_model=DrawList)
async def list_draws(
game: str = Query("6aus49", pattern="^(6aus49|euro)$"),
date_from: Optional[date] = Query(None),
date_to: Optional[date] = Query(None),
limit: int = Query(PAGE_SIZE, ge=1, le=500),
offset: int = Query(0, ge=0),
order: str = Query("desc", pattern="^(asc|desc)$"),
):
tbl = "`6aus49`" if game == "6aus49" else "`euro`"
dx = normalize_date_sql(f"{tbl}.datum")
if game == "6aus49":
base_select = f"""
SELECT {dx} AS datum,
z1, z2, z3, z4, z5, z6, sz,
CAST(super6 AS CHAR) AS super6,
CAST(spiel77 AS CHAR) AS spiel77,
NULL AS sz1, NULL AS sz2
FROM {tbl}
"""
else:
base_select = f"""
SELECT {dx} AS datum,
z1, z2, z3, z4, z5,
NULL AS z6, NULL AS sz,
NULL AS super6, NULL AS spiel77,
sz1, sz2
FROM {tbl}
"""
where_parts = []
params = {}
if date_from:
where_parts.append("datum >= :date_from")
params["date_from"] = date_from
if date_to:
where_parts.append("datum <= :date_to")
params["date_to"] = date_to
where_sql = "WHERE " + " AND ".join(where_parts) if where_parts else ""
order_sql = "ORDER BY datum DESC" if order == "desc" else "ORDER BY datum ASC"
count_sql = f"SELECT COUNT(*) AS cnt FROM ({base_select}) AS t {where_sql}"
data_sql = f"SELECT * FROM ({base_select}) AS t {where_sql} {order_sql} LIMIT :limit OFFSET :offset"
with engine.begin() as conn:
total = conn.execute(text(count_sql), params).scalar_one()
rows = conn.execute(text(data_sql), {**params, "limit": limit, "offset": offset}).mappings().all()
items = [row_to_draw(r) for r in rows]
return DrawList(total=total, items=items)
# -------------------- API: Detail (JSON) ------------------------
@app.get("/api/draw/{game}/{draw_date}", response_model=Draw)
async def get_draw(game: str, draw_date: date):
tbl = "`6aus49`" if game == "6aus49" else "`euro`"
dx = normalize_date_sql(f"{tbl}.datum")
if game == "6aus49":
sql = text(f"""
SELECT {dx} AS datum,
z1, z2, z3, z4, z5, z6, sz,
CAST(super6 AS CHAR) AS super6,
CAST(spiel77 AS CHAR) AS spiel77,
NULL AS sz1, NULL AS sz2
FROM {tbl}
WHERE {dx} = :d
""")
else:
sql = text(f"""
SELECT {dx} AS datum,
z1, z2, z3, z4, z5,
NULL AS z6, NULL AS sz,
NULL AS super6, NULL AS spiel77,
sz1, sz2
FROM {tbl}
WHERE {dx} = :d
""")
with engine.begin() as conn:
row = conn.execute(sql, {"d": draw_date}).mappings().first()
if not row:
raise HTTPException(status_code=404, detail="Ziehung nicht gefunden")
return row_to_draw(row)
# -------------------- UI / HTMX (HTML) -------------------------
@app.get("/ui/draws", response_class=HTMLResponse)
async def ui_draws(
request: Request,
game: str = Query("6aus49", pattern="^(6aus49|euro)$"),
date_from: Optional[str] = Query(None),
date_to: Optional[str] = Query(None),
limit: int = Query(PAGE_SIZE, ge=1, le=500),
offset: int = Query(0, ge=0),
order: str = Query("desc", pattern="^(asc|desc)$"),
):
d_from = _to_date(date_from)
d_to = _to_date(date_to)
result = await list_draws(game, d_from, d_to, limit, offset, order) # type: ignore
rows_html = []
if game == "6aus49":
header = (
"<tr><th>Datum</th><th>Zahlen</th><th>Superzahl</th><th>Super 6</th><th>Spiel 77</th></tr>"
)
for r in result.items:
numbers = " ".join(
f"<span class='badge'>{getattr(r, f'z{i}')}</span>"
for i in range(1, 7)
if getattr(r, f"z{i}") is not None
)
sz_html = "" if r.sz is None else r.sz
rows_html.append(
f"<tr><td>{r.datum or ''}</td><td class='numbers'>{numbers}</td>"
f"<td><b>{sz_html}</b></td><td>{r.super6 or ''}</td><td>{r.spiel77 or ''}</td></tr>"
)
else:
header = "<tr><th>Datum</th><th>Zahlen</th><th>Super 1</th><th>Super 2</th></tr>"
for r in result.items:
numbers = " ".join(
f"<span class='badge'>{getattr(r, f'z{i}')}</span>"
for i in range(1, 6)
if getattr(r, f"z{i}") is not None
)
sz1_html = "" if r.sz1 is None else r.sz1
sz2_html = "" if r.sz2 is None else r.sz2
rows_html.append(
f"<tr><td>{r.datum or ''}</td><td class='numbers'>{numbers}</td>"
f"<td><b>{sz1_html}</b></td><td><b>{sz2_html}</b></td></tr>"
)
html = (
f"<p class='muted'>Treffer: {result.total}</p>"
f"<table role='grid'><thead>{header}</thead><tbody>{''.join(rows_html)}</tbody></table>"
)
return HTMLResponse(html)
# -------------------- UI: Header-Kugeln (inkl. Datum) -------------------------
@app.get("/ui/header", response_class=HTMLResponse)
async def ui_header(
game: str = Query("6aus49", pattern="^(6aus49|euro)$"),
date_from: Optional[str] = Query(None),
date_to: Optional[str] = Query(None),
order: str = Query("desc", pattern="^(asc|desc)$"),
):
d_from = _to_date(date_from)
d_to = _to_date(date_to)
result = await list_draws(game, d_from, d_to, limit=1, offset=0, order=order) # type: ignore
if not result.items:
return HTMLResponse("<div class='balls' aria-hidden='true'></div>")
r = result.items[0]
date_label = r.datum.strftime("%d.%m.%Y") if r.datum else ""
if game == "6aus49":
nums = " ".join(
f"<div class='ball'>{getattr(r, f'z{i}')}</div>"
for i in range(1, 7)
if getattr(r, f"z{i}") is not None
)
sz_html = "" if r.sz is None else f"<div class='ball super-ball'><b>{r.sz}</b></div>"
html = f"""
<div class="balls" title="Letzte Ziehung {date_label}">
{nums}{sz_html}
<span class="label">Stand: {date_label}</span>
</div>
"""
else:
nums = " ".join(
f"<div class='ball'>{getattr(r, f'z{i}')}</div>"
for i in range(1, 6)
if getattr(r, f"z{i}") is not None
)
s1 = "" if r.sz1 is None else f"<div class='star'>{r.sz1}</div>"
s2 = "" if r.sz2 is None else f"<div class='star'>{r.sz2}</div>"
html = f"""
<div class="balls" title="Letzte Ziehung {date_label}">
{nums}{s1}{s2}
<span class="label">Stand: {date_label}</span>
</div>
"""
return HTMLResponse(html)
# -------------------- Healthcheck --------------------------
@app.get("/health")
def health():
return {"status": "ok"}