Skip to content
Snippets Groups Projects
Commit a77e694f authored by Ewan Crowle's avatar Ewan Crowle
Browse files

initial merge

parents e8b69339 60820c48
No related branches found
No related tags found
No related merge requests found
__pycache__
\ No newline at end of file
File moved
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Customer, Base, ENGLISH_LANGUAGE
DATABASE_URL = 'sqlite:///local.db'
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
# You don't need to specify the language, as the field has a default value,
# but as an example of completeness we will here
new_customer = Customer(id="0600077786", first_name="Jeyan", email="kanagaratnamj@cardiff.ac.uk", locale=ENGLISH_LANGUAGE)
session.add(new_customer)
session.commit()
print("Done")
\ No newline at end of file
from requests import request
from email_template import html
url = "https://api.useplunk.com/v1/send"
# Change me to a Secret API Key from https://app.useplunk.com/settings/api
# Do not commit me
token = ""
def send(to_email, name, points, total_points):
payload = {
"to": to_email,
"subject": "You've Earned Points Today",
"body": html.format(name=name, points=points, total_points=total_points),
"name": "Return&Earn",
}
headers = {
"Content-Type": "application/json",
"Authorization": f'Bearer {token}'
}
response = request("POST", url, json=payload, headers=headers)
print(response.text)
# Test
# send("@cardiff.ac.uk", "", 20, 200)
\ No newline at end of file
from models import ENGLISH_LANGUAGE, WELSH_LANGUAGE
messages = {
ENGLISH_LANGUAGE: {
"STATE_TRANSITION_MESSAGE": "One second...",
"STATE_ERROR_MESSAGE": "Sorry! Something went wrong!",
"STATE_NO_CUSTOMER_MESSAGE": "Scan your Boots card to start.",
"STATE_CUSTOMER_NOT_FOUND_MESSAGE": "Card not found. Try again.",
"STATE_NO_PRODUCT_MESSAGE": "Now, scan your empty product.",
"STATE_PRODUCT_NOT_FOUND_MESSAGE": "Product not found. Try again.",
"STATE_IDLE_MESSAGE": "No activity detected. Signing out.",
"STATE_DEPOSIT_LEFT_BIN_MESSAGE": "Put the item in the left bin.",
"STATE_DEPOSIT_RIGHT_BIN_MESSAGE": "Put the item in the right bin.",
"STATE_PRODUCT_DEPOSITED": "Return complete. Check your email.",
"STATE_BIN_FULL_MESSAGE": "I'll be back! I need emptying!",
},
WELSH_LANGUAGE: {
"STATE_TRANSITION_MESSAGE": "Un eiliad...",
"STATE_ERROR_MESSAGE": "Aeth rhywbeth o'i le!",
"STATE_NO_CUSTOMER_MESSAGE": "Sganiwch eich cerdyn Boots gyntaf.",
"STATE_CUSTOMER_NOT_FOUND_MESSAGE": "Heb ganfod cerdyn. Ceisiwch eto.",
"STATE_NO_PRODUCT_MESSAGE": "Nawr, sganiwch cynnyrch gwag.",
"STATE_PRODUCT_NOT_FOUND_MESSAGE": "Heb ganfod cynnyrch. Ceisiwch eto.",
"STATE_IDLE_MESSAGE": "Dim ymateb. Cofrestru allan.",
"STATE_DEPOSIT_LEFT_BIN_MESSAGE": "Rhowch yr eitem yn y bin chwith.",
"STATE_DEPOSIT_RIGHT_BIN_MESSAGE": "Rhowch yr eitem yn y bin dde.",
"STATE_PRODUCT_DEPOSITED": "Wedi gorffen. Gwiriwch eich e-bost.",
"STATE_BIN_FULL_MESSAGE": "Angen gwagio! Yn ôl yn fuan."
},
}
\ No newline at end of file
......@@ -2,7 +2,12 @@ from datetime import datetime
from sqlalchemy import Integer, String, DateTime
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column
from uuid import uuid4
ENGLISH_LANGUAGE = "en"
WELSH_LANGUAGE = "cy"
DEFAULT_LANGUAGE = ENGLISH_LANGUAGE
class Base(DeclarativeBase):
pass
......@@ -14,6 +19,7 @@ class Customer(Base):
id = mapped_column(String, primary_key=True)
first_name = mapped_column(String(50), nullable=False)
email = mapped_column(String, nullable=False)
locale = mapped_column(String, nullable=False, default=DEFAULT_LANGUAGE)
class Product(Base):
......@@ -28,7 +34,7 @@ class Product(Base):
class Return(Base):
__tablename__ = "returns"
id = mapped_column(String, primary_key=True)
id = mapped_column(String, primary_key=True, default=lambda: str(uuid4()))
customer_id = mapped_column(String, nullable=False)
product_id = mapped_column(String, nullable=False)
return_date = mapped_column(DateTime, nullable=False, default=datetime.utcnow)
......
import paho.mqtt.client as mqtt
import json
# -----------------------------------------------------------------------------
# 2. ThingsBoard Configuration
# -----------------------------------------------------------------------------
# - We connect to a ThingsBoard server for storing and visualizing sensor data.
# - 'thingsboard.cs.cf.ac.uk' is the server address for your institution.
# - ACCESS_TOKEN is a unique string that authenticates this device to ThingsBoard.
# Ensure you use your own token to post data to your own ThingsBoard dashboard.
# -----------------------------------------------------------------------------
THINGSBOARD_HOST = 'thingsboard.cs.cf.ac.uk'
ACCESS_TOKEN = 'zHObHAhH5uTIwRrT8GF7' # Replace with your actual token as needed
# -----------------------------------------------------------------------------
# 3. MQTT Callbacks
# -----------------------------------------------------------------------------
# - These callback functions are automatically called by the paho-mqtt library
# in response to various MQTT events (publishing, receiving messages, connecting).
# -----------------------------------------------------------------------------
def on_publish(client, userdata, result):
"""
Called whenever a message is successfully published to the MQTT broker.
Prints 'Success' to confirm the publish event, helping you verify
that data actually reached the server.
"""
print("Success")
def on_message(client, userdata, msg):
"""
Called when the client receives a message on a subscribed topic.
For this script, we listen for RPC requests that instruct the device
to toggle the buzzer. The message payload is in JSON format, which we decode.
"""
print('Topic: ' + msg.topic + '\nMessage: ' + str(msg.payload))
data = json.loads(msg.payload)
# If the method is "setValue", we interpret "params" to be either True or False,
# which determines the buzzer's ON/OFF state.
if data['method'] == 'setValue':
print(data['params'])
buzzer_state['State'] = data['params']
# The grovepi.digitalWrite function sends a digital HIGH or LOW to the buzzer pin
# depending on whether we pass True (HIGH) or False (LOW).
grovepi.digitalWrite(buzzer, data['params'])
def on_connect(client, userdata, flags, rc, *extra_params):
"""
Called after the client attempts to connect to the MQTT broker.
rc (result code) = 0 means a successful connection.
This is a good place to log or print a message indicating the connection status.
"""
print('Connected with result code ' + str(rc))
# -----------------------------------------------------------------------------
# 5. MQTT Client Creation and Configuration
# -----------------------------------------------------------------------------
# - The paho.mqtt library needs a client instance to manage connections and
# message events. We'll attach our callback functions and connect to the
# ThingsBoard server using our access token.
# -----------------------------------------------------------------------------
client = mqtt.Client()
def init_mqtt():
# Provide the ACCESS_TOKEN for device authentication to ThingsBoard.
client.username_pw_set(ACCESS_TOKEN)
# Bind our callbacks for connect, publish, and message events.
client.on_connect = on_connect
client.on_publish = on_publish
client.on_message = on_message
# Connect to the ThingsBoard server on port 1883 (standard MQTT).
# The third parameter (60) is the keepalive interval in seconds.
client.connect(THINGSBOARD_HOST, 1883, 60)
# Subscribe to RPC commands from ThingsBoard to let remote users
# toggle the buzzer by sending "setValue" method calls.
client.subscribe('v1/devices/me/rpc/request/+')
# Start the MQTT network loop in a separate thread, so the script can continue
# running and processing sensor data while maintaining an MQTT connection.
client.loop_start()
def publish_return(new_return):
payload = {
event_type: "return",
data: new_return,
}
client.publish('v1/devices/me/telemetry', json.dumps(payload), qos=1)
File deleted
from flask import Flask, render_template, jsonify, request
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base, Return, Customer, Product
from core.models import Base, Return, Customer, Product
from datetime import datetime
app = Flask(__name__)
# Database configuration
SQLALCHEMY_DATABASE_URL = "sqlite:///./local.db"
SQLALCHEMY_DATABASE_URL = "sqlite:///local.sqlite"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
......
No preview for this file type
#!/usr/bin/env python3
import os
import time
import i2c
import scan
import uuid
import subprocess
import numpy as np
from picamera import PiCamera
from pyzbar.pyzbar import decode
from PIL import Image
import sqlalchemy as db
from sqlalchemy import URL
from sqlalchemy.orm import sessionmaker
from core import models
from core import emails
from core.messages import messages
from core.models import DEFAULT_LANGUAGE
#
# CONSTANTS (IN SECONDS)
#
POLL_INTERVAL = 1
MAX_IDLE_RESCANS = 30
MAX_IDLE_DEPOSIT = 30
def init_db_engine():
#
# IF YOU WANT TO USE THE UNIVERSITY'S HOSTED DATABASE,
# UNCOMMENT THE LINES BELOW.
#
# url_object = URL.create(
# "mysql+pymysql",
# username="c23030734",
# #
# # CHANGE THE DATABASE VALUES IF NEEDED OR COMMENT OUT THIS URL_OBJECT BLOCK
# # AND USE THE LOCAL SQLITE DB.
# # YOU MUST SPECIFY THE DB PASSWORD IN THE PYTHON RUN COMMAND LIKE:
# #
# # DB_PASSWORD=password python3 main.py
# #
# password=os.environ['DB_PASSWORD'],
# host="mariadb.cs.cf.ac.uk:3306",
# database="c23030734_group_17_project",
# )
# engine = db.create_engine(url_object)
engine = db.create_engine('sqlite:///local.sqlite', echo=True)
models.Base.metadata.create_all(engine)
return engine
def init_camera():
camera = PiCamera()
camera.resolution = (1920,1080)
camera.framerate = 24
camera.start_preview(fullscreen=False,window=(100,200,640, 480))
return camera
def init_lcd():
i2c.setRGB(0, 255, 0)
i2c.setText(messages[DEFAULT_LANGUAGE]["STATE_NO_CUSTOMER_MESSAGE"])
def lookup_customer(engine, customer_id):
print(f'Locating CUSTOMER: {customer_id}')
Session = sessionmaker(bind=engine)
session = Session()
customer = session.query(models.Customer).filter_by(id = customer_id).first()
return customer
def lookup_product(engine, product_id):
print(f'Locating PRODUCT: {product_id}')
Session = sessionmaker(bind=engine)
session = Session()
product = session.query(models.Product).filter_by(id = product_id).first()
return product
def create_return(engine, return_obj):
print('Creating return')
Session = sessionmaker(bind=engine)
session = Session()
try:
session.add(return_obj)
session.commit()
session.refresh(return_obj)
print('Successfully created return',return_obj.id)
except Exception as e:
session.rollback()
print(e)
finally:
session.close()
def main():
print('Initialising...')
engine = init_db_engine()
camera = init_camera()
init_lcd()
i2c.init_pins()
customer = None
# Store the customer ID here to prevent double scanning.
customer_id = None
product = None
returnObj = None
rescan_count = 0
try:
while True:
if customer == None or product == None:
# In this state, we are missing one of two key pieces of information.
# This is the default state.
#
# We collect this information using the camera, so we must take a photo here
# while this state is true.
# Capture the view of the camera.
camera.capture('tmp.jpg')
# Open the image,
img = Image.open('tmp.jpg')
# and overwrite the type with the binary data of the image instead.
img = np.array(img)
# Use pyzbar to extract a barcode from the image.
data = scan.detect_code(img)
if customer == None:
# Default back to the initial state.
i2c.setRGB(0, 255, 0)
i2c.setText(messages[DEFAULT_LANGUAGE]["STATE_NO_CUSTOMER_MESSAGE"])
if data:
i2c.setText(messages[DEFAULT_LANGUAGE]["STATE_TRANSITION_MESSAGE"])
# Locate the customer's record
customer = lookup_customer(engine, data)
if customer == None:
i2c.setRGB(255, 0, 0)
i2c.setText(messages[DEFAULT_LANGUAGE]["STATE_CUSTOMER_NOT_FOUND_MESSAGE"])
time.sleep(5)
else:
customer_id = data
rescan_count = 0
i2c.buzz()
else:
# It must be the product we don't have then.
# Default back to the initial state.
i2c.setRGB(0, 255, 0)
i2c.setText(messages[customer.locale]["STATE_NO_PRODUCT_MESSAGE"])
if rescan_count >= MAX_IDLE_RESCANS:
print("Product scan timed out after 30 attempted rescans.")
i2c.setText(messages[customer.locale]["STATE_IDLE_MESSAGE"])
time.sleep(5)
customer = None
customer_id = None
rescan_count = 0
else:
rescan_count = rescan_count + 1
if data:
if data != customer_id:
i2c.setText(messages[customer.locale]["STATE_TRANSITION_MESSAGE"])
# Locate the product's record.
product = lookup_product(engine, data)
if product == None:
i2c.setRGB(255, 0, 0)
i2c.setText(messages[customer.locale]["STATE_PRODUCT_NOT_FOUND_MESSAGE"])
time.sleep(5)
print('Attempting object detection')
camera.stop_preview()
camera.close()
env = os.environ.copy()
subprocess.run(["python3","Object_detection_picamera.py"],
cwd="/home/pi/tensorflow1/models/research/object_detection",
env=env)
camera = init_camera()
with open("/home/pi/detected_object.txt","r") as f:
result = f.read().strip()
print("Object detected:", result)
class TempProduct:
def __init__(self,name):
self.id = str(uuid.uuid4())
self.recycling_type = 0
self.reward_value = 10
product = TempProduct(result)
i2c.buzz()
else:
i2c.buzz()
# No other case needed, we wait for a barcode
time.sleep(POLL_INTERVAL)
else:
i2c.setRGB(0, 255, 0)
if product.recycling_type == 0:
i2c.setText(messages[customer.locale]["STATE_DEPOSIT_LEFT_BIN_MESSAGE"])
else:
i2c.setText(messages[customer.locale]["STATE_DEPOSIT_RIGHT_BIN_MESSAGE"])
# 30 seconds
deposit_window_end_time = time.time() + MAX_IDLE_DEPOSIT
while time.time() < deposit_window_end_time:
if i2c.is_motion():
i2c.setText(messages[customer.locale]["STATE_PRODUCT_DEPOSITED"])
i2c.buzz()
time.sleep(5)
new_return = models.Return(customer_id=customer.id,product_id=product.id,reward_value=product.reward_value)
create_return(engine, new_return)
time.sleep(5)
emails.send(customer.email, customer.first_name, product.reward_value, 200)
print(f"Succesful return. ID: {new_return.id}")
customer = None
customer_id = None
product = None
returnObj = None
rescan_count = 0
finally:
camera.stop_preview()
if __name__ == '__main__':
main()
......@@ -6,4 +6,8 @@ MarkupSafe==3.0.2
click==8.1.8
colorama==0.4.6
itsdangerous==2.2.0
blinker==1.9.0
\ No newline at end of file
blinker==1.9.0
numpy
picamera
pyzbar
PIL
\ No newline at end of file
File moved
File moved
File moved
File moved
File moved
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment