Overview
The stream processing module provides three classes for handling RTSP streams:
- StreamProcessor - Processes single RTSP streams with person detection and saving
- MultiStreamManager - Orchestrates multiple concurrent streams
- DisplayManager - Manages grid display window for multiple streams
StreamProcessor
Processes a single RTSP stream, runs person detection, handles saving (image snapshots or video clips), and optionally feeds frames to a DisplayManager.
Constructor
from stream_processor import StreamProcessor
from person_detector import PersonDetector
detector = PersonDetector()
processor = StreamProcessor(detector, output_dir="person")
__init__()
def __init__(self, detector: PersonDetector, output_dir: str = "person") -> None
detector (PersonDetector): PersonDetector instance to use for person detection. Can be shared across multiple StreamProcessor instances.
output_dir (str, default: "person"): Base directory where captured images or video clips are saved.
Implementation: stream_processor.py:26-28
Methods
process_rtsp_stream()
def process_rtsp_stream(
    self,
    rtsp_url: str,
    frame_skip: int = 15,
    display: bool = True,
    save_mode: Optional[str] = None,
) -> None
Process a single RTSP stream with an optional dedicated display window. Blocks until stream ends or user quits.
rtsp_url (str): RTSP stream URL to connect to. Example: "rtsp://192.168.1.100:554/stream"
frame_skip (int, default: 15): Analyze every Nth frame. At 30 fps, frame_skip=15 means about 2 detections per second. Additional throttling: detection is also limited to once every 0.5 seconds (line 303).
display (bool, default: True): Show a live annotated window resized to 1280×720. Press ‘q’ to quit.
save_mode (Optional[str], default: None): Save mode for captures:
"image" - Save JPEG snapshots when person enters frame
"video" - Record MP4 clips from entry until exit
None - Don’t save anything
Behavior:
- Connects to RTSP stream with automatic reconnection (up to 5 attempts)
- Processes frames continuously
- Runs person detection every frame_skip frames (and at most once per 0.5 s)
- Tracks person entry/exit with a 3-frame exit confirmation threshold
- Saves snapshots or video clips based on save_mode
- Displays annotated frames if display=True
- Handles Ctrl+C gracefully
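The two detection throttles listed above (every frame_skip-th frame, and at most once per 0.5 s) can be combined as in the sketch below. This is an illustration only, not the code in stream_processor.py: the class name DetectionThrottle, its min_interval parameter, and the injectable clock are all hypothetical.

```python
import time
from typing import Callable


class DetectionThrottle:
    """Decide whether a frame should be passed to the detector (hypothetical helper)."""

    def __init__(
        self,
        frame_skip: int = 15,
        min_interval: float = 0.5,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.frame_skip = frame_skip
        self.min_interval = min_interval
        self._clock = clock
        # No detection has run yet, so the first candidate frame always passes.
        self._last_detection = float("-inf")

    def should_detect(self, frame_index: int) -> bool:
        # Gate 1: only every Nth frame is a candidate.
        if frame_index % self.frame_skip != 0:
            return False
        # Gate 2: at most one detection per min_interval seconds.
        now = self._clock()
        if now - self._last_detection < self.min_interval:
            return False
        self._last_detection = now
        return True
```

With a 30 fps stream and frame_skip=15, candidate frames arrive about every 0.5 s, so the time gate mostly matters for streams with a higher frame rate or a smaller frame_skip.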
Exit Conditions:
- User presses ‘q’ (when display enabled)
- User presses Ctrl+C
- Max reconnection attempts exceeded
- Stream ends naturally
Console Output Example:
Connecting to RTSP stream: rtsp://192.168.1.100/stream
Created directory: person
Connected successfully! Processing frames...
Press 'q' to quit
[2026-03-09 14:32:15] Frame 15: 1 person(s) detected
Person entered frame! Entry #1
Saved snapshot: person/person_entry_1_20260309_143215_1678368735.jpg
[2026-03-09 14:32:18] Frame 45: No persons
No person detected (1/3)
No person detected (2/3)
No person detected (3/3)
Person(s) exited frame. Waiting for next entry...
Processed 120 frames, captured 1 person snapshot(s)
Implementation: stream_processor.py:230-410
process_single_stream()
def process_single_stream(
    self,
    stream_id: Union[int, str],
    rtsp_url: str,
    frame_skip: int = 15,
    save_mode: Optional[str] = None,
    display_manager: Optional[DisplayManager] = None,
) -> None
Process an RTSP stream indefinitely in the calling thread. Designed to be called from a worker thread in multi-stream scenarios.
stream_id (Union[int, str]): Unique identifier for this stream (used in console output and directory naming).
rtsp_url (str): RTSP stream URL to connect to.
frame_skip (int, default: 15): Analyze every Nth frame.
save_mode (Optional[str], default: None): "image", "video", or None.
display_manager (Optional[DisplayManager], default: None): If provided, updates the shared grid display buffer with annotated frames.
Behavior:
- Nearly identical to process_rtsp_stream(), but designed for threading
- Saves files to {output_dir}/stream_{stream_id}/ subdirectories
- Updates display_manager instead of creating its own window
- Prefixes all console output with [Stream {stream_id}]
- Stops when display_manager.is_running becomes False
Usage Pattern:
import threading
from stream_processor import StreamProcessor
from person_detector import PersonDetector
from display_manager import DisplayManager

detector = PersonDetector()
processor = StreamProcessor(detector)
display_manager = DisplayManager()
display_manager.start([1, 2, 3])

threads = []
for stream_id, rtsp_url in enumerate(["rtsp://cam1", "rtsp://cam2", "rtsp://cam3"], 1):
    t = threading.Thread(
        target=processor.process_single_stream,
        args=(stream_id, rtsp_url, 15, "video", display_manager),
        daemon=True,
    )
    t.start()
    threads.append(t)

for t in threads:
    t.join()
display_manager.stop()
Implementation: stream_processor.py:34-224
Private Methods
_save_annotated_snapshot()
@staticmethod
def _save_annotated_snapshot(
    frame: cv2.typing.MatLike,
    boxes: List[Tuple[int, int, int, int, float]],
    filename: str,
) -> None
Internal helper that draws bounding boxes and confidence labels on a frame and saves it as JPEG.
Implementation: stream_processor.py:416-434
Person Entry/Exit Detection
StreamProcessor uses a state machine to track person presence:
Entry Detection:
- Person detected in a frame where person_present = False
- Triggers snapshot save (image mode) or starts video recording (video mode)
- Increments person_entry_count
Exit Detection:
- Uses a 3-frame confirmation threshold (NO_PERSON_EXIT_THRESHOLD = 3)
- Requires 3 consecutive detection frames with no person
- Prevents false exits from brief detection failures
- Stops video recording when person exits
State Variables:
person_present (bool): Current presence state
no_person_streak (int): Consecutive frames without detection
person_entry_count (int): Total number of entries detected
Implementation: stream_processor.py:123-181 and stream_processor.py:311-355
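The entry/exit logic described in this section can be sketched as a small state machine. The state variable names and NO_PERSON_EXIT_THRESHOLD come from the text above; the PresenceTracker class and its update() method are hypothetical names used only for illustration, and the real code in stream_processor.py may be organized differently.

```python
from dataclasses import dataclass
from typing import Optional

NO_PERSON_EXIT_THRESHOLD = 3  # value quoted in the section above


@dataclass
class PresenceTracker:
    """Tracks person presence across detection frames (hypothetical helper)."""

    person_present: bool = False
    no_person_streak: int = 0
    person_entry_count: int = 0

    def update(self, detected: bool) -> Optional[str]:
        """Feed one detection result; return "entry", "exit", or None."""
        if detected:
            # Any detection cancels a pending exit.
            self.no_person_streak = 0
            if not self.person_present:
                self.person_present = True
                self.person_entry_count += 1
                return "entry"  # caller saves a snapshot or starts recording
            return None
        if self.person_present:
            self.no_person_streak += 1
            if self.no_person_streak >= NO_PERSON_EXIT_THRESHOLD:
                self.person_present = False
                self.no_person_streak = 0
                return "exit"  # caller stops recording
        return None
```

A single missed detection between two positive frames only bumps no_person_streak and is then reset, which is how the threshold avoids false exits.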
Automatic Reconnection
Both stream processing methods implement automatic reconnection:
# Before the read loop
consecutive_failures = 0
max_reconnect_attempts = 5

# Inside the read loop, after ret, frame = cap.read()
if not ret:
    consecutive_failures += 1
    print(f"Failed to read frame (attempt {consecutive_failures}/{max_reconnect_attempts}), reconnecting...")
    cap.release()
    time.sleep(2)
    cap = cv2.VideoCapture(rtsp_url)
    if consecutive_failures >= max_reconnect_attempts:
        print("Max reconnect attempts reached. Giving up.")
        break
Behavior:
- Waits 2 seconds between reconnection attempts
- Allows up to 5 consecutive failures
- Resets failure counter on successful frame read
Implementation: stream_processor.py:89-106 and stream_processor.py:281-296
Video Recording
When save_mode="video", StreamProcessor records MP4 clips:
Codec: mp4v (MPEG-4)
FPS: Automatically detected from stream via cv2.CAP_PROP_FPS (defaults to 25.0 if invalid)
Recording lifecycle:
- Person enters frame → Create cv2.VideoWriter
- Every frame → Write to video file
- Person exits frame → Release writer and finalize file
Filename format:
person_clip_{entry_count}_{YYYYMMDD_HHMMSS}_{unix_timestamp}.mp4
Example:
person/stream_1/person_clip_1_20260309_143215_1678368735.mp4
Implementation: stream_processor.py:79-82, 148-159, 175-179, 329-339, 351-354
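As a rough sketch of the filename format and the FPS fallback described above: the helper names effective_fps and clip_path are hypothetical, and treating non-positive or non-finite values as "invalid" FPS is an assumption, since the text does not define which values count as invalid.

```python
import math
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

DEFAULT_FPS = 25.0  # fallback quoted in the section above


def effective_fps(reported_fps: float) -> float:
    """Return the reported FPS, or DEFAULT_FPS if it is unusable.

    Assumption: "invalid" means zero, negative, NaN, or infinite.
    """
    if not math.isfinite(reported_fps) or reported_fps <= 0:
        return DEFAULT_FPS
    return reported_fps


def clip_path(
    output_dir: str,
    stream_id: Union[int, str],
    entry_count: int,
    now: Optional[datetime] = None,
) -> Path:
    """Build {output_dir}/stream_{id}/person_clip_{n}_{YYYYMMDD_HHMMSS}_{unix}.mp4."""
    now = now or datetime.now()
    stamp = now.strftime("%Y%m%d_%H%M%S")
    unix_ts = int(now.timestamp())
    name = f"person_clip_{entry_count}_{stamp}_{unix_ts}.mp4"
    return Path(output_dir) / f"stream_{stream_id}" / name
```

In real use the value passed to effective_fps would come from cap.get(cv2.CAP_PROP_FPS), and the resulting path would be handed to cv2.VideoWriter together with the mp4v fourcc.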
MultiStreamManager
Orchestrates concurrent processing of multiple RTSP streams. Each stream runs in its own daemon thread via StreamProcessor.
Constructor
from multi_stream_manager import MultiStreamManager
from person_detector import PersonDetector
detector = PersonDetector()
manager = MultiStreamManager(detector, output_dir="person")
__init__()
def __init__(self, detector: PersonDetector, output_dir: str = "person") -> None
detector (PersonDetector): PersonDetector instance shared across all streams.
output_dir (str, default: "person"): Base directory for all stream outputs. Each stream creates a subdirectory: {output_dir}/stream_{id}/
Implementation: multi_stream_manager.py:31-34
Methods
process_multiple_streams()
def process_multiple_streams(
    self,
    rtsp_urls: Union[List[str], Dict[Union[int, str], str]],
    frame_skip: int = 15,
    save_mode: Optional[str] = None,
    display: bool = False,
) -> None
Start all streams and block until they finish (or Ctrl+C).
rtsp_urls (Union[List[str], Dict[Union[int, str], str]], required): RTSP stream URLs, given in one of two formats.
List format (auto-numbered 1, 2, 3, …):
rtsp_urls = [
    "rtsp://camera1.local/stream",
    "rtsp://camera2.local/stream",
    "rtsp://camera3.local/stream",
]
Dict format (custom IDs):
rtsp_urls = {
    "lobby": "rtsp://lobby-cam.local/stream",
    "parking": "rtsp://parking-cam.local/stream",
    "entrance": "rtsp://entrance-cam.local/stream",
}
frame_skip (int, default: 15): Analyze every Nth frame (applied to all streams).
save_mode (Optional[str], default: None): "image", "video", or None (applied to all streams).
display (bool, default: False): Show a live grid window with all streams. Press ‘q’ to quit.
Behavior:
- Creates output directory structure
- Optionally starts DisplayManager with grid window
- Spawns one daemon thread per stream
- Waits for all threads to complete or display window to close
- Handles Ctrl+C gracefully
- Cleans up display manager on exit
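One way the two accepted rtsp_urls formats could be reduced to a single ID-to-URL mapping, and mapped to per-stream output directories, is sketched below. The function names normalize_streams and stream_output_dir are hypothetical; this is not taken from multi_stream_manager.py.

```python
from pathlib import Path
from typing import Dict, List, Union

StreamId = Union[int, str]


def normalize_streams(
    rtsp_urls: Union[List[str], Dict[StreamId, str]],
) -> Dict[StreamId, str]:
    """Return a {stream_id: url} mapping for either input format."""
    if isinstance(rtsp_urls, dict):
        # Dict format: keep the caller's custom IDs.
        return dict(rtsp_urls)
    # List format: auto-number starting at 1.
    return {i: url for i, url in enumerate(rtsp_urls, start=1)}


def stream_output_dir(output_dir: str, stream_id: StreamId) -> Path:
    """Return the {output_dir}/stream_{id} subdirectory for one stream."""
    return Path(output_dir) / f"stream_{stream_id}"
```

Normalizing early means the rest of the manager only has to deal with one shape of input, whichever form the caller passed.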
Console Output Example:
Starting person detection on 3 stream(s)...
Created main directory: person
Started thread for stream 1: rtsp://camera1.local/stream
Started thread for stream 2: rtsp://camera2.local/stream
Started thread for stream 3: rtsp://camera3.local/stream
All streams shown in a single grid window. Press 'q' or Ctrl+C to stop...
[Stream 1] Connecting to: rtsp://camera1.local/stream
[Stream 2] Connecting to: rtsp://camera2.local/stream
[Stream 3] Connecting to: rtsp://camera3.local/stream
[Stream 1] Connected successfully! Processing frames...
[Stream 2] Connected successfully! Processing frames...
[Stream 3] Connected successfully! Processing frames...
Implementation: multi_stream_manager.py:36-103
Complete Usage Example:
from person_detector import PersonDetector
from multi_stream_manager import MultiStreamManager
detector = PersonDetector(
    confidence_threshold=0.6,
    person_area_threshold=1500,
)
manager = MultiStreamManager(detector, output_dir="captures")
manager.process_multiple_streams(
    rtsp_urls=[
        "rtsp://192.168.1.100/stream",
        "rtsp://192.168.1.101/stream",
        "rtsp://192.168.1.102/stream",
        "rtsp://192.168.1.103/stream",
    ],
    frame_skip=20,
    save_mode="video",
    display=True,
)
DisplayManager
Manages a single resizable grid window that composites frames from multiple streams in real time.
Constructor
from display_manager import DisplayManager
dm = DisplayManager(cell_width=640, cell_height=360)
__init__()
def __init__(self, cell_width: int = 640, cell_height: int = 360) -> None
cell_width (int, default: 640): Width of each stream cell in the grid (pixels).
cell_height (int, default: 360): Height of each stream cell in the grid (pixels).
Grid Layout:
- Automatically calculates grid dimensions based on number of streams
- Uses a square-ish layout: cols = ceil(sqrt(n)), rows = ceil(n / cols)
- Total window size: (cell_width * cols, cell_height * rows)
Examples:
- 1 stream: 1×1 grid (640×360)
- 2 streams: 2×1 grid (1280×360)
- 3 streams: 2×2 grid (1280×720)
- 4 streams: 2×2 grid (1280×720)
- 9 streams: 3×3 grid (1920×1080)
Implementation: display_manager.py:22-30
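The layout formula above can be checked with a few lines of Python. The function names grid_shape and window_size are hypothetical; only the formula and the default cell size come from this page.

```python
import math
from typing import Tuple


def grid_shape(n: int) -> Tuple[int, int]:
    """Return (cols, rows) for n streams using cols = ceil(sqrt(n))."""
    if n < 1:
        raise ValueError("at least one stream is required")
    cols = math.ceil(math.sqrt(n))
    rows = math.ceil(n / cols)
    return cols, rows


def window_size(n: int, cell_width: int = 640, cell_height: int = 360) -> Tuple[int, int]:
    """Return the total window size (width, height) in pixels."""
    cols, rows = grid_shape(n)
    return cell_width * cols, cell_height * rows
```

Note that 3 streams produce the same 2×2 grid as 4 streams, leaving one cell empty.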
Properties
is_running
@property
def is_running(self) -> bool
Returns whether the display thread is currently running. Useful for stream threads to detect when the user has closed the display window.
Implementation: display_manager.py:36-38
Methods
start()
def start(self, stream_ids: List[Union[int, str]]) -> None
Register stream IDs and start the display thread.
stream_ids (List[Union[int, str]], required): List of stream identifiers that will be displayed. Order determines grid position (left-to-right, top-to-bottom).
Example:
dm.start([1, 2, 3, 4])  # Numeric IDs
dm.start(["lobby", "parking", "entrance"])  # String IDs
Behavior:
- Initializes frame buffer for each stream (all None initially)
- Sets is_running = True
- Spawns daemon thread running _loop()
- Returns immediately (non-blocking)
Implementation: display_manager.py:40-48
stop()
Signal the display thread to stop and wait for it to finish (up to 2 seconds).
Behavior:
- Sets is_running = False
- Waits for display thread with 2-second timeout
- Destroys all OpenCV windows
- Clears frame buffer
Implementation: display_manager.py:50-57
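The start()/stop() lifecycle described above (non-blocking start, flag-based shutdown, join with a 2-second timeout) follows a common threading pattern, sketched here without any OpenCV calls. LoopWorker and its ticks counter are hypothetical stand-ins for the display thread, not the DisplayManager implementation.

```python
import threading
import time
from typing import Optional


class LoopWorker:
    """Background loop with a non-blocking start() and a bounded stop()."""

    def __init__(self, interval: float = 0.03) -> None:
        self._interval = interval
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self.ticks = 0  # stands in for "grid frames rendered"

    @property
    def is_running(self) -> bool:
        return self._running

    def start(self) -> None:
        # Set the flag before the thread starts so the loop body runs.
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()  # returns immediately

    def stop(self, timeout: float = 2.0) -> None:
        # Signal the loop to exit, then wait at most `timeout` seconds.
        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=timeout)
            self._thread = None

    def _loop(self) -> None:
        while self._running:
            self.ticks += 1
            time.sleep(self._interval)
```

Because the thread is a daemon and the join is bounded, a stuck loop cannot keep the process from exiting.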
update_frame()
def update_frame(self, stream_id: Union[int, str], frame: cv2.typing.MatLike) -> None
Update the display buffer for a specific stream. Thread-safe.
stream_id (Union[int, str], required): Stream identifier (must match one passed to start()).
frame (cv2.typing.MatLike, required): OpenCV image matrix to display. Resized to (cell_width, cell_height) automatically.
Thread Safety: Protected by internal lock (self._lock).
Usage Pattern:
# From stream processing thread
while True:
    ret, frame = cap.read()
    if ret:
        # Annotate frame
        for x, y, w, h, conf in boxes:
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        # Update display
        display_manager.update_frame(stream_id, frame)
    # Check if display is still running
    if not display_manager.is_running:
        break
Implementation: display_manager.py:59-62
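A lock-protected frame buffer of the kind update_frame() relies on might look like the sketch below. FrameBuffer, its method names, and the choice to ignore unknown stream IDs are assumptions made for illustration; resizing is left out so the example needs no OpenCV.

```python
import threading
from typing import Any, Dict, Iterable, Optional, Union

StreamId = Union[int, str]


class FrameBuffer:
    """Latest frame per stream, shared between worker and display threads."""

    def __init__(self, stream_ids: Iterable[StreamId]) -> None:
        self._lock = threading.Lock()
        # Every registered stream starts with no frame.
        self._frames: Dict[StreamId, Optional[Any]] = {sid: None for sid in stream_ids}

    def update(self, stream_id: StreamId, frame: Any) -> None:
        with self._lock:
            # Assumption: frames for unregistered IDs are silently dropped.
            if stream_id in self._frames:
                self._frames[stream_id] = frame

    def snapshot(self) -> Dict[StreamId, Optional[Any]]:
        # Copy under the lock so the reader never sees a half-updated mapping.
        with self._lock:
            return dict(self._frames)
```

Writers hold the lock only long enough to swap one reference, so stream threads are not blocked by each other for any noticeable time.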
Internal Methods
_build_grid()
def _build_grid(self) -> cv2.typing.MatLike
Compose all buffered frames into a single grid image. Called internally by display loop.
Behavior:
- Locks frame buffer during composition
- Resizes each frame to cell dimensions
- Shows “Connecting…” placeholder for streams without frames yet
- Arranges frames in grid layout
- Returns single composited image
Implementation: display_manager.py:68-109
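To illustrate the "arranges frames in grid layout" step, the sketch below computes the top-left pixel of each stream's cell from its position in the stream ID list, filling left-to-right and then top-to-bottom. The function name cell_origins is hypothetical, and the actual _build_grid() may compose the image differently.

```python
import math
from typing import Dict, List, Tuple, Union

StreamId = Union[int, str]


def cell_origins(
    stream_ids: List[StreamId],
    cell_width: int = 640,
    cell_height: int = 360,
) -> Dict[StreamId, Tuple[int, int]]:
    """Map each stream ID to the (x, y) of its cell's top-left corner."""
    cols = math.ceil(math.sqrt(len(stream_ids)))
    origins: Dict[StreamId, Tuple[int, int]] = {}
    for index, stream_id in enumerate(stream_ids):
        # Row-major placement: fill a row, then move down.
        row, col = divmod(index, cols)
        origins[stream_id] = (col * cell_width, row * cell_height)
    return origins
```

Each resized frame (or placeholder) would then be copied into the composite image at its origin.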
_loop()
Display thread main loop. Renders the grid at ~30 fps until stopped.
Behavior:
- Creates window: "Person Detection - All Streams"
- Refreshes the display every 30 ms (an upper bound of ~33 fps)
- Stops when is_running = False or user presses ‘q’
- Destroys window on exit
Implementation: display_manager.py:111-124
Complete Usage Example
import threading
import cv2
from display_manager import DisplayManager
# Initialize display manager
dm = DisplayManager(cell_width=640, cell_height=360)
# Start display with 4 streams
dm.start([1, 2, 3, 4])
# Stream processing function
def process_stream(stream_id, rtsp_url):
    cap = cv2.VideoCapture(rtsp_url)
    while dm.is_running:
        ret, frame = cap.read()
        if ret:
            # Add stream ID label
            cv2.putText(
                frame,
                f"Stream {stream_id}",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 255, 0),
                2,
            )
            dm.update_frame(stream_id, frame)
    cap.release()

# Launch threads
streams = [
    (1, "rtsp://cam1.local/stream"),
    (2, "rtsp://cam2.local/stream"),
    (3, "rtsp://cam3.local/stream"),
    (4, "rtsp://cam4.local/stream"),
]
threads = []
for stream_id, rtsp_url in streams:
    t = threading.Thread(
        target=process_stream,
        args=(stream_id, rtsp_url),
        daemon=True,
    )
    t.start()
    threads.append(t)

# Wait for completion
try:
    for t in threads:
        t.join()
except KeyboardInterrupt:
    print("Stopping...")
finally:
    dm.stop()
Integration Example
Complete example showing how all three classes work together:
from person_detector import PersonDetector
from multi_stream_manager import MultiStreamManager
from config import load_config
# Load configuration
cfg = load_config("config.cfg")
# Initialize detector
detector = PersonDetector(
    confidence_threshold=cfg.confidence_threshold,
    person_area_threshold=cfg.person_area_threshold,
    model_dir=cfg.model_dir,
)
# Initialize manager
manager = MultiStreamManager(detector, output_dir=cfg.output_dir)
# Process multiple streams with grid display
manager.process_multiple_streams(
    rtsp_urls={
        "lobby": "rtsp://lobby-cam.local/stream",
        "parking": "rtsp://parking-cam.local/stream",
        "entrance": "rtsp://entrance-cam.local/stream",
        "hallway": "rtsp://hallway-cam.local/stream",
    },
    frame_skip=cfg.frame_skip,
    save_mode="video",
    display=True,
)
Implementation References
- StreamProcessor: stream_processor.py (435 lines)
- MultiStreamManager: multi_stream_manager.py (104 lines)
- DisplayManager: display_manager.py (125 lines)