Skip to content
Snippets Groups Projects
Commit 112bd3cb authored by zsh28's avatar zsh28
Browse files

iot files commited all

parent 305af77f
Branches
No related tags found
No related merge requests found
import io
import time
from picamera import PiCamera
import grovepi
import mail
from flask import Flask, Response, render_template, request
import serial
import threading
app = Flask(__name__) # Correction: use '__name__' instead of 'name'
# Create a serial object globally if you intend to use it across multiple routes without opening/closing frequently.
ser = serial.Serial('/dev/ttyACM0', 9600)
redirect_status = None #None, 'to_target', 'to_home'
image_path = "/home/pi/Desktop/ringbellimg.jpg"
# SIG,NC,VCC,GND
# set I2C to use the hardware bus
grovepi.set_bus("RPI_1")
buzzer = 8 # Assign grove buzzer port to D7
grovepi.pinMode(buzzer,"OUTPUT")
ultrasonic_ranger = 7 # Assign sonic ranger port to D7
last_buzz_time = 0
cooldown_period = 7
#motionsensor = 2
#button = 3
#camera = PiCamera()
take_photo = False
def checkMotion():
global take_photo
global last_time_motion_detected
last_time_motion_detected = None
while True:
current_time = time.time()
global last_buzz_time
# Buzz for 1 second
distance = grovepi.ultrasonicRead(ultrasonic_ranger) # Checks distance
#sys.stdout.write("The distance is \r%d" %distance) # Prints distance
#print("The distance is " + distance) # Prints distance
#print("The distance is \r%d" %distance) # Prints distance
#if ((time.time() > last_buzz_time + cooldown_period ) and (distance <= 10)):
if distance <= 10:
#Video
#print("Should Redirect to /video_page")
#time = Time
#if time higher than 5
# long buzz
# send email
if last_time_motion_detected is None:
last_time_motion_detected = current_time # Start timer when motion first detected
last_time_bell_rung = current_time
if current_time - last_time_motion_detected >= 5:
if current_time - 5 > last_time_bell_rung:
last_time_bell_rung = current_time
grovepi.digitalWrite(buzzer, 1)
time.sleep(1)
grovepi.digitalWrite(buzzer, 0)
mail.send_email("Doorbell rung!", "Someone is at your door. To watch the video feed, go here: [LINK]", image_path)
redirect_status = 'to_target' # Takes user to videofeed page
# RING DOORBELL
elif (time.time() > last_buzz_time + cooldown_period ) and (distance <= 100):
last_time_motion_detected = None
#Picture
take_photo = True
# redirect_status = 'to_home' # Takes user home
#Buzzer buzzes
last_buzz_time = time.time()
grovepi.digitalWrite(buzzer,1)
time.sleep(0.1)
grovepi.digitalWrite(buzzer,0)
mail.send_email("Activity Detected!", "Motion detected outside your door. To watch the video feed, go here: [LINK]", image_path)
##print("Take photo set to true")
#print("Should Redirect to /")
#camera.capture(image_path)
#mail.send_email(image_path)
else:
last_time_motion_detected = None
time.sleep(0.05)
# if time.time() > last_buzz_time + cooldown_period and distance > 50 :
# grovepi.digitalWrite(buzzer,1)
# last_buzz_time = time.time()
# time.sleep(0.1)
# grovepi.digitalWrite(buzzer,0)#
# #camera.start_preview()
# time.sleep(0.5)
# image_path = "/home/pi/Desktop/ringbellimg.jpg"
# camera.capture(image_path)
# camera.stop_preview()
# mail.send_email(image_path)
def run_checkMotion_thread():
motion_thread = threading.Thread(target=checkMotion)
motion_thread_daemon = True
motion_thread.start()
@app.route('/check-redirect')
def check_redirect():
global redirect_status
if redirect_status == 'to_target':
return jsonify({"redirect": True, "url": url_for('video_page')})
elif redirect_status == 'to_home':
redirect_status = None
return jsonify({"redirect": True, "url": url_for('index')})
return jsonify({"redirect": False,})
def generate_frames():
global take_photo
with PiCamera() as camera:
camera.resolution = (640, 480)
camera.framerate = 24
stream = io.BytesIO()
start_time = time.time() # Fixed typo in variable name
duration = 3600 # Duration for how long to stream, adjust as needed
print("HAPPENED")
for frame in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
if time.time() - start_time > duration:
break
stream.seek(0)
frame_data = stream.read()
#print(take_photo)
if take_photo == True:
#print("Should take photo")
with open(image_path, "wb") as f:
f.write(frame_data)
#mail.send_email(frame_data)
take_photo = False
#print("Stream happen")
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' +
frame_data +
b'\r\n')
stream.seek(0)
stream.truncate()
@app.route('/')
def index():
# This route will render the HTML template that includes the control buttons and empty videofeed.
print("Test")
return render_template('index.html')
#@app.route('/video_page')
#def video_page():
# # This route is dedicated to video streaming.
# return render_template('videofeed.html')
@app.route('/video_feed')
def video_feed():
# This route is dedicated to video streaming.
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route("/", methods=['POST'])
def command():
action = request.form['action']
ser.write((action.upper() + '!\n').encode('utf-8')) # Simplify the serial write command
time.sleep(2) # Delay to allow Arduino to process the command
line = ser.readline().decode('utf-8').strip() # Read response from Arduino
return render_template('index.html')
if __name__ == '__main__':
run_checkMotion_thread()
app.run(host='0.0.0.0', port=5000, debug=True)
print("Test")
\ No newline at end of file
mail.py 0 → 100644
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
email_user = "zeeshdev28@gmail.com"
email_password = "eddt rekq jhwn yydu"
email_send = "zeeshdev28@gmail.com"
#subject = "Test"
def send_email(subject, body, image_path):
msg = MIMEMultipart()
msg['From'] = email_user
msg['To'] = email_send
msg['Subject'] = subject
#body = "Motion detected! See the attached image."
msg.attach(MIMEText(body, 'plain'))
attachment = open(image_path, 'rb')
part = MIMEBase('application', 'octet-stream')
part.set_payload((attachment).read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', "attachment; filename= " + image_path)
msg.attach(part)
text = msg.as_string()
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(email_user, email_password)
server.sendmail(email_user, email_send, text)
server.quit()
\ No newline at end of file
<!DOCTYPE html>
<html>
<head>
<title>Raspberry Pi Camera Control</title>
</head>
<body>
<div style="width:100%; text-align: center;">
<div style="height:50%;">
<h1>Live Camera Feed</h1>
<!-- Embedding the live video feed -->
<img src="{{ url_for('video_feed') }}" alt="Video Feed">
</div>
<h2>Control Panel</h2>
<form action="{{ url_for('command') }}" method="post">
<button name="action" value="open" type="submit" style= "width:200px; height:50px; font-size: 30px;" >Open</button>
<button name="action" value="close" type="submit" style= "width:200px; height:50px; font-size: 30px;" >Close</button>
</form>
</div>
</body>
</html>
#include <Servo.h>
Servo myservo;
int pos = 0;
void setup() {
Serial.begin(9600);
myservo.attach(5);
}
void open() {
for (pos = 0; pos <= 90; pos += 1) {
myservo.write(pos);
delay(15);
}
Serial.println("Opened");
}
void close() {
for (pos = 90; pos >= 0; pos -= 1) {
myservo.write(pos);
delay(15);
}
Serial.println("Closed");
}
void loop() {
if (Serial.available() > 0) {
String data = Serial.readStringUntil('\n'); // Adjusted to read until newline
data.trim(); // Trim whitespace and newline
if (data.equals("OPEN!")) {
open();
}
else if (data.equals("CLOSE!")) {
close();
}
}
delay(100); // Reduce delay to respond faster if needed
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment