Source code for pyGigEVision.gvsp

"""GigE Vision Streaming Protocol (GVSP) receiver.

Receives image frames pushed by a camera over UDP as a sequence of
packets:

* Leader packet: image metadata (dimensions, pixel format, timestamp).
* Data packets: raw pixel data chunks.
* Trailer packet: frame complete signal.

The :class:`GVSPReceiver` runs a background thread that reassembles
packets into pre-allocated NumPy buffers, detects gaps in packet
sequence numbers, requests resends directly via the receive socket
(bypassing the GVCP control channel for lower latency), and pushes
completed frames onto a thread-safe queue.

Byte order is configurable via the ``byteswap`` constructor parameter
(set ``True`` when the camera sends data in the opposite endianness
from the host; vendor-dependent).

Notes
-----
Implements aravis-inspired patterns: pre-allocated frame buffers
(direct offset writes, no dict-and-sort assembly), throttled gap
detection (every few packets plus on each socket timeout), three-tier
timeouts (initial gap grace, resend interval, frame retention).

See Also
--------
pyGigEVision.gvcp : The control counterpart (GVCP).
"""

from __future__ import annotations

import contextlib
import logging
import math
import socket
import struct
import threading
import time
from queue import Empty, Queue

import numpy as np

logger = logging.getLogger(__name__)

# GVSP packet types
PACKET_LEADER = 0x01
PACKET_TRAILER = 0x02
PACKET_DATA = 0x03

# Leader payload type IDs
PAYLOAD_IMAGE = 0x0001

# Standard pixel format codes (GenICam PFNC)
PIXEL_MONO8 = 0x01080001
PIXEL_MONO16 = 0x01100007
PIXEL_MONO10 = 0x01100003
PIXEL_MONO12 = 0x01100005
PIXEL_MONO14 = 0x01100025

PIXEL_BPP = {
    PIXEL_MONO8: 1,
    PIXEL_MONO10: 2,
    PIXEL_MONO12: 2,
    PIXEL_MONO14: 2,
    PIXEL_MONO16: 2,
}

PIXEL_DTYPE = {
    PIXEL_MONO8: np.uint8,
    PIXEL_MONO10: np.uint16,
    PIXEL_MONO12: np.uint16,
    PIXEL_MONO14: np.uint16,
    PIXEL_MONO16: np.uint16,
}

# GVCP constants for direct resend from stream socket
_GVCP_PORT = 3956
_GVCP_KEY = 0x42
_GVCP_FLAG_ACK = 0x01
_GVCP_CMD_PACKETRESEND = 0x0040

# Run the gap/timeout scan at most once per this many received packets (plus on
# every socket timeout). Bounds the per-packet cost of the receive loop so it
# sustains line rate under host CPU contention.
_GAP_CHECK_EVERY = 128


class _FrameBuffer:
    """Accumulate packets for a single in-flight frame.

    Internal to :mod:`pyGigEVision.gvsp`.  One ``_FrameBuffer`` is created
    per block ID when its leader packet arrives (or, for data-before-leader
    arrivals, on the first data packet).  The raw image data is stored in a
    pre-allocated :class:`bytearray` so every data packet is written with a
    single slice assignment, with no temporary buffers or sort step.

    Parameters
    ----------
    block_id : int
        GVSP block identifier for this frame.
    """

    def __init__(self, block_id: int) -> None:
        self.block_id = block_id
        self.timestamp = 0
        self.pixel_format = 0
        self.width = 0
        self.height = 0
        self.payload_type = 0
        self.leader_received = False
        self.trailer_received = False
        self.expected_packets = 0
        self.created_at = time.monotonic()
        self.last_packet_at = time.monotonic()

        # Pre-allocated buffer (set up when leader arrives and we know size)
        self._raw_buffer: bytearray | None = None
        self._received: bytearray | None = None  # bitfield: 1=received
        self._last_contiguous = 0  # highest contiguous packet from start
        self._received_count = 0
        self._packet_data_size = 0

        # Resend tracking per-packet
        self._resend_requested: set[int] = set()

    def setup_buffer(self, packet_data_size: int) -> None:
        """Allocate the frame buffer once image dimensions are known.

        Parameters
        ----------
        packet_data_size : int
            Expected data payload size per packet (bytes), used to compute
            the packet count and per-packet buffer offsets.
        """
        if self.width <= 0 or self.height <= 0:
            return

        max_pixels = 2048 * 2048
        if self.width * self.height > max_pixels:
            logger.warning(
                f"Frame {self.block_id}: invalid dimensions {self.width}x{self.height}, skipping"
            )
            return

        bpp = PIXEL_BPP.get(self.pixel_format, 2)
        total_bytes = self.width * self.height * bpp
        self.expected_packets = math.ceil(total_bytes / packet_data_size)
        self._packet_data_size = packet_data_size
        self._raw_buffer = bytearray(total_bytes)
        self._received = bytearray(self.expected_packets + 1)  # 0-indexed unused
        self._last_contiguous = 0
        self._received_count = 0

    def write_packet(self, packet_id: int, payload: bytes) -> None:
        """Write a data packet directly to the correct buffer offset."""
        self.last_packet_at = time.monotonic()

        if self._raw_buffer is not None and self._packet_data_size > 0:
            # Auto-detect actual payload size from first full packet.
            # The assumed _packet_data_size (packet_size - 8) may differ
            # from actual payloads due to extended GVSP headers.
            if packet_id == 1 and len(payload) != self._packet_data_size and len(payload) > 0:
                self._packet_data_size = len(payload)
                bpp = PIXEL_BPP.get(self.pixel_format, 2)
                total_bytes = self.width * self.height * bpp
                self.expected_packets = math.ceil(total_bytes / self._packet_data_size)
                self._received = bytearray(self.expected_packets + 1)

            offset = (packet_id - 1) * self._packet_data_size
            end = min(offset + len(payload), len(self._raw_buffer))
            if offset < len(self._raw_buffer):
                self._raw_buffer[offset:end] = payload[: end - offset]
            if packet_id <= len(self._received) - 1 and not self._received[packet_id]:
                self._received[packet_id] = 1
                self._received_count += 1
                if packet_id == self._last_contiguous + 1:
                    while (
                        self._last_contiguous + 1 < len(self._received)
                        and self._received[self._last_contiguous + 1]
                    ):
                        self._last_contiguous += 1

    def missing_packets(self) -> list[int]:
        """Return list of missing packet IDs (gaps in received set)."""
        if self._received is None or self.expected_packets == 0:
            return []
        return [i for i in range(1, self.expected_packets + 1) if not self._received[i]]

    def is_complete(self) -> bool:
        """Return ``True`` when leader, trailer, and all data packets are received."""
        if not self.leader_received or not self.trailer_received:
            return False
        if self.expected_packets > 0:
            return self._received_count >= self.expected_packets
        return True

    def assemble(self, byteswap: bool = False) -> np.ndarray | None:
        """Assemble the frame from the pre-allocated buffer.

        Parameters
        ----------
        byteswap : bool, optional
            Swap the byte order of each pixel after assembly.  Default is
            ``False``.

        Returns
        -------
        numpy.ndarray or None
            2-D array of shape ``(height, width)`` with the appropriate
            ``dtype`` from :data:`PIXEL_DTYPE`, or ``None`` if the leader
            has not been received or the dimensions are invalid.  The
            returned array is always writable, on both byteswap paths.
        """
        if not self.leader_received:
            return None

        if self.width <= 0 or self.height <= 0:
            return None

        max_pixels = 2048 * 2048
        if self.width * self.height > max_pixels:
            return None

        bpp = PIXEL_BPP.get(self.pixel_format, 2)
        dtype = PIXEL_DTYPE.get(self.pixel_format, np.uint16)
        expected_size = self.width * self.height * bpp

        if self._raw_buffer is not None:
            # Slicing a bytearray yields a fresh writable bytearray, so the
            # frombuffer view below is writable with no extra copy.
            raw: bytearray = self._raw_buffer[:expected_size]
        else:
            raw = bytearray(expected_size)

        arr = np.frombuffer(raw, dtype=dtype)

        if byteswap:
            # byteswap() returns a fresh (writable) array.
            arr = arr.byteswap()

        try:
            return arr.reshape((self.height, self.width))
        except ValueError:
            return arr


[docs] class GVSPReceiver: """Receive GVSP image frames on a UDP socket. Binds a UDP socket on the given *local_ip*/*local_port*, starts a background thread that reassembles GVSP packets into NumPy arrays, and exposes a thread-safe queue via :meth:`get_frame` / :meth:`get_frame_with_info`. Packet resends are sent directly from the stream socket to the camera's GVCP port (3956), bypassing any :class:`~pyGigEVision.gvcp.GVCPClient` lock, which keeps the control channel free for register reads during acquisition. Parameters ---------- local_ip : str, optional Local interface IPv4 address to bind to, e.g. ``"169.254.0.1"``. Empty string (default) lets the OS choose. local_port : int, optional Local UDP port to bind to. ``0`` (default) asks the OS to assign a free port; read back via the :attr:`port` property. max_queue : int, optional Maximum number of completed frames to hold in the internal queue before the oldest is discarded. Default is ``30``. gvcp_client : GVCPClient or None, optional If provided, ``camera_ip`` is read from ``gvcp_client.camera_ip`` when *camera_ip* is empty. The client is not otherwise used. packet_size : int, optional Expected network packet size in bytes (Ethernet MTU). The data payload per packet is ``packet_size - 8``. Default is ``1500`` (standard Ethernet MTU). byteswap : bool, optional Swap the byte order of each pixel after assembly. Set to ``True`` when the camera sends data in the opposite endianness from the host. Default is ``False``. camera_ip : str, optional IPv4 address of the camera, used as the destination for direct resend requests. Falls back to ``gvcp_client.camera_ip`` when *gvcp_client* is supplied. Leave empty to disable resends. initial_packet_timeout : float, optional Grace period in seconds before the first resend request is issued for a gap. Default is ``0.005``. frame_retention : float, optional Maximum time in seconds to keep an incomplete frame before emitting it with whatever data arrived. Default is ``0.200``. Notes ----- The constructor binds the socket immediately. Call :meth:`start` to launch the background receiver thread. The receiver is not thread-safe for concurrent calls to :meth:`start`, :meth:`stop`, or :meth:`close`; those should be called from a single controlling thread. :meth:`get_frame` and :meth:`get_frame_with_info` are safe to call from any thread. Examples -------- Minimal usage, receiving one frame:: receiver = GVSPReceiver(local_ip="169.254.0.1", camera_ip="169.254.67.34") receiver.start() # ... configure camera to stream to receiver.port ... frame = receiver.get_frame(timeout=5.0) receiver.stop() Context-manager style (using :meth:`close` after ``stop``):: receiver = GVSPReceiver(local_ip="169.254.0.1") receiver.start() try: frame, info = receiver.get_frame_with_info(timeout=10.0) finally: receiver.stop() receiver.close() """ def __init__( self, local_ip: str = "", local_port: int = 0, max_queue: int = 30, gvcp_client: object | None = None, packet_size: int = 1500, byteswap: bool = False, camera_ip: str = "", initial_packet_timeout: float = 0.005, frame_retention: float = 0.200, ) -> None: self.local_ip = local_ip self.byteswap = byteswap self._gvcp = gvcp_client self._packet_data_size = packet_size - 8 self._camera_ip = camera_ip or (gvcp_client.camera_ip if gvcp_client else "") # Three-tier timeout strategy (aravis-inspired) self._initial_packet_timeout = initial_packet_timeout self._frame_retention = frame_retention self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) with contextlib.suppress(OSError): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16 * 1024 * 1024) self._sock.bind((local_ip, local_port)) self._sock.settimeout(0.05) # short timeout for responsive gap checking self._port = self._sock.getsockname()[1] self._frame_queue: Queue[tuple[np.ndarray, dict]] = Queue(maxsize=max_queue) self._thread: threading.Thread | None = None self._stop_event = threading.Event() self._frame_buffers: dict[int, _FrameBuffer] = {} self._resend_stats = {"requested": 0, "recovered": 0, "failed": 0} self.resend_enabled = True # Resend req_id counter (for direct resend from stream socket) self._resend_req_id = 0 @property def port(self) -> int: """int : UDP port the receiver is bound to. Read the assigned port after construction, especially when *local_port* was ``0`` (OS-assigned):: receiver = GVSPReceiver() print(receiver.port) # e.g. 49152 """ return self._port
[docs] def start(self) -> None: """Start the background receiver thread. Idempotent. Does nothing if the thread is already running. Examples -------- :: receiver = GVSPReceiver(local_ip="169.254.0.1") receiver.start() # ... acquire frames ... receiver.stop() """ if self._thread and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._receive_loop, daemon=True) self._thread.start()
[docs] def stop(self) -> None: """Stop the background receiver thread. Signals the thread to exit and waits up to 5 seconds for it to finish. Clears any in-flight frame buffers. The socket is kept open; call :meth:`close` to release it. """ self._stop_event.set() if self._thread: self._thread.join(timeout=5.0) self._frame_buffers.clear()
[docs] def close(self) -> None: """Stop the receiver and close the UDP socket. After calling ``close``, the receiver must not be used again. """ self.stop() self._sock.close()
[docs] def get_frame(self, timeout: float = 5.0) -> np.ndarray | None: """Block until a frame is available and return it as a NumPy array. Discards the accompanying metadata. Use :meth:`get_frame_with_info` to retrieve metadata alongside the image. Parameters ---------- timeout : float, optional Maximum time in seconds to wait for a frame. Default is ``5.0``. Returns ------- numpy.ndarray or None 2-D array of shape ``(height, width)`` with dtype from :data:`PIXEL_DTYPE`, or ``None`` if no frame arrived within *timeout*. Examples -------- :: receiver.start() frame = receiver.get_frame(timeout=2.0) if frame is not None: print(frame.shape, frame.dtype) """ result = self.get_frame_with_info(timeout) if result is None: return None return result[0]
[docs] def get_frame_with_info(self, timeout: float = 5.0) -> tuple[np.ndarray, dict] | None: """Block until a frame is available and return it with metadata. Parameters ---------- timeout : float, optional Maximum time in seconds to wait for a frame. Default is ``5.0``. Returns ------- tuple of (numpy.ndarray, dict) or None ``(frame, info)`` where *frame* is a 2-D NumPy array and *info* is a dict with the following keys: ``"block_id"`` GVSP block identifier (int). ``"timestamp"`` Camera timestamp from the leader packet (int, camera ticks). ``"pixel_format"`` GenICam PFNC pixel format code, e.g. :data:`PIXEL_MONO16` (int). ``"width"`` Image width in pixels (int). ``"height"`` Image height in pixels (int). ``"missing_packets"`` Number of data packets that were not recovered before the frame was emitted (int; ``0`` for perfect frames). ``"complete"`` ``True`` when every data packet was received, ``False`` when the frame was emitted with one or more packets missing (bool). Equivalent to ``missing_packets == 0``; lets callers reject partial, zero-filled frames without inspecting the count. Returns ``None`` if no frame arrived within *timeout*. Examples -------- :: result = receiver.get_frame_with_info(timeout=3.0) if result is not None: frame, info = result print(info["block_id"], info["missing_packets"]) """ try: return self._frame_queue.get(timeout=timeout) except Empty: return None
[docs] def reset_resend_stats(self) -> None: """Zero the resend counters (``requested``/``recovered``/``failed``). The counters otherwise accumulate for the lifetime of the receiver. Call this before a fresh download to get per-download resend figures. """ self._resend_stats = {"requested": 0, "recovered": 0, "failed": 0}
# --- Internal --- def _receive_loop(self) -> None: """Main receiver loop with real-time gap detection.""" since_check = 0 while not self._stop_event.is_set(): try: data, addr = self._sock.recvfrom(65536) except TimeoutError: # No packet received; check for gaps and stale frames self._check_gaps_and_timeouts() since_check = 0 continue except OSError: break if len(data) < 8: continue self._parse_packet(data) # Gap detection is throttled to every _GAP_CHECK_EVERY packets # (plus every socket timeout) so the per-packet receive cost stays # low enough to sustain line rate under host CPU contention. # Completed frames still emit immediately on trailer receipt. since_check += 1 if since_check >= _GAP_CHECK_EVERY: self._check_gaps_and_timeouts() since_check = 0 def _parse_packet(self, data: bytes) -> None: """Parse a single GVSP packet.""" format_byte = data[4] # Bit 7 of byte 4 = bit 31 of packet_infos = extended ID flag extended = bool(format_byte & 0x80) if extended: # Extended header (GigE Vision 2.x): # [0:2] status, [2:4] block_id_high, [4:8] packet_infos, # [8:16] block_id_64, [16:20] packet_id_32 if len(data) < 20: return packet_infos = struct.unpack(">I", data[4:8])[0] packet_type = (packet_infos >> 24) & 0x0F block_id = struct.unpack(">Q", data[8:16])[0] packet_id = struct.unpack(">I", data[16:20])[0] header_size = 20 else: block_id = struct.unpack(">H", data[2:4])[0] packet_type = format_byte & 0x07 packet_id = (data[5] << 16) | (data[6] << 8) | data[7] header_size = 8 payload = data[header_size:] if packet_type == PACKET_LEADER: self._handle_leader(block_id, payload) elif packet_type == PACKET_DATA: self._handle_data(block_id, packet_id, payload) elif packet_type == PACKET_TRAILER: self._handle_trailer(block_id, payload) def _handle_leader(self, block_id: int, payload: bytes) -> None: """Parse leader packet and pre-allocate frame buffer.""" buf = _FrameBuffer(block_id) if len(payload) >= 24: buf.payload_type = struct.unpack(">H", payload[2:4])[0] buf.timestamp = struct.unpack(">Q", payload[4:12])[0] buf.pixel_format = struct.unpack(">I", payload[12:16])[0] buf.width = struct.unpack(">I", payload[16:20])[0] buf.height = struct.unpack(">I", payload[20:24])[0] buf.leader_received = True buf.setup_buffer(self._packet_data_size) self._frame_buffers[block_id] = buf _MAX_CONCURRENT_FRAMES = 100 def _handle_data(self, block_id: int, packet_id: int, payload: bytes) -> None: """Write data packet directly to pre-allocated frame buffer.""" if block_id not in self._frame_buffers: if len(self._frame_buffers) >= self._MAX_CONCURRENT_FRAMES: oldest = min(self._frame_buffers, key=lambda k: self._frame_buffers[k].created_at) self._frame_buffers.pop(oldest, None) self._frame_buffers[block_id] = _FrameBuffer(block_id) self._frame_buffers[block_id].write_packet(packet_id, payload) def _handle_trailer(self, block_id: int, payload: bytes) -> None: """Handle trailer packet, emit completed frame.""" if block_id not in self._frame_buffers: return buf = self._frame_buffers[block_id] buf.trailer_received = True # If we didn't get a leader (no pre-allocated buffer), calculate # expected packets now for the missing count if buf.expected_packets == 0 and buf.leader_received and buf.width > 0 and buf.height > 0: bpp = PIXEL_BPP.get(buf.pixel_format, 2) total_bytes = buf.width * buf.height * bpp buf.expected_packets = math.ceil(total_bytes / self._packet_data_size) # Log missing packets missing = buf.missing_packets() if missing: self._resend_stats["failed"] += len(missing) logger.warning( f"Frame {block_id}: {len(missing)}/{buf.expected_packets} packets unrecoverable" ) # Count packets that were resend-requested and ultimately arrived. if buf._resend_requested and buf._received is not None: self._resend_stats["recovered"] += sum( 1 for p in buf._resend_requested if p < len(buf._received) and buf._received[p] ) # Assemble and emit self._emit_frame(buf) def _emit_frame(self, buf: _FrameBuffer) -> None: """Assemble frame and put it on the output queue.""" frame = buf.assemble(byteswap=self.byteswap) if frame is not None: missing_packets = max( 0, buf.expected_packets - buf._received_count if buf.expected_packets > 0 else 0 ) info = { "block_id": buf.block_id, "timestamp": buf.timestamp, "pixel_format": buf.pixel_format, "width": buf.width, "height": buf.height, "missing_packets": missing_packets, "complete": missing_packets == 0, } if self._frame_queue.full(): with contextlib.suppress(Empty): self._frame_queue.get_nowait() self._frame_queue.put((frame, info)) self._frame_buffers.pop(buf.block_id, None) def _check_gaps_and_timeouts(self) -> None: """Real-time gap detection and frame retention timeout. Called every ``_GAP_CHECK_EVERY`` received packets and on socket timeouts. - Requests resend for packets missing longer than initial_packet_timeout - Emits or drops frames older than frame_retention """ now = time.monotonic() to_remove = [] for block_id, buf in list(self._frame_buffers.items()): age = now - buf.created_at since_last = now - buf.last_packet_at # Frame retention timeout: emit whatever we have if ( since_last > self._frame_retention and buf.leader_received and (buf.trailer_received or age > self._frame_retention * 2) ): self._emit_frame(buf) to_remove.append(block_id) continue # Real-time gap detection: request resend for missing packets if ( self.resend_enabled and self._camera_ip and buf.leader_received and buf.expected_packets > 0 and age > self._initial_packet_timeout ): missing = buf.missing_packets() # Only resend packets we haven't already requested new_missing = [p for p in missing if p not in buf._resend_requested] if new_missing: # Cap at 25% of frame packets per request (aravis default) max_resend = max(1, buf.expected_packets // 4) to_resend = new_missing[:max_resend] self._send_resend_direct(block_id, to_resend) buf._resend_requested.update(to_resend) # Hard timeout: drop frame if age > self._frame_retention * 5: to_remove.append(block_id) for bid in to_remove: self._frame_buffers.pop(bid, None) def _send_resend_direct(self, block_id: int, packet_ids: list[int]) -> None: """Send PACKETRESEND directly from the stream socket. Sends to camera's GVCP port (3956) from the stream socket, avoiding the GVCP client lock. This is the aravis approach. """ if not packet_ids: return ranges = self._contiguous_ranges(packet_ids) for first, last in ranges: self._resend_req_id = (self._resend_req_id + 1) & 0xFFFF if self._resend_req_id == 0: self._resend_req_id = 1 payload = struct.pack(">HHII", 0, block_id, first, last) header = struct.pack( ">BBHHH", _GVCP_KEY, _GVCP_FLAG_ACK, _GVCP_CMD_PACKETRESEND, len(payload), self._resend_req_id, ) try: self._sock.sendto(header + payload, (self._camera_ip, _GVCP_PORT)) self._resend_stats["requested"] += last - first + 1 except OSError: pass @staticmethod def _contiguous_ranges(ids: list[int]) -> list[tuple[int, int]]: """Group sorted packet IDs into contiguous (first, last) ranges.""" if not ids: return [] ids = sorted(ids) ranges = [] first = last = ids[0] for pid in ids[1:]: if pid == last + 1: last = pid else: ranges.append((first, last)) first = last = pid ranges.append((first, last)) return ranges