Source code for mw75_streamer.main

"""
MW75 EEG Streamer - Main Entry Point

Clean main function and CLI interface for the MW75 EEG streamer.
"""

import argparse
import asyncio
import logging
import os
import sys
import time
import webbrowser
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, List, Optional

from .data.packet_processor import PacketProcessor
from .utils.logging import get_logger, setup_logging

# Type checking imports
if TYPE_CHECKING:
    # Import the actual type for MyPy, even if it might not work at runtime
    try:
        from .device.mw75_device import MW75Device
    except ImportError:
        # On platforms where PyObjC dependencies aren't available
        MW75Device = Any  # type: ignore

# Runtime platform detection
if sys.platform == "darwin":
    from .device.mw75_device import MW75Device as _MW75Device  # noqa: F401
else:
    _MW75Device = None

# Mock device support (cross-platform)
from .data.packet_processor import EEGPacket
from .data.streamers import CSVWriter, LSLStreamer, StdoutStreamer, WebSocketStreamer
from .device.mock_rfcomm_manager import MockRFCOMMManager
from .panel.panel_server import PanelServer, WebSocketLogHandler


[docs] class MW75Streamer: """Main MW75 EEG streamer application""" device: "MW75Device" def __init__( self, csv_file: Optional[str] = None, extra_file: Optional[str] = None, websocket_url: Optional[str] = None, lsl_stream_name: Optional[str] = None, panel_server: Optional[PanelServer] = None, verbose: Optional[bool] = False, eeg_callback: Optional[Callable[[EEGPacket], None]] = None, raw_data_callback: Optional[Callable[[bytes], None]] = None, other_event_callback: Optional[Callable[[bytes], None]] = None, use_mock: bool = False, ): """ Initialize MW75 streamer Args: csv_file: Path for EEG CSV output file extra_file: Path for other events CSV output file websocket_url: WebSocket URL for real-time streaming lsl_stream_name: LSL stream name for LSL streaming panel_server: Panel server for browser dashboard verbose: Enable verbose logging eeg_callback: Custom callback function for EEG packets (receives EEGPacket objects) raw_data_callback: Custom callback function for raw device data (receives bytes) other_event_callback: Custom callback function for non-EEG events (receives bytes) use_mock: Use mock device for development (cross-platform) """ # Store custom callbacks self.eeg_callback = eeg_callback self.raw_data_callback = raw_data_callback self.other_event_callback = other_event_callback self.csv_writer = CSVWriter(csv_file, extra_file) self.websocket_streamer = WebSocketStreamer(websocket_url) self.verbose = verbose # Initialize LSL streamer with error handling self.lsl_streamer = None if lsl_stream_name: try: self.lsl_streamer = LSLStreamer(lsl_stream_name) except ImportError as e: self.logger = get_logger(__name__) self.logger.error(f"Failed to initialize LSL streamer: {e}") sys.exit(1) # Suppress stdout printing when browser panel is used or custom callback is provided if panel_server is not None: self.stdout_streamer = None else: self.stdout_streamer = StdoutStreamer( print_header=( not csv_file and not websocket_url and not lsl_stream_name and not eeg_callback ) ) self.packet_processor = PacketProcessor(self.verbose or False) self.logger = get_logger(__name__) # Panel server relay and runtime stats self.panel_server = panel_server self.last_counter: Optional[int] = None self.dropped_packets: int = 0 self._rate_times: List[float] = [] self._last_stats_emit: float = 0.0 # Store mock flag self.use_mock = use_mock self.mock_rfcomm_manager: Optional[MockRFCOMMManager] = None # Initialize device with data callback if use_mock: # Mock mode - no device needed, we'll create mock manager directly self.device = None # type: ignore self.logger.info("Using mock MW75 device (no hardware required)") else: if _MW75Device is None: raise RuntimeError("MW75Device not available on this platform") self.device = _MW75Device(self._handle_device_data)
[docs] def set_verbose(self, verbose: bool) -> None: """ Enable or disable verbose logging including checksum error messages Args: verbose: True to enable verbose logging, False to suppress it """ self.verbose = verbose self.packet_processor.verbose = verbose self.logger.debug(f"Verbose logging {'enabled' if verbose else 'disabled'}")
def _handle_device_data(self, data: bytes) -> None: """Handle raw data received from MW75 device""" # Call raw data callback if provided if self.raw_data_callback: try: self.raw_data_callback(data) except Exception as e: self.logger.error(f"Error in raw data callback: {e}") # Process the data into packets self.packet_processor.process_data_buffer( data, self._handle_eeg_packet, self._handle_other_event ) def _handle_eeg_packet(self, packet: EEGPacket) -> None: """Handle processed EEG packet""" # Call user's custom EEG callback if provided if self.eeg_callback: try: self.eeg_callback(packet) except Exception as e: self.logger.error(f"Error in EEG callback: {e}") # Sequence tracking for dropped packets try: if self.last_counter is not None: expected = (self.last_counter + 1) % 256 if packet.counter != expected: dropped = (packet.counter - expected) % 256 self.dropped_packets += dropped self.last_counter = packet.counter except Exception: pass # Write to CSV file if specified if self.csv_writer.eeg_file: self.csv_writer.write_eeg_packet(packet) elif ( self.stdout_streamer and not self.websocket_streamer.connected and not self.lsl_streamer and not self.eeg_callback ): # Write to stdout if no other outputs self.stdout_streamer.write_eeg_packet(packet) # Send to WebSocket if connected self.websocket_streamer.send_eeg_data(packet) # Send to LSL if configured if self.lsl_streamer: self.lsl_streamer.send_eeg_data(packet) # Debug logging self.logger.debug( f"EEG Packet: counter={packet.counter}, channels={len(packet.channels)}, checksum=OK" ) # Relay to panel if clients are connected if self.panel_server: # Lightweight rate tracking (packets per 5s window) now = time.time() self._rate_times.append(now) five_sec_ago = now - 5.0 self._rate_times = [t for t in self._rate_times if t > five_sec_ago] # Publish EEG self.panel_server.publish_eeg( { "timestamp": packet.timestamp, "event_id": packet.event_id, "counter": packet.counter, "ref": packet.ref, "drl": packet.drl, "channels": { f"ch{i + 1}": packet.channels[i] for i in range(len(packet.channels)) }, "feature_status": packet.feature_status, } ) # Throttle stats emission to 5 Hz if now - self._last_stats_emit > 0.2: stats = self.packet_processor.get_final_stats() current_rate = len(self._rate_times) / 5.0 self.panel_server.publish_stats( { "total_packets": stats.total_packets, "invalid_packets": stats.invalid_packets, "valid_packets": stats.valid_packets, "error_rate": stats.error_rate, "dropped_packets": self.dropped_packets, "rate": current_rate, } ) self._last_stats_emit = now def _handle_other_event(self, packet: bytes) -> None: """Handle non-EEG events""" # Call user's custom other event callback if provided if self.other_event_callback: try: self.other_event_callback(packet) except Exception as e: self.logger.error(f"Error in other event callback: {e}") self.csv_writer.write_other_event(packet) event_id = packet[1] if len(packet) > 1 else 0 counter = packet[3] if len(packet) > 3 else 0 self.logger.debug(f"Other Event: event_id={event_id}, counter={counter}")
[docs] async def start_streaming(self) -> bool: """ Start the MW75 streaming process Returns: True if streaming completed successfully, False otherwise """ try: self.logger.info("MW75 EEG Streamer - Starting...") if self.use_mock: # Mock mode - create and run mock RFCOMM manager success = await self._start_mock_streaming() else: # Real device mode success = await self.device.connect_and_stream() return bool(success) except Exception as e: self.logger.error(f"Error during streaming: {e}") return False finally: await self._cleanup()
async def _start_mock_streaming(self) -> bool: """Start mock device streaming""" try: self.logger.info("Connecting to mock MW75 device...") # Create mock RFCOMM manager self.mock_rfcomm_manager = MockRFCOMMManager("MW75-MOCK", self._handle_device_data) if not self.mock_rfcomm_manager.connect(): self.logger.error("Mock RFCOMM connection failed") return False self.logger.info("Mock device connected - streaming synthetic data at ~500Hz") # Run mock streaming loop (blocking call, runs in executor) await asyncio.get_event_loop().run_in_executor( None, self.mock_rfcomm_manager.run_until_stopped ) return True except Exception as e: self.logger.error(f"Mock streaming error: {e}") return False async def _cleanup(self) -> None: """Clean up all resources""" self.logger.info("Cleaning up streamer resources...") # Print final statistics stats = self.packet_processor.get_final_stats() if stats.total_packets > 0: self.logger.info( f"Final Stats: {stats.total_packets} packets, " f"{stats.valid_packets} valid ({100 - stats.error_rate:.1f}%), " f"{stats.invalid_packets} invalid ({stats.error_rate:.1f}%)" ) # Clean up mock manager if used if self.mock_rfcomm_manager: try: self.mock_rfcomm_manager.close() self.mock_rfcomm_manager = None except Exception as e: self.logger.error(f"Error closing mock RFCOMM: {e}") # Close output streams self.csv_writer.close() self.websocket_streamer.close() if self.lsl_streamer: self.lsl_streamer.close() self.logger.info("Cleanup complete - stream stopped safely")
[docs] def parse_arguments() -> argparse.Namespace: """Parse command line arguments""" parser = argparse.ArgumentParser( description="MW75 EEG Streamer - Stream EEG data to CSV files and/or WebSocket", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python -m mw75_streamer # EEG data to stdout python -m mw75_streamer --browser # Open dashboard panel python -m mw75_streamer > eeg_data.csv # Redirect stdout to file python -m mw75_streamer --csv eeg_data.csv # EEG to CSV file python -m mw75_streamer --csv eeg.csv --extra events.csv # Both CSV files python -m mw75_streamer --ws ws://localhost:8080 # WebSocket streaming only python -m mw75_streamer --lsl MW75_EEG # LSL streaming only """, ) parser.add_argument( "-csv", "--csv-file", help="Output file for EEG data (Event ID 239). If not specified, EEG data prints to stdout", ) parser.add_argument( "-extra", "--extra-file", help="Output file for other events (non-EEG). Default: other_events.csv if CSV mode is used", ) parser.add_argument( "-ws", "--websocket", help="WebSocket URL for real-time EEG streaming (ws://host:port/path or wss://host:port/path)", ) parser.add_argument( "-lsl", "--lsl-stream", help='LSL stream name for Lab Streaming Layer output (e.g., "MW75_EEG")', ) parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging") parser.add_argument( "--mock", action="store_true", help="Use mock MW75 device for development (no hardware required, cross-platform)", ) # Browser dashboard panel parser.add_argument( "-b", "--browser", action="store_true", help="Open browser dashboard (panel) and start internal WebSocket relay", ) parser.add_argument( "--panel-host", default="localhost", help="Host for internal panel WebSocket relay (default: localhost)", ) parser.add_argument( "--panel-port", type=int, default=8090, help="Port for internal panel WebSocket relay (default: 8090)", ) args = parser.parse_args() # Handle default values and validation if not args.csv_file and not args.websocket and not args.lsl_stream and not args.browser: print( "No output specified - streaming EEG data to stdout", file=sys.stderr, ) # Don't default extra_file anymore - let it be None to skip extra events return args
[docs] def show_output_configuration(args: argparse.Namespace, logger: Logger) -> None: """Display output configuration to user""" logger.info("Streaming data to:") if args.csv_file: logger.info(f"EEG CSV (Event ID 239): {args.csv_file}") if args.extra_file: logger.info(f"Other Events CSV: {args.extra_file}") else: logger.info("Other Events: discarded (use --extra to save)") elif args.websocket: logger.info(f"WebSocket EEG Stream: {args.websocket}") elif args.lsl_stream: logger.info(f"LSL Stream: {args.lsl_stream} (Lab Streaming Layer)") elif args.browser: logger.info("Browser Dashboard Panel") else: logger.info("EEG Data: stdout (console)") logger.info("Press Ctrl+C to stop streaming")
[docs] async def main() -> None: """Main entry point for the MW75 EEG streamer""" try: args = parse_arguments() except SystemExit: return # Setup logging setup_logging(args.verbose, "mw75_streamer") logger = get_logger(__name__) # Check if running on supported platform (skip check if using mock) if not args.mock and _MW75Device is None: logger.error("MW75 device support is only available on macOS") logger.error("Current platform: %s", sys.platform) logger.info("Tip: Use --mock flag for cross-platform development with synthetic data") logger.info( "For cross-platform support contributions, see: https://github.com/arctop/mw75-streamer/blob/main/CONTRIBUTING.md" ) return # Optional panel server for browser dashboard panel_server: Optional[PanelServer] = None ws_log_handler: Optional[WebSocketLogHandler] = None if getattr(args, "browser", False): try: panel_server = PanelServer( host=getattr(args, "panel_host", "localhost"), port=getattr(args, "panel_port", 8090), ) except ImportError as e: logger.error(str(e)) panel_server = None # Show configuration show_output_configuration(args, logger) # Create and start streamer streamer = MW75Streamer( csv_file=args.csv_file, extra_file=args.extra_file, websocket_url=args.websocket, lsl_stream_name=args.lsl_stream, panel_server=panel_server, verbose=args.verbose, use_mock=args.mock, ) # Start panel server and open browser if requested if panel_server: # Run the panel server in background to avoid blocking panel_server.start_background() # Attach WebSocket logging handler to top-level package logger top_logger = logging.getLogger("mw75_streamer") ws_log_handler = WebSocketLogHandler(panel_server) ws_log_handler.setLevel(logging.DEBUG if args.verbose else logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" ) ws_log_handler.setFormatter(formatter) top_logger.addHandler(ws_log_handler) # Auto-open panel HTML in browser try: panel_html = os.path.join(os.path.dirname(__file__), "panel", "panel.html") webbrowser.open(f"file://{panel_html}") except Exception: logger.warning("Failed to open browser automatically. Open panel/panel.html manually.") success = await streamer.start_streaming() if not success: logger.error("Streaming failed") sys.exit(1) # Cleanup panel server if panel_server: try: top_logger = logging.getLogger("mw75_streamer") if ws_log_handler: top_logger.removeHandler(ws_log_handler) panel_server.stop_background() except Exception: pass
[docs] def cli_main() -> None: """Synchronous entry point for CLI console scripts""" try: asyncio.run(main()) except KeyboardInterrupt: print("\nInterrupted by user", file=sys.stderr) except Exception as e: print(f"\nUnexpected error: {e}", file=sys.stderr) sys.exit(1)
if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nInterrupted by user", file=sys.stderr) except Exception as e: print(f"\nUnexpected error: {e}", file=sys.stderr) sys.exit(1)