first commit

This commit is contained in:
Sam 2021-09-23 03:34:33 +02:00
commit 36766367c8
9 changed files with 4204 additions and 0 deletions

2874
OSC3.py Executable file

File diff suppressed because it is too large Load Diff

29
README.md Normal file
View File

@ -0,0 +1,29 @@
tcview v0.1
By Sam Neurohack
Display Link, OSC, Artnet and Midi timecodes
Listen on all network interface and midi interfaces
OSC port : 9001
ARTNET Port : 6454
* Link
- BPM
- beats
* OSC
- hour:minutes:seconds:ms (from OSC message received /TC1/time/30 '00:00:02:23')
- (https://github.com/hautetechnique/OSCTimeCode.git)
* Artnet (OpTimeCode)
- codetype hours:minutes:seconds:frames
* Midi (Clock)
- BPM computed from received clock messages
- MTC https://github.com/jeffmikels/timecode_tools.git

Binary file not shown.

BIN
link.cpython-38-darwin.so Executable file

Binary file not shown.

BIN
link.cpython-39-darwin.so Executable file

Binary file not shown.

43
log.py Normal file
View File

@ -0,0 +1,43 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
Log in color from
https://stackoverflow.com/questions/287871/how-to-print-colored-text-in-terminal-in-python
usage :
import log
log.info("Hello World")
log.err("System Error")
'''
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = "\033[1m"
def disable():
HEADER = ''
OKBLUE = ''
OKGREEN = ''
WARNING = ''
FAIL = ''
ENDC = ''
def infog( msg):
print(OKGREEN + msg + ENDC)
def info( msg):
print(OKBLUE + msg + ENDC)
def warn( msg):
print(WARNING + msg + ENDC)
def err( msg):
print(FAIL + msg + ENDC)

578
midix.py Normal file
View File

@ -0,0 +1,578 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Midi3 light version for soundt/Jamidi/clapt
v0.7.0
Midi Handler :
- Hook to the MIDI host
- Enumerate connected midi devices and spawn a process/device to handle incoming events
by Sam Neurohack
from /team/laser
Midi conversions from https://github.com/craffel/pretty-midi
"""
import time
from threading import Thread
import rtmidi
from rtmidi.midiutil import open_midiinput
from rtmidi.midiconstants import (CHANNEL_PRESSURE, CONTROLLER_CHANGE, NOTE_ON, NOTE_OFF,
PITCH_BEND, POLY_PRESSURE, PROGRAM_CHANGE, TIMING_CLOCK, SONG_CONTINUE, SONG_START, SONG_STOP)
import mido
from mido import MidiFile
import traceback
import weakref
import sys
from sys import platform
import os
import re
from collections import deque
import log
from queue import Queue
from OSC3 import OSCServer, OSCClient, OSCMessage
print("")
midiname = ["Name"] * 16
midiport = [rtmidi.MidiOut() for i in range(16) ]
OutDevice = []
InDevice = []
midisync = True
# max 16 midi port array
midinputsname = ["Name"] * 16
midinputsqueue = [Queue() for i in range(16) ]
midinputs = []
debug = 1
#Mser = False
MidInsNumber = 0
serverIP = "127.0.0.1"
OSCPORT = 9001
clock = mido.Message(type="clock")
start = mido.Message(type ="start")
stop = mido.Message(type ="stop")
ccontinue = mido.Message(type ="continue")
reset = mido.Message(type ="reset")
songpos = mido.Message(type ="songpos")
#mode = "maxwell"
'''
print "clock",clock)
print "start",start)
print "continue", ccontinue)
print "reset",reset)
print "sonpos",songpos)
'''
try:
input = raw_input
except NameError:
# Python 3
Exception = Exception
STATUS_MAP = {
'noteon': NOTE_ON,
'noteoff': NOTE_OFF,
'programchange': PROGRAM_CHANGE,
'controllerchange': CONTROLLER_CHANGE,
'pitchbend': PITCH_BEND,
'polypressure': POLY_PRESSURE,
'channelpressure': CHANNEL_PRESSURE
}
def SendTCview(oscaddress,oscargs=''):
oscmsg = OSCMessage()
oscmsg.setAddress(oscaddress)
oscmsg.append(oscargs)
osclientc = OSCClient()
osclientc.connect((serverIP, OSCPORT))
#print("MIDI Aurora sending UI :", oscmsg, "to",TouchOSCIP,":",TouchOSCPort)
try:
osclientc.sendto(oscmsg, (serverIP, OSCPORT))
oscmsg.clearData()
except:
log.err('Connection to TCview refused : died ?')
pass
#time.sleep(0.001
def GetTime():
return time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())
# /cc cc number value
def cc(midichannel, ccnumber, value, mididest):
if debug>0:
print("Midix sending Midi channel", midichannel, "cc", ccnumber, "value", value, "to", mididest)
MidiMsg([CONTROLLER_CHANGE+midichannel-1, ccnumber, value], mididest)
#
# MIDI Startup and handling
#
mqueue = Queue()
inqueue = Queue()
bpm = 0
running = True
samples = deque()
last_clock = None
#
# Events from Generic MIDI Handling
#
def MidinProcess(inqueue, portname):
inqueue_get = inqueue.get
bpm = 0
samples = deque()
last_clock = None
while True:
time.sleep(0.001)
msg = inqueue_get()
#print("")
#print("Generic from", portname,"msg : ", msg)
# NOTE ON message on all midi channels
if NOTE_ON -1 < msg[0] < 160 and msg[2] !=0 :
MidiChannel = msg[0]-144
MidiNote = msg[1]
MidiVel = msg[2]
print()
print("NOTE ON :", MidiNote, 'velocity :', MidiVel, "Channel", MidiChannel)
#print("Midi in process send /aurora/noteon "+str(msg[1])+" "+str(msg[2]))
SendTCview("/noteon",[MidiChannel, msg[1], msg[2]])
'''
# Sampler mode : note <63 launch snare.wav / note > 62 kick.wav
if MidiNote < 63 and MidiVel >0:
if platform == 'darwin':
os.system("afplay snare.wav")
else:
os.system("aplay snare.wav")
if MidiNote > 62 and MidiVel >0:
if platform == 'darwin':
os.system("afplay kick.wav")
else:
os.system("aplay kick.wav")
'''
# NOTE OFF or Note with 0 velocity on all midi channels
if NOTE_OFF -1 < msg[0] < 145 or (NOTE_OFF -1 < msg[0] < 160 and msg[2] == 0):
print("NOTE_off :",NOTE_OFF)
if msg[0] > 143:
MidiChannel = msg[0]-144
else:
MidiChannel = msg[0]-128
#print("NOTE OFF :", MidiNote, "Channel", MidiChannel)
#print("Midi in process send /aurora/noteoff "+str(msg[1]))
SendTCview("/noteoff",[MidiChannel, msg[1]])
# # CC on all Midi Channels
if CONTROLLER_CHANGE -1 < msg[0] < 192:
MidiChannel = msg[0]-175
print()
#print("channel", MidiChannel, "CC :", msg[1], msg[2])
print("Midi in process send /aurora/rawcc "+str(msg[0]-175-1)+" "+str(msg[1])+" "+str(msg[2]))
SendTCview("/rawcc",[msg[0]-175-1, msg[1], msg[2]])
if msg[0] == TIMING_CLOCK:
now = time.time()
if last_clock is not None:
samples.append(now - last_clock)
last_clock = now
if len(samples) > 24:
samples.popleft()
if len(samples) >= 2:
#bpm = 2.5 / (sum(samples) / len(samples))
#print("%.2f bpm" % bpm)
bpm = round(2.5 / (sum(samples) / len(samples))) # Against BPM lot very tiny change :
sync = True
# print("MIDI BPM", bpm)
#print("Midi clock : BPM", bpm)
SendTCview("/MIDIBPM",[bpm])
if msg[0] in (SONG_CONTINUE, SONG_START):
running = True
#print("START/CONTINUE received.")
#print("Midi in process send /aurora/start")
SendTCview("/start",[])
if msg[0] == SONG_STOP:
running = False
#print("STOP received.")
#print("Midi in process send /aurora/stop")
SendTCview("/stop",[])
'''
# other midi message
if msg[0] != NOTE_OFF and msg[0] != NOTE_ON and msg[0] != CONTROLLER_CHANGE:
pass
print("from", portname,"other midi message")
MidiMsg(msg[0],msg[1],msg[2],mididest)
'''
#def NoteOn(note, color, mididest):
#https://pypi.org/project/python-rtmidi/0.3a/
# NOTE_ON=#90 et NOTE_OFF=#80 on ajoute le channel (0 le premier) pour envoyer effectivement sur le channel
def NoteOn(note, color, mididest, midichannel=0):
global MidInsNumber
if debug >0:
print("Sending", note, color, "to", mididest, "on channel", midichannel)
for port in range(MidInsNumber):
# To mididest
if midiname[port].find(mididest) == 0:
midiport[port].send_message([NOTE_ON+midichannel, note, color])
# To All
elif mididest == "all" and midiname[port].find(mididest) != 0:
midiport[port].send_message([NOTE_ON+midichannel, note, color])
def NoteOff(note, mididest):
global MidInsNumber
for port in range(MidInsNumber):
# To mididest
if midiname[port].find(mididest) != -1:
midiport[port].send_message([NOTE_OFF, note, 0])
# To All
elif mididest == "all" and midiname[port].find(mididest) == -1:
midiport[port].send_message([NOTE_OFF, note, 0])
# Generic call back : new msg forwarded to queue
class AddQueue(object):
def __init__(self, portname, port):
self.portname = portname
self.port = port
#print "AddQueue", port)
self._wallclock = time.time()
def __call__(self, event, data=None):
message, deltatime = event
self._wallclock += deltatime
#print "inqueue : [%s] @%0.6f %r" % ( self.portname, self._wallclock, message))
message.append(deltatime)
midinputsqueue[self.port].put(message)
#
# MIDI OUT Handling
#
class OutObject():
_instances = set()
counter = 0
def __init__(self, name, kind, port):
self.name = name
self.kind = kind
self.port = port
self._instances.add(weakref.ref(self))
OutObject.counter += 1
print("Adding OutDevice name", self.name, "kind", self.kind, "port", self.port)
@classmethod
def getinstances(cls):
dead = set()
for ref in cls._instances:
obj = ref()
if obj is not None:
yield obj
else:
dead.add(ref)
cls._instances -= dead
def __del__(self):
OutObject.counter -= 1
def OutConfig():
global midiout, MidInsNumber
#
if len(OutDevice) == 0:
print("")
log.info("MIDIout...")
print("List and attach to available devices on host with IN port :")
# Display list of available midi IN devices on the host, create and start an OUT instance to talk to each of these Midi IN devices
midiout = rtmidi.MidiOut()
available_ports = midiout.get_ports()
for port, name in enumerate(available_ports):
midiname[port]=name
midiport[port].open_port(port)
#print )
#print "New OutDevice [%i] %s" % (port, name))
OutDevice.append(OutObject(name, "generic", port))
#print "")
print(len(OutDevice), "Out devices")
#ListOutDevice()
MidInsNumber = len(OutDevice)+1
def ListOutDevice():
for item in OutObject.getinstances():
print(item.name)
def FindOutDevice(name):
port = -1
for item in OutObject.getinstances():
#print "searching", name, "in", item.name)
if name == item.name:
#print 'found port',item.port)
port = item.port
return port
def DelOutDevice(name):
Outnumber = Findest(name)
print('deleting OutDevice', name)
if Outnumber != -1:
print('found OutDevice', Outnumber)
delattr(OutObject, str(name))
print("OutDevice", Outnumber,"was removed")
else:
print("OutDevice was not found")
#
# MIDI IN Handling
# Create processing thread and queue for each device
#
class InObject():
_instances = set()
counter = 0
def __init__(self, name, kind, port, rtmidi):
self.name = name
self.kind = kind
self.port = port
self.rtmidi = rtmidi
self.queue = Queue()
self._instances.add(weakref.ref(self))
InObject.counter += 1
print("Adding InDevice name", self.name, "kind", self.kind, "port", self.port)
@classmethod
def getinstances(cls):
dead = set()
for ref in cls._instances:
obj = ref()
if obj is not None:
yield obj
else:
dead.add(ref)
cls._instances -= dead
def __del__(self):
InObject.counter -= 1
def InConfig():
print("")
log.info("MIDIin...")
print("List and attach to available devices on host with OUT port :")
if platform == 'darwin':
mido.set_backend('mido.backends.rtmidi/MACOSX_CORE')
genericnumber = 0
for port, name in enumerate(mido.get_input_names()):
outport = FindOutDevice(name)
midinputsname[port]=name
#print "name",name, "Port",port, "Outport", outport)
# print "midinames", midiname)
#ListInDevice()
try:
#print name, name.find("RtMidi output"))
if name.find("RtMidi output") > -1:
print("No thread started for device", name)
else:
portin = object
port_name = ""
portin, port_name = open_midiinput(outport)
if midisync == True:
portin.ignore_types(timing=False)
#midinputs.append(portin)
InDevice.append(InObject(name, "generic", outport, portin))
thread = Thread(target=MidinProcess, args=(midinputsqueue[port],port_name))
thread.setDaemon(True)
thread.start()
#print "Thread launched for midi port", port, "portname", port_name, "Inname", midiname.index(port_name)
#print "counter", InObject.counter
#midinputs[port].set_callback(AddQueue(name),midinputsqueue[port])
#midinputs[port].set_callback(AddQueue(name))
#genericnumber += 1
InDevice[InObject.counter-1].rtmidi.set_callback(AddQueue(name,port))
except Exception:
traceback.print_exc()
#print "")
print(InObject.counter, "In devices")
#ListInDevice()
def ListInDevice():
#print "known IN devices :"
for item in InObject.getinstances():
print(item.name)
print("")
def FindInDevice(name):
port = -1
for item in InObject.getinstances():
#print "searching", name, "in", item.name)
if name in item.name:
#print 'found port',item.port)
port = item.port
return port
def DelInDevice(name):
Innumber = Findest(name)
print('deleting InDevice', name)
if Innumber != -1:
print('found InDevice', Innumber)
delattr(InObject, str(name))
print("InDevice", Innumber,"was removed")
else:
print("InDevice was not found")
def End():
global midiout
#midiin.close_port()
midiout.close_port()
# mididest : all or specifiname, won't be sent to launchpad or Bhoreal.
def MidiMsg(midimsg, mididest):
desterror = -1
print("tcview got midimsg", midimsg, "for", mididest)
for port in range(len(OutDevice)):
# To mididest
if midiname[port].find(mididest) != -1:
if debug>0:
print("tcview sending to name", midiname[port], "port", port, ":", midimsg)
midiport[port].send_message(midimsg)
desterror = 0
if desterror == -1:
print("mididest",mididest, ": ** This midi destination doesn't exists **")
def listdevice(number):
return midiname[number]
def check():
OutConfig()
InConfig()

486
tcview.py Normal file
View File

@ -0,0 +1,486 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Timecode View
Display Link, OSC, Artnet and Midi timecodes
v0.1.0
Ableton Link
git clone --recursive https://github.com/gonzaloflirt/link-python.git
Build:
Make sure python 3.8 is installed on your system.
mkdir build
cd build
cmake ..
cmake --build .
by Sam Neurohack
from /team/laser
"""
from tkinter import *
import time
import log
import midix
import traceback
import sys, _thread
import socket
#from queue import Queue
#import json, subprocess
from OSC3 import OSCServer, OSCClient, OSCMessage
print()
log.infog('Timecode viewer')
#myHostName = socket.gethostname()
#print("Name of the localhost is {}".format(myHostName))
#myIP = socket.gethostbyname(myHostName)
serverIP = "0.0.0.0"
OSCPORT = 9001
ARTNETPort = 6454
prevphase = 0
bpm = 120
ccs =[0]*10
#
# OSC
#
oscserver = OSCServer( (serverIP, OSCPORT) )
oscserver.timeout = 0
def GetTime():
return time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())
# this method of reporting timeouts only works by convention
# that before calling handle_request() field .timed_out is
# set to False
def handle_timeout(self):
self.timed_out = True
def OSC_Start():
global oscserver
print("OSC start...")
# funny python's way to add a method to an instance of a class
import types
oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)
oscserver.addMsgHandler( "default", handler )
#oscserver.addMsgHandler( "/TC1", Note)
#oscserver.addMsgHandler( "/TC", CC)
#oscserver.addMsgHandler( "/p", PB)
_thread.start_new_thread(osc_thread, ())
# RAW OSC Frame available ?
def OSCframe():
# clear timed_out flag
oscserver.timed_out = False
# handle all pending requests then return
while not oscserver.timed_out:
oscserver.handle_request()
# OSC server Thread : handler, dacs reports and simulator points sender to UI.
def osc_thread():
#print("osc Thread launched")
try:
while True:
time.sleep(0.005)
OSCframe()
except Exception as e:
import sys, traceback
print('\n---------------------')
print(('Exception: %s' % e))
print('- - - - - - - - - - -')
traceback.print_tb(sys.exc_info()[2])
print("\n")
# Properly close the system. Todo
def OSC_Stop():
oscserver.close()
# default handler
def handler(path, tags, args, source):
oscaddress = ''.join(path.split("/"))
print()
print(("TimeCode viewerOSC Handler got from " + str(source[0]),"OSC msg", path, "args", args))
#print("OSC address", path)
#print("find.. /bhoreal ?", path.find('/bhoreal'))
if len(args) > 0:
#print("with args", args)
pass
oscpath = path.split("/")
#print("OSC address", path)
if path.find('/TC') > -1:
#print("Timecode", args)
OSCtimecode(path, args, tags, source[0])
if path.find('/MIDIBPM') > -1:
#print("midibpm", args)
interface.update_midibpm("BPM " +str(args[0]))
# Dcode OSC Timecode /TC1/time/30 "00:01:07:12" or /TC2/time/30 "00:01:07:12"
def OSCtimecode(timepath, timestr, tags, source):
timelayer = int(timepath[3:4])
times = timestr[0].split(":")
hour = times[0]
minutes = times[1]
seconds = times[2]
msecs = times[3]
#print('timecode layer', timelayer, "hour", hour, "min", minutes, "sec", seconds, "ms", msecs)
interface.update_oscode(timestr)
#
# Link
#
def LINK_Start():
global lnk
import link
print("Link start...")
lnk = link.Link(120)
lnk.enabled = True
lnk.startStopSyncEnabled = True
linked = True
def BeatEvent():
global lnk, prevphase
lnkstr = lnk.captureSessionState()
link_time = lnk.clock().micros();
tempo_str = '{:.4f}'.format(lnkstr.tempo())
bpm = float(tempo_str)
beats_str = '{0:.2f}'.format(lnkstr.beatAtTime(link_time, 0))
playing_str = str(lnkstr.isPlaying()) # always False ???
phase = lnkstr.phaseAtTime(link_time, 4)
phase_str = ''
for x in range(0, 4):
if x < phase:
phase_str += 'X'
else:
phase_str += '0'
# new beat ?
if int(phase) != prevphase:
prevphase = int(phase)
interface.update_link("Beat "+str(beats_str)+" "+str(phase_str))
interface.update_linkbpm("BPM "+str(bpm))
#sys.stdout.write("Beat "+str(beats_str) + ' \r')
#sys.stdout.flush()
currentbeat = float(beats_str)
# Change current Link Tempo.
def newtempo(tempo):
global lnk
if linked == True:
lnk.enabled = False
lnk.startStopSyncEnabled = False
lnk = link.Link(tempo)
lnk.enabled = True
lnk.startStopSyncEnabled = True
bpm = tempo
else:
print("Link is disabled")
#
def BPMAdj(val1, keyname):
print((currentbpm))
# + 1
if val1 == 1:
newtempo(currentbpm+1)
# -1
if val1 == 127 and currentbpm > 0:
newtempo(currentbpm-1)
#
# Artnet
#
'''
Opcode
TimeCode 0x9700 This is an ArtTimeCode packet. It is used to transport time code over the network.
OpTimeSync 0x9800 Used to synchronise real time date and clockOpTrigger0x9900Used to send trigger macros
Frames
The Frames field is an 8-bit number that defines the number of frames encoded in the time. The value ranges from 010 to 2910, although not all values are legal for all types of time code. See table below.
Seconds 0-59
The Seconds field is an 8-bit number that defines the number of seconds encoded in the time. The value ranges from 010 to 5910.
Minutes 0-59
The Minutes field is an 8-bit number that defines the number of minutes encoded in the time. The value ranges from 010 to 5910.
Hours 0-23
The Hours field is an 8-bit number that defines the number of hours encoded in the time. The value ranges from 010 to 2310.
Type
The Type field is an 8-bit number that defines the time base of the encoded time. The following table describes the options:
Film type 0 framerate 24 frames 0-23
EBU type 1 framerate 25 frames 0-24
DF type 2 framerate 29,97 frames 0-29
SMPTE type 3 framerate 30 frames 0-29
'''
artnetsock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
artnetsock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
artnetsock.bind(('',6454))
codetypes = ["Film","EBU","DF","SMPTE"]
def ARTNET_Start():
print("Artnet Start...")
_thread.start_new_thread(artnet_thread, ())
def artnetHandler(data):
#if len(data) < 20:
# continue
'''
if data[0:7] != b"Art-Net" or data[7] != b"\0":
print("artnet package")
'''
protverhi = data[10]
protverlo = data[11]
sequence = data[12]
physical = data[13]
print(data[0:7], "version :",data[10],data[11], "sequence :", sequence, "physical :", physical, "type :", data[9],data[8])
#motherosc.send('/artnet/package', "version : "+str(data[10])+"."+str(data[11]) +" sequence : "+str(sequence)+ " physical : "+str(physical)+" type : " +str(data[9])+" "+str(data[8]))
# OpDmx
if data[8] == 0x00 and data[9] == 0x50:
subuni = data[14]
net = data[15]
lengthhi = data[16]
length = data[17]
dmx = data[18:]
#for i in range(0,len(dmx)):
#for i in range(0,5):
# print("canal",i,"value",dmx[i])
#print(dmx)
print("OpDmx : subuni", subuni, "net", net ,'length', len(dmx))
#motherosc.send('/artnet/OpDmx :', "subuni " +str(subuni)+ " net "+str(net) +' length '+str(len(dmx)))
#toleds()
# OpTimeCode
if data[8] == 0x00 and data[9] == 0x97:
frames = data[14]
seconds = data[15]
minutes = data[16]
hours = data[17]
codetype = data[18]
print("OptimeCode : hours", hours, "minutes", minutes, "seconds", seconds, "frames", frames,"type", codetypes[codetype])
#motherosc.send('/artnet/timecode', str(hours)+ ":"+str(minutes)+ ":"+ str(seconds)+ ":"+str(frames)+" timecode type "+str(codetype))
interface.update_artnetcode(codetypes[codetype]+" "+str(hours)+ ":"+str(minutes)+ ":"+ str(seconds)+ ":"+str(frames))
if data[8] == 0x00 and data[9] == 0x98:
print("OpTimeSync")
#motherosc.send('/artnet/OpTimeSync', 1)
def artnet_thread():
try:
while True:
data = artnetsock.recv(10240)
artnetHandler(data)
except Exception:
traceback.print_exc()
finally:
artnetsock.close()
#
# UI
#
class Interface(Frame):
"""Notre fenêtre principale.
Tous les widgets sont stockés comme attributs de cette fenêtre."""
def __init__(self, myframe, **kwargs):
Frame.__init__(self, myframe, width=300, height=576, bg="black", **kwargs)
self.pack(fill=NONE)
self.nb_clic = 0
# Création de nos widgets
self.message = Label(self, text=" Timecodes viewer ", bg="black", foreground="white")
self.message.place(x = 0, y = 25)
self.message.pack()
#self.message.config(bg="black", foreground="white")
self.link = Label(self, text="-- Link --", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.link.pack()
self.linkbpm = Label(self, text="BPM", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.linkbpm.pack()
self.lbl1 = Label(self, text="Waiting...", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.lbl1.pack()
self.lbl2 = Label(self, text="-- OSC --", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.lbl2.pack()
self.oscode = Label(self, text="Waiting...", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.oscode.pack()
self.artnet = Label(self, text="-- Artnet --", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.artnet.pack()
self.artnetcode = Label(self, text="Waiting...", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.artnetcode.pack()
self.midi = Label(self, text="-- Midi --", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.midi.pack()
self.midibpm = Label(self, text="Waiting...", bg="black", foreground="white")
#self.lbl0.place(x = 0, y = 55)
self.midibpm.pack()
self.quit_button = Button(self, text="Quit", command=self.quit)
#self.quit_button.configure(background = "green")
self.quit_button.pack(side="bottom")
self.update_UI()
'''
self.bouton_cliquer = Button(self, text="Cliquez ici", fg="red",
command=self.cliquer)
self.bouton_cliquer.pack(side="right")
'''
def update_link(self, msg):
self.lbl1.configure(text = msg)
def update_linkbpm(self, msg):
self.linkbpm.configure(text = msg)
def update_oscode(self, msg):
self.oscode.configure(text = msg)
def update_midibpm(self, msg):
self.midibpm.configure(text = msg)
def update_artnetcode(self, msg):
self.artnetcode.configure(text = msg)
def update_UI(self):
BeatEvent()
self.after(6, self.update_UI)
'''
def cliquer(self):
"""Il y a eu un clic sur le bouton.
On change la valeur du label message."""
self.nb_clic += 1
self.message["text"] = "Vous avez cliqué {} fois.".format(self.nb_clic)
'''
if __name__ == '__main__':
#print(dmxdata)
myframe = Tk()
OSC_Start()
LINK_Start()
interface = Interface(myframe)
artnet_subnet = 0
artnet_net = 0
artnet_universe = 0
ARTNET_Start()
print('UI start...')
print("Midi Start...")
midix.check()
log.infog('Running.')
'''
sh = ArtNet(artnet_net, artnet_subnet, artnet_universe, serverIP, ARTNETPort)
'''
interface.mainloop()
#sh.stop()
interface.destroy()

194
tools.py Executable file
View File

@ -0,0 +1,194 @@
#!/usr/bin/env python3
from timecode import Timecode
def bitstring_to_bytes(s, bytecount=1, byteorder='big'):
return int(s, 2).to_bytes(bytecount, byteorder)
# binary big-endian
def bbe(n, bits=8):
# terminal condition
retval = ''
if n == 0:
retval = '0'
else:
retval = bbe(n//2, None) + str(n%2)
if bits is None:
return retval
else:
return (('0'*bits) + retval)[-bits:]
# binary, little-endian
def ble(n, bits=8):
# terminal condition
retval = ''
if n == 0:
retval = '0'
else:
retval = str(n%2) + ble(n//2, None)
if bits is None:
return retval
else:
return (retval + ('0'*bits))[0:bits]
def cint(n, bytecount=2):
return int(n).to_bytes(bytecount, byteorder='little')
def units_tens(n):
return n % 10, int(n/10)
##
## LTC functions
##
# GENERATE BINARY-CODED DATA FOR LTC
# ACCORDING TO https://en.wikipedia.org/wiki/Linear_timecode
# everything is encoded little endian
# so to encode the number 3 with four bits, we have 1100
def ltc_encode(timecode, as_string=False):
LTC = ''
HLP = ''
hrs, mins, secs, frs = timecode.frames_to_tc(timecode.frames)
frame_units, frame_tens = units_tens(frs)
secs_units, secs_tens = units_tens(secs)
mins_units, mins_tens = units_tens(mins)
hrs_units, hrs_tens = units_tens(hrs)
#frames units / user bits field 1 / frames tens
LTC += ble(frame_units,4) + '0000' + ble(frame_tens,2)
HLP += '---{u}____-{t}'.format(u=frame_units, t=frame_tens)
#drop frame / color frame / user bits field 2
LTC += '00'+'0000'
HLP += '__'+'____'
#secs units / user bits field 3 / secs tens
LTC += ble(secs_units,4) + '0000' + ble(secs_tens,3)
HLP += '---{u}____--{t}'.format(u=secs_units, t=secs_tens)
# bit 27 flag / user bits field 4
LTC += '0' + '0000'
HLP += '_' + '____'
#mins units / user bits field 5 / mins tens
LTC += ble(mins_units,4) + '0000' + ble(mins_tens,3)
HLP += '---{u}____--{t}'.format(u=mins_units, t=mins_tens)
# bit 43 flag / user bits field 6
LTC += '0' + '0000'
HLP += '_' + '____'
#hrs units / user bits field 7 / hrs tens
LTC += ble(hrs_units,4) + '0000' + ble(hrs_tens,2)
HLP += '---{u}____--{t}'.format(u=hrs_units, t=hrs_tens)
# bit 58 clock flag / bit 59 flag / user bits field 8
LTC += '0' + '0' + '0000'
HLP += '_' + '_' + '____'
# sync word
LTC += '0011111111111101'
HLP += '################'
if as_string:
return LTC
else:
return bitstring_to_bytes(LTC, bytecount=10)
##
## MTC functions
##
def mtc_encode(timecode, as_string=False):
# MIDI bytes are little-endian
# Byte 0
# 0rrhhhhh: Rate (03) and hour (023).
# rr = 000: 24 frames/s
# rr = 001: 25 frames/s
# rr = 010: 29.97 frames/s (SMPTE drop-frame timecode)
# rr = 011: 30 frames/s
# Byte 1
# 00mmmmmm: Minute (059)
# Byte 2
# 00ssssss: Second (059)
# Byte 3
# 000fffff: Frame (029, or less at lower frame rates)
hrs, mins, secs, frs = timecode.frames_to_tc(timecode.frames)
framerate = timecode.framerate
rateflags = {
'24': 0,
'25': 1,
'29.97': 2,
'30': 3
}
rateflag = rateflags[framerate] * 32 # multiply by 32, because the rate flag starts at bit 6
# print('{:8} {:8} {:8} {:8}'.format(hrs, mins, secs, frs))
if as_string:
b0 = bbe(rateflag + hrs, 8)
b1 = bbe(mins)
b2 = bbe(secs)
b3 = bbe(frs)
# print('{:8} {:8} {:8} {:8}'.format(b0, b1, b2, b3))
return b0+b1+b2+b3
else:
b = bytearray([rateflag + hrs, mins, secs, frs])
# debug_string = ' 0x{:02} 0x{:02} 0x{:02} 0x{:02}'
# debug_array = [ord(b[0]), ord(b[1]), ord(b[2]), ord(b[3])]
# print(debug_string.format(debug_array))
return b
# convert a bytearray back to timecode
def mtc_decode(mtc_bytes):
rhh, mins, secs, frs = mtc_bytes
rateflag = rhh >> 5
hrs = rhh & 31
fps = ['24','25','29.97','30'][rateflag]
total_frames = int(frs + float(fps) * (secs + mins * 60 + hrs * 60 * 60))
return Timecode(fps, frames=total_frames)
def mtc_full_frame(timecode):
# if sending this to a MIDI device, remember that MIDI is generally little endian
# but the full frame timecode bytes are big endian
mtc_bytes = mtc_encode(timecode)
# mtc full frame has a special header and ignores the rate flag
return bytearray([0xf0, 0x7f, 0x7f, 0x01, 0x01]) + mtc_bytes + bytearray([0xf7])
def mtc_decode_full_frame(full_frame_bytes):
mtc_bytes = full_frame_bytes[5:-1]
return mtc_decode(mtc_bytes)
def mtc_quarter_frame(timecode, piece=0):
# there are 8 different mtc_quarter frame pieces
# see https://en.wikipedia.org/wiki/MIDI_timecode
# and https://web.archive.org/web/20120212181214/http://home.roadrunner.com/~jgglatt/tech/mtc.htm
# these are little-endian bytes
# piece 0 : 0xF1 0000 ffff frame
mtc_bytes = mtc_encode(timecode)
this_byte = mtc_bytes[3 - piece//2] #the order of pieces is the reverse of the mtc_encode
if piece % 2 == 0:
# even pieces get the low nibble
nibble = this_byte & 15
else:
# odd pieces get the high nibble
nibble = this_byte >> 4
return bytearray([0xf1, piece * 16 + nibble])
def mtc_decode_quarter_frames(frame_pieces):
mtc_bytes = bytearray(4)
if len(frame_pieces) < 8:
return None
for piece in range(8):
mtc_index = 3 - piece//2 # quarter frame pieces are in reverse order of mtc_encode
this_frame = frame_pieces[piece]
if this_frame is bytearray or this_frame is list:
this_frame = this_frame[1]
data = this_frame & 15 # ignore the frame_piece marker bits
if piece % 2 == 0:
# 'even' pieces came from the low nibble
# and the first piece is 0, so it's even
mtc_bytes[mtc_index] += data
else:
# 'odd' pieces came from the high nibble
mtc_bytes[mtc_index] += data * 16
return mtc_decode(mtc_bytes)