Python scripts publishing MQTT sensor data

I have MetaC running with Python script streaming analog data from GPIO

I would like to extend to have MQTT message published when signal eclipses a set threshold.

Just curious if anyone has done before, sample code or tips for implementing.



  • On our side we've used a bit of the internal python tools but we don't have any sample code or tips:

  • edited May 18

    Nothing too elegant, but working

    # usage: python [mac1] [mac2] ... [mac(n)]
    from __future__ import print_function
    from mbientlab.metawear import MetaWear, libmetawear, parse_value,create_voidp_int,create_voidp
    from mbientlab.metawear.cbindings import *
    from time import sleep
    from threading import Event
    import platform
    import sys
    import paho.mqtt.publish as publish
    MQTT_SERVER = "xx.xx.xx.xx" #IP or Name of MQTT Broker
    MQTT_PATH = "test_channel"
    if sys.version_info[0] == 2:
        range = xrange
    class State:
        def __init__(self, device):
            self.device = device
            self.samples = 0
            self.callback = FnVoid_VoidP_DataP(self.data_handler)
    def data_handler(self, ctx, data):
            # Only publish message if signal eclipses set value
            if parse_value(data) > 5000: 
                print("%s -> %s" % (self.device.address, parse_value(data)))
                publish.single(MQTT_PATH, parse_value(data), hostname=MQTT_SERVER)
                self.samples+= 1
                print("%s -> %s" % (self.device.address, parse_value(data)))
                self.samples+= 1
    e = Event()
    states = []
    for i in range(len(sys.argv) - 1):
            d = MetaWear(sys.argv[i + 1])
            print("Connected to " + d.address)
    for s in states:
            print("Configuring device")
            libmetawear.mbl_mw_settings_set_connection_parameters(s.device.board, 7.5, 7.5, 0, 6000)
    # Get Signal
    adc_signal = libmetawear.mbl_mw_gpio_get_analog_input_data_signal(s.device.board,0,GpioAnalogReadMode.ADC)
    # Subscribe
    libmetawear.mbl_mw_datasignal_subscribe(adc_signal, None, s.callback) 
    # Create Timer
    timer = create_voidp(lambda fn: libmetawear.mbl_mw_timer_create_indefinite(s.device.board, 50, 0, None, fn), resource = "timer", event = e)
    # Start recording commands:
    # The first command is to read the signal - temp data - based on the timer - every 1s
    # Stop recording commands
    create_voidp_int(lambda fn: libmetawear.mbl_mw_event_end_record(timer, None, fn), event = e)
    # Start the timer
  • edited May 18


Sign In or Register to comment.