from PIL import Image, ImageDraw, ImageFont import numpy as np import time import socket import fcntl import struct import traceback import urllib.request from zoneinfo import ZoneInfo from influxdb_client import InfluxDBClient from gpiozero import Button # ------------------------------------------------- # GPIO Schalter # ------------------------------------------------- SWITCH1 = Button( 5, pull_up=True, bounce_time=0.1 ) SWITCH2 = Button( 21, pull_up=True, bounce_time=0.1 ) # ------------------------------------------------- # InfluxDB V2 # ------------------------------------------------- INFLUX_URL = "http://10.0.1.134:9086" INFLUX_TOKEN = "QSFzWhbpLI71fPiSfINDmtI3YkO4PgdTjyen_zxLzUe45vaVlUbAB04gV75wZjrfWV0WTIGQTv1F3_G0mTdIgQ==" INFLUX_ORG = "iobroker" INFLUX_BUCKET = "hintergasse" # ------------------------------------------------- # Display # ------------------------------------------------- WIDTH = 480 HEIGHT = 320 FB = "/dev/fb0" # ------------------------------------------------- # Fonts # ------------------------------------------------- font_title = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 22 ) font_small = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 18 ) font_small_bold = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 18 ) font_download = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 46 ) font_upload = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 34 ) font_delta = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 16 ) # ------------------------------------------------- # Lokale WLAN-IP # ------------------------------------------------- def get_ip(): interfaces = ["wlan0", "wlp2s0", "wlan1"] for interface in interfaces: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ip = socket.inet_ntoa( fcntl.ioctl( s.fileno(), 0x8915, struct.pack( '256s', interface[:15].encode('utf-8') ) )[20:24] ) return ip except: pass return "Keine WLAN-IP" # ------------------------------------------------- # Öffentliche WAN-IP # ------------------------------------------------- def get_wan_ip(): try: with urllib.request.urlopen( "https://api.ipify.org", timeout=5 ) as response: return response.read().decode("utf-8") except: return "Keine WAN-IP" # ------------------------------------------------- # Ping aus InfluxDB lesen # ------------------------------------------------- def get_ping(): try: client = InfluxDBClient( url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG ) query_api = client.query_api() query = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "speedtest") |> filter(fn: (r) => r["_field"] == "ping") |> last() ''' tables = query_api.query(query) for table in tables: for record in table.records: return float(record.get_value()) except: pass return -1 def get_us24_temp(): try: return int(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.general_temperature" )) except: return 0 # ------------------------------------------------- # Verlauf der letzten Downloads # ------------------------------------------------- def get_influx_value(measurement): try: client = InfluxDBClient( url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG ) query_api = client.query_api() query = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r["_field"] == "value") |> group() |> sort(columns: ["_time"]) |> tail(n:1) ''' tables = query_api.query(query) for table in tables: for record in table.records: return float(record.get_value()) except: pass return 0 def get_download_history(): try: client = InfluxDBClient( url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG ) query_api = client.query_api() query = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "speedtest") |> filter(fn: (r) => r["_field"] == "download_bits") |> group() |> sort(columns: ["_time"]) ''' tables = query_api.query(query) values = [] for table in tables: for record in table.records: value = record.get_value() if value is not None: values.append( value / 1000 / 1000 ) return values[-24:] except Exception: print(traceback.format_exc()) return [] def get_unifi_clients(): try: with urllib.request.urlopen( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user", timeout=3 ) as response: lan = int(response.read().decode().strip()) with urllib.request.urlopen( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user", timeout=3 ) as response: wlan = int(response.read().decode().strip()) return lan + wlan except Exception as e: print("UniFi Client Fehler:", e) return 0 def get_url_value(url): try: with urllib.request.urlopen(url, timeout=3) as response: value = response.read().decode().strip() # ioBroker liefert manche Werte als "2.3" oder "721371" value = value.replace('"', '') return value except Exception as e: print("URL Fehler:", e) return "0" def get_unifi_stats(): try: lan = int(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user" )) wlan = int(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user" )) cpu = float( get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.cpu" ).replace('"', '') ) ram = float( get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.mem" ).replace('"', '') ) uptime = int(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.uptime" )) return { "lan": lan, "wlan": wlan, "clients": lan + wlan, "cpu": cpu, "ram": ram, "uptime": uptime } except Exception as e: print("UniFi Fehler:", e) return { "lan": 0, "wlan": 0, "clients": 0, "cpu": 0, "ram": 0, "uptime": 0 } def format_uptime(seconds): days = seconds // 86400 hours = (seconds % 86400) // 3600 return f"{days}d {hours}h" def get_proxmox_load(): try: load_string = get_url_value( "http://10.0.1.122:8087/getPlainValue/linux-control.0.119_Proxmox.Load" ) parts = load_string.split() return ( float(parts[0]), float(parts[1]), float(parts[2]) ) except Exception as e: print("Load Fehler:", e) return (0, 0, 0) def get_us24_cpu(): try: return float(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.system-stats.cpu" )) except: return 0 def get_us24_ram(): try: return float(get_url_value( "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.system-stats.mem" )) except: return 0 def get_display_enabled(): try: value = get_url_value( "http://10.0.1.122:8087/getPlainValue/javascript.0.Variablen.Server_Display" ) return value.lower() == "true" except Exception as e: print(f"Displaysteuerung nicht erreichbar: {e}") return True # ------------------------------------------------- # Speedtest aus InfluxDB lesen # ------------------------------------------------- def get_speedtest(): try: client = InfluxDBClient( url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG ) query_api = client.query_api() # DOWNLOAD query_down = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "speedtest") |> filter(fn: (r) => r["_field"] == "download_bits") |> group() |> sort(columns: ["_time"]) |> tail(n: 2) ''' result_down = query_api.query(query_down) down = 0 down_prev = 0 timestamp = "Keine Daten" download_values = [] for table in result_down: for record in table.records: download_values.append( record.get_value() ) local_time = record.get_time().astimezone( ZoneInfo("Europe/Berlin") ) if len(download_values) >= 1: down = download_values[-1] if len(download_values) >= 2: down_prev = download_values[-2] now = time.localtime() today = ( local_time.year == now.tm_year and local_time.month == now.tm_mon and local_time.day == now.tm_mday ) yesterday_ts = time.time() - 86400 yesterday = time.localtime(yesterday_ts) is_yesterday = ( local_time.year == yesterday.tm_year and local_time.month == yesterday.tm_mon and local_time.day == yesterday.tm_mday ) if today: timestamp = local_time.strftime( "HEUTE %H:%M" ) elif is_yesterday: timestamp = local_time.strftime( "GESTERN %H:%M" ) else: timestamp = local_time.strftime( "%d.%m.%Y %H:%M" ) # UPLOAD query_up = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "speedtest") |> filter(fn: (r) => r["_field"] == "upload_bits") |> group() |> sort(columns: ["_time"]) |> tail(n: 2) ''' result_up = query_api.query(query_up) up = 0 up_prev = 0 upload_values = [] for table in result_up: for record in table.records: upload_values.append( record.get_value() ) if len(upload_values) >= 1: up = upload_values[-1] if len(upload_values) >= 2: up_prev = upload_values[-2] # bit/s -> Mbit/s down = down / 1000 / 1000 up = up / 1000 / 1000 down_prev = down_prev / 1000 / 1000 up_prev = up_prev / 1000 / 1000 return down, up, down_prev, up_prev, timestamp except Exception: print(traceback.format_exc()) return 0, 0, 0, 0, "Influx Fehler" # ------------------------------------------------- # Verlaufsgrafik # ------------------------------------------------- def draw_graph(draw, values, x, y, w, h): if len(values) < 2: return max_val = max(values) if max_val <= 0: return draw.line( (x, y + h // 2, x + w, y + h // 2), fill=(40, 40, 40) ) draw.line( (x, y + h, x + w, y + h), fill=(60, 60, 60) ) points = [] for i, val in enumerate(values): px = x + int(i * (w / (len(values) - 1))) py = y + h - int((val / max_val) * h) points.append((px, py)) draw.line(points, fill="green", width=2) # ------------------------------------------------- # Dashboard 1 # ------------------------------------------------- def draw_dashboard_1(): down, up, down_prev, up_prev, ts = get_speedtest() ping = get_ping() history = get_download_history() img = Image.new("RGB", (WIDTH, HEIGHT), "black") draw = ImageDraw.Draw(img) # Farben if down < 50: down_color = "red" elif down < 90: down_color = "yellow" else: down_color = "green" if up < 10: up_color = "red" elif up < 30: up_color = "yellow" else: up_color = "green" if ping < 0: ping_color = "red" elif ping > 100: ping_color = "red" elif ping > 50: ping_color = "yellow" else: ping_color = "green" # Header draw.text( (10, 5), "Hintergasse 9A", fill="white", font=font_title ) current_time = time.strftime("%H:%M") bbox = draw.textbbox( (0, 0), current_time, font=font_title ) time_width = bbox[2] - bbox[0] draw.text( (WIDTH - time_width - 10, 5), current_time, fill="white", font=font_title ) draw.text( (10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small ) draw.text( (240, 30), f"WAN: {get_wan_ip()}", fill="orange", font=font_small_bold ) draw.line( (10, 60, WIDTH - 10, 60), fill="white" ) # DOWNLOAD draw.text( (20, 95), "DOWNLOAD", fill="green", font=font_small ) draw.text( (20, 120), f"{down:.1f} Mbit/s", fill=down_color, font=font_download ) draw.text( (25, 172), f"({down_prev:.1f} Mbit/s)", fill=(140, 140, 140), font=font_delta ) # PING draw.text( (300, 95), f"PING: {ping:.1f} ms", fill=ping_color, font=font_small_bold ) # UPLOAD draw.text( (20, 210), "UPLOAD", fill="orange", font=font_small ) draw.text( (20, 235), f"{up:.1f} Mbit/s", fill=up_color, font=font_upload ) draw.text( (25, 278), f"({up_prev:.1f} Mbit/s)", fill=(140, 140, 140), font=font_delta ) # Verlaufsgrafik draw.text( (250, 190), "24H VERLAUF", fill="gray", font=font_small ) draw_graph( draw, history, 250, 215, 200, 80 ) # Footer draw.line( (10, HEIGHT - 20, WIDTH - 10, HEIGHT - 20), fill="white" ) draw.text( (10, HEIGHT - 18), f"Messung: {ts}", fill="gray", font=font_small ) return img # ------------------------------------------------- # Dashboard 2 # ------------------------------------------------- def draw_dashboard_2(): stats = get_unifi_stats() us24_cpu = get_us24_cpu() us24_ram = get_us24_ram() us24_temp = get_us24_temp() img = Image.new("RGB", (WIDTH, HEIGHT), "black") draw = ImageDraw.Draw(img) # Header draw.text( (10, 5), "Hintergasse 9A", fill="white", font=font_title ) current_time = time.strftime("%H:%M") bbox = draw.textbbox( (0, 0), current_time, font=font_title ) time_width = bbox[2] - bbox[0] draw.text( (WIDTH - time_width - 10, 5), current_time, fill="white", font=font_title ) draw.text( (10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small ) draw.text( (240, 30), f"WAN: {get_wan_ip()}", fill="orange", font=font_small_bold ) draw.line( (10, 60, WIDTH - 10, 60), fill="white" ) # Temperaturfarbe if us24_temp < 50: temp_color = "lime" elif us24_temp < 65: temp_color = "yellow" else: temp_color = "red" # Titel title = "Hubobel's Netzwerk" bbox = draw.textbbox( (0, 0), title, font=font_title ) title_width = bbox[2] - bbox[0] draw.text( ((WIDTH - title_width) // 2, 75), title, fill="cyan", font=font_title ) # Uptime uptime_text = format_uptime(stats["uptime"]) bbox = draw.textbbox( (0, 0), uptime_text, font=font_small_bold ) uptime_width = bbox[2] - bbox[0] draw.text( ((WIDTH - uptime_width) // 2, 105), uptime_text, fill="orange", font=font_small_bold ) # Clients draw.text( (20, 140), "CLIENTS", fill="white", font=font_small_bold ) draw.text( (220, 140), f"{stats['clients']} ({stats['lan']} / {stats['wlan']})", fill="lime", font=font_small_bold ) # UXG CPU draw.text( (20, 175), "UXG CPU", fill="white", font=font_small_bold ) draw.text( (220, 175), f"{stats['cpu']:.1f} %", fill="yellow", font=font_small_bold ) # UXG RAM draw.text( (20, 205), "UXG RAM", fill="white", font=font_small_bold ) draw.text( (220, 205), f"{stats['ram']:.1f} %", fill="cyan", font=font_small_bold ) # US24 CPU draw.text( (20, 235), "US24 CPU", fill="white", font=font_small_bold ) draw.text( (220, 235), f"{us24_cpu:.1f} %", fill="yellow", font=font_small_bold ) # US24 RAM draw.text( (20, 265), "US24 RAM", fill="white", font=font_small_bold ) draw.text( (220, 265), f"{us24_ram:.1f} %", fill="cyan", font=font_small_bold ) # US24 Temperatur draw.text( (20, 295), "US24 TEMP", fill="white", font=font_small_bold ) draw.text( (220, 295), f"{us24_temp} °C", fill=temp_color, font=font_small_bold ) return img # ------------------------------------------------- # Dashboard 3 # ------------------------------------------------- def draw_dashboard_3(): cpu_freq = get_influx_value("Proxmox_Frequ") cpu_load = get_influx_value("Proxmox_CPU_Auslastung") ram_load = get_influx_value("Proxmox_RAM_Auslastung") cpu_temp = get_influx_value("Proxmox_Temp") img = Image.new("RGB", (WIDTH, HEIGHT), "black") server_temp = get_influx_value("Temp_Server_oben") load1, load5, load15 = get_proxmox_load() draw = ImageDraw.Draw(img) cpu_color = "green" if cpu_load > 80: cpu_color = "red" elif cpu_load > 50: cpu_color = "yellow" temp_color = "green" if cpu_temp > 70: temp_color = "red" elif cpu_temp > 50: temp_color = "yellow" # Header wie Dashboard 1 draw.text( (10, 5), "Hintergasse 9A", fill="white", font=font_title ) current_time = time.strftime("%H:%M") bbox = draw.textbbox( (0, 0), current_time, font=font_title ) time_width = bbox[2] - bbox[0] draw.text( (WIDTH - time_width - 10, 5), current_time, fill="white", font=font_title ) draw.text( (10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small ) draw.text( (240, 30), f"WAN: {get_wan_ip()}", fill="orange", font=font_small_bold ) draw.line( (10, 60, WIDTH - 10, 60), fill="white" ) # Inhalt Dashboard 3 draw.text( (20, 95), "CPU-FREQUENZ", fill="cyan", font=font_small ) draw.text( (20, 115), f"{cpu_freq:.0f} MHz", fill="green", font=font_download ) draw.text( (20, 185), "CPU-LAST", fill="white", font=font_small_bold ) draw.text( (170, 185), f"{cpu_load:.0f}% {load1:.1f} | {load5:.1f} | {load15:.1f}", fill=cpu_color, font=font_small_bold ) ram_total = 30.77 ram_used = ram_total * ram_load / 100 if ram_load < 60: ram_color = "lime" elif ram_load < 80: ram_color = "yellow" else: ram_color = "red" draw.text( (20, 215), "RAM-LAST", fill="white", font=font_small_bold ) draw.text( (170, 215), f"{ram_load:.0f} % ({ram_used:.1f} / {ram_total:.1f} GB)", fill=ram_color, font=font_small_bold ) draw.text( (20, 245), "CPU-TEMP", fill="white", font=font_small_bold ) draw.text( (170, 245), f"{cpu_temp:.0f} °C", fill=temp_color, font=font_small_bold ) draw.text( (20, 275), "RAUM-TEMP", fill="white", font=font_small_bold ) if server_temp < 25: room_color = "lime" elif server_temp < 31: room_color = "yellow" else: room_color = "red" draw.text( (170, 275), f"{server_temp:.1f} °C", fill=room_color, font=font_small_bold ) return img # ------------------------------------------------- # Framebuffer schreiben # ------------------------------------------------- def write_fb(img): arr = np.array(img) r = (arr[:, :, 0] >> 3).astype(np.uint16) g = (arr[:, :, 1] >> 2).astype(np.uint16) b = (arr[:, :, 2] >> 3).astype(np.uint16) rgb565 = (r << 11) | (g << 5) | b data = rgb565.astype("