first commit

This commit is contained in:
Sam 2020-11-15 18:47:23 +01:00
commit 93f9c17ea7
15 changed files with 1005 additions and 0 deletions

80
Readme.MD Normal file
View File

@ -0,0 +1,80 @@
MaxO
v0.1
OSC operations for ESP8266 + Max7219 matrixs led with micropython.
Need an ESP8266 on a local network with internet gateway at least for install.
Obviously change /dev/cu.usbserial-1D1310 with your serial port like /dev/tty.USB0
# Hardware connections
Connections wemos D1 mini https://docs.wemos.cc/en/latest/d1/d1_mini_lite.html
5V VCC
GND GND
D7 MOSI (GPIO13) DIN
D8 CS (GPIO15) CS
D5 SCK (GPIO14) CLK
# Flash Wemos D1 mini (ESP8266) 1 Mo
Download micropython firmware :
https://micropython.org/resources/firmware/esp8266-1m-20200902-v1.13.bin
esptool.py erase_flash
esptool.py --port /dev/cu.usbserial-1D1310 --baud 1000000 write_flash --flash_size=4MB -fm dio 0 PATHTO/esp8266-1m-20200902-v1.13.bin
# install ampy
pip3 install ampy
# Wifi config
edit netconf.py
# Matrixs config vertical or horizontal
in matrix9.py
screen = max7219.Max7219(8, 32, spi, Pin(15))
8 is x
32 is y (4 8x8 max7219)
# Transfer MaxO
ampy -p /dev/cu.usbserial-1D1310 put boot.py
ampy -p /dev/cu.usbserial-1D1310 put wlan.py
ampy -p /dev/cu.usbserial-1D1310 put netconf.py
ampy -p /dev/cu.usbserial-1D1310 put main.py
ampy -p /dev/cu.usbserial-1D1310 put maxserver.py
ampy -p /dev/cu.usbserial-1D1310 put max9.py
ampy -p /dev/cu.usbserial-1D1310 put max7219.py
ampy -p /dev/cu.usbserial-1D1310 put uosc
# Onboard install library via Internet
ampy -p /dev/cu.usbserial-1D1310 run wlan
screen /dev/cu.usbserial-1D1310 115200
type in micropython interactive :
import upip
upip.install('uasyncio')
upip.install('ffilib')
# reboot with onboard reset button
reboot

10
boot.py Executable file
View File

@ -0,0 +1,10 @@
# This file is executed on every boot (including wake-boot from deepsleep)
#import esp
#esp.osdebug(None)
import uos, machine
#uos.dupterm(None, 1) # disable REPL on UART(0)
import gc
#import webrepl
#webrepl.start()
gc.collect()

8
main.py Executable file
View File

@ -0,0 +1,8 @@
from wlan import do_connect
import netconf
ip = do_connect(netconf.ssid, netconf.password)
print(ip)
# netconf ('192.168.2.140', '255.255.255.0', '192.168.2.254', '1.1.1.1')
from maxserver import main
main(ip)

115
max7219.py Executable file
View File

@ -0,0 +1,115 @@
from machine import Pin
from micropython import const
import framebuf
_DIGIT_0 = const(0x1)
_DECODE_MODE = const(0x9)
_NO_DECODE = const(0x0)
_INTENSITY = const(0xa)
_INTENSITY_MIN = const(0x0)
_SCAN_LIMIT = const(0xb)
_DISPLAY_ALL_DIGITS = const(0x7)
_SHUTDOWN = const(0xc)
_SHUTDOWN_MODE = const(0x0)
_NORMAL_OPERATION = const(0x1)
_DISPLAY_TEST = const(0xf)
_DISPLAY_TEST_NORMAL_OPERATION = const(0x0)
_MATRIX_SIZE = const(8)
class Max7219(framebuf.FrameBuffer):
"""
Driver for MAX7219 8x8 LED matrices
Example for ESP8266 with 2x4 matrices (one on top, one on bottom),
so we have a 32x16 display area:
>>> from machine import Pin, SPI
>>> from max7219 import Max7219
>>> spi = SPI(1, baudrate=10000000)
>>> screen = Max7219(32, 16, spi, Pin(15))
>>> screen.rect(0, 0, 32, 16, 1) # Draws a frame
>>> screen.text('Hi!', 4, 4, 1)
>>> screen.show()
On some matrices, the display is inverted (rotated 180°), in this case
you can use `rotate_180=True` in the class constructor.
"""
def __init__(self, width, height, spi, cs, rotate_180=False):
# Pins setup
self.spi = spi
self.cs = cs
self.cs.init(Pin.OUT, True)
# Dimensions
self.width = width
self.height = height
# Guess matrices disposition
self.cols = width // _MATRIX_SIZE
self.rows = height // _MATRIX_SIZE
self.nb_matrices = self.cols * self.rows
self.rotate_180 = rotate_180
# 1 bit per pixel (on / off) -> 8 bytes per matrix
self.buffer = bytearray(width * height // 8)
format = framebuf.MONO_HLSB if not self.rotate_180 else framebuf.MONO_HMSB
super().__init__(self.buffer, width, height, format)
# Init display
self.init_display()
def _write_command(self, command, data):
"""Write command on SPI"""
cmd = bytearray([command, data])
self.cs(0)
for matrix in range(self.nb_matrices):
self.spi.write(cmd)
self.cs(1)
def init_display(self):
"""Init hardware"""
for command, data in (
(_SHUTDOWN, _SHUTDOWN_MODE), # Prevent flash during init
(_DECODE_MODE, _NO_DECODE),
(_DISPLAY_TEST, _DISPLAY_TEST_NORMAL_OPERATION),
(_INTENSITY, _INTENSITY_MIN),
(_SCAN_LIMIT, _DISPLAY_ALL_DIGITS),
(_SHUTDOWN, _NORMAL_OPERATION), # Let's go
):
self._write_command(command, data)
self.fill(0)
self.show()
def brightness(self, value):
"""Set display brightness (0 to 15)"""
if not 0 <= value < 16:
raise ValueError('Brightness must be between 0 and 15')
self._write_command(_INTENSITY, value)
def show(self):
"""Update display"""
# Write line per line on the matrices
for line in range(8):
self.cs(0)
for matrix in range(self.nb_matrices):
# Guess where the matrix is placed
row, col = divmod(matrix, self.cols)
# Compute where the data starts
if not self.rotate_180:
offset = row * 8 * self.cols
index = col + line * self.cols + offset
else:
offset = 8 * self.cols - row * self.cols * 8 - 1
index = self.cols * (8 - line) - col + offset
self.spi.write(bytearray([_DIGIT_0 + line, self.buffer[index]]))
self.cs(1)

75
max9.py Normal file
View File

@ -0,0 +1,75 @@
from machine import Pin, SPI
import max7219
from time import sleep
print('Loading Max9 + Max7219...')
spi = SPI(1, baudrate=10000000)
screen = max7219.Max7219(8, 32, spi, Pin(15))
def cls():
screen.fill(0)
screen.show()
def text(msg):
screen.fill(0)
screen.text(msg, 0, 0, 1)
screen.show()
def textscroll(msg):
for scrolls in range(32):
screen.fill(0)
screen.text(msg,0,scrolls,1)
screen.show()
sleep(0.03)
def textv(msg):
screen.fill(0)
for counter, letter in enumerate(msg):
screen.text(letter, 0, counter*8, 1)
screen.show()
def textvscrollup(msg):
for scrolls in range(32,0,-1):
screen.fill(0)
for counter, letter in enumerate(msg):
screen.text(letter, 0, scrolls+counter*8, 1)
screen.show()
sleep(0.005)
def textvscrolleft(msg):
for scrolls in range(8,-8,-1):
screen.fill(0)
for counter, letter in enumerate(msg):
screen.text(letter, scrolls, counter*8, 1)
screen.show()
sleep(0.005)
def textvblink(msg,times,speed):
for count in range(times):
textv(msg)
screen.show()
sleep(speed)
screen.fill(0)
sleep(1)
def demo():
textv('Demo')
sleep(2)
#textvblink('text',5,0.5)
textscroll('abcdef')
textvscrollup('abcdef')
sleep(1)
textvscrolleft('abcdef')
if __name__ == '__main__':
demo()

95
maxserver.py Executable file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Run an OSC server with asynchroneous I/O handling via the uasync framwork.
"""
import sys
import socket
from uasyncio.core import IORead, coroutine, get_event_loop, sleep
from uosc.server import handle_osc
print('Loading OSC server module...')
import max9
MAX_DGRAM_SIZE = 1472
OSCport = 9001
def run_server(host, port, client_coro, **params):
if __debug__: print("run_server(%s, %s)", host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
try:
while True:
yield IORead(sock)
if __debug__: print("run_server: Before recvfrom")
data, caddr = sock.recvfrom(MAX_DGRAM_SIZE)
yield client_coro(data, caddr, **params)
finally:
sock.close()
print("Bye!")
@coroutine
def serve(data, caddr, **params):
if __debug__: print("Client request handler coroutine called.")
handle_osc(data, caddr, **params)
# simulate long running request handler
yield from sleep(1)
if __debug__: print("Finished processing request,")
class Callback:
def __init__(self):
self.count = 0
def __call__(self, t, msg):
self.count += 1
#print("OSC message from: udp://%s:%s" % get_hostport(msg[3]))
print("OSC address:", msg[0]) # /on
print("Type tags:", msg[1]) # 'i'
print("Arguments:", msg[2]) # (1,)
print()
if msg[0] == "/text":
print("/text",msg[2][0])
max9.textv(msg[2][0][1:-1])
if msg[0] =='/cls':
print("/cls")
max9.cls()
if msg[0] =='/demo':
print("/demo")
max9.demo()
def main(ip):
import time
loop = get_event_loop()
callback = Callback()
loop.call_soon(run_server("0.0.0.0", OSCport, serve, dispatch=callback))
if __debug__: print("Starting asyncio event loop")
print(ip)
max9.textv(ip[-4:])
time.sleep(1)
max9.cls()
start = time.time()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
reqs = counter.count / (time.time() - start)
print("Requests/second: %.2f" % reqs)
if __name__ == '__main__':
main()

2
netconf.py Normal file
View File

@ -0,0 +1,2 @@
ssid = 'ssidname'
password = 'password'

11
uosc/__init__.py Executable file
View File

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
"""A minimal OSC client and server library for MicroPython.
To use it with the unix port of MicroPython, install the required modules from
the micropython-lib:
$ for mod in argparse ffilib logging socket struct; do
micropython -m upip install micropython-$mod
done
"""

34
uosc/__main__.py Executable file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env micropython
# -*- coding: utf-8 -*-
import argparse
import logging
import sys
from uosc.server import run_server
DEFAULT_ADDRESS = '0.0.0.0'
DEFAULT_PORT = 9001
def main(args=None):
ap = argparse.ArgumentParser()
ap.add_argument('-v', '--verbose', action="store_true",
help="Enable debug logging")
ap.add_argument('-a', '--address', default=DEFAULT_ADDRESS,
help="OSC server address (default: %s)" % DEFAULT_ADDRESS)
ap.add_argument('-p', '--port', type=int, default=DEFAULT_PORT,
help="OSC server port (default: %s)" % DEFAULT_PORT)
args = ap.parse_args(args if args is not None else sys.argv[1:])
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
try:
run_server(args.address, int(args.port))
except KeyboardInterrupt:
pass
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]) or 0)

204
uosc/client.py Executable file
View File

@ -0,0 +1,204 @@
# -*- coding: utf-8 -*-
#
# uosc/client.py
#
"""Simple OSC client."""
import socket
try:
from ustruct import pack
except ImportError:
from struct import pack
from uosc.common import Bundle, to_frac
if isinstance('', bytes):
have_bytes = False
unicodetype = unicode # noqa
else:
have_bytes = True
unicodetype = str
TYPE_MAP = {
int: 'i',
float: 'f',
bytes: 'b',
bytearray: 'b',
unicodetype: 's',
True: 'T',
False: 'F',
None: 'N',
}
def pack_addr(addr):
"""Pack a (host, port) tuple into the format expected by socket methods."""
if isinstance(addr, (bytes, bytearray)):
return addr
if len(addr) != 2:
raise NotImplementedError("Only IPv4/v6 supported")
addrinfo = socket.getaddrinfo(addr[0], addr[1])
return addrinfo[0][4]
def pack_timetag(t):
"""Pack an OSC timetag into 64-bit binary blob."""
return pack('>II', *to_frac(t))
def pack_string(s, encoding='utf-8'):
"""Pack a string into a binary OSC string."""
if isinstance(s, unicodetype):
s = s.encode(encoding)
assert all((i if have_bytes else ord(i)) < 128 for i in s), (
"OSC strings may only contain ASCII chars.")
slen = len(s)
return s + b'\0' * (((slen + 4) & ~0x03) - slen)
def pack_blob(b, encoding='utf-8'):
"""Pack a bytes, bytearray or tuple/list of ints into a binary OSC blob."""
if isinstance(b, (tuple, list)):
b = bytearray(b)
elif isinstance(b, unicodetype):
b = b.encode(encoding)
blen = len(b)
b = pack('>I', blen) + bytes(b)
return b + b'\0' * (((blen + 3) & ~0x03) - blen)
def pack_bundle(bundle):
"""Return bundle data packed into a binary string."""
data = []
for msg in bundle:
if isinstance(msg, Bundle):
msg = pack_bundle(msg)
elif isinstance(msg, tuple):
msg = create_message(*msg)
data.append(pack('>I', len(msg)) + msg)
return b'#bundle\0' + pack_timetag(bundle.timetag) + b''.join(data)
def pack_midi(val):
assert not isinstance(val, unicodetype), (
"Value with tag 'm' or 'r' must be bytes, bytearray or a sequence of "
"ints, not %s" % unicodetype)
if not have_bytes and isinstance(val, str):
val = (ord(c) for c in val)
return pack('BBBB', *tuple(val))
def create_message(address, *args):
"""Create an OSC message with given address pattern and arguments.
The OSC types are either inferred from the Python types of the arguments,
or you can pass arguments as 2-item tuples with the OSC typetag as the
first item and the argument value as the second. Python objects are mapped
to OSC typetags as follows:
* ``int``: i
* ``float``: f
* ``str``: s
* ``bytes`` / ``bytearray``: b
* ``None``: N
* ``True``: T
* ``False``: F
If you want to encode a Python object to another OSC type, you have to pass
a ``(typetag, data)`` tuple, where ``data`` must be of the appropriate type
according to the following table:
* c: ``str`` of length 1
* h: ``int``
* d: ``float``
* I: ``None`` (unused)
* m: ``tuple / list`` of 4 ``int``s or ``bytes / bytearray`` of length 4
* r: same as 'm'
* t: OSC timetag as as ``int / float`` seconds since the NTP epoch
* S: ``str``
"""
assert address.startswith('/'), "Address pattern must start with a slash."
data = []
types = [',']
for arg in args:
type_ = type(arg)
if isinstance(arg, tuple):
typetag, arg = arg
else:
typetag = TYPE_MAP.get(type_) or TYPE_MAP.get(arg)
if typetag in 'ifd':
data.append(pack('>' + typetag, arg))
elif typetag in 'sS':
data.append(pack_string(arg))
elif typetag == 'b':
data.append(pack_blob(arg))
elif typetag in 'rm':
data.append(pack_midi(arg))
elif typetag == 'c':
data.append(pack('>I', ord(arg)))
elif typetag == 'h':
data.append(pack('>q', arg))
elif typetag == 't':
data.append(pack_timetag(arg))
elif typetag not in 'IFNT':
raise TypeError("Argument of type '%s' not supported." % type_)
types.append(typetag)
return pack_string(address) + pack_string(''.join(types)) + b''.join(data)
class Client:
def __init__(self, host, port=None):
if port is None:
if isinstance(host, (list, tuple)):
host, port = host
else:
port = host
host = '127.0.0.1'
self.dest = pack_addr((host, port))
self.sock = None
def send(self, msg, *args, **kw):
dest = pack_addr(kw.get('dest', self.dest))
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if isinstance(msg, Bundle):
msg = pack_bundle(msg)
elif args or isinstance(msg, unicodetype):
msg = create_message(msg, *args)
self.sock.sendto(msg, dest)
def close(self):
if self.sock:
self.sock.close()
self.sock = None
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def send(dest, address, *args):
with Client(dest) as client:
client.send(address, *args)

62
uosc/common.py Executable file
View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# uosc/common.py
#
"""OSC message parsing and building functions."""
try:
from time import time
except ImportError:
from utime import time
# UNIX_EPOCH = datetime.date(*time.gmtime(0)[0:3])
# NTP_EPOCH = datetime.date(1900, 1, 1)
# NTP_DELTA = (UNIX_EPOCH - NTP_EPOCH).days * 24 * 3600
NTP_DELTA = 2208988800
ISIZE = 4294967296 # 2**32
class Impulse:
pass
class Bundle:
"""Container for an OSC bundle."""
def __init__(self, *items):
"""Create bundle from given OSC timetag and messages/sub-bundles.
An OSC timetag can be given as the first positional argument, and must
be an int or float of seconds since the NTP epoch (1990-01-01 00:00).
It defaults to the current time.
Pass in messages or bundles via positional arguments as binary data
(bytes as returned by ``create_message`` resp. ``Bundle.pack``) or as
``Bundle`` instances or (address, *args) tuples.
"""
if items and isinstance(items[0], (int, float)):
self.timetag = items[0]
items = items[1:]
else:
self.timetag = time() + NTP_DELTA
self._items = list(items)
def add(self, *items):
self._items.extend(list(items))
def __iter__(self):
return iter(self._items)
def to_frac(t):
"""Return seconds and fractional part of NTP timestamp as 2-item tuple."""
sec = int(t)
return sec, int(abs(t - sec) * ISIZE)
def to_time(sec, frac):
"""Return NTP timestamp from integer seconds and fractional part."""
return sec + float(frac) / ISIZE

163
uosc/server.py Executable file
View File

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
#
# uosc/server.py
#
"""A minimal OSC UDP server."""
import socket
try:
from ustruct import unpack
except ImportError:
from struct import unpack
from uosc.common import Impulse, to_time
#if __debug__:
# from uosc.socketutil import get_hostport
MAX_DGRAM_SIZE = 1472
def split_oscstr(msg, offset):
end = msg.find(b'\0', offset)
return msg[offset:end].decode('utf-8'), (end + 4) & ~0x03
def split_oscblob(msg, offset):
start = offset + 4
size = unpack('>I', msg[offset:start])[0]
return msg[start:start + size], (start + size + 4) & ~0x03
def parse_timetag(msg, offset):
"""Parse an OSC timetag from msg at offset."""
return to_time(unpack('>II', msg[offset:offset + 4]))
def parse_message(msg, strict=False):
args = []
addr, ofs = split_oscstr(msg, 0)
if not addr.startswith('/'):
raise ValueError("OSC address pattern must start with a slash.")
# type tag string must start with comma (ASCII 44)
if ofs < len(msg) and msg[ofs:ofs + 1] == b',':
tags, ofs = split_oscstr(msg, ofs)
tags = tags[1:]
else:
errmsg = "Missing/invalid OSC type tag string."
if strict:
raise ValueError(errmsg)
else:
print(errmsg + ' Ignoring arguments.')
tags = ''
for typetag in tags:
size = 0
if typetag in 'ifd':
size = 8 if typetag == 'd' else 4
args.append(unpack('>' + typetag, msg[ofs:ofs + size])[0])
elif typetag in 'sS':
s, ofs = split_oscstr(msg, ofs)
args.append(s)
elif typetag == 'b':
s, ofs = split_oscblob(msg, ofs)
args.append(s)
elif typetag in 'rm':
size = 4
args.append(unpack('BBBB', msg[ofs:ofs + size]))
elif typetag == 'c':
size = 4
args.append(chr(unpack('>I', msg[ofs:ofs + size])[0]))
elif typetag == 'h':
size = 8
args.append(unpack('>q', msg[ofs:ofs + size])[0])
elif typetag == 't':
size = 8
args.append(parse_timetag(msg, ofs))
elif typetag in 'TFNI':
args.append({'T': True, 'F': False, 'I': Impulse}.get(typetag))
else:
raise ValueError("Type tag '%s' not supported." % typetag)
ofs += size
return (addr, tags, tuple(args))
def parse_bundle(bundle, strict=False):
"""Parse a binary OSC bundle.
Returns a generator which walks over all contained messages and bundles
recursively, depth-first. Each item yielded is a (timetag, message) tuple.
"""
if not bundle.startswith(b'#bundle\0'):
raise TypeError("Bundle must start with b'#bundle\\0'.")
ofs = 16
timetag = to_time(*unpack('>II', bundle[8:ofs]))
while True:
if ofs >= len(bundle):
break
size = unpack('>I', bundle[ofs:ofs + 4])[0]
element = bundle[ofs + 4:ofs + 4 + size]
ofs += size + 4
if element.startswith(b'#bundle'):
for el in parse_bundle(element):
yield el
else:
yield timetag, parse_message(element, strict)
def handle_osc(data, src, dispatch=None, strict=False):
try:
head, _ = split_oscstr(data, 0)
if head.startswith('/'):
messages = [(-1, parse_message(data, strict))]
elif head == '#bundle':
messages = parse_bundle(data, strict)
except Exception as exc:
pass
#if __debug__:
# print(data)
return
try:
for timetag, (oscaddr, tags, args) in messages:
if __debug__:
print("OSC address: %s" % oscaddr)
print("OSC type tags: %r" % tags)
print("OSC arguments: %r" % (args,))
if dispatch:
dispatch(timetag, (oscaddr, tags, args, src))
except Exception as exc:
print("Exception in OSC handler: %s", exc)
def run_server(saddr, port, handler=handle_osc):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if __debug__: print("Created OSC UDP server socket.")
sock.bind((saddr, port))
print("Listening for OSC messages on %s:%i.", saddr, port)
try:
while True:
data, caddr = sock.recvfrom(MAX_DGRAM_SIZE)
handler(data, caddr)
finally:
sock.close()
print("Bye!")

35
uosc/socketutil.py Executable file
View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# uosc/socketutil.py
#
import socket
INET_ADDRSTRLEN = 16
INET6_ADDRSTRLEN = 46
inet_ntoa = getattr(socket, 'inet_ntoa', None)
if not inet_ntoa:
import ffilib
inet_ntoa = ffilib.libc().func("s", "inet_ntoa", "p")
inet_ntop = getattr(socket, 'inet_ntop', None)
if not inet_ntop:
import ffilib
_inet_ntop = ffilib.libc().func("s", "inet_ntop", "iPpi")
def inet_ntop(af, addr):
buf = bytearray(INET_ADDRSTRLEN if af == socket.AF_INET else
INET6_ADDRSTRLEN)
res = _inet_ntop(af, addr, buf, INET_ADDRSTRLEN)
return res
def get_hostport(addr):
if isinstance(addr, tuple):
return addr
af, addr, port = socket.sockaddr(addr)
return inet_ntop(af, addr), port

86
uosc/threadedclient.py Executable file
View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""OSC client running in a separate thread.
Communicates with the main thread via a queue. Provides the same API as the
non-threaded client, with a few threading-related extensions:
from uosc.threadedclient import ThreadedClient
# start=True starts the thread immediately
osc = ThreadedClient('192.168.0.42', 9001, start=True)
# if the OSC message can not placed in the queue within timeout
# raises a queue.Full error
osc.send('/pi', 3.14159, timeout=1.0)
# Stops and joins the thread and closes the client socket
osc.close()
"""
import logging
import threading
try:
import queue
except ImportError:
import Queue as queue
from uosc.client import Client
log = logging.getLogger(__name__)
class ThreadedClient(threading.Thread):
def __init__(self, host, port=None, start=False, timeout=3.0):
super(ThreadedClient, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self._q = queue.Queue()
if start:
self.start()
def run(self, *args, **kw):
self.client = Client((self.host, self.port))
while True:
msg = self._q.get()
if msg is None:
break
addr, msg = msg
log.debug("Sending OSC msg %s, %r", addr, msg)
self.client.send(addr, *msg)
self.client.close()
def send(self, addr, *args, **kw):
self._q.put((addr, args), timeout=kw.get('timeout', self.timeout))
def close(self, **kw):
timeout = kw.get('timeout', self.timeout)
log.debug("Emptying send queue...")
while True:
try:
self._q.get_nowait()
except queue.Empty:
break
if self.is_alive():
log.debug("Signalling OSC client thread to exit...")
self._q.put(None, timeout=timeout)
log.debug("Joining OSC client thread...")
self.join(timeout)
if self.is_alive():
log.warning("OSC client thread still alive after join().")
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

25
wlan.py Executable file
View File

@ -0,0 +1,25 @@
def do_connect(ssid, password, tries=5):
from network import WLAN, STA_IF
from time import sleep
print('Loading Wifi module...')
sta_if = WLAN(STA_IF)
if not sta_if.isconnected():
sta_if.active(True)
sta_if.connect(ssid, password)
for i in range(tries):
print('Connecting to network (try {})...'.format(i+1))
if sta_if.isconnected():
netconf = sta_if.ifconfig()
print('network config:', netconf)
return netconf[0]
sleep(1)
else:
print("Failed to connect in {} seconds.".format(tries))
if __name__ == '__main__':
import netconf
doconnect(netconf.ssid, netconf.password)