Session 3: Supervision, Restarts, and Fault Containment
Synopsis
Covers supervisory patterns for detecting failed tasks, restarting subsystems, isolating faults, and maintaining partial system availability.
Session Content
Session Overview
In this session, learners will understand how to make MicroPython applications on the Raspberry Pi Pico 2 W more robust using supervision, watchdog-style restarts, and fault containment patterns. The session focuses on preventing a single failing task from crashing the entire device, recovering from hangs, and designing systems that can continue operating safely after errors.
Duration
- Approximately 45 minutes
Prerequisites
- Basic Python knowledge
- Familiarity with uasyncio
- Completed prior sessions on tasks, scheduling, and await
- Raspberry Pi Pico 2 W with MicroPython installed
- Thonny IDE installed and configured
Learning Outcomes
By the end of this session, learners will be able to:
- Explain supervision and fault containment in embedded asynchronous systems
- Use uasyncio task supervision patterns to monitor and restart failing tasks
- Handle exceptions inside tasks without crashing the whole program
- Understand and use the watchdog timer for recovery from hangs
- Build a resilient LED/blink monitoring system with simulated failures
- Apply safe restart and containment strategies in IoT-style MicroPython applications
Development Environment Setup
Thonny Setup
- Install Thonny from: https://thonny.org/
- Connect the Raspberry Pi Pico 2 W via USB.
- Open Thonny.
- Select:
  - Run > Select interpreter
  - Choose MicroPython (Raspberry Pi Pico)
  - Select the correct serial port
- Confirm the REPL works by running:
    print("Hello from Pico 2 W")
Project Files
Suggested file structure for this session:
- main.py — main supervised application
- boot.py — optional startup configuration
- lib/ — optional helper modules
1. Theory: Supervision and Fault Containment
What is Supervision?
Supervision is the practice of monitoring tasks and restarting them if they fail. Instead of letting one bad task stop the whole application, a supervisor:
- starts the task
- watches for exceptions
- logs failures
- optionally restarts the task after a delay
This pattern is common in reliable systems and is useful on microcontrollers where long-running unattended operation is important.
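The four supervisor responsibilities described above can be sketched as one reusable coroutine. This is a minimal illustration rather than the session's exercise code: the names `supervise`, `task_factory`, `restart_delay`, and `max_failures` are assumptions introduced here, and the import fallback only exists so the same file can be tried on desktop Python.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds


async def supervise(name, task_factory, restart_delay=2, max_failures=None):
    """Run task_factory() and restart it after each failure.

    max_failures is a hypothetical limit; None means retry forever.
    Returns the number of failures observed.
    """
    failures = 0
    while max_failures is None or failures < max_failures:
        try:
            await task_factory()            # start the task and wait on it
            return failures                 # task ended normally
        except Exception as exc:            # watch for exceptions
            failures += 1
            print("[" + name + "] failure", failures, repr(exc))  # log
            await asyncio.sleep(restart_delay)                    # delay, then restart
    return failures


async def flaky():
    """Stand-in task that always fails after a short pause."""
    await asyncio.sleep(0.01)
    raise RuntimeError("simulated fault")


failures = asyncio.run(supervise("demo", flaky, restart_delay=0.01, max_failures=3))
print("stopped after", failures, "failures")
```

Passing a factory (a function that creates the coroutine) rather than a coroutine object matters because a coroutine can only be awaited once; each restart needs a fresh one.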
What is Fault Containment?
Fault containment means limiting the impact of a failure to one component. If a sensor task fails:
- the display task may continue
- the network task may recover independently
- the device may degrade gracefully instead of halting
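One way to see containment in a few lines is to run two components together and collect each outcome separately. The sketch below assumes hypothetical `sensor` and `display` coroutines and uses `asyncio.gather(..., return_exceptions=True)`, which returns a failure as a result value instead of raising it to the caller.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds


async def sensor():
    """Hypothetical component that fails."""
    await asyncio.sleep(0.01)
    raise OSError("sensor read failed")


async def display():
    """Hypothetical component that keeps working."""
    await asyncio.sleep(0.02)
    return "display ok"


async def run_components():
    # Each failure comes back as a value in the result list,
    # so the display result is still delivered.
    return await asyncio.gather(sensor(), display(), return_exceptions=True)


results = asyncio.run(run_components())
for r in results:
    print("failed:" if isinstance(r, Exception) else "ok:", repr(r))
```

This suits short-lived jobs whose outcomes are inspected afterwards. Long-running tasks are usually better served by a per-task supervisor loop.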
Why This Matters on Pico 2 W
Microcontroller applications often run continuously:
- home monitoring
- environmental sensing
- lighting control
- IoT telemetry
A task may hang or fail because of:
- a bad sensor reading
- a network timeout
- an unexpected exception
- resource exhaustion
In each case, the device should ideally recover automatically.
2. Theory: Common Failure Modes in Async MicroPython
Typical Problems
- Unhandled exceptions in a coroutine
- Infinite loop that never yields
- Network connection hanging
- Sensor read timeout
- Memory pressure causing instability
- One task blocking the event loop
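The "loop that never yields" and "one task blocking the event loop" problems share a cause: the scheduler only switches tasks at an `await`. The sketch below is an assumed example in which a long computation, `crunch`, hands control back every `yield_every` iterations so a second task can still run. Both names are introduced here for illustration.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds

ticks = []


async def ticker(n):
    """Small task that needs regular turns on the event loop."""
    for i in range(n):
        ticks.append(i)
        await asyncio.sleep(0)


async def crunch(total, yield_every=100):
    """Long computation that yields periodically instead of hogging the loop."""
    acc = 0
    for i in range(total):
        acc += i
        if i % yield_every == 0:
            await asyncio.sleep(0)   # give other tasks a turn
    return acc


async def main():
    t = asyncio.create_task(ticker(5))
    result = await crunch(1000)
    await t
    return result


result = asyncio.run(main())
print("sum:", result, "ticks:", ticks)
```

Without the periodic `await asyncio.sleep(0)`, `ticker` would not get a turn until `crunch` had finished.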
Recommended Practices
- Keep coroutines short and cooperative
- Use await asyncio.sleep(...) regularly
- Wrap risky operations in try/except
- Log errors clearly
- Restart only the failing task when possible
- Use a watchdog timer as a last line of defense
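For hangs such as a stalled network call or sensor read, wrapping the risky operation in a timeout is one way to apply these practices. The sketch below uses `asyncio.wait_for`. The coroutine `slow_read` and the helper `read_with_timeout` are hypothetical stand-ins, not part of any driver.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds


async def slow_read(delay_s):
    """Hypothetical sensor or socket read that takes delay_s seconds."""
    await asyncio.sleep(delay_s)
    return 42


async def read_with_timeout(delay_s, timeout_s):
    """Return the reading, or None if it does not arrive within timeout_s."""
    try:
        return await asyncio.wait_for(slow_read(delay_s), timeout_s)
    except asyncio.TimeoutError:
        print("[Sensor] read timed out after", timeout_s, "s")
        return None


print(asyncio.run(read_with_timeout(0.0, 0.5)))    # completes in time
print(asyncio.run(read_with_timeout(1.0, 0.05)))   # times out
```

A timeout can only interrupt code that is waiting at an `await`. A blocking call that never yields cannot be cancelled this way, which is where the watchdog comes in.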
3. Hands-On Exercise 1: Supervised Blinker with Fault Injection
In this exercise, learners will build a blinking LED task that occasionally fails on purpose. A supervisor task will restart it automatically.
Hardware
- Raspberry Pi Pico 2 W
- On-board LED
Code: Supervised LED Task
Create main.py:
import uasyncio as asyncio
from machine import Pin

led = Pin("LED", Pin.OUT)


async def blink_task():
    """
    Blink the onboard LED.
    Occasionally raise an exception to simulate a fault.
    """
    counter = 0
    while True:
        led.toggle()
        print("Blink:", counter, "LED state:", led.value())
        counter += 1
        # Simulate a fault every 7 cycles
        if counter % 7 == 0:
            raise RuntimeError("Simulated blink task failure")
        await asyncio.sleep(0.5)


async def supervisor():
    """
    Supervise the blink task.
    Restart it after failures with a short delay.
    """
    restart_count = 0
    while True:
        try:
            print("\n[Supervisor] Starting blink task, restart #", restart_count)
            await blink_task()
        except Exception as exc:
            restart_count += 1
            print("[Supervisor] Caught failure:", repr(exc))
            print("[Supervisor] Restarting task in 2 seconds...")
            await asyncio.sleep(2)


async def main():
    await supervisor()


try:
    asyncio.run(main())
finally:
    # Clear retained event loop state so the program can be re-run cleanly
    asyncio.new_event_loop()
Expected Output
[Supervisor] Starting blink task, restart # 0
Blink: 0 LED state: 1
Blink: 1 LED state: 0
Blink: 2 LED state: 1
Blink: 3 LED state: 0
Blink: 4 LED state: 1
Blink: 5 LED state: 0
Blink: 6 LED state: 1
[Supervisor] Caught failure: RuntimeError('Simulated blink task failure')
[Supervisor] Restarting task in 2 seconds...
[Supervisor] Starting blink task, restart # 1
...
Activity
- Load the code into Thonny.
- Run it on the Pico 2 W.
- Observe the LED blinking and the simulated fault.
- Confirm the supervisor restarts the task.
Discussion
- Why does the failure not crash the whole program?
- What happens if the exception is not caught?
- How might this pattern help with real sensors?
4. Theory: Restart Policies
A supervisor can use different restart strategies:
- Immediate restart: restart instantly after failure
- Delayed restart: wait a short time before retrying
- Limited retries: stop after too many failures
- Backoff: increase delay after repeated failures
Example Restart Policy Goals
- Avoid hammering a failing sensor
- Give Wi-Fi time to recover
- Prevent endless reboot loops
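These goals can be met by combining limited retries with exponential backoff. The sketch below is one possible policy, not a prescribed one: `backoff_delay`, `supervise_with_backoff`, and their parameters (`base`, `factor`, `cap`, `max_failures`) are assumptions chosen for illustration.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds


def backoff_delay(failures, base=1, factor=2, cap=30):
    """Seconds to wait after the Nth failure (N starts at 1), capped at cap."""
    return min(cap, base * factor ** (failures - 1))


async def supervise_with_backoff(task_factory, max_failures=5, base=1, cap=30):
    """Restart task_factory() with growing delays; give up after max_failures."""
    failures = 0
    while True:
        try:
            await task_factory()
            return failures                   # task ended normally
        except Exception as exc:
            failures += 1
            if failures >= max_failures:      # limited retries
                print("giving up after", failures, "failures:", repr(exc))
                return failures
            delay = backoff_delay(failures, base=base, cap=cap)
            print("failure", failures, "- retrying in", delay, "s")
            await asyncio.sleep(delay)        # delayed restart with backoff


async def always_fails():
    """Stand-in for a Wi-Fi connect attempt that never succeeds."""
    raise OSError("Wi-Fi not ready")


print([backoff_delay(n) for n in range(1, 7)])   # [1, 2, 4, 8, 16, 30]
gave_up_after = asyncio.run(
    supervise_with_backoff(always_fails, max_failures=3, base=0.01)
)
```

The cap keeps the delay bounded so a recovered sensor or access point is noticed within a known time, while the failure limit gives the application a point at which to enter an error state or allow a reset.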
5. Hands-On Exercise 2: Fault-Contained Sensor Poller
This exercise simulates a sensor task that sometimes fails, while another task continues running.
Code: Fault-Contained Worker + Status Task
import uasyncio as asyncio
from machine import Pin

led = Pin("LED", Pin.OUT)


async def sensor_task():
    """
    Simulated sensor reader.
    Raises an exception occasionally to mimic bad reads.
    """
    reading = 0
    while True:
        reading += 1
        # Simulate a failed sensor read on every 5th reading
        if reading % 5 == 0:
            raise OSError("Sensor read failed")
        print("[Sensor] Reading:", reading)
        await asyncio.sleep(1)


async def status_task():
    """
    Continues to run independently.
    Toggles the LED to show the system is alive.
    """
    while True:
        led.toggle()
        print("[Status] System alive. LED:", led.value())
        await asyncio.sleep(0.25)


async def supervised_sensor():
    """
    Restart the sensor task when it fails.
    """
    failures = 0
    while True:
        try:
            await sensor_task()
        except Exception as exc:
            failures += 1
            print("[Sensor Supervisor] Error:", repr(exc))
            print("[Sensor Supervisor] Failures:", failures)
            await asyncio.sleep(2)


async def main():
    sensor = asyncio.create_task(supervised_sensor())
    status = asyncio.create_task(status_task())
    await asyncio.gather(sensor, status)


try:
    asyncio.run(main())
finally:
    # Clear retained event loop state so the program can be re-run cleanly
    asyncio.new_event_loop()
Expected Output
[Sensor] Reading: 1
[Status] System alive. LED: 1
[Status] System alive. LED: 0
[Status] System alive. LED: 1
[Sensor] Reading: 2
...
[Sensor Supervisor] Error: OSError('Sensor read failed')
[Sensor Supervisor] Failures: 1
[Status] System alive. LED: 0
...
Activity
- Run the program.
- Observe that the status LED keeps toggling even when the sensor task fails.
- Identify which task is contained and which task remains available.
6. Theory: Watchdog Timers
A watchdog timer resets the microcontroller if the software stops responding.
Why Use a Watchdog?
A watchdog is useful if:
- the event loop hangs
- a task blocks indefinitely
- the system enters an unrecoverable state
Important Concept
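Supervision handles failures that surface in software as exceptions a supervisor can catch. A watchdog handles software that stops making progress, such as a blocked event loop, where no exception is ever raised.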
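A common refinement that connects the two ideas is to feed the watchdog only when every important task has recently reported progress. The sketch below is an assumed design, not a MicroPython API: `Liveness`, `gated_feeder`, and the `feed` callback are hypothetical names, and on a Pico the callback would be `wdt.feed`. Here a list stands in for the hardware so the logic can be run anywhere.

```python
try:
    import asyncio                  # desktop Python and recent MicroPython
except ImportError:
    import uasyncio as asyncio      # older MicroPython builds


class Liveness:
    """Tracks which tasks have checked in since the last watchdog cycle."""

    def __init__(self, names):
        self.seen = {name: False for name in names}

    def check_in(self, name):
        self.seen[name] = True

    def all_alive(self):
        """Return True if every task checked in, then clear the flags."""
        ok = all(self.seen.values())
        for name in self.seen:
            self.seen[name] = False
        return ok


async def gated_feeder(liveness, feed, period_s, cycles):
    """Call feed() once per period, but only if all tasks checked in."""
    for _ in range(cycles):          # a real feeder would loop forever
        await asyncio.sleep(period_s)
        if liveness.all_alive():
            feed()
        else:
            print("[Watchdog] a task missed its check-in; not feeding")


async def demo():
    feeds = []
    liveness = Liveness(["sensor", "net"])

    async def sensor():
        while True:
            liveness.check_in("sensor")
            await asyncio.sleep(0.01)

    # "net" never checks in, simulating a stalled task
    task = asyncio.create_task(sensor())
    await gated_feeder(liveness, lambda: feeds.append(1), 0.03, 3)
    task.cancel()
    return len(feeds)


fed = asyncio.run(demo())
print("watchdog fed", fed, "times")
```

Because the stalled task never checks in, the feeder never calls `feed()`. On real hardware the watchdog would then expire and reset the board.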
7. Hands-On Exercise 3: Watchdog Timer for Last-Line Recovery
Code: Basic Watchdog Example
from machine import WDT
import time

# Set watchdog timeout to 5 seconds
wdt = WDT(timeout=5000)
print("Watchdog started")

while True:
    print("Feeding watchdog")
    wdt.feed()
    time.sleep(1)
Fault Demo
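To see the watchdog trigger, run this variant, which omits the wdt.feed() call. Once started, the watchdog cannot be stopped, so the board will reset and the USB serial connection to Thonny will drop briefly: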
from machine import WDT
import time

wdt = WDT(timeout=5000)
print("Watchdog started")

while True:
    print("Simulating hang... watchdog will reset the device")
    time.sleep(1)
Expected Behavior
- The Pico prints messages for about 5 seconds (the configured timeout)
- Then resets automatically because wdt.feed() is never called
Activity
- Run the first version.
- Observe stable operation.
- Run the second version.
- Watch the device reset after the timeout.
8. Best Practices for Fault Containment in MicroPython
Good Practices
- Keep each task focused on one responsibility
- Catch expected exceptions close to the source
- Log errors with useful messages
- Use retry delays to avoid tight failure loops
- Use independent tasks for independent functions
- Reserve watchdog resets for unrecoverable hangs
- Avoid blocking calls inside async code
Avoid
- Catching all exceptions and ignoring them silently
- Restarting too quickly without delay
- Blocking forever on I/O
- Putting unrelated logic into one large coroutine
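The difference between catching expected exceptions close to the source and silently swallowing everything can be shown with a small wrapper. In this sketch, `read_raw` is a hypothetical stand-in for a driver call, and `fail_with` exists only to inject faults for the demonstration.

```python
def read_raw(fail_with=None):
    """Hypothetical driver call; raises fail_with if one is given."""
    if fail_with is not None:
        raise fail_with
    return 21.5


def read_temperature(fail_with=None):
    """Handle the expected failure here; let anything else propagate."""
    try:
        return read_raw(fail_with)
    except OSError as exc:           # expected: bus glitch or timeout
        print("[Sensor] read failed:", repr(exc))
        return None                  # caller can skip this sample


print(read_temperature())                         # normal reading
print(read_temperature(OSError("bus error")))     # handled locally
try:
    read_temperature(ValueError("bad calibration"))
except ValueError as exc:
    # Unexpected errors are not hidden; a supervisor would see them
    print("propagated:", repr(exc))
```

Catching only `OSError` keeps routine read failures local and logged, while programming errors still reach the supervisor instead of being ignored.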
9. Consolidation Exercise: Supervised IoT Status System
Scenario
Build a simple IoT-like device that:
- blinks the onboard LED as a heartbeat
- simulates periodic telemetry collection
- restarts the telemetry task if it fails
- uses a watchdog timer for overall resilience
Starter Code
import uasyncio as asyncio
from machine import Pin, WDT

led = Pin("LED", Pin.OUT)
# 8-second watchdog; the feeder below must run at least this often
wdt = WDT(timeout=8000)


async def heartbeat_task():
    while True:
        led.toggle()
        print("[Heartbeat] LED:", led.value())
        await asyncio.sleep(0.5)


async def telemetry_task():
    count = 0
    while True:
        count += 1
        print("[Telemetry] Sending reading", count)
        # Simulate a publish failure on every 6th reading
        if count % 6 == 0:
            raise RuntimeError("Telemetry publish failed")
        await asyncio.sleep(1.5)


async def supervised_telemetry():
    retries = 0
    while True:
        try:
            await telemetry_task()
        except Exception as exc:
            retries += 1
            print("[Supervisor] Telemetry error:", repr(exc))
            print("[Supervisor] Retry count:", retries)
            await asyncio.sleep(2)


async def watchdog_feeder():
    # Feeds only while the event loop is still scheduling tasks
    while True:
        wdt.feed()
        await asyncio.sleep(2)


async def main():
    await asyncio.gather(
        heartbeat_task(),
        supervised_telemetry(),
        watchdog_feeder(),
    )


try:
    asyncio.run(main())
finally:
    # Clear retained event loop state so the program can be re-run cleanly
    asyncio.new_event_loop()
Expected Output
[Heartbeat] LED: 1
[Telemetry] Sending reading 1
[Heartbeat] LED: 0
[Telemetry] Sending reading 2
...
[Supervisor] Telemetry error: RuntimeError('Telemetry publish failed')
[Supervisor] Retry count: 1
...
Activity Questions
- Which task is supervised?
- Which task acts as a heartbeat?
- What role does the watchdog feeder play?
- What happens if the feeder task stops?
10. Session Wrap-Up
Key Takeaways
- Supervision prevents one failing task from taking down the system.
- Fault containment limits errors to a single component.
- Restart policies help systems recover gracefully.
- Watchdog timers provide last-line recovery from hangs.
- Resilient MicroPython systems should be designed with failure in mind.
Reflection
Consider these questions:
- Which tasks in your future Pico applications are critical?
- Which tasks should be restartable?
- What should happen if Wi-Fi fails repeatedly?
- When should you use supervision versus a watchdog?
11. Optional Challenge
Modify the supervised telemetry system so that:
- after 3 failures, it stops retrying
- the LED blinks a fast error pattern
- the watchdog eventually resets the device
Challenge Starter
import uasyncio as asyncio
from machine import Pin, WDT

led = Pin("LED", Pin.OUT)
# 8388 ms is the documented maximum on RP2040; staying below it
# keeps the code portable across Pico boards
wdt = WDT(timeout=8000)


async def error_pattern():
    # Fast blink to signal the error state
    while True:
        led.on()
        await asyncio.sleep(0.1)
        led.off()
        await asyncio.sleep(0.1)


async def telemetry_task():
    raise RuntimeError("Permanent telemetry failure")


async def supervised_telemetry():
    failures = 0
    while failures < 3:
        try:
            await telemetry_task()
        except Exception as exc:
            failures += 1
            print("[Supervisor] Failure", failures, ":", repr(exc))
            await asyncio.sleep(1)
    print("[Supervisor] Too many failures; entering error state")
    await error_pattern()


async def watchdog_feeder():
    while True:
        wdt.feed()
        await asyncio.sleep(2)


async def main():
    await asyncio.gather(
        supervised_telemetry(),
        watchdog_feeder(),
    )


try:
    asyncio.run(main())
finally:
    # Clear retained event loop state so the program can be re-run cleanly
    asyncio.new_event_loop()
Expected Behavior
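- The telemetry task fails repeatedly
- The supervisor stops retrying after 3 failures
- The system enters an error state and the LED blinks the fast error pattern
- In the starter as written, watchdog_feeder() keeps feeding, so the Pico does not reset; to complete the challenge, stop feeding once the error state is entered so the watchdog resets the device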
12. Suggested Follow-Up Topics
- Reboot strategies and persistent crash logs
- Circuit breaker patterns for network services
- Safe startup and recovery in boot.py
- Logging failures to flash or RTC memory
- Building a resilient Wi-Fi reconnection manager