lj-clitools/generators/osc2redis.py

137 lines
3.2 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
Forward pointlist to redis key
END POINT Format : (x,y,color)
/pl/0/0 "[(150.0, 230.0, 255), (170.0, 170.0, 255), (230.0, 170.0, 255), (210.0, 230.0, 255), (150.0, 230.0, 255)]"
v0.1.0
Licensed under GNU GPLv3
by Cocoa, Sam Neurohack
'''
import sys
from os import path, getcwd
abspath, filename = path.split(path.realpath(__file__ ))
sys.path.insert(0, path.join(abspath,"../lib"))
from clitools import Clitools
from OSC3 import OSCServer, OSCClient, OSCMessage
from time import sleep
import ast
import redis
import argparse
argsparser = argparse.ArgumentParser(description="osc2redis generator")
argsparser.add_argument("-i","--ip",help="IP to bind to (0.0.0.0 by default)",default="0.0.0.0",type=str)
argsparser.add_argument("-p","--port",help="OSC port to bind to (9002 by default)",default=9002,type=str)
argsparser.add_argument("-r","--rip",help="Redis server IP (127.0.0.1 by default)",default="127.0.0.1",type=str)
argsparser.add_argument("-o","--rout",help="Redis port (6379 by default)",default=6379,type=str)
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose output")
args = argsparser.parse_args()
verbose = args.verbose
ip = args.ip
port = int(args.port)
rip = args.rip
rport = int(args.rout)
r = redis.StrictRedis(host=rip, port=rport, db=0)
def debug(msg):
if( verbose == False ):
return
print(msg)
oscserver = OSCServer( (ip, port) )
oscserver.timeout = 0
run = True
# this method of reporting timeouts only works by convention
# that before calling handle_request() field .timed_out is
# set to False
def handle_timeout(self):
self.timed_out = True
# funny python's way to add a method to an instance of a class
import types
oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)
def validate(pointlist):
state = True
if len(pointlist)<9:
state = False
try:
pl = bytes(pointlist, 'ascii')
check = ast.literal_eval(pl.decode('ascii'))
except:
state = False
return state
# RAW OSC Frame available ?
def OSC_frame():
# clear timed_out flag
oscserver.timed_out = False
# handle all pending requests then return
while not oscserver.timed_out:
oscserver.handle_request()
# default handler
def OSChandler(oscpath, tags, args, source):
oscaddress = ''.join(oscpath.split("/"))
print("fromOSC Default OSC Handler got oscpath :", oscpath, "from :" + str(source[0]), "args :", args)
print(oscpath.find("/pl/"), len(oscpath))
if oscpath.find("/pl/") ==0 and len(args)==1:
print("correct OSC type :'/pl/")
if validate(args[0]) == True and len(oscpath) == 7:
print("new pl for key ", oscpath, ":", args[0])
if r.set(oscpath,args[0])==True:
debug("exports::redis set("+str(oscpath)+") to "+args[0])
else:
print("Bad pointlist -> msg trapped.")
else:
print("BAD OSC Message :", oscpath)
oscserver.addMsgHandler( "default", OSChandler )
# simulate a "game engine"
while run:
# do the game stuff:
sleep(0.01)
# call user script
OSC_frame()
oscserver.close()