Source code for mw75_streamer.device.ble_manager

"""
BLE Manager for MW75 EEG Streamer

Handles Bluetooth Low Energy device discovery, connection, and activation sequence
for the MW75 Neuro headphones.
"""

import asyncio
from typing import Optional, Any
from bleak import BleakClient, BleakScanner

from ..config import (
    MW75_COMMAND_CHAR,
    MW75_STATUS_CHAR,
    MW75_CONTROL_CHAR,
    ENABLE_EEG_CMD,
    ENABLE_RAW_MODE_CMD,
    START_SESSION_CMD,
    DISABLE_EEG_CMD,
    DISABLE_RAW_MODE_CMD,
    STOP_SESSION_CMD,
    BATTERY_CMD,
    BLE_ACTIVATION_DELAY,
    BLE_COMMAND_DELAY,
    BLE_SESSION_DELAY,
    BLE_DISCOVERY_TIMEOUT,
    MW75_DEVICE_NAME_PATTERN,
    BLE_SUCCESS_CODE,
    BLE_EEG_COMMAND,
    BLE_RAW_MODE_COMMAND,
    BLE_UNKNOWN_E0_COMMAND,
    BLE_BATTERY_COMMAND,
)
from ..utils.logging import get_logger


[docs] class BLEManager: """Manages BLE connection and MW75 activation sequence""" def __init__(self) -> None: self.client: Optional[BleakClient] = None self.device_name: Optional[str] = None self.battery_level: Optional[int] = None self.logger = get_logger(__name__)
[docs] async def discover_and_activate(self) -> Optional[str]: """ Discover MW75 device and execute activation sequence Returns: Device name if successful, None if failed """ self.logger.info("Scanning for MW75...") # Discover BLE devices devices = await BleakScanner.discover(timeout=BLE_DISCOVERY_TIMEOUT) mw75_device = None for device in devices: if device.name and MW75_DEVICE_NAME_PATTERN in device.name.upper(): self.logger.info(f"Found MW75: {device.name}") mw75_device = device self.device_name = device.name break if not mw75_device: self.logger.error("MW75 not found") return None # Execute activation sequence if await self._activate_device(mw75_device): return self.device_name else: return None
async def _activate_device(self, device: Any) -> bool: """ Execute the MW75 activation sequence Args: device: BLE device to activate Returns: True if activation successful, False otherwise """ # Activation tracking responses = [] eeg_enabled = False raw_mode_enabled = False def notification_handler(sender: Any, data: bytearray) -> None: """Handle BLE activation responses""" responses.append(data) hex_data = " ".join(f"{b:02x}" for b in data) self.logger.debug(f"BLE Response: {hex_data}") nonlocal eeg_enabled, raw_mode_enabled if len(data) >= 5: cmd_type = data[3] status = data[4] self.logger.debug(f"BLE Command: 0x{cmd_type:02x}, Status: 0x{status:02x}") if cmd_type == BLE_EEG_COMMAND and status == BLE_SUCCESS_CODE: eeg_enabled = True self.logger.info("EEG mode confirmed enabled") elif cmd_type == BLE_RAW_MODE_COMMAND and status == BLE_SUCCESS_CODE: raw_mode_enabled = True self.logger.info("Raw mode confirmed enabled") elif cmd_type == BLE_UNKNOWN_E0_COMMAND: self.logger.debug(f"Unknown E0 command response: status=0x{status:02x}") elif cmd_type == BLE_BATTERY_COMMAND and status == BLE_SUCCESS_CODE: # Battery response format: [0x09, 0x9A, 0x03, 0x14, 0xF1, <battery_level>] if len(data) >= 6: battery_level = data[5] self.battery_level = battery_level self.logger.info(f"Battery level: {battery_level}%") else: self.logger.debug(f"Battery command response: status=0x{status:02x}") elif cmd_type == BLE_SUCCESS_CODE: # Alternative battery response format where success code comes first # Battery response format: [0x09, 0x9A, 0x03, 0xF1, <battery_level>] if data[0] == 0x09 and data[1] == 0x9A and data[2] == 0x03: battery_level = status # status field contains battery level self.battery_level = battery_level self.logger.info(f"Battery level: {battery_level}%") else: self.logger.debug(f"Success response: status=0x{status:02x}") else: self.logger.warning( f"Unexpected command response: 0x{cmd_type:02x} status=0x{status:02x}" ) try: # Connect to device self.client = BleakClient(device) await self.client.connect() self.logger.info("BLE connected") # Setup notifications to receive responses await self.client.start_notify(MW75_STATUS_CHAR, notification_handler) self.logger.info("BLE notifications enabled") # Send activation sequence with proper timing await self._send_activation_sequence() # Verify activation self.logger.info(f"Activation Results: EEG={eeg_enabled}, Raw={raw_mode_enabled}") self.logger.debug(f"Total responses: {len(responses)}") # Log all responses for debugging for i, resp in enumerate(responses): hex_data = " ".join(f"{b:02x}" for b in resp) self.logger.debug(f"Response {i + 1}: {hex_data}") if not (eeg_enabled and raw_mode_enabled): self.logger.error("BLE activation failed - EEG streaming not properly enabled") await self.client.disconnect() self.client = None return False self.logger.info("BLE activation confirmed successful") return True except Exception as e: self.logger.error(f"BLE activation error: {e}") if self.client: try: await self.client.disconnect() except Exception: pass self.client = None return False async def _send_activation_sequence(self) -> None: """Send the MW75 activation command sequence with proper timing""" self.logger.info("Sending ENABLE_EEG...") if self.client: await self.client.write_gatt_char(MW75_COMMAND_CHAR, ENABLE_EEG_CMD) await asyncio.sleep(BLE_ACTIVATION_DELAY) self.logger.info("Sending ENABLE_RAW_MODE...") if self.client: await self.client.write_gatt_char(MW75_COMMAND_CHAR, ENABLE_RAW_MODE_CMD) await asyncio.sleep(BLE_COMMAND_DELAY) self.logger.info("Sending START_SESSION...") if self.client: await self.client.write_gatt_char(MW75_CONTROL_CHAR, START_SESSION_CMD) await asyncio.sleep(BLE_SESSION_DELAY) # Battery check self.logger.info("Getting battery level...") if self.client: await self.client.write_gatt_char(MW75_COMMAND_CHAR, BATTERY_CMD) await asyncio.sleep(BLE_COMMAND_DELAY)
[docs] async def disconnect_after_activation(self) -> None: """ Disconnect BLE connection after activation is complete. This is required on macOS Taho (26+) where keeping the BLE connection open blocks RFCOMM delegate callbacks from being delivered. """ if not self.client: self.logger.debug("No BLE client to disconnect") return try: if self.client.is_connected: await self.client.disconnect() self.logger.info("BLE disconnected (activation complete)") self.client = None except Exception as e: self.logger.warning(f"Error disconnecting BLE after activation: {e}") self.client = None
[docs] async def cleanup(self) -> None: """Send disable commands and disconnect from BLE""" # Note: On macOS Taho+, BLE is disconnected after activation, so client may be None if not self.client and not self.device_name: self.logger.debug("No BLE connection to cleanup") return # If device_name is set but client is None, we disconnected after activation # We need to reconnect to send disable commands to properly reset device state if not self.client and self.device_name: self.logger.info( "BLE was disconnected after activation - reconnecting to send disable commands..." ) await self._reconnect_and_disable() return # At this point, self.client must be set (type guard for mypy) if not self.client: self.logger.debug("No BLE client to cleanup") return try: await self._send_disable_sequence() except Exception as e: self.logger.error(f"Error during BLE cleanup: {e}") finally: self.client = None self.device_name = None self.battery_level = None
async def _send_disable_sequence(self) -> None: """Send the disable command sequence to the device""" if not self.client: return self.logger.info("Stopping EEG streaming...") # Send stop commands in reverse order self.logger.info("Sending STOP_SESSION...") await self.client.write_gatt_char(MW75_CONTROL_CHAR, STOP_SESSION_CMD) await asyncio.sleep(BLE_ACTIVATION_DELAY) self.logger.info("Sending DISABLE_RAW_MODE...") await self.client.write_gatt_char(MW75_COMMAND_CHAR, DISABLE_RAW_MODE_CMD) await asyncio.sleep(BLE_ACTIVATION_DELAY) self.logger.info("Sending DISABLE_EEG...") await self.client.write_gatt_char(MW75_COMMAND_CHAR, DISABLE_EEG_CMD) await asyncio.sleep(BLE_COMMAND_DELAY) await self.client.disconnect() self.logger.info("EEG streaming disabled and BLE disconnected") async def _reconnect_and_disable(self) -> None: """Reconnect to BLE and send disable commands to reset device state""" if not self.device_name: self.logger.warning("Cannot reconnect for cleanup - device name not available") return try: # Scan for the device self.logger.debug(f"Scanning for {self.device_name} to send disable commands...") devices = await BleakScanner.discover(timeout=BLE_DISCOVERY_TIMEOUT) target_device = None for device in devices: if device.name and self.device_name.upper() in device.name.upper(): target_device = device break if not target_device: self.logger.warning( f"Could not find {self.device_name} for cleanup - device may be out of range" ) self.device_name = None return # Reconnect and send disable commands self.logger.debug("Reconnecting to BLE for cleanup...") self.client = BleakClient(target_device) await self.client.connect() self.logger.debug("BLE reconnected for cleanup") # Send disable sequence await self._send_disable_sequence() except Exception as e: self.logger.warning(f"Error during BLE reconnection for cleanup: {e}") finally: self.client = None self.device_name = None self.battery_level = None