MQTT wkerned, files worden opgeschoond
This commit is contained in:
0
mqtt/__init__.py
Normal file
0
mqtt/__init__.py
Normal file
16
mqtt/mqtt_client.py
Normal file
16
mqtt/mqtt_client.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# mqtt_client.py
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
def create_client(client_id, on_connect, on_message, broker="localhost", port=1883):
|
||||||
|
client = mqtt.Client(client_id)
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
|
||||||
|
client.connect(broker, port, 60)
|
||||||
|
return client
|
||||||
|
|
||||||
|
def start_loop(client):
|
||||||
|
try:
|
||||||
|
client.loop_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Disconnecting from broker")
|
||||||
@@ -1,10 +1,7 @@
|
|||||||
import requests
|
import requests
|
||||||
import time
|
import time
|
||||||
import paho.mqtt.client as mqtt
|
from mqtt_client import create_client, start_loop # Importeer de aangepaste MQTT client module
|
||||||
from db_connect import database_connect
|
|
||||||
|
|
||||||
mqtt_broker = "localhost"
|
|
||||||
mqtt_port = 1883
|
|
||||||
publish_interval = 300 # Secondes om een aanvraag te doen
|
publish_interval = 300 # Secondes om een aanvraag te doen
|
||||||
|
|
||||||
api_endpoints = [
|
api_endpoints = [
|
||||||
@@ -17,9 +14,14 @@ api_endpoints = [
|
|||||||
{"url": "https://garden.inajar.nl/api/par_events/", "topic": "goodgarden/par_events"}
|
{"url": "https://garden.inajar.nl/api/par_events/", "topic": "goodgarden/par_events"}
|
||||||
]
|
]
|
||||||
|
|
||||||
client = mqtt.Client()
|
# Pas de on_connect en on_message functies aan indien nodig voor de publisher
|
||||||
client.connect(mqtt_broker, mqtt_port, 60)
|
def on_connect(client, userdata, flags, rc):
|
||||||
client.loop_start()
|
print("Connected with result code "+str(rc))
|
||||||
|
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
print(f"Message: {msg.topic} {str(msg.payload)}")
|
||||||
|
|
||||||
|
client = create_client("publisher1", on_connect, on_message) # Gebruik een unieke client ID
|
||||||
|
|
||||||
def publish_to_mqtt(topic, data):
|
def publish_to_mqtt(topic, data):
|
||||||
"""
|
"""
|
||||||
@@ -35,34 +37,22 @@ def fetch_and_publish_data():
|
|||||||
for endpoint in api_endpoints:
|
for endpoint in api_endpoints:
|
||||||
url = endpoint["url"]
|
url = endpoint["url"]
|
||||||
mqtt_topic = endpoint["topic"]
|
mqtt_topic = endpoint["topic"]
|
||||||
access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c"
|
access_token = "33bb3b42452306c58ecedc3c86cfae28ba22329c" # Vervang door je echte toegangstoken
|
||||||
try:
|
try:
|
||||||
headers = {
|
headers = {"Authorization": f"Token {access_token}"}
|
||||||
"Authorization": f"Token {access_token}"
|
|
||||||
}
|
|
||||||
response = requests.get(url, headers=headers)
|
response = requests.get(url, headers=headers)
|
||||||
response.raise_for_status()
|
response.raise_for_status() # Zorgt ervoor dat HTTPError wordt opgeworpen voor slechte responses
|
||||||
data = response.json()
|
data = response.json()
|
||||||
print(f"Data from {url}:")
|
print(f"Data from {url}: {data}")
|
||||||
print(data)
|
|
||||||
# Publiceer naar MQTT voordat de data wordt geladen in de database
|
|
||||||
publish_to_mqtt(mqtt_topic, data)
|
publish_to_mqtt(mqtt_topic, data)
|
||||||
# Hier kan je de data laden naar je database
|
# load_data(data) # Zorg ervoor dat deze functie elders gedefinieerd is
|
||||||
load_data(data)
|
|
||||||
except requests.exceptions.RequestException as e:
|
except requests.exceptions.RequestException as e:
|
||||||
print(f"Error fetching data from {url}: {e}")
|
print(f"Error fetching data from {url}: {e}")
|
||||||
|
|
||||||
def load_data(data):
|
|
||||||
"""
|
|
||||||
Implementeer de database logica hier.
|
|
||||||
Deze functie moet mogelijk worden aangepast om te werken met verschillende datastructuren van verschillende endpoints.
|
|
||||||
"""
|
|
||||||
print("Data processing and loading logic goes here.")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
client.loop_start() # Start de niet-blokkerende loop
|
||||||
while True:
|
while True:
|
||||||
fetch_and_publish_data()
|
fetch_and_publish_data()
|
||||||
print("Waiting for the next retrieval action...")
|
print("Waiting for the next retrieval action...")
|
||||||
time.sleep(publish_interval)
|
time.sleep(publish_interval)
|
||||||
|
|
||||||
client.loop_stop()
|
client.loop_stop()
|
||||||
|
|||||||
@@ -1,12 +1,11 @@
|
|||||||
import paho.mqtt.client as mqtt
|
from mqtt_client import create_client, start_loop
|
||||||
|
|
||||||
mqtt_broker = "localhost"
|
|
||||||
mqtt_port = 1883
|
|
||||||
|
|
||||||
# Lijst waarop je je wil subscriben
|
# Lijst waarop je je wil subscriben
|
||||||
mqtt_topics = ["goodgarden/devices", "goodgarden/relative_humidity"]
|
mqtt_topics = [
|
||||||
|
"goodgarden/devices",
|
||||||
|
"goodgarden/relative_humidity"
|
||||||
|
]
|
||||||
|
|
||||||
# Callback functie voor wanneer de client verbindt met de broker
|
|
||||||
def on_connect(client, userdata, flags, rc):
|
def on_connect(client, userdata, flags, rc):
|
||||||
print("Connected with result code " + str(rc))
|
print("Connected with result code " + str(rc))
|
||||||
# Abonneer op alle topics in de mqtt_topics lijst
|
# Abonneer op alle topics in de mqtt_topics lijst
|
||||||
@@ -14,21 +13,12 @@ def on_connect(client, userdata, flags, rc):
|
|||||||
client.subscribe(topic)
|
client.subscribe(topic)
|
||||||
print(f"Subscribed to {topic}")
|
print(f"Subscribed to {topic}")
|
||||||
|
|
||||||
# Callback functie voor wanneer een bericht is ontvangen van de server
|
|
||||||
def on_message(client, userdata, msg):
|
def on_message(client, userdata, msg):
|
||||||
# Decodeer de payload van bytes naar string
|
# Decodeer de payload van bytes naar string
|
||||||
message = msg.payload.decode()
|
message = msg.payload.decode()
|
||||||
print(f"Message received on topic {msg.topic}: {message}")
|
print(f"Message received on topic {msg.topic}: {message}")
|
||||||
# Hier kun je code toevoegen om iets te doen met het ontvangen bericht
|
# Hier kun je code toevoegen om iets te doen met het ontvangen bericht
|
||||||
# Bijvoorbeeld: de data opslaan, een actie uitvoeren, etc.
|
|
||||||
|
|
||||||
# Maak de MQTT Client aan
|
if __name__ == "__main__":
|
||||||
client = mqtt.Client()
|
client = create_client("subscriber1", on_connect, on_message) # Zorg voor een unieke client ID
|
||||||
client.on_connect = on_connect
|
start_loop(client)
|
||||||
client.on_message = on_message
|
|
||||||
|
|
||||||
# Verbind met de broker
|
|
||||||
client.connect(mqtt_broker, mqtt_port, 60)
|
|
||||||
|
|
||||||
# Start loop
|
|
||||||
client.loop_forever()
|
|
||||||
Reference in New Issue
Block a user