forked from protonphoton/LJ
127 lines
3.1 KiB
Python
127 lines
3.1 KiB
Python
|
#!/usr/bin/python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# -*- mode: Python -*-
|
||
|
|
||
|
'''
|
||
|
Forward /pl pointlist to cli
|
||
|
|
||
|
input OSC in END points format : (x,y,color)
|
||
|
output CLI in CLI points format : [x,y,color]
|
||
|
|
||
|
/pl "[(150.0, 230.0, 255), (170.0, 170.0, 255), (230.0, 170.0, 255), (210.0, 230.0, 255), (150.0, 230.0, 255)]"
|
||
|
|
||
|
v0.1.0
|
||
|
|
||
|
LICENCE : CC
|
||
|
|
||
|
by Cocoa, Sam Neurohack
|
||
|
|
||
|
'''
|
||
|
from __future__ import print_function
|
||
|
from OSC3 import OSCServer, OSCClient, OSCMessage
|
||
|
import sys
|
||
|
from time import sleep
|
||
|
import argparse
|
||
|
import ast
|
||
|
|
||
|
# Command-line options: bind address, OSC port and verbosity.
argsparser = argparse.ArgumentParser(description="fromOSC generator")
argsparser.add_argument("-i", "--ip", help="IP to bind to (0.0.0.0 by default)", default="0.0.0.0", type=str)
# type=int lets argparse reject a non-numeric port with a usage error
# instead of failing later with a ValueError traceback.
argsparser.add_argument("-p", "--port", help="OSC port to bind to (9002 by default)", default=9002, type=int)
argsparser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
args = argsparser.parse_args()

# Module-level settings read by the rest of the script.
verbose = args.verbose
ip = args.ip
port = args.port
|
||
|
|
||
|
def debug(*args, **kwargs):
    """Print a diagnostic message to stderr when verbose mode is enabled.

    Takes the same positional and keyword arguments as ``print()``, except
    ``file``, which is always ``sys.stderr``. Does nothing when the
    module-level ``verbose`` flag is false.
    """
    if not verbose:
        return
    print(*args, file=sys.stderr, **kwargs)
|
||
|
|
||
|
# OSC server listening on the address selected on the command line.
oscserver = OSCServer((ip, port))
# NOTE(review): a timeout of 0 presumably makes handle_request() return at
# once when no message is pending - confirm against the OSC3 module.
oscserver.timeout = 0
# Flag tested by the main loop at the bottom of the script.
run = True
|
||
|
|
||
|
# Timeout reporting works purely by convention: the caller must reset the
# .timed_out field to False before calling handle_request(), and this hook
# sets it back to True.
def handle_timeout(self):
    """Mark the object it is bound to as having timed out."""
    self.timed_out = True
|
||
|
|
||
|
# Attach handle_timeout to this single server instance as a bound method,
# so that it is called with the server itself as ``self``.
import types

oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)
|
||
|
|
||
|
# RAW OSC Frame available ?
def OSC_frame():
    """Handle pending OSC requests until the server reports a timeout.

    Resets ``oscserver.timed_out`` and then keeps calling
    ``oscserver.handle_request()`` until the flag is set again.
    """
    # The flag has to be cleared first; only handle_timeout() raises it.
    oscserver.timed_out = False
    while True:
        if oscserver.timed_out:
            break
        oscserver.handle_request()
|
||
|
|
||
|
|
||
|
# default handler
def OSChandler(oscpath, tags, args, source):
    """Forward a valid ``/pl`` point list to stdout; log and drop anything else.

    Parameters:
        oscpath: OSC address of the incoming message, e.g. ``"/pl"``.
        tags: OSC type tags (unused).
        args: list of OSC arguments; a ``/pl`` message must carry exactly
            one string holding the point list.
        source: sender information; ``source[0]`` is only used for logging.

    A point list accepted by ``validate()`` is rewritten from ``(x, y, color)``
    tuples to ``[x, y, color]`` lists, wrapped in an outer pair of brackets
    and printed as one flushed line on stdout.
    """
    debug("fromOSC Default OSC Handler got oscpath", oscpath, "from", str(source[0]), ":", args)

    if oscpath != "/pl" or len(args) != 1:
        # Pass the pieces separately instead of concatenating args[0]:
        # args may be empty or hold non-string values.
        debug("BAD OSC Message :", oscpath, args)
        return

    debug("correct OSC type :'/pl")
    pointlist = args[0]

    # Only strings are point lists; other OSC argument types are rejected
    # before any string method is called on them.
    if not isinstance(pointlist, str) or not validate(pointlist):
        debug("Bad pointlist -> msg trapped.")
        return

    debug("new pl : ", pointlist)
    line = pointlist.replace("(", "[").replace(")", "]")
    # Flush so that a consumer reading stdout through a pipe sees each
    # point list as soon as it is emitted.
    print("[{}]".format(line), flush=True)


# Route every incoming OSC address to OSChandler.
oscserver.addMsgHandler("default", OSChandler)
|
||
|
|
||
|
|
||
|
def validate(pointlist):
    """Return True when *pointlist* looks like a usable point list string.

    The value is accepted only if it is a string of at least 9 characters,
    contains at least one ``(``, is pure ASCII and parses as a Python
    literal. Every failed check is reported through ``debug()``.

    Parameters:
        pointlist: the raw OSC argument, expected to be a string such as
            ``"[(150.0, 230.0, 255), (170.0, 170.0, 255)]"``.

    Returns:
        bool: True if all checks pass, False otherwise. Never raises for
        malformed input.
    """
    # A non-string argument cannot be a point list; reject it here rather
    # than letting len() or the string methods below raise.
    if not isinstance(pointlist, str):
        debug("BAD POINTLIST :", pointlist)
        return False

    state = True

    if len(pointlist) < 9:
        debug("Not enough characters :", pointlist)
        state = False

    if "(" not in pointlist:
        debug("Bad format : use () not [] for points", pointlist)
        state = False

    try:
        # encode() raises UnicodeEncodeError (a ValueError) on non-ASCII text.
        pointlist.encode('ascii')
        # literal_eval only evaluates literals, so untrusted network input
        # cannot execute code here.
        ast.literal_eval(pointlist)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        debug("BAD POINTLIST :", pointlist)
        state = False

    return state
|
||
|
|
||
|
|
||
|
# simulate a "game engine"
try:
    while run:
        # Pause 10 ms between polls.
        sleep(0.01)
        # Handle every OSC message that is already waiting.
        OSC_frame()
finally:
    # Close the server on every exit path, including Ctrl-C; without the
    # try/finally this line is unreachable when the loop ends by exception.
    oscserver.close()
|
||
|
|