"""GigE Vision Control Protocol (GVCP) client.
Implements the UDP-based control protocol from the GigE Vision
specification: device discovery (broadcast), register read/write
(``READREG``/``WRITEREG``), bulk memory access (``READMEM``), and
heartbeat keepalive for the ``Control Channel Privilege`` register.
The protocol runs on UDP port 3956. Each packet is an 8-byte header
followed by a payload. Header layout::
key(1B=0x42) flag(1B) command(2B) payload_len(2B) req_id(2B)
ACK packets use an 8-byte response header::
status(2B) ack_cmd(2B) length(2B) ack_id(2B)
This module is vendor-agnostic. The same code works against any GigE
Vision compliant camera; vendor-specific register addresses and features
are layered on top by vendor drivers.
See Also
--------
pyGigEVision.gvsp : The streaming counterpart (GVSP).
pyGigEVision.standard : GigE Vision spec register addresses.
pyGigEVision.bootstrap : Convenience helper to perform the standard
boot sequence (open GVCP, take CCP, start heartbeat, fetch XML).
"""
from __future__ import annotations
import contextlib
import ipaddress
import select
import socket
import struct
import threading
import time
import psutil
from .standard import REG_CCP
# --- GVCP Constants ---
GVCP_PORT = 3956
GVCP_KEY = 0x42
FLAG_ACK = 0x01
FLAG_BROADCAST = 0x11
# Commands
CMD_DISCOVERY = 0x0002
CMD_FORCEIP = 0x0004
CMD_READREG = 0x0080
CMD_WRITEREG = 0x0082
CMD_READMEM = 0x0084
CMD_WRITEMEM = 0x0086
CMD_PACKETRESEND = 0x0040
# Status codes
STATUS_SUCCESS = 0x0000
STATUS_NAMES = {
0x0000: "SUCCESS",
0x8001: "NOT_IMPLEMENTED",
0x8002: "INVALID_PARAMETER",
0x8003: "INVALID_ADDRESS",
0x8004: "WRITE_PROTECT",
0x8005: "BAD_ALIGNMENT",
0x8006: "ACCESS_DENIED",
0x8007: "BUSY",
0x800C: "PACKET_NOT_YET_AVAILABLE",
0x800D: "PACKET_AND_PREV_REMOVED",
0x800E: "PACKET_REMOVED",
0x8FFF: "GENERIC_ERROR",
}
# Max payload for READMEM (safe for standard Ethernet)
READMEM_CHUNK = 512
def _enumerate_interfaces() -> list[tuple[str, str]]:
"""Return ``(ip, netmask)`` for every up, non-loopback IPv4 interface."""
out: list[tuple[str, str]] = []
stats = psutil.net_if_stats()
for name, addrs in psutil.net_if_addrs().items():
st = stats.get(name)
if st is None or not st.isup:
continue
for a in addrs:
if a.family == socket.AF_INET and a.address and not a.address.startswith("127."):
out.append((a.address, a.netmask or "255.255.255.0"))
return out
def _subnet_broadcasts_for(ip: str, netmask: str | None) -> list[str]:
"""Return broadcast destinations for an interface: global plus directed.
Parameters
----------
ip : str
IPv4 address of the local interface.
netmask : str or None
Subnet mask string (e.g. ``"255.255.255.0"``). When ``None``, a
``/16`` subnet is assumed and the directed broadcast is derived from
the first two octets of *ip*.
Returns
-------
list of str
Always contains ``"255.255.255.255"``; additionally contains the
directed subnet broadcast address derived from *ip* and *netmask*.
"""
targets = ["255.255.255.255"]
try:
if netmask:
net = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
targets.append(str(net.broadcast_address))
else:
parts = ip.split(".")
targets.append(f"{parts[0]}.{parts[1]}.255.255")
except (ValueError, IndexError):
pass
return targets
def _parse_discovery_ack(data: bytes, src_ip: str) -> dict | None:
"""Parse one GVCP discovery ACK into a camera dict, or ``None`` if invalid.
Handles both the standard GigE Vision discovery ACK layout and the
extended layout used by some cameras that shift the string fields by
24 bytes. The extended layout is detected when the manufacturer field
at the extended offset is non-empty and the standard offset is empty.
Parameters
----------
data : bytes
Raw UDP payload received from the camera (8-byte GVCP header +
discovery ACK payload). Must be at least 256 bytes.
src_ip : str
Source IPv4 address of the packet (used as the ``"ip"`` field in
the returned dict).
Returns
-------
dict or None
Camera information dict with keys ``"ip"``, ``"spec_version"``,
``"manufacturer"``, ``"model"``, ``"device_version"``,
``"manufacturer_info"``, ``"serial"``, ``"user_name"``; or ``None``
if *data* is too short to contain a valid ACK.
"""
if len(data) < 256:
return None
payload = data[8:]
def _str(offset: int, size: int) -> str:
return payload[offset : offset + size].split(b"\x00")[0].decode("ascii", errors="replace")
mfr_ext = _str(72, 32)
mfr_std = _str(48, 32)
spec_version = f"{struct.unpack('>H', payload[0:2])[0]}.{struct.unpack('>H', payload[2:4])[0]}"
mac = ":".join(f"{b:02x}" for b in payload[10:16])
if mfr_ext and not mfr_std:
return {
"ip": src_ip,
"spec_version": spec_version,
"mac": mac,
"manufacturer": mfr_ext,
"model": _str(104, 32),
"device_version": _str(136, 32),
"manufacturer_info": _str(168, 48),
"serial": _str(216, 16),
"user_name": _str(232, 16),
}
return {
"ip": src_ip,
"spec_version": spec_version,
"mac": mac,
"manufacturer": mfr_std,
"model": _str(80, 32),
"device_version": _str(112, 32),
"manufacturer_info": _str(144, 48),
"serial": _str(192, 16),
"user_name": _str(208, 16),
}
[docs]
class GVCPError(Exception):
"""GVCP protocol error raised when the camera returns a non-SUCCESS status.
Attributes
----------
status : int
Numeric GVCP status code, e.g. ``0x8006`` for ACCESS_DENIED.
status_name : str
Human-readable name from :data:`STATUS_NAMES`, or
``"UNKNOWN_0xXXXX"`` for unrecognised codes.
Parameters
----------
message : str
Short description of what operation failed.
status : int, optional
GVCP status code returned by the camera. Default is ``0``
(``SUCCESS``), used when the error is locally generated (e.g.
timeout after all retries).
Examples
--------
>>> err = GVCPError("Register read failed", 0x8006)
>>> err.status
32774
>>> err.status_name
'ACCESS_DENIED'
"""
def __init__(self, message: str, status: int = 0) -> None:
self.status = status
self.status_name = STATUS_NAMES.get(status, f"UNKNOWN_0x{status:04X}")
super().__init__(f"{message} (status: {self.status_name})")
[docs]
class GVCPClient:
"""GigE Vision Control Protocol client for camera register access.
Manages a single UDP socket to a specific camera, handles request/ACK
sequencing (including stale-ACK discard and PENDING_ACK extension),
takes and releases the ``Control Channel Privilege`` (CCP) register,
and maintains a background heartbeat thread to keep the session alive.
Parameters
----------
camera_ip : str
IPv4 address of the camera, e.g. ``"169.254.67.34"``.
local_ip : str or None, optional
Local network interface address to bind the GVCP socket to.
``None`` (default) lets the OS choose the interface based on
the camera's IP and routing rules.
timeout : float, optional
Socket timeout in seconds for the initial connection and the
overall socket. Default is ``2.0``.
Notes
-----
The constructor does not perform any network I/O. Call
:meth:`connect` (or use the context-manager form) to acquire control
privilege and start the heartbeat thread.
The client is safe for concurrent use: :meth:`read_reg`,
:meth:`write_reg`, :meth:`read_mem`, :meth:`read_float`,
:meth:`write_float`, and :meth:`send_packetresend` all acquire the
internal ``_lock`` before touching the socket.
Examples
--------
Using the context manager (recommended)::
with GVCPClient("169.254.67.34") as cam:
width = cam.read_reg(0xD300)
exposure = cam.read_float(0xE808)
cam.write_float(0xE808, 100.0)
Manual lifecycle::
client = GVCPClient("169.254.67.34")
client.connect()
try:
width = client.read_reg(0xD300)
finally:
client.disconnect()
"""
def __init__(
self,
camera_ip: str,
local_ip: str | None = None,
timeout: float = 2.0,
) -> None:
self.camera_ip = camera_ip
self.local_ip = local_ip or ""
self.timeout = timeout
self._sock: socket.socket | None = None
self._lock = threading.Lock()
self._req_id = 0
self._connected = False
self._control_lost = False
self._heartbeat_thread: threading.Thread | None = None
self._heartbeat_stop = threading.Event()
self._n_retries = 3
self._cmd_timeout = 0.5 # seconds per attempt
# --- Context Manager ---
def __enter__(self) -> GVCPClient:
"""Enter the context manager by calling :meth:`connect`.
Returns
-------
GVCPClient
``self``, so the ``as`` clause captures the connected client.
"""
self.connect()
return self
def __exit__(self, *args: object) -> None:
"""Exit the context manager by calling :meth:`disconnect`."""
self.disconnect()
# --- Discovery ---
[docs]
@staticmethod
def discover(interface_ip: str = "", timeout: float = 2.0) -> list[dict]:
"""Broadcast a GVCP discovery packet and return all responding cameras.
When *interface_ip* is empty (the default), discovery sweeps every
host interface: it binds a dedicated socket per NIC and sends both
the global broadcast (``255.255.255.255``) and the per-subnet
broadcast for each interface, then merges the results. When
*interface_ip* is given, a single socket bound to that interface is
used. All sends go to UDP port 3956 and each discovery ACK is
parsed into a dictionary.
Both the standard GigE Vision discovery ACK layout and the extended
layout (used by some cameras that prepend 24 extra bytes before the
string fields) are handled. Duplicate responses from the same IP
are deduplicated.
Parameters
----------
interface_ip : str, optional
Local interface IPv4 address to bind the socket to, e.g.
``"169.254.0.1"``. Empty string (default) lets the OS choose,
which sends the broadcast on all active interfaces.
timeout : float, optional
How long to wait for discovery responses, in seconds.
Default is ``2.0``.
Returns
-------
list of dict
One entry per discovered camera. Each dict has keys:
``"ip"``
IPv4 address string of the camera.
``"spec_version"``
GigE Vision spec version the camera reports, e.g. ``"1.2"``.
``"manufacturer"``
Manufacturer name string.
``"model"``
Model name string.
``"device_version"``
Firmware / device version string.
``"manufacturer_info"``
Additional manufacturer info string.
``"serial"``
Serial number string.
``"user_name"``
User-assigned name string (may be empty).
``"interface_ip"``
IPv4 address of the local host interface whose socket
received this camera's discovery reply, or ``""`` when the
OS chose the socket. Lets a caller bind the same interface
for follow-up unicast control.
Examples
--------
>>> cameras = GVCPClient.discover(interface_ip="169.254.0.1", timeout=1.0)
>>> for cam in cameras:
... print(cam["ip"], cam["model"])
"""
pkt = struct.pack(">BBHHH", GVCP_KEY, FLAG_BROADCAST, CMD_DISCOVERY, 0, 0xFFFF)
if interface_ip:
targets = [(interface_ip, _subnet_broadcasts_for(interface_ip, None))]
else:
targets = [
(ip, _subnet_broadcasts_for(ip, mask)) for ip, mask in _enumerate_interfaces()
]
if not targets:
targets = [("", ["255.255.255.255"])]
socks: list[socket.socket] = []
sock_bind: dict[socket.socket, str] = {}
try:
for bind_ip, bcasts in targets:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
if bind_ip:
try:
sock.bind((bind_ip, 0))
except OSError:
sock.close()
continue
for dest in bcasts:
with contextlib.suppress(OSError):
sock.sendto(pkt, (dest, GVCP_PORT))
socks.append(sock)
sock_bind[sock] = bind_ip
cameras: list[dict] = []
seen_ips: set[str] = set()
deadline = time.monotonic() + timeout
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
rlist, _, _ = select.select(socks, [], [], remaining)
if not rlist:
break
for sock in rlist:
try:
data, addr = sock.recvfrom(4096)
except OSError:
continue
if addr[0] in seen_ips:
continue
seen_ips.add(addr[0])
cam = _parse_discovery_ack(data, addr[0])
if cam is not None:
cam["interface_ip"] = sock_bind.get(sock, "")
cameras.append(cam)
return cameras
finally:
for sock in socks:
sock.close()
[docs]
@staticmethod
def force_ip(mac, ip: str, mask: str, gateway: str = "0.0.0.0", interface_ip: str = "") -> None:
"""Broadcast a GVCP FORCEIP command to assign an IP to a camera by MAC.
Re-homes a camera that is on the wrong subnet (or fell back to
link-local) without touching host NIC configuration. The camera
reboots its IP stack, so no ACK is expected.
When *interface_ip* is empty (the default), the FORCEIP packet is
sent on every active host interface (mirroring :meth:`discover`),
because the target camera may be reachable only via a specific NIC
in a multi-NIC link-local setup. When *interface_ip* is given, a
single socket bound to that interface is used.
Parameters
----------
mac : str or bytes
Target camera MAC, as ``"aa:bb:cc:dd:ee:ff"`` or 6 raw bytes.
ip, mask : str
New IPv4 address and subnet mask for the camera.
gateway : str, optional
Default gateway. Default ``"0.0.0.0"`` (none).
interface_ip : str, optional
Local interface IPv4 address to bind the socket to, e.g.
``"169.254.0.1"``. Empty string (default) sweeps every active
interface.
Notes
-----
FORCEIP is a broadcast command and is not acknowledged by the camera. The
camera reboots its IP stack to apply the new address, so allow a short delay
(about one second) before re-running discovery to see it at the new IP. The
assignment is not persistent; power-cycling the camera returns it to its
configured startup behavior (typically DHCP then link-local).
"""
if isinstance(mac, str):
mac_bytes = bytes.fromhex(mac.replace(":", "").replace("-", ""))
else:
mac_bytes = bytes(mac)
if len(mac_bytes) != 6:
raise ValueError(f"MAC must be 6 bytes, got {len(mac_bytes)}")
# FORCEIP_CMD payload layout (GigE Vision spec): a 2-byte reserved field,
# the 6-byte MAC, then the static IP, subnet mask, and gateway each in a
# 16-byte slot (12 reserved bytes + 4-byte value). 56 bytes total.
payload = bytearray(56)
payload[2:8] = mac_bytes
payload[20:24] = socket.inet_aton(ip)
payload[36:40] = socket.inet_aton(mask)
payload[52:56] = socket.inet_aton(gateway)
pkt = struct.pack(">BBHHH", GVCP_KEY, FLAG_BROADCAST, CMD_FORCEIP, len(payload), 1) + bytes(
payload
)
# Determine which interfaces to send on, mirroring discover().
if interface_ip:
targets = [(interface_ip, _subnet_broadcasts_for(interface_ip, None))]
else:
targets = [
(iface_ip, _subnet_broadcasts_for(iface_ip, netmask))
for iface_ip, netmask in _enumerate_interfaces()
]
if not targets:
targets = [("", ["255.255.255.255"])]
for bind_ip, bcasts in targets:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
if bind_ip:
try:
sock.bind((bind_ip, 0))
except OSError:
continue
for dest in bcasts:
with contextlib.suppress(OSError):
sock.sendto(pkt, (dest, GVCP_PORT))
finally:
sock.close()
# --- Connection ---
[docs]
def connect(self, force: bool = True) -> None:
"""Open the UDP socket, take CCP control, and start the heartbeat thread.
If another application (or a stale previous session) holds the CCP
control privilege and *force* is ``True``, this method polls with
1-second intervals until the remote heartbeat timeout expires and the
lock is released automatically by the camera. This handles the
common scenario where a previous Python session crashed without
calling :meth:`disconnect`.
A call on an already-connected client is a no-op.
Parameters
----------
force : bool, optional
If ``True`` (default), retry on ``ACCESS_DENIED`` for up to
15 seconds, printing a warning on the first retry. If
``False``, raise :exc:`GVCPError` immediately when access is
denied.
Raises
------
GVCPError
If the CCP register write fails for any reason other than
``ACCESS_DENIED``, or if *force* is ``True`` but the 15-second
retry window expires without gaining control.
OSError
If the UDP socket cannot be created or bound.
"""
if self._connected:
return
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.local_ip:
self._sock.bind((self.local_ip, 0))
self._sock.settimeout(self.timeout)
# Take control: poll on ACCESS_DENIED until old session's
# heartbeat times out.
max_wait = 15.0 # seconds; generous upper bound
deadline = time.monotonic() + max_wait
attempt = 0
try:
while True:
try:
self._write_reg_raw(REG_CCP, 0x00000002)
break # success
except GVCPError as e:
if e.status == 0x8006 and force: # ACCESS_DENIED
attempt += 1
if attempt == 1:
print(
"ACCESS_DENIED: waiting for stale CCP lock to expire...", flush=True
)
if time.monotonic() >= deadline:
raise GVCPError(
"Could not take CCP control after "
f"{max_wait:.0f}s; another application may "
"be actively connected",
0x8006,
) from e
time.sleep(1.0)
else:
raise
except Exception:
self._sock.close()
self._sock = None
raise
self._connected = True
self._control_lost = False
# Start heartbeat
self._heartbeat_stop.clear()
self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
[docs]
def disconnect(self) -> None:
"""Stop the heartbeat, release CCP control, and close the socket.
Releases the ``Control Channel Privilege`` register (writes 0x00000000
to ``REG_CCP``) so other applications can immediately connect without
waiting for the heartbeat timeout. Any errors during the release are
silently suppressed to ensure the socket is always closed.
A call on a client that is not connected is a no-op.
"""
if not self._connected:
return
self._heartbeat_stop.set()
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=5.0)
with contextlib.suppress(OSError, GVCPError):
self._write_reg_raw(REG_CCP, 0x00000000)
self._connected = False
if self._sock:
self._sock.close()
self._sock = None
# --- Register Access ---
[docs]
def read_reg(self, addr: int) -> int:
"""Read a single 32-bit register from the camera.
Sends a ``READREG`` command and returns the raw unsigned 32-bit value.
Thread-safe: acquires the internal socket lock before sending.
Parameters
----------
addr : int
Register address (32-bit unsigned). Use constants from
:mod:`pyGigEVision.standard` for spec-defined registers, or
vendor-specific addresses from the camera's GenICam XML.
Returns
-------
int
Raw register value as a 32-bit unsigned integer.
Raises
------
GVCPError
If the camera returns a non-SUCCESS status, or if all retry
attempts time out.
Examples
--------
>>> with GVCPClient("169.254.67.34") as cam:
... width = cam.read_reg(0xD300)
... print(f"Width: {width}")
"""
with self._lock:
return self._read_reg_raw(addr)
[docs]
def read_float(self, addr: int) -> float:
"""Read a register and interpret it as an IEEE 754 big-endian float.
Reads the raw 32-bit value from *addr* via :meth:`read_reg` and
reinterprets the bit pattern as a single-precision float.
Parameters
----------
addr : int
Register address (32-bit unsigned) of the float-valued register.
Returns
-------
float
The register value reinterpreted as a 32-bit IEEE 754
single-precision float.
Raises
------
GVCPError
If the underlying :meth:`read_reg` call fails.
Examples
--------
>>> with GVCPClient("169.254.67.34") as cam:
... exposure_us = cam.read_float(0xE808)
"""
raw = self.read_reg(addr)
return struct.unpack(">f", struct.pack(">I", raw))[0]
[docs]
def write_reg(self, addr: int, value: int) -> None:
"""Write a 32-bit unsigned integer to a camera register.
Sends a ``WRITEREG`` command. Thread-safe: acquires the internal
socket lock before sending.
Parameters
----------
addr : int
Register address (32-bit unsigned).
value : int
Value to write (32-bit unsigned, 0–4294967295).
Raises
------
GVCPError
If the camera returns a non-SUCCESS status (e.g. ``WRITE_PROTECT``,
``ACCESS_DENIED``), or if all retry attempts time out.
Examples
--------
>>> with GVCPClient("169.254.67.34") as cam:
... cam.write_reg(0xD300, 640)
"""
with self._lock:
self._write_reg_raw(addr, value)
[docs]
def write_float(self, addr: int, value: float) -> None:
"""Write a float value to a camera register as IEEE 754 big-endian.
Packs *value* as a 32-bit single-precision float and writes the raw
bit pattern to the register at *addr* via :meth:`write_reg`.
Parameters
----------
addr : int
Register address (32-bit unsigned) of the float-valued register.
value : float
Value to write. The float is packed with big-endian byte order
before transmission.
Raises
------
GVCPError
If the underlying :meth:`write_reg` call fails.
Examples
--------
>>> with GVCPClient("169.254.67.34") as cam:
... cam.write_float(0xE808, 1000.0) # set exposure to 1000 µs
"""
raw = struct.unpack(">I", struct.pack(">f", value))[0]
self.write_reg(addr, raw)
[docs]
def read_mem(self, addr: int, size: int) -> bytes:
"""Read a contiguous block of camera memory.
Splits the request into chunks of at most :data:`READMEM_CHUNK` bytes
(512 bytes, safe for standard Ethernet) and concatenates the results.
Each chunk is read with the lock held, so concurrent register
operations may interleave between chunks.
The GigE Vision spec requires every READMEM byte-count to be a
multiple of 4, and some cameras reject unaligned reads. Each chunk
count is therefore rounded up to a 4-byte multiple on the wire and
the returned bytes are trimmed back to the exact *size* requested.
Parameters
----------
addr : int
Start address of the memory block (32-bit unsigned).
size : int
Number of bytes to read. If *size* is 0, an empty ``bytes``
object is returned immediately.
Returns
-------
bytes
Raw memory contents, exactly *size* bytes.
Raises
------
GVCPError
If any ``READMEM`` chunk fails.
Examples
--------
>>> with GVCPClient("169.254.67.34") as cam:
... xml_url = cam.read_mem(0x0200, 512)
... print(xml_url.split(b"\\x00")[0].decode())
"""
result = bytearray()
offset = 0
while offset < size:
want = min(READMEM_CHUNK, size - offset)
aligned = (want + 3) & ~3 # round up to a 4-byte multiple
with self._lock:
data = self._read_mem_raw(addr + offset, aligned)
result.extend(data[:want])
offset += want
return bytes(result)
# --- Internal Packet Methods ---
def _next_id(self) -> int:
"""Increment and return the next request ID (1–65535, wraps at 0xFFFF).
The value 0 is skipped; after 0xFFFF the counter resets to 1.
"""
self._req_id = (self._req_id + 1) & 0xFFFF
if self._req_id == 0:
self._req_id = 1
return self._req_id
def _send_cmd(self, flag: int, cmd: int, payload: bytes = b"") -> bytes:
"""Send a GVCP command packet and return the raw ACK data.
Builds and sends an 8-byte GVCP header followed by *payload*.
Reads response packets until one with a matching request ID arrives
or all retries are exhausted.
Stale ACKs (wrong ``ack_id``) are silently discarded. Runt packets
shorter than 8 bytes are also discarded. A ``PENDING_ACK`` (command
code ``0x0089``) extends the per-attempt deadline by the number of
milliseconds indicated in the response payload, bounded by a hard
30-second absolute deadline, but only when its request id matches the
current command; a stale ``PENDING_ACK`` for an old command is
discarded like any other non-matching packet.
Parameters
----------
flag : int
GVCP flag byte, e.g. :data:`FLAG_ACK` or :data:`FLAG_BROADCAST`.
cmd : int
GVCP command code, e.g. :data:`CMD_READREG`.
payload : bytes, optional
Command payload bytes. Default is empty (no payload).
Returns
-------
bytes
Raw ACK packet bytes, including the 8-byte ACK header.
Raises
------
GVCPError
If the camera returns a non-SUCCESS status in the ACK, or if
all ``_n_retries`` attempts time out without a matching ACK.
"""
req_id = self._next_id()
header = struct.pack(">BBHHH", GVCP_KEY, flag, cmd, len(payload), req_id)
hard_deadline = time.monotonic() + 30.0
for _attempt in range(self._n_retries):
self._sock.sendto(header + payload, (self.camera_ip, GVCP_PORT))
deadline = time.monotonic() + self._cmd_timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
self._sock.settimeout(max(remaining, 0.01))
try:
data, _ = self._sock.recvfrom(8192)
except TimeoutError:
break # this attempt timed out, retry
if len(data) < 8:
continue # runt packet, ignore
ack_status = struct.unpack(">H", data[0:2])[0]
ack_cmd = struct.unpack(">H", data[2:4])[0]
ack_id = struct.unpack(">H", data[6:8])[0]
# Handle PENDING_ACK: camera needs more time. Only honor it
# when its request id matches the current command; a stale
# PENDING_ACK for an old command must not extend this one.
if ack_cmd == 0x0089:
if ack_id == req_id and len(data) >= 12:
pending_ms = struct.unpack(">I", data[8:12])[0]
new_deadline = time.monotonic() + pending_ms / 1000.0
deadline = min(new_deadline, hard_deadline)
continue
# Stale ACK from a previous command; discard
if ack_id != req_id:
continue
# Got our response
if ack_status != STATUS_SUCCESS:
raise GVCPError(f"Command 0x{cmd:04X} failed", ack_status)
return data
raise GVCPError(f"Timeout waiting for ACK (cmd=0x{cmd:04X}, {self._n_retries} retries)")
def _read_reg_raw(self, addr: int) -> int:
"""Send a READREG command and return the 32-bit result (not locked)."""
payload = struct.pack(">I", addr)
data = self._send_cmd(FLAG_ACK, CMD_READREG, payload)
return struct.unpack(">I", data[8:12])[0]
def _write_reg_raw(self, addr: int, value: int) -> None:
"""Send a WRITEREG command for a single register (not locked)."""
payload = struct.pack(">II", addr, value)
self._send_cmd(FLAG_ACK, CMD_WRITEREG, payload)
def _read_mem_raw(self, addr: int, size: int) -> bytes:
"""Send a READMEM command and return the raw memory bytes (not locked).
The READMEM ACK layout is: 8-byte ACK header + 4-byte address echo
+ data, so payload starts at byte offset 12.
"""
payload = struct.pack(">IHH", addr, 0, size)
data = self._send_cmd(FLAG_ACK, CMD_READMEM, payload)
# READMEM ACK: header(8) + address(4) + data
return data[12:]
# --- Packet Resend ---
[docs]
def send_packetresend(
self,
block_id: int,
first_packet_id: int,
last_packet_id: int,
stream_channel: int = 0,
) -> None:
"""Request retransmission of missing GVSP stream packets.
Sends a ``CMD_PACKETRESEND`` command asking the camera to re-send the
specified packet range for a given stream block. Used by
:class:`pyGigEVision.gvsp.GVSPReceiver` to recover from packet loss.
Parameters
----------
block_id : int
The GVSP block (frame) identifier for which packets are missing.
first_packet_id : int
ID of the first missing packet within the block.
last_packet_id : int
ID of the last missing packet within the block (inclusive).
stream_channel : int, optional
GVSP stream channel index. Default is ``0`` (the only channel
on most cameras).
Raises
------
GVCPError
If the camera returns a non-SUCCESS status or the request times
out after all retries.
"""
payload = struct.pack(">HHII", stream_channel, block_id, first_packet_id, last_packet_id)
with self._lock:
self._send_cmd(FLAG_ACK, CMD_PACKETRESEND, payload)
# --- Heartbeat ---
def _heartbeat_loop(self) -> None:
"""Background daemon thread that keeps the GVCP session alive.
Reads the CCP register every 2 seconds. The read itself acts as the
heartbeat that prevents the camera from expiring the control-channel
privilege.
Also monitors the CCP value: if the control bit (bit 1) is cleared
(indicating that another application has taken over or the camera
reset the privilege), sets :attr:`_control_lost` to ``True`` so
callers can detect the loss of control.
Network and protocol errors are silently suppressed; the loop
continues until :attr:`_heartbeat_stop` is set by :meth:`disconnect`.
"""
while not self._heartbeat_stop.wait(2.0):
try:
with self._lock:
value = self._read_reg_raw(REG_CCP)
if (value & 0x02) == 0: # control bit cleared
self._control_lost = True
except (OSError, GVCPError):
pass