Merge branch 'patch-4' into 'main'

Draft: > "Automatische Live-Aktualisierung der Verspätungen eingeführt"

See merge request oeffi/oeffi!24
This commit is contained in:
Anton Cäsar Heinrich Theodor Bracht 2025-04-13 14:45:35 +00:00
commit bd0faab0a0
3 changed files with 58 additions and 1 deletions

View file

@ -0,0 +1,38 @@
import requests
import time
from datetime import datetime
# API-Endpunkt für Abfahrtszeiten (aktualisieren mit der echten URL)
API_URL = "https://api.deinverkehrsverbund.de/abfahrtszeiten"
# Funktion, um Abfahrtszeiten sicher abzurufen
def get_abfahrtszeiten(station_id):
try:
response = requests.get(f"{API_URL}/{station_id}", timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
print("Fehler: Die Anfrage hat zu lange gedauert.")
except requests.exceptions.HTTPError as e:
print(f"HTTP-Fehler: {e}")
except requests.exceptions.RequestException as e:
print(f"Netzwerkfehler: {e}")
return []
# Automatische Aktualisierung der Abfahrtszeiten
def update_abfahrtszeiten_periodisch(station_id, interval=10):
while True:
print(f"[{datetime.now()}] Aktualisiere Abfahrtszeiten für Station {station_id}...")
abfahrten = get_abfahrtszeiten(station_id)
if abfahrten:
print(f"Neue Abfahrtszeiten: {abfahrten}")
else:
print("Keine Abfahrtszeiten verfügbar.")
time.sleep(interval * 60) # Wartezeit in Minuten
# Beispielstation und Start des Skripts
if __name__ == "__main__":
station_id = "obstwiesen_id" # Hier mit echter Stations-ID ersetzen
update_abfahrtszeiten_periodisch(station_id, interval=10)

Binary file not shown.

After

Width:  |  Height:  |  Size: 232 KiB

View file

@ -1,4 +1,23 @@
/*
import requests import time from datetime import datetime
API-Endpunkt für Abfahrtszeiten (aktualisieren mit der echten URL)
API_URL = "https://api.deinverkehrsverbund.de/abfahrtszeiten"
def get_abfahrtszeiten(station_id): """Holt die aktuellen Abfahrtszeiten von der API.""" try: response = requests.get(f"{API_URL}/{station_id}", timeout=10) response.raise_for_status() return response.json() except requests.exceptions.Timeout: print("Fehler: Die Anfrage hat zu lange gedauert.") except requests.exceptions.HTTPError as e: print(f"HTTP-Fehler: {e}") except requests.exceptions.RequestException as e: print(f"Netzwerkfehler: {e}") return []
def update_abfahrtszeiten_periodisch(station_id, interval=10): """Aktualisiert die Abfahrtszeiten in regelmäßigen Abständen.""" while True: print(f"[{datetime.now()}] Aktualisiere Abfahrtszeiten für Station {station_id}...") abfahrten = get_abfahrtszeiten(station_id)
if abfahrten:
print(f"Neue Abfahrtszeiten: {abfahrten}")
else:
print("Keine Abfahrtszeiten verfügbar.")
time.sleep(interval * 60) # Wartezeit in Minuten
if name == "main": station_id = "obstwiesen_id" # Hier mit echter Stations-ID ersetzen update_abfahrtszeiten_periodisch(station_id, interval=10)
/*
* Copyright the original author or authors.
*
* This program is free software: you can redistribute it and/or modify