"""Stream Euler angles from one MetaWear board and append them to a CSV log.

At import time this script resets the ``hci1`` Bluetooth adapter, builds a
``MetaWear`` handle for one board, connects to it and then streams
sensor-fusion Euler-angle samples into ``log2.csv`` until it is interrupted
with Ctrl-C.
"""
from __future__ import print_function
from mbientlab.metawear import MetaWear, libmetawear, parse_value
from mbientlab.metawear.cbindings import *
from mbientlab.metawear import MetaWear
from mbientlab.metawear import *
from mbientlab.warble import *
from subprocess import call
import time
import csv
import sys
import platform
import six
import os.path
import threading
import subprocess
from os import path
from time import sleep

# Column names of the CSV log; one per value written by s2.data_handler.
CSV_FIELDNAMES = ['EPOCH', 'HEADING', 'PITCH', 'ROLL', 'YAW']

# How long run() sleeps between wake-ups while samples are being streamed.
IDLE_POLL_SECONDS = 0.1

# Reset the hci1 Bluetooth adapter before any connection attempt.
subprocess.run(["sudo", "hciconfig", "hci1", "reset"])

ID2 = MetaWear("xx:xx:xx:xx:xx:xx")


class s2:
    """Log Euler-angle samples from a single MetaWear device to a CSV file."""

    def __init__(self, device, csvfilename):
        """Prepare the log file and connect to the device.

        Args:
            device: MetaWear object to stream from; its ``board`` attribute is
                passed to every ``libmetawear`` call.
            csvfilename: Path of the CSV file that samples are appended to.

        Raises:
            Whatever ``device.connect()`` raises when the connection fails.
        """
        self.device = device
        self.board = self.device.board
        self.filename = csvfilename
        self.sensordatastr = ""
        self.euler_signal = None
        self.euler_callback = None
        self.checkLogFiles(self.filename)
        self.device.connect()
        # Only report success once connect() has actually returned.
        print("Connected")

    def run(self):
        """Start Euler-angle streaming and block until interrupted.

        Never returns normally. Any ``Exception`` is reported, the device is
        shut down through ``close()`` and the process exits with status 1.
        ``KeyboardInterrupt`` is not handled here and propagates to the caller.
        """
        try:
            self.euler_signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(
                self.board, SensorFusionData.EULER_ANGLE)
            # Keep the ctypes wrapper on self so it is not garbage collected
            # while the native library still holds the function pointer.
            self.euler_callback = FnVoid_VoidP_DataP(self.data_handler)
            libmetawear.mbl_mw_datasignal_subscribe(
                self.euler_signal, None, self.euler_callback)
            libmetawear.mbl_mw_sensor_fusion_enable_data(
                self.board, SensorFusionData.EULER_ANGLE)
            libmetawear.mbl_mw_sensor_fusion_set_mode(
                self.board, SensorFusionMode.NDOF)
            libmetawear.mbl_mw_sensor_fusion_write_config(self.board)
            libmetawear.mbl_mw_sensor_fusion_start(self.board)
            # Sleep instead of spinning so waiting for samples does not pin a
            # CPU core; Ctrl-C still interrupts the sleep.
            while True:
                sleep(IDLE_POLL_SECONDS)
        except Exception as e:
            print("Some error", e)
            self.close()
            sys.exit(1)

    def data_handler(self, content, data):
        """Format one Euler-angle sample, print it and append it to the log.

        Args:
            content: Context value passed by the library (``None`` is given at
                subscription time); unused.
            data: Pointer to the sample; ``data.contents.epoch`` supplies the
                timestamp and ``parse_value(data)`` the angle values.
        """
        angles = parse_value(data)
        self.sensordatastr = "{},{:.2f},{:.2f},{:.2f},{:.2f}".format(
            data.contents.epoch,
            angles.heading,
            angles.pitch,
            angles.roll,
            angles.yaw,
        )
        print(self.filename, "=", self.sensordatastr)
        self.logData(self.filename, self.sensordatastr)
        # Short pause after each sample, kept from the original handler.
        sleep(0.01)

    def close(self):
        """Stop sensor fusion, unsubscribe, disconnect and wait two seconds."""
        libmetawear.mbl_mw_sensor_fusion_stop(self.board)
        libmetawear.mbl_mw_sensor_fusion_clear_enabled_mask(self.board)
        # run() may have failed before a signal was obtained; never hand a
        # None signal to the native unsubscribe call.
        if self.euler_signal is not None:
            libmetawear.mbl_mw_datasignal_unsubscribe(self.euler_signal)
            self.euler_signal = None
        self.device.disconnect()
        print('disconnected')
        sleep(2)

    def checkLogFiles(self, filename):
        """Create ``filename`` with a CSV header row unless it already exists.

        An existing file is left untouched, so new samples are appended after
        whatever it already contains.

        Args:
            filename: Path of the CSV log file.
        """
        if path.exists(filename):
            print("File found in the path")
            return

        print("no file found, creating file")
        with open(filename, mode='w') as csv_file:
            # lineterminator matches the "\n" that logData() writes after
            # every row, so the file does not mix line endings.
            writer = csv.DictWriter(
                csv_file,
                fieldnames=CSV_FIELDNAMES,
                delimiter=',',
                lineterminator='\n',
            )
            writer.writeheader()
        print("file created")

    def logData(self, filename, data):
        """Append ``data`` followed by a newline to ``filename``.

        Args:
            filename: Path of the CSV log file.
            data: Already formatted, comma-separated row text.
        """
        with open(filename, "a") as log_file:
            log_file.write(data + "\n")


# os.remove("log2.csv")
try:
    sensor2 = s2(ID2, "log2.csv")
    print("got here")
except Exception as e:
    # Report why the first attempt failed, then retry once; a second failure
    # is allowed to propagate and end the script.
    print("Connection errors", e)
    sensor2 = s2(ID2, "log2.csv")
    print("FINALLY GOT HERE")

while True:
    try:
        sensor2.run()
    except KeyboardInterrupt:
        # Ctrl-C: shut the sensor down cleanly before exiting.
        sensor2.close()
        sys.exit()