jamidi/nozWS.py

452 lines
11 KiB
Python
Executable File

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
NozoidUI Client v0.0.1
Input : local midi instruments
Output : NozoidUI server
Websocket INSTALLER
https://files.pythonhosted.org/packages/8b/0f/52de51b9b450ed52694208ab952d5af6ebbcbce7f166a48784095d930d8c/websocket_client-0.57.0.tar.gz
'''
# Startup banner: two blank lines, then the program name and its version.
for banner_line in ("", "", "NozoidUI Client", "v0.0.1"):
    print(banner_line)
from multiprocessing import Process, Queue, TimeoutError
import subprocess
import sys
import traceback
import os
import time
from rtmidi.midiconstants import (CHANNEL_PRESSURE, CONTROLLER_CHANGE, NOTE_ON, NOTE_OFF,
PITCH_BEND, POLY_PRESSURE, PROGRAM_CHANGE)
import midi3
from websocket_server import WebsocketServer
#import socket
import types, time
import websocket
import socket
import struct
import argparse
# Command line: every option is optional; an option left unset (or given a
# falsy value such as 0) falls back to the built-in default below.
argsparser = argparse.ArgumentParser(description="Nozoid : Transform musical parameters in XXX visualisation")
for short_flag, long_flag, help_text, value_type in (
    ("-i", "--iport", "remserial IN port (9090 by default)", int),
    ("-o", "--oport", "remserial OUT port (9091 by default)", int),
    ("-t", "--timeout", "timeout connection (0.002 s by default)", float),
):
    argsparser.add_argument(short_flag, long_flag, help=help_text, type=value_type)
args = argsparser.parse_args()

# Address of the remserial bridge and the settings used to talk to it.
nozIP = '127.0.0.1'
portrcv = args.iport or 9090
portwrt = args.oport or 9091
timeout = args.timeout or 0.002
try:
import _thread
except ImportError:
import _thread as thread
import time
# Verbosity switch: any value above 0 enables the extra trace prints.
debug = 0

#
# webUI server
#

# Host and port used to build the "ws://host:port" URL of the NozoidUI server.
# Hosts used previously: "127.0.0.1", "10.8.0.46".
serverIP = "xrkia.org"
wsPORT = 8081

#
# Midi part
#

# Name of the MIDI destination that receives the control changes.
# Other destinations that have been used:
#   "BCR2000 Port 1", "Arturia BeatStep", "Virtual Midi A",
#   "Virtual Sequencer", "IAC Driver Sequencer Bus 1"
nozmidi = "UM-ONE:UM-ONE MIDI 1 20:0"

# MIDI channel (1-based) of each Nozoid.
midichanMMO3 = 1
midichanOCS2 = 2

# Values sent for CC 0..31 on reset, one table per Nozoid.
# Something like [64,64,0,32,96] was considered for the reset values.
resetMMO3 = [0 for _ in range(32)]
resetOCS2 = [0 for _ in range(32)]
# /cc cc number value
def cc(midichannel, ccnumber, value, mididest):
    """Log and send one MIDI control change to `mididest`.

    The status byte is CONTROLLER_CHANGE offset by the 1-based `midichannel`;
    the resulting three-byte message is handed to midi3.MidiMsg.
    """
    print("NozoidUI Sending Midi channel", midichannel, "cc", ccnumber, "value", value, "to", mididest)
    status_byte = CONTROLLER_CHANGE + midichannel - 1
    midi3.MidiMsg([status_byte, ccnumber, value], mididest)
# /reset nozoids with "default" values
def reset(nozoid):
    """Send the reset value of CC 0..31 to one Nozoid and announce each one.

    nozoid: "mmo3" selects the MMO-3 channel and reset table; any other
            value selects the OCS-2 ones.

    For every controller number the value is sent over MIDI to `nozmidi`
    and then published through sendWSall as "/<device>/cc/<number> <value>".
    """
    print("")
    print("reseting", nozoid)

    if nozoid == "mmo3":
        prefix, midichannel, defaults = "/mmo3/cc/", midichanMMO3, resetMMO3
    else:
        # The OCS-2 announcement must use the OCS-2 table: the MIDI message
        # already did, but the websocket text used to read resetMMO3.
        prefix, midichannel, defaults = "/ocs2/cc/", midichanOCS2, resetOCS2

    status_byte = CONTROLLER_CHANGE + midichannel - 1
    for ccnumber in range(0, 32):
        midi3.MidiMsg([status_byte, ccnumber, defaults[ccnumber]], nozmidi)
        sendWSall(prefix + str(ccnumber) + " " + str(defaults[ccnumber]))
#
# Websocket part
#
def on_error(ws, error):
    """Websocket error callback: print the reported error, nothing else."""
    print(error)
    return None
def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket close callback: print a closing marker.

    close_status_code / close_msg are accepted (and ignored) because recent
    websocket-client releases always pass them; the defaults keep the
    callback usable when it is invoked with the websocket alone.
    """
    print("### closed ###")
def on_open(ws):
    """Websocket open callback: run NozStream() in a background thread.

    When NozStream stops (normally or through an exception, whose traceback
    is printed) the websocket is closed and a termination note is printed.
    """
    def _stream_worker(*_unused):
        try:
            NozStream()
        except Exception:
            traceback.print_exc()
        finally:
            ws.close()
            print("thread terminating...")

    _thread.start_new_thread(_stream_worker, ())
def on_message(ws, message):
    """Handle one text message received from the NozoidUI server.

    Understood messages:
      CC    : /device/cc/<number> <value>
      RESET : /device/reset <value>
    Device "ocs2" targets the OCS-2; any other device name is treated as the
    MMO-3. Messages without an argument, with a path too short to carry a
    command, or with non-integer cc number/value are ignored instead of
    raising inside the websocket callback.
    """
    print("")
    print(message)

    # Keep the processed text bounded.
    if len(message) > 200:
        message = message[:200] + '..'

    oscpath = message.split(" ")
    if debug > 0:
        print("Client got from WS said :", message, "splitted in an oscpath :", oscpath)

    wscommand = oscpath[0].split("/")
    print("WS command was :", wscommand)

    # No argument: nothing to do. (This used to item-assign the module-level
    # argparse namespace `args`, which raises TypeError.)
    if len(oscpath) == 1:
        return

    # "/device/command" splits into ['', device, command, ...].
    if len(wscommand) < 3:
        return

    device, command = wscommand[1], wscommand[2]

    # CC : /device/cc/2 127
    if command == "cc":
        try:
            ccnumber = int(wscommand[3])
            ccvalue = int(oscpath[1])
        except (IndexError, ValueError):
            print("Ignoring malformed cc message :", message)
            return

        if device == "ocs2":
            print("Incoming OCS-2 WS")
            cc(midichanOCS2, ccnumber, ccvalue, nozmidi)
        else:
            print("Incoming MMO-3 WS")
            cc(midichanMMO3, ccnumber, ccvalue, nozmidi)

    # RESET : /device/reset 1
    elif command == "reset":
        if device == "ocs2":
            reset("ocs2")
        else:
            reset("mmo3")

    # if needed a loop back : WS Client -> server -> WS Client
    # sendWSall(message)
##################
#BEGIN NOZOID PART
##################
# Last decoded value of each oscillator, indexed by oscillator number.
osc = [0 for _ in range(512)]
# Oscillator number written by the most recent decoded oscillator message.
oscnumber = 0
# Last cc-scaled value seen for each knob; 255 marks "not seen yet".
Knob = [255 for _ in range(255)]
# Number of socket timeouts caught so far.
TimeoutCompt = 0
# Four-byte placeholder message 0xFF 0xFF 0x00 0x00.
LastNozMsg = bytes([0xFF, 0xFF, 0x00, 0x00])
def knob2cc(s):
    """Linearly rescale `s` from the 0..65535 range onto the 0..127 range.

    The result is not rounded or clamped; true division makes it a float.
    """
    raw_lo, raw_hi = 0, 65535
    cc_lo, cc_hi = 0, 127
    scaled = (s - raw_lo) * (cc_hi - cc_lo) / (raw_hi - raw_lo)
    return cc_lo + scaled
def lhex(h):
    """Return the bytes of `h` as colon-separated two-digit lowercase hex.

    h: a bytes-like object (bytes, bytearray, ...). An empty input gives "".

    Python 3 has no 'hex' codec and iterating bytes yields integers, so each
    byte is formatted explicitly.
    """
    return ':'.join('%02x' % octet for octet in bytearray(h))
def Write(sockwrt, stuff):
    """Log `stuff` and send its items, as raw bytes, through `sockwrt`.

    stuff: sequence of integers in 0..255 (one per byte to send).
    """
    print('sending', stuff)
    payload = bytearray()
    payload.extend(stuff)
    sockwrt.sendall(payload)
def Read4():
    """Read one 4-byte message from the module-level SockRcv socket."""
    message_length = 4
    return Read(SockRcv, message_length)
# Read a given number of bytes
def Read(sockrcv, length):
    """Read `length` bytes from `sockrcv`, aligned on a 0xFF first byte.

    The function keeps receiving until `length` bytes are available. If the
    first byte is not 0xFF, the window is shifted one byte at a time (drop
    the first byte, receive one more) until it starts with 0xFF, and the
    shifted data is printed in hex.

    Returns the `length` bytes as a bytes object.
    Raises ConnectionError when the peer closes the connection (recv returns
    no data) instead of looping forever; exceptions raised by recv itself,
    such as socket.timeout, propagate unchanged.
    """
    data = b""
    while len(data) < length:
        chunk = sockrcv.recv(length - len(data))
        if not chunk:
            raise ConnectionError("nozoid connection closed while reading")
        data += chunk

    # Nothing was requested: there is no first byte to align on.
    if not data:
        return data

    # Indexing bytes yields an int in Python 3, so compare directly.
    if data[0] == 0xFF:
        return data

    while data[0] != 0xFF:
        extra = sockrcv.recv(1)
        if not extra:
            raise ConnectionError("nozoid connection closed while resyncing")
        data = data[1:] + extra

    print("**Shiftin'!!!", ':'.join('%02x' % octet for octet in data))
    return data
# Nozoids Messages decoding
def Msg(NozMsg, nozport):
    """Decode one Nozoid message and update the module-level state.

    NozMsg:  bytes-like object of at least 4 bytes; byte 1 is the message
             code and bytes 2-3 carry a big-endian 16-bit value.
    nozport: index of the Nozoid the message came from; it offsets knob and
             oscillator numbers by nozport * 127.

    Returns, depending on the code:
      code < 160          -> (knob, value - 32767); knobs numbered 30 and
                             above always report -32767
      0xA0..0xEF, 0xF6..0xF8 -> (oscnumber, signed 16-bit value)
      0xF0                -> (0xF0, last two bytes of the message)
      0xF3..0xF5          -> (oscnumber, unsigned value - 32767)
      0xFA                -> (0xFA, unsigned value)   # a note/keyboard
      0xFF                -> (0xFF, unsigned value)   # an error somewhere ?
      anything else       -> None
    """
    global osc
    global oscnumber

    # Indexing bytes yields an int in Python 3, so no ord() is needed.
    code = NozMsg[1]

    if code < 160:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        knob = code + (nozport * 127)
        if knob >= 30:
            return (knob, 0 - 32767)
        # ccknobv = cc knob value
        ccknobv = knob2cc(val)
        if Knob[knob] != ccknobv:
            if Knob[knob] != 255:
                print("knob %d has changed from %d to %d value" % (knob, Knob[knob], ccknobv))
            Knob[knob] = ccknobv
        return (knob, val - 32767)

    if (0xA0 <= code < 0xF0) or (0xF6 <= code <= 0xF8):
        (val,) = struct.unpack_from('>h', NozMsg, 2)
        oscnumber = code - 0x9F + (nozport * 127)
        osc[oscnumber] = val
        return (oscnumber, val)

    # type of nozoid
    if code == 0xF0:
        print("Nozoid type sent:%s" % NozMsg[-2:])
        # `noztype` is not defined anywhere in the visible file; create the
        # table on first use so this branch cannot fail with NameError, while
        # still reusing a table defined elsewhere if there is one.
        globals().setdefault("noztype", {})[nozport] = NozMsg[-2:]
        return (0xF0, NozMsg[-2:])

    # an osc value that belongs to the [0,65535] interval (CV values ?)
    if 0xF3 <= code <= 0xF5:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        oscnumber = code - 0x9F + (nozport * 127)
        oscvalue = val - 32767
        osc[oscnumber] = oscvalue
        return (oscnumber, oscvalue)

    # a note/keyboard has been sent
    if code == 0xFA:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        return (0xFA, val)

    # an error somewhere ?
    if code == 0xFF:
        (val,) = struct.unpack_from('>H', NozMsg, 2)
        return (0xFF, val)
def NozStream():
    """Forward decoded Nozoid messages to the websocket, forever.

    If a write socket is available, 0xFF then 0xA3 (commented "LFO2" in the
    original) are written first. Each loop turn then reads one 4-byte
    message, decodes it with Msg() and sends
    "/ocs2/OSC<oscnumber> <value>" for the current oscillator over `ws`.
    A socket timeout is counted, reported, and followed by a pause of
    `timeout` seconds; any other exception propagates to the caller.
    """
    global osc
    global oscnumber
    global TimeoutCompt
    # Without this declaration the backup below only created a local and the
    # module-level LastNozMsg was never updated.
    global LastNozMsg

    if SockWrt:
        Write(SockWrt, [0xFF])
        #Write(SockWrt,[0xF6])#VCF
        #Write(SockWrt,[0xA0])#VCO1
        #Write(SockWrt,[0xA1])#VCO2
        Write(SockWrt, [0xA3])#LFO2

    while True:
        try:
            NozMsg = Read4()
            # backing up last good msg
            LastNozMsg = NozMsg
            # decode the message; Msg() also updates osc / oscnumber
            MsgReturn = Msg(NozMsg, 0)
            print("MsgReturn:", MsgReturn)
            ws.send("/ocs2/OSC" + str(oscnumber) + " " + str(osc[oscnumber]))

        except socket.timeout:
            TimeoutCompt += 1
            print('**caught a timeout(%d)' % TimeoutCompt)
            time.sleep(timeout)
# Nozoid reader via remserial
# ex : remserial -p 9090 -s "2000000 raw" /dev/ttyACM0
#
print("Nozoid Reader connecting...")

# Defined up front so the error path never touches an unbound name.
SockRcv = None
try:
    # Create a TCP/IP socket
    SockRcv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Connect the socket to the port where the server is listening
    serverrcv_address = (nozIP, portrcv)
    SockRcv.connect(serverrcv_address)
    # The read timeout is applied only once the connection is established.
    SockRcv.settimeout(timeout)
    print('connecting to %s portrcv %s' % serverrcv_address, file=sys.stderr)

# Only socket failures are handled here (socket errors are OSError
# subclasses); CTRL C is no longer swallowed by a bare except.
except OSError:
    print("** huh! did you launch remserial on %s:%d ?" % (nozIP, portrcv))
    if SockRcv is not None:
        SockRcv.close()
        SockRcv = None
    input("Hit Enter To Continue...")
#
# Nozoid write via remserial
# ex : remserial -p 9091 -w -s "2000000 raw" /dev/ttyACM0
#
print("Nozoid Writer connecting...")

# Defined up front so the error path never touches an unbound name.
SockWrt = None
try:
    # Create a TCP/IP socket
    SockWrt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serverwrt_address = (nozIP, portwrt)

    # Connect the socket to the port where the server is listening
    SockWrt.connect(serverwrt_address)
    print('connecting to %s portwrt %s' % serverwrt_address, file=sys.stderr)

# Only socket failures are handled here (socket errors are OSError
# subclasses); CTRL C is no longer swallowed by a bare except.
except OSError:
    print("** huh! did you launch remserial on %s:%d ?" % (nozIP, portwrt))
    if SockWrt is not None:
        SockWrt.close()
        SockWrt = None
    input("Hit Enter To Continue...")

###################
#END OF NOZOID PART
###################

print("Running....")
def GetTime():
    """Return the current local time, e.g. "Mon, 01 Jan 2024 12:00:00"."""
    stamp_format = "%a, %d %b %Y %H:%M:%S"
    # strftime without a time tuple formats the current local time.
    return time.strftime(stamp_format)
def sendWSall(message):
    """Publish `message` through the module-level websocket `ws`.

    In this client `ws` is a websocket.WebSocketApp, which offers send() but
    no send_message_to_all(); the broadcast method is used only when the
    object actually provides it, otherwise the message is sent to the
    server with send().
    """
    if debug > 0:
        print(GetTime(), "sending to all %s" % (message))

    broadcast = getattr(ws, "send_message_to_all", None)
    if broadcast is not None:
        broadcast(message)
    else:
        ws.send(message)
# Main loop do nothing. Maybe do the webui server ?
try:
    print("")
    print("Connecting to NozoidUI server...")
    print("at", serverIP, "port", wsPORT)

    #websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://" + str(serverIP) + ":" + str(wsPORT),
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)

    # Hand the websocket to the midi3 module and switch it to client mode.
    midi3.ws = ws
    midi3.clientmode = True

    print("Midi Configuration...")
    print("Midi Destination", nozmidi)
    midi3.check()

    ws.on_open = on_open
    # Blocks until the websocket connection ends.
    ws.run_forever()

except KeyboardInterrupt:
    # Gently stop on CTRL C
    pass

finally:
    # Cleanup lives in `finally` so the sockets are also released on CTRL C,
    # and each socket is checked because either connection may have failed.
    if SockWrt:
        try:
            Write(SockWrt, [0xFF])
        except OSError as write_error:
            print(write_error)

    print("closing nozoid connections!")
    for nozsock in (SockWrt, SockRcv):
        if nozsock is None:
            continue
        try:
            nozsock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Best effort: the peer may already have dropped the connection.
            pass
        nozsock.close()

print("Fin de NozoidUI.")