"""Stream accelerometer samples from a MetaWear board and export them.

Run as a script, this records several labelled 10-second segments and
saves the accumulated samples to ``test_transmission.npy``.
"""
import json
import socket
import struct
import time

import numpy as np

from mbientlab.metawear import MetaWear, libmetawear, parse_value, cbindings

# import matplotlib.pyplot as plt

# MAC_ADDRESS="FD:3C:E4:1F:75:F4"
MAC_ADDRESS = "CD:A6:F7:E4:AA:3D"


class Accelerometer:
    """Thin wrapper around a MetaWear board's accelerometer.

    Samples received while streaming are appended to ``self.stream`` as
    dicts with the keys ``'time'``, ``'x'``, ``'y'`` and ``'z'``.
    """

    def __init__(self, mac_address=None, device='metaware'):
        """Connect to the board and show the 'ready' LED pattern.

        :mac_address: str : Bluetooth address of the board (required)
        :device: str : device family, only 'metaware' is supported
        :raises ValueError: if the device is unsupported or no address given
        """
        if device == 'metaware' and mac_address is not None:
            self.device = MetaWear(mac_address)
        else:
            raise ValueError(
                "unsupported device %r or missing mac_address" % (device,)
            )
        self.device.connect()

        # LED appearance for each state handled by set_LED_Pattern().
        self.__led_config = {
            'ready': {
                'repeat': 0,  # cbindings.Const.LED_REPEAT_INDEFINITELY,
                'preset': cbindings.LedPreset.SOLID,
                'color': cbindings.LedColor.GREEN,
            },
            'stream': {
                'repeat': 3,
                'preset': cbindings.LedPreset.BLINK,
                'color': cbindings.LedColor.BLUE,
            },
        }
        self.set_LED_Pattern('ready')

        # 0. means "not configured yet"; stream_data() refuses to run then.
        self.acc_rate = 0.
        print('Initializing device')
        self.stream = []

    def set_LED_Pattern(self, ltype='ready'):
        """Cosmetic helper: change the LED pattern according to the action.

        :ltype: str : key of the LED configuration, 'ready' or 'stream'
        :raises KeyError: if ``ltype`` is not a known configuration
        """
        led = self.__led_config[ltype]
        pattern = cbindings.LedPattern(repeat_count=led['repeat'])
        libmetawear.mbl_mw_led_load_preset_pattern(
            cbindings.byref(pattern), led['preset']
        )
        # NOTE(review): any previously written pattern is not cleared first;
        # confirm whether patterns on different colours are meant to stack.
        libmetawear.mbl_mw_led_write_pattern(
            self.device.board, cbindings.byref(pattern), led['color']
        )
        libmetawear.mbl_mw_led_play(self.device.board)

    def configure(self, acc_rate=50., acc_range=4.):
        """Configure the acquisition of the accelerometer.

        :acc_rate: float : acquisition rate in Hz, defaults to 50
        :acc_range: float : range of the accelerometer in g, defaults to 4.
        """
        self.acc_rate = acc_rate
        board = self.device.board
        # Enable accelerometer sampling before activating it
        libmetawear.mbl_mw_acc_enable_acceleration_sampling(board)
        # Set the output data rate to acc_rate (or closest valid frequency)
        libmetawear.mbl_mw_acc_set_odr(board, acc_rate)
        # Set the range to +/-acc_range g (or closest valid range)
        libmetawear.mbl_mw_acc_set_range(board, acc_range)
        # Write the config to the sensor
        libmetawear.mbl_mw_acc_write_acceleration_config(board)

    def stream_data(self, duration=0, show=False):
        """Stream data from the accelerometer into ``self.stream``.

        Timestamps are seconds elapsed since the start of *this* call, so
        they restart from zero on each call while ``self.stream`` keeps
        accumulating until flush() is called.

        :duration: time in seconds that the stream lasts; if 0 it never
            stops by itself, defaults to 0
        :show: bool : print every sample on the fly, defaults to False
        :return data: list of all samples collected so far (``self.stream``)
        :raises ValueError: if no positive acquisition rate was configured
        """
        t0 = time.time()
        fps = self.acc_rate
        # A rate of 0 (configure() never called) would divide by zero below.
        if fps <= 0.:
            raise ValueError(
                "acquisition rate must be positive; call configure() first"
            )

        delta_frame = 1. / fps
        frame_max = duration * fps if duration > 0 else np.inf

        def data_handler(context, data):
            on_fly = parse_value(data)
            values = {
                'time': time.time() - t0,
                'x': on_fly.x,
                'y': on_fly.y,
                'z': on_fly.z,
            }
            if show:
                print(values)
            self.stream.append(values)

        # Keep a reference to the ctypes callback for as long as the
        # subscription is active so it is not garbage collected.
        data_handler_fn = cbindings.FnVoid_VoidP_DataP(data_handler)
        board = self.device.board
        signal = libmetawear.mbl_mw_acc_get_acceleration_data_signal(board)
        libmetawear.mbl_mw_datasignal_subscribe(signal, None, data_handler_fn)
        try:
            libmetawear.mbl_mw_acc_start(board)
            self.set_LED_Pattern('stream')
            print(">> Streaming data")

            # Wait one sample period per expected frame; samples arrive
            # asynchronously through data_handler in the meantime.
            frame = 0
            while frame < frame_max:
                time.sleep(delta_frame)
                frame += 1
            print('> ok')
        finally:
            # Always release the sensor, including on Ctrl-C, which is the
            # only way out when duration is 0.
            libmetawear.mbl_mw_datasignal_unsubscribe(signal)
            libmetawear.mbl_mw_acc_stop(board)
        return self.stream

    def write(self, filename):
        """Export the collected samples to ``filename`` in NumPy format.

        The output is formatted like this : [time, x, y, z]
        where time, x, y and z are lists sorted in chronological order.
        """
        print('Exporting data to ' + filename)
        output = [
            [frame[key] for frame in self.stream]
            for key in ('time', 'x', 'y', 'z')
        ]
        with open(filename, "wb") as F:
            np.save(F, output)

    def flush(self):
        """Discard every sample collected so far."""
        self.stream = []


def main():
    """Record a sequence of labelled 10 s segments and save them to disk."""
    import os

    def beep(x):
        # Ring the terminal bell x times through the shell.
        os.system("echo -n '\a';sleep 0.2;" * x)

    acc = Accelerometer(mac_address=MAC_ADDRESS)
    acc.configure(acc_rate=50.)
    print("Streaming")
    for step in ["rest", "table", "foot", "walk", "chamber", "back"]:
        beep(1)
        print(step)
        acc.stream_data(10)
        beep(1)
        print("\nget ready:")
        time.sleep(5)
    acc.write("test_transmission.npy")


if __name__ == "__main__":
    main()