You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

fromOSC.py 3.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. # -*- mode: Python -*-
  4. '''
  5. Forward /pl pointlist to cli
  6. input OSC in END points format : (x,y,color)
  7. output CLI in CLI points format : [x,y,color]
  8. /pl "[(150.0, 230.0, 255), (170.0, 170.0, 255), (230.0, 170.0, 255), (210.0, 230.0, 255), (150.0, 230.0, 255)]"
  9. v0.1.0
  10. LICENCE : CC
  11. by Cocoa, Sam Neurohack
  12. '''
  13. from __future__ import print_function
  14. from OSC3 import OSCServer, OSCClient, OSCMessage
  15. import sys
  16. from time import sleep
  17. import argparse
  18. import ast
  19. argsparser = argparse.ArgumentParser(description="fromOSC generator")
  20. argsparser.add_argument("-i","--ip",help="IP to bind to (0.0.0.0 by default)",default="0.0.0.0",type=str)
  21. argsparser.add_argument("-p","--port",help="OSC port to bind to (9002 by default)",default=9002,type=str)
  22. argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose output")
  23. args = argsparser.parse_args()
  24. verbose = args.verbose
  25. ip = args.ip
  26. port = int(args.port)
  27. def debug(*args, **kwargs):
  28. if( verbose == False ):
  29. return
  30. print(*args, file=sys.stderr, **kwargs)
  31. oscserver = OSCServer( (ip, port) )
  32. oscserver.timeout = 0
  33. run = True
  34. # this method of reporting timeouts only works by convention
  35. # that before calling handle_request() field .timed_out is
  36. # set to False
  37. def handle_timeout(self):
  38. self.timed_out = True
  39. # funny python's way to add a method to an instance of a class
  40. import types
  41. oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)
  42. # RAW OSC Frame available ?
  43. def OSC_frame():
  44. # clear timed_out flag
  45. oscserver.timed_out = False
  46. # handle all pending requests then return
  47. while not oscserver.timed_out:
  48. oscserver.handle_request()
  49. # default handler
  50. def OSChandler(oscpath, tags, args, source):
  51. oscaddress = ''.join(oscpath.split("/"))
  52. debug("fromOSC Default OSC Handler got oscpath", oscpath, "from" + str(source[0]), ":", args)
  53. #print("OSC address", path)
  54. #print("find.. /bhoreal ?", path.find('/bhoreal'))
  55. if oscpath == "/pl" and len(args)==1:
  56. debug("correct OSC type :'/pl")
  57. if validate(args[0]) == True:
  58. debug("new pl : ", args[0])
  59. line = args[0].replace("(",'[')
  60. line = line.replace(")",']')
  61. line = "[{}]".format(line)
  62. print(line, flush=True);
  63. else:
  64. debug("Bad pointlist -> msg trapped.")
  65. else:
  66. debug("BAD OSC Message : " + oscpath +" " +args[0])
  67. oscserver.addMsgHandler( "default", OSChandler )
  68. def validate(pointlist):
  69. state = True
  70. if len(pointlist)<9:
  71. debug("Not enough characters :", pointlist)
  72. state = False
  73. if pointlist.find("(") == -1:
  74. debug("Bad format : use () not [] for points", pointlist)
  75. state = False
  76. try:
  77. pl = bytes(pointlist, 'ascii')
  78. check = ast.literal_eval(pl.decode('ascii'))
  79. except:
  80. debug("BAD POINTLIST :", pointlist)
  81. state = False
  82. return state
  83. # simulate a "game engine"
  84. while run:
  85. # do the game stuff:
  86. sleep(0.01)
  87. # call user script
  88. OSC_frame()
  89. oscserver.close()