Downloading (or streaming) while logging
Hello,
I have 3 MMS sensors that I am using to track accelerometer and gyro data. I set up the following code to get the devices to log data, and after logging, download the data (see below). However, I was wondering if there is any way to get the devices to download while logging the data (or stream). I tried using multiprocessing, but that seems to give a lot of errors (at least the way I do it). Would you have any suggestions on how I could achieve this?
Thanks!
from __future__ import print_function

import os
from datetime import datetime
from threading import Event
from time import sleep

import keyboard
from mbientlab.metawear import MetaWear, libmetawear, parse_value, create_voidp
from mbientlab.metawear.cbindings import AccBmi270Odr, AccBoschRange, GyroBoschRange, \
    GyroBoschOdr, FnVoid_VoidP_DataP, FnVoid_VoidP_UInt_UInt, LogDownloadHandler, cast, byref, \
    FnVoid_VoidP_UByte_Long_UByteP_UByte, SensorFusionData
from mbientlab.warble import BleScanner
# Timestamp shared by every log file created in this run. The fields are
# zero-padded so the resulting names are unambiguous (without padding,
# "2023111" could be Jan 11 or Nov 1) and sort chronologically.
now = datetime.now()
stamp = now.strftime('%Y%m%d_%H-%M-%S')
MAC_devices = ['XX:XX:XX:XX:XX:XX', 'YY:YY:YY:YY:YY:YY', 'ZZ:ZZ:ZZ:ZZ:ZZ:ZZ']
input_arr = list(enumerate(MAC_devices))
# Per-device output files, indexed by position in MAC_devices.
# The *_stream_list slots are allocated here but not filled by this section.
acc_log_list = [None]*len(MAC_devices)
acc_stream_list = [None]*len(MAC_devices)
gyro_log_list = [None]*len(MAC_devices)
gyro_stream_list = [None]*len(MAC_devices)
# Per-device (acc_logger, gyro_logger) tuples, filled in by start().
device_loggers = [0]*len(MAC_devices)
# Per-device State objects, filled in by start().
device_states = [None]*len(MAC_devices)
# Per-device MetaWear instances, filled in by start().
device_instances = [None]*len(MAC_devices)
# open() does not create missing directories, so make sure logs/ exists
# before creating the output files.
os.makedirs('logs', exist_ok=True)
# Open one accelerometer and one gyroscope log file per device (1-based
# device number in the file name).
for i, device in enumerate(MAC_devices):
    acc_log_list[i] = open(f'logs/acc_log_{i+1}_{stamp}.txt', 'w')
    gyro_log_list[i] = open(f'logs/gyro_log_{i+1}_{stamp}.txt', 'w')
class State:
    """Bookkeeping record for one connected board.

    Attributes:
        device: the device object handed in by the caller.
        index: the caller-supplied position of this device.
        samples: counter initialised to 0.
    """

    def __init__(self, device, index):
        self.index = index
        self.device = device
        self.samples = 0
def setup_device(state):
    """Configure accelerometer and gyroscope and create a logger for each.

    Args:
        state: object whose ``device.board`` is the board handle passed to
            every libmetawear call.

    Returns:
        Tuple ``(acc_logger, gyro_logger)`` as produced by ``create_voidp``.
    """
    board = state.device.board

    # Accelerometer settings (AccBmi270Odr._400Hz, AccBoschRange._4G),
    # then write the configuration to the board.
    libmetawear.mbl_mw_acc_bmi270_set_odr(board, AccBmi270Odr._400Hz)
    libmetawear.mbl_mw_acc_bosch_set_range(board, AccBoschRange._4G)
    libmetawear.mbl_mw_acc_write_acceleration_config(board)

    # Gyroscope settings (GyroBoschRange._500dps, GyroBoschOdr._400Hz),
    # then write the configuration to the board.
    libmetawear.mbl_mw_gyro_bmi270_set_range(board, GyroBoschRange._500dps)
    libmetawear.mbl_mw_gyro_bmi270_set_odr(board, GyroBoschOdr._400Hz)
    libmetawear.mbl_mw_gyro_bmi270_write_config(board)

    # Create the accelerometer logger first, then the gyroscope logger.
    acc_signal = libmetawear.mbl_mw_acc_get_acceleration_data_signal(board)

    def request_acc_logger(fn):
        return libmetawear.mbl_mw_datasignal_log(acc_signal, None, fn)

    acc_logger = create_voidp(request_acc_logger, resource="acc_logger")

    gyro_signal = libmetawear.mbl_mw_gyro_bmi270_get_rotation_data_signal(board)

    def request_gyro_logger(fn):
        return libmetawear.mbl_mw_datasignal_log(gyro_signal, None, fn)

    gyro_logger = create_voidp(request_gyro_logger, resource="gyro_logger")

    return (acc_logger, gyro_logger)
def start_device(state):
    """Start logging, then enable and start the accelerometer and gyroscope.

    Args:
        state: object whose ``device.board`` is the board handle.
    """
    board = state.device.board

    # Logging is started before the sensors so it is already active when
    # sampling begins.
    libmetawear.mbl_mw_logging_start(board, 0)

    libmetawear.mbl_mw_acc_enable_acceleration_sampling(board)
    libmetawear.mbl_mw_acc_start(board)

    libmetawear.mbl_mw_gyro_bmi270_enable_rotation_sampling(board)
    libmetawear.mbl_mw_gyro_bmi270_start(board)
def stop_device(state):
    """Stop logging and both sensors, unsubscribe their signals, flush the log page.

    Args:
        state: object whose ``device.board`` is the board handle.
    """
    board = state.device.board

    libmetawear.mbl_mw_logging_stop(board)

    # Stop and disable each sensor.
    libmetawear.mbl_mw_acc_stop(board)
    libmetawear.mbl_mw_acc_disable_acceleration_sampling(board)
    libmetawear.mbl_mw_gyro_bmi270_stop(board)
    libmetawear.mbl_mw_gyro_bmi270_disable_rotation_sampling(board)

    # Unsubscribe from both data signals.
    acc_signal = libmetawear.mbl_mw_acc_get_acceleration_data_signal(board)
    libmetawear.mbl_mw_datasignal_unsubscribe(acc_signal)
    gyro_signal = libmetawear.mbl_mw_gyro_bmi270_get_rotation_data_signal(board)
    libmetawear.mbl_mw_datasignal_unsubscribe(gyro_signal)

    libmetawear.mbl_mw_logging_flush_page(board)
def download_data(state, loggers, index):
    """Download the logged data of one device into its two log files.

    Blocks until the progress callback reports that no entries are left.

    Args:
        state: object whose ``device.board`` is the board handle.
        loggers: pair ``(acc_logger, gyro_logger)``; element 0 is subscribed
            to the accelerometer writer, element 1 to the gyroscope writer.
        index: position of this device in ``acc_log_list`` / ``gyro_log_list``.

    Returns:
        True once the download has been reported complete.
    """
    finished = Event()
    device = state.device

    def on_progress(context, entries_left, total_entries):
        # The download counts as finished once nothing is left to transfer.
        if entries_left == 0:
            finished.set()

    def write_acc_entry(ctx, p):
        acc_log_list[index].write(f'{p.contents.epoch}, {parse_value(p)}\n')

    def write_gyro_entry(ctx, p):
        gyro_log_list[index].write(f'{p.contents.epoch}, {parse_value(p)}\n')

    # The wrapped callbacks are bound to locals so they stay referenced for
    # the whole time this function waits on the download.
    callback_acc = FnVoid_VoidP_DataP(write_acc_entry)
    callback_gyro = FnVoid_VoidP_DataP(write_gyro_entry)
    fn_wrapper = FnVoid_VoidP_UInt_UInt(on_progress)

    # Only progress updates are handled; the other two handlers are null.
    download_handler = LogDownloadHandler(
        context=None,
        received_progress_update=fn_wrapper,
        received_unknown_entry=cast(None, FnVoid_VoidP_UByte_Long_UByteP_UByte),
        received_unhandled_entry=cast(None, FnVoid_VoidP_DataP),
    )

    acc_logger, gyro_logger = loggers[0], loggers[1]
    libmetawear.mbl_mw_logger_subscribe(acc_logger, None, callback_acc)
    libmetawear.mbl_mw_logger_subscribe(gyro_logger, None, callback_gyro)
    libmetawear.mbl_mw_logging_download(device.board, 0, byref(download_handler))

    finished.wait()
    finished.clear()
    return True
def start(MAC_devices):
    """Connect to every device, configure it, start logging and record its handles.

    For each address the resulting loggers, State and MetaWear instance are
    stored in ``device_loggers``, ``device_states`` and ``device_instances``
    at the address's position.

    Args:
        MAC_devices: iterable of device addresses passed to ``MetaWear``.
    """
    for slot, address in enumerate(MAC_devices):
        device = MetaWear(address)
        device.connect()
        # NOTE(review): the four numbers are presumably min/max connection
        # interval, latency and timeout - confirm against the MetaWear API.
        libmetawear.mbl_mw_settings_set_connection_parameters(
            device.board, 7.5, 7.5, 0, 6000)
        sleep(1.0)

        state = State(device, slot)
        loggers = setup_device(state)
        start_device(state)

        # Publish the handles only after the device was started.
        device_loggers[slot] = loggers
        device_states[slot] = state
        device_instances[slot] = device
def stop(MAC_devices):
    """Stop every started device, then download, close files and disconnect.

    All devices are stopped first; afterwards each device's log is
    downloaded in turn, its two log files are closed and the board is
    disconnected.

    Args:
        MAC_devices: accepted for symmetry with ``start``; the devices that
            are actually processed are those recorded in ``device_states``.
    """
    for state in device_states:
        stop_device(state)
    for index, state in enumerate(device_states):
        try:
            download_data(state, device_loggers[index], index)
        finally:
            # Close the files even if the download fails or is interrupted,
            # so buffered samples reach disk and the handles are not leaked.
            acc_log_list[index].close()
            gyro_log_list[index].close()
        libmetawear.mbl_mw_debug_disconnect(state.device.board)
def reset_dev(MAC):
    """Connect to one board, run the reset sequence on it and disconnect.

    The sequence is: stop logging, flush the log page, clear log entries,
    remove all events, erase all macros, reset after garbage collection and
    debug-disconnect. Each step, and the final ``device.disconnect()``, is
    followed by a one-second pause.

    Args:
        MAC: device address passed to ``MetaWear``.
    """
    device = MetaWear(MAC)
    device.connect()

    reset_sequence = (
        libmetawear.mbl_mw_logging_stop,
        libmetawear.mbl_mw_logging_flush_page,
        libmetawear.mbl_mw_logging_clear_entries,
        libmetawear.mbl_mw_event_remove_all,
        libmetawear.mbl_mw_macro_erase_all,
        libmetawear.mbl_mw_debug_reset_after_gc,
        libmetawear.mbl_mw_debug_disconnect,
    )
    for step in reset_sequence:
        step(device.board)
        sleep(1.0)

    device.disconnect()
    sleep(1.0)
def reset_devices(MAC_devices):
    """Run ``reset_dev`` on each address in ``MAC_devices``, one after another."""
    for address in MAC_devices:
        reset_dev(address)
def main():
    """Reset all devices, start logging, wait for the ` key, then stop and download."""
    stop_key = '`'

    reset_devices(MAC_devices)
    start(MAC_devices)

    print(' Press the ` key to stop.')
    # Block here until the stop key is pressed.
    keyboard.wait(stop_key)

    stop(MAC_devices)


# Run the workflow only when executed as a script.
if __name__ == '__main__':
    main()
Comments
Also, is there a way to download the data through USB? Downloading the data from the 3 sensors takes about 4.5 hours via bluetooth.
@AndooBundoo The devices are capable of log download while actively logging. However, streaming high-bandwidth data (e.g. acc/gyro) while downloading will most likely result in data loss.
When you start a download, a "snapshot" of the present length of the log is taken and used as the entry download count. This means you will likely need to restart the download each time it finishes, until the raw data source is turned off.
There is a lot of code to understand in your snippet, but part of your issue may be the `Event`s used for the download process. They would cause each download to block until finished, but you should be able to start all three downloads and wait for them all at the end. The data coming from Bluetooth is asynchronous and should not need multithreading if implemented that way. Alternatively, you might not block with wait statements at all, and instead periodically check the `entries_left` value being reported for each download, sleeping in between.
If you are running the latest Python SDK on PyPI and the latest MMS firmware, you only need to plug your devices in to USB before initiating a connection, and USB will be used for the communication link automatically. To switch from Bluetooth to USB, a device will need to be disconnected and reconnected.
@Matt Thanks a lot for your reply, I implemented stuff as per your advice and it improved the download times. Just dropping the code here for people that might have the same issue in the future.
My code for downloading looks like this:
Thank you @AndooBundoo, very helpful!