# Speedtest_Display_Rasberry_Pi/down.py
# (repository viewer metadata: 1214 lines, no EOL, 24 KiB, Python)

import fcntl
import os
import socket
import struct
import time
import traceback
import urllib.request
from datetime import datetime
from zoneinfo import ZoneInfo

import numpy as np
from gpiozero import Button
from influxdb_client import InfluxDBClient
from PIL import Image, ImageDraw, ImageFont
# -------------------------------------------------
# GPIO Schalter
# -------------------------------------------------
# Two GPIO inputs, both with the pull-up enabled and a 0.1 s bounce time.
# The main loop polls them to decide which dashboard is rendered.
_SWITCH_OPTIONS = {"pull_up": True, "bounce_time": 0.1}
SWITCH1 = Button(5, **_SWITCH_OPTIONS)   # pressed -> dashboard 2
SWITCH2 = Button(21, **_SWITCH_OPTIONS)  # pressed -> dashboard 3 (checked first)
# -------------------------------------------------
# InfluxDB V2
# -------------------------------------------------
# Connection settings for the InfluxDB v2 instance queried by this script.
# Each value can be overridden through an environment variable of the same
# name; the literals below are only the backward-compatible defaults.
INFLUX_URL = os.environ.get("INFLUX_URL", "http://10.0.1.134:9086")
# SECURITY: an API token is committed in source here. Rotate it and supply
# the replacement via the INFLUX_TOKEN environment variable instead.
INFLUX_TOKEN = os.environ.get(
    "INFLUX_TOKEN",
    "QSFzWhbpLI71fPiSfINDmtI3YkO4PgdTjyen_zxLzUe45vaVlUbAB04gV75wZjrfWV0WTIGQTv1F3_G0mTdIgQ==",
)
INFLUX_ORG = os.environ.get("INFLUX_ORG", "iobroker")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "hintergasse")
# -------------------------------------------------
# Display
# -------------------------------------------------
# Size of every rendered frame, and the framebuffer device the frames are
# written to by write_fb().
WIDTH, HEIGHT = 480, 320
FB = "/dev/fb0"
# -------------------------------------------------
# Fonts
# -------------------------------------------------
# All text is rendered with DejaVu Sans in its regular or bold face.
_DEJAVU_SANS = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
_DEJAVU_SANS_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

font_title = ImageFont.truetype(_DEJAVU_SANS_BOLD, 22)       # header and titles
font_small = ImageFont.truetype(_DEJAVU_SANS, 18)            # labels
font_small_bold = ImageFont.truetype(_DEJAVU_SANS_BOLD, 18)  # emphasized labels/values
font_download = ImageFont.truetype(_DEJAVU_SANS_BOLD, 46)    # largest figure on a page
font_upload = ImageFont.truetype(_DEJAVU_SANS_BOLD, 34)      # upload figure
font_delta = ImageFont.truetype(_DEJAVU_SANS, 16)            # previous-measurement values
# -------------------------------------------------
# Lokale WLAN-IP
# -------------------------------------------------
def get_ip(interfaces=("wlan0", "wlp2s0", "wlan1")):
    """Return the IPv4 address of the first interface that reports one.

    Args:
        interfaces: Interface names to probe, in order of preference.

    Returns:
        The dotted-quad address as a string, or "Keine WLAN-IP" when none of
        the interfaces yields an address.
    """
    siocgifaddr = 0x8915  # Linux ioctl request: get interface address
    for interface in interfaces:
        request = struct.pack("256s", interface[:15].encode("utf-8"))
        try:
            # The context manager closes the probe socket even if the ioctl
            # fails (the socket used to leak on every call).
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                reply = fcntl.ioctl(sock.fileno(), siocgifaddr, request)
        except OSError:
            # Interface missing or without an address: try the next one.
            continue
        return socket.inet_ntoa(reply[20:24])
    return "Keine WLAN-IP"
# -------------------------------------------------
# Öffentliche WAN-IP
# -------------------------------------------------
# Maps lookup URL -> (result, monotonic expiry time).
_WAN_IP_CACHE = {}


def get_wan_ip(url="https://api.ipify.org", timeout=5, max_age=300):
    """Return the public IP address reported by a plain-text lookup service.

    Every dashboard calls this once per rendered frame, so the answer is
    cached instead of contacting the external service each time.

    Args:
        url: Endpoint that answers with the address as plain text.
        timeout: Request timeout in seconds.
        max_age: Seconds a successful answer is reused before asking again.

    Returns:
        The address as a string, or "Keine WAN-IP" when the lookup fails.
    """
    now = time.monotonic()
    cached = _WAN_IP_CACHE.get(url)
    if cached is not None and now < cached[1]:
        return cached[0]
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            result = response.read().decode("utf-8").strip()
        lifetime = max_age
    except Exception:
        # Best effort: any failure is shown as a placeholder. Failures are
        # cached only briefly so an outage does not block each frame for the
        # full timeout, yet recovery is noticed quickly.
        result = "Keine WAN-IP"
        lifetime = min(max_age, 30)
    _WAN_IP_CACHE[url] = (result, now + lifetime)
    return result
# -------------------------------------------------
# Ping aus InfluxDB lesen
# -------------------------------------------------
def get_ping():
    """Return the most recent speedtest ping stored in InfluxDB.

    Returns:
        The latest "ping" field of the "speedtest" measurement from the last
        24 hours as a float, or -1 when there is no such point or the query
        fails.
    """
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
          |> range(start: -24h)
          |> filter(fn: (r) => r["_measurement"] == "speedtest")
          |> filter(fn: (r) => r["_field"] == "ping")
          |> last()
    '''
    try:
        # Using the client as a context manager closes its connection after
        # each call; a new client is created on every invocation.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            for table in client.query_api().query(query):
                for record in table.records:
                    return float(record.get_value())
    except Exception as e:
        # Best effort: report the problem and fall through to the sentinel.
        print("Influx Ping Fehler:", e)
    return -1
def get_us24_temp():
    """Return the US24 switch temperature as an integer.

    The value is read through get_url_value() from the ioBroker state
    "general_temperature" of the device.

    Returns:
        The temperature truncated to an int, or 0 when the value cannot be
        parsed as a number.
    """
    raw = get_url_value(
        "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.general_temperature"
    )
    try:
        # Go through float() so a reading such as "45.5" is accepted instead
        # of being turned into 0.
        return int(float(raw))
    except (TypeError, ValueError, OverflowError):
        return 0
# -------------------------------------------------
# Read single values and the download history from InfluxDB
# -------------------------------------------------
def get_influx_value(measurement):
    """Return the newest "value" field of a measurement from InfluxDB.

    Args:
        measurement: Name of the InfluxDB measurement. It is interpolated
            into the Flux query unescaped, so it must come from trusted code.

    Returns:
        The most recent value of the last 24 hours as a float, or 0 when
        there is no such point or the query fails.
    """
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
          |> range(start: -24h)
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
          |> filter(fn: (r) => r["_field"] == "value")
          |> group()
          |> sort(columns: ["_time"])
          |> tail(n:1)
    '''
    try:
        # The context manager closes the client after each call.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            for table in client.query_api().query(query):
                for record in table.records:
                    return float(record.get_value())
    except Exception as e:
        # Best effort: report the problem and fall through to the default.
        print(f"Influx Fehler ({measurement}):", e)
    return 0
def get_download_history():
    """Return recent speedtest download rates for the history graph.

    Returns:
        Up to 24 of the newest "download_bits" values from the last 24
        hours, oldest first, each divided by 1,000,000. An empty list is
        returned when the query fails.
    """
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
          |> range(start: -24h)
          |> filter(fn: (r) => r["_measurement"] == "speedtest")
          |> filter(fn: (r) => r["_field"] == "download_bits")
          |> group()
          |> sort(columns: ["_time"])
    '''
    try:
        # The context manager closes the client after each call.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            tables = client.query_api().query(query)
            values = []
            for table in tables:
                for record in table.records:
                    value = record.get_value()
                    if value is not None:
                        values.append(value / 1000 / 1000)
        return values[-24:]
    except Exception:
        print(traceback.format_exc())
        return []
def get_unifi_clients():
    """Return the combined number of LAN and WLAN users reported by ioBroker.

    Both counters are fetched directly (3 s timeout each) and added up. Any
    failure is printed and results in 0.
    """
    counter_urls = (
        "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user",
        "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user",
    )
    try:
        total = 0
        for counter_url in counter_urls:
            with urllib.request.urlopen(counter_url, timeout=3) as response:
                total += int(response.read().decode().strip())
        return total
    except Exception as e:
        print("UniFi Client Fehler:", e)
        return 0
def get_url_value(url):
    """Fetch a plain-text value from a URL.

    Args:
        url: Address to read from (3 s timeout).

    Returns:
        The response body with surrounding whitespace and all double quotes
        removed. On any failure the error is printed and "0" is returned.
    """
    try:
        with urllib.request.urlopen(url, timeout=3) as response:
            body = response.read().decode()
        # ioBroker delivers some values quoted, e.g. "2.3" or "721371"
        return body.strip().replace('"', '')
    except Exception as e:
        print("URL Fehler:", e)
        return "0"
def get_unifi_stats():
    """Collect client counts and system statistics via get_url_value().

    Returns:
        A dict with the keys "lan", "wlan", "clients" (lan + wlan), "cpu",
        "ram" and "uptime". If any value cannot be converted, the error is
        printed and every entry is 0.
    """
    try:
        lan_users = int(get_url_value(
            "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user"
        ))
        wlan_users = int(get_url_value(
            "http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user"
        ))
        cpu_text = get_url_value(
            "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.cpu"
        )
        cpu_value = float(cpu_text.replace('"', ''))
        ram_text = get_url_value(
            "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.mem"
        )
        ram_value = float(ram_text.replace('"', ''))
        uptime_value = int(get_url_value(
            "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.uptime"
        ))
    except Exception as e:
        print("UniFi Fehler:", e)
        return dict.fromkeys(
            ("lan", "wlan", "clients", "cpu", "ram", "uptime"), 0
        )
    return dict(
        lan=lan_users,
        wlan=wlan_users,
        clients=lan_users + wlan_users,
        cpu=cpu_value,
        ram=ram_value,
        uptime=uptime_value,
    )
def format_uptime(seconds):
    """Format a duration in seconds as "<days>d <hours>h".

    Hours are the whole hours left over after the whole days; minutes and
    seconds are dropped.
    """
    days, remainder = divmod(seconds, 86400)
    hours = remainder // 3600
    return f"{days}d {hours}h"
def get_proxmox_load():
    """Return the first three numbers of the Proxmox load string.

    The string is read through get_url_value() and split on whitespace.

    Returns:
        A tuple of three floats, or (0, 0, 0) after printing the error when
        the string has fewer than three fields or a field is not numeric.
    """
    try:
        fields = get_url_value(
            "http://10.0.1.122:8087/getPlainValue/linux-control.0.119_Proxmox.Load"
        ).split()
        return tuple(float(fields[index]) for index in range(3))
    except Exception as e:
        print("Load Fehler:", e)
        return (0, 0, 0)
def get_us24_cpu():
    """Return the US24 switch "system-stats.cpu" value as a float.

    The value is read through get_url_value(); 0 is returned when it cannot
    be parsed as a number.
    """
    raw = get_url_value(
        "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.system-stats.cpu"
    )
    try:
        return float(raw)
    except (TypeError, ValueError):
        # Only parse errors are handled; a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        return 0
def get_us24_ram():
    """Return the US24 switch "system-stats.mem" value as a float.

    The value is read through get_url_value(); 0 is returned when it cannot
    be parsed as a number.
    """
    raw = get_url_value(
        "http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.f4:92:bf:79:ce:8b.system-stats.mem"
    )
    try:
        return float(raw)
    except (TypeError, ValueError):
        # Only parse errors are handled; a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        return 0
def get_display_enabled(
    url="http://10.0.1.122:8087/getPlainValue/javascript.0.Variablen.Server_Display",
):
    """Tell whether the display should currently be switched on.

    The state is fetched directly rather than through get_url_value(),
    because that helper hides failures by returning "0". That made an
    unreachable controller look like "display off" and left the fallback
    below unreachable.

    Args:
        url: Endpoint that answers with "true" or "false" as plain text.

    Returns:
        True when the answer is "true" (case-insensitive, quotes ignored),
        False for any other answer, and True when the endpoint cannot be
        read, so the display stays on during an outage.
    """
    try:
        with urllib.request.urlopen(url, timeout=3) as response:
            value = response.read().decode().strip().replace('"', '')
    except Exception as e:
        print(f"Displaysteuerung nicht erreichbar: {e}")
        return True
    return value.lower() == "true"
# -------------------------------------------------
# Speedtest aus InfluxDB lesen
# -------------------------------------------------
def _query_last_two_speedtest(query_api, field):
    """Return the records of the two newest non-null points of a speedtest field, oldest first."""
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
          |> range(start: -24h)
          |> filter(fn: (r) => r["_measurement"] == "speedtest")
          |> filter(fn: (r) => r["_field"] == "{field}")
          |> group()
          |> sort(columns: ["_time"])
          |> tail(n: 2)
    '''
    return [
        record
        for table in query_api.query(query)
        for record in table.records
        if record.get_value() is not None
    ]


def _latest_and_previous_mbit(records):
    """Convert records to Mbit/s and return (latest, previous), using 0.0 for missing entries."""
    rates = [record.get_value() / 1000 / 1000 for record in records]
    latest = rates[-1] if rates else 0.0
    previous = rates[-2] if len(rates) >= 2 else 0.0
    return latest, previous


def _format_measurement_time(measured_at, now):
    """Format a measurement time relative to *now* (HEUTE / GESTERN / full date)."""
    # Comparing calendar dates avoids the fixed 86400 s "yesterday" offset,
    # which is off around daylight-saving changes.
    age_in_days = (now.date() - measured_at.date()).days
    if age_in_days == 0:
        return measured_at.strftime("HEUTE %H:%M")
    if age_in_days == 1:
        return measured_at.strftime("GESTERN %H:%M")
    return measured_at.strftime("%d.%m.%Y %H:%M")


def get_speedtest():
    """Read the two most recent speedtest results from InfluxDB.

    Returns:
        A tuple (down, up, down_prev, up_prev, timestamp). The four rates
        are in Mbit/s (stored bit/s divided by 1,000,000) and are 0.0 when
        the corresponding point is missing. The timestamp describes the
        newest download measurement in Europe/Berlin time, or is
        "Keine Daten" when there is none. If anything fails, the traceback
        is printed and (0, 0, 0, 0, "Influx Fehler") is returned.
    """
    try:
        # The context manager closes the client after each call.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            query_api = client.query_api()
            down_records = _query_last_two_speedtest(query_api, "download_bits")
            up_records = _query_last_two_speedtest(query_api, "upload_bits")

        timestamp = "Keine Daten"
        if down_records:
            # Evaluate "today"/"yesterday" in the same zone the time is
            # displayed in, independent of the system time zone.
            berlin = ZoneInfo("Europe/Berlin")
            measured_at = down_records[-1].get_time().astimezone(berlin)
            timestamp = _format_measurement_time(
                measured_at, datetime.now(berlin)
            )

        down, down_prev = _latest_and_previous_mbit(down_records)
        up, up_prev = _latest_and_previous_mbit(up_records)
        return down, up, down_prev, up_prev, timestamp
    except Exception:
        print(traceback.format_exc())
        return 0, 0, 0, 0, "Influx Fehler"
# -------------------------------------------------
# Verlaufsgrafik
# -------------------------------------------------
def draw_graph(draw, values, x, y, w, h):
    """Draw a line graph of *values* into the rectangle (x, y, w, h).

    Nothing is drawn for fewer than two values or when the largest value is
    not positive. Otherwise a horizontal guide at half height and a baseline
    at the bottom edge are drawn, followed by the green curve. The curve is
    scaled so the largest value touches the top edge and the points are
    spread evenly over the full width.

    Args:
        draw: Object providing an ImageDraw-style ``line`` method.
        values: Sequence of numbers to plot, first to last.
        x, y: Top-left corner of the graph area.
        w, h: Width and height of the graph area.
    """
    count = len(values)
    if count < 2:
        return
    peak = max(values)
    if peak <= 0:
        return

    right = x + w
    middle = y + h // 2
    bottom = y + h
    draw.line((x, middle, right, middle), fill=(40, 40, 40))
    draw.line((x, bottom, right, bottom), fill=(60, 60, 60))

    step = w / (count - 1)
    points = [
        (x + int(index * step), bottom - int((value / peak) * h))
        for index, value in enumerate(values)
    ]
    draw.line(points, fill="green", width=2)
# -------------------------------------------------
# Dashboard 1
# -------------------------------------------------
def draw_dashboard_1():
    """Render dashboard 1 (speedtest results) and return it as an RGB image.

    Shows the common header (title, clock, WLAN and WAN address), the latest
    and previous download/upload rates, the ping, the download history graph
    and the time of the measurement. Values are colored red, yellow or green
    according to fixed thresholds.
    """
    down, up, down_prev, up_prev, ts = get_speedtest()
    ping = get_ping()
    history = get_download_history()

    img = Image.new("RGB", (WIDTH, HEIGHT), "black")
    draw = ImageDraw.Draw(img)

    # Threshold colors
    down_color = "red" if down < 50 else "yellow" if down < 90 else "green"
    up_color = "red" if up < 10 else "yellow" if up < 30 else "green"
    if ping < 0 or ping > 100:
        # A negative ping is the "no data" sentinel of get_ping()
        ping_color = "red"
    else:
        ping_color = "yellow" if ping > 50 else "green"

    # Header: title on the left, clock right-aligned, addresses below
    draw.text((10, 5), "Hintergasse 9A", fill="white", font=font_title)
    current_time = time.strftime("%H:%M")
    clock_box = draw.textbbox((0, 0), current_time, font=font_title)
    time_width = clock_box[2] - clock_box[0]
    draw.text(
        (WIDTH - time_width - 10, 5),
        current_time,
        fill="white",
        font=font_title,
    )
    draw.text((10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small)
    draw.text(
        (240, 30),
        f"WAN: {get_wan_ip()}",
        fill="orange",
        font=font_small_bold,
    )
    draw.line((10, 60, WIDTH - 10, 60), fill="white")

    # Body texts, drawn in the listed order
    previous_gray = (140, 140, 140)
    entries = [
        ((20, 95), "DOWNLOAD", "green", font_small),
        ((20, 120), f"{down:.1f} Mbit/s", down_color, font_download),
        ((25, 172), f"({down_prev:.1f} Mbit/s)", previous_gray, font_delta),
        ((300, 95), f"PING: {ping:.1f} ms", ping_color, font_small_bold),
        ((20, 210), "UPLOAD", "orange", font_small),
        ((20, 235), f"{up:.1f} Mbit/s", up_color, font_upload),
        ((25, 278), f"({up_prev:.1f} Mbit/s)", previous_gray, font_delta),
        ((250, 190), "24H VERLAUF", "gray", font_small),
    ]
    for position, text, color, font in entries:
        draw.text(position, text, fill=color, font=font)

    # History graph
    draw_graph(draw, history, 250, 215, 200, 80)

    # Footer
    draw.line((10, HEIGHT - 20, WIDTH - 10, HEIGHT - 20), fill="white")
    draw.text(
        (10, HEIGHT - 18),
        f"Messung: {ts}",
        fill="gray",
        font=font_small,
    )
    return img
# -------------------------------------------------
# Dashboard 2
# -------------------------------------------------
def draw_dashboard_2():
    """Render dashboard 2 (network statistics) and return it as an RGB image.

    Shows the common header, a centered title and uptime, and one row each
    for the client counts, UXG CPU/RAM and US24 CPU/RAM/temperature. The
    temperature is colored by fixed thresholds.
    """
    stats = get_unifi_stats()
    us24_cpu = get_us24_cpu()
    us24_ram = get_us24_ram()
    us24_temp = get_us24_temp()

    img = Image.new("RGB", (WIDTH, HEIGHT), "black")
    draw = ImageDraw.Draw(img)

    # Header: title on the left, clock right-aligned, addresses below
    draw.text((10, 5), "Hintergasse 9A", fill="white", font=font_title)
    current_time = time.strftime("%H:%M")
    clock_box = draw.textbbox((0, 0), current_time, font=font_title)
    time_width = clock_box[2] - clock_box[0]
    draw.text(
        (WIDTH - time_width - 10, 5),
        current_time,
        fill="white",
        font=font_title,
    )
    draw.text((10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small)
    draw.text(
        (240, 30),
        f"WAN: {get_wan_ip()}",
        fill="orange",
        font=font_small_bold,
    )
    draw.line((10, 60, WIDTH - 10, 60), fill="white")

    # Temperature color
    temp_color = (
        "lime" if us24_temp < 50 else "yellow" if us24_temp < 65 else "red"
    )

    # Horizontally centered title and uptime
    centered = (
        ("Hubobel's Netzwerk", 75, "cyan", font_title),
        (format_uptime(stats["uptime"]), 105, "orange", font_small_bold),
    )
    for text, top, color, font in centered:
        box = draw.textbbox((0, 0), text, font=font)
        text_width = box[2] - box[0]
        draw.text(
            ((WIDTH - text_width) // 2, top),
            text,
            fill=color,
            font=font,
        )

    # One row per metric: white label on the left, colored value on the right
    rows = (
        (
            140,
            "CLIENTS",
            f"{stats['clients']} ({stats['lan']} / {stats['wlan']})",
            "lime",
        ),
        (175, "UXG CPU", f"{stats['cpu']:.1f} %", "yellow"),
        (205, "UXG RAM", f"{stats['ram']:.1f} %", "cyan"),
        (235, "US24 CPU", f"{us24_cpu:.1f} %", "yellow"),
        (265, "US24 RAM", f"{us24_ram:.1f} %", "cyan"),
        (295, "US24 TEMP", f"{us24_temp} °C", temp_color),
    )
    for top, label, value, color in rows:
        draw.text((20, top), label, fill="white", font=font_small_bold)
        draw.text((220, top), value, fill=color, font=font_small_bold)
    return img
# -------------------------------------------------
# Dashboard 3
# -------------------------------------------------
def draw_dashboard_3():
    """Render dashboard 3 (Proxmox server) and return it as an RGB image.

    Shows the common header, the CPU frequency, CPU load with the three load
    figures, RAM usage (also converted to GB of a fixed 30.77 GB total), CPU
    temperature and room temperature. Values are colored by fixed
    thresholds.
    """
    cpu_freq = get_influx_value("Proxmox_Frequ")
    cpu_load = get_influx_value("Proxmox_CPU_Auslastung")
    ram_load = get_influx_value("Proxmox_RAM_Auslastung")
    cpu_temp = get_influx_value("Proxmox_Temp")
    img = Image.new("RGB", (WIDTH, HEIGHT), "black")
    server_temp = get_influx_value("Temp_Server_oben")
    load1, load5, load15 = get_proxmox_load()
    draw = ImageDraw.Draw(img)

    # Threshold colors
    cpu_color = "red" if cpu_load > 80 else "yellow" if cpu_load > 50 else "green"
    temp_color = "red" if cpu_temp > 70 else "yellow" if cpu_temp > 50 else "green"

    # Header: title on the left, clock right-aligned, addresses below
    draw.text((10, 5), "Hintergasse 9A", fill="white", font=font_title)
    current_time = time.strftime("%H:%M")
    clock_box = draw.textbbox((0, 0), current_time, font=font_title)
    time_width = clock_box[2] - clock_box[0]
    draw.text(
        (WIDTH - time_width - 10, 5),
        current_time,
        fill="white",
        font=font_title,
    )
    draw.text((10, 30), f"WLAN: {get_ip()}", fill="cyan", font=font_small)
    draw.text(
        (240, 30),
        f"WAN: {get_wan_ip()}",
        fill="orange",
        font=font_small_bold,
    )
    draw.line((10, 60, WIDTH - 10, 60), fill="white")

    # RAM usage in GB, relative to the fixed total
    ram_total = 30.77
    ram_used = ram_total * ram_load / 100
    ram_color = "lime" if ram_load < 60 else "yellow" if ram_load < 80 else "red"

    room_color = (
        "lime" if server_temp < 25 else "yellow" if server_temp < 31 else "red"
    )

    # Page content, drawn in the listed order
    entries = [
        ((20, 95), "CPU-FREQUENZ", "cyan", font_small),
        ((20, 115), f"{cpu_freq:.0f} MHz", "green", font_download),
        ((20, 185), "CPU-LAST", "white", font_small_bold),
        (
            (170, 185),
            f"{cpu_load:.0f}% {load1:.1f} | {load5:.1f} | {load15:.1f}",
            cpu_color,
            font_small_bold,
        ),
        ((20, 215), "RAM-LAST", "white", font_small_bold),
        (
            (170, 215),
            f"{ram_load:.0f} % ({ram_used:.1f} / {ram_total:.1f} GB)",
            ram_color,
            font_small_bold,
        ),
        ((20, 245), "CPU-TEMP", "white", font_small_bold),
        ((170, 245), f"{cpu_temp:.0f} °C", temp_color, font_small_bold),
        ((20, 275), "RAUM-TEMP", "white", font_small_bold),
        ((170, 275), f"{server_temp:.1f} °C", room_color, font_small_bold),
    ]
    for position, text, color, font in entries:
        draw.text(position, text, fill=color, font=font)
    return img
# -------------------------------------------------
# Framebuffer schreiben
# -------------------------------------------------
def write_fb(img, fb_path=None):
    """Convert an image to little-endian RGB565 and write it to the framebuffer.

    Args:
        img: Anything ``np.array`` turns into a (rows, columns, channels)
            array whose first three channels are 8-bit red, green and blue.
        fb_path: File to write to. Defaults to the module-level ``FB``
            device; another path can be given, e.g. to capture a frame in a
            regular file.
    """
    arr = np.array(img)
    # Keep the top 5/6/5 bits of each channel and widen before packing
    r = (arr[:, :, 0] >> 3).astype(np.uint16)
    g = (arr[:, :, 1] >> 2).astype(np.uint16)
    b = (arr[:, :, 2] >> 3).astype(np.uint16)
    rgb565 = (r << 11) | (g << 5) | b
    data = rgb565.astype("<u2").tobytes()
    target = FB if fb_path is None else fb_path
    with open(target, "wb") as f:
        f.write(data)
# -------------------------------------------------
# Hauptschleife
# -------------------------------------------------
# Main loop: render the dashboard selected by the switches roughly once per
# second, or blank the screen while the display is disabled via ioBroker.
black_img = Image.new("RGB", (WIDTH, HEIGHT), "black")
display_was_off = False
while True:
    try:
        if get_display_enabled():
            display_was_off = False
            # SWITCH2 takes priority over SWITCH1; dashboard 1 is the default
            if SWITCH2.is_pressed:
                render = draw_dashboard_3
            elif SWITCH1.is_pressed:
                render = draw_dashboard_2
            else:
                render = draw_dashboard_1
            img = render()
            write_fb(img)
        else:
            # Write the black frame only once per "off" phase, then poll the
            # enable flag every 5 seconds
            if not display_was_off:
                write_fb(black_img)
                display_was_off = True
            time.sleep(5)
            continue
    except Exception:
        print(traceback.format_exc())
    time.sleep(1)