Session 3: Producer–Consumer Design with Queues
Synopsis
Covers queues for decoupling task timing, buffering sensor data, smoothing bursts of activity, and structuring clean async data pipelines.
Session Content
Session Overview
Duration: ~45 minutes
Topic: Producer–Consumer Design with Queues
Platform: Raspberry Pi Pico 2 W
Language: MicroPython
IDE: Thonny
In this session, you will learn how to structure asynchronous programs using the producer–consumer pattern with uasyncio.Queue. This is a key design approach for building responsive embedded applications where sensors, buttons, network tasks, and actuators must work together without blocking each other.
Learning Objectives
By the end of this session, learners will be able to:
- Explain the producer–consumer pattern in embedded systems
- Use `uasyncio.Queue` to pass data between tasks
- Build a non-blocking data pipeline from sensor input to processing/output
- Combine periodic producers with event-driven consumers
- Apply queue-based design to real-world Pico 2 W IoT projects
Prerequisites
- Basic Python knowledge
- Completed a MicroPython setup for Raspberry Pi Pico 2 W
- Familiarity with:
  - `machine.Pin`
  - `time.sleep()`
  - Basic `uasyncio` concepts from the previous session
Required Hardware
- Raspberry Pi Pico 2 W
- USB cable
- Breadboard
- 1 LED
- 1 resistor (220Ω or 330Ω)
- 1 pushbutton
- Optional:
- DHT11 or DHT22 sensor
- Active buzzer
- Jumper wires
Development Environment Setup
Use Thonny IDE with MicroPython firmware on the Pico 2 W.
Thonny Setup
- Install Thonny from: https://thonny.org
- Connect Pico 2 W to your computer via USB
- Open Thonny
- Go to Tools → Options → Interpreter
- Select:
- MicroPython (Raspberry Pi Pico)
- Choose the correct serial port
- Click OK
- Confirm the REPL shows the MicroPython prompt:
```python
>>>
```
Uploading Code
- Save files directly to the Pico as `main.py` for auto-run
- Use `Ctrl+S` to save
- Use `Run` to execute scripts interactively
Theory: Producer–Consumer Pattern
What Is It?
The producer–consumer model splits a program into two roles:
- Producer: generates data or events
- Consumer: receives and processes that data
A queue acts as the buffer between them.
Why Use It?
In embedded systems:
- Sensors produce data at fixed intervals
- Buttons produce events unpredictably
- Actuators consume commands
- Network tasks may send or receive data asynchronously
Using a queue helps:
- Decouple tasks
- Prevent blocking
- Manage bursts of data
- Keep code modular and easier to debug
Queue Behavior
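A queue is typically FIFO (first in, first out): items leave in the order they arrived.
Example:
- Sensor readings are produced every second
- The consumer processes them in order
- The queue itself drops nothing: when it is full, `await queue.put()` makes the producer wait, so readings are delayed rather than lost unless you add your own drop logic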
Key MicroPython Concepts
uasyncio.Queue
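A queue passes items between coroutines. Note that not every MicroPython build includes a Queue class in its asyncio module. If `asyncio.Queue` raises an AttributeError on your firmware, copy a Queue implementation onto the board (for example, the one in Peter Hinch's micropython-async library) and import it from there.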
Common methods:
- await queue.put(item) — add an item
- await queue.get() — remove an item
- queue.qsize() — number of items waiting
- queue.empty() — check if queue is empty
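If your firmware has no Queue class, the four methods above can be approximated in a few lines. The following is a minimal sketch, not the library's implementation: `SimpleQueue` is a hypothetical name, and the design (a list plus one `asyncio.Event`) is an assumption chosen for brevity.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


class SimpleQueue:
    """Minimal bounded FIFO queue for coroutines (illustrative only)."""

    def __init__(self, maxsize=0):
        self._items = []
        self._maxsize = maxsize            # 0 means "no limit"
        self._changed = asyncio.Event()    # set whenever an item is added or removed

    def qsize(self):
        return len(self._items)

    def empty(self):
        return not self._items

    def full(self):
        return self._maxsize > 0 and len(self._items) >= self._maxsize

    async def put(self, item):
        while self.full():                 # wait for a consumer to make room
            self._changed.clear()
            await self._changed.wait()
        self._items.append(item)
        self._changed.set()

    async def get(self):
        while self.empty():                # wait for a producer to add an item
            self._changed.clear()
            await self._changed.wait()
        item = self._items.pop(0)          # oldest item first (FIFO)
        self._changed.set()
        return item
```

Create the queue inside a coroutine (for example in `main()`), then use it exactly like the queue in the exercises: `queue = SimpleQueue(5)`.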
Important Design Notes
- Use small, meaningful messages
- Keep queue items lightweight
- Avoid blocking operations inside coroutines
- Decide what happens when the queue is full
Hands-On Exercise 1: Basic Queue with Producer and Consumer
Goal
Create one task that produces numbers and another that consumes them.
Code: main.py
```python
import uasyncio as asyncio


async def producer(queue):
    """Produce numbers and place them into the queue."""
    count = 0
    while True:
        count += 1
        print("Producer generated:", count)
        await queue.put(count)  # waits here if the queue is full
        print("Queue size after put:", queue.qsize())
        await asyncio.sleep(1)


async def consumer(queue):
    """Consume numbers from the queue and process them."""
    while True:
        item = await queue.get()  # waits here if the queue is empty
        print("Consumer received:", item)
        print("Queue size after get:", queue.qsize())
        # Simulate processing time
        await asyncio.sleep(2)


async def main():
    queue = asyncio.Queue(5)  # bounded queue: holds at most 5 items
    asyncio.create_task(producer(queue))
    asyncio.create_task(consumer(queue))
    # Keep main() alive so the background tasks keep running
    while True:
        await asyncio.sleep(10)


try:
    asyncio.run(main())
finally:
    # Reset asyncio state so the script can be re-run without a reboot
    asyncio.new_event_loop()
```
Expected Output
```
Producer generated: 1
Queue size after put: 1
Consumer received: 1
Queue size after get: 0
Producer generated: 2
Queue size after put: 1
Producer generated: 3
Queue size after put: 2
Consumer received: 2
Queue size after get: 1
```
Discussion
- The producer creates data every 1 second
- The consumer takes 2 seconds to process each item
- The queue buffers the difference in speed
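- If the queue fills up, `await queue.put()` pauses the producer until space is available. In this exercise that happens after roughly 10 seconds: the queue gains about one item every 2 seconds (two produced, one consumed), so its 5 slots fill and the producer is then throttled to the consumer's pace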
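Because a full queue pauses the producer silently, it can help to notice when that pause lasts too long. The sketch below wraps `put()` in `asyncio.wait_for()`; the helper name `put_with_timeout` and the 0.1 second timeout are illustrative assumptions, and the code assumes a Queue class is available as in the exercise.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


async def put_with_timeout(queue, item, timeout_s):
    """Try to enqueue item; return False if the queue stays full for timeout_s."""
    try:
        await asyncio.wait_for(queue.put(item), timeout_s)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    queue = asyncio.Queue(2)
    results = []
    for n in range(3):  # nothing consumes, so the third put cannot complete
        results.append(await put_with_timeout(queue, n, 0.1))
    return results
```

A producer could log or count the `False` results to reveal that the consumer is not keeping up.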
Hands-On Exercise 2: Button Events to LED Actions
Goal
Use a button as the producer and an LED as the consumer.
Wiring
- LED anode → GP15 through 220Ω resistor
- LED cathode → GND
- Button one side → GP14
- Button other side → GND
This example uses the Pico’s internal pull-up resistor.
Code: main.py
```python
import uasyncio as asyncio
from machine import Pin

button = Pin(14, Pin.IN, Pin.PULL_UP)  # reads 1 when released, 0 when pressed
led = Pin(15, Pin.OUT)


async def button_producer(queue):
    """Detect button presses and send events to the queue."""
    last_state = 1
    while True:
        current_state = button.value()
        # Detect falling edge: button press
        if last_state == 1 and current_state == 0:
            print("Button pressed")
            await queue.put("PRESS")
        last_state = current_state
        # Poll every 50 ms; this also acts as a simple debounce
        await asyncio.sleep_ms(50)


async def led_consumer(queue):
    """Wait for events and blink the LED."""
    while True:
        event = await queue.get()
        if event == "PRESS":
            print("LED on")
            led.value(1)
            await asyncio.sleep(0.3)
            led.value(0)
            print("LED off")


async def main():
    queue = asyncio.Queue(10)
    asyncio.create_task(button_producer(queue))
    asyncio.create_task(led_consumer(queue))
    # Keep main() alive so the background tasks keep running
    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    # Reset asyncio state so the script can be re-run without a reboot
    asyncio.new_event_loop()
```
Expected Output
```
Button pressed
LED on
LED off
Button pressed
LED on
LED off
```
Discussion
- The button task only detects input events
- The LED task only handles output actions
- This separation makes the code easier to extend later
Hands-On Exercise 3: Sensor-to-Display Pipeline
Goal
Simulate a sensor producer and a processing consumer.
Scenario
A temperature sensor task produces readings, and a consumer formats and prints them.
If you have a DHT11 or DHT22 sensor, you can replace the simulated values later.
Code: main.py
```python
import uasyncio as asyncio
import random


async def sensor_producer(queue):
    """Simulate periodic temperature readings."""
    while True:
        temperature = 20 + random.randint(0, 10)  # simulated value, 20 to 30 C
        print("Sensor reading:", temperature)
        await queue.put(temperature)
        await asyncio.sleep(1)


async def data_consumer(queue):
    """Consume sensor readings and format output."""
    while True:
        temp = await queue.get()
        if temp >= 28:
            status = "HOT"
        elif temp >= 24:
            status = "WARM"
        else:
            status = "OK"
        print("Processed:", temp, "C ->", status)
        await asyncio.sleep(0.5)


async def main():
    queue = asyncio.Queue(3)
    asyncio.create_task(sensor_producer(queue))
    asyncio.create_task(data_consumer(queue))
    # Keep main() alive so the background tasks keep running
    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    # Reset asyncio state so the script can be re-run without a reboot
    asyncio.new_event_loop()
```
Example Output
```
Sensor reading: 21
Processed: 21 C -> OK
Sensor reading: 29
Processed: 29 C -> HOT
Sensor reading: 24
Processed: 24 C -> WARM
```
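Exercise 3 mentions swapping the simulated values for a DHT11 or DHT22. One possible way to do that, sketched under assumptions: the sensor's data pin is wired to GP16 (a hypothetical choice), the firmware includes MicroPython's `dht` driver, and `make_temperature_reader` is an illustrative helper name. When the driver cannot be imported, the sketch falls back to the simulated values so the same code still runs.

```python
import random

try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


def make_temperature_reader(pin_number=16):
    """Return a function that reads one temperature in degrees Celsius.

    pin_number is a hypothetical wiring choice; match it to your data pin.
    """
    try:
        import dht
        from machine import Pin
    except ImportError:
        # No dht driver available (for example on a desktop): simulate readings
        return lambda: 20 + random.randint(0, 10)

    sensor = dht.DHT22(Pin(pin_number))  # use dht.DHT11 for a DHT11

    def read():
        sensor.measure()  # short blocking call to the sensor
        return sensor.temperature()

    return read


async def sensor_producer(queue, read_temperature, period_s=2):
    """Queue one reading per period; skip readings that fail."""
    while True:
        try:
            reading = read_temperature()
        except Exception as err:  # failed reads raise, e.g. OSError on timeout
            print("Sensor read failed:", err)
        else:
            await queue.put(reading)
        await asyncio.sleep(period_s)
```

The default period of 2 seconds reflects that a DHT22 should not be sampled more often than that. The consumer from the exercise can stay unchanged, which is the point of the pattern.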
Hands-On Exercise 4: Queue with Network-Style Message Flow
Goal
Model an IoT application where one task prepares messages and another task “sends” them.
This simulates a telemetry pipeline before adding real networking.
Code: main.py
```python
import uasyncio as asyncio


async def telemetry_producer(queue):
    """Generate telemetry messages."""
    message_id = 0
    while True:
        message_id += 1
        message = {
            "id": message_id,
            "device": "pico2w",
            "temperature": 25 + (message_id % 5),  # simulated reading
        }
        print("Produced telemetry:", message)
        await queue.put(message)
        await asyncio.sleep(2)


async def telemetry_consumer(queue):
    """Simulate sending telemetry to the cloud."""
    while True:
        message = await queue.get()
        print("Sending telemetry:", message)
        # Simulated network delay
        await asyncio.sleep(1.5)
        print("Telemetry sent:", message["id"])


async def main():
    queue = asyncio.Queue(5)
    asyncio.create_task(telemetry_producer(queue))
    asyncio.create_task(telemetry_consumer(queue))
    # Keep main() alive so the background tasks keep running
    while True:
        await asyncio.sleep(1)


try:
    asyncio.run(main())
finally:
    # Reset asyncio state so the script can be re-run without a reboot
    asyncio.new_event_loop()
```
Example Output
```
Produced telemetry: {'id': 1, 'device': 'pico2w', 'temperature': 26}
Sending telemetry: {'id': 1, 'device': 'pico2w', 'temperature': 26}
Telemetry sent: 1
Produced telemetry: {'id': 2, 'device': 'pico2w', 'temperature': 27}
Sending telemetry: {'id': 2, 'device': 'pico2w', 'temperature': 27}
Telemetry sent: 2
```
Best Practices for Queue-Based Async Design
1. Keep Messages Small
Use:
- integers
- strings
- short dictionaries
Avoid large blobs unless necessary.
2. Choose the Right Queue Size
- Small queue: low memory use, more backpressure
- Large queue: more buffering, more RAM usage
3. Separate Responsibilities
- Producer: gather data
- Consumer: process data
- Another task: send to network
- Another task: update display or actuators
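The split above can be sketched as a chain of tasks joined by two queues. This is an illustrative arrangement, not a prescribed one: the function names, the `None` end marker, and the `"TEMP_OK"` label are assumptions made so the demo can finish, and a Queue class is assumed to be available as in the exercises.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


async def reader(raw_q, samples):
    """Stage 1: gather data (here, a fixed list stands in for a sensor)."""
    for value in samples:
        await raw_q.put(value)
    await raw_q.put(None)  # end marker so the demo can stop


async def processor(raw_q, out_q, threshold):
    """Stage 2: turn raw readings into small labelled messages."""
    while True:
        value = await raw_q.get()
        if value is None:
            await out_q.put(None)  # pass the end marker downstream
            return
        label = "TEMP_HIGH" if value >= threshold else "TEMP_OK"
        await out_q.put((label, value))


async def sender(out_q, sent):
    """Stage 3: deliver messages (here, they are only recorded)."""
    while True:
        message = await out_q.get()
        if message is None:
            return
        sent.append(message)


async def run_pipeline(samples, threshold=28):
    raw_q = asyncio.Queue(4)
    out_q = asyncio.Queue(4)
    sent = []
    await asyncio.gather(
        reader(raw_q, samples),
        processor(raw_q, out_q, threshold),
        sender(out_q, sent),
    )
    return sent
```

Each stage knows only about the queues it touches, so a stage can be replaced (for example, a real network sender) without editing the others.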
4. Handle Full Queues Carefully
If a queue is full:
- the producer waits until a consumer removes an item
- or you can choose to drop older data in custom logic
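A drop-oldest policy might look like the sketch below. It assumes the queue offers `full()`, `get_nowait()`, and `put_nowait()`, as desktop Python's `asyncio.Queue` does; check that your MicroPython Queue implementation provides them. `put_drop_oldest` is a hypothetical helper name.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


def put_drop_oldest(queue, item):
    """Enqueue without waiting; if the queue is full, discard the oldest item.

    Returns the discarded item, or None if nothing was dropped.
    """
    dropped = None
    if queue.full():
        dropped = queue.get_nowait()  # make room by removing the oldest entry
    queue.put_nowait(item)
    return dropped


async def demo():
    queue = asyncio.Queue(2)
    dropped = [put_drop_oldest(queue, n) for n in (1, 2, 3, 4)]
    remaining = [queue.get_nowait(), queue.get_nowait()]
    return dropped, remaining
```

This trade-off suits data where only recent values matter, such as a live temperature display. For events that must not be lost, such as button presses, letting the producer wait is usually the safer choice.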
5. Use Clear Event Names
Examples:
- "PRESS"
- "TEMP_HIGH"
- "SEND_NOW"
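Clear event names also make it easy to route events with a lookup table instead of a growing if/elif chain. The following is a sketch under assumptions: the handler actions are placeholders, and `"STOP"` is a hypothetical event added only so the demo can end.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


async def dispatch(queue, handlers):
    """Call the handler registered for each event name; stop on "STOP"."""
    while True:
        event = await queue.get()
        if event == "STOP":  # hypothetical shutdown event for this demo
            return
        handler = handlers.get(event)
        if handler is None:
            print("Ignoring unknown event:", event)
        else:
            handler()


async def demo():
    actions = []
    handlers = {
        "PRESS": lambda: actions.append("toggle LED"),
        "TEMP_HIGH": lambda: actions.append("sound buzzer"),
        "SEND_NOW": lambda: actions.append("flush telemetry"),
    }
    queue = asyncio.Queue(5)
    for event in ("PRESS", "TEMP_HIGH", "BOGUS", "SEND_NOW", "STOP"):
        await queue.put(event)
    await dispatch(queue, handlers)
    return actions
```

Adding a new event then means adding one dictionary entry rather than editing the consumer loop.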
Common Mistakes
- Using `time.sleep()` inside async code
- Mixing sensor reading and actuator control in one coroutine
- Making queue messages too complex
- Forgetting to handle queue growth
- Polling too aggressively with no delay
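The first mistake in the list is easy to demonstrate. In this sketch (function names are illustrative), a background task counts how often it gets to run while another coroutine pauses for 0.2 seconds, once with `time.sleep()` and once with `asyncio.sleep()`.

```python
import time

try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


async def ticker(counter):
    """Stand-in for any other task that should keep running."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def count_ticks(blocking):
    """Return how often ticker ran while this coroutine paused for 0.2 s."""
    counter = [0]
    task = asyncio.create_task(ticker(counter))
    await asyncio.sleep(0)  # give ticker its first turn
    if blocking:
        time.sleep(0.2)  # mistake: freezes every task for 0.2 s
    else:
        await asyncio.sleep(0.2)  # correct: other tasks run during the pause
    task.cancel()
    return counter[0]
```

Comparing `asyncio.run(count_ticks(True))` with `asyncio.run(count_ticks(False))` should show the ticker starved in the blocking case and running many times in the cooperative one.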
Mini Challenge
Build a system with:
- one producer reading a button
- one consumer controlling an LED
- a second consumer logging presses to the REPL
Suggested Behavior
- Button press generates `"PRESS"`
- LED blinks once
- Logger prints timestamped event information
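One thing to watch for in this challenge: when two consumers call `get()` on the same queue, each item goes to only one of them. A possible starting point, sketched with simulated presses instead of hardware, is a small fan-out task that copies each event into one queue per consumer. The names `fan_out` and `collect` and the `None` end marker are assumptions for this demo.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for testing


async def fan_out(source, targets):
    """Copy every event from source into each target queue; None ends the demo."""
    while True:
        event = await source.get()
        for queue in targets:
            await queue.put(event)
        if event is None:
            return


async def collect(queue, label, log):
    """Stand-in consumer: records what it receives instead of driving hardware."""
    while True:
        event = await queue.get()
        if event is None:
            return
        log.append((label, event))


async def demo():
    source = asyncio.Queue(4)
    led_q = asyncio.Queue(4)
    log_q = asyncio.Queue(4)
    log = []
    for event in ("PRESS", "PRESS", None):  # simulated button producer
        await source.put(event)
    await asyncio.gather(
        fan_out(source, [led_q, log_q]),
        collect(led_q, "led", log),
        collect(log_q, "logger", log),
    )
    return log
```

Replacing the two `collect` tasks with a real LED consumer and a logger that adds a timestamp is left as the challenge itself.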
Review Questions
- What problem does a queue solve in asynchronous programs?
- Why is producer–consumer a good fit for embedded systems?
- What happens when `await queue.put()` is called on a full queue?
- Why should producers and consumers be separated into different coroutines?
- When would you choose to drop data instead of buffering it?
Session Summary
In this session, you learned how to use the producer–consumer pattern with queues in MicroPython. You built coroutine-based systems where tasks communicate through uasyncio.Queue, enabling non-blocking designs that are well suited to sensors, buttons, LEDs, and IoT telemetry on the Raspberry Pi Pico 2 W.
Next Session Preview
Session 4: Event-Driven Programming with Interrupts and Async Coordination