Source code for mw75_streamer.server.ws_server

"""
WebSocket Server for Remote MW75 Control

Provides a WebSocket server that allows third-party applications to:
- Connect/disconnect to MW75 device remotely
- Receive real-time EEG data
- Get status updates and logs
- Configure auto-reconnect behavior
"""

import asyncio
import json
import logging
import sys
import time
import uuid
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Optional

# WebSocket imports with fallback
try:
    import websockets
    from websockets.legacy.server import WebSocketServerProtocol

    WEBSOCKETS_AVAILABLE = True
except ImportError:
    WEBSOCKETS_AVAILABLE = False
    if TYPE_CHECKING:
        from websockets.legacy.server import WebSocketServerProtocol
    else:
        WebSocketServerProtocol = Any  # type: ignore[misc, assignment]

from ..utils.logging import get_logger

# Platform-specific imports
# For type checking, always import the types; at runtime, only on macOS
if TYPE_CHECKING or sys.platform == "darwin":
    from Foundation import NSDate, NSRunLoop

    from ..data.packet_processor import EEGPacket, PacketProcessor
    from ..device.mw75_device import MW75Device
    from ..device.rfcomm_manager import RFCOMMManager

if sys.platform != "darwin":
    # At runtime on non-macOS, these will be None
    MW75Device = None  # type: ignore[assignment, misc]  # noqa: F811
    RFCOMMManager = None  # type: ignore[assignment, misc]  # noqa: F811
    PacketProcessor = None  # type: ignore[assignment, misc]  # noqa: F811
    EEGPacket = None  # type: ignore[assignment, misc]  # noqa: F811
    NSRunLoop = None  # noqa: F811
    NSDate = None  # noqa: F811

# Mock device support (cross-platform)
from ..device.mock_rfcomm_manager import MockRFCOMMManager


[docs] class DeviceState(Enum): """MW75 device connection states""" IDLE = "idle" CONNECTING = "connecting" CONNECTED = "connected" DISCONNECTING = "disconnecting" DISCONNECTED = "disconnected" ERROR = "error"
[docs] class WebSocketLogHandler(logging.Handler): """Custom logging handler that sends logs to all WebSocket clients""" def __init__(self, server: "MW75WebSocketServer"): super().__init__() self.server = server
[docs] def emit(self, record: logging.LogRecord) -> None: """Send log record to all WebSocket clients""" try: # Only send if we have connected clients and a log level is set if self.server.clients and self.server.client_log_level: # Check if this log record meets the client's level threshold client_level_value = getattr(logging, self.server.client_log_level) if record.levelno >= client_level_value: log_message = self.format(record) # Schedule coroutine to run in the event loop from logging thread if self.server._loop is not None: asyncio.run_coroutine_threadsafe( self.server._send_log( level=record.levelname, message=log_message, logger_name=record.name, ), self.server._loop, ) except Exception: # Silently ignore errors in log handler to avoid recursion pass
[docs] class MW75WebSocketServer: """WebSocket server for remote MW75 device control""" def __init__(self, host: str = "localhost", port: int = 8080, use_mock: bool = False): """ Initialize MW75 WebSocket server Args: host: Host to bind to port: Port to listen on use_mock: Use mock device for development (cross-platform) """ if not WEBSOCKETS_AVAILABLE: raise ImportError("websockets library not found. Install with: pip install websockets") if not use_mock and MW75Device is None: raise RuntimeError("MW75Device not available on this platform (macOS only)") self.use_mock = use_mock self.host = host self.port = port self.logger = get_logger(__name__) # Client management - support multiple clients self.clients: set[WebSocketServerProtocol] = set() self.controlling_client: Optional[WebSocketServerProtocol] = None self.client_lock = asyncio.Lock() self.client_log_level: Optional[str] = None self.client_heartbeats: dict[WebSocketServerProtocol, asyncio.Task] = {} # Device management self.device: Optional[MW75Device] = None self.device_state = DeviceState.IDLE self.device_address: Optional[str] = None self.packet_processor: Optional[PacketProcessor] = None # Auto-reconnect self.auto_reconnect_enabled = False self.reconnect_task: Optional[asyncio.Task] = None # Heartbeat self.heartbeat_interval = 30.0 # seconds # Data streaming health monitoring self.last_packet_time: Optional[float] = None self.data_timeout_task: Optional[asyncio.Task] = None # Device connection task self.device_connection_task: Optional[asyncio.Task] = None # Logging handler self.log_handler: Optional[WebSocketLogHandler] = None # Server state self._server: Optional[Any] = None # Event loop reference for thread-safe task scheduling self._loop: Optional[asyncio.AbstractEventLoop] = None
[docs] async def start(self) -> None: """Start the WebSocket server""" print("=" * 80) print("MW75 WebSocket Server - Remote Control Mode") print("=" * 80) print(f"Starting server on ws://{self.host}:{self.port}") print("Waiting for client connection...") print("Commands: connect, disconnect, status") print("Press Ctrl+C to stop server") print("=" * 80) # Store event loop reference for thread-safe task scheduling self._loop = asyncio.get_running_loop() try: # fmt: off async with websockets.serve(self._handle_client, self.host, self.port) as server: # type: ignore[arg-type] # fmt: on self._server = server print(f"Server ready! Listening on ws://{self.host}:{self.port}") print() # Wait forever - Ctrl+C will raise KeyboardInterrupt naturally await asyncio.Future() finally: await self._shutdown()
async def _shutdown(self) -> None: """Clean shutdown of server and all connections""" print("Shutting down server...") # Disconnect all connected clients clients_to_cleanup = list(self.clients) for client in clients_to_cleanup: try: await self._cleanup_client(client) except Exception as e: print(f"Error during client cleanup: {e}") print("Server shutdown complete") async def _handle_client(self, websocket: WebSocketServerProtocol) -> None: """Handle incoming WebSocket client connection""" client_address = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}" # Accept all clients - no rejection for multiple connections async with self.client_lock: self.clients.add(websocket) print(f"Client connected from {client_address} (Total clients: {len(self.clients)})") self.logger.info(f"Client connected: {client_address}") try: # Send welcome status to this specific client await self._send_to_client( websocket, msg_type="status", data={ "state": self.device_state.value, "message": "Client connected to MW75 server", "timestamp": time.time(), "battery_level": self._get_battery_level(), "device_address": self.device_address, }, ) # Start per-client heartbeat heartbeat_task = asyncio.create_task(self._heartbeat_loop(websocket)) self.client_heartbeats[websocket] = heartbeat_task # Handle messages from client async for message in websocket: try: # Handle both str and bytes messages if isinstance(message, bytes): message_str = message.decode("utf-8") else: message_str = message await self._process_client_message(message_str, websocket) except Exception as e: self.logger.error(f"Error processing message: {e}") await self._send_to_client( websocket, msg_type="error", data={ "message": str(e), "code": "MESSAGE_PROCESSING_ERROR", "timestamp": time.time(), }, ) except websockets.exceptions.ConnectionClosed: print(f"Client disconnected: {client_address}") self.logger.info(f"Client disconnected: {client_address}") except Exception as e: print(f"Error handling client {client_address}: {e}") self.logger.error(f"Error handling client {client_address}: {e}") finally: # Clean up client connection await self._cleanup_client(websocket) @staticmethod async def _cancel_task(task: Optional[asyncio.Task]) -> None: """Cancel an asyncio task gracefully""" if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass async def _cleanup_client(self, websocket: WebSocketServerProtocol) -> None: """Clean up individual client connection and associated resources""" async with self.client_lock: # Remove client from set if websocket in self.clients: self.clients.discard(websocket) self.logger.info(f"Client removed. Remaining clients: {len(self.clients)}") # Cancel this client's heartbeat if websocket in self.client_heartbeats: await self._cancel_task(self.client_heartbeats[websocket]) del self.client_heartbeats[websocket] # Check if this client was controlling the device was_controlling = websocket == self.controlling_client if was_controlling: self.controlling_client = None self.logger.info("Controlling client disconnected - control released") # Notify remaining clients that control is available if self.clients: await self._broadcast_message( msg_type="status", data={ "state": self.device_state.value, "message": "Device control released - available for new controller", "timestamp": time.time(), "battery_level": self._get_battery_level(), "device_address": self.device_address, }, ) # Only disconnect device if no clients remain if not self.clients: self.logger.info("No clients remaining - cleaning up device resources") # Stop auto-reconnect self.auto_reconnect_enabled = False await self._cancel_task(self.reconnect_task) self.reconnect_task = None # Stop data timeout monitoring await self._cancel_task(self.data_timeout_task) self.data_timeout_task = None self.last_packet_time = None # Stop RFCOMM streaming if active (before cancelling task) if self.device and self.device.rfcomm_manager: self.device.rfcomm_manager.stop() await asyncio.sleep(0.1) # Let the streaming loop exit gracefully # Cancel device connection task (its finally block will handle cleanup) await self._cancel_task(self.device_connection_task) self.device_connection_task = None # If device wasn't cleaned up by the task (shouldn't happen), clean up now if self.device: self.logger.warning("Device not cleaned up by connection task, cleaning up now") try: await self.device.cleanup() except Exception as e: self.logger.error(f"Error during fallback device cleanup: {e}") self.device = None self.packet_processor = None self.device_address = None self.device_state = DeviceState.IDLE # Remove logging handler if self.log_handler: top_logger = logging.getLogger("mw75_streamer") top_logger.removeHandler(self.log_handler) self.log_handler = None self.client_log_level = None self.logger.info("All clients disconnected - cleanup complete") async def _process_client_message( self, message: str, websocket: WebSocketServerProtocol ) -> None: """Process incoming message from client""" try: data = json.loads(message) except json.JSONDecodeError: await self._send_to_client( websocket, msg_type="error", data={ "message": "Invalid JSON", "code": "INVALID_JSON", "timestamp": time.time(), }, ) return # Validate message structure if not isinstance(data, dict): await self._send_to_client( websocket, msg_type="error", data={ "message": "Message must be a JSON object", "code": "INVALID_MESSAGE", "timestamp": time.time(), }, ) return msg_id = data.get("id") msg_type = data.get("type") msg_data = data.get("data", {}) if not msg_type: await self._send_to_client( websocket, msg_type="error", data={ "message": "Missing 'type' field in message", "code": "MISSING_TYPE", "timestamp": time.time(), }, request_id=msg_id, ) return # Route command if msg_type == "connect": await self._handle_connect_command(msg_data, msg_id, websocket) elif msg_type == "disconnect": await self._handle_disconnect_command(msg_data, msg_id, websocket) elif msg_type == "status": await self._handle_status_command(msg_data, msg_id, websocket) elif msg_type == "ping": await self._handle_ping_command(msg_id, websocket) elif msg_type == "broadcast": await self._handle_broadcast_command(msg_data, msg_id, websocket) else: await self._send_to_client( websocket, msg_type="error", data={ "message": f"Unknown command type: {msg_type}", "code": "UNKNOWN_COMMAND", "timestamp": time.time(), }, request_id=msg_id, ) async def _handle_connect_command( self, data: Dict[str, Any], request_id: Optional[str], websocket: WebSocketServerProtocol, ) -> None: """Handle connect command from client""" # Check if another client has control if self.controlling_client is not None and self.controlling_client != websocket: await self._send_to_client( websocket, msg_type="error", data={ "message": "Another client currently has device control", "code": "DEVICE_CONTROL_TAKEN", "timestamp": time.time(), }, request_id=request_id, ) return # Check current state if self.device_state in [DeviceState.CONNECTED, DeviceState.CONNECTING]: await self._send_to_client( websocket, msg_type="error", data={ "message": f"Already {self.device_state.value}", "code": "ALREADY_CONNECTED", "timestamp": time.time(), }, request_id=request_id, ) return # Extract parameters self.auto_reconnect_enabled = data.get("auto_reconnect", False) log_level = data.get("log_level", "ERROR").upper() # Validate log level if log_level not in ["DEBUG", "INFO", "WARNING", "ERROR"]: await self._send_to_client( websocket, msg_type="error", data={ "message": f"Invalid log_level: {log_level}. Must be DEBUG, INFO, WARNING, or ERROR", "code": "INVALID_LOG_LEVEL", "timestamp": time.time(), }, request_id=request_id, ) return # Grant control to this client self.controlling_client = websocket self.client_log_level = log_level # Set up logging handler (remove old one if exists, to support log level changes) if self.log_handler: top_logger = logging.getLogger("mw75_streamer") top_logger.removeHandler(self.log_handler) self.log_handler = WebSocketLogHandler(self) self.log_handler.setLevel(getattr(logging, log_level)) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" ) self.log_handler.setFormatter(formatter) top_logger = logging.getLogger("mw75_streamer") top_logger.addHandler(self.log_handler) # Send confirmation to requesting client await self._send_to_client( websocket, msg_type="command_ack", data={ "command": "connect", "auto_reconnect": self.auto_reconnect_enabled, "log_level": log_level, "message": "Connect command received, initiating device connection", }, request_id=request_id, ) # Start device connection await self._connect_device() async def _handle_disconnect_command( self, _data: Dict[str, Any], request_id: Optional[str], websocket: WebSocketServerProtocol, ) -> None: """Handle disconnect command from client""" # Check if this client has control if self.controlling_client is not None and self.controlling_client != websocket: await self._send_to_client( websocket, msg_type="error", data={ "message": "Another client currently has device control", "code": "DEVICE_CONTROL_TAKEN", "timestamp": time.time(), }, request_id=request_id, ) return # Disable auto-reconnect self.auto_reconnect_enabled = False await self._cancel_task(self.reconnect_task) self.reconnect_task = None # Send confirmation to requesting client await self._send_to_client( websocket, msg_type="command_ack", data={"command": "disconnect", "message": "Disconnect command received"}, request_id=request_id, ) # Disconnect device if self.device and self.device_state in [ DeviceState.CONNECTED, DeviceState.CONNECTING, ]: await self._disconnect_device() else: await self._broadcast_message( msg_type="status", data={ "state": DeviceState.IDLE.value, "message": "No active device connection", "timestamp": time.time(), "battery_level": self._get_battery_level(), "device_address": self.device_address, }, ) async def _handle_status_command( self, _data: Dict[str, Any], request_id: Optional[str], websocket: WebSocketServerProtocol, ) -> None: """Handle status command from client""" status_info = { "device_state": self.device_state.value, "auto_reconnect": self.auto_reconnect_enabled, "log_level": self.client_log_level or "ERROR", "battery_level": self._get_battery_level(), "device_address": self.device_address, "has_control": (websocket == self.controlling_client), "total_clients": len(self.clients), } await self._send_to_client( websocket, msg_type="status", data=status_info, request_id=request_id, ) async def _handle_ping_command( self, request_id: Optional[str], websocket: WebSocketServerProtocol ) -> None: """Handle ping command from client""" await self._send_to_client( websocket, msg_type="pong", data={"timestamp": time.time()}, request_id=request_id, ) async def _handle_broadcast_command( self, data: Dict[str, Any], request_id: Optional[str], websocket: WebSocketServerProtocol, ) -> None: """Handle broadcast message from client - forward to all other clients""" # Get client address for identification client_address = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}" # Add sender information to the broadcast data broadcast_data = { "from": client_address, "data": data, "timestamp": time.time(), } # Broadcast to all clients except the sender await self._broadcast_message( msg_type="broadcast", data=broadcast_data, request_id=request_id, exclude=websocket, ) # Send acknowledgement to sender await self._send_to_client( websocket, msg_type="command_ack", data={ "command": "broadcast", "message": f"Message broadcast to {len(self.clients) - 1} other client(s)", "recipients": len(self.clients) - 1, }, request_id=request_id, ) async def _connect_device(self) -> None: """Connect to MW75 device""" if self.device_state in [DeviceState.CONNECTED, DeviceState.CONNECTING]: return try: self.device_state = DeviceState.CONNECTING await self._send_status( state=DeviceState.CONNECTING.value, message="Connecting to MW75 device...", ) # Initialize packet processor self.packet_processor = PacketProcessor(verbose=False) if self.use_mock: # Use mock device - skip BLE, create device with mock RFCOMM manager # We still need PacketProcessor for data parsing self.logger.info("Using mock MW75 device (no hardware required)") # Start mock device connection in background self.device_connection_task = asyncio.create_task( self._mock_device_connection_task() ) else: # Use real device - Disable signal handler in device - we handle Ctrl+C at server level self.device = MW75Device(self._handle_device_data, setup_signal_handler=False) # Start device connection in background self.device_connection_task = asyncio.create_task(self._device_connection_task()) except Exception as e: self.logger.error(f"Error initiating device connection: {e}") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": f"Failed to connect to device: {e}", "code": "CONNECTION_FAILED", "timestamp": time.time(), }, ) async def _device_connection_task(self) -> None: """Background task for device connection""" connection_successful = False try: # Ensure device is initialized assert self.device is not None, "Device must be initialized before connection task" # Start BLE activation print("Discovering MW75 device via BLE...") self.logger.info("Starting BLE discovery and activation...") device_name = await self.device.ble_manager.discover_and_activate() if not device_name: print("MW75 device not found") self.logger.error("BLE activation failed") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": "MW75 device not found or BLE activation failed", "code": "BLE_ACTIVATION_FAILED", "timestamp": time.time(), }, ) if self.auto_reconnect_enabled: await self._start_reconnect_loop() return print(f"MW75 device found: {device_name}") # Disconnect BLE before RFCOMM (macOS Taho compatibility) print("Activating EEG mode...") self.logger.info("Disconnecting BLE (required for RFCOMM on macOS Taho)...") await self.device.ble_manager.disconnect_after_activation() await asyncio.sleep(0.5) # Establish RFCOMM connection print("Establishing data connection (RFCOMM)...") self.logger.info("Establishing RFCOMM connection...") self.device.rfcomm_manager = RFCOMMManager(device_name, self.device.data_callback) if not self.device.rfcomm_manager.connect(): print("RFCOMM connection failed") self.logger.error("RFCOMM connection failed") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": "RFCOMM connection failed", "code": "RFCOMM_CONNECTION_FAILED", "timestamp": time.time(), }, ) if self.auto_reconnect_enabled: await self._start_reconnect_loop() return # Connection successful - update state and notify connection_successful = True self.device_state = DeviceState.CONNECTED # Get device address from RFCOMM manager self.device_address = self.device.rfcomm_manager.device_address await self._send_status( state=DeviceState.CONNECTED.value, message="Successfully connected to MW75 device, streaming EEG data", ) print(f"Successfully connected to MW75 device! (MAC: {self.device_address})") print("Streaming..") # Start data timeout monitoring self.last_packet_time = None self.data_timeout_task = asyncio.create_task(self._data_timeout_monitor()) self.logger.debug("Data timeout monitoring task started") # Run RFCOMM event loop interleaved with asyncio # NSRunLoop MUST be on main thread for delegates to work self.logger.info("Starting data streaming loop (interleaved with asyncio)...") await self._run_rfcomm_streaming() except Exception as e: self.logger.error(f"Device connection error: {e}") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": f"Device error: {e}", "code": "DEVICE_ERROR", "timestamp": time.time(), }, ) # Clean up device on error if self.device: try: await self.device.cleanup() self.device = None self.packet_processor = None self.device_address = None except Exception as cleanup_error: self.logger.error(f"Error during cleanup after device error: {cleanup_error}") # Still clear references even on error self.device = None self.packet_processor = None self.device_address = None # Start auto-reconnect if enabled if self.auto_reconnect_enabled: await self._start_reconnect_loop() finally: # Stop data timeout monitoring if self.data_timeout_task and not self.data_timeout_task.done(): await self._cancel_task(self.data_timeout_task) self.data_timeout_task = None self.last_packet_time = None self.logger.debug("Data timeout monitoring task stopped") # Connection ended - clean up device resources if connection_successful: print("Device streaming ended") self.device_state = DeviceState.DISCONNECTED await self._send_status( state=DeviceState.DISCONNECTED.value, message="Device connection closed", ) # Clean up device to properly reset state for next connection if self.device: try: await self.device.cleanup() self.device = None self.packet_processor = None self.device_address = None except Exception as cleanup_error: self.logger.error(f"Error during device cleanup: {cleanup_error}") # Still clear references even on error self.device = None self.packet_processor = None self.device_address = None # Start auto-reconnect if enabled if self.auto_reconnect_enabled: await self._start_reconnect_loop() async def _mock_device_connection_task(self) -> None: """Background task for mock device connection (simplified flow)""" connection_successful = False mock_rfcomm_manager: Optional[MockRFCOMMManager] = None try: # Skip BLE entirely for mock device print("Connecting to mock MW75 device...") self.logger.info("Using mock RFCOMM manager (no BLE required)") # Create mock RFCOMM manager mock_rfcomm_manager = MockRFCOMMManager("MW75-MOCK", self._handle_device_data) if not mock_rfcomm_manager.connect(): print("Mock RFCOMM connection failed") self.logger.error("Mock RFCOMM connection failed") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": "Mock RFCOMM connection failed", "code": "MOCK_CONNECTION_FAILED", "timestamp": time.time(), }, ) return # Connection successful connection_successful = True self.device_state = DeviceState.CONNECTED self.device_address = mock_rfcomm_manager.device_address await self._send_status( state=DeviceState.CONNECTED.value, message="Successfully connected to mock MW75 device, streaming synthetic EEG data", ) print("Successfully connected to mock MW75 device!") print("Streaming synthetic data at ~500Hz...") # Start data timeout monitoring self.last_packet_time = None self.data_timeout_task = asyncio.create_task(self._data_timeout_monitor()) self.logger.debug("Data timeout monitoring task started") # Run mock streaming loop (blocking call runs in thread) self.logger.info("Starting mock data streaming loop...") await asyncio.get_event_loop().run_in_executor( None, mock_rfcomm_manager.run_until_stopped ) except Exception as e: self.logger.error(f"Mock device connection error: {e}") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": f"Mock device error: {e}", "code": "MOCK_DEVICE_ERROR", "timestamp": time.time(), }, ) finally: # Stop data timeout monitoring if self.data_timeout_task and not self.data_timeout_task.done(): await self._cancel_task(self.data_timeout_task) self.data_timeout_task = None self.last_packet_time = None self.logger.debug("Data timeout monitoring task stopped") # Clean up mock manager if mock_rfcomm_manager: try: mock_rfcomm_manager.close() except Exception as cleanup_error: self.logger.error(f"Error closing mock RFCOMM: {cleanup_error}") # Connection ended if connection_successful: print("Mock device streaming ended") self.device_state = DeviceState.DISCONNECTED await self._send_status( state=DeviceState.DISCONNECTED.value, message="Mock device connection closed", ) # Clear packet processor and device address self.packet_processor = None self.device_address = None async def _run_rfcomm_streaming(self) -> None: """ Run RFCOMM streaming loop interleaved with asyncio NSRunLoop must run on main thread for delegates to fire, so we run it in small chunks and yield to asyncio between each chunk. """ if not self.device or not self.device.rfcomm_manager: return runloop = NSRunLoop.currentRunLoop() while not self.device.rfcomm_manager.should_stop: # Run NSRunLoop for 1ms to process RFCOMM events runloop.runUntilDate_(NSDate.dateWithTimeIntervalSinceNow_(0.001)) # Yield to asyncio event loop await asyncio.sleep(0) async def _disconnect_device(self) -> None: """Disconnect from MW75 device""" if not self.device: self.device_state = DeviceState.IDLE return try: self.device_state = DeviceState.DISCONNECTING await self._send_status( state=DeviceState.DISCONNECTING.value, message="Disconnecting from MW75 device...", ) print("Disconnecting from MW75 device...") # Stop RFCOMM streaming loop if self.device.rfcomm_manager: self.device.rfcomm_manager.stop() # Give it a moment to process the stop await asyncio.sleep(0.1) await self.device.cleanup() self.device = None self.packet_processor = None self.device_address = None self.device_state = DeviceState.IDLE await self._send_status( state=DeviceState.IDLE.value, message="Disconnected from MW75 device" ) print("Disconnected from MW75 device") except Exception as e: self.logger.error(f"Error disconnecting device: {e}") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": f"Disconnect error: {e}", "code": "DISCONNECT_ERROR", "timestamp": time.time(), }, ) async def _start_reconnect_loop(self) -> None: """Start auto-reconnect loop""" if not self.auto_reconnect_enabled: return if self.reconnect_task and not self.reconnect_task.done(): return # Already reconnecting self.reconnect_task = asyncio.create_task(self._auto_reconnect_loop()) async def _auto_reconnect_loop(self) -> None: """Auto-reconnect loop with exponential backoff""" attempt = 0 max_attempts = 10 while self.auto_reconnect_enabled and attempt < max_attempts: # Calculate backoff delay (exponential with max 30s) delay = min(2**attempt, 30) attempt += 1 await self._send_status( state="reconnecting", message=f"Auto-reconnect attempt {attempt}/{max_attempts} in {delay}s...", ) await asyncio.sleep(delay) if not self.auto_reconnect_enabled: break # Attempt reconnection try: await self._connect_device() # Wait for connection to establish await asyncio.sleep(2) if self.device_state == DeviceState.CONNECTED: await self._send_status( state=DeviceState.CONNECTED.value, message=f"Auto-reconnect successful after {attempt} attempts", ) return # Success! except Exception as e: self.logger.error(f"Auto-reconnect attempt {attempt} failed: {e}") await self._broadcast_message( msg_type="error", data={ "message": f"Reconnect attempt {attempt} failed: {e}", "code": "RECONNECT_FAILED", "timestamp": time.time(), }, ) # Max attempts reached if self.auto_reconnect_enabled and attempt >= max_attempts: await self._broadcast_message( msg_type="error", data={ "message": f"Auto-reconnect failed after {max_attempts} attempts", "code": "RECONNECT_EXHAUSTED", "timestamp": time.time(), }, ) self.auto_reconnect_enabled = False def _handle_device_data(self, data: bytes) -> None: """Handle raw data received from MW75 device""" if not self.packet_processor: return try: # Process data into packets self.packet_processor.process_data_buffer( data, self._handle_eeg_packet, self._handle_other_event ) except Exception as e: self.logger.error(f"Error processing device data: {e}") def _handle_eeg_packet(self, packet: EEGPacket) -> None: """Handle processed EEG packet (called from worker thread)""" if not self.clients: return try: # Update last packet timestamp for health monitoring self.last_packet_time = time.time() # Schedule coroutine to run in the event loop from worker thread if self._loop is not None: asyncio.run_coroutine_threadsafe(self._send_eeg_data(packet), self._loop) except Exception as e: self.logger.error(f"Error handling EEG packet: {e}") def _handle_other_event(self, packet: bytes) -> None: """ Handle non-EEG events Currently not forwarding other events to client. TODO: Consider forwarding device status events if needed in the future. """ pass async def _heartbeat_loop(self, websocket: WebSocketServerProtocol) -> None: """Heartbeat loop to keep connection alive for a specific client""" try: while True: await asyncio.sleep(self.heartbeat_interval) # Send heartbeat ping to this specific client # Include battery level and device address for periodic updates await self._send_to_client( websocket, msg_type="heartbeat", data={ "timestamp": time.time(), "battery_level": self._get_battery_level(), "device_address": self.device_address, }, ) except asyncio.CancelledError: pass except Exception as e: self.logger.error(f"Heartbeat error: {e}") async def _data_timeout_monitor(self) -> None: """ Monitor data stream health and detect device disconnection Checks periodically if data packets are still arriving from the device. If no packets received within DATA_PACKET_TIMEOUT, assumes device is disconnected (e.g., powered off) and triggers cleanup/reconnect. """ from ..config import DATA_PACKET_TIMEOUT, DATA_TIMEOUT_CHECK_INTERVAL try: # Wait for first packet to arrive before starting monitoring while self.last_packet_time is None: await asyncio.sleep(DATA_TIMEOUT_CHECK_INTERVAL) # Exit if device state changed if self.device_state != DeviceState.CONNECTED: self.logger.debug("Data timeout monitor exiting - device not connected") return self.logger.debug(f"Data timeout monitoring started (timeout={DATA_PACKET_TIMEOUT}s)") # Monitor loop while self.device_state == DeviceState.CONNECTED: await asyncio.sleep(DATA_TIMEOUT_CHECK_INTERVAL) # Calculate time since last packet if self.last_packet_time is not None: time_since_last_packet = time.time() - self.last_packet_time # Check for timeout if time_since_last_packet > DATA_PACKET_TIMEOUT: self.logger.warning( f"Data timeout detected: {time_since_last_packet:.1f}s since last packet " f"(threshold: {DATA_PACKET_TIMEOUT}s)" ) # Also check RFCOMM state as secondary signal rfcomm_still_connected = ( self.device and self.device.rfcomm_manager and self.device.rfcomm_manager.connected ) self.logger.info( f"Device appears disconnected (RFCOMM state: {rfcomm_still_connected})" ) # Notify clients of timeout await self._broadcast_message( msg_type="error", data={ "message": f"Device data stream timeout ({time_since_last_packet:.1f}s without data)", "code": "DATA_STREAM_TIMEOUT", "timestamp": time.time(), }, ) # Trigger disconnect and cleanup await self._handle_data_timeout() return except asyncio.CancelledError: self.logger.debug("Data timeout monitor cancelled") except Exception as e: self.logger.error(f"Error in data timeout monitor: {e}") async def _handle_data_timeout(self) -> None: """ Handle data stream timeout (device likely powered off) Performs cleanup and triggers auto-reconnect if enabled. """ if self.device_state not in [DeviceState.CONNECTED, DeviceState.CONNECTING]: return try: self.logger.info("Handling data timeout - initiating cleanup") self.device_state = DeviceState.DISCONNECTING await self._send_status( state=DeviceState.DISCONNECTING.value, message="Device connection lost (data timeout detected)", ) # Stop RFCOMM streaming loop if self.device and self.device.rfcomm_manager: self.device.rfcomm_manager.stop() await asyncio.sleep(0.1) # Clean up device resources if self.device: try: await self.device.cleanup() except Exception as cleanup_error: self.logger.error(f"Error during timeout cleanup: {cleanup_error}") finally: self.device = None self.packet_processor = None self.device_address = None self.last_packet_time = None self.device_state = DeviceState.DISCONNECTED await self._send_status( state=DeviceState.DISCONNECTED.value, message="Device disconnected due to data timeout", ) # Trigger auto-reconnect if enabled if self.auto_reconnect_enabled: self.logger.info("Auto-reconnect enabled - starting reconnect loop") await self._start_reconnect_loop() except Exception as e: self.logger.error(f"Error handling data timeout: {e}") self.device_state = DeviceState.ERROR await self._broadcast_message( msg_type="error", data={ "message": f"Error during timeout handling: {e}", "code": "TIMEOUT_HANDLER_ERROR", "timestamp": time.time(), }, ) async def _send_to_client( self, websocket: WebSocketServerProtocol, msg_type: str, data: Dict[str, Any], request_id: Optional[str] = None, ) -> None: """Send message to a specific client""" try: message = { "id": request_id if request_id else str(uuid.uuid4()), "type": msg_type, "data": data, } await websocket.send(json.dumps(message)) except websockets.exceptions.ConnectionClosed as e: self.logger.warning(f"Client connection closed during send: {e}") # Clean up the dead client try: await self._cleanup_client(websocket) except Exception as cleanup_error: self.logger.error(f"Error cleaning up dead client: {cleanup_error}") except Exception as e: self.logger.error(f"Error sending message to client: {e}") async def _broadcast_message( self, msg_type: str, data: Dict[str, Any], request_id: Optional[str] = None, exclude: Optional[WebSocketServerProtocol] = None, ) -> None: """Broadcast message to all connected clients, optionally excluding one""" message = { "id": request_id if request_id else str(uuid.uuid4()), "type": msg_type, "data": data, } message_json = json.dumps(message) # Collect dead clients for cleanup dead_clients = [] # Send to all clients except the excluded one for client in list(self.clients): if exclude and client == exclude: continue try: await client.send(message_json) except websockets.exceptions.ConnectionClosed as e: self.logger.warning(f"Client connection closed during broadcast: {e}") dead_clients.append(client) except Exception as e: self.logger.error(f"Error broadcasting to client: {e}") dead_clients.append(client) # Clean up dead clients after broadcast loop for dead_client in dead_clients: try: await self._cleanup_client(dead_client) except Exception as e: self.logger.error(f"Error cleaning up dead client: {e}") def _get_battery_level(self) -> Optional[int]: """Get current battery level from device""" if self.use_mock: return 84 if self.device and self.device.ble_manager: return self.device.ble_manager.battery_level return None async def _send_status(self, state: str, message: str) -> None: """Send status update to all clients""" await self._broadcast_message( msg_type="status", data={ "state": state, "message": message, "timestamp": time.time(), "battery_level": self._get_battery_level(), "device_address": self.device_address, }, ) async def _send_log(self, level: str, message: str, logger_name: str) -> None: """Send log message to all clients""" await self._broadcast_message( msg_type="log", data={ "level": level, "message": message, "logger": logger_name, "timestamp": time.time(), }, ) async def _send_eeg_data(self, packet: EEGPacket) -> None: """Send EEG data packet to all clients""" eeg_data = { "timestamp": packet.timestamp, "event_id": packet.event_id, "counter": packet.counter, "ref": packet.ref, "drl": packet.drl, "channels": {f"ch{i + 1}": packet.channels[i] for i in range(len(packet.channels))}, "feature_status": packet.feature_status, } await self._broadcast_message(msg_type="eeg_data", data=eeg_data)