from __future__ import print_function from mbientlab.metawear import MetaWear, libmetawear, parse_value from mbientlab.metawear.cbindings import * from mbientlab.warble import * from time import sleep from threading import Event import platform import sys import datetime import time import os class Cube: def __init__(self,letter, device_mac, btl_mac): self.__device = None self.letter = letter self.device_mac = device_mac self.btl_mac = btl_mac self.connected = False self.__charge_status_callback = FnVoid_VoidP_VoidP_Int(self.__charge_status_handler) self.charge_status = -1 self.__battery_signal = None self.__battery_signal_callback = FnVoid_VoidP_DataP(self.__battery_data_handler) self.battery_state = -1 self.__acc_signal = None self.acc_samples = 0 self.__acc_curtime = 0 self.__acc_prevtime = 0 self.__acc_signal_callback = FnVoid_VoidP_DataP(self.__acc_data_handler) self.__acc_data_file = None self.__gyr_signal = None self.gyr_samples = 0 self.__gyr_curtime = 0 self.__gyr_prevtime = 0 self.__gyr_signal_callback = FnVoid_VoidP_DataP(self.__gyr_data_handler) self.__gyr_data_file = None self.__magn_signal = None self.magn_samples = 0 self.__magn_curtime = 0 self.__magn_prevtime = 0 self.__magn_signal_callback = FnVoid_VoidP_DataP(self.__magn_data_handler) self.__magn_data_file = None self.__callback_answer = False ''' self.__fuser_processor = None self.__fuser_processor_created_callback = FnVoid_VoidP_VoidP(self.__fuser_processor_created) self.__fuser_processor_callback = FnVoid_VoidP_DataP(self.__fuser_processor_data_handler) ''' def connect(self): # Connessione alla scheda MetamotionR tramite il suo MAC address ed il MAC address del dispositvo Bluetooth (BTL) # Utilizzare il comando 'hcitool dev' per individuare il MAC del proprio dispositivo BTL self.__device = MetaWear(self.device_mac, hci_mac=self.btl_mac) self.__device.connect() libmetawear.mbl_mw_settings_set_connection_parameters(self.__device.board, 7.5, 7.5, 0, 6000) #libmetawear.mbl.mbl_mw_settings_set_ad_interval(500,0) print("Connected to " + self.__device.address) sleep(2) self.connected = True def disconnect(self): #libmetawear.mbl_mw_debug_disconnect(self.__device.board) self.__device.disconnect() self.connected = False self.charge_status = -1 self.battery_state = -1 def reset(self): #libmetawear.mbl_mw_metawearboard_free(self.__device.board) libmetawear.mbl_mw_metawearboard_tear_down(self.__device.board,10) #libmetawear.mbl_mw_debug_reset(self.__device.board) # Battery State def read_battery_state(self): self.__callback_answer = False self.__battery_signal = libmetawear.mbl_mw_settings_get_battery_state_data_signal(self.__device.board) libmetawear.mbl_mw_datasignal_subscribe(self.__battery_signal, None, self.__battery_signal_callback) libmetawear.mbl_mw_datasignal_read(self.__battery_signal) current_time = int(round(time.time() * 100)) current_seconds = (current_time // 100) % 60 prev_seconds = current_seconds while not self.__callback_answer and current_seconds-prev_seconds <= 3: sleep(0.15) current_time = int(round(time.time() * 100)) current_seconds = (current_time // 100) % 60 return self.battery_state def __battery_data_handler(self, ctx, data): battery_state = cast(data.contents.value, POINTER(BatteryState)).contents self.battery_state = battery_state.charge #print("%s -> Voltage: %s Charge: %s" % (self.__device.address, battery_state.voltage, battery_state.charge)) self.__callback_answer = True # Charge Status def read_charge_status(self): self.__callback_answer = False libmetawear.mbl_mw_settings_read_current_charge_status(self.__device.board, None, self.__charge_status_callback) current_time = int(round(time.time() * 100)) current_seconds = (current_time // 100) % 60 prev_seconds = current_seconds while not self.__callback_answer and current_seconds-prev_seconds <= 3: sleep(0.15) current_time = int(round(time.time() * 100)) current_seconds = (current_time // 100) % 60 return self.charge_status def __charge_status_handler(self, ctx, board, value): self.charge_status = value ''' if (value != Const.SETTINGS_CHARGE_STATUS_UNSUPPORTED): print("Charge status: %d" % (value)) else: print("Charge status not supported on this board") ''' self.__callback_answer = True #Accelerometer def config_acc(self, odr, range): # Config odr and range of accelerometer libmetawear.mbl_mw_acc_set_odr(self.__device.board, odr) libmetawear.mbl_mw_acc_set_range(self.__device.board, range) libmetawear.mbl_mw_acc_write_acceleration_config(self.__device.board) def check_acc_type(self): # Individuare modello di accelerometro acc_type = libmetawear.mbl_mw_metawearboard_lookup_module(self.__device.board, Module.ACCELEROMETER) if acc_type==Const.MODULE_TYPE_NA: print("No accelerometer on this board") elif acc_type==Const.MODULE_ACC_TYPE_MMA8452Q: print("Acc Type = MMA8452Q") elif acc_type==Const.MODULE_ACC_TYPE_BMI160: print("Acc Type = BMI160") elif acc_type==Const.MODULE_ACC_TYPE_BMA255: print("Acc Type = BMA255") def enable_acc_sampling(self, base_path=""): self.__acc_signal = libmetawear.mbl_mw_acc_get_packed_acceleration_data_signal(self.__device.board) libmetawear.mbl_mw_datasignal_subscribe(self.__acc_signal, None, self.__acc_signal_callback) libmetawear.mbl_mw_acc_enable_acceleration_sampling(self.__device.board) filename = '{}_{}_{}'.format(self.letter, self.__device.address.replace(':','.'), "acceleremoter.txt") data_filename = os.path.join(base_path, filename) if data_filename: self.__acc_data_file = open(data_filename, 'w') def disable_acc_sampling(self): libmetawear.mbl_mw_acc_disable_acceleration_sampling(self.__device.board) libmetawear.mbl_mw_datasignal_unsubscribe(self.__acc_signal) def start_acc_sampling(self): self.acc_samples=0 libmetawear.mbl_mw_acc_start(self.__device.board) def stop_acc_sampling(self): libmetawear.mbl_mw_acc_stop(self.__device.board) def __acc_data_handler(self, ctx, data): acc_data = cast(data.contents.value, POINTER(CartesianFloat)).contents #print("%.3f: %s %s %s" % (data.contents.epoch, acc_data.x, acc_data.y, acc_data.z)) #timestamp = datetime.datetime.fromtimestamp(float(data.contents.epoch/1000)) #print("%s -> %s %s %s %s" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), acc_data.x, acc_data.y, acc_data.z)) if self.acc_samples == 0: self.__acc_curtime = 0 else: self.__acc_curtime = self.__acc_curtime + (data.contents.epoch - self.__acc_prevtime) if self.__acc_data_file != None: self.__acc_data_file.write("%s\t%.3f\t%.3f\t%.3f\n" % (self.__acc_curtime, acc_data.x, acc_data.y, acc_data.z)) #self.file.write("%s -> %s %s %s %s \n" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), acc_data.x, acc_data.y, acc_data.z)) self.__acc_prevtime = data.contents.epoch self.acc_samples+= 1 #Gyroscope def config_gyr(self, odr, range): libmetawear.mbl_mw_gyro_bmi160_set_odr(self.__device.board, odr) libmetawear.mbl_mw_gyro_bmi160_set_range(self.__device.board, range) libmetawear.mbl_mw_gyro_bmi160_write_config(self.__device.board) def enable_gyr_sampling(self, base_path=""): self.__gyr_signal = libmetawear.mbl_mw_gyro_bmi160_get_packed_rotation_data_signal(self.__device.board) libmetawear.mbl_mw_datasignal_subscribe(self.__gyr_signal, None, self.__gyr_signal_callback) libmetawear.mbl_mw_gyro_bmi160_enable_rotation_sampling(self.__device.board) filename = '{}_{}_{}'.format(self.letter,self.__device.address.replace(':','.'), "gyroscope.txt") data_filename = os.path.join(base_path, filename) if data_filename: self.__gyr_data_file = open(data_filename, 'w') def disable_gyr_sampling(self): libmetawear.mbl_mw_gyro_bmi160_disable_rotation_sampling(self.__device.board) libmetawear.mbl_mw_datasignal_unsubscribe(self.__gyr_signal) def start_gyr_sampling(self): self.gyr_samples=0 libmetawear.mbl_mw_gyro_bmi160_start(self.__device.board) def stop_gyr_sampling(self): libmetawear.mbl_mw_gyro_bmi160_stop(self.__device.board) def __gyr_data_handler(self, ctx, data): gyr_data = cast(data.contents.value, POINTER(CartesianFloat)).contents #print("%.3f: %s %s %s" % (data.contents.epoch, gyr_data.x, gyr_data.y, gyr_data.z)) #timestamp = datetime.datetime.fromtimestamp(float(data.contents.epoch/1000)) #print("%s -> %s %s %s %s" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), gyr_data.x, gyr_data.y, gyr_data.z)) if self.gyr_samples == 0: self.__gyr_curtime = 0 else: self.__gyr_curtime = self.__gyr_curtime + (data.contents.epoch - self.__gyr_prevtime) if self.__gyr_data_file != None: #self.file.write("%s -> %s %s %s %s \n" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), gyr_data.x, gyr_data.y, gyr_data.z)) self.__gyr_data_file.write("%s\t%.3f\t%.3f\t%.3f\n" % (self.__gyr_curtime, gyr_data.x, gyr_data.y, gyr_data.z)) self.__gyr_prevtime = data.contents.epoch self.gyr_samples+= 1 #Magnetometer def config_magn(self, preset): libmetawear.mbl_mw_mag_bmm150_set_preset(self.__device.board, preset) def enable_magn_sampling(self, base_path=""): self.__magn_signal = libmetawear.mbl_mw_mag_bmm150_get_packed_b_field_data_signal(self.__device.board) libmetawear.mbl_mw_datasignal_subscribe(self.__magn_signal, None, self.__magn_signal_callback) libmetawear.mbl_mw_mag_bmm150_enable_b_field_sampling(self.__device.board) filename = '{}_{}_{}'.format(self.letter,self.__device.address.replace(':','.'), "magnetometer.txt") data_filename = os.path.join(base_path, filename) if data_filename: self.__magn_data_file = open(data_filename, 'w') def disable_magn_sampling(self): libmetawear.mbl_mw_mag_bmm150_disable_b_field_sampling(self.__device.board) libmetawear.mbl_mw_datasignal_unsubscribe(self.__magn_signal) def start_magn_sampling(self): self.magn_samples=0 libmetawear.mbl_mw_mag_bmm150_start(self.__device.board) def stop_magn_sampling(self): libmetawear.mbl_mw_mag_bmm150_stop(self.__device.board) def __magn_data_handler(self, ctx, data): magn_data = cast(data.contents.value, POINTER(CartesianFloat)).contents #print("%.3f: %s %s %s" % (data.contents.epoch, gyr_data.x, gyr_data.y, gyr_data.z)) #timestamp = datetime.datetime.fromtimestamp(float(data.contents.epoch/1000)) #print("%s -> %s %s %s %s" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), magn_data.x, magn_data.y, magn_data.z)) if self.magn_samples == 0: self.__magn_curtime = 0 else: self.__magn_curtime = self.__magn_curtime + (data.contents.epoch - self.__magn_prevtime) if self.__magn_data_file != None: #self.file.write("%s -> %s %s %s %s \n" % (self.__device.address, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'), magn_data.x, magn_data.y, magn_data.z)) self.__magn_data_file.write("%s\t%.3f\t%.3f\t%.3f\n" % (self.__magn_curtime, magn_data.x, magn_data.y, magn_data.z)) self.__magn_prevtime = data.contents.epoch self.magn_samples+= 1 ''' # Data Processor Type: Fuser def enable_fuser_sampling(self, base_path=""): self.__acc_signal = libmetawear.mbl_mw_acc_get_acceleration_data_signal(self.__device.board) self.__gyr_signal = libmetawear.mbl_mw_gyro_bmi160_get_rotation_data_signal(self.__device.board) libmetawear.mbl_mw_acc_enable_acceleration_sampling(self.__device.board) libmetawear.mbl_mw_gyro_bmi160_enable_rotation_sampling(self.__device.board) signals = (c_void_p * 1)(self.__gyr_signal) libmetawear.mbl_mw_dataprocessor_fuser_create(self.__acc_signal, signals, 1, None, self.__fuser_processor_created_callback) def __fuser_processor_created(self, ctx, processor): self.__fuser_processor = processor libmetawear.mbl_mw_datasignal_subscribe(processor, None, self.__fuser_processor_callback) print(processor) def __fuser_processor_data_handler(self,ctx,data): print("sample") print(parse_value(data[0])) def start_fuser_sampling(self): self.start_acc_sampling self.start_gyr_sampling def stop_fuser_sampling(self): self.stop_acc_sampling self.stop_gyr_sampling def disable_fuser_sampling(self): libmetawear.mbl_mw_acc_disable_acceleration_sampling(self.__device.board) libmetawear.mbl_mw_gyro_bmi160_disable_rotation_sampling(self.__device.board) libmetawear.mbl_mw_datasignal_unsubscribe(self.__fuser_processor) '''