# usage: python data_processor.py [mac1] [mac2] ... [mac(n)] from __future__ import print_function from ctypes import c_void_p, cast, POINTER from mbientlab.metawear.cbindings import * from mbientlab.metawear import MetaWear, libmetawear, parse_value, cbindings, create_voidp,create_voidp_int,PassthroughMode,TimeMode from time import sleep from threading import Event from sys import argv states = [] class State: def __init__(self, device): self.device = device self.callback = cbindings.FnVoid_VoidP_DataP(self.data_handler) self.processor = None self.counter = 0 self.datarate = 20 def data_handler(self, ctx, data): #print("%s -> %s" % (self.device.address, cast(data.contents.value, POINTER(c_uint)).contents.value)) self.counter+=1 print("%s -> %s : %s" % (self.device.address, parse_value(data), self.counter)) def setup(self): libmetawear.mbl_mw_settings_set_connection_parameters(self.device.board, 7.5, 7.5, 0, 6000) sleep(1.5) e = Event() libmetawear.mbl_mw_sensor_fusion_set_mode(s.device.board, SensorFusionMode.NDOF) libmetawear.mbl_mw_sensor_fusion_set_acc_range(s.device.board, SensorFusionAccRange._8G) libmetawear.mbl_mw_sensor_fusion_set_gyro_range(s.device.board, SensorFusionGyroRange._2000DPS) libmetawear.mbl_mw_sensor_fusion_write_config(s.device.board) signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(s.device.board, SensorFusionData.QUATERNION) def processor_created(context, pointer): self.processor = pointer e.set() fn_wrapper = cbindings.FnVoid_VoidP_VoidP(processor_created) timer = create_voidp( lambda fn: libmetawear.mbl_mw_dataprocessor_time_create(signal, TimeMode.ABSOLUTE, int(1000 / self.datarate) - 1 , None, fn), resource="time", event=e) libmetawear.mbl_mw_dataprocessor_passthrough_create(signal, PassthroughMode.COUNT, 0, None, fn_wrapper) e.wait() e.clear() libmetawear.mbl_mw_event_record_commands(timer) libmetawear.mbl_mw_dataprocessor_passthrough_set_count(self.processor, 1) create_voidp_int(lambda fn: libmetawear.mbl_mw_event_end_record(timer, None, fn), event=e) libmetawear.mbl_mw_datasignal_subscribe(self.processor, None, s.callback) def start(self): libmetawear.mbl_mw_sensor_fusion_enable_data(s.device.board, SensorFusionData.QUATERNION) libmetawear.mbl_mw_sensor_fusion_start(s.device.board) if __name__ == '__main__': #enter list of sensors mac addresses list_of_devices = ['MMR ->ED:82:8B', 'MMS->B6:B4:B6',' MMS-> 94:C4:7A'] for device in list_of_devices: d = MetaWear(device) d.connect() print("Connected to " + d.address) states.append(State(d)) for s in states: print("Configuring %s" % s.device.address) s.setup() for s in states: s.start() #Sample for 5 seconds sleep(5.0) print("Resetting devices") events = [] for s in states: libmetawear.mbl_mw_sensor_fusion_stop(s.device.board) libmetawear.mbl_mw_led_stop_and_clear(s.device.board) libmetawear.mbl_mw_logging_stop(s.device.board) libmetawear.mbl_mw_logging_clear_entries(s.device.board) libmetawear.mbl_mw_macro_erase_all(s.device.board) signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(s.device.board, SensorFusionData.QUATERNION) libmetawear.mbl_mw_datasignal_unsubscribe(signal) libmetawear.mbl_mw_debug_reset_after_gc(s.device.board) event = Event() s.device.on_disconnect = lambda status: event.set() libmetawear.mbl_mw_debug_disconnect(s.device.board) event.wait()