#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Timecode View

Display Link, OSC, Artnet and Midi timecodes

v0.1.0

Ableton Link

git clone --recursive https://github.com/gonzaloflirt/link-python.git

Build:

Make sure python 3.8 is installed on your system.

mkdir build
cd build
cmake ..
cmake --build .

by Sam Neurohack
from /team/laser

"""

from tkinter import *
import _thread
import socket
import sys
import time
import traceback
import types

#from queue import Queue
#import json, subprocess

import log
import midix
from OSC3 import OSCServer, OSCClient, OSCMessage

print()
log.infog('Timecode viewer')

#myHostName = socket.gethostname()
#print("Name of the localhost is {}".format(myHostName))
#myIP = socket.gethostbyname(myHostName)

# Network settings.
serverIP = "0.0.0.0"
OSCPORT = 9001
ARTNETPort = 6454

# Shared state.
prevphase = 0
bpm = 120
ccs = [0] * 10

# Tk colours and fonts used by the UI.
tkred = "#FFF0F0"
tkgreen = "#F0FFF0"
tkblue = "#F0F0FF"
tktitle = ('Arial', 11)
tkfont = ('Verdana', 18)

#
# OSC
#

oscserver = OSCServer((serverIP, OSCPORT))
# timeout = 0 is paired with the timed_out flag below so OSCframe() can
# drain pending requests and return.
oscserver.timeout = 0


def GetTime():
    """Return the local time formatted like 'Mon, 01 Jan 2024 12:00:00'."""
    return time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())


# This method of reporting timeouts only works by convention: the field
# .timed_out must be set to False before calling handle_request().
def handle_timeout(self):
    """Flag the server instance as timed out (installed by OSC_Start)."""
    self.timed_out = True


def OSC_Start():
    """Install the OSC handlers and start the background OSC polling thread."""
    print("OSC start...")

    # Bind handle_timeout as a method on this one server instance.
    oscserver.handle_timeout = types.MethodType(handle_timeout, oscserver)

    oscserver.addMsgHandler("default", handler)
    #oscserver.addMsgHandler( "/TC1", Note)
    #oscserver.addMsgHandler( "/TC", CC)
    #oscserver.addMsgHandler( "/p", PB)
    _thread.start_new_thread(osc_thread, ())


# RAW OSC Frame available ?
def OSCframe():
    """Handle every pending OSC request, then return."""
    # Clear the timed_out flag; handle_timeout() sets it again.
    oscserver.timed_out = False
    # Handle all pending requests, then return.
    while not oscserver.timed_out:
        oscserver.handle_request()


# OSC server Thread : handler, dacs reports and simulator points sender to UI.
def osc_thread():
    """Body of the background OSC thread: poll the server every 5 ms.

    Any exception ends the loop; it is printed with its traceback and the
    thread then returns.
    """
    try:
        while True:
            time.sleep(0.005)
            OSCframe()
    except Exception as e:
        import traceback
        print('\n---------------------')
        print(('Exception: %s' % e))
        print('- - - - - - - - - - -')
        traceback.print_tb(e.__traceback__)
        print("\n")


# Properly close the system. Todo
def OSC_Stop():
    """Close the OSC server."""
    oscserver.close()


# OSC default handler
def handler(path, tags, args, source):
    """Dispatch an incoming OSC message to the matching UI update.

    path   -- OSC address; matched by substring ('/TC', '/MIDIBPM', '/MIDIQF').
    tags   -- OSC type tags (only forwarded to OSCtimecode).
    args   -- list of OSC arguments; the first one carries the value.
    source -- sender address; only source[0] is used.

    NOTE(review): this is registered on the OSC server polled from
    osc_thread, so the Tk widgets are updated from a non-main thread;
    confirm this is safe with the Tk build in use.
    """
    print()
    print(("TimeCode viewerOSC Handler got from " + str(source[0]),"OSC msg", path, "args", args))

    if '/TC' in path:
        OSCtimecode(path, args, tags, source[0])

    # The MIDI messages need a value; ignore them when none was sent
    # instead of raising IndexError.
    if not args:
        return

    if '/MIDIBPM' in path:
        interface.update_midibpm(str(args[0]) + " bpm")

    if '/MIDIQF' in path:
        interface.update_midimtc(str(args[0]))


# Decode OSC Timecode /TC1/time/30 "00:01:07:12" or /TC2/time/30 "00:01:07:12"
def OSCtimecode(timepath, timestr, tags, source):
    """Show the timecode carried by an OSC /TC message.

    timepath -- OSC address of the message (unused here).
    timestr  -- OSC argument list; the first item is the timecode string.
    tags, source -- unused, kept for the caller's signature.

    A message without arguments is ignored.
    """
    if not timestr:
        return
    # Display the timecode string itself rather than the whole argument list.
    interface.update_oscode(str(timestr[0]))


#
# Link
#

# True once LINK_Start() has created the Link session.
linked = False


def LINK_Start():
    """Create and enable the Ableton Link session at 120 bpm."""
    global lnk, linked
    import link

    print("Link start...")
    lnk = link.Link(120)
    lnk.enabled = True
    lnk.startStopSyncEnabled = True
    linked = True


def BeatEvent():
    """Read the Link session state and refresh the Link labels on a new beat.

    Updates the module-level bpm on every call; the labels are only
    refreshed when the integer part of the 4-beat phase changes.
    """
    global bpm, prevphase

    lnkstr = lnk.captureSessionState()
    link_time = lnk.clock().micros()
    tempo_str = '{:.4f}'.format(lnkstr.tempo())
    bpm = float(tempo_str)
    beats_str = '{0:.2f}'.format(lnkstr.beatAtTime(link_time, 0))
    phase = lnkstr.phaseAtTime(link_time, 4)

    # One character per beat of the bar: 'X' up to the current phase.
    phase_str = ''.join('X' if x < phase else '0' for x in range(4))

    # new beat ?
    if int(phase) != prevphase:
        prevphase = int(phase)
        interface.update_link(beats_str + " " + phase_str)
        interface.update_linkbpm(str(bpm) + " bpm")


# Change current Link Tempo.
def newtempo(tempo):
    """Restart the Link session at the given tempo (in bpm).

    Prints a message and does nothing when Link has not been started.
    """
    global lnk, bpm

    if not linked:
        print("Link is disabled")
        return

    # link is only imported locally in LINK_Start, so import it here too.
    import link

    lnk.enabled = False
    lnk.startStopSyncEnabled = False
    lnk = link.Link(tempo)
    lnk.enabled = True
    lnk.startStopSyncEnabled = True
    bpm = tempo


#
def BPMAdj(val1, keyname):
    """Nudge the Link tempo by one bpm.

    val1    -- 1 raises the tempo, 127 lowers it (only while bpm > 0).
    keyname -- unused.
    """
    print((bpm))

    # + 1
    if val1 == 1:
        newtempo(bpm + 1)

    # -1
    elif val1 == 127 and bpm > 0:
        newtempo(bpm - 1)


#
# Artnet
#

# Opcodes
#
#   OpTimeCode 0x9700  This is an ArtTimeCode packet. It is used to
#                      transport time code over the network.
#   OpTimeSync 0x9800  Used to synchronise real time date and clock.
#   OpTrigger  0x9900  Used to send trigger macros.
#
# ArtTimeCode fields (each is an 8-bit number)
#
#   Frames   Number of frames encoded in the time. The value ranges from
#            0 to 29, although not all values are legal for all types of
#            time code. See table below.
#   Seconds  Number of seconds encoded in the time, 0 to 59.
#   Minutes  Number of minutes encoded in the time, 0 to 59.
#   Hours    Number of hours encoded in the time, 0 to 23.
#   Type     Time base of the encoded time:
#
#              Film   type 0   framerate 24      frames 0-23
#              EBU    type 1   framerate 25      frames 0-24
#              DF     type 2   framerate 29,97   frames 0-29
#              SMPTE  type 3   framerate 30      frames 0-29

artnetsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
artnetsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
artnetsock.bind(('', ARTNETPort))

codetypes = ["Film", "EBU", "DF", "SMPTE"]

# Every Art-Net packet starts with this 8-byte ID.
_ARTNET_ID = b"Art-Net\x00"
# The shortest packet this handler reads: ID (8) + opcode (2) + 4 bytes.
_ARTNET_MIN_LEN = 14
# Opcodes, transmitted low byte first (data[8] low, data[9] high).
_OP_DMX = 0x5000
_OP_TIMECODE = 0x9700
_OP_TIMESYNC = 0x9800


def ARTNET_Start():
    """Start the background thread that receives Art-Net packets."""
    print("Artnet Start...")
    _thread.start_new_thread(artnet_thread, ())


def artnetHandler(data):
    """Decode one received datagram and show ArtTimeCode on the UI.

    data -- raw bytes of the UDP datagram.

    Datagrams that are too short or do not start with the Art-Net ID are
    ignored. OpDmx and OpTimeSync packets are only printed.
    """
    if len(data) < _ARTNET_MIN_LEN or data[0:8] != _ARTNET_ID:
        return

    opcode = data[8] | (data[9] << 8)
    sequence = data[12]
    physical = data[13]
    print(data[0:7], "version :",data[10],data[11], "sequence :", sequence, "physical :", physical, "type :", data[9],data[8])

    # OpDmx
    if opcode == _OP_DMX and len(data) >= 18:
        subuni = data[14]
        net = data[15]
        dmx = data[18:]
        print("OpDmx : subuni", subuni, "net", net ,'length', len(dmx))

    # OpTimeCode
    elif opcode == _OP_TIMECODE and len(data) >= 19:
        frames = data[14]
        seconds = data[15]
        minutes = data[16]
        hours = data[17]
        codetype = data[18]
        # Fall back to the raw number for a type outside the table.
        if codetype < len(codetypes):
            typename = codetypes[codetype]
        else:
            typename = "type " + str(codetype)
        print("OptimeCode : hours", hours, "minutes", minutes, "seconds", seconds, "frames", frames,"type", typename)
        interface.update_artnetcode(str(hours) + ":" + str(minutes) + ":" + str(seconds) + ":" + str(frames) + " " + typename)

    elif opcode == _OP_TIMESYNC:
        print("OpTimeSync")


def artnet_thread():
    """Body of the Art-Net thread: receive datagrams and decode them.

    A failure while decoding one packet is printed and the loop goes on;
    a failure while receiving ends the loop. The socket is closed on exit.
    """
    try:
        while True:
            data = artnetsock.recv(10240)
            try:
                artnetHandler(data)
            except Exception:
                # One bad packet must not stop the receiver.
                traceback.print_exc()
    except Exception:
        traceback.print_exc()
    finally:
        artnetsock.close()


#
# UI
#

class Interface(Frame):
    """Main window of the timecode viewer.

    All widgets are stored as attributes of this frame.
    """

    def __init__(self, myframe, **kwargs):
        """Build the labels and the Quit button inside myframe."""
        Frame.__init__(self, myframe, width=350, height=576, bg="black", **kwargs)
        self.pack(fill=NONE)
        self.nb_clic = 0

        # Widgets are packed top to bottom in creation order.
        self.link = self._add_label("Link", "white", tktitle, justify='left')
        self.linkbpm = self._add_label("..bpm..", "white", tkfont)
        self.lbl1 = self._add_label("..beats..", "white", tkfont)

        self.lbl2 = self._add_label("OSC", tkblue, tktitle, anchor=W)
        self.oscode = self._add_label("..:..:..:.. ", tkblue, tkfont)

        self.artnet = self._add_label("Artnet", tkgreen, tktitle)
        self.artnetcode = self._add_label("..:..:..:..", tkgreen, tkfont)

        self.midi = self._add_label("Midi", tkred, tktitle)
        self.midibpm = self._add_label("... bpm", tkred, tkfont)
        self.midimtc = self._add_label("..:..:..:..", tkred, tkfont)

        self.quit_button = Button(self, text="Quit", command=self.quit, font=tktitle)
        self.quit_button.pack(side="bottom")

        # Schedule the first refresh instead of calling it here: BeatEvent()
        # uses the module-level `interface`, which is only assigned once
        # this constructor has returned.
        self.after(6, self.update_UI)

    def _add_label(self, text, colour, font, **options):
        """Create a black-background label, pack it and return it."""
        label = Label(self, text=text, bg="black", foreground=colour, font=font, **options)
        label.pack()
        return label

    def update_link(self, msg):
        """Set the text of the Link beats label."""
        self.lbl1.configure(text=msg)

    def update_linkbpm(self, msg):
        """Set the text of the Link bpm label."""
        self.linkbpm.configure(text=msg)

    def update_oscode(self, msg):
        """Set the text of the OSC timecode label."""
        self.oscode.configure(text=msg)

    def update_midibpm(self, msg):
        """Set the text of the MIDI bpm label."""
        self.midibpm.configure(text=msg)

    def update_midimtc(self, msg):
        """Set the text of the MIDI timecode label."""
        self.midimtc.configure(text=msg)

    def update_artnetcode(self, msg):
        """Set the text of the Art-Net timecode label."""
        self.artnetcode.configure(text=msg)

    def update_UI(self):
        """Refresh the Link display, then reschedule itself in 6 ms."""
        BeatEvent()
        self.after(6, self.update_UI)


if __name__ == '__main__':

    myframe = Tk()

    # Link must exist before the UI starts polling it, and the UI must
    # exist before the OSC and Art-Net threads can push updates to it.
    LINK_Start()
    interface = Interface(myframe)
    OSC_Start()

    artnet_subnet = 0
    artnet_net = 0
    artnet_universe = 0
    ARTNET_Start()

    print('UI start...')
    print("Midi Start...")
    midix.check()

    log.infog('Running.')

    #sh = ArtNet(artnet_net, artnet_subnet, artnet_universe, serverIP, ARTNETPort)

    interface.mainloop()
    #sh.stop()
    interface.destroy()