import time from mbientlab.metawear.cbindings import * from mbientlab.metawear import MetaWear, libmetawear, parse_value, cbindings, create_voidp, create_voidp_int, \ PassthroughMode, TimeMode from time import sleep from threading import Event class State: def __init__(self, device): self.device = device self.callback = cbindings.FnVoid_VoidP_DataP(self.data_handler) self.download_callback = FnVoid_VoidP_DataP(self.download_handler) self.download_process_callback = FnVoid_VoidP_UInt_UInt(self.download_process) self.download_received_unknown_entry_callback = \ FnVoid_VoidP_UByte_Long_UByteP_UByte(self.received_unknown_entry_handler) self.processor = None self.counter = 0 self.download_counter = 0 self.datarate = 20 self.logger = None self.download_finished = Event() self.event = Event() self.unknown_entry = [None]*10 self.anonymous_signal_handler = FnVoid_VoidP_VoidP_VoidP_UInt(self.anonymous_signal) self.anonymous_signal_event = Event() self.anonymous_signal_result = {} self.download_handler = LogDownloadHandler(context=None, received_progress_update=self.download_process_callback, received_unknown_entry=self.download_received_unknown_entry_callback, received_unhandled_entry=self.download_callback) def anonymous_signal(self, ctx, board, signals, len): self.anonymous_signal_result['length'] = len self.anonymous_signal_result['signals'] = cast(signals, POINTER(c_void_p * len)) if signals is not None else None self.anonymous_signal_event.set() def received_unknown_entry_handler(self, ctx, id, epoch, data, length): self.unknown_entry[id] = data if id == length - 1: self.download_counter += 1 for ii in range(length): print(self.unknown_entry[ii].contents.value) print("Download: {epoch: %d, id: %d, lenght: %d value: %s : %s}" % (epoch, id, length, data, self.download_counter)) def download_handler(self, ctx, data): self.download_counter += 1 print("Download: {epoch: %d, value: %s : %s}" % (data.contents.epoch, parse_value(data), self.download_counter)) def data_handler(self, ctx, data): # print("%s -> %s" % (self.device.address, cast(data.contents.value, POINTER(c_uint)).contents.value)) self.counter += 1 print("Stream: %s -> %s : %s" % (int(time.time()*1000), parse_value(data), self.counter)) def download_process(self, ctx, entries_left, total_entries): if entries_left == 0: self.download_finished.set() def connect(self): self.device.connect() libmetawear.mbl_mw_settings_set_tx_power(self.device.board, 4) libmetawear.mbl_mw_settings_set_connection_parameters(self.device.board, 7.5, 7.5, 0, 2000) sleep(1) print('Connected and configured') def setup(self): pattern = LedPattern(high_time_ms=50, pulse_duration_ms=500, high_intensity=31) e = Event() libmetawear.mbl_mw_sensor_fusion_set_mode(self.device.board, SensorFusionMode.NDOF) libmetawear.mbl_mw_sensor_fusion_set_acc_range(self.device.board, SensorFusionAccRange._8G) libmetawear.mbl_mw_sensor_fusion_set_gyro_range(self.device.board, SensorFusionGyroRange._2000DPS) libmetawear.mbl_mw_sensor_fusion_write_config(self.device.board) signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(self.device.board, SensorFusionData.QUATERNION) # Reduce sample rates for data fusion by letting though only one sample every time a timer fires def processor_created(context, pointer): self.processor = pointer id = libmetawear.mbl_mw_dataprocessor_get_id(self.processor) print("Processor:" + str(self.processor)) print("ID:" + str(id)) libmetawear.mbl_mw_led_write_pattern(self.device.board, byref(pattern), LedColor.GREEN) libmetawear.mbl_mw_led_play(self.device.board) e.set() fn_wrapper = cbindings.FnVoid_VoidP_VoidP(processor_created) # create a timer fireing every time a new sample should be transmitted timer = create_voidp(lambda fn: libmetawear.mbl_mw_dataprocessor_time_create(signal, TimeMode.ABSOLUTE, int(1000 / self.datarate) - 1, None, fn), resource="time", event=e) # create a passthrough for letting through data libmetawear.mbl_mw_dataprocessor_passthrough_create(signal, PassthroughMode.COUNT, 100, None, fn_wrapper) e.wait() e.clear() # Record event: set passthrough to 1 sample every time the timer fires libmetawear.mbl_mw_event_record_commands(timer) libmetawear.mbl_mw_dataprocessor_passthrough_set_count(self.processor, 1) create_voidp_int(lambda fn: libmetawear.mbl_mw_event_end_record(timer, None, fn), event=e) # Create Logger for data # change from "self.processor" to "signal" to log 100Hz quaternions self.logger = create_voidp(lambda fn: libmetawear.mbl_mw_datasignal_log(self.processor, None, fn), resource="logger", event=e) print("LoggerID:" + str(libmetawear.mbl_mw_logger_get_id(self.logger))) # Start Sensor fusion libmetawear.mbl_mw_sensor_fusion_enable_data(self.device.board, SensorFusionData.QUATERNION) libmetawear.mbl_mw_sensor_fusion_start(self.device.board) e.clear() # Start Logging data on disconnect def commands_recorded(context, event, status): print("Event recorded") e.set() # Function pointer commands_recorded_fn = FnVoid_VoidP_VoidP_Int(commands_recorded) # Create an event based on a disconnection event = libmetawear.mbl_mw_settings_get_disconnect_event(self.device.board) # Start recording commands: libmetawear.mbl_mw_event_record_commands(event) # The first command is to write the led pattern libmetawear.mbl_mw_led_write_pattern(self.device.board, byref(pattern), LedColor.RED) # The second command is to play the led pattern libmetawear.mbl_mw_led_play(self.device.board) # Disable data stream # libmetawear.mbl_mw_datasignal_unsubscribe(signal) # Start Logging libmetawear.mbl_mw_logging_start(self.device.board, 0) # Stop recording commands (everything up until now is going into the MetaSensor memory) libmetawear.mbl_mw_event_end_record(event, None, commands_recorded_fn) # Wait until MetaSensor gives the OK e.wait() def start_logging(self): libmetawear.mbl_mw_logging_start(self.device.board, 0) def stop_logging(self): libmetawear.mbl_mw_logging_stop(self.device.board) def start_data(self): self.processor = libmetawear.mbl_mw_dataprocessor_lookup_id(self.device.board, 1) if self.processor: # If the sensor is configured print("Connecting to processor") libmetawear.mbl_mw_datasignal_subscribe(self.processor, None, self.callback) else: # reconnect print("Connecting to signal") signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(self.device.board, SensorFusionData.QUATERNION) libmetawear.mbl_mw_datasignal_subscribe(signal, None, self.callback) def stop_data(self): if self.processor: # If the sensor is configured libmetawear.mbl_mw_datasignal_unsubscribe(self.processor) else: # reconnect signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(self.device.board, SensorFusionData.QUATERNION) libmetawear.mbl_mw_datasignal_unsubscribe(signal) def anonymous_download(self): print("Creating anonymous signals") libmetawear.mbl_mw_metawearboard_create_anonymous_datasignals(self.device.board, None, self.anonymous_signal_handler) self.anonymous_signal_event.wait() # make sense of logging signals found if self.anonymous_signal_result['signals'] is None: if self.anonymous_signal_result['length'] != 0: print("Error creating anonymous signals, status = " + str(self.anonymous_signal_result['length'])) else: print("No active loggers detected") else: self.anonymous_signal_event.clear() # stop logging libmetawear.mbl_mw_logging_stop(self.device.board) print(str(self.anonymous_signal_result['length']) + " active loggers discovered") for signal in self.anonymous_signal_result['signals'].contents: self.identifier = libmetawear.mbl_mw_anonymous_datasignal_get_identifier(signal) # subscribe to download if good signal libmetawear.mbl_mw_anonymous_datasignal_subscribe(signal, None, self.download_callback) libmetawear.mbl_mw_logging_download(self.device.board, 0, byref(self.download_handler)) self.download_finished.wait() self.download_finished.clear() libmetawear.mbl_mw_led_stop_and_clear(self.device.board) def download(self): libmetawear.mbl_mw_logger_subscribe(self.logger, None, self.download_callback) libmetawear.mbl_mw_logging_stop(self.device.board) t = time.time() libmetawear.mbl_mw_logging_download(self.device.board, 0, byref(self.download_handler)) print("Waiting to finish") self.download_finished.wait() self.download_finished.clear() print('Download took ' + str(time.time() - t) + ' Seconds') # def reset(self): libmetawear.mbl_mw_sensor_fusion_stop(self.device.board) libmetawear.mbl_mw_led_stop_and_clear(self.device.board) libmetawear.mbl_mw_logging_stop(self.device.board) libmetawear.mbl_mw_logging_clear_entries(self.device.board) libmetawear.mbl_mw_macro_erase_all(self.device.board) # signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(self.device.board, SensorFusionData.QUATERNION) # libmetawear.mbl_mw_datasignal_unsubscribe(signal) libmetawear.mbl_mw_debug_reset_after_gc(self.device.board) event = Event() self.device.on_disconnect = lambda status: event.set() libmetawear.mbl_mw_debug_disconnect(self.device.board) event.wait() if __name__ == '__main__': d = MetaWear(sys.argv[1], deserialize=False) if len(sys.argv) > 1 else MetaWear('CE:70:B9:9D:92:C5', deserialize=False) state = State(device=d) state.connect() menu = {} menu['1'] = "Connect" menu['2'] = "Disconnect" menu['3'] = "Setup" menu['4'] = "Start Data" menu['5'] = "Stop Data" menu['6'] = "Start Logging" menu['7'] = "Stop Logging" menu['8'] = "Download" menu['9'] = "Anon. Download" menu['n'] = "Reconnect to current device (State reset)" menu['r'] = "Reset Sensor" menu['e'] = "Exit" menu['m'] = "Print Menu" def print_menu(): for entry in menu.keys(): print("Entry: %s -> %s" % (entry, menu[entry])) print_menu() while True: selection = input("Please Select:") if selection in menu.keys(): print(menu[selection]) if selection == '1': if state.device.is_connected: print("Already connected") else: state.connect() elif selection == '2': state.device.disconnect() elif selection == '3': state.setup() elif selection == '4': state.start_data() elif selection == '5': state.stop_data() elif selection == '6': state.start_logging() elif selection == '7': state.stop_logging() elif selection == '8': state.download() elif selection == '9': state.anonymous_download() elif selection == 'n': if state.device.is_connected: state.device.disconnect() d = MetaWear(sys.argv[1], deserialize=False) if len(sys.argv) > 1 else MetaWear('CE:70:B9:9D:92:C5', deserialize=False) state = State(device=d) state.connect() elif selection == 'r': state.reset() elif selection == 'm': print_menu() elif selection == 'e': break else: print('invalid selection' + selection)