MaxO/uosc/client.py
2020-11-15 18:47:23 +01:00

205 lines
5.3 KiB
Python
Executable File

# -*- coding: utf-8 -*-
#
# uosc/client.py
#
"""Simple OSC client."""
import socket
try:
from ustruct import pack
except ImportError:
from struct import pack
from uosc.common import Bundle, to_frac
if isinstance('', bytes):
have_bytes = False
unicodetype = unicode # noqa
else:
have_bytes = True
unicodetype = str
TYPE_MAP = {
int: 'i',
float: 'f',
bytes: 'b',
bytearray: 'b',
unicodetype: 's',
True: 'T',
False: 'F',
None: 'N',
}
def pack_addr(addr):
"""Pack a (host, port) tuple into the format expected by socket methods."""
if isinstance(addr, (bytes, bytearray)):
return addr
if len(addr) != 2:
raise NotImplementedError("Only IPv4/v6 supported")
addrinfo = socket.getaddrinfo(addr[0], addr[1])
return addrinfo[0][4]
def pack_timetag(t):
"""Pack an OSC timetag into 64-bit binary blob."""
return pack('>II', *to_frac(t))
def pack_string(s, encoding='utf-8'):
"""Pack a string into a binary OSC string."""
if isinstance(s, unicodetype):
s = s.encode(encoding)
assert all((i if have_bytes else ord(i)) < 128 for i in s), (
"OSC strings may only contain ASCII chars.")
slen = len(s)
return s + b'\0' * (((slen + 4) & ~0x03) - slen)
def pack_blob(b, encoding='utf-8'):
"""Pack a bytes, bytearray or tuple/list of ints into a binary OSC blob."""
if isinstance(b, (tuple, list)):
b = bytearray(b)
elif isinstance(b, unicodetype):
b = b.encode(encoding)
blen = len(b)
b = pack('>I', blen) + bytes(b)
return b + b'\0' * (((blen + 3) & ~0x03) - blen)
def pack_bundle(bundle):
"""Return bundle data packed into a binary string."""
data = []
for msg in bundle:
if isinstance(msg, Bundle):
msg = pack_bundle(msg)
elif isinstance(msg, tuple):
msg = create_message(*msg)
data.append(pack('>I', len(msg)) + msg)
return b'#bundle\0' + pack_timetag(bundle.timetag) + b''.join(data)
def pack_midi(val):
assert not isinstance(val, unicodetype), (
"Value with tag 'm' or 'r' must be bytes, bytearray or a sequence of "
"ints, not %s" % unicodetype)
if not have_bytes and isinstance(val, str):
val = (ord(c) for c in val)
return pack('BBBB', *tuple(val))
def create_message(address, *args):
"""Create an OSC message with given address pattern and arguments.
The OSC types are either inferred from the Python types of the arguments,
or you can pass arguments as 2-item tuples with the OSC typetag as the
first item and the argument value as the second. Python objects are mapped
to OSC typetags as follows:
* ``int``: i
* ``float``: f
* ``str``: s
* ``bytes`` / ``bytearray``: b
* ``None``: N
* ``True``: T
* ``False``: F
If you want to encode a Python object to another OSC type, you have to pass
a ``(typetag, data)`` tuple, where ``data`` must be of the appropriate type
according to the following table:
* c: ``str`` of length 1
* h: ``int``
* d: ``float``
* I: ``None`` (unused)
* m: ``tuple / list`` of 4 ``int``s or ``bytes / bytearray`` of length 4
* r: same as 'm'
* t: OSC timetag as as ``int / float`` seconds since the NTP epoch
* S: ``str``
"""
assert address.startswith('/'), "Address pattern must start with a slash."
data = []
types = [',']
for arg in args:
type_ = type(arg)
if isinstance(arg, tuple):
typetag, arg = arg
else:
typetag = TYPE_MAP.get(type_) or TYPE_MAP.get(arg)
if typetag in 'ifd':
data.append(pack('>' + typetag, arg))
elif typetag in 'sS':
data.append(pack_string(arg))
elif typetag == 'b':
data.append(pack_blob(arg))
elif typetag in 'rm':
data.append(pack_midi(arg))
elif typetag == 'c':
data.append(pack('>I', ord(arg)))
elif typetag == 'h':
data.append(pack('>q', arg))
elif typetag == 't':
data.append(pack_timetag(arg))
elif typetag not in 'IFNT':
raise TypeError("Argument of type '%s' not supported." % type_)
types.append(typetag)
return pack_string(address) + pack_string(''.join(types)) + b''.join(data)
class Client:
def __init__(self, host, port=None):
if port is None:
if isinstance(host, (list, tuple)):
host, port = host
else:
port = host
host = '127.0.0.1'
self.dest = pack_addr((host, port))
self.sock = None
def send(self, msg, *args, **kw):
dest = pack_addr(kw.get('dest', self.dest))
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if isinstance(msg, Bundle):
msg = pack_bundle(msg)
elif args or isinstance(msg, unicodetype):
msg = create_message(msg, *args)
self.sock.sendto(msg, dest)
def close(self):
if self.sock:
self.sock.close()
self.sock = None
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def send(dest, address, *args):
with Client(dest) as client:
client.send(address, *args)