Dateien nach „/“ hochladen

This commit is contained in:
hubobel 2026-05-25 15:19:15 +02:00
parent a76a91b0a4
commit d393d4263e

577
down.py Normal file
View file

@ -0,0 +1,577 @@
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import time
import socket
import fcntl
import struct
import traceback
import urllib.request
from zoneinfo import ZoneInfo
from influxdb_client import InfluxDBClient
# -------------------------------------------------
# InfluxDB V2
# -------------------------------------------------
INFLUX_URL = "http://10.0.1.134:9086"
INFLUX_TOKEN = "QSFzWhbpLI71fPiSfINDmtI3YkO4PgdTjyen_zxLzUe45vaVlUbAB04gV75wZjrfWV0WTIGQTv1F3_G0mTdIgQ=="
INFLUX_ORG = "iobroker"
INFLUX_BUCKET = "hintergasse"
# -------------------------------------------------
# Display
# -------------------------------------------------
WIDTH = 480
HEIGHT = 320
FB = "/dev/fb0"
# -------------------------------------------------
# Fonts
# -------------------------------------------------
font_title = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
22
)
font_small = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
18
)
font_small_bold = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
18
)
font_download = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
46
)
font_upload = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
34
)
font_delta = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
16
)
# -------------------------------------------------
# Lokale WLAN-IP
# -------------------------------------------------
def get_ip():
interfaces = ["wlan0", "wlp2s0", "wlan1"]
for interface in interfaces:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip = socket.inet_ntoa(
fcntl.ioctl(
s.fileno(),
0x8915,
struct.pack(
'256s',
interface[:15].encode('utf-8')
)
)[20:24]
)
return ip
except:
pass
return "Keine WLAN-IP"
# -------------------------------------------------
# Öffentliche WAN-IP
# -------------------------------------------------
def get_wan_ip():
try:
with urllib.request.urlopen(
"https://api.ipify.org",
timeout=5
) as response:
return response.read().decode("utf-8")
except:
return "Keine WAN-IP"
# -------------------------------------------------
# Ping aus InfluxDB lesen
# -------------------------------------------------
def get_ping():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "ping")
|> last()
'''
tables = query_api.query(query)
for table in tables:
for record in table.records:
return float(record.get_value())
except:
pass
return -1
# -------------------------------------------------
# Verlauf der letzten Downloads
# -------------------------------------------------
def get_download_history():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "download_bits")
|> sort(columns: ["_time"])
'''
tables = query_api.query(query)
values = []
for table in tables:
for record in table.records:
value = record.get_value()
if value is not None:
values.append(
value / 1000 / 1000
)
return values[-24:]
except Exception:
print(traceback.format_exc())
return []
# -------------------------------------------------
# Speedtest aus InfluxDB lesen
# -------------------------------------------------
def get_speedtest():
try:
client = InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG
)
query_api = client.query_api()
# DOWNLOAD
query_down = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "download_bits")
|> tail(n: 2)
'''
result_down = query_api.query(query_down)
down = 0
down_prev = 0
timestamp = "Keine Daten"
download_values = []
for table in result_down:
for record in table.records:
download_values.append(
record.get_value()
)
local_time = record.get_time().astimezone(
ZoneInfo("Europe/Berlin")
)
if len(download_values) >= 1:
down = download_values[-1]
if len(download_values) >= 2:
down_prev = download_values[-2]
now = time.localtime()
today = (
local_time.year == now.tm_year and
local_time.month == now.tm_mon and
local_time.day == now.tm_mday
)
yesterday_ts = time.time() - 86400
yesterday = time.localtime(yesterday_ts)
is_yesterday = (
local_time.year == yesterday.tm_year and
local_time.month == yesterday.tm_mon and
local_time.day == yesterday.tm_mday
)
if today:
timestamp = local_time.strftime(
"HEUTE %H:%M"
)
elif is_yesterday:
timestamp = local_time.strftime(
"GESTERN %H:%M"
)
else:
timestamp = local_time.strftime(
"%d.%m.%Y %H:%M"
)
# UPLOAD
query_up = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "speedtest")
|> filter(fn: (r) => r["_field"] == "upload_bits")
|> tail(n: 2)
'''
result_up = query_api.query(query_up)
up = 0
up_prev = 0
upload_values = []
for table in result_up:
for record in table.records:
upload_values.append(
record.get_value()
)
if len(upload_values) >= 1:
up = upload_values[-1]
if len(upload_values) >= 2:
up_prev = upload_values[-2]
# bit/s -> Mbit/s
down = down / 1000 / 1000
up = up / 1000 / 1000
down_prev = down_prev / 1000 / 1000
up_prev = up_prev / 1000 / 1000
return down, up, down_prev, up_prev, timestamp
except Exception:
print(traceback.format_exc())
return 0, 0, 0, 0, "Influx Fehler"
# -------------------------------------------------
# Verlaufsgrafik
# -------------------------------------------------
def draw_graph(draw, values, x, y, w, h):
if len(values) < 2:
return
max_val = max(values)
if max_val <= 0:
return
# Dezente Hilfslinien
draw.line(
(x, y + h // 2, x + w, y + h // 2),
fill=(40, 40, 40)
)
draw.line(
(x, y + h, x + w, y + h),
fill=(60, 60, 60)
)
points = []
for i, val in enumerate(values):
px = x + int(i * (w / (len(values) - 1)))
py = y + h - int((val / max_val) * h)
points.append((px, py))
draw.line(points, fill="green", width=2)
# -------------------------------------------------
# UI zeichnen
# -------------------------------------------------
def draw_ui():
down, up, down_prev, up_prev, ts = get_speedtest()
ping = get_ping()
history = get_download_history()
img = Image.new("RGB", (WIDTH, HEIGHT), "black")
draw = ImageDraw.Draw(img)
# Farben
if down < 50:
down_color = "red"
elif down < 90:
down_color = "yellow"
else:
down_color = "green"
if up < 10:
up_color = "red"
elif up < 30:
up_color = "yellow"
else:
up_color = "green"
if ping < 0:
ping_color = "red"
elif ping > 100:
ping_color = "red"
elif ping > 50:
ping_color = "yellow"
else:
ping_color = "green"
# Header
draw.text(
(10, 5),
"Hintergasse 9A",
fill="white",
font=font_title
)
draw.text(
(10, 30),
f"WLAN: {get_ip()}",
fill="cyan",
font=font_small
)
draw.text(
(240, 30),
f"WAN: {get_wan_ip()}",
fill="orange",
font=font_small_bold
)
if ping >= 0:
draw.text(
(10, 55),
f"PING: {ping:.1f} ms",
fill=ping_color,
font=font_small_bold
)
else:
draw.text(
(10, 55),
"PING: FEHLER",
fill="red",
font=font_small_bold
)
draw.line(
(10, 80, WIDTH - 10, 80),
fill="white"
)
# DOWNLOAD
draw.text(
(20, 95),
"DOWNLOAD",
fill="green",
font=font_small
)
draw.text(
(20, 120),
f"{down:.1f} Mbit/s",
fill=down_color,
font=font_download
)
draw.text(
(25, 172),
f"({down_prev:.1f} Mbit/s)",
fill=(140, 140, 140),
font=font_delta
)
# UPLOAD
draw.text(
(20, 210),
"UPLOAD",
fill="orange",
font=font_small
)
draw.text(
(20, 235),
f"{up:.1f} Mbit/s",
fill=up_color,
font=font_upload
)
draw.text(
(25, 278),
f"({up_prev:.1f} Mbit/s)",
fill=(140, 140, 140),
font=font_delta
)
# Verlaufsgrafik
draw.text(
(250, 190),
"24H VERLAUF",
fill="gray",
font=font_small
)
draw_graph(
draw,
history,
250,
215,
200,
80
)
# Footer
draw.line(
(10, HEIGHT - 20, WIDTH - 10, HEIGHT - 20),
fill="white"
)
draw.text(
(10, HEIGHT - 18),
f"Messung: {ts}",
fill="gray",
font=font_small
)
return img
# -------------------------------------------------
# Framebuffer schreiben
# -------------------------------------------------
def write_fb(img):
arr = np.array(img)
r = (arr[:, :, 0] >> 3).astype(np.uint16)
g = (arr[:, :, 1] >> 2).astype(np.uint16)
b = (arr[:, :, 2] >> 3).astype(np.uint16)
rgb565 = (r << 11) | (g << 5) | b
data = rgb565.astype("<u2").tobytes()
with open(FB, "wb") as f:
f.write(data)
# -------------------------------------------------
# Hauptschleife
# -------------------------------------------------
while True:
try:
img = draw_ui()
write_fb(img)
except Exception:
print(traceback.format_exc())
time.sleep(60)