# connects to a single MMR and streams euler angles at 100Hz from __future__ import print_function #import MetaWear classes and variables from mbientlab.metawear import MetaWear, libmetawear, parse_value from mbientlab.metawear.cbindings import * #import sleep function to be able to pause execution from time import sleep, time #event object that manages flags from threading import Event from ctypes import c_void_p, cast, POINTER, c_uint import csv import sys #create MetaWear object with specified adress of board and call connect method device = MetaWear("DA:6A:BD:93:6C:FA") device.connect() #returns True if device is connected if device.is_connected: print("Device Connected") #create state class that handles incoming data of sensors class State: def __init__(self, device): self.device = device self.samples_received = 0 self.callback = FnVoid_VoidP_DataP(self.data_handler) self.data = [] self.processor = None def data_handler(self, ctx, data): print("%s -> %s" % (self.device.address, parse_value(data))) euler_components = parse_value(data) roll = euler_components.roll pitch = euler_components.pitch yaw = euler_components.yaw self.epoch = data.contents.epoch self.sample = cast(data.contents.extra, POINTER(c_uint8)).contents.value self.data.append([self.epoch, roll, pitch, yaw, self.sample]) self.samples_received+= 1 def setup(self): libmetawear.mbl_mw_settings_set_connection_parameters(self.device.board, 30, 30, 0, 6000) sleep(1.5) e = Event() def processor_created(context, pointer): self.processor = pointer e.set() fn_wrapper = FnVoid_VoidP_VoidP(processor_created) libmetawear.mbl_mw_sensor_fusion_set_mode(self.device.board, SensorFusionMode.NDOF); libmetawear.mbl_mw_sensor_fusion_set_acc_range(self.device.board, SensorFusionAccRange._8G) libmetawear.mbl_mw_sensor_fusion_set_gyro_range(self.device.board, SensorFusionGyroRange._2000DPS) libmetawear.mbl_mw_sensor_fusion_write_config(self.device.board) signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(self.device.board, SensorFusionData.EULER_ANGLE); libmetawear.mbl_mw_dataprocessor_accounter_create_count(signal, None ,fn_wrapper) e.wait() libmetawear.mbl_mw_datasignal_subscribe(self.processor, None, self.callback) def start(self): libmetawear.mbl_mw_sensor_fusion_enable_data(self.device.board, SensorFusionData.EULER_ANGLE); libmetawear.mbl_mw_sensor_fusion_start(self.device.board); states = [] states.append(State(device)) for s in states: print("Configuring device") s.setup() for s in states: print("Starting Stream") s.start() sleep(10.0) for s in states: libmetawear.mbl_mw_sensor_fusion_stop(s.device.board); libmetawear.mbl_mw_metawearboard_tear_down(s.device.board) sleep(1.0) libmetawear.mbl_mw_debug_reset(s.device.board) sleep(1.0) libmetawear.mbl_mw_debug_disconnect(s.device.board) sleep(1.0) print("Total Samples Received") for s in states: print("%s -> %d" % (s.device.address, s.samples_received)) for s in states: with open("euler_accounter_1.csv", "w", newline = "") as csvfile: writer = csv.writer(csvfile) for line in s.data: writer.writerow(line)