Session 3: Producer–Consumer Design with Queues

Synopsis

Covers queues for decoupling task timing, buffering sensor data, smoothing bursts of activity, and structuring clean async data pipelines.

Session Content

Session Overview

Duration: ~45 minutes
Topic: Producer–Consumer Design with Queues
Platform: Raspberry Pi Pico 2 W
Language: MicroPython
IDE: Thonny

In this session, you will learn how to structure asynchronous programs using the producer–consumer pattern with uasyncio.Queue. This is a key design approach for building responsive embedded applications where sensors, buttons, network tasks, and actuators must work together without blocking each other.


Learning Objectives

By the end of this session, learners will be able to:

  • Explain the producer–consumer pattern in embedded systems
  • Use uasyncio.Queue to pass data between tasks
  • Build a non-blocking data pipeline from sensor input to processing/output
  • Combine periodic producers with event-driven consumers
  • Apply queue-based design to real-world Pico 2 W IoT projects

Prerequisites

  • Basic Python knowledge
  • Completed a MicroPython setup for Raspberry Pi Pico 2 W
  • Familiarity with:
      • machine.Pin
      • time.sleep()
      • Basic uasyncio concepts from the previous session

Required Hardware

  • Raspberry Pi Pico 2 W
  • USB cable
  • Breadboard
  • 1 LED
  • 1 resistor (220Ω or 330Ω)
  • 1 pushbutton
  • Optional:
      • DHT11 or DHT22 sensor
      • Active buzzer
  • Jumper wires

Development Environment Setup

Use Thonny IDE with MicroPython firmware on the Pico 2 W.

Thonny Setup

  1. Install Thonny from: https://thonny.org
  2. Connect Pico 2 W to your computer via USB
  3. Open Thonny
  4. Go to Tools → Options → Interpreter
  5. Select MicroPython (Raspberry Pi Pico) as the interpreter
  6. Choose the correct serial port
  7. Click OK
  8. Confirm the REPL shows the MicroPython prompt: >>>

Uploading Code

  • Save files directly to the Pico as main.py for auto-run
  • Use Ctrl+S to save
  • Use Run to execute scripts interactively

Theory: Producer–Consumer Pattern

What Is It?

The producer–consumer model splits a program into two roles:

  • Producer: generates data or events
  • Consumer: receives and processes that data

A queue acts as the buffer between them.

Why Use It?

In embedded systems:

  • Sensors produce data at fixed intervals
  • Buttons produce events unpredictably
  • Actuators consume commands
  • Network tasks may send or receive data asynchronously

Using a queue helps to:

  • Decouple tasks
  • Prevent blocking
  • Manage bursts of data
  • Keep code modular and easier to debug

Queue Behavior

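A queue is typically FIFO (first in, first out): items leave in the order they arrived.

Example:

  • Sensor readings are produced every second
  • The consumer processes them in order
  • No reading is lost: if the queue fills up, await queue.put() makes the producer wait until there is space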


Key MicroPython Concepts

uasyncio.Queue

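A queue is used for communication between coroutines.

Note: Queue is not guaranteed to be part of the asyncio module on every MicroPython firmware build. If asyncio.Queue raises an AttributeError on your board, install a queue implementation first, for example the Queue class from Peter Hinch's micropython-async library, and import it from there.

Common methods:

  • await queue.put(item): add an item, waiting if the queue is full
  • await queue.get(): remove and return the oldest item, waiting if the queue is empty
  • queue.qsize(): number of items waiting
  • queue.empty(): check whether the queue is empty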
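
The methods listed above can be tried in a short, finite script. This is a minimal sketch, assuming Queue is available on asyncio as in the exercises in this session; the function name queue_tour and the sample values are made up for illustration.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python, handy for a quick test


async def queue_tour():
    """Exercise put, get, qsize and empty on a small queue."""
    queue = asyncio.Queue(3)          # room for at most 3 items

    for reading in (21, 22, 23):
        await queue.put(reading)      # returns at once while there is room
    size_when_loaded = queue.qsize()  # 3 items are now waiting

    received = []
    while not queue.empty():
        received.append(await queue.get())  # oldest item comes out first

    return size_when_loaded, received


print(asyncio.run(queue_tour()))  # (3, [21, 22, 23])
```

The readings come back in the order they were put in, which is the FIFO behavior described earlier.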

Important Design Notes

  • Use small, meaningful messages
  • Keep queue items lightweight
  • Avoid blocking operations inside coroutines
  • Decide what happens when the queue is full

Hands-On Exercise 1: Basic Queue with Producer and Consumer

Goal

Create one task that produces numbers and another that consumes them.

Code: main.py

import uasyncio as asyncio


async def producer(queue):
    """Produce numbers and place them into the queue."""
    count = 0
    while True:
        count += 1
        print("Producer generated:", count)
        await queue.put(count)
        print("Queue size after put:", queue.qsize())
        await asyncio.sleep(1)


async def consumer(queue):
    """Consume numbers from the queue and process them."""
    while True:
        item = await queue.get()
        print("Consumer received:", item)
        print("Queue size after get:", queue.qsize())

        # Simulate processing time
        await asyncio.sleep(2)


async def main():
    queue = asyncio.Queue(5)

    asyncio.create_task(producer(queue))
    asyncio.create_task(consumer(queue))

    while True:
        await asyncio.sleep(10)


try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()

Expected Output

Producer generated: 1
Queue size after put: 1
Consumer received: 1
Queue size after get: 0
Producer generated: 2
Queue size after put: 1
Producer generated: 3
Queue size after put: 2
Consumer received: 2
Queue size after get: 1

Discussion

  • The producer creates data every 1 second
  • The consumer takes 2 seconds to process each item
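  • The queue buffers the difference in speed
  • Because the producer is twice as fast as the consumer, the queue gains roughly one item every 2 seconds and is full (5 items) after roughly 10 seconds
  • Once the queue is full, await queue.put() pauses the producer until the consumer frees a slot, so the producer slows to the consumer's pace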
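
The pausing behavior of a full queue is easier to see with a deliberately tiny queue and a producer that never sleeps. The following is a sketch under the same assumption as the exercise (asyncio.Queue is available); the names fast_producer, slow_consumer and backpressure_demo are illustrative only.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python


async def fast_producer(queue, sizes):
    for n in range(1, 7):
        await queue.put(n)           # pauses here whenever the queue is full
        sizes.append(queue.qsize())  # record how full the queue got


async def slow_consumer(queue, received):
    for _ in range(6):
        received.append(await queue.get())
        await asyncio.sleep(0.05)    # simulate slow processing


async def backpressure_demo():
    queue = asyncio.Queue(2)         # deliberately small
    sizes, received = [], []
    await asyncio.gather(
        fast_producer(queue, sizes),
        slow_consumer(queue, received),
    )
    return sizes, received


sizes, received = asyncio.run(backpressure_demo())
print("Largest queue size seen:", max(sizes))
print("Items received in order:", received)
```

Even though the producer has no delay of its own, the queue never grows beyond its limit and every item still arrives in order: the full queue throttles the producer.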

Hands-On Exercise 2: Button Events to LED Actions

Goal

Use a button as the producer and an LED as the consumer.

Wiring

  • LED anode → GP15 through 220Ω resistor
  • LED cathode → GND
  • Button one side → GP14
  • Button other side → GND

This example uses the Pico’s internal pull-up resistor, so the pin reads 1 while the button is released and 0 while it is pressed.

Code: main.py

import uasyncio as asyncio
from machine import Pin


button = Pin(14, Pin.IN, Pin.PULL_UP)
led = Pin(15, Pin.OUT)


async def button_producer(queue):
    """Detect button presses and send events to the queue."""
    last_state = 1

    while True:
        current_state = button.value()

        # Detect falling edge: button press
        if last_state == 1 and current_state == 0:
            print("Button pressed")
            await queue.put("PRESS")

        last_state = current_state
        # Polling every 50 ms also acts as a crude debounce
        await asyncio.sleep_ms(50)


async def led_consumer(queue):
    """Wait for events and blink the LED."""
    while True:
        event = await queue.get()

        if event == "PRESS":
            print("LED on")
            led.value(1)
            await asyncio.sleep(0.3)
            led.value(0)
            print("LED off")


async def main():
    queue = asyncio.Queue(10)

    asyncio.create_task(button_producer(queue))
    asyncio.create_task(led_consumer(queue))

    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()

Expected Output

Button pressed
LED on
LED off
Button pressed
LED on
LED off

Discussion

  • The button task only detects input events
  • The LED task only handles output actions
  • This separation makes the code easier to extend later

Hands-On Exercise 3: Sensor-to-Display Pipeline

Goal

Simulate a sensor producer and a processing consumer.

Scenario

A temperature sensor task produces readings, and a consumer formats and prints them.

If you have a DHT11 or DHT22 sensor, you can replace the simulated values later.

Code: main.py

import uasyncio as asyncio
import random


async def sensor_producer(queue):
    """Simulate periodic temperature readings."""
    while True:
        temperature = 20 + random.randint(0, 10)
        print("Sensor reading:", temperature)
        await queue.put(temperature)
        await asyncio.sleep(1)


async def data_consumer(queue):
    """Consume sensor readings and format output."""
    while True:
        temp = await queue.get()

        if temp >= 28:
            status = "HOT"
        elif temp >= 24:
            status = "WARM"
        else:
            status = "OK"

        print("Processed:", temp, "C ->", status)
        await asyncio.sleep(0.5)


async def main():
    queue = asyncio.Queue(3)

    asyncio.create_task(sensor_producer(queue))
    asyncio.create_task(data_consumer(queue))

    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()

Example Output

Sensor reading: 21
Processed: 21 C -> OK
Sensor reading: 29
Processed: 29 C -> HOT
Sensor reading: 24
Processed: 24 C -> WARM
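
To swap the simulated values for a real DHT sensor, one option is to pass the producer a function that performs the read. The sketch below assumes the standard MicroPython dht driver (dht.DHT22, measure(), temperature()); the helper make_dht_reader, the parameters period_s and max_readings, and the GP16 wiring are hypothetical choices for illustration, not part of the exercise.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python (only the producer can be tested there)


def make_dht_reader(pin_number):
    """Return a function that reads the temperature in C (MicroPython only)."""
    import dht
    from machine import Pin

    sensor = dht.DHT22(Pin(pin_number))  # use dht.DHT11 for a DHT11

    def read_temperature():
        sensor.measure()                 # raises OSError if the sensor fails
        return sensor.temperature()

    return read_temperature


async def sensor_producer(queue, read_temperature, period_s=2, max_readings=None):
    """Queue readings; skip a cycle when the sensor read fails."""
    sent = 0
    while max_readings is None or sent < max_readings:
        try:
            reading = read_temperature()
        except OSError as err:
            print("Sensor read failed:", err)
        else:
            await queue.put(reading)
            sent += 1
        await asyncio.sleep(period_s)    # a DHT22 needs about 2 s between reads


# On the Pico (hypothetical wiring: DHT data pin on GP16):
#     asyncio.create_task(sensor_producer(queue, make_dht_reader(16)))
```

The consumer from the exercise does not need to change, because it only sees numbers arriving on the queue.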

Hands-On Exercise 4: Queue with Network-Style Message Flow

Goal

Model an IoT application where one task prepares messages and another task “sends” them.

This simulates a telemetry pipeline before adding real networking.

Code: main.py

import uasyncio as asyncio


async def telemetry_producer(queue):
    """Generate telemetry messages."""
    message_id = 0

    while True:
        message_id += 1
        message = {
            "id": message_id,
            "device": "pico2w",
            "temperature": 25 + (message_id % 5)
        }
        print("Produced telemetry:", message)
        await queue.put(message)
        await asyncio.sleep(2)


async def telemetry_consumer(queue):
    """Simulate sending telemetry to the cloud."""
    while True:
        message = await queue.get()

        # Simulated network delay
        print("Sending telemetry:", message)
        await asyncio.sleep(1.5)
        print("Telemetry sent:", message["id"])


async def main():
    queue = asyncio.Queue(5)

    asyncio.create_task(telemetry_producer(queue))
    asyncio.create_task(telemetry_consumer(queue))

    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()

Example Output

Produced telemetry: {'id': 1, 'device': 'pico2w', 'temperature': 26}
Sending telemetry: {'id': 1, 'device': 'pico2w', 'temperature': 26}
Telemetry sent: 1
Produced telemetry: {'id': 2, 'device': 'pico2w', 'temperature': 27}
Sending telemetry: {'id': 2, 'device': 'pico2w', 'temperature': 27}
Telemetry sent: 2

Best Practices for Queue-Based Async Design

1. Keep Messages Small

Use:

  • integers
  • strings
  • short dictionaries

Avoid large blobs unless necessary.

2. Choose the Right Queue Size

  • Small queue: low memory use, more backpressure
  • Large queue: more buffering, more RAM usage
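
A rough way to reason about queue size is to compare how fast items arrive with how fast they are removed. The helper below is a back-of-the-envelope sketch, not a precise model: seconds_until_full is a made-up name, and it assumes steady rates with no bursts.

```python
def seconds_until_full(queue_size, produce_period_s, consume_period_s):
    """Estimate how long a bounded queue takes to fill; None if it never fills."""
    produce_rate = 1 / produce_period_s    # items added per second
    consume_rate = 1 / consume_period_s    # items removed per second
    growth = produce_rate - consume_rate   # net items gained per second
    if growth <= 0:
        return None                        # the consumer keeps up
    return queue_size / growth


# Exercise 1: size 5, produce every 1 s, consume every 2 s
print(seconds_until_full(5, 1, 2))    # 10.0
# Exercise 3: size 3, produce every 1 s, consume every 0.5 s
print(seconds_until_full(3, 1, 0.5))  # None
```

If the estimate is None, a small queue is enough to absorb jitter. If it is a finite number, the producer will eventually be throttled, and a larger queue only delays that moment.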

3. Separate Responsibilities

  • Producer: gather data
  • Consumer: process data
  • Another task: send to network
  • Another task: update display or actuators

4. Handle Full Queues Carefully

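If a queue is full, choose one of two policies:

  • Let the producer wait: await queue.put() pauses until a slot is free, so nothing is lost but the producer falls behind
  • Drop data in custom logic, for example by discarding the oldest item so the queue always holds the most recent readings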
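
A drop-oldest policy can be sketched in a few lines. This assumes the queue class offers full(), get_nowait() and put_nowait(), as desktop Python's asyncio.Queue does; check that your MicroPython queue implementation provides them. The helper name put_latest is hypothetical.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python


def put_latest(queue, item):
    """Add item without waiting; if the queue is full, drop the oldest first."""
    dropped = None
    if queue.full():
        dropped = queue.get_nowait()   # discard the oldest entry
    queue.put_nowait(item)
    return dropped                     # None when nothing was discarded


async def drop_oldest_demo():
    queue = asyncio.Queue(3)
    dropped = [put_latest(queue, n) for n in range(1, 6)]
    kept = [queue.get_nowait() for _ in range(queue.qsize())]
    return dropped, kept


print(asyncio.run(drop_oldest_demo()))
# ([None, None, None, 1, 2], [3, 4, 5])
```

This suits data where only recent values matter, such as a live temperature display. For events that must not be lost, such as button presses, letting the producer wait is usually the safer choice.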

5. Use Clear Event Names

Examples:

  • "PRESS"
  • "TEMP_HIGH"
  • "SEND_NOW"
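
Clear event names pair well with a lookup table that maps each name to an action. The following is one possible sketch: dispatch_events, the handlers dictionary and the "STOP" sentinel (used only so the demo can finish) are illustrative assumptions.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python


async def dispatch_events(queue, handlers):
    """Run the handler registered for each event name until "STOP" arrives."""
    while True:
        event = await queue.get()
        if event == "STOP":            # hypothetical sentinel to end the demo
            break
        handler = handlers.get(event)
        if handler is None:
            print("Unknown event:", event)
        else:
            handler()


async def dispatch_demo():
    queue = asyncio.Queue(5)
    actions = []
    handlers = {
        "PRESS": lambda: actions.append("blink LED"),
        "TEMP_HIGH": lambda: actions.append("sound buzzer"),
    }
    for event in ("PRESS", "TEMP_HIGH", "SEND_NOW", "STOP"):
        await queue.put(event)
    await dispatch_events(queue, handlers)
    return actions


print(asyncio.run(dispatch_demo()))
```

Adding a new event then means adding one dictionary entry rather than another branch in an if/elif chain. Here "SEND_NOW" has no handler, so it is reported as unknown.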


Common Mistakes

  • Using time.sleep() inside async code
  • Mixing sensor reading and actuator control in one coroutine
  • Making queue messages too complex
  • Forgetting to bound the queue or to decide what happens when it fills
  • Polling too aggressively with no delay
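
The first mistake in the list above is worth seeing in action. In this sketch a background ticker counts how often it gets to run while another coroutine waits 0.2 seconds, once with time.sleep() and once with asyncio.sleep(). The function name count_ticks and the timings are arbitrary choices for the demonstration.

```python
import time

try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python


async def count_ticks(blocking):
    """Return how many times a background task ran during a 0.2 s wait."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)            # give the ticker a chance to start
    if blocking:
        time.sleep(0.2)               # freezes the whole event loop
    else:
        await asyncio.sleep(0.2)      # other tasks keep running
    task.cancel()
    return ticks


print("Ticks during time.sleep():   ", asyncio.run(count_ticks(True)))
print("Ticks during asyncio.sleep():", asyncio.run(count_ticks(False)))
```

The ticker barely runs during time.sleep(), because the blocking call never hands control back to the scheduler. Any producer or consumer in the same program would stall in the same way.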

Mini Challenge

Build a system with:

  • one producer reading a button
  • one consumer controlling an LED
  • a second consumer logging presses to the REPL

Suggested Behavior

  • Button press generates "PRESS"
  • LED blinks once
  • Logger prints timestamped event information
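
A hint for the challenge: when two consumers read from the same queue, each item is delivered to only one of them. If both the LED and the logger should react to every press, one approach is to give each consumer its own queue and have the producer put the event on all of them. The helper below is a sketch of that idea; broadcast, led_queue and log_queue are hypothetical names.

```python
try:
    import uasyncio as asyncio  # MicroPython, as in the exercises
except ImportError:
    import asyncio  # desktop Python


async def broadcast(queues, event):
    """Deliver one event to every consumer's queue."""
    for queue in queues:
        await queue.put(event)


async def broadcast_demo():
    led_queue = asyncio.Queue(5)      # read by the LED consumer
    log_queue = asyncio.Queue(5)      # read by the logging consumer
    await broadcast((led_queue, log_queue), "PRESS")
    return await led_queue.get(), await log_queue.get()


print(asyncio.run(broadcast_demo()))  # ('PRESS', 'PRESS')
```

In the button producer, await broadcast(queues, "PRESS") would take the place of the single await queue.put("PRESS").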

Review Questions

  1. What problem does a queue solve in asynchronous programs?
  2. Why is producer–consumer a good fit for embedded systems?
  3. What happens when await queue.put() is called on a full queue?
  4. Why should producers and consumers be separated into different coroutines?
  5. When would you choose to drop data instead of buffering it?

Session Summary

In this session, you learned how to use the producer–consumer pattern with queues in MicroPython. You built coroutine-based systems where tasks communicate through uasyncio.Queue, enabling non-blocking designs that are well suited to sensors, buttons, LEDs, and IoT telemetry on the Raspberry Pi Pico 2 W.


Next Session Preview

Session 4: Event-Driven Programming with Interrupts and Async Coordination

