forked from protonphoton/LJ
2874 lines
101 KiB
Python
2874 lines
101 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
March 2015:
|
|
Python 3 version tested in Blender and simpleOSC with twisted
|
|
|
|
This module contains an OpenSoundControl implementation (in Pure Python), based
|
|
(somewhat) on the good old 'SimpleOSC' implementation by Daniel Holth & Clinton
|
|
McChesney.
|
|
|
|
This implementation is intended to still be 'simple' to the user, but much more
|
|
complete (with OSCServer & OSCClient classes) and much more powerful (the
|
|
OSCMultiClient supports subscriptions & message-filtering, OSCMessage &
|
|
OSCBundle are now proper container-types)
|
|
|
|
===============================================================================
|
|
OpenSoundControl
|
|
===============================================================================
|
|
|
|
OpenSoundControl is a network-protocol for sending (small) packets of addressed
|
|
data over network sockets. This OSC-implementation supports the classical
|
|
UDP/IP protocol for sending and receiving packets but provides as well support
|
|
for TCP/IP streaming, whereas the message size is prepended as int32 (big
|
|
endian) before each message/packet.
|
|
|
|
OSC-packets come in two kinds:
|
|
|
|
- OSC-messages consist of an 'address'-string (not to be confused with a
|
|
(host:port) network-address!), followed by a string of 'typetags'
|
|
associated with the message's arguments (ie. 'payload'), and finally the
|
|
arguments themselves, encoded in an OSC-specific way. The OSCMessage class
|
|
makes it easy to create & manipulate OSC-messages of this kind in a
|
|
'pythonesque' way (that is, OSCMessage-objects behave a lot like lists)
|
|
|
|
- OSC-bundles are a special type of OSC-message containing only
|
|
OSC-messages as 'payload'. Recursively. (meaning; an OSC-bundle could
|
|
contain other OSC-bundles, containing OSC-bundles etc.)
|
|
|
|
OSC-bundles start with the special keyword '#bundle' and do not have an
|
|
OSC-address (but the OSC-messages a bundle contains will have OSC-addresses!).
|
|
Also, an OSC-bundle can have a timetag, essentially telling the receiving
|
|
server to 'hold' the bundle until the specified time. The OSCBundle class
|
|
allows easy cration & manipulation of OSC-bundles.
|
|
|
|
For further information see also http://opensoundcontrol.org/spec-1_0
|
|
|
|
-------------------------------------------------------------------------------
|
|
|
|
To send OSC-messages, you need an OSCClient, and to receive OSC-messages you
|
|
need an OSCServer.
|
|
|
|
The OSCClient uses an 'AF_INET / SOCK_DGRAM' type socket (see the 'socket'
|
|
module) to send binary representations of OSC-messages to a remote host:port
|
|
address.
|
|
|
|
The OSCServer listens on an 'AF_INET / SOCK_DGRAM' type socket bound to a local
|
|
port, and handles incoming requests. Either one-after-the-other (OSCServer) or
|
|
in a multi-threaded / multi-process fashion (ThreadingOSCServer/
|
|
ForkingOSCServer). If the Server has a callback-function (a.k.a. handler)
|
|
registered to 'deal with' (i.e. handle) the received message's OSC-address,
|
|
that function is called, passing it the (decoded) message.
|
|
|
|
The different OSCServers implemented here all support the (recursive) un-
|
|
bundling of OSC-bundles, and OSC-bundle timetags.
|
|
|
|
In fact, this implementation supports:
|
|
|
|
- OSC-messages with 'i' (int32), 'f' (float32), 'd' (double), 's' (string) and
|
|
'b' (blob / binary data) types
|
|
- OSC-bundles, including timetag-support
|
|
- OSC-address patterns including '*', '?', '{,}' and '[]' wildcards.
|
|
|
|
(please *do* read the OSC-spec! http://opensoundcontrol.org/spec-1_0 it
|
|
explains what these things mean.)
|
|
|
|
In addition, the OSCMultiClient supports:
|
|
- Sending a specific OSC-message to multiple remote servers
|
|
- Remote server subscription / unsubscription (through OSC-messages, of course)
|
|
- Message-address filtering.
|
|
|
|
-------------------------------------------------------------------------------
|
|
SimpleOSC:
|
|
Copyright (c) Daniel Holth & Clinton McChesney.
|
|
pyOSC:
|
|
Copyright (c) 2008-2010, Artem Baguinski <artm@v2.nl> et al., Stock, V2_Lab, Rotterdam, Netherlands.
|
|
Streaming support (OSC over TCP):
|
|
Copyright (c) 2010 Uli Franke <uli.franke@weiss.ch>, Weiss Engineering, Uster, Switzerland.
|
|
|
|
-------------------------------------------------------------------------------
|
|
Changelog:
|
|
-------------------------------------------------------------------------------
|
|
v0.3.0 - 27 Dec. 2007
|
|
Started out to extend the 'SimpleOSC' implementation (v0.2.3) by Daniel Holth & Clinton McChesney.
|
|
Rewrote OSCMessage
|
|
Added OSCBundle
|
|
|
|
v0.3.1 - 3 Jan. 2008
|
|
Added OSClient
|
|
Added OSCRequestHandler, loosely based on the original CallbackManager
|
|
Added OSCServer
|
|
Removed original CallbackManager
|
|
Adapted testing-script (the 'if __name__ == "__main__":' block at the end) to use new Server & Client
|
|
|
|
v0.3.2 - 5 Jan. 2008
|
|
Added 'container-type emulation' methods (getitem(), setitem(), __iter__() & friends) to OSCMessage
|
|
Added ThreadingOSCServer & ForkingOSCServer
|
|
- 6 Jan. 2008
|
|
Added OSCMultiClient
|
|
Added command-line options to testing-script (try 'python OSC.py --help')
|
|
|
|
v0.3.3 - 9 Jan. 2008
|
|
Added OSC-timetag support to OSCBundle & OSCRequestHandler
|
|
Added ThreadingOSCRequestHandler
|
|
|
|
v0.3.4 - 13 Jan. 2008
|
|
Added message-filtering to OSCMultiClient
|
|
Added subscription-handler to OSCServer
|
|
Added support fon numpy/scipy int & float types. (these get converted to 'standard' 32-bit OSC ints / floats!)
|
|
Cleaned-up and added more Docstrings
|
|
|
|
v0.3.5 - 14 aug. 2008
|
|
Added OSCServer.reportErr(...) method
|
|
|
|
v0.3.6 - 19 April 2010
|
|
Added Streaming support (OSC over TCP)
|
|
Updated documentation
|
|
Moved pattern matching stuff into separate class (OSCAddressSpace) to
|
|
facilitate implementation of different server and client architectures.
|
|
Callbacks feature now a context (object oriented) but dynamic function
|
|
inspection keeps the code backward compatible
|
|
Moved testing code into separate testbench (testbench.py)
|
|
|
|
-----------------
|
|
Original Comments
|
|
-----------------
|
|
> Open SoundControl for Python
|
|
> Copyright (C) 2002 Daniel Holth, Clinton McChesney
|
|
>
|
|
> This library is free software; you can redistribute it and/or modify it under
|
|
> the terms of the GNU Lesser General Public License as published by the Free
|
|
> Software Foundation; either version 2.1 of the License, or (at your option) any
|
|
> later version.
|
|
>
|
|
> This library is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
> WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
|
|
> PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
|
> details.
|
|
>
|
|
> You should have received a copy of the GNU Lesser General Public License along
|
|
> with this library; if not, write to the Free Software Foundation, Inc., 59
|
|
> Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
>
|
|
> For questions regarding this module contact Daniel Holth <dholth@stetson.edu>
|
|
> or visit http://www.stetson.edu/~ProctoLogic/
|
|
>
|
|
> Changelog:
|
|
> 15 Nov. 2001:
|
|
> Removed dependency on Python 2.0 features.
|
|
> - dwh
|
|
> 13 Feb. 2002:
|
|
> Added a generic callback handler.
|
|
> - dwh
|
|
"""
|
|
|
|
import math, re, socket, select, string, struct, sys, threading, time, types, array, errno, inspect
|
|
from socketserver import UDPServer, DatagramRequestHandler, ForkingMixIn, ThreadingMixIn, StreamRequestHandler, TCPServer
|
|
from contextlib import closing
|
|
|
|
global version
|
|
version = ("0.3","6", "$Rev: 6382 $"[6:-2])
|
|
|
|
global FloatTypes
|
|
FloatTypes = [float]
|
|
|
|
global IntTypes
|
|
IntTypes = [int]
|
|
|
|
global NTP_epoch
|
|
from calendar import timegm
|
|
NTP_epoch = timegm((1900,1,1,0,0,0)) # NTP time started in 1 Jan 1900
|
|
del timegm
|
|
|
|
global NTP_units_per_second
|
|
NTP_units_per_second = 0x100000000 # about 232 picoseconds
|
|
|
|
##
|
|
# numpy/scipy support:
|
|
##
|
|
|
|
try:
|
|
from numpy import typeDict
|
|
|
|
for ftype in ['float32', 'float64', 'float128']:
|
|
try:
|
|
FloatTypes.append(typeDict[ftype])
|
|
except KeyError:
|
|
pass
|
|
|
|
for itype in ['int8', 'int16', 'int32', 'int64']:
|
|
try:
|
|
IntTypes.append(typeDict[itype])
|
|
IntTypes.append(typeDict['u' + itype])
|
|
except KeyError:
|
|
pass
|
|
|
|
# thanks for those...
|
|
del typeDict, ftype, itype
|
|
|
|
except ImportError:
|
|
pass
|
|
|
|
######
|
|
#
|
|
# OSCMessage classes
|
|
#
|
|
######
|
|
|
|
class OSCMessage(object):
|
|
""" Builds typetagged OSC messages.
|
|
|
|
OSCMessage objects are container objects for building OSC-messages.
|
|
On the 'front' end, they behave much like list-objects, and on the 'back' end
|
|
they generate a binary representation of the message, which can be sent over a network socket.
|
|
OSC-messages consist of an 'address'-string (not to be confused with a (host, port) IP-address!),
|
|
followed by a string of 'typetags' associated with the message's arguments (ie. 'payload'),
|
|
and finally the arguments themselves, encoded in an OSC-specific way.
|
|
|
|
On the Python end, OSCMessage are lists of arguments, prepended by the message's address.
|
|
The message contents can be manipulated much like a list:
|
|
>>> msg = OSCMessage("/my/osc/address")
|
|
>>> msg.append('something')
|
|
>>> msg.insert(0, 'something else')
|
|
>>> msg[1] = 'entirely'
|
|
>>> msg.extend([1,2,3.])
|
|
>>> msg += [4, 5, 6.]
|
|
>>> del msg[3:6]
|
|
>>> msg.pop(-2)
|
|
5
|
|
>>> print msg
|
|
/my/osc/address ['something else', 'entirely', 1, 6.0]
|
|
|
|
OSCMessages can be concatenated with the + operator. In this case, the resulting OSCMessage
|
|
inherits its address from the left-hand operand. The right-hand operand's address is ignored.
|
|
To construct an 'OSC-bundle' from multiple OSCMessage, see OSCBundle!
|
|
|
|
Additional methods exist for retreiving typetags or manipulating items as (typetag, value) tuples.
|
|
"""
|
|
def __init__(self, address=""):
|
|
"""Instantiate a new OSCMessage.
|
|
The OSC-address can be specified with the 'address' argument
|
|
"""
|
|
self.clear(address)
|
|
|
|
def setAddress(self, address):
|
|
"""Set or change the OSC-address
|
|
"""
|
|
self.address = address
|
|
|
|
def clear(self, address=""):
|
|
"""Clear (or set a new) OSC-address and clear any arguments appended so far
|
|
"""
|
|
self.address = address
|
|
self.clearData()
|
|
|
|
def clearData(self):
|
|
"""Clear any arguments appended so far
|
|
"""
|
|
self.typetags = ","
|
|
self.message = b""
|
|
|
|
def append(self, argument, typehint=None):
|
|
"""Appends data to the message, updating the typetags based on
|
|
the argument's type. If the argument is a blob (counted
|
|
string) pass in 'b' as typehint.
|
|
'argument' may also be a list or tuple, in which case its elements
|
|
will get appended one-by-one, all using the provided typehint
|
|
"""
|
|
if isinstance(argument,dict):
|
|
argument = list(argument.items())
|
|
elif isinstance(argument, OSCMessage):
|
|
raise TypeError("Can only append 'OSCMessage' to 'OSCBundle'")
|
|
|
|
if hasattr(argument, '__iter__') and not type(argument) in (str,bytes):
|
|
for arg in argument:
|
|
self.append(arg, typehint)
|
|
|
|
return
|
|
|
|
if typehint == 'b':
|
|
binary = OSCBlob(argument)
|
|
tag = 'b'
|
|
elif typehint == 't':
|
|
binary = OSCTimeTag(argument)
|
|
tag = 't'
|
|
else:
|
|
tag, binary = OSCArgument(argument, typehint)
|
|
|
|
self.typetags += tag
|
|
self.message += binary
|
|
|
|
def getBinary(self):
|
|
"""Returns the binary representation of the message
|
|
"""
|
|
binary = OSCString(self.address)
|
|
binary += OSCString(self.typetags)
|
|
binary += self.message
|
|
|
|
return binary
|
|
|
|
def __repr__(self):
|
|
"""Returns a string containing the decode Message
|
|
"""
|
|
return str(decodeOSC(self.getBinary()))
|
|
|
|
def __str__(self):
|
|
"""Returns the Message's address and contents as a string.
|
|
"""
|
|
return "%s %s" % (self.address, str(list(self.values())))
|
|
|
|
def __len__(self):
|
|
"""Returns the number of arguments appended so far
|
|
"""
|
|
return (len(self.typetags) - 1)
|
|
|
|
def __eq__(self, other):
|
|
"""Return True if two OSCMessages have the same address & content
|
|
"""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
|
|
return (self.address == other.address) and (self.typetags == other.typetags) and (self.message == other.message)
|
|
|
|
def __ne__(self, other):
|
|
"""Return (not self.__eq__(other))
|
|
"""
|
|
return not self.__eq__(other)
|
|
|
|
def __add__(self, values):
|
|
"""Returns a copy of self, with the contents of 'values' appended
|
|
(see the 'extend()' method, below)
|
|
"""
|
|
msg = self.copy()
|
|
msg.extend(values)
|
|
return msg
|
|
|
|
def __iadd__(self, values):
|
|
"""Appends the contents of 'values'
|
|
(equivalent to 'extend()', below)
|
|
Returns self
|
|
"""
|
|
self.extend(values)
|
|
return self
|
|
|
|
def __radd__(self, values):
|
|
"""Appends the contents of this OSCMessage to 'values'
|
|
Returns the extended 'values' (list or tuple)
|
|
"""
|
|
out = list(values)
|
|
out.extend(list(self.values()))
|
|
|
|
if isinstance(values,tuple):
|
|
return tuple(out)
|
|
|
|
return out
|
|
|
|
def _reencode(self, items):
|
|
"""Erase & rebuild the OSCMessage contents from the given
|
|
list of (typehint, value) tuples"""
|
|
self.clearData()
|
|
for item in items:
|
|
self.append(item[1], item[0])
|
|
|
|
def values(self):
|
|
"""Returns a list of the arguments appended so far
|
|
"""
|
|
return decodeOSC(self.getBinary())[2:]
|
|
|
|
def tags(self):
|
|
"""Returns a list of typetags of the appended arguments
|
|
"""
|
|
return list(self.typetags.lstrip(','))
|
|
|
|
def items(self):
|
|
"""Returns a list of (typetag, value) tuples for
|
|
the arguments appended so far
|
|
"""
|
|
out = []
|
|
values = list(self.values())
|
|
typetags = self.tags()
|
|
for i in range(len(values)):
|
|
out.append((typetags[i], values[i]))
|
|
|
|
return out
|
|
|
|
def __contains__(self, val):
|
|
"""Test if the given value appears in the OSCMessage's arguments
|
|
"""
|
|
return (val in list(self.values()))
|
|
|
|
def __getitem__(self, i):
|
|
"""Returns the indicated argument (or slice)
|
|
"""
|
|
return list(self.values())[i]
|
|
|
|
def __delitem__(self, i):
|
|
"""Removes the indicated argument (or slice)
|
|
"""
|
|
items = list(self.items())
|
|
del items[i]
|
|
|
|
self._reencode(items)
|
|
|
|
def _buildItemList(self, values, typehint=None):
|
|
if isinstance(values, OSCMessage):
|
|
items = list(values.items())
|
|
elif isinstance(values,list):
|
|
items = []
|
|
for val in values:
|
|
if isinstance(val,tuple):
|
|
items.append(val[:2])
|
|
else:
|
|
items.append((typehint, val))
|
|
elif isinstance(values,tuple):
|
|
items = [values[:2]]
|
|
else:
|
|
items = [(typehint, values)]
|
|
|
|
return items
|
|
|
|
def __setitem__(self, i, val):
|
|
"""Set indicatated argument (or slice) to a new value.
|
|
'val' can be a single int/float/string, or a (typehint, value) tuple.
|
|
Or, if 'i' is a slice, a list of these or another OSCMessage.
|
|
"""
|
|
items = list(self.items())
|
|
|
|
new_items = self._buildItemList(val)
|
|
|
|
if not isinstance(i,slice):
|
|
if len(new_items) != 1:
|
|
raise TypeError("single-item assignment expects a single value or a (typetag, value) tuple")
|
|
|
|
new_items = new_items[0]
|
|
|
|
# finally...
|
|
items[i] = new_items
|
|
|
|
self._reencode(items)
|
|
|
|
def setItem(self, i, val, typehint=None):
|
|
"""Set indicated argument to a new value (with typehint)
|
|
"""
|
|
items = list(self.items())
|
|
|
|
items[i] = (typehint, val)
|
|
|
|
self._reencode(items)
|
|
|
|
def copy(self):
|
|
"""Returns a deep copy of this OSCMessage
|
|
"""
|
|
msg = self.__class__(self.address)
|
|
msg.typetags = self.typetags
|
|
msg.message = self.message
|
|
return msg
|
|
|
|
def count(self, val):
|
|
"""Returns the number of times the given value occurs in the OSCMessage's arguments
|
|
"""
|
|
return list(self.values()).count(val)
|
|
|
|
def index(self, val):
|
|
"""Returns the index of the first occurence of the given value in the OSCMessage's arguments.
|
|
Raises ValueError if val isn't found
|
|
"""
|
|
return list(self.values()).index(val)
|
|
|
|
def extend(self, values):
|
|
"""Append the contents of 'values' to this OSCMessage.
|
|
'values' can be another OSCMessage, or a list/tuple of ints/floats/strings
|
|
"""
|
|
items = list(self.items()) + self._buildItemList(values)
|
|
|
|
self._reencode(items)
|
|
|
|
def insert(self, i, val, typehint = None):
|
|
"""Insert given value (with optional typehint) into the OSCMessage
|
|
at the given index.
|
|
"""
|
|
items = list(self.items())
|
|
|
|
for item in reversed(self._buildItemList(val)):
|
|
items.insert(i, item)
|
|
|
|
self._reencode(items)
|
|
|
|
def popitem(self, i):
|
|
"""Delete the indicated argument from the OSCMessage, and return it
|
|
as a (typetag, value) tuple.
|
|
"""
|
|
items = list(self.items())
|
|
|
|
item = items.pop(i)
|
|
|
|
self._reencode(items)
|
|
|
|
return item
|
|
|
|
def pop(self, i):
|
|
"""Delete the indicated argument from the OSCMessage, and return it.
|
|
"""
|
|
return self.popitem(i)[1]
|
|
|
|
def reverse(self):
|
|
"""Reverses the arguments of the OSCMessage (in place)
|
|
"""
|
|
items = list(self.items())
|
|
|
|
items.reverse()
|
|
|
|
self._reencode(items)
|
|
|
|
def remove(self, val):
|
|
"""Removes the first argument with the given value from the OSCMessage.
|
|
Raises ValueError if val isn't found.
|
|
"""
|
|
items = list(self.items())
|
|
|
|
# this is not very efficient...
|
|
i = 0
|
|
for (t, v) in items:
|
|
if (v == val):
|
|
break
|
|
i += 1
|
|
else:
|
|
raise ValueError("'%s' not in OSCMessage" % str(m))
|
|
# but more efficient than first calling self.values().index(val),
|
|
# then calling self.items(), which would in turn call self.values() again...
|
|
|
|
del items[i]
|
|
|
|
self._reencode(items)
|
|
|
|
def __iter__(self):
|
|
"""Returns an iterator of the OSCMessage's arguments
|
|
"""
|
|
return iter(list(self.values()))
|
|
|
|
def __reversed__(self):
|
|
"""Returns a reverse iterator of the OSCMessage's arguments
|
|
"""
|
|
return reversed(list(self.values()))
|
|
|
|
def itervalues(self):
|
|
"""Returns an iterator of the OSCMessage's arguments
|
|
"""
|
|
return iter(list(self.values()))
|
|
|
|
def iteritems(self):
|
|
"""Returns an iterator of the OSCMessage's arguments as
|
|
(typetag, value) tuples
|
|
"""
|
|
return iter(list(self.items()))
|
|
|
|
def itertags(self):
|
|
"""Returns an iterator of the OSCMessage's arguments' typetags
|
|
"""
|
|
return iter(self.tags())
|
|
|
|
class OSCBundle(OSCMessage):
|
|
"""Builds a 'bundle' of OSC messages.
|
|
|
|
OSCBundle objects are container objects for building OSC-bundles of OSC-messages.
|
|
An OSC-bundle is a special kind of OSC-message which contains a list of OSC-messages
|
|
(And yes, OSC-bundles may contain other OSC-bundles...)
|
|
|
|
OSCBundle objects behave much the same as OSCMessage objects, with these exceptions:
|
|
- if an item or items to be appended or inserted are not OSCMessage objects,
|
|
OSCMessage objectss are created to encapsulate the item(s)
|
|
- an OSC-bundle does not have an address of its own, only the contained OSC-messages do.
|
|
The OSCBundle's 'address' is inherited by any OSCMessage the OSCBundle object creates.
|
|
- OSC-bundles have a timetag to tell the receiver when the bundle should be processed.
|
|
The default timetag value (0) means 'immediately'
|
|
"""
|
|
def __init__(self, address="", time=0):
|
|
"""Instantiate a new OSCBundle.
|
|
The default OSC-address for newly created OSCMessages
|
|
can be specified with the 'address' argument
|
|
The bundle's timetag can be set with the 'time' argument
|
|
"""
|
|
super(OSCBundle, self).__init__(address)
|
|
self.timetag = time
|
|
|
|
def __str__(self):
|
|
"""Returns the Bundle's contents (and timetag, if nonzero) as a string.
|
|
"""
|
|
if (self.timetag > 0.):
|
|
out = "#bundle (%s) [" % self.getTimeTagStr()
|
|
else:
|
|
out = "#bundle ["
|
|
|
|
if self.__len__():
|
|
for val in list(self.values()):
|
|
out += "%s, " % str(val)
|
|
out = out[:-2] # strip trailing space and comma
|
|
|
|
return out + "]"
|
|
|
|
def setTimeTag(self, time):
|
|
"""Set or change the OSCBundle's TimeTag
|
|
In 'Python Time', that's floating seconds since the Epoch
|
|
"""
|
|
if time >= 0:
|
|
self.timetag = time
|
|
|
|
def getTimeTagStr(self):
|
|
"""Return the TimeTag as a human-readable string
|
|
"""
|
|
fract, secs = math.modf(self.timetag)
|
|
out = time.ctime(secs)[11:19]
|
|
out += ("%.3f" % fract)[1:]
|
|
|
|
return out
|
|
|
|
def append(self, argument, typehint = None):
|
|
"""Appends data to the bundle, creating an OSCMessage to encapsulate
|
|
the provided argument unless this is already an OSCMessage.
|
|
Any newly created OSCMessage inherits the OSCBundle's address at the time of creation.
|
|
If 'argument' is an iterable, its elements will be encapsuated by a single OSCMessage.
|
|
Finally, 'argument' can be (or contain) a dict, which will be 'converted' to an OSCMessage;
|
|
- if 'addr' appears in the dict, its value overrides the OSCBundle's address
|
|
- if 'args' appears in the dict, its value(s) become the OSCMessage's arguments
|
|
"""
|
|
if isinstance(argument, OSCMessage):
|
|
binary = OSCBlob(argument.getBinary())
|
|
else:
|
|
msg = OSCMessage(self.address)
|
|
if isinstance(argument,dict):
|
|
if 'addr' in argument:
|
|
msg.setAddress(argument['addr'])
|
|
if 'args' in argument:
|
|
msg.append(argument['args'], typehint)
|
|
else:
|
|
msg.append(argument, typehint)
|
|
|
|
binary = OSCBlob(msg.getBinary())
|
|
|
|
self.message += binary
|
|
self.typetags += 'b'
|
|
|
|
def getBinary(self):
|
|
"""Returns the binary representation of the message
|
|
"""
|
|
binary = OSCString("#bundle")
|
|
binary += OSCTimeTag(self.timetag)
|
|
binary += self.message
|
|
|
|
return binary
|
|
|
|
def _reencapsulate(self, decoded):
|
|
if decoded[0] == "#bundle":
|
|
msg = OSCBundle()
|
|
msg.setTimeTag(decoded[1])
|
|
for submsg in decoded[2:]:
|
|
msg.append(self._reencapsulate(submsg))
|
|
|
|
else:
|
|
msg = OSCMessage(decoded[0])
|
|
tags = decoded[1].lstrip(',')
|
|
for i in range(len(tags)):
|
|
msg.append(decoded[2+i], tags[i])
|
|
|
|
return msg
|
|
|
|
def values(self):
|
|
"""Returns a list of the OSCMessages appended so far
|
|
"""
|
|
out = []
|
|
for decoded in decodeOSC(self.getBinary())[2:]:
|
|
out.append(self._reencapsulate(decoded))
|
|
|
|
return out
|
|
|
|
def __eq__(self, other):
|
|
"""Return True if two OSCBundles have the same timetag & content
|
|
"""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
|
|
return (self.timetag == other.timetag) and (self.typetags == other.typetags) and (self.message == other.message)
|
|
|
|
def copy(self):
|
|
"""Returns a deep copy of this OSCBundle
|
|
"""
|
|
copy = super(OSCBundle, self).copy()
|
|
copy.timetag = self.timetag
|
|
return copy
|
|
|
|
######
|
|
#
|
|
# OSCMessage encoding functions
|
|
#
|
|
######
|
|
|
|
def OSCString(next):
|
|
"""Convert a string into a zero-padded OSC String.
|
|
The length of the resulting string is always a multiple of 4 bytes.
|
|
The string ends with 1 to 4 zero-bytes ('\x00')
|
|
"""
|
|
|
|
OSCstringLength = math.ceil((len(next)+1) / 4.0) * 4
|
|
return struct.pack(">%ds" % (OSCstringLength), str(next).encode('latin1'))
|
|
|
|
def OSCBlob(next):
|
|
"""Convert a string into an OSC Blob.
|
|
An OSC-Blob is a binary encoded block of data, prepended by a 'size' (int32).
|
|
The size is always a mutiple of 4 bytes.
|
|
The blob ends with 0 to 3 zero-bytes ('\x00')
|
|
"""
|
|
|
|
if isinstance(next,str):
|
|
next = next.encode('latin1')
|
|
if isinstance(next,bytes):
|
|
OSCblobLength = math.ceil((len(next)) / 4.0) * 4
|
|
binary = struct.pack(">i%ds" % (OSCblobLength), OSCblobLength, next)
|
|
else:
|
|
binary = b''
|
|
|
|
return binary
|
|
|
|
def OSCArgument(next, typehint=None):
|
|
""" Convert some Python types to their
|
|
OSC binary representations, returning a
|
|
(typetag, data) tuple.
|
|
"""
|
|
if not typehint:
|
|
if type(next) in FloatTypes:
|
|
binary = struct.pack(">f", float(next))
|
|
tag = 'f'
|
|
elif type(next) in IntTypes:
|
|
binary = struct.pack(">i", int(next))
|
|
tag = 'i'
|
|
else:
|
|
binary = OSCString(next)
|
|
tag = 's'
|
|
|
|
elif typehint == 'd':
|
|
try:
|
|
binary = struct.pack(">d", float(next))
|
|
tag = 'd'
|
|
except ValueError:
|
|
binary = OSCString(next)
|
|
tag = 's'
|
|
|
|
elif typehint == 'f':
|
|
try:
|
|
binary = struct.pack(">f", float(next))
|
|
tag = 'f'
|
|
except ValueError:
|
|
binary = OSCString(next)
|
|
tag = 's'
|
|
elif typehint == 'i':
|
|
try:
|
|
binary = struct.pack(">i", int(next))
|
|
tag = 'i'
|
|
except ValueError:
|
|
binary = OSCString(next)
|
|
tag = 's'
|
|
else:
|
|
binary = OSCString(next)
|
|
tag = 's'
|
|
|
|
return (tag, binary)
|
|
|
|
def OSCTimeTag(time):
|
|
"""Convert a time in floating seconds to its
|
|
OSC binary representation
|
|
"""
|
|
if time > 0:
|
|
fract, secs = math.modf(time)
|
|
secs = secs - NTP_epoch
|
|
binary = struct.pack('>LL', int(secs), int(fract * NTP_units_per_second))
|
|
else:
|
|
binary = struct.pack('>LL', 0, 1)
|
|
|
|
return binary
|
|
|
|
######
|
|
#
|
|
# OSCMessage decoding functions
|
|
#
|
|
######
|
|
|
|
def _readString(data):
|
|
"""Reads the next (null-terminated) block of data
|
|
"""
|
|
length = data.find(b'\0')
|
|
nextData = int(math.ceil((length+1) / 4.0) * 4)
|
|
return (data[0:length].decode('latin1'), data[nextData:])
|
|
|
|
def _readBlob(data):
|
|
"""Reads the next (numbered) block of data
|
|
"""
|
|
|
|
length = struct.unpack(">i", data[0:4])[0]
|
|
nextData = int(math.ceil((length) / 4.0) * 4) + 4
|
|
return (data[4:length+4], data[nextData:])
|
|
|
|
def _readInt(data):
|
|
"""Tries to interpret the next 4 bytes of the data
|
|
as a 32-bit integer. """
|
|
|
|
if(len(data)<4):
|
|
print("Error: too few bytes for int", data, len(data))
|
|
rest = data
|
|
integer = 0
|
|
else:
|
|
integer = struct.unpack(">i", data[0:4])[0]
|
|
rest = data[4:]
|
|
|
|
return (integer, rest)
|
|
|
|
def _readLong(data):
|
|
"""Tries to interpret the next 8 bytes of the data
|
|
as a 64-bit signed integer.
|
|
"""
|
|
|
|
high, low = struct.unpack(">ll", data[0:8])
|
|
big = (int(high) << 32) + low
|
|
rest = data[8:]
|
|
return (big, rest)
|
|
|
|
def _readTimeTag(data):
|
|
"""Tries to interpret the next 8 bytes of the data
|
|
as a TimeTag.
|
|
"""
|
|
high, low = struct.unpack(">LL", data[0:8])
|
|
if (high == 0) and (low <= 1):
|
|
time = 0.0
|
|
else:
|
|
time = int(NTP_epoch + high) + float(low / NTP_units_per_second)
|
|
rest = data[8:]
|
|
return (time, rest)
|
|
|
|
def _readFloat(data):
|
|
"""Tries to interpret the next 4 bytes of the data
|
|
as a 32-bit float.
|
|
"""
|
|
|
|
if(len(data)<4):
|
|
print("Error: too few bytes for float", data, len(data))
|
|
rest = data
|
|
float = 0
|
|
else:
|
|
float = struct.unpack(">f", data[0:4])[0]
|
|
rest = data[4:]
|
|
|
|
return (float, rest)
|
|
|
|
def _readDouble(data):
|
|
"""Tries to interpret the next 8 bytes of the data
|
|
as a 64-bit float.
|
|
"""
|
|
|
|
if(len(data)<8):
|
|
print("Error: too few bytes for double", data, len(data))
|
|
rest = data
|
|
float = 0
|
|
else:
|
|
float = struct.unpack(">d", data[0:8])[0]
|
|
rest = data[8:]
|
|
|
|
return (float, rest)
|
|
|
|
def decodeOSC(data):
|
|
"""Converts a binary OSC message to a Python list.
|
|
"""
|
|
table = {"i":_readInt, "f":_readFloat, "s":_readString, "b":_readBlob, "d":_readDouble, "t":_readTimeTag}
|
|
decoded = []
|
|
address, rest = _readString(data)
|
|
if address.startswith(","):
|
|
typetags = address
|
|
address = ""
|
|
else:
|
|
typetags = ""
|
|
|
|
if address == "#bundle":
|
|
time, rest = _readTimeTag(rest)
|
|
decoded.append(address)
|
|
decoded.append(time)
|
|
while len(rest)>0:
|
|
length, rest = _readInt(rest)
|
|
decoded.append(decodeOSC(rest[:length]))
|
|
rest = rest[length:]
|
|
|
|
elif len(rest)>0:
|
|
if not len(typetags):
|
|
typetags, rest = _readString(rest)
|
|
decoded.append(address)
|
|
decoded.append(typetags)
|
|
if typetags.startswith(","):
|
|
for tag in typetags[1:]:
|
|
value, rest = table[tag](rest)
|
|
decoded.append(value)
|
|
else:
|
|
raise OSCError("OSCMessage's typetag-string lacks the magic ','")
|
|
|
|
return decoded
|
|
|
|
######
|
|
#
|
|
# Utility functions
|
|
#
|
|
######
|
|
|
|
def hexDump(bytes):
|
|
""" Useful utility; prints the string in hexadecimal.
|
|
"""
|
|
print("byte 0 1 2 3 4 5 6 7 8 9 A B C D E F")
|
|
|
|
if isinstance(bytes,str):
|
|
bytes = bytes.encode('latin1')
|
|
num = len(bytes)
|
|
for i in range(num):
|
|
if (i) % 16 == 0:
|
|
line = "%02X0 : " % (i/16)
|
|
line += "%02X " % bytes[i]
|
|
if (i+1) % 16 == 0:
|
|
print("%s: %s" % (line, repr(bytes[i-15:i+1])))
|
|
line = ""
|
|
|
|
bytes_left = num % 16
|
|
if bytes_left:
|
|
print("%s: %s" % (line.ljust(54), repr(bytes[-bytes_left:])))
|
|
|
|
def getUrlStr(*args):
|
|
"""Convert provided arguments to a string in 'host:port/prefix' format
|
|
Args can be:
|
|
- (host, port)
|
|
- (host, port), prefix
|
|
- host, port
|
|
- host, port, prefix
|
|
"""
|
|
if not len(args):
|
|
return ""
|
|
|
|
if type(args[0]) == tuple:
|
|
host = args[0][0]
|
|
port = args[0][1]
|
|
args = args[1:]
|
|
else:
|
|
host = args[0]
|
|
port = args[1]
|
|
args = args[2:]
|
|
|
|
if len(args):
|
|
prefix = args[0]
|
|
else:
|
|
prefix = ""
|
|
|
|
if len(host) and (host != '0.0.0.0'):
|
|
try:
|
|
(host, _, _) = socket.gethostbyaddr(host)
|
|
except socket.error:
|
|
pass
|
|
else:
|
|
host = 'localhost'
|
|
|
|
if isinstance(port,int):
|
|
return "%s:%d%s" % (host, port, prefix)
|
|
else:
|
|
return host + prefix
|
|
|
|
def parseUrlStr(url):
|
|
"""Convert provided string in 'host:port/prefix' format to it's components
|
|
Returns ((host, port), prefix)
|
|
"""
|
|
if not (isinstance(url,str) and len(url)):
|
|
return (None, '')
|
|
|
|
i = url.find("://")
|
|
if i > -1:
|
|
url = url[i+3:]
|
|
|
|
i = url.find(':')
|
|
if i > -1:
|
|
host = url[:i].strip()
|
|
tail = url[i+1:].strip()
|
|
else:
|
|
host = ''
|
|
tail = url
|
|
|
|
for i in range(len(tail)):
|
|
if not tail[i].isdigit():
|
|
break
|
|
else:
|
|
i += 1
|
|
|
|
portstr = tail[:i].strip()
|
|
tail = tail[i:].strip()
|
|
|
|
found = len(tail)
|
|
for c in ('/', '+', '-', '*'):
|
|
i = tail.find(c)
|
|
if (i > -1) and (i < found):
|
|
found = i
|
|
|
|
head = tail[:found].strip()
|
|
prefix = tail[found:].strip()
|
|
|
|
prefix = prefix.strip('/')
|
|
if len(prefix) and prefix[0] not in ('+', '-', '*'):
|
|
prefix = '/' + prefix
|
|
|
|
if len(head) and not len(host):
|
|
host = head
|
|
|
|
if len(host):
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
|
|
try:
|
|
port = int(portstr)
|
|
except ValueError:
|
|
port = None
|
|
|
|
return ((host, port), prefix)
|
|
|
|
######
|
|
#
|
|
# OSCClient class
|
|
#
|
|
######
|
|
|
|
class OSCClient(object):
|
|
"""Simple OSC Client. Handles the sending of OSC-Packets (OSCMessage or OSCBundle) via a UDP-socket
|
|
"""
|
|
# set outgoing socket buffer size
|
|
sndbuf_size = 4096 * 8
|
|
|
|
def __init__(self, server=None):
|
|
"""Construct an OSC Client.
|
|
When the 'address' argument is given this client is connected to a specific remote server.
|
|
- address ((host, port) tuple): the address of the remote server to send all messages to
|
|
Otherwise it acts as a generic client:
|
|
If address == 'None', the client doesn't connect to a specific remote server,
|
|
and the remote address must be supplied when calling sendto()
|
|
- server: Local OSCServer-instance this client will use the socket of for transmissions.
|
|
If none is supplied, a socket will be created.
|
|
"""
|
|
self.socket = None
|
|
|
|
if server == None:
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.sndbuf_size)
|
|
self._fd = self.socket.fileno()
|
|
|
|
self.server = None
|
|
else:
|
|
self.setServer(server)
|
|
|
|
self.client_address = None
|
|
|
|
def setServer(self, server):
|
|
"""Associate this Client with given server.
|
|
The Client will send from the Server's socket.
|
|
The Server will use this Client instance to send replies.
|
|
"""
|
|
if not isinstance(server, OSCServer):
|
|
raise ValueError("'server' argument is not a valid OSCServer object")
|
|
|
|
if self.socket != None:
|
|
self.close()
|
|
|
|
self.socket = server.socket.dup()
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.sndbuf_size)
|
|
self._fd = self.socket.fileno()
|
|
|
|
self.server = server
|
|
|
|
if self.server.client != None:
|
|
self.server.client.close()
|
|
|
|
self.server.client = self
|
|
|
|
def close(self):
|
|
"""Disconnect & close the Client's socket
|
|
"""
|
|
if self.socket != None:
|
|
self.socket.close()
|
|
self.socket = None
|
|
|
|
def __str__(self):
|
|
"""Returns a string containing this Client's Class-name, software-version
|
|
and the remote-address it is connected to (if any)
|
|
"""
|
|
out = self.__class__.__name__
|
|
out += " v%s.%s-%s" % version
|
|
addr = self.address()
|
|
if addr:
|
|
out += " connected to osc://%s" % getUrlStr(addr)
|
|
else:
|
|
out += " (unconnected)"
|
|
|
|
return out
|
|
|
|
def __eq__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
|
|
isequal = cmp(self.socket._sock, other.socket._sock)
|
|
if isequal and self.server and other.server:
|
|
return cmp(self.server, other.server)
|
|
|
|
return isequal
|
|
|
|
def __ne__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
return not self.__eq__(other)
|
|
|
|
def address(self):
|
|
"""Returns a (host,port) tuple of the remote server this client is
|
|
connected to or None if not connected to any server.
|
|
"""
|
|
try:
|
|
return self.socket.getpeername()
|
|
except socket.error:
|
|
return None
|
|
|
|
def connect(self, address):
|
|
"""Bind to a specific OSC server:
|
|
the 'address' argument is a (host, port) tuple
|
|
- host: hostname of the remote OSC server,
|
|
- port: UDP-port the remote OSC server listens to.
|
|
"""
|
|
try:
|
|
self.socket.connect(address)
|
|
self.client_address = address
|
|
except socket.error as e:
|
|
self.client_address = None
|
|
raise OSCClientError("SocketError: %s" % str(e))
|
|
|
|
if self.server != None:
|
|
self.server.return_port = address[1]
|
|
|
|
def sendto(self, msg, address, timeout=None):
|
|
"""Send the given OSCMessage to the specified address.
|
|
- msg: OSCMessage (or OSCBundle) to be sent
|
|
- address: (host, port) tuple specifing remote server to send the message to
|
|
- timeout: A timeout value for attempting to send. If timeout == None,
|
|
this call blocks until socket is available for writing.
|
|
Raises OSCClientError when timing out while waiting for the socket.
|
|
"""
|
|
if not isinstance(msg, OSCMessage):
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
|
|
ret = select.select([],[self._fd], [], timeout)
|
|
try:
|
|
ret[1].index(self._fd)
|
|
except:
|
|
# for the very rare case this might happen
|
|
raise OSCClientError("Timed out waiting for file descriptor")
|
|
|
|
try:
|
|
self.socket.connect(address)
|
|
self.socket.sendall(msg.getBinary())
|
|
|
|
if self.client_address:
|
|
self.socket.connect(self.client_address)
|
|
|
|
except socket.error as e:
|
|
if e[0] in (7, 65): # 7 = 'no address associated with nodename', 65 = 'no route to host'
|
|
raise e
|
|
else:
|
|
raise OSCClientError("while sending to %s: %s" % (str(address), str(e)))
|
|
|
|
def send(self, msg, timeout=None):
|
|
"""Send the given OSCMessage.
|
|
The Client must be already connected.
|
|
- msg: OSCMessage (or OSCBundle) to be sent
|
|
- timeout: A timeout value for attempting to send. If timeout == None,
|
|
this call blocks until socket is available for writing.
|
|
Raises OSCClientError when timing out while waiting for the socket,
|
|
or when the Client isn't connected to a remote server.
|
|
"""
|
|
if not isinstance(msg, OSCMessage):
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
|
|
ret = select.select([],[self._fd], [], timeout)
|
|
try:
|
|
ret[1].index(self._fd)
|
|
except:
|
|
# for the very rare case this might happen
|
|
raise OSCClientError("Timed out waiting for file descriptor")
|
|
|
|
try:
|
|
self.socket.sendall(msg.getBinary())
|
|
except socket.error as e:
|
|
if e[0] in (7, 65): # 7 = 'no address associated with nodename', 65 = 'no route to host'
|
|
raise e
|
|
else:
|
|
raise OSCClientError("while sending: %s" % str(e))
|
|
|
|
######
|
|
#
|
|
# FilterString Utility functions
|
|
#
|
|
######
|
|
|
|
def parseFilterStr(args):
|
|
"""Convert Message-Filter settings in '+<addr> -<addr> ...' format to a dict of the form
|
|
{ '<addr>':True, '<addr>':False, ... }
|
|
Returns a list: ['<prefix>', filters]
|
|
"""
|
|
out = {}
|
|
|
|
if isinstance(args,str):
|
|
args = [args]
|
|
|
|
prefix = None
|
|
for arg in args:
|
|
head = None
|
|
for plus in arg.split('+'):
|
|
minus = plus.split('-')
|
|
plusfs = minus.pop(0).strip()
|
|
if len(plusfs):
|
|
plusfs = '/' + plusfs.strip('/')
|
|
|
|
if (head == None) and (plusfs != "/*"):
|
|
head = plusfs
|
|
elif len(plusfs):
|
|
if plusfs == '/*':
|
|
out = { '/*':True } # reset all previous filters
|
|
else:
|
|
out[plusfs] = True
|
|
|
|
for minusfs in minus:
|
|
minusfs = minusfs.strip()
|
|
if len(minusfs):
|
|
minusfs = '/' + minusfs.strip('/')
|
|
if minusfs == '/*':
|
|
out = { '/*':False } # reset all previous filters
|
|
else:
|
|
out[minusfs] = False
|
|
|
|
if prefix == None:
|
|
prefix = head
|
|
|
|
return [prefix, out]
|
|
|
|
def getFilterStr(filters):
|
|
"""Return the given 'filters' dict as a list of
|
|
'+<addr>' | '-<addr>' filter-strings
|
|
"""
|
|
if not len(filters):
|
|
return []
|
|
|
|
if '/*' in list(filters.keys()):
|
|
if filters['/*']:
|
|
out = ["+/*"]
|
|
else:
|
|
out = ["-/*"]
|
|
else:
|
|
if False in list(filters.values()):
|
|
out = ["+/*"]
|
|
else:
|
|
out = ["-/*"]
|
|
|
|
for (addr, bool) in list(filters.items()):
|
|
if addr == '/*':
|
|
continue
|
|
|
|
if bool:
|
|
out.append("+%s" % addr)
|
|
else:
|
|
out.append("-%s" % addr)
|
|
|
|
return out
|
|
|
|
# A translation-table for mapping OSC-address expressions to Python 're' expressions
|
|
OSCtrans = str.maketrans("{,}?","(|).")
|
|
|
|
def getRegEx(pattern):
|
|
"""Compiles and returns a 'regular expression' object for the given address-pattern.
|
|
"""
|
|
# Translate OSC-address syntax to python 're' syntax
|
|
pattern = pattern.replace(".", r"\.") # first, escape all '.'s in the pattern.
|
|
pattern = pattern.replace("(", r"\(") # escape all '('s.
|
|
pattern = pattern.replace(")", r"\)") # escape all ')'s.
|
|
pattern = pattern.replace("*", r".*") # replace a '*' by '.*' (match 0 or more characters)
|
|
pattern = pattern.translate(OSCtrans) # change '?' to '.' and '{,}' to '(|)'
|
|
|
|
return re.compile(pattern)
|
|
|
|
######
|
|
#
|
|
# OSCMultiClient class
|
|
#
|
|
######
|
|
|
|
class OSCMultiClient(OSCClient):
|
|
"""'Multiple-Unicast' OSC Client. Handles the sending of OSC-Packets (OSCMessage or OSCBundle) via a UDP-socket
|
|
This client keeps a dict of 'OSCTargets'. and sends each OSCMessage to each OSCTarget
|
|
The OSCTargets are simply (host, port) tuples, and may be associated with an OSC-address prefix.
|
|
the OSCTarget's prefix gets prepended to each OSCMessage sent to that target.
|
|
"""
|
|
def __init__(self, server=None):
|
|
"""Construct a "Multi" OSC Client.
|
|
- server: Local OSCServer-instance this client will use the socket of for transmissions.
|
|
If none is supplied, a socket will be created.
|
|
"""
|
|
super(OSCMultiClient, self).__init__(server)
|
|
|
|
self.targets = {}
|
|
|
|
def _searchHostAddr(self, host):
|
|
"""Search the subscribed OSCTargets for (the first occurence of) given host.
|
|
Returns a (host, port) tuple
|
|
"""
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
|
|
for addr in list(self.targets.keys()):
|
|
if host == addr[0]:
|
|
return addr
|
|
|
|
raise NotSubscribedError((host, None))
|
|
|
|
def _updateFilters(self, dst, src):
|
|
"""Update a 'filters' dict with values form another 'filters' dict:
|
|
- src[a] == True and dst[a] == False: del dst[a]
|
|
- src[a] == False and dst[a] == True: del dst[a]
|
|
- a not in dst: dst[a] == src[a]
|
|
"""
|
|
if '/*' in list(src.keys()): # reset filters
|
|
dst.clear() # 'match everything' == no filters
|
|
if not src.pop('/*'):
|
|
dst['/*'] = False # 'match nothing'
|
|
|
|
for (addr, bool) in list(src.items()):
|
|
if (addr in list(dst.keys())) and (dst[addr] != bool):
|
|
del dst[addr]
|
|
else:
|
|
dst[addr] = bool
|
|
|
|
def _setTarget(self, address, prefix=None, filters=None):
|
|
"""Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.
|
|
- address ((host, port) tuple): IP-address & UDP-port
|
|
- prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
|
|
sent to this OSCTarget (optional)
|
|
"""
|
|
if address not in list(self.targets.keys()):
|
|
self.targets[address] = ["",{}]
|
|
|
|
if prefix != None:
|
|
if len(prefix):
|
|
# make sure prefix starts with ONE '/', and does not end with '/'
|
|
prefix = '/' + prefix.strip('/')
|
|
|
|
self.targets[address][0] = prefix
|
|
|
|
if filters != None:
|
|
if isinstance(filters,str):
|
|
(_, filters) = parseFilterStr(filters)
|
|
elif not isinstance(filters,dict):
|
|
raise TypeError("'filters' argument must be a dict with {addr:bool} entries")
|
|
|
|
self._updateFilters(self.targets[address][1], filters)
|
|
|
|
def setOSCTarget(self, address, prefix=None, filters=None):
|
|
"""Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.
|
|
the 'address' argument can be a ((host, port) tuple) : The target server address & UDP-port
|
|
or a 'host' (string) : The host will be looked-up
|
|
- prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
|
|
sent to this OSCTarget (optional)
|
|
"""
|
|
if isinstance(address,str):
|
|
address = self._searchHostAddr(address)
|
|
|
|
elif (isinstance(address,tuple)):
|
|
(host, port) = address[:2]
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except:
|
|
pass
|
|
|
|
address = (host, port)
|
|
else:
|
|
raise TypeError("'address' argument must be a (host, port) tuple or a 'host' string")
|
|
|
|
self._setTarget(address, prefix, filters)
|
|
|
|
def setOSCTargetFromStr(self, url):
|
|
"""Adds or modifies a subscribed OSCTarget from the given string, which should be in the
|
|
'<host>:<port>[/<prefix>] [+/<filter>]|[-/<filter>] ...' format.
|
|
"""
|
|
(addr, tail) = parseUrlStr(url)
|
|
(prefix, filters) = parseFilterStr(tail)
|
|
self._setTarget(addr, prefix, filters)
|
|
|
|
def _delTarget(self, address, prefix=None):
|
|
"""Delete the specified OSCTarget from the Client's dict.
|
|
the 'address' argument must be a (host, port) tuple.
|
|
If the 'prefix' argument is given, the Target is only deleted if the address and prefix match.
|
|
"""
|
|
try:
|
|
if prefix == None:
|
|
del self.targets[address]
|
|
elif prefix == self.targets[address][0]:
|
|
del self.targets[address]
|
|
except KeyError:
|
|
raise NotSubscribedError(address, prefix)
|
|
|
|
def delOSCTarget(self, address, prefix=None):
|
|
"""Delete the specified OSCTarget from the Client's dict.
|
|
the 'address' argument can be a ((host, port) tuple), or a hostname.
|
|
If the 'prefix' argument is given, the Target is only deleted if the address and prefix match.
|
|
"""
|
|
if isinstance(address,str):
|
|
address = self._searchHostAddr(address)
|
|
|
|
if isinstance(address,tuple):
|
|
(host, port) = address[:2]
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
address = (host, port)
|
|
|
|
self._delTarget(address, prefix)
|
|
|
|
def hasOSCTarget(self, address, prefix=None):
|
|
"""Return True if the given OSCTarget exists in the Client's dict.
|
|
the 'address' argument can be a ((host, port) tuple), or a hostname.
|
|
If the 'prefix' argument is given, the return-value is only True if the address and prefix match.
|
|
"""
|
|
if isinstance(address,str):
|
|
address = self._searchHostAddr(address)
|
|
|
|
if isinstance(address,tuple):
|
|
(host, port) = address[:2]
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
address = (host, port)
|
|
|
|
if address in list(self.targets.keys()):
|
|
if prefix == None:
|
|
return True
|
|
elif prefix == self.targets[address][0]:
|
|
return True
|
|
|
|
return False
|
|
|
|
def getOSCTargets(self):
|
|
"""Returns the dict of OSCTargets: {addr:[prefix, filters], ...}
|
|
"""
|
|
out = {}
|
|
for ((host, port), pf) in list(self.targets.items()):
|
|
try:
|
|
(host, _, _) = socket.gethostbyaddr(host)
|
|
except socket.error:
|
|
pass
|
|
|
|
out[(host, port)] = pf
|
|
|
|
return out
|
|
|
|
def getOSCTarget(self, address):
|
|
"""Returns the OSCTarget matching the given address as a ((host, port), [prefix, filters]) tuple.
|
|
'address' can be a (host, port) tuple, or a 'host' (string), in which case the first matching OSCTarget is returned
|
|
Returns (None, ['',{}]) if address not found.
|
|
"""
|
|
if isinstance(address,str):
|
|
address = self._searchHostAddr(address)
|
|
|
|
if (isinstance(address,tuple)):
|
|
(host, port) = address[:2]
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
address = (host, port)
|
|
|
|
if (address in list(self.targets.keys())):
|
|
try:
|
|
(host, _, _) = socket.gethostbyaddr(host)
|
|
except socket.error:
|
|
pass
|
|
|
|
return ((host, port), self.targets[address])
|
|
|
|
return (None, ['',{}])
|
|
|
|
def clearOSCTargets(self):
|
|
"""Erases all OSCTargets from the Client's dict
|
|
"""
|
|
self.targets = {}
|
|
|
|
def updateOSCTargets(self, dict):
|
|
"""Update the Client's OSCTargets dict with the contents of 'dict'
|
|
The given dict's items MUST be of the form
|
|
{ (host, port):[prefix, filters], ... }
|
|
"""
|
|
for ((host, port), (prefix, filters)) in list(dict.items()):
|
|
val = [prefix, {}]
|
|
self._updateFilters(val[1], filters)
|
|
|
|
try:
|
|
host = socket.gethostbyname(host)
|
|
except socket.error:
|
|
pass
|
|
|
|
self.targets[(host, port)] = val
|
|
|
|
def getOSCTargetStr(self, address):
|
|
"""Returns the OSCTarget matching the given address as a ('osc://<host>:<port>[<prefix>]', ['<filter-string>', ...])' tuple.
|
|
'address' can be a (host, port) tuple, or a 'host' (string), in which case the first matching OSCTarget is returned
|
|
Returns (None, []) if address not found.
|
|
"""
|
|
(addr, (prefix, filters)) = self.getOSCTarget(address)
|
|
if addr == None:
|
|
return (None, [])
|
|
|
|
return ("osc://%s" % getUrlStr(addr, prefix), getFilterStr(filters))
|
|
|
|
def getOSCTargetStrings(self):
|
|
"""Returns a list of all OSCTargets as ('osc://<host>:<port>[<prefix>]', ['<filter-string>', ...])' tuples.
|
|
"""
|
|
out = []
|
|
for (addr, (prefix, filters)) in list(self.targets.items()):
|
|
out.append(("osc://%s" % getUrlStr(addr, prefix), getFilterStr(filters)))
|
|
|
|
return out
|
|
|
|
def connect(self, address):
|
|
"""The OSCMultiClient isn't allowed to connect to any specific
|
|
address.
|
|
"""
|
|
return NotImplemented
|
|
|
|
def sendto(self, msg, address, timeout=None):
|
|
"""Send the given OSCMessage.
|
|
The specified address is ignored. Instead this method calls send() to
|
|
send the message to all subscribed clients.
|
|
- msg: OSCMessage (or OSCBundle) to be sent
|
|
- address: (host, port) tuple specifing remote server to send the message to
|
|
- timeout: A timeout value for attempting to send. If timeout == None,
|
|
this call blocks until socket is available for writing.
|
|
Raises OSCClientError when timing out while waiting for the socket.
|
|
"""
|
|
self.send(msg, timeout)
|
|
|
|
def _filterMessage(self, filters, msg):
|
|
"""Checks the given OSCMessge against the given filters.
|
|
'filters' is a dict containing OSC-address:bool pairs.
|
|
If 'msg' is an OSCBundle, recursively filters its constituents.
|
|
Returns None if the message is to be filtered, else returns the message.
|
|
or
|
|
Returns a copy of the OSCBundle with the filtered messages removed.
|
|
"""
|
|
if isinstance(msg, OSCBundle):
|
|
out = msg.copy()
|
|
msgs = list(out.values())
|
|
out.clearData()
|
|
for m in msgs:
|
|
m = self._filterMessage(filters, m)
|
|
if m: # this catches 'None' and empty bundles.
|
|
out.append(m)
|
|
|
|
elif isinstance(msg, OSCMessage):
|
|
if '/*' in list(filters.keys()):
|
|
if filters['/*']:
|
|
out = msg
|
|
else:
|
|
out = None
|
|
|
|
elif False in list(filters.values()):
|
|
out = msg
|
|
else:
|
|
out = None
|
|
|
|
else:
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
|
|
expr = getRegEx(msg.address)
|
|
|
|
for addr in list(filters.keys()):
|
|
if addr == '/*':
|
|
continue
|
|
|
|
match = expr.match(addr)
|
|
if match and (match.end() == len(addr)):
|
|
if filters[addr]:
|
|
out = msg
|
|
else:
|
|
out = None
|
|
break
|
|
|
|
return out
|
|
|
|
def _prefixAddress(self, prefix, msg):
|
|
"""Makes a copy of the given OSCMessage, then prepends the given prefix to
|
|
The message's OSC-address.
|
|
If 'msg' is an OSCBundle, recursively prepends the prefix to its constituents.
|
|
"""
|
|
out = msg.copy()
|
|
|
|
if isinstance(msg, OSCBundle):
|
|
msgs = list(out.values())
|
|
out.clearData()
|
|
for m in msgs:
|
|
out.append(self._prefixAddress(prefix, m))
|
|
|
|
elif isinstance(msg, OSCMessage):
|
|
out.setAddress(prefix + out.address)
|
|
|
|
else:
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
|
|
return out
|
|
|
|
def send(self, msg, timeout=None):
|
|
"""Send the given OSCMessage to all subscribed OSCTargets
|
|
- msg: OSCMessage (or OSCBundle) to be sent
|
|
- timeout: A timeout value for attempting to send. If timeout == None,
|
|
this call blocks until socket is available for writing.
|
|
Raises OSCClientError when timing out while waiting for the socket.
|
|
"""
|
|
for (address, (prefix, filters)) in list(self.targets.items()):
|
|
if len(filters):
|
|
out = self._filterMessage(filters, msg)
|
|
if not out: # this catches 'None' and empty bundles.
|
|
continue
|
|
else:
|
|
out = msg
|
|
|
|
if len(prefix):
|
|
out = self._prefixAddress(prefix, msg)
|
|
|
|
binary = out.getBinary()
|
|
|
|
ret = select.select([],[self._fd], [], timeout)
|
|
try:
|
|
ret[1].index(self._fd)
|
|
except:
|
|
# for the very rare case this might happen
|
|
raise OSCClientError("Timed out waiting for file descriptor")
|
|
|
|
try:
|
|
while len(binary):
|
|
sent = self.socket.sendto(binary, address)
|
|
binary = binary[sent:]
|
|
|
|
except socket.error as e:
|
|
if e[0] in (7, 65): # 7 = 'no address associated with nodename', 65 = 'no route to host'
|
|
raise e
|
|
else:
|
|
raise OSCClientError("while sending to %s: %s" % (str(address), str(e)))
|
|
|
|
class OSCAddressSpace:
|
|
def __init__(self):
|
|
self.callbacks = {}
|
|
def addMsgHandler(self, address, callback):
|
|
"""Register a handler for an OSC-address
|
|
- 'address' is the OSC address-string.
|
|
the address-string should start with '/' and may not contain '*'
|
|
- 'callback' is the function called for incoming OSCMessages that match 'address'.
|
|
The callback-function will be called with the same arguments as the 'msgPrinter_handler' below
|
|
"""
|
|
for chk in '*?,[]{}# ':
|
|
if chk in address:
|
|
raise OSCServerError("OSC-address string may not contain any characters in '*?,[]{}# '")
|
|
|
|
if type(callback) not in (types.FunctionType, types.MethodType):
|
|
raise OSCServerError("Message callback '%s' is not callable" % repr(callback))
|
|
|
|
if address != 'default':
|
|
address = '/' + address.strip('/')
|
|
|
|
self.callbacks[address] = callback
|
|
|
|
def delMsgHandler(self, address):
|
|
"""Remove the registered handler for the given OSC-address
|
|
"""
|
|
del self.callbacks[address]
|
|
|
|
def getOSCAddressSpace(self):
|
|
"""Returns a list containing all OSC-addresses registerd with this Server.
|
|
"""
|
|
return list(self.callbacks.keys())
|
|
|
|
def dispatchMessage(self, pattern, tags, data, client_address):
|
|
"""Attmept to match the given OSC-address pattern, which may contain '*',
|
|
against all callbacks registered with the OSCServer.
|
|
Calls the matching callback and returns whatever it returns.
|
|
If no match is found, and a 'default' callback is registered, it calls that one,
|
|
or raises NoCallbackError if a 'default' callback is not registered.
|
|
|
|
- pattern (string): The OSC-address of the receied message
|
|
- tags (string): The OSC-typetags of the receied message's arguments, without ','
|
|
- data (list): The message arguments
|
|
"""
|
|
if len(tags) != len(data):
|
|
raise OSCServerError("Malformed OSC-message; got %d typetags [%s] vs. %d values" % (len(tags), tags, len(data)))
|
|
|
|
expr = getRegEx(pattern)
|
|
|
|
replies = []
|
|
matched = 0
|
|
for addr in list(self.callbacks.keys()):
|
|
match = expr.match(addr)
|
|
if match and (match.end() == len(addr)):
|
|
reply = self.callbacks[addr](pattern, tags, data, client_address)
|
|
matched += 1
|
|
if isinstance(reply, OSCMessage):
|
|
replies.append(reply)
|
|
elif reply != None:
|
|
raise TypeError("Message-callback %s did not return OSCMessage or None: %s" % (self.server.callbacks[addr], type(reply)))
|
|
|
|
if matched == 0:
|
|
if 'default' in self.callbacks:
|
|
reply = self.callbacks['default'](pattern, tags, data, client_address)
|
|
if isinstance(reply, OSCMessage):
|
|
replies.append(reply)
|
|
elif reply != None:
|
|
raise TypeError("Message-callback %s did not return OSCMessage or None: %s" % (self.server.callbacks['default'], type(reply)))
|
|
else:
|
|
raise NoCallbackError(pattern)
|
|
|
|
return replies
|
|
|
|
######
|
|
#
|
|
# OSCRequestHandler classes
|
|
#
|
|
######
|
|
class OSCRequestHandler(DatagramRequestHandler):
|
|
"""RequestHandler class for the OSCServer
|
|
"""
|
|
def setup(self):
|
|
"""Prepare RequestHandler.
|
|
Unpacks request as (packet, source socket address)
|
|
Creates an empty list for replies.
|
|
"""
|
|
(self.packet, self.socket) = self.request
|
|
self.replies = []
|
|
|
|
def _unbundle(self, decoded):
|
|
"""Recursive bundle-unpacking function"""
|
|
if decoded[0] != "#bundle":
|
|
self.replies += self.server.dispatchMessage(decoded[0], decoded[1][1:], decoded[2:], self.client_address)
|
|
return
|
|
|
|
now = time.time()
|
|
timetag = decoded[1]
|
|
if (timetag > 0.) and (timetag > now):
|
|
time.sleep(timetag - now)
|
|
|
|
for msg in decoded[2:]:
|
|
self._unbundle(msg)
|
|
|
|
def handle(self):
|
|
"""Handle incoming OSCMessage
|
|
"""
|
|
decoded = decodeOSC(self.packet)
|
|
if not len(decoded):
|
|
return
|
|
|
|
self._unbundle(decoded)
|
|
|
|
def finish(self):
|
|
"""Finish handling OSCMessage.
|
|
Send any reply returned by the callback(s) back to the originating client
|
|
as an OSCMessage or OSCBundle
|
|
"""
|
|
if self.server.return_port:
|
|
self.client_address = (self.client_address[0], self.server.return_port)
|
|
|
|
if len(self.replies) > 1:
|
|
msg = OSCBundle()
|
|
for reply in self.replies:
|
|
msg.append(reply)
|
|
elif len(self.replies) == 1:
|
|
msg = self.replies[0]
|
|
else:
|
|
return
|
|
|
|
self.server.client.sendto(msg, self.client_address)
|
|
|
|
class ThreadingOSCRequestHandler(OSCRequestHandler):
|
|
"""Multi-threaded OSCRequestHandler;
|
|
Starts a new RequestHandler thread for each unbundled OSCMessage
|
|
"""
|
|
def _unbundle(self, decoded):
|
|
"""Recursive bundle-unpacking function
|
|
This version starts a new thread for each sub-Bundle found in the Bundle,
|
|
then waits for all its children to finish.
|
|
"""
|
|
if decoded[0] != "#bundle":
|
|
self.replies += self.server.dispatchMessage(decoded[0], decoded[1][1:], decoded[2:], self.client_address)
|
|
return
|
|
|
|
now = time.time()
|
|
timetag = decoded[1]
|
|
if (timetag > 0.) and (timetag > now):
|
|
time.sleep(timetag - now)
|
|
now = time.time()
|
|
|
|
children = []
|
|
|
|
for msg in decoded[2:]:
|
|
t = threading.Thread(target = self._unbundle, args = (msg,))
|
|
t.start()
|
|
children.append(t)
|
|
|
|
# wait for all children to terminate
|
|
for t in children:
|
|
t.join()
|
|
|
|
######
|
|
#
|
|
# OSCServer classes
|
|
#
|
|
######
|
|
class OSCServer(UDPServer, OSCAddressSpace):
|
|
"""A Synchronous OSCServer
|
|
Serves one request at-a-time, until the OSCServer is closed.
|
|
The OSC address-pattern is matched against a set of OSC-adresses
|
|
that have been registered to the server with a callback-function.
|
|
If the adress-pattern of the message machtes the registered address of a callback,
|
|
that function is called.
|
|
"""
|
|
|
|
# set the RequestHandlerClass, will be overridden by ForkingOSCServer & ThreadingOSCServer
|
|
RequestHandlerClass = OSCRequestHandler
|
|
|
|
# define a socket timeout, so the serve_forever loop can actually exit.
|
|
socket_timeout = 1
|
|
|
|
# DEBUG: print error-tracebacks (to stderr)?
|
|
print_tracebacks = False
|
|
|
|
def __init__(self, server_address, client=None, return_port=0):
|
|
"""Instantiate an OSCServer.
|
|
- server_address ((host, port) tuple): the local host & UDP-port
|
|
the server listens on
|
|
- client (OSCClient instance): The OSCClient used to send replies from this server.
|
|
If none is supplied (default) an OSCClient will be created.
|
|
- return_port (int): if supplied, sets the default UDP destination-port
|
|
for replies coming from this server.
|
|
"""
|
|
UDPServer.__init__(self, server_address, self.RequestHandlerClass)
|
|
OSCAddressSpace.__init__(self)
|
|
|
|
self.setReturnPort(return_port)
|
|
self.error_prefix = ""
|
|
self.info_prefix = "/info"
|
|
|
|
self.socket.settimeout(self.socket_timeout)
|
|
|
|
self.running = False
|
|
self.client = None
|
|
|
|
if client == None:
|
|
self.client = OSCClient(server=self)
|
|
else:
|
|
self.setClient(client)
|
|
|
|
def setClient(self, client):
|
|
"""Associate this Server with a new local Client instance, closing the Client this Server is currently using.
|
|
"""
|
|
if not isinstance(client, OSCClient):
|
|
raise ValueError("'client' argument is not a valid OSCClient object")
|
|
|
|
if client.server != None:
|
|
raise OSCServerError("Provided OSCClient already has an OSCServer-instance: %s" % str(client.server))
|
|
|
|
# Server socket is already listening at this point, so we can't use the client's socket.
|
|
# we'll have to force our socket on the client...
|
|
client_address = client.address() # client may be already connected
|
|
client.close() # shut-down that socket
|
|
|
|
# force our socket upon the client
|
|
client.socket = self.socket.dup()
|
|
client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, client.sndbuf_size)
|
|
client._fd = client.socket.fileno()
|
|
client.server = self
|
|
|
|
if client_address:
|
|
client.connect(client_address)
|
|
if not self.return_port:
|
|
self.return_port = client_address[1]
|
|
|
|
if self.client != None:
|
|
self.client.close()
|
|
|
|
self.client = client
|
|
|
|
def serve_forever(self):
|
|
"""Handle one request at a time until server is closed."""
|
|
self.running = True
|
|
while self.running:
|
|
self.handle_request() # this times-out when no data arrives.
|
|
|
|
def close(self):
|
|
"""Stops serving requests, closes server (socket), closes used client
|
|
"""
|
|
self.running = False
|
|
self.client.close()
|
|
self.server_close()
|
|
|
|
def __str__(self):
|
|
"""Returns a string containing this Server's Class-name, software-version and local bound address (if any)
|
|
"""
|
|
out = self.__class__.__name__
|
|
out += " v%s.%s-%s" % version
|
|
addr = self.address()
|
|
if addr:
|
|
out += " listening on osc://%s" % getUrlStr(addr)
|
|
else:
|
|
out += " (unbound)"
|
|
|
|
return out
|
|
|
|
def __eq__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
|
|
return cmp(self.socket._sock, other.socket._sock)
|
|
|
|
def __ne__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
return not self.__eq__(other)
|
|
|
|
def address(self):
|
|
"""Returns a (host,port) tuple of the local address this server is bound to,
|
|
or None if not bound to any address.
|
|
"""
|
|
try:
|
|
return self.socket.getsockname()
|
|
except socket.error:
|
|
return None
|
|
|
|
def setReturnPort(self, port):
|
|
"""Set the destination UDP-port for replies returning from this server to the remote client
|
|
"""
|
|
if (port > 1024) and (port < 65536):
|
|
self.return_port = port
|
|
else:
|
|
self.return_port = None
|
|
|
|
|
|
def setSrvInfoPrefix(self, pattern):
|
|
"""Set the first part of OSC-address (pattern) this server will use to reply to server-info requests.
|
|
"""
|
|
if len(pattern):
|
|
pattern = '/' + pattern.strip('/')
|
|
|
|
self.info_prefix = pattern
|
|
|
|
def setSrvErrorPrefix(self, pattern=""):
|
|
"""Set the OSC-address (pattern) this server will use to report errors occuring during
|
|
received message handling to the remote client.
|
|
|
|
If pattern is empty (default), server-errors are not reported back to the client.
|
|
"""
|
|
if len(pattern):
|
|
pattern = '/' + pattern.strip('/')
|
|
|
|
self.error_prefix = pattern
|
|
|
|
def addDefaultHandlers(self, prefix="", info_prefix="/info", error_prefix="/error"):
|
|
"""Register a default set of OSC-address handlers with this Server:
|
|
- 'default' -> noCallback_handler
|
|
the given prefix is prepended to all other callbacks registered by this method:
|
|
- '<prefix><info_prefix' -> serverInfo_handler
|
|
- '<prefix><error_prefix> -> msgPrinter_handler
|
|
- '<prefix>/print' -> msgPrinter_handler
|
|
and, if the used Client supports it;
|
|
- '<prefix>/subscribe' -> subscription_handler
|
|
- '<prefix>/unsubscribe' -> subscription_handler
|
|
|
|
Note: the given 'error_prefix' argument is also set as default 'error_prefix' for error-messages
|
|
*sent from* this server. This is ok, because error-messages generally do not elicit a reply from the receiver.
|
|
|
|
To do this with the serverInfo-prefixes would be a bad idea, because if a request received on '/info' (for example)
|
|
would send replies to '/info', this could potentially cause a never-ending loop of messages!
|
|
Do *not* set the 'info_prefix' here (for incoming serverinfo requests) to the same value as given to
|
|
the setSrvInfoPrefix() method (for *replies* to incoming serverinfo requests).
|
|
For example, use '/info' for incoming requests, and '/inforeply' or '/serverinfo' or even just '/print' as the
|
|
info-reply prefix.
|
|
"""
|
|
self.error_prefix = error_prefix
|
|
self.addMsgHandler('default', self.noCallback_handler)
|
|
self.addMsgHandler(prefix + info_prefix, self.serverInfo_handler)
|
|
self.addMsgHandler(prefix + error_prefix, self.msgPrinter_handler)
|
|
self.addMsgHandler(prefix + '/print', self.msgPrinter_handler)
|
|
|
|
if isinstance(self.client, OSCMultiClient):
|
|
self.addMsgHandler(prefix + '/subscribe', self.subscription_handler)
|
|
self.addMsgHandler(prefix + '/unsubscribe', self.subscription_handler)
|
|
|
|
def printErr(self, txt):
|
|
"""Writes 'OSCServer: txt' to sys.stderr
|
|
"""
|
|
sys.stderr.write("OSCServer: %s\n" % txt)
|
|
|
|
def sendOSCerror(self, txt, client_address):
|
|
"""Sends 'txt', encapsulated in an OSCMessage to the default 'error_prefix' OSC-addres.
|
|
Message is sent to the given client_address, with the default 'return_port' overriding
|
|
the client_address' port, if defined.
|
|
"""
|
|
lines = txt.split('\n')
|
|
if len(lines) == 1:
|
|
msg = OSCMessage(self.error_prefix)
|
|
msg.append(lines[0])
|
|
elif len(lines) > 1:
|
|
msg = OSCBundle(self.error_prefix)
|
|
for line in lines:
|
|
msg.append(line)
|
|
else:
|
|
return
|
|
|
|
if self.return_port:
|
|
client_address = (client_address[0], self.return_port)
|
|
|
|
self.client.sendto(msg, client_address)
|
|
|
|
def reportErr(self, txt, client_address):
|
|
"""Writes 'OSCServer: txt' to sys.stderr
|
|
If self.error_prefix is defined, sends 'txt' as an OSC error-message to the client(s)
|
|
(see printErr() and sendOSCerror())
|
|
"""
|
|
self.printErr(txt)
|
|
|
|
if len(self.error_prefix):
|
|
self.sendOSCerror(txt, client_address)
|
|
|
|
def sendOSCinfo(self, txt, client_address):
|
|
"""Sends 'txt', encapsulated in an OSCMessage to the default 'info_prefix' OSC-addres.
|
|
Message is sent to the given client_address, with the default 'return_port' overriding
|
|
the client_address' port, if defined.
|
|
"""
|
|
lines = txt.split('\n')
|
|
if len(lines) == 1:
|
|
msg = OSCMessage(self.info_prefix)
|
|
msg.append(lines[0])
|
|
elif len(lines) > 1:
|
|
msg = OSCBundle(self.info_prefix)
|
|
for line in lines:
|
|
msg.append(line)
|
|
else:
|
|
return
|
|
|
|
if self.return_port:
|
|
client_address = (client_address[0], self.return_port)
|
|
|
|
self.client.sendto(msg, client_address)
|
|
|
|
###
|
|
# Message-Handler callback functions
|
|
###
|
|
|
|
def handle_error(self, request, client_address):
|
|
"""Handle an exception in the Server's callbacks gracefully.
|
|
Writes the error to sys.stderr and, if the error_prefix (see setSrvErrorPrefix()) is set,
|
|
sends the error-message as reply to the client
|
|
"""
|
|
(e_type, e) = sys.exc_info()[:2]
|
|
self.printErr("%s on request from %s: %s" % (e_type.__name__, getUrlStr(client_address), str(e)))
|
|
|
|
if self.print_tracebacks:
|
|
import traceback
|
|
traceback.print_exc() # XXX But this goes to stderr!
|
|
|
|
if len(self.error_prefix):
|
|
self.sendOSCerror("%s: %s" % (e_type.__name__, str(e)), client_address)
|
|
|
|
def noCallback_handler(self, addr, tags, data, client_address):
|
|
"""Example handler for OSCMessages.
|
|
All registerd handlers must accept these three arguments:
|
|
- addr (string): The OSC-address pattern of the received Message
|
|
(the 'addr' string has already been matched against the handler's registerd OSC-address,
|
|
but may contain '*'s & such)
|
|
- tags (string): The OSC-typetags of the received message's arguments. (without the preceding comma)
|
|
- data (list): The OSCMessage's arguments
|
|
Note that len(tags) == len(data)
|
|
- client_address ((host, port) tuple): the host & port this message originated from.
|
|
|
|
a Message-handler function may return None, but it could also return an OSCMessage (or OSCBundle),
|
|
which then gets sent back to the client.
|
|
|
|
This handler prints a "No callback registered to handle ..." message.
|
|
Returns None
|
|
"""
|
|
self.reportErr("No callback registered to handle OSC-address '%s'" % addr, client_address)
|
|
|
|
def msgPrinter_handler(self, addr, tags, data, client_address):
|
|
"""Example handler for OSCMessages.
|
|
All registerd handlers must accept these three arguments:
|
|
- addr (string): The OSC-address pattern of the received Message
|
|
(the 'addr' string has already been matched against the handler's registerd OSC-address,
|
|
but may contain '*'s & such)
|
|
- tags (string): The OSC-typetags of the received message's arguments. (without the preceding comma)
|
|
- data (list): The OSCMessage's arguments
|
|
Note that len(tags) == len(data)
|
|
- client_address ((host, port) tuple): the host & port this message originated from.
|
|
|
|
a Message-handler function may return None, but it could also return an OSCMessage (or OSCBundle),
|
|
which then gets sent back to the client.
|
|
|
|
This handler prints the received message.
|
|
Returns None
|
|
"""
|
|
txt = "OSCMessage '%s' from %s: " % (addr, getUrlStr(client_address))
|
|
txt += str(data)
|
|
|
|
self.printErr(txt) # strip trailing comma & space
|
|
|
|
def serverInfo_handler(self, addr, tags, data, client_address):
|
|
"""Example handler for OSCMessages.
|
|
All registerd handlers must accept these three arguments:
|
|
- addr (string): The OSC-address pattern of the received Message
|
|
(the 'addr' string has already been matched against the handler's registerd OSC-address,
|
|
but may contain '*'s & such)
|
|
- tags (string): The OSC-typetags of the received message's arguments. (without the preceding comma)
|
|
- data (list): The OSCMessage's arguments
|
|
Note that len(tags) == len(data)
|
|
- client_address ((host, port) tuple): the host & port this message originated from.
|
|
|
|
a Message-handler function may return None, but it could also return an OSCMessage (or OSCBundle),
|
|
which then gets sent back to the client.
|
|
|
|
This handler returns a reply to the client, which can contain various bits of information
|
|
about this server, depending on the first argument of the received OSC-message:
|
|
- 'help' | 'info' : Reply contains server type & version info, plus a list of
|
|
available 'commands' understood by this handler
|
|
- 'list' | 'ls' : Reply is a bundle of 'address <string>' messages, listing the server's
|
|
OSC address-space.
|
|
- 'clients' | 'targets' : Reply is a bundle of 'target osc://<host>:<port>[<prefix>] [<filter>] [...]'
|
|
messages, listing the local Client-instance's subscribed remote clients.
|
|
"""
|
|
if len(data) == 0:
|
|
return None
|
|
|
|
cmd = data.pop(0)
|
|
|
|
reply = None
|
|
if cmd in ('help', 'info'):
|
|
reply = OSCBundle(self.info_prefix)
|
|
reply.append(('server', str(self)))
|
|
reply.append(('info_command', "ls | list : list OSC address-space"))
|
|
reply.append(('info_command', "clients | targets : list subscribed clients"))
|
|
elif cmd in ('ls', 'list'):
|
|
reply = OSCBundle(self.info_prefix)
|
|
for addr in list(self.callbacks.keys()):
|
|
reply.append(('address', addr))
|
|
elif cmd in ('clients', 'targets'):
|
|
if hasattr(self.client, 'getOSCTargetStrings'):
|
|
reply = OSCBundle(self.info_prefix)
|
|
for trg in self.client.getOSCTargetStrings():
|
|
reply.append(('target',) + trg)
|
|
else:
|
|
cli_addr = self.client.address()
|
|
if cli_addr:
|
|
reply = OSCMessage(self.info_prefix)
|
|
reply.append(('target', "osc://%s/" % getUrlStr(cli_addr)))
|
|
else:
|
|
self.reportErr("unrecognized command '%s' in /info request from osc://%s. Try 'help'" % (cmd, getUrlStr(client_address)), client_address)
|
|
|
|
return reply
|
|
|
|
def _subscribe(self, data, client_address):
|
|
"""Handle the actual subscription. the provided 'data' is concatenated together to form a
|
|
'<host>:<port>[<prefix>] [<filter>] [...]' string, which is then passed to
|
|
parseUrlStr() & parseFilterStr() to actually retreive <host>, <port>, etc.
|
|
|
|
This 'long way 'round' approach (almost) guarantees that the subscription works,
|
|
regardless of how the bits of the <url> are encoded in 'data'.
|
|
"""
|
|
url = ""
|
|
have_port = False
|
|
for item in data:
|
|
if (isinstance(item,int)) and not have_port:
|
|
url += ":%d" % item
|
|
have_port = True
|
|
elif isinstance(item,str):
|
|
url += item
|
|
|
|
(addr, tail) = parseUrlStr(url)
|
|
(prefix, filters) = parseFilterStr(tail)
|
|
|
|
if addr != None:
|
|
(host, port) = addr
|
|
if not host:
|
|
host = client_address[0]
|
|
if not port:
|
|
port = client_address[1]
|
|
addr = (host, port)
|
|
else:
|
|
addr = client_address
|
|
|
|
self.client._setTarget(addr, prefix, filters)
|
|
|
|
trg = self.client.getOSCTargetStr(addr)
|
|
if trg[0] != None:
|
|
reply = OSCMessage(self.info_prefix)
|
|
reply.append(('target',) + trg)
|
|
return reply
|
|
|
|
def _unsubscribe(self, data, client_address):
|
|
"""Handle the actual unsubscription. the provided 'data' is concatenated together to form a
|
|
'<host>:<port>[<prefix>]' string, which is then passed to
|
|
parseUrlStr() to actually retreive <host>, <port> & <prefix>.
|
|
|
|
This 'long way 'round' approach (almost) guarantees that the unsubscription works,
|
|
regardless of how the bits of the <url> are encoded in 'data'.
|
|
"""
|
|
url = ""
|
|
have_port = False
|
|
for item in data:
|
|
if (isinstance(item,int)) and not have_port:
|
|
url += ":%d" % item
|
|
have_port = True
|
|
elif isinstance(item,str):
|
|
url += item
|
|
|
|
(addr, _) = parseUrlStr(url)
|
|
|
|
if addr == None:
|
|
addr = client_address
|
|
else:
|
|
(host, port) = addr
|
|
if not host:
|
|
host = client_address[0]
|
|
if not port:
|
|
try:
|
|
(host, port) = self.client._searchHostAddr(host)
|
|
except NotSubscribedError:
|
|
port = client_address[1]
|
|
|
|
addr = (host, port)
|
|
|
|
try:
|
|
self.client._delTarget(addr)
|
|
except NotSubscribedError as e:
|
|
txt = "%s: %s" % (e.__class__.__name__, str(e))
|
|
self.printErr(txt)
|
|
|
|
reply = OSCMessage(self.error_prefix)
|
|
reply.append(txt)
|
|
return reply
|
|
|
|
def subscription_handler(self, addr, tags, data, client_address):
|
|
"""Handle 'subscribe' / 'unsubscribe' requests from remote hosts,
|
|
if the local Client supports this (i.e. OSCMultiClient).
|
|
|
|
Supported commands:
|
|
- 'help' | 'info' : Reply contains server type & version info, plus a list of
|
|
available 'commands' understood by this handler
|
|
- 'list' | 'ls' : Reply is a bundle of 'target osc://<host>:<port>[<prefix>] [<filter>] [...]'
|
|
messages, listing the local Client-instance's subscribed remote clients.
|
|
- '[subscribe | listen | sendto | target] <url> [<filter> ...] : Subscribe remote client/server at <url>,
|
|
and/or set message-filters for messages being sent to the subscribed host, with the optional <filter>
|
|
arguments. Filters are given as OSC-addresses (or '*') prefixed by a '+' (send matching messages) or
|
|
a '-' (don't send matching messages). The wildcard '*', '+*' or '+/*' means 'send all' / 'filter none',
|
|
and '-*' or '-/*' means 'send none' / 'filter all' (which is not the same as unsubscribing!)
|
|
Reply is an OSCMessage with the (new) subscription; 'target osc://<host>:<port>[<prefix>] [<filter>] [...]'
|
|
- '[unsubscribe | silence | nosend | deltarget] <url> : Unsubscribe remote client/server at <url>
|
|
If the given <url> isn't subscribed, a NotSubscribedError-message is printed (and possibly sent)
|
|
|
|
The <url> given to the subscribe/unsubscribe handler should be of the form:
|
|
'[osc://][<host>][:<port>][<prefix>]', where any or all components can be omitted.
|
|
|
|
If <host> is not specified, the IP-address of the message's source is used.
|
|
If <port> is not specified, the <host> is first looked up in the list of subscribed hosts, and if found,
|
|
the associated port is used.
|
|
If <port> is not specified and <host> is not yet subscribed, the message's source-port is used.
|
|
If <prefix> is specified on subscription, <prefix> is prepended to the OSC-address of all messages
|
|
sent to the subscribed host.
|
|
If <prefix> is specified on unsubscription, the subscribed host is only unsubscribed if the host,
|
|
port and prefix all match the subscription.
|
|
If <prefix> is not specified on unsubscription, the subscribed host is unsubscribed if the host and port
|
|
match the subscription.
|
|
"""
|
|
if not isinstance(self.client, OSCMultiClient):
|
|
raise OSCServerError("Local %s does not support subsctiptions or message-filtering" % self.client.__class__.__name__)
|
|
|
|
addr_cmd = addr.split('/')[-1]
|
|
|
|
if len(data):
|
|
if data[0] in ('help', 'info'):
|
|
reply = OSCBundle(self.info_prefix)
|
|
reply.append(('server', str(self)))
|
|
reply.append(('subscribe_command', "ls | list : list subscribed targets"))
|
|
reply.append(('subscribe_command', "[subscribe | listen | sendto | target] <url> [<filter> ...] : subscribe to messages, set filters"))
|
|
reply.append(('subscribe_command', "[unsubscribe | silence | nosend | deltarget] <url> : unsubscribe from messages"))
|
|
return reply
|
|
|
|
if data[0] in ('ls', 'list'):
|
|
reply = OSCBundle(self.info_prefix)
|
|
for trg in self.client.getOSCTargetStrings():
|
|
reply.append(('target',) + trg)
|
|
return reply
|
|
|
|
if data[0] in ('subscribe', 'listen', 'sendto', 'target'):
|
|
return self._subscribe(data[1:], client_address)
|
|
|
|
if data[0] in ('unsubscribe', 'silence', 'nosend', 'deltarget'):
|
|
return self._unsubscribe(data[1:], client_address)
|
|
|
|
if addr_cmd in ('subscribe', 'listen', 'sendto', 'target'):
|
|
return self._subscribe(data, client_address)
|
|
|
|
if addr_cmd in ('unsubscribe', 'silence', 'nosend', 'deltarget'):
|
|
return self._unsubscribe(data, client_address)
|
|
|
|
class ForkingOSCServer(ForkingMixIn, OSCServer):
|
|
"""An Asynchronous OSCServer.
|
|
This server forks a new process to handle each incoming request.
|
|
"""
|
|
# set the RequestHandlerClass, will be overridden by ForkingOSCServer & ThreadingOSCServer
|
|
RequestHandlerClass = ThreadingOSCRequestHandler
|
|
|
|
class ThreadingOSCServer(ThreadingMixIn, OSCServer):
|
|
"""An Asynchronous OSCServer.
|
|
This server starts a new thread to handle each incoming request.
|
|
"""
|
|
# set the RequestHandlerClass, will be overridden by ForkingOSCServer & ThreadingOSCServer
|
|
RequestHandlerClass = ThreadingOSCRequestHandler
|
|
|
|
######
|
|
#
|
|
# OSCError classes
|
|
#
|
|
######
|
|
|
|
class OSCError(Exception):
|
|
"""Base Class for all OSC-related errors
|
|
"""
|
|
def __init__(self, message):
|
|
self.message = message
|
|
|
|
def __str__(self):
|
|
return self.message
|
|
|
|
class OSCClientError(OSCError):
|
|
"""Class for all OSCClient errors
|
|
"""
|
|
pass
|
|
|
|
class OSCServerError(OSCError):
|
|
"""Class for all OSCServer errors
|
|
"""
|
|
pass
|
|
|
|
class NoCallbackError(OSCServerError):
|
|
"""This error is raised (by an OSCServer) when an OSCMessage with an 'unmatched' address-pattern
|
|
is received, and no 'default' handler is registered.
|
|
"""
|
|
def __init__(self, pattern):
|
|
"""The specified 'pattern' should be the OSC-address of the 'unmatched' message causing the error to be raised.
|
|
"""
|
|
self.message = "No callback registered to handle OSC-address '%s'" % pattern
|
|
|
|
class NotSubscribedError(OSCClientError):
|
|
"""This error is raised (by an OSCMultiClient) when an attempt is made to unsubscribe a host
|
|
that isn't subscribed.
|
|
"""
|
|
def __init__(self, addr, prefix=None):
|
|
if prefix:
|
|
url = getUrlStr(addr, prefix)
|
|
else:
|
|
url = getUrlStr(addr, '')
|
|
|
|
self.message = "Target osc://%s is not subscribed" % url
|
|
|
|
######
|
|
#
|
|
# OSC over streaming transport layers (usually TCP)
|
|
#
|
|
# Note from the OSC 1.0 specifications about streaming protocols:
|
|
#
|
|
# The underlying network that delivers an OSC packet is responsible for
|
|
# delivering both the contents and the size to the OSC application. An OSC
|
|
# packet can be naturally represented by a datagram by a network protocol such
|
|
# as UDP. In a stream-based protocol such as TCP, the stream should begin with
|
|
# an int32 giving the size of the first packet, followed by the contents of the
|
|
# first packet, followed by the size of the second packet, etc.
|
|
#
|
|
# The contents of an OSC packet must be either an OSC Message or an OSC Bundle.
|
|
# The first byte of the packet's contents unambiguously distinguishes between
|
|
# these two alternatives.
|
|
#
|
|
######
|
|
|
|
class OSCStreamRequestHandler(StreamRequestHandler, OSCAddressSpace):
|
|
""" This is the central class of a streaming OSC server. If a client
|
|
connects to the server, the server instantiates a OSCStreamRequestHandler
|
|
for each new connection. This is fundamentally different to a packet
|
|
oriented server which has a single address space for all connections.
|
|
This connection based (streaming) OSC server maintains an address space
|
|
for each single connection, because usually tcp server spawn a new thread
|
|
or process for each new connection. This would generate severe
|
|
multithreading synchronization problems when each thread would operate on
|
|
the same address space object. Therefore: To implement a streaming/TCP OSC
|
|
server a custom handler must be implemented which implements the
|
|
setupAddressSpace member in which it creates its own address space for this
|
|
very connection. This has been done within the testbench and can serve as
|
|
inspiration.
|
|
"""
|
|
def __init__(self, request, client_address, server):
|
|
""" Initialize all base classes. The address space must be initialized
|
|
before the stream request handler because the initialization function
|
|
of the stream request handler calls the setup member which again
|
|
requires an already initialized address space.
|
|
"""
|
|
self._txMutex = threading.Lock()
|
|
OSCAddressSpace.__init__(self)
|
|
StreamRequestHandler.__init__(self, request, client_address, server)
|
|
|
|
def _unbundle(self, decoded):
|
|
"""Recursive bundle-unpacking function"""
|
|
if decoded[0] != "#bundle":
|
|
self.replies += self.dispatchMessage(decoded[0], decoded[1][1:], decoded[2:], self.client_address)
|
|
return
|
|
|
|
now = time.time()
|
|
timetag = decoded[1]
|
|
if (timetag > 0.) and (timetag > now):
|
|
time.sleep(timetag - now)
|
|
|
|
for msg in decoded[2:]:
|
|
self._unbundle(msg)
|
|
|
|
def setup(self):
|
|
StreamRequestHandler.setup(self)
|
|
print("SERVER: New client connection.")
|
|
self.setupAddressSpace()
|
|
self.server._clientRegister(self)
|
|
|
|
def setupAddressSpace(self):
|
|
""" Override this function to customize your address space. """
|
|
pass
|
|
|
|
def finish(self):
|
|
StreamRequestHandler.finish(self)
|
|
self.server._clientUnregister(self)
|
|
print("SERVER: Client connection handled.")
|
|
def _transmit(self, data):
|
|
sent = 0
|
|
while sent < len(data):
|
|
tmp = self.connection.send(data[sent:])
|
|
if tmp == 0:
|
|
return False
|
|
sent += tmp
|
|
return True
|
|
def _transmitMsg(self, msg):
|
|
"""Send an OSC message over a streaming socket. Raises exception if it
|
|
should fail. If everything is transmitted properly, True is returned. If
|
|
socket has been closed, False.
|
|
"""
|
|
if not isinstance(msg, OSCMessage):
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
|
|
try:
|
|
binary = msg.getBinary()
|
|
length = len(binary)
|
|
# prepend length of packet before the actual message (big endian)
|
|
len_big_endian = array.array('c', '\0' * 4)
|
|
struct.pack_into(">L", len_big_endian, 0, length)
|
|
len_big_endian = len_big_endian.tostring()
|
|
if self._transmit(len_big_endian) and self._transmit(binary):
|
|
return True
|
|
return False
|
|
except socket.error as e:
|
|
if e[0] == errno.EPIPE: # broken pipe
|
|
return False
|
|
raise e
|
|
|
|
def _receive(self, count):
|
|
""" Receive a certain amount of data from the socket and return it. If the
|
|
remote end should be closed in the meanwhile None is returned.
|
|
"""
|
|
chunk = self.connection.recv(count)
|
|
if not chunk or len(chunk) == 0:
|
|
return None
|
|
while len(chunk) < count:
|
|
tmp = self.connection.recv(count - len(chunk))
|
|
if not tmp or len(tmp) == 0:
|
|
return None
|
|
chunk = chunk + tmp
|
|
return chunk
|
|
|
|
def _receiveMsg(self):
|
|
""" Receive OSC message from a socket and decode.
|
|
If an error occurs, None is returned, else the message.
|
|
"""
|
|
# get OSC packet size from stream which is prepended each transmission
|
|
chunk = self._receive(4)
|
|
if chunk == None:
|
|
print("SERVER: Socket has been closed.")
|
|
return None
|
|
# extract message length from big endian unsigned long (32 bit)
|
|
slen = struct.unpack(">L", chunk)[0]
|
|
# receive the actual message
|
|
chunk = self._receive(slen)
|
|
if chunk == None:
|
|
print("SERVER: Socket has been closed.")
|
|
return None
|
|
# decode OSC data and dispatch
|
|
msg = decodeOSC(chunk)
|
|
if msg == None:
|
|
raise OSCError("SERVER: Message decoding failed.")
|
|
return msg
|
|
|
|
def handle(self):
|
|
"""
|
|
Handle a connection.
|
|
"""
|
|
# set socket blocking to avoid "resource currently not available"
|
|
# exceptions, because the connection socket inherits the settings
|
|
# from the listening socket and this times out from time to time
|
|
# in order to provide a way to shut the server down. But we want
|
|
# clean and blocking behaviour here
|
|
self.connection.settimeout(None)
|
|
|
|
print("SERVER: Entered server loop")
|
|
try:
|
|
while True:
|
|
decoded = self._receiveMsg()
|
|
if decoded == None:
|
|
return
|
|
elif len(decoded) <= 0:
|
|
# if message decoding fails we try to stay in sync but print a message
|
|
print("OSC stream server: Spurious message received.")
|
|
continue
|
|
|
|
self.replies = []
|
|
self._unbundle(decoded)
|
|
|
|
if len(self.replies) > 1:
|
|
msg = OSCBundle()
|
|
for reply in self.replies:
|
|
msg.append(reply)
|
|
elif len(self.replies) == 1:
|
|
msg = self.replies[0]
|
|
else:
|
|
# no replies, continue receiving
|
|
continue
|
|
self._txMutex.acquire()
|
|
txOk = self._transmitMsg(msg)
|
|
self._txMutex.release()
|
|
if not txOk:
|
|
break
|
|
|
|
except socket.error as e:
|
|
if e[0] == errno.ECONNRESET:
|
|
# if connection has been reset by client, we do not care much
|
|
# about it, we just assume our duty fullfilled
|
|
print("SERVER: Connection has been reset by peer.")
|
|
else:
|
|
raise e
|
|
|
|
def sendOSC(self, oscData):
|
|
""" This member can be used to transmit OSC messages or OSC bundles
|
|
over the client/server connection. It is thread save.
|
|
"""
|
|
self._txMutex.acquire()
|
|
result = self._transmitMsg(oscData)
|
|
self._txMutex.release()
|
|
return result
|
|
|
|
""" TODO Note on threaded unbundling for streaming (connection oriented)
|
|
transport:
|
|
|
|
Threaded unbundling as implemented in ThreadingOSCServer must be implemented in
|
|
a different way for the streaming variant, because contrary to the datagram
|
|
version the streaming handler is instantiated only once per connection. This
|
|
leads to the problem (if threaded unbundling is implemented as in OSCServer)
|
|
that all further message reception is blocked until all (previously received)
|
|
pending messages are processed.
|
|
|
|
Each StreamRequestHandler should provide a so called processing queue in which
|
|
all pending messages or subbundles are inserted to be processed in the future).
|
|
When a subbundle or message gets queued, a mechanism must be provided that
|
|
those messages get invoked when time asks for them. There are the following
|
|
opportunities:
|
|
- a timer is started which checks at regular intervals for messages in the
|
|
queue (polling - requires CPU resources)
|
|
- a dedicated timer is started for each message (requires timer resources)
|
|
"""
|
|
|
|
class OSCStreamingServer(TCPServer):
|
|
""" A connection oriented (TCP/IP) OSC server.
|
|
"""
|
|
|
|
# define a socket timeout, so the serve_forever loop can actually exit.
|
|
# with 2.6 and server.shutdown this wouldn't be necessary
|
|
socket_timeout = 1
|
|
|
|
# this is the class which handles a new connection. Override this for a
|
|
# useful customized server. See the testbench for an example
|
|
RequestHandlerClass = OSCStreamRequestHandler
|
|
|
|
def __init__(self, address):
|
|
"""Instantiate an OSCStreamingServer.
|
|
- server_address ((host, port) tuple): the local host & UDP-port
|
|
the server listens for new connections.
|
|
"""
|
|
self._clientList = []
|
|
self._clientListMutex = threading.Lock()
|
|
TCPServer.__init__(self, address, self.RequestHandlerClass)
|
|
self.socket.settimeout(self.socket_timeout)
|
|
|
|
def serve_forever(self):
|
|
"""Handle one request at a time until server is closed.
|
|
Had to add this since 2.5 does not support server.shutdown()
|
|
"""
|
|
self.running = True
|
|
while self.running:
|
|
self.handle_request() # this times-out when no data arrives.
|
|
|
|
def start(self):
|
|
""" Start the server thread. """
|
|
self._server_thread = threading.Thread(target=self.serve_forever)
|
|
self._server_thread.setDaemon(True)
|
|
self._server_thread.start()
|
|
|
|
def stop(self):
|
|
""" Stop the server thread and close the socket. """
|
|
self.running = False
|
|
self._server_thread.join()
|
|
self.server_close()
|
|
# 2.6 only
|
|
#self.shutdown()
|
|
|
|
def _clientRegister(self, client):
|
|
""" Gets called by each request/connection handler when connection is
|
|
established to add itself to the client list
|
|
"""
|
|
self._clientListMutex.acquire()
|
|
self._clientList.append(client)
|
|
self._clientListMutex.release()
|
|
|
|
def _clientUnregister(self, client):
|
|
""" Gets called by each request/connection handler when connection is
|
|
lost to remove itself from the client list
|
|
"""
|
|
self._clientListMutex.acquire()
|
|
self._clientList.remove(client)
|
|
self._clientListMutex.release()
|
|
|
|
def broadcastToClients(self, oscData):
|
|
""" Send OSC message or bundle to all connected clients. """
|
|
result = True
|
|
for client in self._clientList:
|
|
result = result and client.sendOSC(oscData)
|
|
return result
|
|
|
|
class OSCStreamingServerThreading(ThreadingMixIn, OSCStreamingServer):
|
|
pass
|
|
""" Implements a server which spawns a separate thread for each incoming
|
|
connection. Care must be taken since the OSC address space is for all
|
|
the same.
|
|
"""
|
|
|
|
class OSCStreamingClient(OSCAddressSpace):
|
|
""" OSC streaming client.
|
|
A streaming client establishes a connection to a streaming server but must
|
|
be able to handle replies by the server as well. To accomplish this the
|
|
receiving takes place in a secondary thread, because no one knows if we
|
|
have to expect a reply or not, i.e. synchronous architecture doesn't make
|
|
much sense.
|
|
Replies will be matched against the local address space. If message
|
|
handlers access code of the main thread (where the client messages are sent
|
|
to the server) care must be taken e.g. by installing sychronization
|
|
mechanisms or by using an event dispatcher which can handle events
|
|
originating from other threads.
|
|
"""
|
|
# set outgoing socket buffer size
|
|
sndbuf_size = 4096 * 8
|
|
rcvbuf_size = 4096 * 8
|
|
|
|
def __init__(self):
|
|
self._txMutex = threading.Lock()
|
|
OSCAddressSpace.__init__(self)
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.sndbuf_size)
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.rcvbuf_size)
|
|
self.socket.settimeout(1.0)
|
|
self._running = False
|
|
|
|
def _receiveWithTimeout(self, count):
|
|
chunk = str()
|
|
while len(chunk) < count:
|
|
try:
|
|
tmp = self.socket.recv(count - len(chunk))
|
|
except socket.timeout:
|
|
if not self._running:
|
|
print("CLIENT: Socket timed out and termination requested.")
|
|
return None
|
|
else:
|
|
continue
|
|
except socket.error as e:
|
|
if e[0] == errno.ECONNRESET:
|
|
print("CLIENT: Connection reset by peer.")
|
|
return None
|
|
else:
|
|
raise e
|
|
if not tmp or len(tmp) == 0:
|
|
print("CLIENT: Socket has been closed.")
|
|
return None
|
|
chunk = chunk + tmp
|
|
return chunk
|
|
def _receiveMsgWithTimeout(self):
|
|
""" Receive OSC message from a socket and decode.
|
|
If an error occurs, None is returned, else the message.
|
|
"""
|
|
# get OSC packet size from stream which is prepended each transmission
|
|
chunk = self._receiveWithTimeout(4)
|
|
if not chunk:
|
|
return None
|
|
# extract message length from big endian unsigned long (32 bit)
|
|
slen = struct.unpack(">L", chunk)[0]
|
|
# receive the actual message
|
|
chunk = self._receiveWithTimeout(slen)
|
|
if not chunk:
|
|
return None
|
|
# decode OSC content
|
|
msg = decodeOSC(chunk)
|
|
if msg == None:
|
|
raise OSCError("CLIENT: Message decoding failed.")
|
|
return msg
|
|
|
|
def _receiving_thread_entry(self):
|
|
print("CLIENT: Entered receiving thread.")
|
|
self._running = True
|
|
while self._running:
|
|
decoded = self._receiveMsgWithTimeout()
|
|
if not decoded:
|
|
break
|
|
elif len(decoded) <= 0:
|
|
continue
|
|
|
|
self.replies = []
|
|
self._unbundle(decoded)
|
|
if len(self.replies) > 1:
|
|
msg = OSCBundle()
|
|
for reply in self.replies:
|
|
msg.append(reply)
|
|
elif len(self.replies) == 1:
|
|
msg = self.replies[0]
|
|
else:
|
|
continue
|
|
self._txMutex.acquire()
|
|
txOk = self._transmitMsgWithTimeout(msg)
|
|
self._txMutex.release()
|
|
if not txOk:
|
|
break
|
|
print("CLIENT: Receiving thread terminated.")
|
|
|
|
def _unbundle(self, decoded):
|
|
if decoded[0] != "#bundle":
|
|
self.replies += self.dispatchMessage(decoded[0], decoded[1][1:], decoded[2:], self.socket.getpeername())
|
|
return
|
|
|
|
now = time.time()
|
|
timetag = decoded[1]
|
|
if (timetag > 0.) and (timetag > now):
|
|
time.sleep(timetag - now)
|
|
|
|
for msg in decoded[2:]:
|
|
self._unbundle(msg)
|
|
|
|
def connect(self, address):
|
|
self.socket.connect(address)
|
|
self.receiving_thread = threading.Thread(target=self._receiving_thread_entry)
|
|
self.receiving_thread.start()
|
|
|
|
def close(self):
|
|
# let socket time out
|
|
self._running = False
|
|
self.receiving_thread.join()
|
|
self.socket.close()
|
|
|
|
def _transmitWithTimeout(self, data):
|
|
sent = 0
|
|
while sent < len(data):
|
|
try:
|
|
tmp = self.socket.send(data[sent:])
|
|
except socket.timeout:
|
|
if not self._running:
|
|
print("CLIENT: Socket timed out and termination requested.")
|
|
return False
|
|
else:
|
|
continue
|
|
except socket.error as e:
|
|
if e[0] == errno.ECONNRESET:
|
|
print("CLIENT: Connection reset by peer.")
|
|
return False
|
|
else:
|
|
raise e
|
|
if tmp == 0:
|
|
return False
|
|
sent += tmp
|
|
return True
|
|
|
|
def _transmitMsgWithTimeout(self, msg):
|
|
if not isinstance(msg, OSCMessage):
|
|
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
|
|
binary = msg.getBinary()
|
|
length = len(binary)
|
|
# prepend length of packet before the actual message (big endian)
|
|
len_big_endian = array.array('c', '\0' * 4)
|
|
struct.pack_into(">L", len_big_endian, 0, length)
|
|
len_big_endian = len_big_endian.tostring()
|
|
if self._transmitWithTimeout(len_big_endian) and self._transmitWithTimeout(binary):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def sendOSC(self, msg):
|
|
"""Send an OSC message or bundle to the server. Returns True on success.
|
|
"""
|
|
self._txMutex.acquire()
|
|
txOk = self._transmitMsgWithTimeout(msg)
|
|
self._txMutex.release()
|
|
return txOk
|
|
|
|
def __str__(self):
|
|
"""Returns a string containing this Client's Class-name, software-version
|
|
and the remote-address it is connected to (if any)
|
|
"""
|
|
out = self.__class__.__name__
|
|
out += " v%s.%s-%s" % version
|
|
addr = self.socket.getpeername()
|
|
if addr:
|
|
out += " connected to osc://%s" % getUrlStr(addr)
|
|
else:
|
|
out += " (unconnected)"
|
|
|
|
return out
|
|
|
|
def __eq__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
|
|
isequal = cmp(self.socket._sock, other.socket._sock)
|
|
if isequal and self.server and other.server:
|
|
return cmp(self.server, other.server)
|
|
|
|
return isequal
|
|
|
|
def __ne__(self, other):
|
|
"""Compare function.
|
|
"""
|
|
return not self.__eq__(other)
|