jamidi/client.py
2020-04-22 17:44:30 +02:00

412 lines
12 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
Jamidi Client v0.1b
Input : local midi (hardware/software) instruments
Output : Jamidi server
ws = websocket.WebSocketApp("ws://"+str(serverIP)+":"+str(wsPORT),
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
ws.send(message)
ws.send_message_to_all(msg = message)
ws.close()
'''
print("")
print("")
print("Jamidi Client")
print("v0.1b")
#from multiprocessing import Process, Queue, TimeoutError
#import subprocess
import sys
import traceback
import os
import time
import json
from rtmidi.midiconstants import (CHANNEL_PRESSURE, CONTROLLER_CHANGE, NOTE_ON, NOTE_OFF,
PITCH_BEND, POLY_PRESSURE, PROGRAM_CHANGE)
sys.path.append('libs/')
import midi3
import types
import websocket
try:
import _thread
except ImportError:
import _thread as thread
import argparse
debug = 1
print ("")
print ("Arguments parsing if needed...")
argsparser = argparse.ArgumentParser(description="Jamidi Client v0.1b commands help mode")
argsparser.add_argument("-s","--servername",help="servername: 'local', 'xrkia' ('local' by default)", type=str)
argsparser.add_argument('--default',help="All incoming midi <-> default midi device. Default option." , dest='default', action='store_true')
argsparser.add_argument('--no-default',help="Do not send reset values to local device a startup.", dest='default', action='store_false')
argsparser.set_defaults(default=True)
args = argsparser.parse_args()
# Server
if args.servername:
servername = args.servername
else:
servername = "local"
# Default midi device like controller
if args.default == False:
print("Default device disabled")
defaultdevice = False
else:
print("Default device enabled")
defaultdevice = True
#
# Midi part
#
# default local MIDI device
nozmidi = "BCR2000 Port 1"
# nozmidi = "UM-ONE:UM-ONE MIDI 1 20:0"
midichanOCS2 = 2
midichanMMO3 = 1
# resetMMO3 = [64,64,0,32,96] # un truc comme ca pour les valeurs de reset ?
resetMMO3 = [0] * 32
resetOCS2 = [0] * 32
songs = ["song1", "song2"]
song = 0
def GetTime():
return time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())
#
# Settings from rules.json
#
# Load midi routing definitions in clientruler.json
def LoadMidiRules():
global MidiRules, nbmidirule
if os.path.exists('rules.json'):
f=open("rules.json","r")
s = f.read()
MidiRules = json.loads(s)
# return midi rulename number for given type 'Specials', 'cc2cc'
def findMidiRules(rulename,ruletype):
#print("searching", midirulename,'...')
position = -1
for counter in range(len(MidiRules[ruletype])):
if rulename == MidiRules[ruletype][counter]['name']:
#print(rulename, "is ", counter)
position = counter
return position
#
# Settings from jamidi.json
#
# Load midi definitions in jamidi.json
def LoadConfs():
global Confs, nbmidiconf
if os.path.exists('jamidi.json'):
f=open("jamidi.json","r")
s = f.read()
Confs = json.loads(s)
# return midi confname number for given type
def findConfs(confname,conftype):
#print("searching", midiconfname,'...')
position = -1
for counter in range(len(Confs[conftype])):
if confname == Confs[conftype][counter]['name']:
#print(confname, "is ", counter)
position = counter
return position
def curved(value):
return round(np.sqrt(value)*11.27)
# /cc cc number value
def cc(midichannel, ccnumber, value, mididest):
#print "cc()"
print("Jamidi Sending locally Midi channel", midichannel, "cc", ccnumber, "value", value, "to", mididest)
# Apply CC routing ?
if len(MidiRules["cc2cc"]) > 0 :
#print "cc2cc test for channel", midichannel, "CC", ccnumber, "val", value, "song", songs[song]
for counter in range(len(MidiRules["cc2cc"])):
if (MidiRules["cc2cc"][counter]["songname"] == songs[song] or MidiRules["cc2cc"][counter]["songname"] == "all") and (MidiRules["cc2cc"][counter]["chanIN"] == midichannel or MidiRules["cc2cc"][counter]["chanIN"] == "all") and (MidiRules["cc2cc"][counter]["ccs"] == ccnumber or MidiRules["cc2cc"][counter]["ccs"] == "all"):
print("cc2cc routing for song",MidiRules["cc2cc"][counter]["songname"], ":", MidiRules["cc2cc"][counter]["name"])
#print("cc2cc got song :", MidiRules["cc2cc"][counter]["songname"]," IN Channel :", MidiRules["cc2cc"][counter]["chanIN"]," Code :", MidiRules["cc2cc"][counter]["code"], " value :",MidiRules["ZnotesLcc"][counter]["valuetype"], )
if MidiRules["cc2cc"][counter]["valuetype"] == "linear":
print("Linear", MidiVal)
midi3.MidiMsg((176 + MidiRules["cc2cc"][counter]["chanOUT"], MidiRules["cc2cc"][counter]["ccOUT"], MidiVal), MidiRules["cc2cc"][counter]["mididest"])
if MidiRules["cc2cc"][counter]["valuetype"] == "curved":
print(MidiVal,"got curved", curved(MidiVal))
midi3.MidiMsg((176 + MidiRules["cc2cc"][counter]["chanOUT"], MidiRules["cc2cc"][counter]["ccOUT"], curved(MidiVal)), MidiRules["cc2cc"][counter]["mididest"])
#else:
print("Jamidi Sending locally Midi channel", midichannel, "cc", ccnumber, "value", value, "to", mididest)
#if mididest == "BCR2000 Port 1":
midi3.MidiMsg([CONTROLLER_CHANGE+midichannel-1, ccnumber, value], mididest)
'''
# Specials features ?
if len(MidiRules['Specials']) > 0:
for counter in range(len(MidiRules["Specials"])):
# print()
# print("Name", MidiRules["Specials"][counter]["name"])
# print("Song", MidiRules["Specials"][counter]["songname"], songs[song]) # name, "all"
# print("Channel", MidiRules["Specials"][ counter]["chanIN"], MidiChannel) # number, "all"
# print("Note", MidiRules["Specials"][counter]["notes"], MidiNote) # number, "all"
# print("Notetype", MidiRules["Specials"][counter]["notetype"], "on") # "on", "off", "all"
if (MidiRules["Specials"][counter]["songname"] == songs[song] or MidiRules["Specials"][counter]["songname"] == "all") and (MidiRules["Specials"][counter]["chanIN"] == MidiChannel or MidiRules["Specials"][counter]["chanIN"] == "all") and (MidiRules["Specials"][counter]["notes"] == MidiNote or MidiRules["Specials"][counter]["notes"] == "all") and (MidiRules["Specials"][counter]["notetype"] == "off" or MidiRules["Specials"][counter]["notetype"] == "all") :
midirulecode = MidiRules["Specials"][counter]["code"]
print("Specials function :",MidiRules["Specials"][counter]["songname"], ":", MidiRules["Specials"][counter]["name"], midirulecode)
# python function
if midirulecode.count('.') > 0:
#print(midirulecode+"("+str(MidiNote)+')')
eval(midirulecode+"("+str(MidiNote)+')')
# Maxwell function
elif midirulecode.count('/') > 0:
#print("Specials NoteON got :", MidiRules["Specials"][counter]["songname"]," IN Channel :", MidiRules["Specials"][counter]["chanIN"]," Code :", midirulecode, " CC", maxwellccs.FindCC(MidiRules["Specials"][counter]["code"]), " value :",MidiRules["Specials"][counter]["valuetype"], " laser :", MidiRules["ZccLcc"][counter]["laser"] )
midi3.MidiMsg((CONTROLLER_CHANGE, maxwellccs.FindCC(MidiRules["Specials"][counter]["code"]), MidiRules["Specials"][counter]["valuetype"]), mididest, laser = MidiRules["Specials"][counter]["laser"])
'''
# /reset nozoids with "default" values
def reset(nozoid):
print("")
print(GetTime(),"reseting", nozoid)
if nozoid == "mmo3":
for ccnumber in range(0,32):
midi3.MidiMsg([CONTROLLER_CHANGE+Confs["mmo3"][0]["midichan"]-1, ccnumber, resetMMO3[ccnumber]], Confs["mmo3"][0]["mididevice"])
sendWSall("/mmo3/cc/"+str(ccnumber)+" "+str(resetMMO3[ccnumber]))
crtvalueMMO3[ccnumber]=resetMMO3[ccnumber]
else:
for ccnumber in range(0,32):
midi3.MidiMsg([CONTROLLER_CHANGE+Confs["ocs2"][0]["midichan"]-1, ccnumber, resetOCS2[ccnumber]], Confs["ocs2"][0]["mididevice"])
sendWSall("/ocs2/cc/"+str(ccnumber)+" "+str(resetOCS2[ccnumber]))
crtvalueOCS2[ccnumber]=resetOCS2[ccnumber]
print("End of reset for", nozoid)
print("")
#
# Websocket part
#
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
def run(*args):
try:
while True:
time.sleep(1)
except Exception:
traceback.print_exc()
finally:
ws.close()
print("thread terminating...")
_thread.start_new_thread(run, ())
def on_message(ws, message):
print("")
print(message)
if len(message) > 200:
message = message[:200]+'..'
oscpath = message.split(" ")
if debug > 0:
print(GetTime(),"Main got from WS", client['id'], "said :", message, "splitted in an oscpath :", oscpath)
else:
print(GetTime(),"Main got WS Client", client['id'], "said :", message)
wscommand = oscpath[0].split("/")
# debug
if debug > 0:
print("wscommand :",wscommand)
# noarg
if len(oscpath) == 1:
args[0] = "noargs"
#print "noargs command"
# CC : /device/cc/2 127
elif wscommand[2] == "cc":
ccvr=int(wscommand[3]) #cc variable
ccvl=int(oscpath[1]) #cc value
if debug > 0:
print("ccvr=%d/ccvl=%d"%(ccvr,ccvl))
if wscommand[1] == "ocs2":
#cc(Confs[wscommand[1]][0]["midichan"], ccvr, ccvl, Confs[wscommand[1]][0]["mididevice"])
crtvalueOCS2[ccvr]=ccvl
else:
#cc(Confs[wscommand[1]][0]["midichan"], ccvr, ccvl, Confs[wscommand[1]][0]["mididevice"])
crtvalueMMO3[ccvr]=ccvl
cc(Confs[wscommand[1]][0]["midichan"], ccvr, ccvl, Confs[wscommand[1]][0]["mididevice"])
'''
# CC : /device/cc/2 127
elif wscommand[1] == "ocs2":
if wscommand[2] == "cc":
print("Incoming OCS-2 WS CC", wscommand[3], ":", int(oscpath[1]))
cc(Confs["ocs2"][0]["midichan"], int(wscommand[3]), int(oscpath[1]), Confs["default"][0]["mididevice"])
if wscommand[2] == "OSC1":
print("Incoming OCS-2 WS OSC1", wscommand[3], ":", int(oscpath[1]))
elif wscommand[1] == "mmo3":
if wscommand[2] == "cc":
print("Incoming MMO-3 WS CC", wscommand[3], ":", int(oscpath[1]))
cc(Confs["mmo3"][0]["midichan"], int(wscommand[3]), int(oscpath[1]), Confs["default"][0]["mididevice"])
if wscommand[2] == "OSC1":
print("Incoming MMO-3 WS OSC1", wscommand[3], ":", int(oscpath[1]))
'''
# RESET : /device/reset 1
elif wscommand[2] == "reset":
if wscommand[1] == "ocs2":
reset("ocs2")
else:
reset("mmo3")
# NOTEON : /device/noteon note velocity
elif wscommand[2] == "noteon":
midi3.NoteOn(int(oscpath[1]), int(oscpath[2]), Confs[wscommand[1]][0]["mididevice"])
# NOTEOFF /device/noteoff note
elif wscommand[2] == "noteoff":
midi3.NoteOff(int(oscpath[1]), Confs[wscommand[1]][0]["mididevice"])
# if needed a loop back : WS Client -> server -> WS Client
# sendWSall(message)
#
# Running...
#
LoadMidiRules()
LoadConfs()
serverIP = Confs[servername][0]["IP"]
#serverIP = "10.8.0.46"
wsPORT = Confs[servername][0]["port"]
print("Running....")
# Main loop do nothing. Maybe do the webui server ?
try:
print("")
print("Connecting to Jamidi server...")
print("at", serverIP, "port",wsPORT)
#websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://"+str(serverIP)+":"+str(wsPORT),
on_message = on_message,
on_error = on_error,
on_close = on_close)
midi3.ws = ws
midi3.clientmode = True
midi3.Confs = Confs
print("Midi Configuration...")
midi3.check()
ws.on_open = on_open
ws.run_forever()
except Exception:
traceback.print_exc()
except KeyboardInterrupt:
pass
# Gently stop on CTRL C
print("Fin de Jamidi.")