#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-

"""
NozoidUI Client
v0.0.1

Input  : local midi instruments
Output : NozoidUI server

This script does two things at once:

* it reads 4-byte messages from a remserial TCP socket, decodes them and
  forwards the last oscillator value to the NozoidUI websocket server as
  "/ocs2/OSC<number> <value>";
* it receives "/<device>/cc/<number> <value>" and "/<device>/reset <arg>"
  messages from the websocket server and turns them into MIDI control
  changes sent through midi3.

Websocket INSTALLER

https://files.pythonhosted.org/packages/8b/0f/52de51b9b450ed52694208ab952d5af6ebbcbce7f166a48784095d930d8c/websocket_client-0.57.0.tar.gz
"""

from multiprocessing import Process, Queue, TimeoutError
import _thread
import argparse
import os
import socket
import struct
import subprocess
import sys
import time
import traceback
import types

from rtmidi.midiconstants import (CHANNEL_PRESSURE, CONTROLLER_CHANGE,
                                  NOTE_ON, NOTE_OFF, PITCH_BEND,
                                  POLY_PRESSURE, PROGRAM_CHANGE)
from websocket_server import WebsocketServer
import websocket

import midi3

print("")
print("")
print("NozoidUI Client")
print("v0.0.1")

#
# Command line
#

argsparser = argparse.ArgumentParser(description="Nozoid : Transform musical parameters in XXX visualisation")
argsparser.add_argument("-i", "--iport", help="remserial IN port (9090 by default)", type=int)
argsparser.add_argument("-o", "--oport", help="remserial OUT port (9091 by default)", type=int)
argsparser.add_argument("-t", "--timeout", help="timeout connection (0.002 s by default)", type=float)
args = argsparser.parse_args()

nozIP = '127.0.0.1'
# A missing or zero option falls back to the default value.
portrcv = args.iport or 9090
portwrt = args.oport or 9091
timeout = args.timeout or 0.002

debug = 0

#
# webUI server
#

serverIP = "xrkia.org"
# serverIP = "127.0.0.1"
# serverIP = "10.8.0.46"
wsPORT = 8081

#
# Midi part
#

# nozmidi = "BCR2000 Port 1"
# nozmidi = "Arturia BeatStep"
# nozmidi = "Virtual Midi A"
# nozmidi = "Virtual Sequencer"
# nowmidi = "IAC Driver Sequencer Bus 1"
nozmidi = "UM-ONE:UM-ONE MIDI 1 20:0"

midichanOCS2 = 2
midichanMMO3 = 1

# resetMMO3 = [64,64,0,32,96]   # something like this for the reset values ?
resetMMO3 = [0] * 32
resetOCS2 = [0] * 32


def GetTime():
    """Return the local time formatted for log lines."""
    return time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())


def sendWSall(message):
    """Send `message` to the NozoidUI server through the client websocket.

    `ws` is a websocket.WebSocketApp (client side), which has no
    `send_message_to_all` method, so the message is sent with `send`.
    """
    if debug > 0:
        print(GetTime(), "sending to all %s" % (message))
    ws.send(message)


# /cc cc number value
def cc(midichannel, ccnumber, value, mididest):
    """Send one MIDI control change.

    midichannel -- MIDI channel, counted from 1
    ccnumber    -- controller number
    value       -- controller value
    mididest    -- midi3 destination name
    """
    print("NozoidUI Sending Midi channel", midichannel, "cc", ccnumber, "value", value, "to", mididest)
    midi3.MidiMsg([CONTROLLER_CHANGE + midichannel - 1, ccnumber, value], mididest)


# /reset nozoids with "default" values
def reset(nozoid):
    """Send the 32 reset CC values of a nozoid by MIDI and over the websocket.

    nozoid -- "mmo3" selects the MMO-3 table and channel; any other value
              selects the OCS-2 ones.
    """
    print("")
    print("reseting", nozoid)

    if nozoid == "mmo3":
        channel, defaults, prefix = midichanMMO3, resetMMO3, "/mmo3/cc/"
    else:
        # The OCS-2 branch now reports its own table (it used resetMMO3).
        channel, defaults, prefix = midichanOCS2, resetOCS2, "/ocs2/cc/"

    for ccnumber in range(32):
        value = defaults[ccnumber]
        midi3.MidiMsg([CONTROLLER_CHANGE + channel - 1, ccnumber, value], nozmidi)
        sendWSall(prefix + str(ccnumber) + " " + str(value))


#
# Websocket part
#

def on_error(ws, error):
    """Websocket callback: print the reported error."""
    print(error)


def on_close(ws, *close_info):
    """Websocket callback: report that the connection was closed.

    Extra positional arguments are accepted and ignored so the callback
    works whether or not the library passes a close status and message.
    """
    print("### closed ###")


def on_open(ws):
    """Websocket callback: start the nozoid reading loop in its own thread."""

    def run(*args):
        try:
            NozStream()
        except Exception:
            traceback.print_exc()
        finally:
            ws.close()
        print("thread terminating...")

    _thread.start_new_thread(run, ())


def on_message(ws, message):
    """Websocket callback: decode an incoming command and act on it.

    Handled messages:
        /device/cc/<ccnumber> <value>   -> MIDI control change
        /device/reset <arg>             -> reset of the device
    where device "ocs2" selects the OCS-2 and anything else the MMO-3.
    Messages without argument or with a malformed path are logged and ignored.
    """
    print("")
    print(message)
    if len(message) > 200:
        message = message[:200] + '..'

    oscpath = message.split(" ")
    if debug > 0:
        print("Client got from WS said :", message, "splitted in an oscpath :", oscpath)

    wscommand = oscpath[0].split("/")
    print("WS command was :", wscommand)

    # A command needs an argument and at least "/device/command".
    if len(oscpath) == 1 or len(wscommand) < 3:
        print("Ignoring WS message (no argument or incomplete path) :", message)
        return

    device, command = wscommand[1], wscommand[2]

    # CC : /device/cc/2 127
    if command == "cc":
        try:
            ccnumber = int(wscommand[3])
            value = int(oscpath[1])
        except (IndexError, ValueError):
            print("Ignoring malformed cc message :", message)
            return

        if device == "ocs2":
            print("Incoming OCS-2 WS")
            cc(midichanOCS2, ccnumber, value, nozmidi)
        else:
            print("Incoming MMO-3 WS")
            cc(midichanMMO3, ccnumber, value, nozmidi)

    # RESET : /device/reset 1
    elif command == "reset":
        reset("ocs2" if device == "ocs2" else "mmo3")

    # if needed a loop back : WS Client -> server -> WS Client
    # sendWSall(message)


##################
# BEGIN NOZOID PART
##################

# backup of each oscillator value
osc = [0] * 512
oscnumber = 0
# last CC value seen for each knob, 255 meaning "not seen yet"
Knob = [255] * 255
# nozoid type reported on each port (filled by Msg on 0xF0 messages)
noztype = {}
TimeoutCompt = 0
LastNozMsg = struct.pack('>BBBB', 255, 255, 0, 0)


def knob2cc(s):
    """Scale a 16-bit knob value (0..65535) to an integer MIDI CC value (0..127)."""
    a1, a2 = 0, 65535
    b1, b2 = 0, 127
    # Floor division keeps the result an integer CC step.
    return b1 + ((s - a1) * (b2 - b1) // (a2 - a1))


def lhex(h):
    """Return the bytes of `h` as colon separated two-digit hexadecimal."""
    return ':'.join('%02x' % octet for octet in bytearray(h))


def Write(sockwrt, stuff):
    """Send the integers of `stuff` as raw bytes on `sockwrt`."""
    print('sending', stuff)
    sockwrt.sendall(bytearray(stuff))


def Read4():
    """Read one 4-byte nozoid message from the reader socket."""
    return Read(SockRcv, 4)


# Read a given number of bytes
def Read(sockrcv, length):
    """Read `length` bytes from `sockrcv`, resynchronised on a 0xFF first byte.

    When the first byte is not 0xFF, bytes are dropped from the front and
    replaced by newly received ones until the buffer starts with 0xFF.

    Raises ConnectionError when the peer closes the connection (recv
    returning no data). socket.timeout from recv is left to the caller.
    """
    data = b""
    while len(data) < length:
        chunk = sockrcv.recv(length - len(data))
        if not chunk:
            raise ConnectionError("nozoid reader connection closed")
        data += chunk

    if not data:
        return data

    if data[0] == 0xFF:
        return data

    while data[0] != 0xFF:
        extra = sockrcv.recv(1)
        if not extra:
            raise ConnectionError("nozoid reader connection closed")
        data = data[1:] + extra
    print("**Shiftin'!!!", lhex(data))
    return data


# Nozoids Messages decoding
def Msg(NozMsg, nozport):
    """Decode one 4-byte nozoid message.

    The second byte selects the message kind and the last two bytes carry
    a big-endian 16-bit value. Returns a tuple depending on the kind:

        code < 0xA0              (knob, value - 32767), the value being
                                 forced to 0 for knob numbers >= 30
        0xA0..0xEF, 0xF6..0xF8   (oscnumber, signed value)
        0xF0                     (0xF0, last two bytes) nozoid type
        0xF3..0xF5               (oscnumber, unsigned value - 32767)
        0xFA                     (0xFA, value) note / keyboard
        0xFF                     (0xFF, value) error
        anything else            None

    Oscillator messages also update the globals `osc` and `oscnumber`.
    """
    global osc
    global oscnumber

    # Indexing bytes gives an int directly.
    code = NozMsg[1]

    if code < 0xA0:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        knob = code + (nozport * 127)
        if knob >= 30:
            return (knob, 0 - 32767)
        ccknobv = knob2cc(val)
        if Knob[knob] != ccknobv:
            if Knob[knob] != 255:
                print("knob %d has changed from %d to %d value" % (knob, Knob[knob], ccknobv))
            Knob[knob] = ccknobv
        return (knob, val - 32767)

    if (0xA0 <= code < 0xF0) or (0xF6 <= code <= 0xF8):
        (val,) = struct.unpack_from('>h', NozMsg, 2)
        oscnumber = code - 0x9F + (nozport * 127)
        osc[oscnumber] = val
        return (oscnumber, val)

    # type of nozoid
    if code == 0xF0:
        print("Nozoid type sent:%s" % NozMsg[-2:])
        noztype[nozport] = NozMsg[-2:]
        return (0xF0, NozMsg[-2:])

    # an osc value that belongs to the [0,65535] interval
    # (CV values ? see Arduino's code of MMO-3 and OSC-2 firmware)
    if 0xF3 <= code <= 0xF5:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        oscnumber = code - 0x9F + (nozport * 127)
        oscvalue = val - 32767
        osc[oscnumber] = oscvalue
        return (oscnumber, oscvalue)

    # a note/keyboard has been sent
    if code == 0xFA:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        return (0xFA, val)

    # an error somewhere ?
    if code == 0xFF:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        return (0xFF, val)

    return None


def NozStream():
    """Read nozoid messages forever and forward the last osc value to the server.

    Sends 0xFF then 0xA3 on the writer socket when it is connected, then
    loops on Read4/Msg. Timeouts are counted and followed by a short sleep.
    Returns immediately when the reader socket is not connected.
    """
    global osc
    global oscnumber
    global TimeoutCompt
    global LastNozMsg

    if SockWrt is not None:
        Write(SockWrt, [0xFF])
        # Other requests : 0xF6 VCF, 0xA0 VCO1, 0xA1 VCO2
        Write(SockWrt, [0xA3])  # LFO2

    if SockRcv is None:
        print("** no nozoid reader connection, nothing to stream")
        return

    while True:
        try:
            NozMsg = Read4()  # here we read !
            LastNozMsg = NozMsg  # backing up last good msg
            MsgReturn = Msg(NozMsg, 0)  # decode and assign global vars
            print("MsgReturn:", MsgReturn)
            ws.send("/ocs2/OSC" + str(oscnumber) + " " + str(osc[oscnumber]))
        except socket.timeout:
            TimeoutCompt += 1
            print('**caught a timeout(%d)' % TimeoutCompt)
            time.sleep(timeout)


def _connect_remserial(port, role, sock_timeout=None):
    """Connect to remserial on nozIP:port and return the socket, or None.

    role         -- label used in the log line ("portrcv" or "portwrt")
    sock_timeout -- socket timeout applied after connecting, if given

    On failure a hint is printed and the user is asked to hit Enter.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((nozIP, port))
        if sock_timeout is not None:
            sock.settimeout(sock_timeout)
        print('connecting to %s %s %s' % (nozIP, role, port), file=sys.stderr)
    except OSError:
        print("** huh! did you launch remserial on %s:%d ?" % (nozIP, port))
        sock.close()
        input("Hit Enter To Continue...")
        return None
    return sock


def _close_socket(sock):
    """Shut down and close `sock`; do nothing when it is None."""
    if sock is None:
        return
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except OSError:
        # Already disconnected: closing is still needed.
        pass
    sock.close()


# Nozoid reader via remserial
# ex : remserial -p 9090 -s "2000000 raw" /dev/ttyACM0
print("Nozoid Reader connecting...")
SockRcv = _connect_remserial(portrcv, "portrcv", timeout)

# Nozoid write via remserial
# ex : remserial -p 9091 -w -s "2000000 raw" /dev/ttyACM0
print("Nozoid Writer connecting...")
SockWrt = _connect_remserial(portwrt, "portwrt")

###################
# END OF NOZOID PART
###################

print("Running....")

# Main loop do nothing. Maybe do the webui server ?
try:
    print("")
    print("Connecting to NozoidUI server...")
    print("at", serverIP, "port", wsPORT)
    # websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://" + str(serverIP) + ":" + str(wsPORT),
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)

    midi3.ws = ws
    midi3.clientmode = True

    print("Midi Configuration...")
    print("Midi Destination", nozmidi)
    midi3.check()

    ws.on_open = on_open
    ws.run_forever()

    if SockWrt is not None:
        Write(SockWrt, [0xFF])

except KeyboardInterrupt:
    # Gently stop on CTRL C
    pass

finally:
    print("closing nozoid connections!")
    _close_socket(SockWrt)
    _close_socket(SockRcv)

print("Fin de NozoidUI.")