215 lines
4.8 KiB
Python
215 lines
4.8 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
'''
|
|
Led data pin : D5 (GPIO14)
|
|
'''
|
|
|
|
import sys, time
|
|
import socket
|
|
from uosc.server import run_server, split_oscstr, parse_message, parse_bundle
|
|
from uosc.client import send
|
|
from uosc.client import Client
|
|
from machine import Pin
|
|
import mynetconf as netconf
|
|
import machine, neopixel
|
|
|
|
|
|
print('Loading OSC server module...')
|
|
|
|
|
|
MAX_DGRAM_SIZE = 1472
|
|
|
|
OSCport = 9001
|
|
|
|
ledpin = Pin(2, Pin.OUT)
|
|
ESPledstate = False
|
|
stripin = Pin(14, Pin.OUT)
|
|
|
|
n = 144
|
|
np = neopixel.NeoPixel(stripin, n)
|
|
motherip = "192.168.2.104"
|
|
motherosc = Client(motherip, 9002)
|
|
|
|
leftnote = 36
|
|
|
|
def ESPledon():
|
|
global ESPledstate
|
|
|
|
ledpin.value(0) # LED ON
|
|
ESPledstate = True
|
|
|
|
def ESPledoff():
|
|
global ESPledstate
|
|
|
|
ledpin.value(1) # LED OFF
|
|
ESPledstate = False
|
|
|
|
ESPledoff()
|
|
|
|
|
|
def noteledon(notenumber, velocity, r = 128, g=0 , b=64 ):
|
|
|
|
if notenumber > leftnote-1:
|
|
|
|
np[int((notenumber-leftnote)*2)] = (velocity*2,g,b)
|
|
np[int((notenumber-leftnote)*2)+1] = (velocity*2,g,b)
|
|
np.write()
|
|
motherosc.send('/noteledon', notenumber)
|
|
else:
|
|
motherosc.send('/notetoolow', notenumber)
|
|
|
|
def noteledoff(notenumber):
|
|
|
|
if notenumber > leftnote-1:
|
|
np[int((notenumber-leftnote)*2)] = (0, 0, 0)
|
|
np[int((notenumber-leftnote)*2)+1] = (0, 0, 0)
|
|
np.write()
|
|
motherosc.send('/noteledoff', notenumber)
|
|
else:
|
|
motherosc.send('/notetoolow', notenumber)
|
|
|
|
def npcycle():
|
|
|
|
for i in range(2 * n):
|
|
for j in range(n):
|
|
np[j] = (0, 0, 0)
|
|
np[i % n] = (126, 20, 126)
|
|
np.write()
|
|
time.sleep_ms(15)
|
|
|
|
'''
|
|
def npcycle2(r, g, b, wait):
|
|
|
|
print(r,g,b,wait)
|
|
for i in range(2 * n):
|
|
for j in range(n):
|
|
np[j] = (0, 0, 0)
|
|
np[int(i % n)] = (r, g, b)
|
|
#print(np)
|
|
np.write()
|
|
print()
|
|
for count in range(n):
|
|
print(i%n)
|
|
time.sleep(wait)
|
|
'''
|
|
|
|
def npcolor(r, g, b):
|
|
|
|
for i in range(n):
|
|
np[i] = (r, g, b)
|
|
np.write()
|
|
|
|
def npbounce():
|
|
|
|
for i in range(4 * n):
|
|
for j in range(n):
|
|
np[j] = (0, 0, 128)
|
|
if (i // n) % 2 == 0:
|
|
np[i % n] = (0, 0, 0)
|
|
else:
|
|
np[n - 1 - (i % n)] = (0, 0, 0)
|
|
np.write()
|
|
time.sleep_ms(60)
|
|
|
|
|
|
|
|
def OSCHandler(t, msg):
|
|
|
|
#print()
|
|
print("OSCHandler")
|
|
'''
|
|
print("OSC address:", msg[0]) # /on
|
|
print("Type tags:", msg[1]) # 'i'
|
|
print("Arguments:", msg[2]) # (1,)
|
|
print()
|
|
'''
|
|
|
|
# /noteon midichannel note velocity
|
|
if msg[0] == "/noteon":
|
|
print("ESP : NOTEON channel", msg[2][0], "note",msg[2][1], "velocity", msg[2][2])
|
|
noteledon(int(msg[2][1]), int(msg[2][2]))
|
|
#np[int(msg[2][1]%n)] = (255, 0, 153)
|
|
#np.write()
|
|
|
|
# /noteoff midichannel note
|
|
if msg[0] =='/noteoff':
|
|
print("ESP : NOTEOFF channel", msg[2][0], "note",msg[2][1])
|
|
noteledoff(int(msg[2][1]))
|
|
#np.write()
|
|
|
|
|
|
def handle_rawosc(data, src, strict=False):
|
|
try:
|
|
head, _ = split_oscstr(data, 0)
|
|
#if __debug__: print("head", head)
|
|
|
|
if head.startswith('/'):
|
|
messages = [(-1, parse_message(data, strict))]
|
|
elif head == '#bundle':
|
|
messages = parse_bundle(data, strict)
|
|
except Exception as exc:
|
|
if __debug__:
|
|
pass
|
|
return
|
|
|
|
try:
|
|
for timetag, (oscaddr, tags, args) in messages:
|
|
'''
|
|
if __debug__:
|
|
print("OSC address: %s" % oscaddr)
|
|
print("OSC type tags: %r" % tags)
|
|
print("OSC arguments: %r" % (args,))
|
|
'''
|
|
print("Dispatching", timetag, (oscaddr, tags, args, src))
|
|
OSCHandler(timetag, (oscaddr, tags, args, src))
|
|
|
|
except Exception as exc:
|
|
print("Exception in OSC handler: %s", exc)
|
|
|
|
|
|
|
|
def run_server(saddr, port):
|
|
|
|
motherosc.send('/runningserver', 1)
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if __debug__: print("Created OSC UDP server socket.")
|
|
|
|
sock.bind((saddr, port))
|
|
print("Listening for OSC messages on", saddr, port)
|
|
|
|
try:
|
|
while True:
|
|
data, caddr = sock.recvfrom(MAX_DGRAM_SIZE)
|
|
if __debug__: print("Server received", len(data), "bytes from",caddr)
|
|
handle_rawosc(data, caddr)
|
|
print()
|
|
finally:
|
|
sock.close()
|
|
print("Bye!")
|
|
|
|
|
|
|
|
def main(ip):
|
|
|
|
#import time
|
|
ESPledon()
|
|
print("Ledserver main IP",ip)
|
|
time.sleep(1)
|
|
npcolor(0,0,0)
|
|
#npcycle(255, 0, 0, 2)
|
|
#npbounce()
|
|
npcycle()
|
|
npcolor(0,0,0)
|
|
|
|
start = time.time()
|
|
|
|
try:
|
|
run_server("0.0.0.0", int(netconf.OSCin))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
ESPledoff()
|
|
|
|
if __name__ == '__main__':
|
|
main(0) |