From 82d7f127f4139751167d2daecb900683af10fa6d Mon Sep 17 00:00:00 2001 From: Atilla Date: Tue, 12 Mar 2024 12:13:35 +0100 Subject: [PATCH] MQTT wkerned, files worden opgeschoond --- mqtt/__init__.py | 0 mqtt/__pycache__/__init__.cpython-311.pyc | Bin 0 -> 198 bytes mqtt/__pycache__/mqtt_client.cpython-311.pyc | Bin 0 -> 1074 bytes mqtt/mqtt_client.py | 16 +++++++ mqtt/publisher.py | 44 +++++++------------ mqtt/subscribe.py | 26 ++++------- 6 files changed, 41 insertions(+), 45 deletions(-) create mode 100644 mqtt/__init__.py create mode 100644 mqtt/__pycache__/__init__.cpython-311.pyc create mode 100644 mqtt/__pycache__/mqtt_client.cpython-311.pyc create mode 100644 mqtt/mqtt_client.py diff --git a/mqtt/__init__.py b/mqtt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mqtt/__pycache__/__init__.cpython-311.pyc b/mqtt/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..46f90e1a53b0b8a93b7ff710b6a95093291736bd GIT binary patch literal 198 zcmZ3^%ge<81jQO3QbF`%5CH>>P{wCAAY(d13PUi1CZpd#KKVqQv2aB@a|eoj_mVv&N8v96IxOn`BW zqmN^Zdwzb3dty-vP-kvoNl8q6d}dx|NqoFsLFF$Fo80`A(wtPgB37VHAh#6r1BnmJ PjEsyQ7+^#ZGf)fwN;fh` literal 0 HcmV?d00001 diff --git a/mqtt/__pycache__/mqtt_client.cpython-311.pyc b/mqtt/__pycache__/mqtt_client.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8af0f8e522fd66231332330f6ce33fb29d9961ef GIT binary patch literal 1074 zcmZ`&%}X0W6n~T5jhkq!u`ShFX{b_IMGt=TP^2IgrL?9+=|K&Ib$8U*-JNuH{VH1s zBJ|QjZ+i4(DPH?G^qiDHWkC>n@=^j`f~UUOd{mJ>X5YN`=6%ik&3;WJ;((-Rf9MYa z@LMPiiqwUR$0+QB2_~_?CPsvS9a$s<%%~~vkwwW=_Fz#q$vkK+=OQQ_#4Kj$)*5rY z^Q9I5L#gKyn~){^F1C^FgNJn+wBD3`I70cjsee>_;w#xkPvA#T6Inv?l|#89U)_Dl z$G(ox@3`qpz*n%-NcwyNT{R_;?f0G+M4wwZ>Ucq%Iaz}_4mE1BO}58WTX!toF*EbV8e`VFuJhE$=+MYm=FMnk^3`N!hB0$S=O#vqenXb6RT<9j1POyv z-J^92Phwp$5m(%?0H=xM=cR-0QlhV%=qpU0bUi9em*u;S{Y;L3O7Et>JUJLU>@3Os zWx2m7_n)*T3$qs(4@MIPR7BbkzB~&15Cb0$QMyS-XgY41@wvb}^nure5TGCUZ35nv z$AMs7gPp)a7`Fx6(gJJ|gzZE3RL-q~k#km4E1cP>+VC|gXtNm0XIB`fn+QhoCEZzO zIJFm!M>*ffdx4VYImgoozlE*2;$kV@fjo%2p3c3ji0Ajv&?e63kQJa3k(9v`wf&dc z^Hc39sl8>jx7b+K)7A$SAj;rBcB&?d?OI74D60d-E2|=*TeVtsMef}@U2xDnE`2BC zk{01&q+HAvUt~vKUt_}p=HVK`9RDE*I}VWz`}+pGQ&>gwm+FMg%nge^DUt0G59X9cda4w>LxZnCi4Uji}u3jhEB literal 0 HcmV?d00001 diff --git a/mqtt/mqtt_client.py b/mqtt/mqtt_client.py new file mode 100644 index 0000000..b14b472 --- /dev/null +++ b/mqtt/mqtt_client.py @@ -0,0 +1,16 @@ +# mqtt_client.py +import paho.mqtt.client as mqtt + +def create_client(client_id, on_connect, on_message, broker="localhost", port=1883): + client = mqtt.Client(client_id) + client.on_connect = on_connect + client.on_message = on_message + + client.connect(broker, port, 60) + return client + +def start_loop(client): + try: + client.loop_forever() + except KeyboardInterrupt: + print("Disconnecting from broker") diff --git a/mqtt/publisher.py b/mqtt/publisher.py index df4c4c2..d5c48c7 100644 --- a/mqtt/publisher.py +++ b/mqtt/publisher.py @@ -1,11 +1,8 @@ import requests import time -import paho.mqtt.client as mqtt -from db_connect import database_connect +from mqtt_client import create_client, start_loop # Importeer de aangepaste MQTT client module -mqtt_broker = "localhost" -mqtt_port = 1883 -publish_interval = 300 # Secondes om een aanvraag te doen +publish_interval = 300 # Secondes om een aanvraag te doen api_endpoints = [ {"url": "https://garden.inajar.nl/api/devices/", "topic": "goodgarden/devices"}, @@ -17,9 +14,14 @@ api_endpoints = [ {"url": "https://garden.inajar.nl/api/par_events/", "topic": "goodgarden/par_events"} ] -client = mqtt.Client() -client.connect(mqtt_broker, mqtt_port, 60) -client.loop_start() +# Pas de on_connect en on_message functies aan indien nodig voor de publisher +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + +def on_message(client, userdata, msg): + print(f"Message: {msg.topic} {str(msg.payload)}") + +client = create_client("publisher1", on_connect, on_message) # Gebruik een unieke client ID def publish_to_mqtt(topic, data): """ @@ -35,34 +37,22 @@ def fetch_and_publish_data(): for endpoint in api_endpoints: url = endpoint["url"] mqtt_topic = endpoint["topic"] - access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c" + access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c" # Vervang door je echte toegangstoken try: - headers = { - "Authorization": f"Token {access_token}" - } + headers = {"Authorization": f"Token {access_token}"} response = requests.get(url, headers=headers) - response.raise_for_status() + response.raise_for_status() # Zorgt ervoor dat HTTPError wordt opgeworpen voor slechte responses data = response.json() - print(f"Data from {url}:") - print(data) - # Publiceer naar MQTT voordat de data wordt geladen in de database + print(f"Data from {url}: {data}") publish_to_mqtt(mqtt_topic, data) - # Hier kan je de data laden naar je database - load_data(data) + # load_data(data) # Zorg ervoor dat deze functie elders gedefinieerd is except requests.exceptions.RequestException as e: print(f"Error fetching data from {url}: {e}") -def load_data(data): - """ - Implementeer de database logica hier. - Deze functie moet mogelijk worden aangepast om te werken met verschillende datastructuren van verschillende endpoints. - """ - print("Data processing and loading logic goes here.") - if __name__ == "__main__": + client.loop_start() # Start de niet-blokkerende loop while True: fetch_and_publish_data() print("Waiting for the next retrieval action...") time.sleep(publish_interval) - -client.loop_stop() + client.loop_stop() diff --git a/mqtt/subscribe.py b/mqtt/subscribe.py index 7fa4e71..4599c01 100644 --- a/mqtt/subscribe.py +++ b/mqtt/subscribe.py @@ -1,12 +1,11 @@ -import paho.mqtt.client as mqtt - -mqtt_broker = "localhost" -mqtt_port = 1883 +from mqtt_client import create_client, start_loop # Lijst waarop je je wil subscriben -mqtt_topics = ["goodgarden/devices", "goodgarden/relative_humidity"] +mqtt_topics = [ + "goodgarden/devices", + "goodgarden/relative_humidity" +] -# Callback functie voor wanneer de client verbindt met de broker def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # Abonneer op alle topics in de mqtt_topics lijst @@ -14,21 +13,12 @@ def on_connect(client, userdata, flags, rc): client.subscribe(topic) print(f"Subscribed to {topic}") -# Callback functie voor wanneer een bericht is ontvangen van de server def on_message(client, userdata, msg): # Decodeer de payload van bytes naar string message = msg.payload.decode() print(f"Message received on topic {msg.topic}: {message}") # Hier kun je code toevoegen om iets te doen met het ontvangen bericht - # Bijvoorbeeld: de data opslaan, een actie uitvoeren, etc. -# Maak de MQTT Client aan -client = mqtt.Client() -client.on_connect = on_connect -client.on_message = on_message - -# Verbind met de broker -client.connect(mqtt_broker, mqtt_port, 60) - -# Start loop -client.loop_forever() +if __name__ == "__main__": + client = create_client("subscriber1", on_connect, on_message) # Zorg voor een unieke client ID + start_loop(client) \ No newline at end of file