From 08fce952456bc2fadbb78893044e86a7749ecc05 Mon Sep 17 00:00:00 2001 From: Ryan Tresman <tresmanr@cardiff.ac.uk> Date: Sun, 4 Aug 2024 15:47:41 +0000 Subject: [PATCH] Upload New File --- video_server.py | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 video_server.py diff --git a/video_server.py b/video_server.py new file mode 100644 index 0000000..30ac7aa --- /dev/null +++ b/video_server.py @@ -0,0 +1,85 @@ +import time +import io +import sys +import math +from picamera import PiCamera, Color +from flask import Flask, render_template, Response +from queue import Queue + + +class VideoStream: + + def __init__(self, queue): + self.queue = queue + self.app = Flask(__name__) + + @self.app.route('/') + def index(): + #Redner webpage + return render_template('index.html') + + @self.app.route('/video_feed') + def video_feed(): + #Display camera on webpage + return Response(self.gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') + + #Run app + self.app.run(host='0.0.0.0', port=5000, debug=True) + + def formatDistance(self, dataDict, camera): + #Get data from dictionary + distance = dataDict.get('distance') + warningThreshold = dataDict.get('warningThreshold') + alertThreshold = dataDict.get('alertThreshold') + midThreshold = dataDict.get('midThreshold') + + if distance <= warningThreshold: + colour = 'red' + elif distance <= midThreshold: + colour = 'orange' + else: + colour = 'green' + + camera.annotate_background = Color(colour) + + #Display distance and optionally set warning message based on distance + warning = "" if distance > warningThreshold else "\nWATCH OUT!!!" + message = f"Distance: {distance}cm{warning}" + camera.annotate_text = message + + def gen_frames(self): + with PiCamera() as camera: + #Initialise camera + camera.stop_preview() + camera.resolution = (1024, 768) + camera.rotation = 90 + camera.framerate = 16 + camera.annotate_text_size = 64 + camera.annotate_foreground = Color('white') + stream = io.BytesIO() + time.sleep(2) + text = None + for frame in camera.capture_continuous(stream, 'jpeg', use_video_port=True): + try: + #Check for termination of queue, if not then read from the queue and put into dictionary + value = self.queue.get(block=False) + if isinstance(value, str) and value == 'terminate': + sys.exit() + else: + dataDict = value + #Display distance on camera + self.formatDistance(dataDict, camera) + except: + print("Error reading dictionary") + pass + + stream.seek(0) + #Return the frame of the camera + yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + stream.read() + b'\r\n' + stream.seek(0) + stream.truncate() + + +def runVideo(queue): + vs = VideoStream(queue) + \ No newline at end of file -- GitLab