Skip to content

Commit

Permalink
v0.1612 - better queuing & thread handling
Browse files Browse the repository at this point in the history
  • Loading branch information
FlyingFathead committed Nov 28, 2024
1 parent 203cb7b commit b34d03e
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 65 deletions.
3 changes: 3 additions & 0 deletions .catgitinclude
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
yolov8_live_rtmp_stream_detection.py
config.ini
remote_sync.py
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ Use `utils/batch_humdet_yolo8_opencv2.py` to run YOLOv8 batch detection on direc
- Add hooks for sending detections to web servers or APIs

## Changelog
- **v0.1612**
  - Improved thread and queue handling (non-blocking remote sync and other background actions)
- Maximum queue sizes can be set separately for saved frames and remote syncs (see `config.ini`)
- **v0.1611**
- TTS handling changes; test message on startup
- (TODO) Firejail users may still encounter issues due to audio routing inside Firejail instances
Expand Down
8 changes: 5 additions & 3 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ timeout = 60
tts_cooldown = 2

[performance]
# 'frame_queue_size' defines the maximum frame queue size when saving detections.
# 'image_save_queue_maxsize' defines the maximum frame queue size when saving detections.
# Too high a value may introduce lag or dropouts while saving.
# 10 or 20 is a good starting point for most setups; it can easily be set to 100-1000.
# Set to 0 or negative for unlimited queue size.
frame_queue_size = 1000
image_save_queue_maxsize = 1000

[logging]
enable_detection_logging_to_file = True
Expand Down Expand Up @@ -144,4 +144,6 @@ strip_local_path = true
# File send queuing options
max_retries = 10
retry_delay = 1
remote_sync_queue_maxsize = 1000
remote_sync_queue_maxsize = 1000
# Maximum number of worker threads for remote sync
max_workers = 10
81 changes: 75 additions & 6 deletions remote_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import logging
import threading
import time
from queue import Queue, Empty
from queue import Queue, Empty, Full
from concurrent.futures import ThreadPoolExecutor
from configparser import NoOptionError

# Check for paramiko
try:
Expand Down Expand Up @@ -56,6 +58,25 @@ def __init__(self, config, main_logger, save_dir_base, aggregated_detections_fil
self.remote_sync_stop_event = threading.Event()
self.remote_sync_thread = None

# handle via thread pool executor
# self.executor = ThreadPoolExecutor(max_workers=5) # Adjust the number of workers as needed

# determine maximum number of workers for remote sync
# Read max_workers from config with a default fallback
try:
self.MAX_WORKERS = config.getint('remote_sync', 'max_workers')
except (NoOptionError, ValueError):
self.MAX_WORKERS = 5 # Default value if not specified or invalid
self.logger.info(f"Remote sync will use up to {self.MAX_WORKERS} worker threads.")

# Ensure MAX_WORKERS is a positive integer
if self.MAX_WORKERS <= 0:
self.logger.warning("max_workers must be a positive integer. Defaulting to 5.")
self.MAX_WORKERS = 5

# Initialize the ThreadPoolExecutor with the configured number of workers
self.executor = ThreadPoolExecutor(max_workers=self.MAX_WORKERS)

# Handle Remote Sync Configuration
if self.READ_REMOTE_CONFIG_FROM_ENV:
# Read from environment variables
Expand Down Expand Up @@ -205,6 +226,9 @@ def sync_file_to_remote(self, file_path):
:param file_path: Path to the file to be synced.
"""

self.logger.info(f"Starting sync for {file_path} in thread {threading.current_thread().name}")

attempt = 0
while attempt < self.MAX_RETRIES:
try:
Expand Down Expand Up @@ -335,18 +359,61 @@ def remote_sync_thread_function(self, sync_queue, stop_event):
:param stop_event: Event to signal the thread to stop.
"""
last_aggregated_detections_mtime = None
last_queue_log_time = 0 # Initialize last log time

while not stop_event.is_set():
current_time = time.time()
# Log queue size every 10 seconds
if current_time - last_queue_log_time >= 10:
queue_size = sync_queue.qsize()
self.logger.info(f"Remote sync queue size: {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE}")
last_queue_log_time = current_time

# Existing warning if queue size is high
queue_size = sync_queue.qsize()
if queue_size > self.REMOTE_SYNC_QUEUE_MAXSIZE * 0.8:
self.logger.warning(f"Remote sync queue size is at {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE}")

# Handle files in the sync_queue
try:
file_to_sync = sync_queue.get(timeout=1)
# Perform the sync operation with retries
self.sync_file_to_remote(file_to_sync)
queue_size = sync_queue.qsize()
self.logger.info(f"Dequeued file for remote sync: {file_to_sync} (Queue size: {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE})")
self.executor.submit(self.sync_file_to_remote, file_to_sync)
except Empty:
pass
except Exception as e:
self.logger.error(f"Error in remote_sync_thread: {e}")

# while not stop_event.is_set():
# # Handle files in the sync_queue
# queue_size = sync_queue.qsize()
# if queue_size > self.REMOTE_SYNC_QUEUE_MAXSIZE * 0.8:
# self.logger.warning(f"Remote sync queue size is at {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE}")

# # Handle files in the sync_queue
# try:
# file_to_sync = sync_queue.get(timeout=1)
# queue_size = sync_queue.qsize()
# self.logger.info(f"Dequeued file for remote sync: {file_to_sync} (Queue size: {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE})")
# # Perform the sync operation with retries
# self.executor.submit(self.sync_file_to_remote, file_to_sync)
# except Empty:
# pass
# except Exception as e:
# self.logger.error(f"Error in remote_sync_thread: {e}")

# # Handle files in the sync_queue
# try:
# file_to_sync = sync_queue.get(timeout=1)
# # Perform the sync operation with retries
# # self.sync_file_to_remote(file_to_sync)
# self.executor.submit(self.sync_file_to_remote, file_to_sync)
# except Empty:
# pass
# except Exception as e:
# self.logger.error(f"Error in remote_sync_thread: {e}")

# Check for aggregated detections file
if self.SYNC_AGGREGATED_DETECTIONS and self.aggregated_detections_file:
try:
Expand All @@ -371,7 +438,9 @@ def enqueue_file(self, file_path):
"""
if self.REMOTE_SYNC_ENABLED:
try:
self.remote_sync_queue.put(file_path, timeout=5)
self.logger.debug(f"Enqueued file for remote sync: {file_path}")
except Queue.Full:
self.remote_sync_queue.put(file_path, block=False)
queue_size = self.remote_sync_queue.qsize()
self.logger.info(f"Enqueued file for remote sync: {file_path} (Queue size: {queue_size}/{self.REMOTE_SYNC_QUEUE_MAXSIZE})")
except Full:
self.logger.error(f"Remote sync queue is full. Failed to enqueue file: {file_path}")
raise # Re-raise to be caught by the caller
2 changes: 1 addition & 1 deletion version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# version.py

version_number = "0.1611 (Nov 23h 2024)"
version_number = "0.1612 (Nov 28 2024)"
Loading

0 comments on commit b34d03e

Please sign in to comment.