Downloading (or streaming) while logging

Hello,

I have 3 MMS sensors that I am using to track accelerometer and gyro data. I set up the following code to get the devices to log data, and after logging, download the data (see below). However, I was wondering if there is any way to get the devices to download while logging the data (or stream). I tried using multiprocessing, but that seems to give a lot of errors (at least the way I do it). Would you have any suggestions on how I could achieve this?

Thanks!

from __future__ import print_function
from mbientlab.metawear import MetaWear, libmetawear, parse_value, create_voidp
from mbientlab.metawear.cbindings import AccBmi270Odr, AccBoschRange, GyroBoschRange, \
GyroBoschOdr, FnVoid_VoidP_DataP, FnVoid_VoidP_UInt_UInt, LogDownloadHandler, cast, byref, \
FnVoid_VoidP_UByte_Long_UByteP_UByte, SensorFusionData
from mbientlab.warble import BleScanner
from time import sleep
from threading import Event
import keyboard
from datetime import datetime

# Get time and date
now = datetime.now()
stamp = f'{now.year}{now.month}{now.day}_{now.hour}-{now.minute}-{now.second}'

MAC_devices = ['XX:XX:XX:XX:XX:XX', 'YY:YY:YY:YY:YY:YY', 'ZZ:ZZ:ZZ:ZZ:ZZ:ZZ']

input_arr = list(enumerate(MAC_devices))

# Open file lists
acc_log_list = [None]*len(MAC_devices)
acc_stream_list = [None]*len(MAC_devices)
gyro_log_list = [None]*len(MAC_devices)
gyro_stream_list = [None]*len(MAC_devices)

# Logger list
device_loggers = [0]*len(MAC_devices)

# State list
device_states = [None]*len(MAC_devices)

# Device list
device_instances = [None]*len(MAC_devices)

# Prepare the log lists
for i, device in enumerate(MAC_devices):
    acc_log_list[i] = open(f'logs/acc_log_{i+1}_{stamp}.txt', 'w')
    gyro_log_list[i] = open(f'logs/gyro_log_{i+1}_{stamp}.txt', 'w')

class State:
    def __init__(self, device, index):
        self.device = device
        self.samples = 0
        self.index = index

def setup_device(state):

    libmetawear.mbl_mw_acc_bmi270_set_odr(state.device.board, AccBmi270Odr._400Hz)
    libmetawear.mbl_mw_acc_bosch_set_range(state.device.board, AccBoschRange._4G)
    libmetawear.mbl_mw_acc_write_acceleration_config(state.device.board)


    libmetawear.mbl_mw_gyro_bmi270_set_range(state.device.board, GyroBoschRange._500dps);
    libmetawear.mbl_mw_gyro_bmi270_set_odr(state.device.board, GyroBoschOdr._400Hz);
    libmetawear.mbl_mw_gyro_bmi270_write_config(state.device.board);


    acc = libmetawear.mbl_mw_acc_get_acceleration_data_signal(state.device.board)

    acc_logger = create_voidp(lambda fn: libmetawear.mbl_mw_datasignal_log(acc, None, fn), resource = "acc_logger")


    gyro = libmetawear.mbl_mw_gyro_bmi270_get_rotation_data_signal(state.device.board)

    gyro_logger = create_voidp(lambda fn: libmetawear.mbl_mw_datasignal_log(gyro, None, fn), resource = "gyro_logger")
    return (acc_logger, gyro_logger)

def start_device(state):
    libmetawear.mbl_mw_logging_start(state.device.board, 0)

    libmetawear.mbl_mw_acc_enable_acceleration_sampling(state.device.board)
    libmetawear.mbl_mw_acc_start(state.device.board)

    libmetawear.mbl_mw_gyro_bmi270_enable_rotation_sampling(state.device.board)
    libmetawear.mbl_mw_gyro_bmi270_start(state.device.board)
    return

def stop_device(state):
    libmetawear.mbl_mw_logging_stop(state.device.board)

    libmetawear.mbl_mw_acc_stop(state.device.board)
    libmetawear.mbl_mw_acc_disable_acceleration_sampling(state.device.board)
    libmetawear.mbl_mw_gyro_bmi270_stop(state.device.board)
    libmetawear.mbl_mw_gyro_bmi270_disable_rotation_sampling(state.device.board)

    acc = libmetawear.mbl_mw_acc_get_acceleration_data_signal(state.device.board)
    libmetawear.mbl_mw_datasignal_unsubscribe(acc)
    gyro = libmetawear.mbl_mw_gyro_bmi270_get_rotation_data_signal(state.device.board)
    libmetawear.mbl_mw_datasignal_unsubscribe(gyro)

    libmetawear.mbl_mw_logging_flush_page(state.device.board)
    return

def download_data(state, loggers, index):
    e = Event()

    device = state.device

    def progress_update_handler(context, entries_left, total_entries):
        if (entries_left == 0):
            e.set()

    def data_logger_acc(p):
        acc_log_list[index].write(f'{p.contents.epoch}, {parse_value(p)}\n')

    def data_logger_gyro(p):
        gyro_log_list[index].write(f'{p.contents.epoch}, {parse_value(p)}\n')

    callback_acc = FnVoid_VoidP_DataP(lambda ctx, p: data_logger_acc(p))
    callback_gyro = FnVoid_VoidP_DataP(lambda ctx, p: data_logger_gyro(p))

    fn_wrapper = FnVoid_VoidP_UInt_UInt(progress_update_handler)
    download_handler = LogDownloadHandler(context = None,
                    received_progress_update = fn_wrapper,
                    received_unknown_entry = cast(None, FnVoid_VoidP_UByte_Long_UByteP_UByte),
                    received_unhandled_entry = cast(None, FnVoid_VoidP_DataP))

    libmetawear.mbl_mw_logger_subscribe(loggers[0], None, callback_acc)
    libmetawear.mbl_mw_logger_subscribe(loggers[1], None, callback_gyro)
    libmetawear.mbl_mw_logging_download(device.board, 0,
                                        byref(download_handler))
    e.wait()

    e.clear()
    return True

def start(MAC_devices):
    for index, MAC in enumerate(MAC_devices):
        device = MetaWear(MAC)
        device.connect()
        libmetawear.mbl_mw_settings_set_connection_parameters(device.board, 7.5, 7.5, 0, 6000)
        sleep(1.0)

        state = State(device, index)

        loggers = setup_device(state)

        start_device(state)
        device_loggers[index] = loggers

        device_states[index] = state

        device_instances[index] = device
    return

def stop(MAC_devices):
    for index, state in enumerate(device_states):
        stop_device(state)

    for index, state in enumerate(device_states):
        download_data(state, device_loggers[index], index)

        acc_log_list[index].close()
        gyro_log_list[index].close()

        libmetawear.mbl_mw_debug_disconnect(state.device.board)

    return

def reset_dev(MAC):
    device = MetaWear(MAC)
    device.connect()

    libmetawear.mbl_mw_logging_stop(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_logging_flush_page(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_logging_clear_entries(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_event_remove_all(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_macro_erase_all(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_debug_reset_after_gc(device.board)
    sleep(1.0)

    libmetawear.mbl_mw_debug_disconnect(device.board)
    sleep(1.0)

    device.disconnect()
    sleep(1.0)

def reset_devices(MAC_devices):
    for MAC in MAC_devices:
        reset_dev(MAC)

def main():
    reset_devices(MAC_devices)
    start(MAC_devices)
    print(' Press the ` key to stop.')
    keyboard.wait('`')
    stop(MAC_devices)

if __name__ == '__main__':
    main()

Comments

  • Also, is there a way to download the data through USB? Downloading the data from the 3 sensors takes about 4.5 hours via bluetooth.

  • @AndooBundoo The devices are capable of log download while actively logging. However, streaming high bandwidth (e.g. acc/gyo) while downloading will most likely result in data loss.

    When you start a download, the present length of the log is "snapshot" for the entry download count. It means you will likely need to restart the download each time it finishes, until the raw data source is turned off.

    There is a lot of code to understand in your snippet, but part of your issue may be the Events used for the download process. They would cause each download to block until finished, but you should be able to start all three downloads and wait for them all at the end. The data coming from bluetooth is asynchronous and should not need multithreading if implemented that way. Alternatively, you might not block with wait statements at all, and periodically check the entries_left being reported for each download and sleep in between instead.

    If you are running the latest python SDK on PyPi and latest MMS firmware, you only need to plug your devices in to the USB before initiating a connection and USB will be used for the communication link automatically. To switch from bluetooth to usb a device will need to be disconnected and reconnected to.

  • @Matt Thanks a lot for your reply, I implemented stuff as per your advice and it improved the download times. Just dropping the code here for people that might have the same issue in the future.

    My code for downloading looks like this:

    class DataDownloader:
        def __init__(self, state, loggers, index):
            self.entries_left = 999999999 #Placeholder
            self.state = state
            self.device = state.device
            self.index = index
            self.loggers = loggers
    
            self.callback_acc = FnVoid_VoidP_DataP(lambda ctx, p: self.data_logger_acc(p))
            self.callback_gyro = FnVoid_VoidP_DataP(lambda ctx, p: self.data_logger_gyro(p))
    
            self.fn_wrapper = FnVoid_VoidP_UInt_UInt(self.progress_update_handler)
            self.download_handler = LogDownloadHandler(context = None,
                            received_progress_update = self.fn_wrapper,
                            received_unknown_entry = cast(None, FnVoid_VoidP_UByte_Long_UByteP_UByte),
                            received_unhandled_entry = cast(None, FnVoid_VoidP_DataP))
    
            #time_start = time.perf_counter()
            libmetawear.mbl_mw_logger_subscribe(self.loggers[0], None, self.callback_acc)
            libmetawear.mbl_mw_logger_subscribe(self.loggers[1], None, self.callback_gyro)
            libmetawear.mbl_mw_logging_download(self.device.board, 0,
                                                byref(self.download_handler))
    
        def progress_update_handler(self, context, entries_left, total_entries):
            self.entries_left = entries_left
    
        def data_logger_acc(self, p):
            acc_log_list[self.index].write(f'{p.contents.epoch}, {parse_value(p)}\n')
    
        def data_logger_gyro(self, p):
            gyro_log_list[self.index].write(f'{p.contents.epoch}, {parse_value(p)}\n')
    
    def stop(MAC_devices):
        # Connect back to devices
        for index, device in enumerate(device_instances):
            device.connect()
            print(f'Connected to {device.address}, device number {index+1}.')
    
        # Stops the whole thing, downloads the data
        for index, state in enumerate(device_states):
            # Stop logging for this device
            print(f'Stopping device number {index+1}.')
            stop_device(state)
    
        # Start downloading the data
        for index, state in enumerate(device_states):
            # Download data
            print(f'Downloading data for device number {index+1}.')
            data_downloaders[index] = DataDownloader(state, device_loggers[index], index)
    
        downloading = True
        time0 = time.time()
        while downloading:
            # First, get the remaining entries left
            entries_left_list = [x.entries_left for x in data_downloaders]
    
            # Print it cuz why not
            device_status = ['Ready' if x == 0 else 'Downloading' for x in entries_left_list ]
            print('--------------------------------------------')
            print(f'Status of devices: {device_status}')
            print('Time elapsed: {int(time.time()-time0)} seconds.')
    
            # Check if all zero
            if all([x==0 for x in entries_left_list]):
                # We're done
                print('Download complete!')
                downloading = False
    
            # Sleep a bit
            if downloading:
                sleep(10)
    
        for index, state in enumerate(device_states):    
            acc_log_list[index].close()
            gyro_log_list[index].close()
            print(f'Logs saved, disconnecting device number {index+1}.')
            libmetawear.mbl_mw_debug_disconnect(state.device.board)
    
        return
    
  • Thank you @AndooBundoo, very helpful!

Sign In or Register to comment.