Skip to content
Snippets Groups Projects
Commit f2f60388 authored by Jeyan Kanagaratnam's avatar Jeyan Kanagaratnam
Browse files

Delete main.py

parent 2acdd54d
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import time
import i2c
import scan
import messages
import models
import emails
import numpy as np
from picamera import PiCamera
from pyzbar.pyzbar import decode
from PIL import Image
import sqlalchemy as db
from sqlalchemy.orm import sessionmaker
import uuid
# All in seconds
POLL_INTERVAL = 1
MAX_IDLE_RESCANS = 6
MAX_IDLE_DEPOSIT = 30
def init_db_engine():
engine = db.create_engine('sqlite:///local.db', echo=True)
models.Base.metadata.create_all(engine)
return engine
def init_camera():
"""
Initialises the camera.
"""
camera = PiCamera()
camera.resolution = (1920,1080)
camera.framerate = 24
camera.start_preview(fullscreen=False,window=(100,200,640, 480))
return camera
def init_lcd():
"""
Initialises the LCD screen.
"""
i2c.setRGB(0, 255, 0)
i2c.setText(messages.STATE_NO_CUSTOMER_MESSAGE)
def lookup_customer(engine, customer_id):
print(f'Locating CUSTOMER: {customer_id}')
Session = sessionmaker(bind=engine)
session = Session()
customer = session.query(models.Customer).filter_by(id = customer_id).first()
return customer
def lookup_product(engine, product_id):
print(f'Locating PRODUCT: {product_id}')
Session = sessionmaker(bind=engine)
session = Session()
product = session.query(models.Product).filter_by(id = product_id).first()
return product
def create_return(engine, return_obj):
print('Creating return')
Session = sessionmaker(bind=engine)
session = Session()
try:
session.add(return_obj)
session.commit()
session.refresh(return_obj)
print('Successfully created return',return_obj.id)
except Exception as e:
session.rollback()
print(e)
finally:
session.close()
def main():
print('Initialising...')
engine = init_db_engine()
camera = init_camera()
init_lcd()
i2c.init_pins()
customer = None
# Store the customer ID here to prevent double scanning.
customer_id = None
product = None
returnObj = None
rescan_count = 0
try:
while True:
if customer == None or product == None:
# In this state, we are missing one of two key pieces of information.
# This is the default state.
#
# We collect this information using the camera, so we must take a photo here
# while this state is true.
# Capture the view of the camera.
camera.capture('tmp.jpg')
# Open the image,
img = Image.open('tmp.jpg')
# and overwrite the type with the binary data of the image instead.
img = np.array(img)
# Use pyzbar to extract a barcode from the image.
data = scan.detect_code(img)
if customer == None:
# Default back to the initial state.
i2c.setRGB(0, 255, 0)
i2c.setText(messages.STATE_NO_CUSTOMER_MESSAGE)
if data:
i2c.setText(messages.STATE_TRANSITION_MESSAGE)
# Locate the customer's record
customer = lookup_customer(engine, data)
if customer == None:
i2c.setRGB(255, 0, 0)
i2c.setText(messages.STATE_CUSTOMER_NOT_FOUND_MESSAGE)
time.sleep(5)
else:
customer_id = data
rescan_count = 0
i2c.buzz()
else:
# It must be the product we don't have then.
# Default back to the initial state.
i2c.setRGB(0, 255, 0)
i2c.setText(messages.STATE_NO_PRODUCT_MESSAGE)
if rescan_count >= MAX_IDLE_RESCANS:
print("Product scan timed out after 6 attempted rescans.")
customer = None
customer_id = None
rescan_count = 0
else:
rescan_count = rescan_count + 1
if data:
if data != customer_id:
i2c.setText(messages.STATE_TRANSITION_MESSAGE)
# Locate the product's record.
product = lookup_product(engine, data)
if product == None:
i2c.setRGB(255, 0, 0)
i2c.setText(messages.STATE_PRODUCT_NOT_FOUND_MESSAGE)
time.sleep(5)
else:
i2c.buzz()
# No other case needed, we wait for a barcode
time.sleep(POLL_INTERVAL)
else:
i2c.setRGB(0, 255, 0)
if product.recycling_type == 0:
i2c.setText(messages.STATE_DEPOSIT_LEFT_BIN_MESSAGE)
else:
i2c.setText(messages.STATE_DEPOSIT_RIGHT_BIN_MESSAGE)
# 30 seconds
deposit_window_end_time = time.time() + MAX_IDLE_DEPOSIT
while time.time() < deposit_window_end_time:
if i2c.is_motion():
i2c.setText(messages.STATE_PRODUCT_DEPOSITED)
i2c.buzz()
time.sleep(5)
new_return = models.Return(customer_id=customer.id,product_id=product.id,reward_value=product.reward_value)
create_return(engine, new_return)
time.sleep(5)
emails.send(customer.email, customer.first_name, product.reward_value, 200)
print(f"Succesful return. ID: {new_return.id}")
customer = None
customer_id = None
product = None
returnObj = None
rescan_count = 0
finally:
camera.stop_preview()
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment