Fill This Form To Receive Instant Help

Help in Homework
trustpilot ratings
google ratings


Homework answers / question archive / Assignment 3: Multiprocessing Practice A MQTT data source, data sink, and data transformer in one package to practice multiprocessing and multitasking

Assignment 3: Multiprocessing Practice A MQTT data source, data sink, and data transformer in one package to practice multiprocessing and multitasking

Computer Science

Assignment 3: Multiprocessing Practice

A MQTT data source, data sink, and data transformer in one package to practice multiprocessing and multitasking.  Usually your data source will be a bunch of sensor nodes instead, but an edge server usually will need to perform both data sink/logging/storage and data transformation to save on bandwidth.  You can imagine a more powerful edge server is used, such as the Coral System-on-Module, which sports a quad-core NXP i.MX 8M system-on-a-chip as its main microprocessor and a Coral neural network co-processor, using around 5-10 W of power (still decently low).

 

**In this assignment, you will make two versions of this fancy MQTT servicer program:**

 

1. A cooperatively multitasked version using `trio` ([see docs](https://trio.readthedocs.io/en/stable/)) and `distmqtt` ([see examples](https://github.com/M-o-a-T/distmqtt/tree/main/samples)) (install these two packages using mambaforge prompt: `mamba install trio`, `pip install distmqtt`).

2. A multiprocessed version using `concurrent.futures` ([see docs](https://docs.python.org/3/library/concurrent.futures.html)), `multiprocessing.Manager` ([see docs](https://docs.python.org/3/library/multiprocessing.html#managers)), and `paho` ([see examples](https://github.com/eclipse/paho.mqtt.python/tree/master/examples)).

 

The full `paho` documentation is available [on GitHub](https://github.com/eclipse/paho.mqtt.python).

 

You do not need to use a finite state machine for this assignment, since the 

  functions themselves are quite simple (just computationally taxing.  You can

  imagine there are more complicated scenarios where you might have to use a

  finite state machine (or multiple of them) to handle cases where you might

  be handling requests via MQTT in addition to storing/processing incoming

  data simultaneously (as in the case of an edge server).

 

Here are the three tasks that comprise this "super edge server":

1. MQTT Publishing Task:

    - Every 5 ms, we will publish a random fixed-point number (two decimal places) picked from the range 10.0 to 30.0 (simulating temperature in degC) to the topic "sensors/0/temperature/raw".

    - Every 3 ms, we will publish a random integer picked from the range -180 to 180 deg (simulating angle on some axis) to the topic "sensors/0/angle/raw".

    - Every 10 ms, we will publish the current value of some counter (this can literally just be a number incremented every 10 ms) to the topic "sensors/0/counter/raw".

2. MQTT Ingestion Task:

    - Subscribes to all sensor information topics

    - Each message is added to a `multiprocessing.Manager` queue corresponding to the sensor type.

3. MQTT Transform and Publish Task:

    - Gets 10 samples at a time from the corresponding `multiprocessing.Manager` queue.

    - Produces a mean of those 10 samples and publishes it to the corresponding topic, subtopic'd to the "mean".  For example, "sensors/mean/counter" would provide means of every 10 samples of "sensors/mean/counter".  The messages here are of the format: "sensor N: xxxxxx".

 

Each task has been written as a method functions in a class in a single-threaded version using paho inside `server_singlethreaded.py`.  

 

**Your job is to convert this single-threaded server into the two concurrently-executed versions described above.**

 

The cooperatively multitasked version should be named `server_multitasked.py`.

 

The multiprocessed version should be named `server_multiprocessed.py`.

 

Hint: for the multiprocessed version, it might be simpler to just have each task as its own class.

 

Make sure to run mosquitto so that there is a local MQTT broker running.


 

## Hints

 

1. In the current configuration, you can see that the efficiency of the 

   program depends highly on the publisher, ingester, and transformpublisher tasks executing as quickly (in some order) as possible.  They are all coupled to the same `mqtt.Client` object,

   since this program runs single-threaded.  One of the ways to allow

   for these tasks to run concurrently is to separate out the MQTT clients when you work on the multitasked or multiprocessed versions.

 

2. For the multiprocessed version, leave the Manager() and its queues in.  It will simplify the

need for locks and synchronization primitives (which we did not fully get to discuss yet).

 

3. For the cooperatively-multitasked version, it is probably simpler than you think using `trio` and `distmqtt`, since the await calls are required on any MQTT publish/receive function call. 

pur-new-sol

Purchase A New Answer

Custom new solution created by our subject matter experts

GET A QUOTE