123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- # -*- mode: Python -*-
-
- '''
- Forward /pl pointlist to cli
-
- input OSC in END points format : (x,y,color)
- output CLI in CLI points format : [x,y,color]
-
- /pl "[(150.0, 230.0, 255), (170.0, 170.0, 255), (230.0, 170.0, 255), (210.0, 230.0, 255), (150.0, 230.0, 255)]"
-
- v0.1.0
-
- LICENCE : CC
-
- by Cocoa, Sam Neurohack
-
- '''
- from __future__ import print_function
- from OSC3 import OSCServer, OSCClient, OSCMessage
- import sys
- from time import sleep
- import argparse
- import ast
-
# Command-line interface: where to listen for OSC and whether to log.
argsparser = argparse.ArgumentParser(description="fromOSC generator")
argsparser.add_argument(
    "-i", "--ip",
    help="IP to bind to (0.0.0.0 by default)",
    default="0.0.0.0",
    type=str,
)
# type=int lets argparse reject a non-numeric port with a usage error
# instead of a ValueError traceback from a later int() conversion.
argsparser.add_argument(
    "-p", "--port",
    help="OSC port to bind to (9002 by default)",
    default=9002,
    type=int,
)
argsparser.add_argument(
    "-v", "--verbose",
    action="store_true",
    help="Verbose output",
)
args = argsparser.parse_args()

# Module-level settings read by debug() and the OSC server setup.
verbose = args.verbose
ip = args.ip
port = args.port
-
def debug(*args, **kwargs):
    """Print a diagnostic message to stderr when verbose mode is on.

    Accepts the same positional and keyword arguments as print(), except
    that the destination is always sys.stderr. Does nothing when the
    module-level ``verbose`` flag is false.
    """
    if not verbose:
        return
    print(*args, file=sys.stderr, **kwargs)
-
# Main-loop flag: the polling loop at the bottom of the file runs while
# this stays true.
run = True

# OSC server bound to the address given on the command line.
# timeout = 0 presumably makes handle_request() return at once when no
# packet is waiting -- NOTE(review): confirm against the OSC3 module.
oscserver = OSCServer((ip, port))
oscserver.timeout = 0
-
# Timeout reporting works purely by convention: the caller sets
# .timed_out to False before calling handle_request(), and this hook
# sets it back to True when it is invoked.
def handle_timeout(self):
    """Record on the server instance that a timeout occurred."""
    self.timed_out = True
-
# Attach handle_timeout to this single server instance (not to the
# class) by wrapping it as a bound method.
import types

oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)
-
# Is a raw OSC frame available?
def OSC_frame():
    """Process every pending OSC request, then return.

    Resets the server's timed_out flag and keeps calling
    handle_request() until the flag is set again, which the
    handle_timeout hook does.
    """
    oscserver.timed_out = False
    while True:
        if oscserver.timed_out:
            break
        oscserver.handle_request()
-
-
# default handler
def OSChandler(oscpath, tags, args, source):
    """Forward a valid ``/pl`` point list to stdout in CLI format.

    A message is forwarded only when its address is exactly "/pl", it
    carries exactly one argument, that argument is a string, and
    validate() accepts it. The string is rewritten from (x, y, color)
    tuples to [x, y, color] lists, wrapped in one more pair of brackets
    and printed on a single flushed line. Anything else is reported
    through debug() and dropped.

    Args:
        oscpath: OSC address of the incoming message.
        tags: OSC type tags of the message; not used here.
        args: List of message arguments.
        source: Sender address; only source[0] is used, for debug output.
    """
    debug("fromOSC Default OSC Handler got oscpath", oscpath, "from" + str(source[0]), ":", args)

    if oscpath == "/pl" and len(args) == 1:

        debug("correct OSC type :'/pl")
        pointlist = args[0]

        # Only a string can be validated and rewritten; a numeric or blob
        # argument is rejected here instead of raising inside validate().
        if isinstance(pointlist, str) and validate(pointlist):

            debug("new pl : ", pointlist)
            line = pointlist.replace("(", "[").replace(")", "]")
            line = "[{}]".format(line)
            # Flush on every list so a downstream pipe sees it immediately.
            print(line, flush=True)

        else:
            debug("Bad pointlist -> msg trapped.")

    else:
        # args may be empty or hold non-string values, so stringify each
        # one rather than indexing args[0] and concatenating it directly.
        debug("BAD OSC Message : " + oscpath + " " + " ".join(str(a) for a in args))


# Route every incoming OSC address through OSChandler.
oscserver.addMsgHandler("default", OSChandler)
-
-
def validate(pointlist):
    """Return True when ``pointlist`` is an acceptable point-list string.

    A point list is accepted when all of the following hold:
      * it is a str (any other type is rejected without logging; the
        caller is expected to report the rejection),
      * it is at least 9 characters long,
      * it contains at least one "(" (points must be tuples, not lists),
      * it is pure ASCII and parses as a Python literal.

    Every failed check is reported through debug(), so one bad input can
    log several reasons.

    Args:
        pointlist: The raw argument received with the /pl message.

    Returns:
        bool: True if every check passed, False otherwise.
    """
    if not isinstance(pointlist, str):
        return False

    state = True

    if len(pointlist) < 9:
        debug("Not enough characters :", pointlist)
        state = False

    if "(" not in pointlist:
        debug("Bad format : use () not [] for points", pointlist)
        state = False

    try:
        # encode() is used only to reject non-ASCII text; it raises
        # UnicodeEncodeError, a ValueError subclass.
        pointlist.encode("ascii")
        # literal_eval only parses literals and never executes code, so
        # it is safe on text received from the network.
        ast.literal_eval(pointlist)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        debug("BAD POINTLIST :", pointlist)
        state = False

    return state
-
-
# simulate a "game engine"
# `run` is never cleared in this file, so the loop normally ends only
# through an exception such as KeyboardInterrupt. The finally clause
# makes sure the server is closed on that path too; the exception still
# propagates as before.
try:
    while run:
        # do the game stuff:
        sleep(0.01)
        # drain any OSC messages that arrived in the meantime
        OSC_frame()
finally:
    oscserver.close()
|