#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function from mbientlab.metawear import MetaWear, libmetawear, parse_value from mbientlab.metawear.cbindings import * from time import sleep from threading import Event import platform import sys import MAC_Address as mac import time import csv ############################################################################################################################ # collect all the samples acceleration = [ [], [], [] ] gyro_acc = [ [], [], [] ] magField = [ [], [], [] ] elapsedTime_acc = [0] elapsedTime_gyro = [0] elapsedTime_mag = [0] ############################################################################################################################ class FusionDevice: def __init__(self, MACAddress): self.device = MetaWear(MAC_Rosso) self.samplesAcc = 0 self.samplesGyro = 0 self.samplesMag = 0 self.callbackAcc = FnVoid_VoidP_DataP(self.data_handlerAcc) self.callbackGyro = FnVoid_VoidP_DataP(self.data_handlerGyro) self.callbackMag = FnVoid_VoidP_DataP(self.data_handlerMag) # self.callbackBattery = FnVoid_VoidP_DataP(self.battery_handler) # self.batteryVoltage = 0 # self.batteryCharge = 0 self.initTimeAcc = 0 self.initTimeGyro = 0 self.initTimeMag = 0 self.thisEpochAcc = 0 self.thisEpochGyro = 0 self.thisEpochMag = 0 ############################################################################################################################ def data_handlerAcc(self, ctx, data): #print("%s -> %s" % (self.device.address, parse_value(data))) coordinates = parse_value(data) acceleration[0].append(coordinates.x*9.8) acceleration[1].append(coordinates.y*9.8) acceleration[2].append(coordinates.z*9.8) self.thisEpochAcc = data.contents.epoch #First sample [0][x,y,z] if(self.samplesAcc == 0): self.initTimeAcc = self.thisEpochAcc #from 2nd to last sample [now-init][x,y,z] else: elapsedTime_acc.append(float(self.thisEpochAcc-self.initTimeAcc)) self.samplesAcc += 1 #################################################################### def data_handlerGyro(self, ctx, data): #print("%s -> %s" % (self.device.address, parse_value(data))) coordinates = parse_value(data) gyro_acc[0].append(coordinates.x*9.8) gyro_acc[1].append(coordinates.y*9.8) gyro_acc[2].append(coordinates.z*9.8) self.thisEpochGyro = data.contents.epoch #First sample [0][x,y,z] if(self.samplesGyro == 0): self.initTimeGyro = self.thisEpochGyro #from 2nd to last sample [now-init][x,y,z] else: elapsedTime_gyro.append(float(self.thisEpochGyro-self.initTimeGyro)) self.samplesGyro += 1 #################################################################### def data_handlerMag(self, ctx, data): #print("%s -> %s" % (self.device.address, parse_value(data))) coordinates = parse_value(data) magField[0].append(coordinates.x) magField[1].append(coordinates.y) magField[2].append(coordinates.z) self.thisEpochMag = data.contents.epoch #First sample [0][x,y,z] if(self.samplesMag == 0): self.initTimeMag = self.thisEpochMag #from 2nd to last sample [now-init][x,y,z] else: elapsedTime_mag.append(float(self.thisEpochMag-self.initTimeMag)) self.samplesMag += 1 ############################################################################################################################ # def battery_handler(self, ctx, data): # value = parse_value(data) # self.batteryVoltage = value.voltage # self.batteryCharge = value.charge ############################################################################################################################ def connect(self): self.device.connect() print("\nConnected to " + self.device.address) ############################################################################################################################ def configure(self): ############# Device configuration print("\nConfiguring %s" % self.device.address) libmetawear.mbl_mw_settings_set_connection_parameters(self.device.board, 7.5, 7.5, 0, 6000) sleep(1.0) ############# Sensors configuration # ACC config print("Accelerometer configuration") libmetawear.mbl_mw_acc_set_odr(self.device.board, 100.0) #100Hz libmetawear.mbl_mw_acc_set_range(self.device.board, 16.0) libmetawear.mbl_mw_acc_write_acceleration_config(self.device.board) # GYRO config print("Gyroscope configuration") libmetawear.mbl_mw_gyro_bmi160_set_odr(self.device.board, GyroBmi160Odr._100Hz) #100Hz libmetawear.mbl_mw_gyro_bmi160_set_range(self.device.board, GyroBmi160Range._250dps) libmetawear.mbl_mw_gyro_bmi160_write_config(self.device.board) # Magnotometer config print("Magnetometer configuration\n") libmetawear.mbl_mw_mag_bmm150_set_preset(self.device.board, MagBmm150Preset.HIGH_ACCURACY) #20Hz ############################################################################################################################ def startStreaming(self): print('\n%s start streaming' % self.device.address) # Accelerometer signalACC = libmetawear.mbl_mw_acc_get_acceleration_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_subscribe(signalACC, None, self.callbackAcc) libmetawear.mbl_mw_acc_enable_acceleration_sampling(self.device.board) libmetawear.mbl_mw_acc_start(self.device.board) #Gyroscope signalGyro = libmetawear.mbl_mw_gyro_bmi160_get_rotation_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_subscribe(signalGyro, None, self.callbackGyro) libmetawear.mbl_mw_gyro_bmi160_enable_rotation_sampling(self.device.board) libmetawear.mbl_mw_gyro_bmi160_start(self.device.board) #Magnetometer signalMag = libmetawear.mbl_mw_mag_bmm150_get_b_field_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_subscribe(signalMag, None, self.callbackMag) libmetawear.mbl_mw_mag_bmm150_enable_b_field_sampling(self.device.board) libmetawear.mbl_mw_mag_bmm150_start(self.device.board) #Battery # signalBattery = self.libmetawear.mbl_mw_settings_get_battery_state_data_signal(self.board) # voltage = self.libmetawear.mbl_mw_datasignal_get_component(signalBattery, Const.SETTINGS_BATTERY_VOLTAGE_INDEX) # charge = self.libmetawear.mbl_mw_datasignal_get_component(signalBattery, Const.SETTINGS_BATTERY_CHARGE_INDEX) # self.libmetawear.mbl_mw_datasignal_subscribe(charge, None, self.callbackBattery) # self.libmetawear.mbl_mw_datasignal_read(voltage) # self.libmetawear.mbl_mw_datasignal_read(charge) ############################################################################################################################ def stopStraming(self): print('\n%s stop streaming data' % self.device.address) #Accelerometer libmetawear.mbl_mw_acc_stop(self.device.board) libmetawear.mbl_mw_acc_disable_acceleration_sampling(self.device.board) signalAcc = libmetawear.mbl_mw_acc_get_acceleration_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_unsubscribe(signalAcc) # libmetawear.mbl_mw_debug_disconnect(s.device.board) print("Accelerometer stop streaming") #Gyroscope libmetawear.mbl_mw_gyro_bmi160_stop(self.device.board) libmetawear.mbl_mw_gyro_bmi160_disable_rotation_sampling(self.device.board) signalGyro = libmetawear.mbl_mw_gyro_bmi160_get_rotation_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_unsubscribe(signalGyro) print("Gyroscope sto streaming") #Magnetometer libmetawear.mbl_mw_mag_bmm150_stop(self.device.board) libmetawear.mbl_mw_mag_bmm150_disable_b_field_sampling(self.device.board) signalMag = libmetawear.mbl_mw_mag_bmm150_get_b_field_data_signal(self.device.board) libmetawear.mbl_mw_datasignal_unsubscribe(signalMag, None, self.callbackMag) print("Magnetometer stop streaming\n") libmetawear.mbl_mw_debug_disconnect(self.device.board) ############## SAVE DATA STREAMING TO CSV FILEs ####################### with open(self.device.address+'_acc_100Hz_step1.csv', mode='w') as acc_file: acc_writer = csv.writer(acc_file, delimiter=',', quoting=csv.QUOTE_MINIMAL) acc_writer.writerow(['Elapsed time [ms]', 'x-acc', 'y-acc', 'z-acc' ]) for i in range(len(acceleration[0])): acc_writer.writerow([elapsedTime_acc[i], acceleration[0][i], acceleration[1][i], acceleration[2][i]]) if acc_file.closed: print("%s raw data saved to ./%s\n" % ("AccelerometerBmi160", acc_file.name)) with open(self.device.address+'_gyro_100Hz_step1.csv', mode='w') as gyro_file: gyro_writer = csv.writer(gyro_file, delimiter=',', quoting=csv.QUOTE_MINIMAL) gyro_writer.writerow(['Elapsed time [ms]', 'x-acc', 'y-acc', 'z-acc' ]) for i in range(len(gyro_acc[0])): gyro_writer.writerow([elapsedTime_gyro[i], gyro_acc[0][i], gyro_acc[1][i], gyro_acc[2][i]]) if(gyro_file.closed): print("%s raw data saved to ./%s\n" % ("GyroBmi160", gyro_file.name)) with open(self.device.address+'_mag_100Hz_step1.csv', mode='w') as mag_file: mag_writer = csv.writer(mag_file, delimiter=',', quoting=csv.QUOTE_MINIMAL) mag_writer.writerow(['Elapsed time [ms]', 'x-acc', 'y-acc', 'z-acc' ]) for i in range(len(magField[0])): mag_writer.writerow([elapsedTime_mag[i], magField[0][i], magField[1][i], magField[2][i]]) if mag_file.closed: print("%s raw data saved to ./%s\n" % ("MagnetometerBmm150", mag_file.name)) ############################################################################################################################ # blink led for battery charge level under 15% # def blinkLed(sefl, color, start = True): # pattern = LedPattern(repeat_count= Const.LED_REPEAT_INDEFINITELY) # libmetawear.mbl_mw_led_load_preset_pattern(byref(pattern), LedPreset.BLINK) # switch(color): # case 'green': # libmetawear.mbl_mw_led_write_pattern(self.device.board, byref(pattern), LedColor.GREEN) # break # case 'red': # ibmetawear.mbl_mw_led_write_pattern(self.device.board, byref(pattern), LedColor.RED) # break # case 'blue': # libmetawear.mbl_mw_led_write_pattern(self.device.board, byref(pattern), LedColor.BLUE) # break # if start==True: # libmetawear.mbl_mw_led_play(self.device.board) # else: # libmetawear.mbl_mw_led_stop_and_clear(self.device.board) ############################################################################################################################ ####### end of FUSIONDEVICE ################################################################################################ ############################################################################################################################ ################### TEST ###################################### MAC_Rosso = mac.MAC_Rosso #"C2:2D:11:80:72:15" MAC_Nero = mac.MAC_Nero #"F2:7B:27:68:0A:25" streamingTime = 15.0 #Streaming time deviceTest = FusionDevice(MAC_Rosso) deviceTest.connect() deviceTest.configure() deviceTest.startStreaming() print('\nStreaming time: ',streamingTime,' s\n') sleep(streamingTime) #streaming time deviceTest.stopStraming()