Overview

The stream processing module provides three classes for handling RTSP streams:
  • StreamProcessor - Processes single RTSP streams with person detection and saving
  • MultiStreamManager - Orchestrates multiple concurrent streams
  • DisplayManager - Manages grid display window for multiple streams

StreamProcessor

Processes a single RTSP stream, runs person detection, handles saving (image snapshots or video clips), and optionally feeds frames to a DisplayManager.

Constructor

from stream_processor import StreamProcessor
from person_detector import PersonDetector

detector = PersonDetector()
processor = StreamProcessor(detector, output_dir="person")

__init__()

def __init__(self, detector: PersonDetector, output_dir: str = "person") -> None
detector
PersonDetector
required
PersonDetector instance to use for person detection. Can be shared across multiple StreamProcessor instances.
output_dir
str
default:"person"
Base directory where captured images or video clips are saved.
Implementation: stream_processor.py:26-28

Methods

process_rtsp_stream()

def process_rtsp_stream(
    self,
    rtsp_url: str,
    frame_skip: int = 15,
    display: bool = True,
    save_mode: Optional[str] = None,
) -> None
Process a single RTSP stream with an optional dedicated display window. Blocks until stream ends or user quits.
rtsp_url
str
required
RTSP stream URL to connect to. Example: "rtsp://192.168.1.100:554/stream"
frame_skip
int
default:"15"
Analyze every Nth frame. At 30 fps, frame_skip=15 means ~2 detections per second. Additional throttling: detection is also limited to at most once every 0.5 seconds (stream_processor.py:303).
display
bool
default:"True"
Show a live annotated window resized to 1280×720. Press ‘q’ to quit.
save_mode
Optional[str]
default:"None"
Save mode for captures:
  • "image" - Save JPEG snapshots when person enters frame
  • "video" - Record MP4 clips from entry until exit
  • None - Don’t save anything
Behavior:
  1. Connects to RTSP stream with automatic reconnection (up to 5 attempts)
  2. Processes frames continuously
  3. Runs person detection every frame_skip frames (and at most once per 0.5 s)
  4. Tracks person entry/exit with 3-frame exit confirmation threshold
  5. Saves snapshots or video clips based on save_mode
  6. Displays annotated frames if display=True
  7. Handles Ctrl+C gracefully
Exit Conditions:
  • User presses ‘q’ (when display enabled)
  • User presses Ctrl+C
  • Max reconnection attempts exceeded
  • Stream ends naturally
Console Output Example:
Connecting to RTSP stream: rtsp://192.168.1.100/stream
Created directory: person
Connected successfully! Processing frames...
Press 'q' to quit
[2026-03-09 14:32:15] Frame 15: 1 person(s) detected
  Person entered frame! Entry #1
  Saved snapshot: person/person_entry_1_20260309_143215_1678368735.jpg
[2026-03-09 14:32:18] Frame 45: No persons
  No person detected (1/3)
  No person detected (2/3)
  No person detected (3/3)
  Person(s) exited frame. Waiting for next entry...
Processed 120 frames, captured 1 person snapshot(s)
Implementation: stream_processor.py:230-410
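The two throttling rules described for frame_skip (every Nth frame, and at most one detection per 0.5 s) can be combined as in the sketch below. This is an illustration only, not the code in stream_processor.py: the DetectionGate class, its should_detect() method and the injectable clock are hypothetical names, and frames are assumed to be counted from 1.

```python
import time
from typing import Callable, Optional


class DetectionGate:
    """Decides whether a given frame should be passed to the detector (hypothetical helper)."""

    def __init__(
        self,
        frame_skip: int = 15,
        min_interval: float = 0.5,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.frame_skip = frame_skip
        self.min_interval = min_interval
        self._clock = clock
        self._last_detection: Optional[float] = None

    def should_detect(self, frame_index: int) -> bool:
        # Rule 1: only every Nth frame is a candidate for detection.
        if frame_index % self.frame_skip != 0:
            return False
        # Rule 2: drop candidates that arrive sooner than min_interval
        # after the previous detection.
        now = self._clock()
        if self._last_detection is not None and now - self._last_detection < self.min_interval:
            return False
        self._last_detection = now
        return True
```

With a 60 fps source and frame_skip=15, rule 1 alone would allow four detections per second; rule 2 halves that. At 30 fps both rules agree on roughly two detections per second.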

process_single_stream()

def process_single_stream(
    self,
    stream_id: Union[int, str],
    rtsp_url: str,
    frame_skip: int = 15,
    save_mode: Optional[str] = None,
    display_manager: Optional[DisplayManager] = None,
) -> None
Process an RTSP stream indefinitely in the calling thread. Designed to be called from a worker thread in multi-stream scenarios.
stream_id
Union[int, str]
required
Unique identifier for this stream (used in console output and directory naming).
rtsp_url
str
required
RTSP stream URL to connect to.
frame_skip
int
default:"15"
Analyze every Nth frame.
save_mode
Optional[str]
default:"None"
"image", "video", or None.
display_manager
Optional[DisplayManager]
default:"None"
If provided, updates the shared grid display buffer with annotated frames.
Behavior:
  • Nearly identical to process_rtsp_stream(), but designed for threading
  • Saves files to {output_dir}/stream_{stream_id}/ subdirectories
  • Updates display_manager instead of creating its own window
  • Prefixes all console output with [Stream {stream_id}]
  • Stops when display_manager.is_running becomes False
Usage Pattern:
import threading
from stream_processor import StreamProcessor
from person_detector import PersonDetector
from display_manager import DisplayManager

detector = PersonDetector()
processor = StreamProcessor(detector)
display_manager = DisplayManager()
display_manager.start([1, 2, 3])

threads = []
for stream_id, rtsp_url in enumerate(["rtsp://cam1", "rtsp://cam2", "rtsp://cam3"], 1):
    t = threading.Thread(
        target=processor.process_single_stream,
        args=(stream_id, rtsp_url, 15, "video", display_manager),
        daemon=True
    )
    t.start()
    threads.append(t)

for t in threads:
    t.join()

display_manager.stop()
Implementation: stream_processor.py:34-224

Private Methods

_save_annotated_snapshot()

@staticmethod
def _save_annotated_snapshot(
    frame: cv2.typing.MatLike,
    boxes: List[Tuple[int, int, int, int, float]],
    filename: str,
) -> None
Internal helper that draws bounding boxes and confidence labels on a frame and saves it as JPEG.
Implementation: stream_processor.py:416-434

Person Entry/Exit Detection

StreamProcessor uses a state machine to track person presence.
Entry Detection:
  • Person detected in frame where person_present = False
  • Triggers snapshot save (image mode) or starts video recording (video mode)
  • Increments person_entry_count
Exit Detection:
  • Uses 3-frame confirmation threshold (NO_PERSON_EXIT_THRESHOLD = 3)
  • Requires 3 consecutive detection frames with no person
  • Prevents false exits from brief detection failures
  • Stops video recording when person exits
State Variables:
  • person_present (bool): Current presence state
  • no_person_streak (int): Consecutive detection frames in which no person was found
  • person_entry_count (int): Total number of entries detected
Implementation: stream_processor.py:123-181 and stream_processor.py:311-355
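The entry/exit rules above can be condensed into a small state machine. The sketch below reuses the state variable names and the threshold from this section, but the PresenceTracker class and its update() method are hypothetical and only illustrate the logic; the real code also triggers saving and recording at the transitions.

```python
from dataclasses import dataclass

NO_PERSON_EXIT_THRESHOLD = 3  # value quoted in the section above


@dataclass
class PresenceTracker:
    """Hypothetical stand-in for the presence state kept by StreamProcessor."""

    person_present: bool = False
    no_person_streak: int = 0
    person_entry_count: int = 0

    def update(self, detected: bool) -> str:
        """Feed one detection result; return "entered", "exited" or "none"."""
        if detected:
            self.no_person_streak = 0  # any detection cancels a pending exit
            if not self.person_present:
                self.person_present = True
                self.person_entry_count += 1
                return "entered"
            return "none"
        if self.person_present:
            self.no_person_streak += 1
            if self.no_person_streak >= NO_PERSON_EXIT_THRESHOLD:
                self.person_present = False
                self.no_person_streak = 0
                return "exited"
        return "none"
```

Feeding it the results True, False, False, True keeps the person marked as present the whole time, because the two misses never reach the threshold of three.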

Automatic Reconnection

Both stream processing methods implement automatic reconnection inside their frame-reading loop (excerpt):
consecutive_failures = 0
max_reconnect_attempts = 5

if not ret:
    consecutive_failures += 1
    print(f"Failed to read frame (attempt {consecutive_failures}/{max_reconnect_attempts}), reconnecting...")
    cap.release()
    time.sleep(2)
    cap = cv2.VideoCapture(rtsp_url)
    if consecutive_failures >= max_reconnect_attempts:
        print("Max reconnect attempts reached. Giving up.")
        break
Behavior:
  • Waits 2 seconds between reconnection attempts
  • Allows up to 5 consecutive failures
  • Resets failure counter on successful frame read
Implementation: stream_processor.py:89-106 and stream_processor.py:281-296
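For experimenting with this behavior outside the module, the same policy can be written as a generator. This is a sketch under assumptions, not the module's code: read_with_reconnect and its open_capture and sleep parameters are hypothetical, and unlike the excerpt it checks the failure limit before waiting and reopening. Any object with read() and release() works, including cv2.VideoCapture.

```python
import time
from typing import Any, Callable, Iterator


def read_with_reconnect(
    open_capture: Callable[[], Any],
    max_reconnect_attempts: int = 5,
    retry_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Any]:
    """Yield frames, reopening the capture after each failed read."""
    cap = open_capture()
    consecutive_failures = 0
    try:
        while True:
            ret, frame = cap.read()
            if ret:
                consecutive_failures = 0  # a good read clears the failure streak
                yield frame
                continue
            consecutive_failures += 1
            if consecutive_failures >= max_reconnect_attempts:
                return  # give up after too many failures in a row
            cap.release()
            sleep(retry_delay)
            cap = open_capture()
    finally:
        cap.release()  # also runs if the consumer stops iterating early
```

With OpenCV installed, a call could look like read_with_reconnect(lambda: cv2.VideoCapture(rtsp_url)).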

Video Recording

When save_mode="video", StreamProcessor records MP4 clips:
  • Codec: mp4v (MPEG-4)
  • FPS: Automatically detected from stream via cv2.CAP_PROP_FPS (defaults to 25.0 if invalid)
Recording lifecycle:
  1. Person enters frame → Create cv2.VideoWriter
  2. Every frame → Write to video file
  3. Person exits frame → Release writer and finalize file
Filename format:
person_clip_{entry_count}_{YYYYMMDD_HHMMSS}_{unix_timestamp}.mp4
Example:
person/stream_1/person_clip_1_20260309_143215_1678368735.mp4
Implementation: stream_processor.py:79-82, 148-159, 175-179, 329-339, 351-354
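The filename format and the FPS fallback can be reproduced with the standard library alone. The helpers below are hypothetical (clip_path and effective_fps do not appear in the module), and treating non-positive or NaN values as "invalid" is an assumption about what the fallback check covers.

```python
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

DEFAULT_FPS = 25.0  # fallback value quoted above


def effective_fps(reported_fps: float) -> float:
    """Return the reported FPS, or the fallback when the value is unusable."""
    # Assumption: "invalid" means NaN, zero or negative.
    if reported_fps != reported_fps or reported_fps <= 0:
        return DEFAULT_FPS
    return reported_fps


def clip_path(
    output_dir: str,
    stream_id: Union[int, str],
    entry_count: int,
    now: Optional[float] = None,
) -> Path:
    """Build {output_dir}/stream_{id}/person_clip_{n}_{YYYYMMDD_HHMMSS}_{unix}.mp4."""
    ts = time.time() if now is None else now
    stamp = datetime.fromtimestamp(ts).strftime("%Y%m%d_%H%M%S")  # local time
    name = f"person_clip_{entry_count}_{stamp}_{int(ts)}.mp4"
    return Path(output_dir) / f"stream_{stream_id}" / name
```

The result of effective_fps(cap.get(cv2.CAP_PROP_FPS)) would then be passed to cv2.VideoWriter together with the mp4v fourcc.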

MultiStreamManager

Orchestrates concurrent processing of multiple RTSP streams. Each stream runs in its own daemon thread via StreamProcessor.

Constructor

from multi_stream_manager import MultiStreamManager
from person_detector import PersonDetector

detector = PersonDetector()
manager = MultiStreamManager(detector, output_dir="person")

__init__()

def __init__(self, detector: PersonDetector, output_dir: str = "person") -> None
detector
PersonDetector
required
PersonDetector instance shared across all streams.
output_dir
str
default:"person"
Base directory for all stream outputs. Each stream creates a subdirectory: {output_dir}/stream_{id}/
Implementation: multi_stream_manager.py:31-34

Methods

process_multiple_streams()

def process_multiple_streams(
    self,
    rtsp_urls: Union[List[str], Dict[Union[int, str], str]],
    frame_skip: int = 15,
    save_mode: Optional[str] = None,
    display: bool = False,
) -> None
Start all streams and block until they finish (or Ctrl+C).
rtsp_urls
Union[List[str], Dict[Union[int, str], str]]
required
RTSP stream URLs. Can be either a list or a dict.
List format (auto-numbered 1, 2, 3, …):
rtsp_urls = [
    "rtsp://camera1.local/stream",
    "rtsp://camera2.local/stream",
    "rtsp://camera3.local/stream",
]
Dict format (custom IDs):
rtsp_urls = {
    "lobby": "rtsp://lobby-cam.local/stream",
    "parking": "rtsp://parking-cam.local/stream",
    "entrance": "rtsp://entrance-cam.local/stream",
}
frame_skip
int
default:"15"
Analyze every Nth frame (applied to all streams).
save_mode
Optional[str]
default:"None"
"image", "video", or None (applied to all streams).
display
bool
default:"False"
Show a live grid window with all streams. Press ‘q’ to quit.
Behavior:
  1. Creates output directory structure
  2. Optionally starts DisplayManager with grid window
  3. Spawns one daemon thread per stream
  4. Waits for all threads to complete or display window to close
  5. Handles Ctrl+C gracefully
  6. Cleans up display manager on exit
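The input handling and thread spawning in the steps above can be sketched as follows. The helper names normalize_streams and start_workers are hypothetical, and worker stands in for a call such as StreamProcessor.process_single_stream with the remaining arguments already bound.

```python
import threading
from typing import Callable, Dict, List, Union

StreamId = Union[int, str]


def normalize_streams(
    rtsp_urls: Union[List[str], Dict[StreamId, str]],
) -> Dict[StreamId, str]:
    """Turn either accepted input form into a {stream_id: url} mapping."""
    if isinstance(rtsp_urls, dict):
        return dict(rtsp_urls)  # custom IDs are kept as given
    # Lists are auto-numbered starting at 1.
    return {i: url for i, url in enumerate(rtsp_urls, start=1)}


def start_workers(
    streams: Dict[StreamId, str],
    worker: Callable[[StreamId, str], None],
) -> List[threading.Thread]:
    """Start one daemon thread per stream and return the thread objects."""
    threads = []
    for stream_id, url in streams.items():
        t = threading.Thread(target=worker, args=(stream_id, url), daemon=True)
        t.start()
        threads.append(t)
    return threads
```

Daemon threads do not keep the interpreter alive, so the caller still has to join them (or wait on the display) to block until processing ends.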
Console Output Example:
Starting person detection on 3 stream(s)...
Created main directory: person
Started thread for stream 1: rtsp://camera1.local/stream
Started thread for stream 2: rtsp://camera2.local/stream
Started thread for stream 3: rtsp://camera3.local/stream
All streams shown in a single grid window. Press 'q' or Ctrl+C to stop...
[Stream 1] Connecting to: rtsp://camera1.local/stream
[Stream 2] Connecting to: rtsp://camera2.local/stream
[Stream 3] Connecting to: rtsp://camera3.local/stream
[Stream 1] Connected successfully! Processing frames...
[Stream 2] Connected successfully! Processing frames...
[Stream 3] Connected successfully! Processing frames...
Implementation: multi_stream_manager.py:36-103
Complete Usage Example:
from person_detector import PersonDetector
from multi_stream_manager import MultiStreamManager

detector = PersonDetector(
    confidence_threshold=0.6,
    person_area_threshold=1500
)

manager = MultiStreamManager(detector, output_dir="captures")

manager.process_multiple_streams(
    rtsp_urls=[
        "rtsp://192.168.1.100/stream",
        "rtsp://192.168.1.101/stream",
        "rtsp://192.168.1.102/stream",
        "rtsp://192.168.1.103/stream",
    ],
    frame_skip=20,
    save_mode="video",
    display=True,
)

DisplayManager

Manages a single resizable grid window that composites frames from multiple streams in real time.

Constructor

from display_manager import DisplayManager

dm = DisplayManager(cell_width=640, cell_height=360)

__init__()

def __init__(self, cell_width: int = 640, cell_height: int = 360) -> None
cell_width
int
default:"640"
Width of each stream cell in the grid (pixels).
cell_height
int
default:"360"
Height of each stream cell in the grid (pixels).
Grid Layout:
  • Automatically calculates grid dimensions based on number of streams
  • Uses square-ish layout: cols = ceil(sqrt(n)), rows = ceil(n / cols)
  • Total window size: (cell_width * cols, cell_height * rows)
Examples:
  • 1 stream: 1×1 grid (640×360)
  • 2 streams: 2×1 grid (1280×360)
  • 3 streams: 2×2 grid (1280×720)
  • 4 streams: 2×2 grid (1280×720)
  • 9 streams: 3×3 grid (1920×1080)
Implementation: display_manager.py:22-30
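The layout formula can be checked against the examples above with a few lines of Python. grid_shape and window_size are hypothetical helper names used only for this illustration.

```python
import math
from typing import Tuple


def grid_shape(n: int) -> Tuple[int, int]:
    """Return (cols, rows) for n streams: cols = ceil(sqrt(n)), rows = ceil(n / cols)."""
    if n < 1:
        raise ValueError("at least one stream is required")
    cols = math.ceil(math.sqrt(n))
    rows = math.ceil(n / cols)
    return cols, rows


def window_size(n: int, cell_width: int = 640, cell_height: int = 360) -> Tuple[int, int]:
    """Return the total window size (width, height) in pixels."""
    cols, rows = grid_shape(n)
    return cell_width * cols, cell_height * rows


for n in (1, 2, 3, 4, 9):
    print(n, grid_shape(n), window_size(n))
```

Counts that are not perfect squares leave empty cells: five streams use a 3×2 grid with one unused cell.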

Properties

is_running

@property
def is_running(self) -> bool
Returns whether the display thread is currently running. Useful for stream threads to detect when the user has closed the display window.
Implementation: display_manager.py:36-38

Methods

start()

def start(self, stream_ids: List[Union[int, str]]) -> None
Register stream IDs and start the display thread.
stream_ids
List[Union[int, str]]
required
List of stream identifiers that will be displayed. Order determines grid position (left-to-right, top-to-bottom).
Example:
dm.start([1, 2, 3, 4])  # Numeric IDs
dm.start(["lobby", "parking", "entrance"])  # String IDs
Behavior:
  • Initializes frame buffer for each stream (all None initially)
  • Sets is_running = True
  • Spawns daemon thread running _loop()
  • Returns immediately (non-blocking)
Implementation: display_manager.py:40-48

stop()

def stop(self) -> None
Signal the display thread to stop and wait for it to finish (up to 2 seconds).
Behavior:
  • Sets is_running = False
  • Waits for display thread with 2-second timeout
  • Destroys all OpenCV windows
  • Clears frame buffer
Implementation: display_manager.py:50-57

update_frame()

def update_frame(self, stream_id: Union[int, str], frame: cv2.typing.MatLike) -> None
Update the display buffer for a specific stream. Thread-safe.
stream_id
Union[int, str]
required
Stream identifier (must match one from start()).
frame
cv2.typing.MatLike
required
OpenCV image matrix to display. Will be resized to (cell_width, cell_height) automatically.
Thread Safety: Protected by internal lock (self._lock).
Usage Pattern:
# From a stream processing thread; cap, boxes, stream_id and display_manager are assumed to exist already
while True:
    ret, frame = cap.read()
    if ret:
        # Annotate frame
        for x, y, w, h, conf in boxes:
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
        
        # Update display
        display_manager.update_frame(stream_id, frame)
        
        # Check if display is still running
        if not display_manager.is_running:
            break
Implementation: display_manager.py:59-62
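The locking pattern behind update_frame() can be illustrated with a small buffer class. This is a sketch, not the DisplayManager code: FrameBuffer, update() and snapshot() are hypothetical names, and silently ignoring unknown stream IDs is an assumption about how unregistered IDs are handled.

```python
import threading
from typing import Any, Dict, Hashable, Iterable, Optional


class FrameBuffer:
    """Latest-frame store shared by writer threads and one reader (hypothetical)."""

    def __init__(self, stream_ids: Iterable[Hashable]) -> None:
        self._lock = threading.Lock()
        # Every registered stream starts without a frame.
        self._frames: Dict[Hashable, Optional[Any]] = {sid: None for sid in stream_ids}

    def update(self, stream_id: Hashable, frame: Any) -> None:
        with self._lock:
            # Assumption: frames for unregistered IDs are dropped.
            if stream_id in self._frames:
                self._frames[stream_id] = frame

    def snapshot(self) -> Dict[Hashable, Optional[Any]]:
        # Copy under the lock so the reader can work without holding it.
        with self._lock:
            return dict(self._frames)
```

Holding the lock only for the dictionary write keeps stream threads from blocking each other for longer than a single assignment.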

Internal Methods

_build_grid()

def _build_grid(self) -> cv2.typing.MatLike
Compose all buffered frames into a single grid image. Called internally by display loop.
Behavior:
  • Locks frame buffer during composition
  • Resizes each frame to cell dimensions
  • Shows “Connecting…” placeholder for streams without frames yet
  • Arranges frames in grid layout
  • Returns single composited image
Implementation: display_manager.py:68-109
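Arranging frames left-to-right, top-to-bottom comes down to computing the pixel offset of each cell from its position in the stream ID list. The helper below is a hypothetical illustration of that arithmetic; cell_origin is not a method of DisplayManager.

```python
from typing import Tuple


def cell_origin(
    index: int,
    cols: int,
    cell_width: int = 640,
    cell_height: int = 360,
) -> Tuple[int, int]:
    """Return the top-left (x, y) pixel of the cell for the index-th stream (0-based)."""
    row, col = divmod(index, cols)  # fill each row before moving to the next
    return col * cell_width, row * cell_height
```

In a NumPy-backed grid image, the resized frame for that stream would then be copied into grid[y:y + cell_height, x:x + cell_width].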

_loop()

def _loop(self) -> None
Display thread main loop. Renders the grid about every 30 ms (roughly 33 fps at most) until stopped.
Behavior:
  • Creates window: "Person Detection - All Streams"
  • Updates display every 30ms (~33 fps)
  • Stops when is_running = False or user presses ‘q’
  • Destroys window on exit
Implementation: display_manager.py:111-124
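The start(), stop() and _loop() lifecycle described above follows a common pattern: a flag that the loop polls, a daemon thread, and a join with a timeout. The sketch below shows that pattern without any OpenCV calls. LoopThread and its tick callback are hypothetical; in DisplayManager the per-iteration work would be building and showing the grid.

```python
import threading
import time
from typing import Callable, Optional


class LoopThread:
    """Runs tick() repeatedly on a daemon thread until stop() is called (hypothetical)."""

    def __init__(self, tick: Callable[[], None], interval: float = 0.03) -> None:
        self._tick = tick
        self._interval = interval
        self._running = False
        self._thread: Optional[threading.Thread] = None

    @property
    def is_running(self) -> bool:
        return self._running

    def start(self) -> None:
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()  # returns immediately; the loop runs in the background

    def stop(self, timeout: float = 2.0) -> None:
        self._running = False  # the loop notices this on its next iteration
        if self._thread is not None:
            self._thread.join(timeout=timeout)  # wait, but never longer than timeout
            self._thread = None

    def _loop(self) -> None:
        try:
            while self._running:
                self._tick()
                time.sleep(self._interval)
        finally:
            self._running = False  # also cleared if tick() raises
```

Because the flag is also cleared when the loop ends on its own, other threads polling is_running see the shutdown even when stop() was never called.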

Complete Usage Example

import threading
import time
import cv2
from display_manager import DisplayManager

# Initialize display manager
dm = DisplayManager(cell_width=640, cell_height=360)

# Start display with 4 streams
dm.start([1, 2, 3, 4])

# Stream processing function
def process_stream(stream_id, rtsp_url):
    cap = cv2.VideoCapture(rtsp_url)
    while dm.is_running:
        ret, frame = cap.read()
        if not ret:
            # Brief pause so a failed read does not spin the CPU
            time.sleep(0.1)
            continue
        # Add stream ID label
        cv2.putText(
            frame,
            f"Stream {stream_id}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2
        )
        dm.update_frame(stream_id, frame)
    cap.release()

# Launch threads
streams = [
    (1, "rtsp://cam1.local/stream"),
    (2, "rtsp://cam2.local/stream"),
    (3, "rtsp://cam3.local/stream"),
    (4, "rtsp://cam4.local/stream"),
]

threads = []
for stream_id, rtsp_url in streams:
    t = threading.Thread(
        target=process_stream,
        args=(stream_id, rtsp_url),
        daemon=True
    )
    t.start()
    threads.append(t)

# Wait for completion
try:
    for t in threads:
        t.join()
except KeyboardInterrupt:
    print("Stopping...")
finally:
    dm.stop()

Integration Example

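Complete example showing how the classes work together. MultiStreamManager runs each stream through StreamProcessor and, with display=True, starts the DisplayManager itself: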
from person_detector import PersonDetector
from multi_stream_manager import MultiStreamManager
from config import load_config

# Load configuration
cfg = load_config("config.cfg")

# Initialize detector
detector = PersonDetector(
    confidence_threshold=cfg.confidence_threshold,
    person_area_threshold=cfg.person_area_threshold,
    model_dir=cfg.model_dir
)

# Initialize manager
manager = MultiStreamManager(detector, output_dir=cfg.output_dir)

# Process multiple streams with grid display
manager.process_multiple_streams(
    rtsp_urls={
        "lobby": "rtsp://lobby-cam.local/stream",
        "parking": "rtsp://parking-cam.local/stream",
        "entrance": "rtsp://entrance-cam.local/stream",
        "hallway": "rtsp://hallway-cam.local/stream",
    },
    frame_skip=cfg.frame_skip,
    save_mode="video",
    display=True,
)

Implementation References

  • StreamProcessor: stream_processor.py (435 lines)
  • MultiStreamManager: multi_stream_manager.py (104 lines)
  • DisplayManager: display_manager.py (125 lines)