diff --git a/mqtt/__init__.py b/mqtt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mqtt/mqtt_client.py b/mqtt/mqtt_client.py new file mode 100644 index 0000000..b14b472 --- /dev/null +++ b/mqtt/mqtt_client.py @@ -0,0 +1,16 @@ +# mqtt_client.py +import paho.mqtt.client as mqtt + +def create_client(client_id, on_connect, on_message, broker="localhost", port=1883): + client = mqtt.Client(client_id) + client.on_connect = on_connect + client.on_message = on_message + + client.connect(broker, port, 60) + return client + +def start_loop(client): + try: + client.loop_forever() + except KeyboardInterrupt: + print("Disconnecting from broker") diff --git a/mqtt/publisher.py b/mqtt/publisher.py index df4c4c2..d5c48c7 100644 --- a/mqtt/publisher.py +++ b/mqtt/publisher.py @@ -1,11 +1,8 @@ import requests import time -import paho.mqtt.client as mqtt -from db_connect import database_connect +from mqtt_client import create_client, start_loop # Importeer de aangepaste MQTT client module -mqtt_broker = "localhost" -mqtt_port = 1883 -publish_interval = 300 # Secondes om een aanvraag te doen +publish_interval = 300 # Secondes om een aanvraag te doen api_endpoints = [ {"url": "https://garden.inajar.nl/api/devices/", "topic": "goodgarden/devices"}, @@ -17,9 +14,14 @@ api_endpoints = [ {"url": "https://garden.inajar.nl/api/par_events/", "topic": "goodgarden/par_events"} ] -client = mqtt.Client() -client.connect(mqtt_broker, mqtt_port, 60) -client.loop_start() +# Pas de on_connect en on_message functies aan indien nodig voor de publisher +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + +def on_message(client, userdata, msg): + print(f"Message: {msg.topic} {str(msg.payload)}") + +client = create_client("publisher1", on_connect, on_message) # Gebruik een unieke client ID def publish_to_mqtt(topic, data): """ @@ -35,34 +37,22 @@ def fetch_and_publish_data(): for endpoint in api_endpoints: url = endpoint["url"] mqtt_topic = endpoint["topic"] - access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c" + access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c" # Vervang door je echte toegangstoken try: - headers = { - "Authorization": f"Token {access_token}" - } + headers = {"Authorization": f"Token {access_token}"} response = requests.get(url, headers=headers) - response.raise_for_status() + response.raise_for_status() # Zorgt ervoor dat HTTPError wordt opgeworpen voor slechte responses data = response.json() - print(f"Data from {url}:") - print(data) - # Publiceer naar MQTT voordat de data wordt geladen in de database + print(f"Data from {url}: {data}") publish_to_mqtt(mqtt_topic, data) - # Hier kan je de data laden naar je database - load_data(data) + # load_data(data) # Zorg ervoor dat deze functie elders gedefinieerd is except requests.exceptions.RequestException as e: print(f"Error fetching data from {url}: {e}") -def load_data(data): - """ - Implementeer de database logica hier. - Deze functie moet mogelijk worden aangepast om te werken met verschillende datastructuren van verschillende endpoints. - """ - print("Data processing and loading logic goes here.") - if __name__ == "__main__": + client.loop_start() # Start de niet-blokkerende loop while True: fetch_and_publish_data() print("Waiting for the next retrieval action...") time.sleep(publish_interval) - -client.loop_stop() + client.loop_stop() diff --git a/mqtt/subscribe.py b/mqtt/subscribe.py index 7fa4e71..4599c01 100644 --- a/mqtt/subscribe.py +++ b/mqtt/subscribe.py @@ -1,12 +1,11 @@ -import paho.mqtt.client as mqtt - -mqtt_broker = "localhost" -mqtt_port = 1883 +from mqtt_client import create_client, start_loop # Lijst waarop je je wil subscriben -mqtt_topics = ["goodgarden/devices", "goodgarden/relative_humidity"] +mqtt_topics = [ + "goodgarden/devices", + "goodgarden/relative_humidity" +] -# Callback functie voor wanneer de client verbindt met de broker def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # Abonneer op alle topics in de mqtt_topics lijst @@ -14,21 +13,12 @@ def on_connect(client, userdata, flags, rc): client.subscribe(topic) print(f"Subscribed to {topic}") -# Callback functie voor wanneer een bericht is ontvangen van de server def on_message(client, userdata, msg): # Decodeer de payload van bytes naar string message = msg.payload.decode() print(f"Message received on topic {msg.topic}: {message}") # Hier kun je code toevoegen om iets te doen met het ontvangen bericht - # Bijvoorbeeld: de data opslaan, een actie uitvoeren, etc. -# Maak de MQTT Client aan -client = mqtt.Client() -client.on_connect = on_connect -client.on_message = on_message - -# Verbind met de broker -client.connect(mqtt_broker, mqtt_port, 60) - -# Start loop -client.loop_forever() +if __name__ == "__main__": + client = create_client("subscriber1", on_connect, on_message) # Zorg voor een unieke client ID + start_loop(client) \ No newline at end of file