LJ/libs3/LPD8.py

312 lines
6.9 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
LPD8
v0.7.0
LPD8 Handler.
Start a dedicated thread to handle incoming events from LPD8 midi controller.
Depending on selected destination computer (Prog Chg + Pad number) actions will be done
locally or forwarded via OSC to given computer. Remote computer must run bhorpad or
maxwellator.
# Note
# Program Change button selected : change destination computer
# CC rotary -> midi CC.
by Sam Neurohack
from /team/laser
"""
import time
import rtmidi
from rtmidi.midiutil import open_midiinput
from threading import Thread
from rtmidi.midiconstants import (CHANNEL_PRESSURE, CONTROLLER_CHANGE, NOTE_ON, NOTE_OFF,
PITCH_BEND, POLY_PRESSURE, PROGRAM_CHANGE)
from mido import MidiFile
import mido
import sys
import os
ljpath = r'%s' % os.getcwd().replace('\\','/')
from . import midi3, gstt, launchpad
#import midimacros, maxwellmacros
import traceback
from queue import Queue
#from libs import macros
import json, subprocess
from .OSC3 import OSCServer, OSCClient, OSCMessage
import socket
myIP = "127.0.0.1"
print('LPD8 startup...')
myHostName = socket.gethostname()
print(("Name of the localhost is {}".format(myHostName)))
#myIP = socket.gethostbyname(myHostName)
print(("IP address of the localhost is {}".format(myIP)))
print(('Used IP', myIP))
OSCinPort = 8080
maxwellatorPort = 8090
LPD8queue = Queue()
mode = "maxwell"
mididest = 'Session 1'
midichannel = 1
CChannel = 0
CCvalue = 0
Here = -1
ModeCallback = ''
computerIP = ['127.0.0.1','192.168.2.95','192.168.2.52','127.0.0.1',
'127.0.0.1','127.0.0.1','127.0.0.1','127.0.0.1']
computer = 0
# /cc cc number value
def cc(ccnumber, value, dest=mididest):
midi3.MidiMsg([CONTROLLER_CHANGE+midichannel-1,ccnumber,value], dest)
def NoteOn(note,velocity, dest=mididest):
midi3.NoteOn(note,velocity, mididest)
def NoteOff(note, dest=mididest):
midi3.NoteOn(note, mididest)
def ComputerUpdate(comput):
global computer
computer = comput
# Client to export buttons actions from LPD8 or bhoreal
def SendOSC(ip,port,oscaddress,oscargs=''):
oscmsg = OSCMessage()
oscmsg.setAddress(oscaddress)
oscmsg.append(oscargs)
osclient = OSCClient()
osclient.connect((ip, port))
print(("sending OSC message : ", oscmsg, "to", ip, ":", port))
try:
osclient.sendto(oscmsg, (ip, port))
oscmsg.clearData()
return True
except:
print(('Connection to', ip, 'refused : died ?'))
return False
#
# Events from LPD8 buttons
#
# Process events coming from LPD8 in a separate thread.
def MidinProcess(LPD8queue):
global computer
while True:
LPD8queue_get = LPD8queue.get
msg = LPD8queue_get()
#print (msg)
# Note
if msg[0]==NOTE_ON:
# note mode
ModeNote(msg[1], msg[2], mididest)
'''
# ModeOS
if msg[2] > 0:
ModeOS(msg[0])
'''
# Program Change button selected : change destination computer
if msg[0]==PROGRAM_CHANGE:
print(("Program change : ", str(msg[1])))
# Change destination computer mode
print(("Destination computer",int(msg[1])))
computer = int(msg[1])
# CC rotary -> midi CC.
if msg[0] == CONTROLLER_CHANGE:
print(("CC :", msg[1], msg[2]))
if computer == 0 or computer == 1:
cc(int(msg[1]), int(msg[2]))
else:
SendOSC(computerIP[computer-1], maxwellatorPort, '/cc', [int(msg[1]), int(msg[2])])
#
# Notes = midi notes
#
def ModeNote(note, velocity, mididest):
print(('computer',computer))
# todo : decide whether its 0 or 1 !!!
if computer == 0 or computer == 1:
midi3.NoteOn(arg, velocity, mididest)
if velocity == 127:
pass
#print ('NoteON', BhorNoteXY(x,y),notename , "velocity", velocity )
#Disp(notename)
else:
midi3.NoteOff(arg)
#print ('NoteOFF', BhorNoteXY(x,y),notename , "velocity", velocity )
#
# Notes = OS Macros
#
def ModeOS(arg):
macroname = 'n'+arg
macronumber = findMacros(macroname,'OS')
if macronumber != -1:
eval(macros['OS'][macronumber]["code"])
else:
print("no Code yet")
LPD8queue = Queue()
# LPD8 Mini call back : new msg forwarded to LPD8 queue
class LPD8AddQueue(object):
def __init__(self, port):
self.port = port
#print("LPD8AddQueue", self.port)
self._wallclock = time.time()
def __call__(self, event, data=None):
message, deltatime = event
self._wallclock += deltatime
print()
print(("[%s] @%0.6f %r" % (self.port, self._wallclock, message)))
LPD8queue.put(message)
#
# Modes :
#
# Load Matrix only macros (for the moment) in LPD8.json
def LoadMacros():
global macros, nbmacro
#print()
#print("Loading LPD8 Macros...")
if os.path.exists('LPD8.json'):
#print('File is LPD8.json')
f=open("LPD8.json","r")
elif os.path.exists('../LPD8.json'):
#print('File is ../lpd8.json')
f=open("../LPD8.json","r")
elif os.path.exists('libs3/LPD8.json'):
#print('File is libs/lpd8.json')
f=open("libs3/LPD8.json","r")
elif os.path.exists(ljpath+'/../../libs3/LPD8.json'):
#print('File is '+ljpath+'/../../libs/lpd8.json')
f=open(ljpath+"/../../libs3/LPD8.json","r")
s = f.read()
macros = json.loads(s)
#print(len(macros['OS']),"Macros")
nbmacro = len(macros[gstt.lpd8Layers[gstt.lpd8Layer]])
#print("Loaded.")
# return macroname number for given type 'OS', 'Maxwell'
def findMacros(macroname,macrotype):
#print("searching", macroname,'...')
position = -1
for counter in range(len(macros[macrotype])):
#print (counter,macros[macrotype][counter]['name'],macros[macrotype][counter]['code'])
if macroname == macros[macrotype][counter]['name']:
#print(macroname, "is ", counter)
position = counter
return position
# Not assigned buttons
def DefaultMacro(arg):
print(("DefaultMacro", arg))
#
# Default macros
#
LPD8macros = {
"n1": {"command": DefaultMacro, "default": 1},
"n2": {"command": DefaultMacro, "default": 2},
"n3": {"command": DefaultMacro, "default": 3},
"n4": {"command": DefaultMacro, "default": 4},
"n5": {"command": DefaultMacro, "default": 5},
"n6": {"command": DefaultMacro, "default": 6},
"n7": {"command": DefaultMacro, "default": 7},
"n8": {"command": DefaultMacro, "default": 8}
}
def Run(macroname, macroargs=''):
doit = LPD8macros[macroname]["command"]
if macroargs=='':
macroargs = LPD8macros[macroname]["default"]
#print("Running", doit, "with args", macroargs )
doit(macroargs)
LoadMacros()