Skip to content
Snippets Groups Projects
Commit 6801c5e4 authored by Connor Jones's avatar Connor Jones
Browse files

Updated PiSidePython: More detail in comments

parent 4018e88e
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,6 @@ import time
import json
import datetime
import serial
import logging
import threading
from serial.serialutil import SerialException
import paho.mqtt.client as mqtt
......@@ -12,13 +11,9 @@ from grovepi import pinMode, digitalWrite, digitalRead, analogWrite, analogRead
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from AlarmHandler import getAlarms, print_alarms, addAlarm
import random
import string
# to be removed #
import random, string
#################
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
validDOW = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] #icky
# Configuration
......@@ -46,6 +41,9 @@ buzzer_thread = None
bluetooth_device = None
# initialises communuication with the arduino
# NOTE: requires "sudo rfcomm connect hci0 <arduino mac address>" to already be started in the terminal to start
# (see BluetoothSetup.txt for help)
def setup_bluetooth(device_address='/dev/rfcomm0', baudrate=9600, retries=5):
for attempt in range(retries):
try:
......@@ -57,7 +55,8 @@ def setup_bluetooth(device_address='/dev/rfcomm0', baudrate=9600, retries=5):
time.sleep(2)
raise Exception("Failed to connect to Bluetooth device after several attempts")
# creates a file observer object to watch the alarms file for updates external to the code
# Would have allowed an external website to make adjustmentes to the alarms
def setup_file_observer(alarmFileName):
def on_modified(event):
if event.src_path.endswith(alarmFileName + ".json"):
......@@ -65,7 +64,6 @@ def setup_file_observer(alarmFileName):
global alarm_states
alarms = getAlarms(alarmFileName)
alarm_states = {alarm["AlarmName"]: {"ringing": False, "snoozed": False, "snooze_enabled": alarm.get("Snooze", False), "snooze_duration": 0} for alarm in alarms}
logging.debug(f"Reloaded alarms: {alarms}")
event_handler = FileSystemEventHandler()
event_handler.on_modified = on_modified
observer = Observer()
......@@ -73,20 +71,26 @@ def setup_file_observer(alarmFileName):
observer.start()
return observer
# parses through each alarm, checking if the current time and day on the alarm matches, and then rings the buzzer
def handle_alarms(current_time, alarms, alarm_states):
today_day = datetime.datetime.today().strftime("%A")
for alarm in alarms:
alarm_name = alarm["AlarmName"]
# if alarm should ring, and is not currently snoozed
if (alarm["Time"] == current_time and today_day in alarm["DaysOfTheWeek"]) and not alarm_states[alarm_name]["snoozed"]:
alarm_states[alarm_name]["ringing"] = True
# if the alarm should open the blinds, then tell the arduino to do so
if alarm['OpenBlinds']:
bluetooth_device.write(b'OPEN_BLINDS\n')
# if the alarm should turn on the lights, then send the details to the arduino
if alarm.get('Lights', {}).get('Enabled', False):
bluetooth_device.write(b'TURN_ON_LIGHTS\n')
bluetooth_device.flush()
# start a thread to allow the buzzer to run independent of the rest of the code
start_buzzer_thread()
# basic alarm sound, called by a buzzer_worker
def trigger_alarm():
notes = [
(262, 0.2),
......@@ -99,7 +103,7 @@ def trigger_alarm():
time.sleep(0.05)
return
# function to continuously trigger the alarm. Runs on it's own thread independent of the main loop
def buzzer_worker():
global alarm_states
while True:
......@@ -108,40 +112,40 @@ def buzzer_worker():
trigger_alarm()
time.sleep(0.5)
# initialises a thread to work independent of the main loop
# as long as there isnt a thread already working
def start_buzzer_thread():
global buzzer_thread
if not buzzer_thread or not buzzer_thread.is_alive():
buzzer_thread = threading.Thread(target=buzzer_worker, daemon=True)
buzzer_thread.start()
# checks the motion around the PIR sensor if the alarm is ringing
def check_motion(alarm_states, buzzer_pin):
motion = digitalRead(pir_sensor)
if motion:
logging.debug(f"Motion detected, stopping all ringing alarms")
# checks every alarm independently: ensures overlapping alarms are accounted for
for alarm, state in alarm_states.items():
if state["ringing"]:
logging.debug(f"Stopping ringing alarm: {alarm}")
digitalWrite(buzzer_pin, 0)
state["ringing"] = False
state["snoozed"] = True if state["snooze_enabled"] else False
state["snooze_duration"] = 0 if not state["snooze_enabled"] else 600
elif state["snoozed"] and motion: # If an alarm is snoozed and PIR detects motion
logging.debug(f"Deactivating snoozed alarm: {alarm}")
# if an alarm is snoozed and PIR detects motion
elif state["snoozed"] and motion:
state["snoozed"] = False
# reads the light from the bluetooth
def read_bluetooth(bluetooth_device):
if bluetooth_device.in_waiting > 0:
data = bluetooth_device.readline().decode().strip()
if "Light" in data:
try:
return int(data.split(":")[1])
except Exception as e:
logging.error(e)
return 0
except:
return 0
# updates the LCD display with the light level and current time
def update_lcd(light_level):
telemetry = {"light_level": light_level}
try:
......@@ -151,14 +155,14 @@ def update_lcd(light_level):
setText_norefresh(f"Light: {light_level} Lux\n{datetime.datetime.now().strftime('%H:%M:%S')}")
client.publish("v1/devices/me/telemetry", json.dumps(telemetry), qos=1)
# when the thingsboard sends a RPC message to the raspberry pi device
# it is handled here.
def on_message(client, userdata, message):
try:
payload = json.loads(message.payload.decode())
method = payload.get("method")
params = payload.get("params", {})
if method == "triggerAlarm":
logging.info("Activating Alarm")
# activates alarm instantly (does not wait for minute to pass)
addAlarm("my_alarms", False, "temp_alarm", [datetime.datetime.today().strftime("%A")], False, (False, (0, 0, 0), 0), int(datetime.datetime.now().strftime("%H%M")))
alarms = getAlarms('my_alarms')
......@@ -168,7 +172,6 @@ def on_message(client, userdata, message):
client.publish(f'v1/devices/me/rpc/response/1', json.dumps(response), qos=1)
elif method == "setAlarmtoOneMinute":
# activates an alarm in 1 minute, performs more like a normal alarm
logging.info("Set an alarm to 1 minute from now")
random_alarm_name = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
current_time_plus1 = int(datetime.datetime.now().strftime("%H%M")) + 1
lighting_details = (True, (255, 255, 255), 1)
......@@ -176,19 +179,14 @@ def on_message(client, userdata, message):
addAlarm("my_alarms", True, random_alarm_name, [today_day], True, lighting_details, current_time_plus1)
alarms = getAlarms('my_alarms')
alarm_states = {alarm["AlarmName"]: {"ringing": False, "snoozed": False, "snooze_enabled": alarm.get("Snooze", False), "snooze_duration": 0} for alarm in alarms}
print_alarms(alarms)
start_buzzer_thread()
elif method == "triggerServo":
#instantly activates servo
logging.info("Activating Servo")
bluetooth_device.write(b'OPEN_BLINDS\n')
elif method == "setSpecificAlarm":
logging.debug("Adding specific alarm")
alarm_name = payload.get("AlarmName", "Unnamed Alarm")
snooze = payload.get("Snooze", "Failed")
days_of_the_week = payload.get("DOW", "Failed") #["True", "False", "False", "False", "False", "False", "False"])
time_of_alarm = int(payload.get("Time", "0000"))
open_blinds = payload.get("OpenBlinds", False)
lights = payload.get("Lights", {})
......@@ -196,19 +194,9 @@ def on_message(client, userdata, message):
light_color = lights.get("Color", {"R": 0, "G": 0, "B": 0})
light_intensity = lights.get("Intensity", 1.0)
logging.info(f"Setting up alarm '{alarm_name}' with the following details:")
logging.info(f" - Snooze: {snooze}")
logging.info(f" - Time: {time_of_alarm}")
logging.info(f" - Days of the Week: {days_of_the_week}")
logging.info(f" - Open Blinds: {open_blinds}")
logging.info(f" - Lights Enabled: {lights_enabled}")
logging.info(f" - Light Color: {light_color}")
logging.info(f" - Light Intensity: {light_intensity}")
addAlarm("my_alarms", True, alarm_name, days_of_the_week, snooze, (lights_enabled, (light_color["R"], light_color["G"], light_color["B"]), light_intensity), time_of_alarm)
logging.info(f"Added new alarm: {alarm_name}")
except Exception as e:
logging.info("Error occurred: (on_message)")
logging.error(f"Error processing RPC request: {e}")
except:
pass
client.on_message = on_message
bluetooth_device = setup_bluetooth()
......@@ -242,19 +230,8 @@ def main():
if state["snooze_duration"] <= 0:
state["snoozed"] = False
state["ringing"] = True
logging.debug(f"Returning to ring after snooze for alarm: {alarm_name}")
start_buzzer_thread()
time.sleep(0.1)
except Exception as e:
logging.info("Error occurred: (main_loop)")
logging.error(f"Error occurred: {e}")
except:
main()
main()
#finally:
#
# digitalWrite(buzzer_pin, 0)
# client.loop_stop()
# client.disconnect()
# bluetooth_device.close()
# observer.stop()
# observer.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment