Skip to content
Snippets Groups Projects
Commit ff84e173 authored by Evan Jones's avatar Evan Jones
Browse files

Replace main.py

parent 8010ae67
Branches main
No related tags found
No related merge requests found
......@@ -3,7 +3,9 @@ from grove_rgb_lcd import *
import time
import datetime
import threading
import paho.mqtt.client as mqtt
import json
# === Variables ===
# Port Assignments
light_port = 1 # A1
......@@ -11,6 +13,11 @@ pir_port = 4 # D4
button_port = 3 # D2 (FOR SOME REASON THIS BOARD THINKS D3 IS D2)
buzzer_port = 7 # D7
# Thingsboard Connection
client = None
THINGSBOARD_HOST = "thingsboard.cs.cf.ac.uk"
ACCESS_TOKEN = "ad9EfCbasXFLokL3tkC1"
# Notification Preferences
preferences = {"Thingsboard": True,
"Visual": True,
......@@ -27,9 +34,17 @@ light_trigger_delay = 15 # Scans every 0.2 seconds
# 3/0.2 = 15
# This means if we want a delay of 3 seconds to use a TriggerDelay value of 15
# Detection variables
# motion variables
motion_detected = False
motion_counter = 0
no_motion_counter = 0
last_motion_time = 0
motion_trigger_delay = 7
no_motion_release_delay = 15 # 15 no motion reads to reset
motion_cooldown = 3 #3s between alerts
# button variables
last_button_state = 1 # For state change detection
# === Initilisation Functions ===
......@@ -39,6 +54,20 @@ def initialise_sensors():
pinMode(buzzer_port, "OUTPUT")
return
def initilise_thingsboard():
global client
try:
client = mqtt.Client()
client.username_pw_set(ACCESS_TOKEN)
client.connect(THINGSBOARD_HOST, 1883, 60)
client.loop_start()
client.subscribe("v1/devices/me/attributes")
client.on_message = handle_attribute_update
log_event("INFO","Connected to thingsboard")
except Exception as e:
log_event("ERROR",f"MQTT Connection failed: {e}")
return
# === Notification Functions ===
def notify(preferences, message = "visitor", duration = 0.2):
if preferences.get("Thingsboard"):
......@@ -78,28 +107,37 @@ def detect_button_press():
set_display("Ringing")
notify(preferences)
time.sleep(2)
set_display("Rung")
set_display("Doorbell System Ready")
last_button_state = button
return
def detect_motion():
global motion_detected, no_motion_counter
global motion_detected, motion_counter, last_motion_time, no_motion_counter
motion = digitalRead(pir_port)
if motion == 1 and not motion_detected:
log_event("EVENT", "Motion detected!")
motion_detected = True
now = time.time()
if motion == 1:
motion_counter += 1
no_motion_counter = 0
notify(preferences, message="Motion Detected")
elif motion == 0:
no_motion_counter += 1
if motion_detected and no_motion_counter > 5:
log_event("EVENT", "Motion stopped.")
motion_detected = False
if motion_counter >= motion_trigger_delay and not motion_detected:
if now - last_motion_time >= motion_cooldown:
motion_detected = True
last_motion_time = now
print("MOTION")
else:
if motion_detected:
no_motion_counter += 1
if no_motion_counter >= no_motion_release_delay:
motion_detected = False
motion_counter = 0
print("MOTION ENDED")
else:
motion_counter = 0
no_motion_counter = 0
def detect_light_level():
global light_dark_counter, light_trigger_delay, night_mode
light = analogRead(light_port)
......@@ -112,7 +150,7 @@ def detect_light_level():
else:
if night_mode:
#log_event("EVENT", "SIGNIFICANT LIGHT DETECTED")
adjust_lights()
light_dark_counter = 0
else:
light_dark_counter = 0 # Reset if it's bright
......@@ -178,6 +216,7 @@ def self_check():
try:
set_display("Running self check...")
time.sleep(1)
log_event("PASS", "LCD ON")
status["LCD"] = True
except:
log_event("ERROR", "LCD not responding.")
......@@ -185,7 +224,7 @@ def self_check():
# Check PIR
try:
motion = digitalRead(pir_port)
print(f"[PASS] PIR motion sensor: {motion}")
log_event("PASS", f"PIR motion sensor: {motion}")
status["PIR"] = True
except:
log_event("ERROR", "PIR motion sensor not responding")
......@@ -219,7 +258,40 @@ def self_check():
set_display("SENSORS FAILED: ".join(failed))
log_event("FAILED", f"Sensors failed: {', '.join(failed)}")
return False
def send_data_thingsboard(payload):
global client
if not client:
log_event("ERROR","MQTT Client not initilised")
return
try:
client.publish("v1/devices/me/telemetry", json.dumps(payload), qos=1)
log_event("THINGSBOARD",f"Data sent: {payload}")
except Exception as e:
log_event("ERROR",f"failed to send data to thingsboard: {e}")
def handle_attribute_update(client, userdata, msg):
global preferences
try:
data = json.loads(msg.payload.decode("utf-8"))
updated = []
for key in preferences:
if key in data:
preferences[key] = data[key]
updated.append(f"{key} = {data[key]}")
if updated:
log_event("THINGSBOARD", "Preferences Updated: " + ", ".join(updated))
except Exception as e:
log_event("ERROR",f"Failed to handle attribute update: {e}")
def report_preferences():
try:
client.publish("v1/devices/me/attributes", json.dumps(preferences), qos =1)
log_event("THINGSBOARD","Reported current preferences")
except Exception as e:
log_event("ERROR","Failed to report preferences: {e}")
# === Main Reaction Loop ===
def main_loop():
print("System Starting")
......@@ -230,7 +302,7 @@ def main_loop():
detect_button_press()
detect_motion()
detect_light_level()
print("[DEBUG]: LOOP")
#print("[DEBUG]: LOOP")
time.sleep(0.2)
except KeyboardInterrupt:
......@@ -240,5 +312,7 @@ def main_loop():
# Run
if __name__ == "__main__":
initialise_sensors()
initilise_thingsboard()
report_preferences()
self_check()
main_loop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment