Speedtest_Display_Rasberry_Pi/down.py

1002 lines
21 KiB
Python

from PIL import Image, ImageDraw, ImageFont
import numpy as np
import time
import socket
import fcntl
import struct
import traceback
import urllib.request
from zoneinfo import ZoneInfo
from influxdb_client import InfluxDBClient
from gpiozero import Button
# -------------------------------------------------
# GPIO Schalter
# -------------------------------------------------
SWITCH1 = Button(
5,
pull_up=True,
bounce_time=0.1
)
SWITCH2 = Button(
21,
pull_up=True,
bounce_time=0.1
)
# -------------------------------------------------
# InfluxDB V2
# -------------------------------------------------
INFLUX_URL = "http://10.0.1.134:9086"
INFLUX_TOKEN = "QSFzWhbpLI71fPiSfINDmtI3YkO4PgdTjyen_zxLzUe45vaVlUbAB04gV75wZjrfWV0WTIGQTv1F3_G0mTdIgQ=="
INFLUX_ORG = "iobroker"
INFLUX_BUCKET = "hintergasse"
# -------------------------------------------------
# Display
# -------------------------------------------------
WIDTH = 480
HEIGHT = 320
FB = "/dev/fb0"
# -------------------------------------------------
# Fonts
# -------------------------------------------------
font_title = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
22
)
font_small = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
18
)
font_small_bold = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
18
)
font_download = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
46
)
font_upload = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
34
)
font_delta = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
16
)
# -------------------------------------------------
# Lokale WLAN-IP
# -------------------------------------------------
def get_ip():
interfaces = ["wlan0", "wlp2s0", "wlan1"]
for interface in interfaces:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip = socket.inet_ntoa(
fcntl.ioctl(
s.fileno(),
0x8915,
struct.pack(
'256s',
interface[:15].encode('utf-8')
)
)[20:24]
)
return ip
except:
pass
return "Keine WLAN-IP"
# -------------------------------------------------
# Öffentliche WAN-IP
# -------------------------------------------------
def get_wan_ip():
try:
with urllib.request.urlopen(
"https://api.ipify.org",
timeout=5
) as response:
return response.read().decode("utf-8")
except:
return "Keine WAN-IP"
# -------------------------------------------------
# Ping aus InfluxDB lesen
# -------------------------------------------------
def get_ping():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "ping")
|> last()
'''
tables = query_api.query(query)
for table in tables:
for record in table.records:
return float(record.get_value())
except:
pass
return -1
# -------------------------------------------------
# Verlauf der letzten Downloads
# -------------------------------------------------
def get_influx_value(measurement):
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["_field"] == "value")
|> group()
|> sort(columns: ["_time"])
|> tail(n:1)
'''
tables = query_api.query(query)
for table in tables:
for record in table.records:
return float(record.get_value())
except:
pass
return 0
def get_download_history():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "download_bits")
|> group()
|> sort(columns: ["_time"])
'''
tables = query_api.query(query)
values = []
for table in tables:
for record in table.records:
value = record.get_value()
if value is not None:
values.append(
value / 1000 / 1000
)
return values[-24:]
except Exception:
print(traceback.format_exc())
return []
def get_unifi_clients():
try:
with urllib.request.urlopen(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user",
timeout=3
) as response:
lan = int(response.read().decode().strip())
with urllib.request.urlopen(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user",
timeout=3
) as response:
wlan = int(response.read().decode().strip())
return lan + wlan
except Exception as e:
print("UniFi Client Fehler:", e)
return 0
def get_url_value(url):
try:
with urllib.request.urlopen(url, timeout=3) as response:
value = response.read().decode().strip()
# ioBroker liefert manche Werte als "2.3" oder "721371"
value = value.replace('"', '')
return value
except Exception as e:
print("URL Fehler:", e)
return "0"
def get_unifi_stats():
try:
lan = int(get_url_value(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.lan.num_user"
))
wlan = int(get_url_value(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.health.wlan.num_user"
))
cpu = float(
get_url_value(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.cpu"
).replace('"', '')
)
ram = float(
get_url_value(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.mem"
).replace('"', '')
)
uptime = int(get_url_value(
"http://10.0.1.122:8087/getPlainValue/unifi.2.default.devices.9c:05:d6:52:52:19.system-stats.uptime"
))
return {
"lan": lan,
"wlan": wlan,
"clients": lan + wlan,
"cpu": cpu,
"ram": ram,
"uptime": uptime
}
except Exception as e:
print("UniFi Fehler:", e)
return {
"lan": 0,
"wlan": 0,
"clients": 0,
"cpu": 0,
"ram": 0,
"uptime": 0
}
def format_uptime(seconds):
days = seconds // 86400
hours = (seconds % 86400) // 3600
return f"{days}d {hours}h"
# -------------------------------------------------
# Speedtest aus InfluxDB lesen
# -------------------------------------------------
def get_speedtest():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
# DOWNLOAD
query_down = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "download_bits")
|> group()
|> sort(columns: ["_time"])
|> tail(n: 2)
'''
result_down = query_api.query(query_down)
down = 0
down_prev = 0
timestamp = "Keine Daten"
download_values = []
for table in result_down:
for record in table.records:
download_values.append(
record.get_value()
)
local_time = record.get_time().astimezone(
ZoneInfo("Europe/Berlin")
)
if len(download_values) >= 1:
down = download_values[-1]
if len(download_values) >= 2:
down_prev = download_values[-2]
now = time.localtime()
today = (
local_time.year == now.tm_year and
local_time.month == now.tm_mon and
local_time.day == now.tm_mday
)
yesterday_ts = time.time() - 86400
yesterday = time.localtime(yesterday_ts)
is_yesterday = (
local_time.year == yesterday.tm_year and
local_time.month == yesterday.tm_mon and
local_time.day == yesterday.tm_mday
)
if today:
timestamp = local_time.strftime(
"HEUTE %H:%M"
)
elif is_yesterday:
timestamp = local_time.strftime(
"GESTERN %H:%M"
)
else:
timestamp = local_time.strftime(
"%d.%m.%Y %H:%M"
)
# UPLOAD
query_up = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "upload_bits")
|> group()
|> sort(columns: ["_time"])
|> tail(n: 2)
'''
result_up = query_api.query(query_up)
up = 0
up_prev = 0
upload_values = []
for table in result_up:
for record in table.records:
upload_values.append(
record.get_value()
)
if len(upload_values) >= 1:
up = upload_values[-1]
if len(upload_values) >= 2:
up_prev = upload_values[-2]
# bit/s -> Mbit/s
down = down / 1000 / 1000
up = up / 1000 / 1000
down_prev = down_prev / 1000 / 1000
up_prev = up_prev / 1000 / 1000
return down, up, down_prev, up_prev, timestamp
except Exception:
print(traceback.format_exc())
return 0, 0, 0, 0, "Influx Fehler"
# -------------------------------------------------
# Verlaufsgrafik
# -------------------------------------------------
def draw_graph(draw, values, x, y, w, h):
if len(values) < 2:
return
max_val = max(values)
if max_val <= 0:
return
draw.line(
(x, y + h // 2, x + w, y + h // 2),
fill=(40, 40, 40)
)
draw.line(
(x, y + h, x + w, y + h),
fill=(60, 60, 60)
)
points = []
for i, val in enumerate(values):
px = x + int(i * (w / (len(values) - 1)))
py = y + h - int((val / max_val) * h)
points.append((px, py))
draw.line(points, fill="green", width=2)
# -------------------------------------------------
# Dashboard 1
# -------------------------------------------------
def draw_dashboard_1():
down, up, down_prev, up_prev, ts = get_speedtest()
ping = get_ping()
history = get_download_history()
img = Image.new("RGB", (WIDTH, HEIGHT), "black")
draw = ImageDraw.Draw(img)
# Farben
if down < 50:
down_color = "red"
elif down < 90:
down_color = "yellow"
else:
down_color = "green"
if up < 10:
up_color = "red"
elif up < 30:
up_color = "yellow"
else:
up_color = "green"
if ping < 0:
ping_color = "red"
elif ping > 100:
ping_color = "red"
elif ping > 50:
ping_color = "yellow"
else:
ping_color = "green"
# Header
draw.text(
(10, 5),
"Hintergasse 9A",
fill="white",
font=font_title
)
current_time = time.strftime("%H:%M")
bbox = draw.textbbox(
(0, 0),
current_time,
font=font_title
)
time_width = bbox[2] - bbox[0]
draw.text(
(WIDTH - time_width - 10, 5),
current_time,
fill="white",
font=font_title
)
draw.text(
(10, 30),
f"WLAN: {get_ip()}",
fill="cyan",
font=font_small
)
draw.text(
(240, 30),
f"WAN: {get_wan_ip()}",
fill="orange",
font=font_small_bold
)
draw.line(
(10, 60, WIDTH - 10, 60),
fill="white"
)
# DOWNLOAD
draw.text(
(20, 95),
"DOWNLOAD",
fill="green",
font=font_small
)
draw.text(
(20, 120),
f"{down:.1f} Mbit/s",
fill=down_color,
font=font_download
)
draw.text(
(25, 172),
f"({down_prev:.1f} Mbit/s)",
fill=(140, 140, 140),
font=font_delta
)
# PING
draw.text(
(300, 95),
f"PING: {ping:.1f} ms",
fill=ping_color,
font=font_small_bold
)
# UPLOAD
draw.text(
(20, 210),
"UPLOAD",
fill="orange",
font=font_small
)
draw.text(
(20, 235),
f"{up:.1f} Mbit/s",
fill=up_color,
font=font_upload
)
draw.text(
(25, 278),
f"({up_prev:.1f} Mbit/s)",
fill=(140, 140, 140),
font=font_delta
)
# Verlaufsgrafik
draw.text(
(250, 190),
"24H VERLAUF",
fill="gray",
font=font_small
)
draw_graph(
draw,
history,
250,
215,
200,
80
)
# Footer
draw.line(
(10, HEIGHT - 20, WIDTH - 10, HEIGHT - 20),
fill="white"
)
draw.text(
(10, HEIGHT - 18),
f"Messung: {ts}",
fill="gray",
font=font_small
)
return img
# -------------------------------------------------
# Dashboard 2
# -------------------------------------------------
def draw_dashboard_2():
img = Image.new("RGB", (WIDTH, HEIGHT), "black")
draw = ImageDraw.Draw(img)
stats = get_unifi_stats()
# Header wie Dashboard 1
draw.text(
(10, 5),
"Hintergasse 9A",
fill="white",
font=font_title
)
current_time = time.strftime("%H:%M")
bbox = draw.textbbox(
(0, 0),
current_time,
font=font_title
)
time_width = bbox[2] - bbox[0]
draw.text(
(WIDTH - time_width - 10, 5),
current_time,
fill="white",
font=font_title
)
draw.text(
(10, 30),
f"WLAN: {get_ip()}",
fill="cyan",
font=font_small
)
draw.text(
(240, 30),
f"WAN: {get_wan_ip()}",
fill="orange",
font=font_small_bold
)
draw.line(
(10, 60, WIDTH - 10, 60),
fill="white"
)
draw.text(
(20, 90),
"UNIFI GATEWAY MAX",
fill="cyan",
font=font_small_bold
)
draw.text((20, 130), "LAN CLIENTS", fill="white", font=font_small_bold)
draw.text((220, 130), str(stats["lan"]), fill="lime", font=font_small_bold)
draw.text((20, 160), "WLAN CLIENTS", fill="white", font=font_small_bold)
draw.text((220, 160), str(stats["wlan"]), fill="lime", font=font_small_bold)
draw.text((20, 190), "CLIENTS GES.", fill="white", font=font_small_bold)
draw.text((220, 190), str(stats["clients"]), fill="lime", font=font_small_bold)
draw.text((20, 220), "UXG CPU", fill="white", font=font_small_bold)
draw.text((220, 220), f"{stats['cpu']:.0f} %", fill="yellow", font=font_small_bold)
draw.text((20, 250), "UXG RAM", fill="white", font=font_small_bold)
draw.text((220, 250), f"{stats['ram']:.0f} %", fill="cyan", font=font_small_bold)
draw.text((20, 280), "UPTIME", fill="white", font=font_small_bold)
draw.text((220, 280), format_uptime(stats["uptime"]), fill="orange", font=font_small_bold)
return img
# -------------------------------------------------
# Dashboard 3
# -------------------------------------------------
def draw_dashboard_3():
cpu_freq = get_influx_value("Proxmox_Frequ")
cpu_load = get_influx_value("Proxmox_CPU_Auslastung")
ram_load = get_influx_value("Proxmox_RAM_Auslastung")
cpu_temp = get_influx_value("Proxmox_Temp")
img = Image.new("RGB", (WIDTH, HEIGHT), "black")
server_temp = get_influx_value("Temp_Server_oben")
draw = ImageDraw.Draw(img)
cpu_color = "green"
if cpu_load > 80:
cpu_color = "red"
elif cpu_load > 50:
cpu_color = "yellow"
temp_color = "green"
if cpu_temp > 70:
temp_color = "red"
elif cpu_temp > 50:
temp_color = "yellow"
# Header wie Dashboard 1
draw.text(
(10, 5),
"Hintergasse 9A",
fill="white",
font=font_title
)
current_time = time.strftime("%H:%M")
bbox = draw.textbbox(
(0, 0),
current_time,
font=font_title
)
time_width = bbox[2] - bbox[0]
draw.text(
(WIDTH - time_width - 10, 5),
current_time,
fill="white",
font=font_title
)
draw.text(
(10, 30),
f"WLAN: {get_ip()}",
fill="cyan",
font=font_small
)
draw.text(
(240, 30),
f"WAN: {get_wan_ip()}",
fill="orange",
font=font_small_bold
)
draw.line(
(10, 60, WIDTH - 10, 60),
fill="white"
)
# Inhalt Dashboard 3
draw.text(
(20, 95),
"CPU-FREQUENZ",
fill="cyan",
font=font_small
)
draw.text(
(20, 115),
f"{cpu_freq:.0f} MHz",
fill="green",
font=font_download
)
draw.text(
(20, 185),
"CPU-LAST",
fill="white",
font=font_small_bold
)
draw.text(
(170, 185),
f"{cpu_load:.0f} %",
fill=cpu_color,
font=font_small_bold
)
ram_total = 30.77
ram_used = ram_total * ram_load / 100
if ram_load < 60:
ram_color = "lime"
elif ram_load < 80:
ram_color = "yellow"
else:
ram_color = "red"
draw.text(
(20, 215),
"RAM-LAST",
fill="white",
font=font_small_bold
)
draw.text(
(170, 215),
f"{ram_load:.0f} % ({ram_used:.1f} / {ram_total:.1f} GB)",
fill=ram_color,
font=font_small_bold
)
draw.text(
(20, 245),
"CPU-TEMP",
fill="white",
font=font_small_bold
)
draw.text(
(170, 245),
f"{cpu_temp:.0f} °C",
fill=temp_color,
font=font_small_bold
)
draw.text(
(20, 275),
"RAUM-TEMP",
fill="white",
font=font_small_bold
)
if server_temp < 25:
room_color = "lime"
elif server_temp < 31:
room_color = "yellow"
else:
room_color = "red"
draw.text(
(170, 275),
f"{server_temp:.1f} °C",
fill=room_color,
font=font_small_bold
)
return img
# -------------------------------------------------
# Framebuffer schreiben
# -------------------------------------------------
def write_fb(img):
arr = np.array(img)
r = (arr[:, :, 0] >> 3).astype(np.uint16)
g = (arr[:, :, 1] >> 2).astype(np.uint16)
b = (arr[:, :, 2] >> 3).astype(np.uint16)
rgb565 = (r << 11) | (g << 5) | b
data = rgb565.astype("<u2").tobytes()
with open(FB, "wb") as f:
f.write(data)
# -------------------------------------------------
# Hauptschleife
# -------------------------------------------------
while True:
try:
# Dashboard 3
if SWITCH2.is_pressed:
img = draw_dashboard_3()
# Dashboard 2
elif SWITCH1.is_pressed:
img = draw_dashboard_2()
# Dashboard 1
else:
img = draw_dashboard_1()
write_fb(img)
except Exception:
print(traceback.format_exc())
time.sleep(1)