Source code for mw75_streamer.main

"""
MW75 EEG Streamer - Main Entry Point

Clean main function and CLI interface for the MW75 EEG streamer.
"""

import sys
import asyncio
import argparse
from typing import Optional, List, Any, TYPE_CHECKING, Callable
import time
import webbrowser
import os
import logging
from logging import Logger

from .utils.logging import setup_logging, get_logger
from .data.packet_processor import PacketProcessor

# Type checking imports
if TYPE_CHECKING:
    # Import the actual type for MyPy, even if it might not work at runtime
    try:
        from .device.mw75_device import MW75Device
    except ImportError:
        # On platforms where PyObjC dependencies aren't available
        MW75Device = Any  # type: ignore

# Runtime platform detection
if sys.platform == "darwin":
    from .device.mw75_device import MW75Device as _MW75Device  # noqa: F401
else:
    _MW75Device = None
from .data.streamers import CSVWriter, WebSocketStreamer, StdoutStreamer, LSLStreamer
from .data.packet_processor import EEGPacket
from .panel.panel_server import PanelServer, WebSocketLogHandler


[docs] class MW75Streamer: """Main MW75 EEG streamer application""" device: "MW75Device" def __init__( self, csv_file: Optional[str] = None, extra_file: Optional[str] = None, websocket_url: Optional[str] = None, lsl_stream_name: Optional[str] = None, panel_server: Optional[PanelServer] = None, verbose: Optional[bool] = False, eeg_callback: Optional[Callable[[EEGPacket], None]] = None, raw_data_callback: Optional[Callable[[bytes], None]] = None, other_event_callback: Optional[Callable[[bytes], None]] = None, ): """ Initialize MW75 streamer Args: csv_file: Path for EEG CSV output file extra_file: Path for other events CSV output file websocket_url: WebSocket URL for real-time streaming lsl_stream_name: LSL stream name for LSL streaming panel_server: Panel server for browser dashboard verbose: Enable verbose logging eeg_callback: Custom callback function for EEG packets (receives EEGPacket objects) raw_data_callback: Custom callback function for raw device data (receives bytes) other_event_callback: Custom callback function for non-EEG events (receives bytes) """ # Store custom callbacks self.eeg_callback = eeg_callback self.raw_data_callback = raw_data_callback self.other_event_callback = other_event_callback self.csv_writer = CSVWriter(csv_file, extra_file) self.websocket_streamer = WebSocketStreamer(websocket_url) self.verbose = verbose # Initialize LSL streamer with error handling self.lsl_streamer = None if lsl_stream_name: try: self.lsl_streamer = LSLStreamer(lsl_stream_name) except ImportError as e: self.logger = get_logger(__name__) self.logger.error(f"Failed to initialize LSL streamer: {e}") sys.exit(1) # Suppress stdout printing when browser panel is used or custom callback is provided if panel_server is not None: self.stdout_streamer = None else: self.stdout_streamer = StdoutStreamer( print_header=( not csv_file and not websocket_url and not lsl_stream_name and not eeg_callback ) ) self.packet_processor = PacketProcessor(self.verbose or False) self.logger = get_logger(__name__) # Panel server relay and runtime stats self.panel_server = panel_server self.last_counter: Optional[int] = None self.dropped_packets: int = 0 self._rate_times: List[float] = [] self._last_stats_emit: float = 0.0 # Initialize device with data callback if _MW75Device is None: raise RuntimeError("MW75Device not available on this platform") self.device = _MW75Device(self._handle_device_data)
[docs] def set_verbose(self, verbose: bool) -> None: """ Enable or disable verbose logging including checksum error messages Args: verbose: True to enable verbose logging, False to suppress it """ self.verbose = verbose self.packet_processor.verbose = verbose self.logger.debug(f"Verbose logging {'enabled' if verbose else 'disabled'}")
def _handle_device_data(self, data: bytes) -> None: """Handle raw data received from MW75 device""" # Call raw data callback if provided if self.raw_data_callback: try: self.raw_data_callback(data) except Exception as e: self.logger.error(f"Error in raw data callback: {e}") # Process the data into packets self.packet_processor.process_data_buffer( data, self._handle_eeg_packet, self._handle_other_event ) def _handle_eeg_packet(self, packet: EEGPacket) -> None: """Handle processed EEG packet""" # Call user's custom EEG callback if provided if self.eeg_callback: try: self.eeg_callback(packet) except Exception as e: self.logger.error(f"Error in EEG callback: {e}") # Sequence tracking for dropped packets try: if self.last_counter is not None: expected = (self.last_counter + 1) % 256 if packet.counter != expected: dropped = (packet.counter - expected) % 256 self.dropped_packets += dropped self.last_counter = packet.counter except Exception: pass # Write to CSV file if specified if self.csv_writer.eeg_file: self.csv_writer.write_eeg_packet(packet) elif ( self.stdout_streamer and not self.websocket_streamer.connected and not self.lsl_streamer and not self.eeg_callback ): # Write to stdout if no other outputs self.stdout_streamer.write_eeg_packet(packet) # Send to WebSocket if connected self.websocket_streamer.send_eeg_data(packet) # Send to LSL if configured if self.lsl_streamer: self.lsl_streamer.send_eeg_data(packet) # Debug logging self.logger.debug( f"EEG Packet: counter={packet.counter}, channels={len(packet.channels)}, checksum=OK" ) # Relay to panel if clients are connected if self.panel_server: # Lightweight rate tracking (packets per 5s window) now = time.time() self._rate_times.append(now) five_sec_ago = now - 5.0 self._rate_times = [t for t in self._rate_times if t > five_sec_ago] # Publish EEG self.panel_server.publish_eeg( { "timestamp": packet.timestamp, "event_id": packet.event_id, "counter": packet.counter, "ref": packet.ref, "drl": packet.drl, "channels": { f"ch{i + 1}": packet.channels[i] for i in range(len(packet.channels)) }, "feature_status": packet.feature_status, } ) # Throttle stats emission to 5 Hz if now - self._last_stats_emit > 0.2: stats = self.packet_processor.get_final_stats() current_rate = len(self._rate_times) / 5.0 self.panel_server.publish_stats( { "total_packets": stats.total_packets, "invalid_packets": stats.invalid_packets, "valid_packets": stats.valid_packets, "error_rate": stats.error_rate, "dropped_packets": self.dropped_packets, "rate": current_rate, } ) self._last_stats_emit = now def _handle_other_event(self, packet: bytes) -> None: """Handle non-EEG events""" # Call user's custom other event callback if provided if self.other_event_callback: try: self.other_event_callback(packet) except Exception as e: self.logger.error(f"Error in other event callback: {e}") self.csv_writer.write_other_event(packet) event_id = packet[1] if len(packet) > 1 else 0 counter = packet[3] if len(packet) > 3 else 0 self.logger.debug(f"Other Event: event_id={event_id}, counter={counter}")
[docs] async def start_streaming(self) -> bool: """ Start the MW75 streaming process Returns: True if streaming completed successfully, False otherwise """ try: self.logger.info("MW75 EEG Streamer - Starting...") # Connect and stream data success = await self.device.connect_and_stream() return bool(success) except Exception as e: self.logger.error(f"Error during streaming: {e}") return False finally: await self._cleanup()
async def _cleanup(self) -> None: """Clean up all resources""" self.logger.info("Cleaning up streamer resources...") # Print final statistics stats = self.packet_processor.get_final_stats() if stats.total_packets > 0: self.logger.info( f"Final Stats: {stats.total_packets} packets, " f"{stats.valid_packets} valid ({100 - stats.error_rate:.1f}%), " f"{stats.invalid_packets} invalid ({stats.error_rate:.1f}%)" ) # Close output streams self.csv_writer.close() self.websocket_streamer.close() if self.lsl_streamer: self.lsl_streamer.close() self.logger.info("Cleanup complete - stream stopped safely")
[docs] def parse_arguments() -> argparse.Namespace: """Parse command line arguments""" parser = argparse.ArgumentParser( description="MW75 EEG Streamer - Stream EEG data to CSV files and/or WebSocket", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python -m mw75_streamer # EEG data to stdout python -m mw75_streamer --browser # Open dashboard panel python -m mw75_streamer > eeg_data.csv # Redirect stdout to file python -m mw75_streamer --csv eeg_data.csv # EEG to CSV file python -m mw75_streamer --csv eeg.csv --extra events.csv # Both CSV files python -m mw75_streamer --ws ws://localhost:8080 # WebSocket streaming only python -m mw75_streamer --lsl MW75_EEG # LSL streaming only """, ) parser.add_argument( "-csv", "--csv-file", help="Output file for EEG data (Event ID 239). If not specified, EEG data prints to stdout", ) parser.add_argument( "-extra", "--extra-file", help="Output file for other events (non-EEG). Default: other_events.csv if CSV mode is used", ) parser.add_argument( "-ws", "--websocket", help="WebSocket URL for real-time EEG streaming (ws://host:port/path or wss://host:port/path)", ) parser.add_argument( "-lsl", "--lsl-stream", help='LSL stream name for Lab Streaming Layer output (e.g., "MW75_EEG")', ) parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging") # Browser dashboard panel parser.add_argument( "-b", "--browser", action="store_true", help="Open browser dashboard (panel) and start internal WebSocket relay", ) parser.add_argument( "--panel-host", default="localhost", help="Host for internal panel WebSocket relay (default: localhost)", ) parser.add_argument( "--panel-port", type=int, default=8090, help="Port for internal panel WebSocket relay (default: 8090)", ) args = parser.parse_args() # Handle default values and validation if not args.csv_file and not args.websocket and not args.lsl_stream and not args.browser: print( "No output specified - streaming EEG data to stdout", file=sys.stderr, ) # Don't default extra_file anymore - let it be None to skip extra events return args
[docs] def show_output_configuration(args: argparse.Namespace, logger: Logger) -> None: """Display output configuration to user""" logger.info("Streaming data to:") if args.csv_file: logger.info(f"EEG CSV (Event ID 239): {args.csv_file}") if args.extra_file: logger.info(f"Other Events CSV: {args.extra_file}") else: logger.info("Other Events: discarded (use --extra to save)") elif args.websocket: logger.info(f"WebSocket EEG Stream: {args.websocket}") elif args.lsl_stream: logger.info(f"LSL Stream: {args.lsl_stream} (Lab Streaming Layer)") elif args.browser: logger.info("Browser Dashboard Panel") else: logger.info("EEG Data: stdout (console)") logger.info("Press Ctrl+C to stop streaming")
[docs] async def main() -> None: """Main entry point for the MW75 EEG streamer""" try: args = parse_arguments() except SystemExit: return # Setup logging setup_logging(args.verbose, "mw75_streamer") logger = get_logger(__name__) # Check if running on supported platform if _MW75Device is None: logger.error("MW75 device support is only available on macOS") logger.error("Current platform: %s", sys.platform) logger.info( "For cross-platform support contributions, see: https://github.com/arctop/mw75-streamer/blob/main/CONTRIBUTING.md" ) return # Optional panel server for browser dashboard panel_server: Optional[PanelServer] = None ws_log_handler: Optional[WebSocketLogHandler] = None if getattr(args, "browser", False): try: panel_server = PanelServer( host=getattr(args, "panel_host", "localhost"), port=getattr(args, "panel_port", 8090), ) except ImportError as e: logger.error(str(e)) panel_server = None # Show configuration show_output_configuration(args, logger) # Create and start streamer streamer = MW75Streamer( csv_file=args.csv_file, extra_file=args.extra_file, websocket_url=args.websocket, lsl_stream_name=args.lsl_stream, panel_server=panel_server, verbose=args.verbose, ) # Start panel server and open browser if requested if panel_server: # Run the panel server in background to avoid blocking panel_server.start_background() # Attach WebSocket logging handler to top-level package logger top_logger = logging.getLogger("mw75_streamer") ws_log_handler = WebSocketLogHandler(panel_server) ws_log_handler.setLevel(logging.DEBUG if args.verbose else logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" ) ws_log_handler.setFormatter(formatter) top_logger.addHandler(ws_log_handler) # Auto-open panel HTML in browser try: panel_html = os.path.join(os.path.dirname(__file__), "panel", "panel.html") webbrowser.open(f"file://{panel_html}") except Exception: logger.warning("Failed to open browser automatically. Open panel/panel.html manually.") success = await streamer.start_streaming() if not success: logger.error("Streaming failed") sys.exit(1) # Cleanup panel server if panel_server: try: top_logger = logging.getLogger("mw75_streamer") if ws_log_handler: top_logger.removeHandler(ws_log_handler) panel_server.stop_background() except Exception: pass
[docs] def cli_main() -> None: """Synchronous entry point for CLI console scripts""" try: asyncio.run(main()) except KeyboardInterrupt: print("\nInterrupted by user", file=sys.stderr) except Exception as e: print(f"\nUnexpected error: {e}", file=sys.stderr) sys.exit(1)
if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nInterrupted by user", file=sys.stderr) except Exception as e: print(f"\nUnexpected error: {e}", file=sys.stderr) sys.exit(1)