So I’m a beginner in robotics and programming. I’m making a project using ESP32 and OpenCv to make a car that follows a certain color. When I was coding the opencv part of the project, I noticed that I would need to use threading to simultaneously process and send commands. So i came up with my own code with a little help from ChatGPT for the locks and Queues which i didnt know about before chatGPT inserted them.
import requests, threading
import requests, threading
direction_queue = Queue()
val = direction_queue.get()
if direction == prev_direction:
direction_queue.task_done()
prev_direction = direction
url = 'http://192.168.0.110/'
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')'''
direction_queue.task_done()
motor_thread = threading.Thread(target=run_motor, daemon=True)
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
print("Error: Could not open video stream.")
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
print("Error: Could not read frame.")
if frame is None or frame.size == 0:
print("Error: Empty frame received.")
#height, width, channel = frame.shape
#print(f"height{height}, width {width}")
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
frame = cv2.rectangle (frame, (x1,y1), (x2,y2), (0,255,0), 4)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
cv2.destroyAllWindows() '''
<code>import cv2
import numpy as np
from PIL import Image
import requests, threading
from queue import Queue
#height = 480, width 640
import cv2
import numpy as np
from PIL import Image
import requests, threading
from queue import Queue
#height = 480, width 640
prev_direction = 'None'
lock = threading.Lock()
direction_queue = Queue()
def run_motor():
global prev_direction
while True:
val = direction_queue.get()
if val == -1:
direction = 'right'
elif val == 1:
direction = 'left'
elif val == 0:
direction = 'stop'
else:
continue
with lock:
if direction == prev_direction:
direction_queue.task_done()
continue
prev_direction = direction
url = 'http://192.168.0.110/'
print (direction)
'''try:
print(url+direction)
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')'''
direction_queue.task_done()
motor_thread = threading.Thread(target=run_motor, daemon=True)
motor_thread.start()
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
if not cap.isOpened():
print("Error: Could not open video stream.")
exit()
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
while True:
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame.")
break
if frame is None or frame.size == 0:
print("Error: Empty frame received.")
continue
#height, width, channel = frame.shape
#print(f"height{height}, width {width}")
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
bbox = mask_.getbbox()
if bbox is not None:
x1, y1, x2, y2 = bbox
frame = cv2.rectangle (frame, (x1,y1), (x2,y2), (0,255,0), 4)
cx = (x1+x2)//2
cy = (y1+y2)//2
if cx < 140:
val = -1
elif cx > 180:
val = 1
else:
val = 0
if val is not None:
direction_queue.put(val)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
break
cap.release()
cv2.destroyAllWindows() '''
</code>
import cv2
import numpy as np
from PIL import Image
import requests, threading
from queue import Queue
#height = 480, width 640
import cv2
import numpy as np
from PIL import Image
import requests, threading
from queue import Queue
#height = 480, width 640
prev_direction = 'None'
lock = threading.Lock()
direction_queue = Queue()
def run_motor():
global prev_direction
while True:
val = direction_queue.get()
if val == -1:
direction = 'right'
elif val == 1:
direction = 'left'
elif val == 0:
direction = 'stop'
else:
continue
with lock:
if direction == prev_direction:
direction_queue.task_done()
continue
prev_direction = direction
url = 'http://192.168.0.110/'
print (direction)
'''try:
print(url+direction)
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')'''
direction_queue.task_done()
motor_thread = threading.Thread(target=run_motor, daemon=True)
motor_thread.start()
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
if not cap.isOpened():
print("Error: Could not open video stream.")
exit()
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
while True:
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame.")
break
if frame is None or frame.size == 0:
print("Error: Empty frame received.")
continue
#height, width, channel = frame.shape
#print(f"height{height}, width {width}")
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
bbox = mask_.getbbox()
if bbox is not None:
x1, y1, x2, y2 = bbox
frame = cv2.rectangle (frame, (x1,y1), (x2,y2), (0,255,0), 4)
cx = (x1+x2)//2
cy = (y1+y2)//2
if cx < 140:
val = -1
elif cx > 180:
val = 1
else:
val = 0
if val is not None:
direction_queue.put(val)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
break
cap.release()
cv2.destroyAllWindows() '''
after that it was working ok but i was curious on how i can optimize it. So i asked chatGPT (a bad idea…) and it spit out this code
<code>'''direction_queue = Queue()
stop_flag = threading.Event()
while not stop_flag.is_set():
val = direction_queue.get()
if direction != prev_direction:
prev_direction = direction
url = 'http://192.168.0.110/'
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
print("Error: Could not open video stream.")
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
while not stop_flag.is_set():
print("Error: Could not read frame.")
if frame is not None and frame.size > 0:
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
while not stop_flag.is_set():
frame = frame_queue.get()
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 4)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
capture_thread = threading.Thread(target = capture_frame, daemon = True)
process_thread = threading.Thread(target = process_frame, daemon = True)
motor_thread = threading.Thread(target = run_motor, daemon = True)
<code>'''direction_queue = Queue()
frame_queue = Queue()
stop_flag = threading.Event()
def run_motor():
prev_direction = 'None'
while not stop_flag.is_set():
val = direction_queue.get()
if val == -1:
direction = 'right'
elif val == 1:
direction = 'left'
elif val == 0:
direction = 'stop'
else:
continue
if direction != prev_direction:
prev_direction = direction
url = 'http://192.168.0.110/'
print (direction)
try:
print(url+direction)
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')
def capture_frame():
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
if not cap.isOpened():
print("Error: Could not open video stream.")
return
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
while not stop_flag.is_set():
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame.")
break
if frame is not None and frame.size > 0:
frame_queue.put(frame)
cap.release()
def process_frame():
x =0
y = 0
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
while not stop_flag.is_set():
frame = frame_queue.get()
if frame is None:
continue
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
bbox = mask_.getbbox()
if bbox is not None:
x1, y1, x2, y2 = bbox
frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 4)
cx = (x1 + x2) // 2
if cx < 300:
val = -1
elif cx > 340:
val = 1
else:
val = 0
direction_queue.put(val)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
stop_flag.set()
cv2.destroyAllWindows()
capture_thread = threading.Thread(target = capture_frame, daemon = True)
process_thread = threading.Thread(target = process_frame, daemon = True)
motor_thread = threading.Thread(target = run_motor, daemon = True)
capture_thread.start()
process_thread.start()
motor_thread.start()
capture_thread.join()
process_thread.join()
motor_thread.join()'''
</code>
'''direction_queue = Queue()
frame_queue = Queue()
stop_flag = threading.Event()
def run_motor():
prev_direction = 'None'
while not stop_flag.is_set():
val = direction_queue.get()
if val == -1:
direction = 'right'
elif val == 1:
direction = 'left'
elif val == 0:
direction = 'stop'
else:
continue
if direction != prev_direction:
prev_direction = direction
url = 'http://192.168.0.110/'
print (direction)
try:
print(url+direction)
response = requests.get(url + direction)
response.raise_for_status() # Raises HTTPError if the status code is 4xx or 5xx
print('Response received successfully')
except requests.exceptions.HTTPError as errh:
print(f'HTTP Error: {errh}')
except requests.exceptions.ConnectionError as errc:
print(f'Error Connecting: {errc}')
except requests.exceptions.Timeout as errt:
print(f'Timeout Error: {errt}')
except requests.exceptions.RequestException as err:
print(f'An error occurred: {err}')
def capture_frame():
#'http://192.168.0.113:81/stream'
cap = cv2.VideoCapture('http://192.168.0.113:81/stream')
if not cap.isOpened():
print("Error: Could not open video stream.")
return
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
while not stop_flag.is_set():
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame.")
break
if frame is not None and frame.size > 0:
frame_queue.put(frame)
cap.release()
def process_frame():
x =0
y = 0
upper_limit = np.array([85,255,255])
lower_limit = np.array([35, 100, 100])
while not stop_flag.is_set():
frame = frame_queue.get()
if frame is None:
continue
hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_frame, lower_limit, upper_limit)
mask_ = Image.fromarray(mask)
bbox = mask_.getbbox()
if bbox is not None:
x1, y1, x2, y2 = bbox
frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 4)
cx = (x1 + x2) // 2
if cx < 300:
val = -1
elif cx > 340:
val = 1
else:
val = 0
direction_queue.put(val)
cv2.imshow('frame', frame)
if cv2.waitKey(1) == ord('q'):
stop_flag.set()
cv2.destroyAllWindows()
capture_thread = threading.Thread(target = capture_frame, daemon = True)
process_thread = threading.Thread(target = process_frame, daemon = True)
motor_thread = threading.Thread(target = run_motor, daemon = True)
capture_thread.start()
process_thread.start()
motor_thread.start()
capture_thread.join()
process_thread.join()
motor_thread.join()'''
Turns out this code is so slow and doesnt even close the program properly. I’m just curious on why this code turned out to be slow.