#!/usr/bin/python3 # -*- coding: utf-8 -*- # -*- mode: Python -*- ''' Forward /pl pointlist to cli input OSC in END points format : (x,y,color) output CLI in CLI points format : [x,y,color] /pl "[(150.0, 230.0, 255), (170.0, 170.0, 255), (230.0, 170.0, 255), (210.0, 230.0, 255), (150.0, 230.0, 255)]" v0.1.0 Licensed under GNU GPLv3 by Cocoa, Sam Neurohack ''' from __future__ import print_function from OSC3 import OSCServer, OSCClient, OSCMessage import sys from time import sleep import argparse import ast argsparser = argparse.ArgumentParser(description="fromOSC generator") argsparser.add_argument("-i","--ip",help="IP to bind to (0.0.0.0 by default)",default="0.0.0.0",type=str) argsparser.add_argument("-p","--port",help="OSC port to bind to (9002 by default)",default=9002,type=str) argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose output") args = argsparser.parse_args() verbose = args.verbose ip = args.ip port = int(args.port) def debug(*args, **kwargs): if( verbose == False ): return print(*args, file=sys.stderr, **kwargs) oscserver = OSCServer( (ip, port) ) oscserver.timeout = 0 run = True # this method of reporting timeouts only works by convention # that before calling handle_request() field .timed_out is # set to False def handle_timeout(self): self.timed_out = True # funny python's way to add a method to an instance of a class import types oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver) # RAW OSC Frame available ? def OSC_frame(): # clear timed_out flag oscserver.timed_out = False # handle all pending requests then return while not oscserver.timed_out: oscserver.handle_request() # default handler def OSChandler(oscpath, tags, args, source): oscaddress = ''.join(oscpath.split("/")) debug("fromOSC Default OSC Handler got oscpath", oscpath, "from" + str(source[0]), ":", args) #print("OSC address", path) #print("find.. /bhoreal ?", path.find('/bhoreal')) if oscpath == "/pl" and len(args)==1: debug("correct OSC type :'/pl") if validate(args[0]) == True: debug("new pl : ", args[0]) line = args[0].replace("(",'[') line = line.replace(")",']') line = "[{}]".format(line) print(line, flush=True); else: debug("Bad pointlist -> msg trapped.") else: debug("BAD OSC Message : " + oscpath +" " +args[0]) oscserver.addMsgHandler( "default", OSChandler ) def validate(pointlist): state = True if len(pointlist)<9: debug("Not enough characters :", pointlist) state = False if pointlist.find("(") == -1: debug("Bad format : use () not [] for points", pointlist) state = False try: pl = bytes(pointlist, 'ascii') check = ast.literal_eval(pl.decode('ascii')) except: debug("BAD POINTLIST :", pointlist) state = False return state # simulate a "game engine" while run: # do the game stuff: sleep(0.01) # call user script OSC_frame() oscserver.close()