I’m building an art piece. It consists of multiple raspberry pi picos. Coding in micropython.
-
There is a RPi pico-based remote control that acts like a web server, providing the state of the potentiometer.
-
The art piece has 3 more picos on it that open a socket and get the pot state from the remote. Each pico has motors and cascading WS2812 LEDs to combine color and movement.
-
There may be other ways to solve the general task of communicating with 3 clients. Bluetooth got a little too complicated… I’m open to ideas. Or if there is a book someone could recommend to help me learn this stuff.
The problem is that while using asyncio, the s.recv code blocks the LEDs moving for a second, and I want the LEDs to keep moving.
I tried to use threads but that didn’t work.
I tried to use s.setblocking(False) also didn’t work. I’m fact that’s made it pause longer at print(“3”).
This is the code that blocks (ignore the fact that the r,g,b variables are actually for the motors for now):
import machine
import network
import time
from secret import ssid, password
import socket
from neopixel import Neopixel
from time import sleep
import asyncio
## Setup
NUM_LEDS = 74
LED_PIN = 13
pixels = Neopixel(NUM_LEDS, 0, LED_PIN, "GRB")
## Color Parameters
# Hue
hue_offset = 1028
# Brightness
brightness = 16
x = 1
motor1 = machine.PWM(machine.Pin(9))
motor2 = machine.PWM(machine.Pin(10))
motor3 = machine.PWM(machine.Pin(11))
motor4 = machine.PWM(machine.Pin(12))
motor1.freq(1000)
motor2.freq(1000)
motor3.freq(1000)
motor4.freq(1000)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while not wlan.isconnected() and wlan.status() >= 0:
print("Waiting to connect:")
time.sleep(1)
# Should be connected and have an IP address
wlan.status() # 3 == success
wlan.ifconfig()
print(wlan.ifconfig())
def interval_mapping(x, in_min, in_max, out_min, out_max):
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
async def get_data():
global x
while True:
try:
if wlan.isconnected():
print("requesting data")
ai = socket.getaddrinfo("192.168.0.24", 80) # Address of Web Server
print("1")
addr = ai[0][-1]
print("2")
# Create a socket and make a HTTP request
s = socket.socket() # Open socket
print("3")
s.connect(addr)
print("4")
s.send(b"GET Data") # Send request
print("5")
ss = str(s.recv(512)) # Store reply
print("6")
# Print what we received
print(f"ss={ss}")
# Split into RGB components
l = len(ss)
ss = ss[2:l-1] # Strip to essentials
p = ss.find(",") # Find first comma
r = int(ss[0:p]) # Extract RED value
ss = ss[p+1:] # Remove red part
p = ss.find(",") # Find comma separator
g = int(ss[0:p]) # Extract GREEN value
b = int(ss[p+1:]) # Extract BLUE value
print(f"ss2={r,g,b}") # Print RGB values
print()
# Set RGB LED here
x = int(interval_mapping(r,0,65535,-65535,65535))
s.close() # Close socket
await asyncio.sleep_ms(500) # wait
else:
print("no connection")
await asyncio.sleep_ms(1000)
except Exception as e:
print(f"Exception: {e}")
print('no data, connection closed')
await asyncio.sleep_ms(1000)
s.close()
## Loop
async def rainbow():
while True:
for hue in range(0, 65535, 1000):
# Set the hues
for led in range(NUM_LEDS):
color = pixels.colorHSV(hue +(led * hue_offset), 255, brightness)
pixels.set_pixel(led, color)
pixels.show()
print(f"rgb={color}")
await asyncio.sleep_ms(100)
async def drive_motors():
global x
while True:
print(f"x={x}")
if -10000 < x < 10000:
motor1.duty_u16(0)
motor2.duty_u16(0)
motor3.duty_u16(0)
motor4.duty_u16(0)
elif x > 10000:
motor1.duty_u16(x)
motor2.duty_u16(0)
motor3.duty_u16(x)
motor4.duty_u16(0)
elif x < -10000:
motor1.duty_u16(0)
motor2.duty_u16(-x)
motor3.duty_u16(0)
motor4.duty_u16(-x)
await asyncio.sleep_ms(100)
async def main():
tasks = [
asyncio.create_task(get_data()),
asyncio.create_task(rainbow()),
asyncio.create_task(drive_motors()),
]
await asyncio.gather(*tasks)
asyncio.run(main())