Bounty: 100
I’m working on a small side project at the moment – like a homemade CCTV system.
This part is my Python Capture Client – it uses OpenCV to capture frames from a connected webcam and sends the frames to a connected server via a socket.
The main thing I was going for was a small application with two services which operate independently once started. One for capturing frames from the camera, and another for sending + receiving network messages. If either of these fail, the other would still work with no issues.
I have more or less achieved this but I’m not certain that I took the best approach – I’m not normally a Python developer so I sort of winged it with this application.
Things I felt especially strange about were the use of queues. From my searching, they seemed to be the best way for sharing data between threads.
The application can be found here – any advice or comments would be appreciated!
This is the main entry point into the application:
main.py
from orchestrator import Orchestrator
from connection_service import ConnectionService
from capture_service import CaptureService
HOST = "127.0.0.1"
PORT = 11000
def main():
capture_service = CaptureService()
connection_service = ConnectionService(HOST, PORT)
orchestrator = Orchestrator(capture_service, connection_service)
orchestrator.start()
if __name__ == '__main__':
main()
This is my orchestration service – it coordinates the main loop of retrieving frames + sending to the server:
orchestrator.py
from connection_service import ConnectionService
from capture_service import CaptureService
from not_connected_exception import NotConnectedException
import multiprocessing
import cv2
import time
class Orchestrator():
def __init__(self, capture_service, connection_service):
self.manager = multiprocessing.Manager()
self.connection_service = connection_service
self.capture_service = capture_service
self.SEND_FOOTAGE = True
self.DETECT_MOTION = False
self.RUN = True
# End services
def finish(self):
self.RUN = False
self.connection_service.disconnect()
self.capture_service.stop_capture()
# Start services, connect to server / start capturing from camera
# Grab frames from capture service and display
# Retrieve any messages from connection service
# Deal with message e.g stop / start sending frames
# If send footage is true, encode frame as string and send
def start(self):
print ("Starting Orchestration...")
self.connection_service.connect()
self.capture_service.start_capture()
while self.RUN:
message = None
#Get camera frames
frame = self.capture_service.get_current_frame()
self.display_frame(frame)
message = self.connection_service.get_message()
self.handle_message(message)
#Send footage if requested
if self.SEND_FOOTAGE and frame is not None: #or (self.DETECT_MOTION and motion_detected):
try:
frame_data = cv2.imencode('.jpg', frame)[1].tostring()
self.connection_service.send_message(frame_data)
except NotConnectedException as e:
self.connection_service.connect()
def handle_message(self, message):
if message is "SEND_FOOTAGE":
self.SEND_FOOTAGE = True
elif message is "STOP_SEND_FOOTAGE":
self.SEND_FOOTAGE = False
elif message is "DETECT_MOTION":
self.DETECT_MOTION = True
elif message is "STOP_DETECT_MOTION":
self.DETECT_MOTION = False
def display_frame(self, frame):
if frame is not None:
# Display the resulting frame
cv2.imshow('orchestrator', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
cv2.destroyAllWindows()
raise SystemExit("Exiting...")
This is my capturing service – it’s job is to capture frames from the camera and put the frames onto a queue:
capture_service.py
import cv2
import multiprocessing
class CaptureService():
FRAME_QUEUE_SIZE_LIMIT = 10
STOP_QUEUE_SIZE_LIMIT = 1
START_QUEUE_SIZE_LIMIT = 1
def __init__(self):
self.frame = None
manager = multiprocessing.Manager()
# The queue to add frames to
self.frame_queue = manager.Queue(self.FRAME_QUEUE_SIZE_LIMIT)
# A queue to indicate capturing should be stopped
self.stop_queue = manager.Queue(self.STOP_QUEUE_SIZE_LIMIT)
# A queue to indicate that capturing should be started
self.start_queue = manager.Queue(self.START_QUEUE_SIZE_LIMIT)
# Start Capture
# Empty the stop queue. If the start queue is empty - start a new capture thread
# If start queue is not empty, service has already been started
def start_capture(self):
print ("Starting capture...")
while not self.stop_queue.empty():
self.stop_queue.get()
if self.start_queue.empty():
self.capture_thread = multiprocessing.Process(target=self.capture_frames)
self.capture_thread.start()
self.start_queue.put("")
print ("Capturing started...")
else:
print ("Capture already started...")
# Is Capturing
# Return true if start queue has a value
def is_capturing(self):
return not self.start_queue.empty()
# Get Current Frame
# Return the current frame from the frame queue
def get_current_frame(self):
if not self.frame_queue.empty():
return self.frame_queue.get()
return None
# Stop Capture
# Add a message to the stop queue
# Empty the start queue
def stop_capture(self):
if self.stop_queue.empty():
self.stop_queue.put("")
while not self.start_queue.empty():
self.start_queue.get()
# Capture Frames
# Captures frames from the device at 0
# Only add frames to queue if there's space
def capture_frames(self):
cap = None
try:
cap = cv2.VideoCapture(0)
while True:
#Empty Start / Stop queue signals
if not self.stop_queue.empty():
while not self.stop_queue.empty():
self.stop_queue.get()
while not self.start_queue.empty():
self.start_queue.get()
break;
ret, frame = cap.read()
if self.frame_queue.qsize() > self.FRAME_QUEUE_SIZE_LIMIT or self.frame_queue.full():
continue
self.frame_queue.put(frame)
# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
except Exception as e:
print ("Exception capturing images, stopping...")
self.stop_capture()
cv2.destroyAllWindows()
if cap is not None:
cap.release()
This is my connection service, it takes care of all network related comms.
connection_service.py
from send_message_exception import SendMessageException
from not_connected_exception import NotConnectedException
import socket
import time
import multiprocessing
import struct
class ConnectionService():
MAX_QUEUE_SIZE = 1
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = None
manager = multiprocessing.Manager()
# The queue to put messages to send on
self.send_message_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# The queue received messages go onto
self.receive_message_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# A queue which indicates if the service is connected or not
self.is_connected_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# A queue which indicateds if the service is trying to connect
self.pending_connection_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# A queue to stop sending activity
self.stop_send_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# A queue to stop receiving activity
self.stop_receive_queue = manager.Queue(self.MAX_QUEUE_SIZE)
# Connect to the server
# 1) If already connected - return
# 2) If pending connection - return
# 3) Start the network thread - don't return until the connection status is pending
def connect(self):
if self.is_connected():
return
elif not self.pending_connection_queue.empty():
return
else:
self.network_thread = multiprocessing.Process(target=self.start_network_comms)
self.network_thread.start()
#Give thread time to sort out queue
while self.pending_connection_queue.empty():
continue
# Start network communications
# Mark connection status as pending via queue. Clear stop queues.
# Get socket for connection, mark as connected via queue.
# Start Send + Receive message queues with socket as argument
def start_network_comms(self):
self.pending_connection_queue.put("CONNECTING")
self.clear_queue(self.stop_send_queue)
self.clear_queue(self.stop_receive_queue)
self.socket = self.connect_to_server(self.host, self.port)
self.is_connected_queue.put("CONNECTED")
self.pending_connection_queue.get()
print ("Connected to server...")
receive_message_thread = multiprocessing.Process(target=self.receive_message, args=(self.socket,))
receive_message_thread.start()
send_message_thread = multiprocessing.Process(target=self.send_message_to_server, args=(self.socket,))
send_message_thread.start()
# Return true if connected queue has a value
def is_connected(self):
return not self.is_connected_queue.empty()
# Put message on stop queues to end send / receive threads
# Clear connected state queues
def disconnect(self):
print ("Disconnecting...")
self.stop_receive_queue.put("")
self.stop_send_queue.put("")
self.clear_queue(self.pending_connection_queue)
self.clear_queue(self.is_connected_queue)
print ("Connection closed")
# Send a message
# If connected and send queue isn't full - add message to send queue
# Raise exception if not connected
def send_message(self, message):
if self.is_connected():
if self.send_message_queue.full():
print ("Send message queue full...")
return
self.send_message_queue.put(message)
else:
raise NotConnectedException("Not connected to server...")
# Send message to server
# If send queue isn't empty, send the message length + message (expects binary data) to server
# If exception while sending and the stop queue isn't empty - disconnect
def send_message_to_server(self, socket):
while self.stop_send_queue.empty():
while not self.send_message_queue.empty():
print ("Message found on queue...")
try:
message = self.send_message_queue.get()
message_size = len(message)
print (f"Message: {message_size}")
socket.sendall(struct.pack(">L", message_size) + message)
except Exception as e:
if not self.stop_send_queue.empty():
return
print (f"nException sending message:nn{e}")
self.disconnect()
# Get a message
# If the receive queue isn't empty - return a message
def get_message(self):
if not self.receive_message_queue.empty():
return self.receive_message_queue.get()
return None
# Receive messages from socket
# Read data from socket according to the pre-pended message length
def receive_message(self, socket):
data = b""
payload_size = struct.calcsize(">L")
print ("Listening for messages...")
while self.stop_receive_queue.empty():
#Get message size
try:
while len(data) < payload_size:
data += socket.recv(4096)
packed_msg_size = data[:payload_size]
data = data[payload_size:]
msg_size = struct.unpack(">L", packed_msg_size)[0]
print ("Received message size:")
print (msg_size)
#Get message
while len(data) < msg_size:
data += socket.recv(4096)
message = data[:msg_size]
data = data[msg_size:]
print (message)
if self.receive_message_queue.qsize() >= self.MAX_QUEUE_SIZE or self.receive_message_queue.full():
continue
self.receive_message_queue.put(message)
except Exception as e:
print (f"nException while receiving messages: {e}nn")
break
print ("nDisconnecting...nn")
self.disconnect()
# Connect to the server
def connect_to_server(self, host, port, wait_time=1):
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((host, port))
return client_socket
except Exception:
print (f"Couldn't connect to remote address, waiting {wait_time} seconds to retry")
time.sleep(wait_time)
return self.connect_to_server(host, port, wait_time * 1)
# Clear messages from the supplied queue (should live somewhere else)
def clear_queue(self, queue):
while not queue.empty():
queue.get()
not_connected_exception.py
class NotConnectedException(Exception):
def __init__(self, message):
super().__init__(message)
And a small test server just to test receiving messages..
test_server.py
import socket
import sys
import struct
HOST = "127.0.0.1"
PORT = 11000
def main():
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
print('Socket created')
s.bind((HOST,PORT))
print('Socket bind complete')
while True:
s.listen(10)
try:
print('Socket now listening')
conn,addr=s.accept()
data = b""
payload_size = struct.calcsize(">L")
print("payload_size: {}".format(payload_size))
while True:
while len(data) < payload_size:
data += conn.recv(4096)
packed_msg_size = data[:payload_size]
data = data[payload_size:]
msg_size = struct.unpack(">L", packed_msg_size)[0]
print("msg_size: {}".format(msg_size))
while len(data) < msg_size:
data += conn.recv(4096)
frame_data = data[:msg_size]
data = data[msg_size:]
except Exception as e:
print("Whoops...")
print (e)
if __name__ == '__main__':
main()
Get this bounty!!!