from __future__ import print_function
from mbientlab.metawear import MetaWear, libmetawear, parse_value, create_voidp, create_voidp_int
from mbientlab.metawear.cbindings import *
from mbientlab.warble import *
from time import sleep
from threading import Event


class Sensor():
    """Thin wrapper around one MetaWear board for accelerometer logging.

    Every method except ``connect`` reports its outcome as an int:
    ``0`` on success, ``-1`` when an exception was raised along the way.
    """

    def __init__(self, mac):
        """Remember the board's MAC address; nothing is contacted yet.

        :param mac: MAC address string passed to ``MetaWear`` on connect
        """
        self.mac_add = mac
        # MetaWear device object, created by connect().
        self.d = None
        # Handle of the accelerometer logger, created by start_log_acc().
        self.logger_acc = None

    def connect(self):
        """Create the MetaWear object, connect, and set connection parameters.

        Unlike the other methods this one does not catch exceptions: a
        failed connection propagates to the caller.

        :return: 0
        """
        self.d = MetaWear(self.mac_add)
        self.d.connect()
        # Arguments after the board are presumably (min interval, max
        # interval, latency, timeout) -- confirm against the MetaWear API.
        libmetawear.mbl_mw_settings_set_connection_parameters(self.d.board, 7.5, 7.5, 0, 6000)
        # Pause before returning, presumably so the new parameters can
        # take effect -- TODO confirm.
        sleep(1.0)
        return 0

    def disconnect(self):
        """Tear down, reset and disconnect the board, then free its handle.

        Blocks until the device's ``on_disconnect`` callback fires. After a
        successful call ``self.d.board`` has been freed and must not be
        used again; ``connect`` creates a fresh device object.

        :return: 0 on success, -1 on error
        """
        try:
            disconnected = Event()
            self.d.on_disconnect = lambda status: disconnected.set()
            libmetawear.mbl_mw_metawearboard_tear_down(self.d.board)
            libmetawear.mbl_mw_debug_reset(self.d.board)
            libmetawear.mbl_mw_debug_disconnect(self.d.board)
            disconnected.wait()
            # Free the board last: the calls above all take the board
            # handle, so freeing it first made them operate on released
            # memory.
            libmetawear.mbl_mw_metawearboard_free(self.d.board)
        except Exception:
            return -1
        return 0

    def read_log_acc(self):
        """Download the logged accelerometer samples and print each one.

        Every sample is printed as a dict-like line with ``type``,
        ``epoch``, ``x``, ``y`` and ``z``. The call blocks until the
        download progress callback reports zero entries left, then removes
        the logger from the device.

        :return: 0 on success, -1 on error or when no logger exists
        """
        if self.logger_acc is None:
            # No logger was created (start_log_acc not called or failed);
            # do not hand a null handle to the C library.
            return -1
        try:
            download_done = Event()

            def progress_update_handler(context, entries_left, total_entries):
                if entries_left == 0:
                    download_done.set()

            def print_sample(ctx, p):
                # Parse once instead of once per axis.
                acc = parse_value(p)
                print(f"{{'type': 'acc', 'epoch': {p.contents.epoch}, 'x': {acc.x}, 'y': {acc.y}, 'z': {acc.z}}}")

            # Keep the ctypes wrappers in local variables so they stay
            # referenced while the download is running.
            fn_wrapper = FnVoid_VoidP_UInt_UInt(progress_update_handler)
            download_handler = LogDownloadHandler(
                context=None,
                received_progress_update=fn_wrapper,
                received_unknown_entry=cast(None, FnVoid_VoidP_UByte_Long_UByteP_UByte),
                received_unhandled_entry=cast(None, FnVoid_VoidP_DataP))
            callback_acc = FnVoid_VoidP_DataP(print_sample)

            libmetawear.mbl_mw_logger_subscribe(self.logger_acc, None, callback_acc)
            libmetawear.mbl_mw_logging_download(self.d.board, 0, byref(download_handler))
            download_done.wait()

            # Remove the logger from the device and drop the stale handle.
            libmetawear.mbl_mw_logger_remove(self.logger_acc)
            self.logger_acc = None
        except Exception:
            # Exception (not a bare except) so Ctrl-C during the wait
            # is not silently turned into a -1.
            return -1
        return 0

    def start_log_acc(self, f, g):
        """Configure the accelerometer and start logging its data.

        :param f: output data rate passed to ``mbl_mw_acc_set_odr`` (Hz)
        :param g: range passed to ``mbl_mw_acc_set_range`` (g)
        :return: 0 on success, -1 on error
        """
        try:
            libmetawear.mbl_mw_acc_set_odr(self.d.board, f)  # Sample rate Hz
            libmetawear.mbl_mw_acc_set_range(self.d.board, g)  # Range g
            libmetawear.mbl_mw_acc_write_acceleration_config(self.d.board)  # Write configuration to device

            signal = libmetawear.mbl_mw_acc_get_acceleration_data_signal(self.d.board)
            self.logger_acc = create_voidp(
                lambda fn: libmetawear.mbl_mw_datasignal_log(signal, None, fn),
                resource="acc_logger")

            libmetawear.mbl_mw_logging_start(self.d.board, 0)
            libmetawear.mbl_mw_acc_enable_acceleration_sampling(self.d.board)
            libmetawear.mbl_mw_acc_start(self.d.board)
        except Exception:
            return -1
        return 0

    def stop_log_acc(self):
        """Stop the accelerometer, disable sampling, and stop logging.

        :return: 0 on success, -1 on error
        """
        try:
            libmetawear.mbl_mw_acc_stop(self.d.board)
            libmetawear.mbl_mw_acc_disable_acceleration_sampling(self.d.board)
            libmetawear.mbl_mw_logging_stop(self.d.board)
        except Exception:
            return -1
        return 0

    def reset(self):
        """Stop logging, clear log entries and macros, then reset and disconnect.

        :return: 0 on success, -1 on error
        """
        try:
            libmetawear.mbl_mw_logging_stop(self.d.board)
            libmetawear.mbl_mw_logging_clear_entries(self.d.board)
            libmetawear.mbl_mw_macro_erase_all(self.d.board)
            libmetawear.mbl_mw_debug_reset_after_gc(self.d.board)
            libmetawear.mbl_mw_debug_disconnect(self.d.board)
        except Exception:
            return -1
        return 0


if __name__ == '__main__':
    s = Sensor("ED:1F:D1:15:03:BC")
    s.connect()
    print("connected")

    # Run three identical cycles: log for five seconds, stop, dump the log.
    for _ in range(3):
        s.start_log_acc(100, 8)
        print("start logging")
        sleep(5)
        s.stop_log_acc()
        print("stop logging")
        s.read_log_acc()
        print("read log")

    s.disconnect()
    print("disconnect")