from mbientlab.metawear.cbindings import ( BaroBoschOversampling, BaroBmp280StandbyTime, BaroBoschIirFilter, FnVoid_VoidP_VoidP, POINTER, cast, c_uint, c_int, c_float, DataTypeId, CartesianFloat, BatteryState, Tcs34725ColorAdc, EulerAngles, Quaternion, CorrectedCartesianFloat, FnVoid_VoidP_DataP, FnVoid_VoidP_UInt_UInt, FnVoid_VoidP_UByte_Long_UByteP_UByte, LogDownloadHandler, byref, c_ubyte, Const ) from mbientlab.metawear import MetaWear, libmetawear from mbientlab.metawear.metawear import _array_to_buffer, _gattchar_to_string import threading import csv from pathlib import Path from copy import deepcopy from datetime import datetime import time # monkeypatch enable notifications so we can log the incoming bytes def _enable_notifications_patched(self, context, caller, ptr_gattchar, handler, ready): def handle_and_log(value): print('Received notification from device %s' % self.address) print(' '.join('{:02x}'.format(x) for x in value)) handler( caller, cast(_array_to_buffer(value), POINTER(c_ubyte)), len(value) ) print("Enabling notifications for %s" % self.address) uuid = _gattchar_to_string(ptr_gattchar.contents) gatt_char = self.warble.find_characteristic(uuid) if (gatt_char is None): # import pdb; pdb.set_trace() ready(caller, Const.STATUS_ERROR_ENABLE_NOTIFY) else: def completed(err): if err != None: print(str(err)) # import pdb; pdb.set_trace() ready(caller, Const.STATUS_ERROR_ENABLE_NOTIFY) else: gatt_char.on_notification_received(handle_and_log) ready(caller, Const.STATUS_OK) gatt_char.enable_notifications_async(completed) MetaWear._enable_notifications = _enable_notifications_patched def configure(board): # configure sensors libmetawear.mbl_mw_acc_set_odr(board, 100.0) libmetawear.mbl_mw_acc_set_range(board, 8.0) libmetawear.mbl_mw_acc_write_acceleration_config(board) libmetawear.mbl_mw_baro_bosch_set_standby_time( board, BaroBmp280StandbyTime._0_5ms ) libmetawear.mbl_mw_baro_bosch_set_oversampling( board, BaroBoschOversampling.STANDARD ) libmetawear.mbl_mw_baro_bosch_set_iir_filter( board, BaroBoschIirFilter.OFF ) libmetawear.mbl_mw_baro_bosch_write_config(board) print('Configuration written. Setting up loggers...') # set up loggers pointers = {} _callbacks = [] for name, signal in { 'accel': libmetawear.mbl_mw_acc_get_acceleration_data_signal, 'pres': libmetawear.mbl_mw_baro_bosch_get_pressure_data_signal, 'switch': libmetawear.mbl_mw_switch_get_state_data_signal, }.items(): e = threading.Event() def cb(ctx, ptr): nonlocal pointers pointers[name] = ptr e.set() cfunc_cb = FnVoid_VoidP_VoidP(cb) libmetawear.mbl_mw_datasignal_log( signal(board), None, cfunc_cb ) e.wait() print('Loggers set up. Starting logging.') # start logging libmetawear.mbl_mw_acc_enable_acceleration_sampling(board) libmetawear.mbl_mw_acc_start(board) libmetawear.mbl_mw_baro_bosch_start(board) libmetawear.mbl_mw_logging_start(board, 0) return pointers def generate_filename(address, name): return '{}-{}-{}.csv'.format(address, name, int(time.time())) def stop(board, address): # stop logging libmetawear.mbl_mw_acc_stop(board) libmetawear.mbl_mw_acc_disable_acceleration_sampling(board) libmetawear.mbl_mw_baro_bosch_stop(board) libmetawear.mbl_mw_logging_stop(board) print('Logging stopped for', address) def download(board, pointers, address): print('Initiating download for', address) def _byte_array_handler(data): ptr = cast(data.contents.value, POINTER(c_ubyte * data.contents.length)) return [ptr.contents[i].value for i in range(0, data.contents.length)] # set up download # first define and set up callbacks DATA_HANDLERS = { DataTypeId.UINT32: lambda x: cast( x.contents.value, POINTER(c_uint)).contents.value, DataTypeId.INT32: lambda x: cast( x.contents.value, POINTER(c_int)).contents.value, DataTypeId.FLOAT: lambda x: cast( x.contents.value, POINTER(c_float)).contents.value, DataTypeId.CARTESIAN_FLOAT: lambda x: cast( x.contents.value, POINTER(CartesianFloat)).contents, DataTypeId.BATTERY_STATE: lambda x: cast( x.contents.value, POINTER(BatteryState)).contents, DataTypeId.BYTE_ARRAY: _byte_array_handler, DataTypeId.TCS34725_ADC: lambda x: cast( x.contents.value, POINTER(Tcs34725ColorAdc)).contents, DataTypeId.EULER_ANGLE: lambda x: cast( x.contents.value, POINTER(EulerAngles)).contents, DataTypeId.QUATERNION: lambda x: cast( x.contents.value, POINTER(Quaternion)).contents, DataTypeId.CORRECTED_CARTESIAN_FLOAT: lambda x: cast( x.contents.value, POINTER(CorrectedCartesianFloat)).contents } def data_error_handler(data): print('Unrecognized data type id: ' + str(data.contents.type_id)) def create_data_cb(logger_name): filename = generate_filename(address, logger_name) path = Path(filename) if not path.exists(): f = path.open('w') writer = csv.writer(f) else: f = path.open('a') writer = csv.writer(f) def cb(context, data): nonlocal writer value = deepcopy( DATA_HANDLERS.get(data.contents.type_id, data_error_handler)(data) ) epoch = int(data.contents.epoch) if isinstance(value, CartesianFloat): values = [value.x, value.y, value.z] else: values = [value] writer.writerow([str(epoch), str(datetime.utcfromtimestamp(epoch / 1000)), ''] + values) f.flush() return FnVoid_VoidP_DataP(cb) complete_event = threading.Event() def progress_cb(ctx, entries_left, total_entries): print(entries_left, total_entries) if entries_left == 0: complete_event.set() # keep callbacks in an array in case Python tries to GC them callbacks = [] for name, ptr in pointers.items(): print('Subscribing to', name, 'at', ptr) cb = create_data_cb(name) callbacks.append(cb) libmetawear.mbl_mw_logger_subscribe(ptr, None, cb) progress_cb = FnVoid_VoidP_UInt_UInt(progress_cb) def unknown_entry(ctx, id, epoch, data, length): print( 'Unknown Entry: ID: {0}, epoch: {1}, ' 'data: {2}, Length: {3}'.format( id, epoch, bytearray(data)[:length], length ) ) def unhandled_entry(ctx, data): print('Unhandled Entry: ' + str(data)) unhandled_entry = FnVoid_VoidP_DataP(unhandled_entry) unknown_entry = FnVoid_VoidP_UByte_Long_UByteP_UByte(unknown_entry) handler = LogDownloadHandler( received_progress_update=progress_cb, received_unknown_entry=unknown_entry, received_unhandled_entry=unhandled_entry ) # reduce the interval to 7.5ms libmetawear.mbl_mw_settings_set_connection_parameters( board, 7.5, 7.5, 0, 6000 ) # now start the download libmetawear.mbl_mw_logging_download(board, 1000, byref(handler)) complete_event.wait() ADDRESSES = ['E1:19:93:32:EA:37'] def main(): devices = {} pointers = {} for address in ADDRESSES: device = devices[address] = MetaWear(address) device.connect() print("Resetting", address) libmetawear.mbl_mw_logging_clear_entries(device.board) libmetawear.mbl_mw_debug_reset_after_gc(device.board) libmetawear.mbl_mw_debug_disconnect(device.board) time.sleep(5) device.disconnect() for address, device in devices.items(): device.connect() pointers[address] = configure(device.board) device.disconnect() print(pointers) time.sleep(60) for address, device in devices.items(): device.connect() stop(device.board, address) for address, device in devices.items(): download(device.board, pointers[address], address) device.disconnect() main()