Speedtest_Display_Rasberry_Pi/down.py

577 lines
No EOL
12 KiB
Python

from PIL import Image, ImageDraw, ImageFont
import numpy as np
import time
import socket
import fcntl
import struct
import traceback
import urllib.request
from zoneinfo import ZoneInfo
from influxdb_client import InfluxDBClient
# -------------------------------------------------
# InfluxDB v2 connection settings
# -------------------------------------------------
# URL handed to InfluxDBClient by every query function in this file.
INFLUX_URL = "http://10.0.1.134:9086"
# NOTE(review): the API token is committed in plain text — consider loading it
# from the environment or a config file outside version control, and rotating
# this one.
INFLUX_TOKEN = "QSFzWhbpLI71fPiSfINDmtI3YkO4PgdTjyen_zxLzUe45vaVlUbAB04gV75wZjrfWV0WTIGQTv1F3_G0mTdIgQ=="
# Organisation name handed to InfluxDBClient.
INFLUX_ORG = "iobroker"
# Bucket that the Flux queries read from.
INFLUX_BUCKET = "hintergasse"
# -------------------------------------------------
# Display
# -------------------------------------------------
# Size in pixels of the image rendered by draw_ui().
WIDTH = 480
HEIGHT = 320
# Device file that write_fb() writes the rendered frame to.
FB = "/dev/fb0"
# -------------------------------------------------
# Fonts
# -------------------------------------------------
# All text uses the DejaVu Sans family; the faces are loaded once at import
# time and shared by every frame.
_DEJAVU_REGULAR = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
_DEJAVU_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

font_title = ImageFont.truetype(_DEJAVU_BOLD, 22)       # header title
font_small = ImageFont.truetype(_DEJAVU_REGULAR, 18)    # labels, footer
font_small_bold = ImageFont.truetype(_DEJAVU_BOLD, 18)  # WAN address, ping
font_download = ImageFont.truetype(_DEJAVU_BOLD, 46)    # current download rate
font_upload = ImageFont.truetype(_DEJAVU_BOLD, 34)      # current upload rate
font_delta = ImageFont.truetype(_DEJAVU_REGULAR, 16)    # previous measurement
# -------------------------------------------------
# Local Wi-Fi IP address
# -------------------------------------------------
def get_ip():
    """Return the IPv4 address of the first Wi-Fi interface that has one.

    The interfaces ``wlan0``, ``wlp2s0`` and ``wlan1`` are probed in that
    order with ioctl request ``0x8915`` (``SIOCGIFADDR`` on Linux).

    Returns:
        The dotted-quad address as a string, or ``"Keine WLAN-IP"`` when none
        of the interfaces yields an address.
    """
    interfaces = ("wlan0", "wlp2s0", "wlan1")
    for interface in interfaces:
        # The request buffer carries the interface name (kernel limit: 15
        # characters); the reply is read from the same 256-byte layout.
        request = struct.pack("256s", interface[:15].encode("utf-8"))
        try:
            # The context manager closes the probe socket on every path, so
            # the once-a-minute refresh does not leak file descriptors.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                reply = fcntl.ioctl(s.fileno(), 0x8915, request)
        except OSError:
            # Interface missing or without an address: try the next one.
            continue
        # Bytes 20..23 of the reply hold the packed IPv4 address.
        return socket.inet_ntoa(reply[20:24])
    return "Keine WLAN-IP"
# -------------------------------------------------
# Public WAN IP address
# -------------------------------------------------
def get_wan_ip():
    """Return the public IP address as reported by api.ipify.org.

    The request is limited to a 5 second timeout so an unreachable service
    cannot stall a display refresh for long.

    Returns:
        The response body decoded as UTF-8 with surrounding whitespace
        removed, or ``"Keine WAN-IP"`` when the lookup fails or the body is
        empty.
    """
    try:
        with urllib.request.urlopen(
            "https://api.ipify.org",
            timeout=5,
        ) as response:
            body = response.read()
        # Fall back to the placeholder rather than drawing an empty address.
        return body.decode("utf-8").strip() or "Keine WAN-IP"
    except Exception:
        # Best effort: any network or decoding failure yields the placeholder.
        # KeyboardInterrupt/SystemExit are deliberately not caught here.
        return "Keine WAN-IP"
# -------------------------------------------------
# Read ping from InfluxDB
# -------------------------------------------------
def get_ping():
    """Return the most recent ping value stored in InfluxDB.

    Queries the last ``ping`` field of the ``speedtest`` measurement within
    the past 24 hours.

    Returns:
        The value of the first record returned, converted to ``float``, or
        ``-1`` when the query fails or yields no record.
    """
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
            |> range(start: -24h)
            |> filter(fn: (r) => r["_measurement"] == "speedtest")
            |> filter(fn: (r) => r["_field"] == "ping")
            |> last()
    '''
    try:
        # The context manager closes the client (and its HTTP connections)
        # even when the query raises; this function runs on every refresh.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            tables = client.query_api().query(query)
            for table in tables:
                for record in table.records:
                    return float(record.get_value())
    except Exception:
        # Report the failure like the other InfluxDB readers in this file
        # instead of swallowing it silently; the caller still gets -1.
        print(traceback.format_exc())
    return -1
# -------------------------------------------------
# History of the most recent downloads
# -------------------------------------------------
def get_download_history():
    """Return up to 24 recent download values for the history graph.

    Reads every ``download_bits`` value of the ``speedtest`` measurement from
    the past 24 hours, sorted by ``_time`` by the query, skips ``None``
    values and divides each value by 1,000,000 (the same bit/s -> Mbit/s
    scaling used for the headline figures).

    Returns:
        A list with at most the last 24 scaled values in query order, or an
        empty list when the query fails (the traceback is printed).
    """
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
            |> range(start: -24h)
            |> filter(fn: (r) => r["_measurement"] == "speedtest")
            |> filter(fn: (r) => r["_field"] == "download_bits")
            |> sort(columns: ["_time"])
    '''
    try:
        values = []
        # Close the client on every path; this runs once per refresh.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            for table in client.query_api().query(query):
                for record in table.records:
                    value = record.get_value()
                    if value is not None:
                        values.append(value / 1000 / 1000)
        return values[-24:]
    except Exception:
        print(traceback.format_exc())
        return []
# -------------------------------------------------
# Read speedtest results from InfluxDB
# -------------------------------------------------
def _latest_speedtest_records(query_api, field):
    """Return the ``tail(n: 2)`` records of *field* from the last 24 hours."""
    query = f'''
        from(bucket: "{INFLUX_BUCKET}")
            |> range(start: -24h)
            |> filter(fn: (r) => r["_measurement"] == "speedtest")
            |> filter(fn: (r) => r["_field"] == "{field}")
            |> tail(n: 2)
    '''
    return [
        record
        for table in query_api.query(query)
        for record in table.records
    ]


def _format_measurement_time(measured_at):
    """Format *measured_at* as a HEUTE / GESTERN / full-date label (Berlin time)."""
    from datetime import datetime, timedelta

    berlin = ZoneInfo("Europe/Berlin")
    local_time = measured_at.astimezone(berlin)
    # Compare calendar dates in the same zone the label is rendered in, so the
    # result does not depend on the system's local time zone.
    today = datetime.now(berlin).date()
    if local_time.date() == today:
        return local_time.strftime("HEUTE %H:%M")
    if local_time.date() == today - timedelta(days=1):
        return local_time.strftime("GESTERN %H:%M")
    return local_time.strftime("%d.%m.%Y %H:%M")


def get_speedtest():
    """Return the latest and previous download/upload rates plus a time label.

    The last two ``download_bits`` and ``upload_bits`` values of the
    ``speedtest`` measurement (past 24 hours) are read from InfluxDB and
    converted from bit/s to Mbit/s.

    Returns:
        A tuple ``(down, up, down_prev, up_prev, timestamp)``. Rates that are
        not available are ``0``. ``timestamp`` describes the time of the last
        download record (``"HEUTE HH:MM"``, ``"GESTERN HH:MM"`` or
        ``"DD.MM.YYYY HH:MM"``), or is ``"Keine Daten"`` when there is no
        download record. On any error the traceback is printed and
        ``(0, 0, 0, 0, "Influx Fehler")`` is returned.
    """
    try:
        # Close the client on every path; this runs once per refresh.
        with InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        ) as client:
            query_api = client.query_api()
            down_records = _latest_speedtest_records(query_api, "download_bits")
            up_records = _latest_speedtest_records(query_api, "upload_bits")

        download_values = [record.get_value() for record in down_records]
        upload_values = [record.get_value() for record in up_records]

        down = download_values[-1] if download_values else 0
        down_prev = download_values[-2] if len(download_values) >= 2 else 0
        up = upload_values[-1] if upload_values else 0
        up_prev = upload_values[-2] if len(upload_values) >= 2 else 0

        # Only build a time label when a record exists; with an empty result
        # there is no measurement time and "Keine Daten" is reported.
        if down_records:
            timestamp = _format_measurement_time(down_records[-1].get_time())
        else:
            timestamp = "Keine Daten"

        # bit/s -> Mbit/s
        return (
            down / 1000 / 1000,
            up / 1000 / 1000,
            down_prev / 1000 / 1000,
            up_prev / 1000 / 1000,
            timestamp,
        )
    except Exception:
        print(traceback.format_exc())
        return 0, 0, 0, 0, "Influx Fehler"
# -------------------------------------------------
# History graph
# -------------------------------------------------
def draw_graph(draw, values, x, y, w, h):
    """Plot *values* as a green polyline inside the box at (x, y) sized w x h.

    The vertical axis runs from 0 at the bottom edge to ``max(values)`` at
    the top edge; the samples are spread evenly across the width. Nothing is
    drawn when there are fewer than two samples or when the maximum is not
    positive.

    Args:
        draw: Object providing ``line(xy, fill=..., width=...)``.
        values: Sequence of numeric samples, oldest first.
        x, y: Top-left corner of the plot area.
        w, h: Width and height of the plot area.
    """
    count = len(values)
    if count < 2:
        return
    peak = max(values)
    if peak <= 0:
        return

    right = x + w
    middle = y + h // 2
    bottom = y + h

    # Subtle guide lines: half height and baseline.
    draw.line((x, middle, right, middle), fill=(40, 40, 40))
    draw.line((x, bottom, right, bottom), fill=(60, 60, 60))

    step = w / (count - 1)
    polyline = [
        (x + int(index * step), bottom - int((sample / peak) * h))
        for index, sample in enumerate(values)
    ]
    draw.line(polyline, fill="green", width=2)
# -------------------------------------------------
# Draw the UI
# -------------------------------------------------
def draw_ui():
    """Render one complete dashboard frame and return it as an RGB image.

    The frame shows a header (title, Wi-Fi and WAN addresses, ping), the
    current and previous download/upload rates, the download history graph
    and a footer with the time of the last measurement. All data is fetched
    fresh on every call.

    Returns:
        A ``WIDTH`` x ``HEIGHT`` PIL image on a black background.
    """
    down, up, down_prev, up_prev, ts = get_speedtest()
    ping = get_ping()
    history = get_download_history()

    img = Image.new("RGB", (WIDTH, HEIGHT), "black")
    draw = ImageDraw.Draw(img)

    def put(position, content, color, font):
        draw.text(position, content, fill=color, font=font)

    # Traffic-light colours for the three headline values.
    down_color = "red" if down < 50 else "yellow" if down < 90 else "green"
    up_color = "red" if up < 10 else "yellow" if up < 30 else "green"
    if ping < 0 or ping > 100:
        ping_color = "red"
    elif ping > 50:
        ping_color = "yellow"
    else:
        ping_color = "green"

    grey = (140, 140, 140)

    # Header
    put((10, 5), "Hintergasse 9A", "white", font_title)
    put((10, 30), f"WLAN: {get_ip()}", "cyan", font_small)
    put((240, 30), f"WAN: {get_wan_ip()}", "orange", font_small_bold)
    if ping >= 0:
        put((10, 55), f"PING: {ping:.1f} ms", ping_color, font_small_bold)
    else:
        # A negative ping is the error marker returned by get_ping().
        put((10, 55), "PING: FEHLER", "red", font_small_bold)
    draw.line((10, 80, WIDTH - 10, 80), fill="white")

    # Download: label, current rate, previous rate in brackets.
    put((20, 95), "DOWNLOAD", "green", font_small)
    put((20, 120), f"{down:.1f} Mbit/s", down_color, font_download)
    put((25, 172), f"({down_prev:.1f} Mbit/s)", grey, font_delta)

    # Upload: same layout, smaller headline font.
    put((20, 210), "UPLOAD", "orange", font_small)
    put((20, 235), f"{up:.1f} Mbit/s", up_color, font_upload)
    put((25, 278), f"({up_prev:.1f} Mbit/s)", grey, font_delta)

    # History graph
    put((250, 190), "24H VERLAUF", "gray", font_small)
    draw_graph(draw, history, 250, 215, 200, 80)

    # Footer
    footer_y = HEIGHT - 20
    draw.line((10, footer_y, WIDTH - 10, footer_y), fill="white")
    put((10, HEIGHT - 18), f"Messung: {ts}", "gray", font_small)

    return img
# -------------------------------------------------
# Write to the framebuffer
# -------------------------------------------------
def write_fb(img):
    """Pack *img* as RGB565 and write it to the framebuffer device ``FB``.

    Each pixel keeps the top 5 bits of red, 6 bits of green and 5 bits of
    blue, packed as ``rrrrrggg gggbbbbb`` and stored as little-endian 16-bit
    words.

    Args:
        img: Image convertible by ``np.array`` to a (rows, columns, channels)
            array with red, green and blue in channels 0, 1 and 2.
    """
    pixels = np.array(img)
    red, green, blue = (
        (pixels[:, :, channel] >> dropped_bits).astype(np.uint16)
        for channel, dropped_bits in ((0, 3), (1, 2), (2, 3))
    )
    packed = (red << 11) | (green << 5) | blue
    with open(FB, "wb") as framebuffer:
        framebuffer.write(packed.astype("<u2").tobytes())
# -------------------------------------------------
# Main loop
# -------------------------------------------------
# Render a frame and push it to the display once per minute, forever. A
# failure in one cycle is printed and the loop carries on with the next one.
while True:
    try:
        write_fb(draw_ui())
    except Exception:
        print(traceback.format_exc())
    time.sleep(60)