"""Interactive helpers for the LJ playlist manager.

The module keeps the current playlist (a list of shell command strings) in
module-level globals, runs the selected command as a subprocess whose output
is piped to ``exports/toRedis.py``, and offers ``action_*`` functions that
read their input from the terminal.
"""
import json
import os
import re
import shlex
import signal
import subprocess
import sys
import termios
import time
import tty
from pathlib import Path

import redis

# Environment handed to the spawned commands. It is passed as ``env=`` to
# subprocess, so every value must stay a string.
environ = {
    # "REDIS_IP": "127.0.0.1",
    "REDIS_IP": "192.168.2.44",
    "REDIS_PORT": "6379",
    "REDIS_KEY": "/pl/0/0",
    "REDIS_SCENE": "0",
    "REDIS_LASER": "0",
}


class bcolors:
    """ANSI escape sequences used to colour terminal output."""

    HL = '\033[31m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


class _Getch:
    """Callable that reads exactly one character from stdin in raw mode."""

    def __call__(self):
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            ch = sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch


inkey = _Getch()


def intkey():
    """Read one key and return it as an int.

    Prints "Error." and returns None when the key is not a digit.
    """
    try:
        return int(inkey())
    except ValueError:
        print("Error.")
        return None


current_id = 0
current_cmd = ""
process = None
current_filename = ""
# Initialised here so action_savePlaylist() can detect "nothing loaded yet"
# instead of raising NameError.
current_playlist_name = ""
currentPlayList = []
playlistsDir = Path("./playlists")
playlistsDir.mkdir(exist_ok=True)


def ask(q):
    """Print the question *q* and return the single key pressed in reply."""
    print(q)
    return inkey()


def _ok(msg):
    """Print *msg* in bold blue."""
    print(bcolors.BOLD + bcolors.OKBLUE + msg + bcolors.ENDC)


def _err(msg):
    """Print *msg* in the highlight (red) colour."""
    print(bcolors.HL + msg + bcolors.ENDC)


def _parseIndex(raw, size):
    """Return *raw* converted to an index valid for a sequence of *size* items.

    Returns None when *raw* is not an integer or is out of range; negative
    values are rejected so they never silently index from the end.
    """
    try:
        idx = int(raw)
    except (TypeError, ValueError):
        return None
    if 0 <= idx < size:
        return idx
    return None


def _askInt(q):
    """Ask *q* through ask() and return the pressed key as an int, or None."""
    answer = ask(q)
    try:
        return int(answer)
    except ValueError:
        _err("Invalid choice: '{}'".format(answer))
        return None


def _kill(process, grace=1.0):
    """Stop *process* and its process group, then reap it.

    SIGTERM is sent first; after at most *grace* seconds the group is sent
    SIGKILL. The child is waited on so it does not linger as a zombie.
    Does nothing when *process* is falsy or has already exited.
    """
    if not process:
        return
    if process.poll() is not None:
        # Already exited and reaped; its pid may have been reused, so do not
        # signal it.
        return
    try:
        pgid = os.getpgid(process.pid)
        if pgid == os.getpgid(0):
            # The child shares our process group: signalling the group would
            # kill this program too, so target the child alone.
            process.terminate()
            try:
                process.wait(timeout=grace)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            return
        os.killpg(pgid, signal.SIGTERM)
        try:
            process.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            pass
        try:
            # Force-stop whatever is left in the group.
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        try:
            process.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            pass
    except ProcessLookupError:
        # The process vanished between the checks above: nothing left to do.
        pass
    except OSError as e:
        print("woops:{}".format(e))


def _killBill():
    """Kill the sessions of orphaned (ppid 1) 'toRedis.py' processes."""
    subprocess.run(
        "ps --ppid 1 -fo pid,sess,ppid,cmd | grep 'toRedis.py' | while read pid sid other; do pkill -9 -s $sid; done",
        shell=True,
        executable='/bin/bash',
    )


def action_info():
    """Print the welcome banner with the current Redis settings."""
    print("""
Welcome to LJ playlist manager

Currently running on
IP    : {}
Port  : {}
Key   : {}
Scene : {}
Laser : {}
""".format(
        environ["REDIS_IP"],
        environ["REDIS_PORT"],
        environ["REDIS_KEY"],
        environ["REDIS_SCENE"],
        environ["REDIS_LASER"],
    ))


def action_changeCommand(inc):
    """Move the playlist cursor by *inc* entries, wrapping around.

    Returns False (and prints an error) when the playlist is empty,
    True otherwise.
    """
    global current_id
    if not currentPlayList:
        _err("Empty playlist")
        return False
    # Python's modulo keeps the result in range for negative steps as well.
    current_id = (current_id + inc) % len(currentPlayList)
    return True


def action_match(k):
    """Select playlist entry *k* as the current one.

    Returns True when *k* is a valid index, False otherwise.
    """
    global current_id
    idx = _parseIndex(k, len(currentPlayList))
    if idx is None:
        print(bcolors.HL + "This key does not exist" + bcolors.ENDC)
        return False
    _ok("Changed action id to {}.".format(idx))
    current_id = idx
    return True


def action_runCommand():
    """Stop the running command and start the currently selected one.

    Returns True when the new subprocess was started, False otherwise.
    """
    global current_cmd
    global process
    # Get new command
    try:
        new_cmd = currentPlayList[current_id]
    except IndexError as e:
        _err("woops:{}".format(e))
        return False
    # Published globally so action_listAll() can highlight the running entry.
    current_cmd = new_cmd
    print("\n[!]New command:'{}'\n".format(current_cmd))
    # _run.sh receives the whole pipeline as one argument; shlex.quote keeps
    # it a single argument even when the command itself contains quotes.
    pipeline = current_cmd + " | exports/toRedis.py -i $REDIS_IP -k $REDIS_KEY"
    # Start subprocess
    try:
        _kill(process)
        process = subprocess.Popen(
            "./_run.sh " + shlex.quote(pipeline),
            shell=True,
            executable='/bin/bash',
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=environ,
            # Own session/process group so _kill() can stop the whole pipeline.
            preexec_fn=os.setsid,
        )
    except Exception as e:
        print("woops:{}".format(e))
        return False
    return True


def action_newCommand():
    """Prompt for a command line and append it to the playlist.

    Returns False when the user exits with "x" or enters nothing,
    True when a command was added.
    """
    print("Enter new command or e(x)it.")
    k = input()
    # Exit early
    if "x" == k:
        return False
    if not k.strip():
        _err("Empty command ignored.")
        return False
    currentPlayList.append(k)
    print(bcolors.OKBLUE + "Command added" + bcolors.ENDC)
    return True


def action_deleteCommand():
    """List the playlist and delete the entry chosen by the user.

    Returns True when an entry was deleted, False on exit or invalid input.
    """
    global current_id
    print("Select sequence to delete or e(x)it.")
    action_listAll()
    answer = input().strip()
    # Exit early (checked before any int conversion, which would reject "x")
    if "x" == answer:
        return False
    idx = _parseIndex(answer, len(currentPlayList))
    if idx is None:
        _err("This key '{}' does not exist".format(answer))
        return False
    del currentPlayList[idx]
    # Keep the cursor inside the shortened playlist.
    if current_id >= len(currentPlayList):
        current_id = max(0, len(currentPlayList) - 1)
    return True


def action_listAll():
    """Print every playlist entry, highlighting the one equal to current_cmd."""
    print("\n--------------------------------------")
    for i, seq in enumerate(currentPlayList):
        pre = ""
        suf = ""
        if current_cmd == seq:
            pre = bcolors.HL
            suf = bcolors.ENDC
        print(pre + "{}\t{}".format(i, seq) + suf)
    print("--------------------------------------\n")


def action_edit():
    """Replace one playlist entry with a value typed by the user.

    The entry number is read as a single key press, so only entries 0-9 can
    be selected. Returns True when an entry was replaced, False otherwise.
    """
    print("Enter the command number to edit, or 'x' to abort.")
    k = inkey()
    if 'x' == k:
        return False
    idx = _parseIndex(k, len(currentPlayList))
    if idx is None:
        _err("This key '{}' does not exist".format(k))
        return False
    print("Enter the next value, or 'x' to abort.")
    value = input()
    if 'x' == value:
        return False
    currentPlayList[idx] = value
    return True


def action_loadPlaylist():
    """Let the user pick one of the saved playlists and load it.

    The choice is read as a single key press, so only ids 0-9 can be
    selected. Returns True when a playlist was loaded, False otherwise.
    """
    # list files (sorted so the ids are stable between calls)
    file_list = sorted(playlistsDir.glob("*json"))
    if not file_list:
        _err("Error. No file in path '{}'\n".format(playlistsDir.name))
        return False
    print("\n Id\tName")
    for k, name in enumerate(file_list):
        print(" {}\t{}".format(k, name), flush=True)
    # ask file
    print("\nChoose a file or e(x)it:")
    k = inkey()
    # Exit early
    if "x" == k:
        return False
    idx = _parseIndex(k, len(file_list))
    if idx is None:
        _err("This key '{}' does not exist".format(k))
        return False
    return _loadPlaylist(file_list[idx])


def _loadPlaylist(filename):
    """Load the JSON playlist stored in *filename* and make it current.

    Only the file's base name is remembered as the playlist name, because
    _savePlaylist() always writes inside the playlists directory.
    Returns True on success, False (after printing an error) on failure.
    """
    global currentPlayList, current_playlist_name, current_id
    playlistFile = Path(filename)
    try:
        loaded = json.loads(playlistFile.read_text())
    except (OSError, ValueError) as e:
        _err("_loadPlaylist error when loading '{}':{}".format(filename, e))
        return False
    if not isinstance(loaded, list):
        _err("_loadPlaylist error when loading '{}':{}".format(
            filename, "playlist is not a JSON list"))
        return False
    currentPlayList = loaded
    current_playlist_name = playlistFile.name
    current_id = 0
    _ok("Playlist loaded: {}\n".format(current_playlist_name))
    return True


def action_newPlaylist():
    """Create an empty playlist file and make it the current playlist.

    Returns True when the playlist was created, False on exit or failure.
    """
    global currentPlayList, current_playlist_name, current_id
    # ask for name
    print("Enter new playlist name (without.json) or e(x)it question?")
    k = input().strip()
    # Exit early
    if "x" == k:
        return False
    if not k:
        _err("No name found.")
        return False
    new_name = k + ".json"
    # save file first, so the in-memory playlist survives a failed write
    try:
        _savePlaylist(new_name, [])
    except OSError as e:
        print("woops:{}".format(e))
        return False
    currentPlayList = []
    # Later saves must target the new file, not the one loaded before.
    current_playlist_name = new_name
    current_id = 0
    return True


def _savePlaylist(playlistname, playlist=None):
    """Write a playlist as JSON to ``<playlistsDir>/<playlistname>``.

    *playlist* defaults to the current playlist. Returns True; errors from
    opening or writing the file propagate to the caller.
    """
    content = currentPlayList if playlist is None else playlist
    filepath = playlistsDir / playlistname
    with filepath.open("w", encoding="utf-8") as f:
        f.write(json.dumps(content, indent=4, sort_keys=True))
    return True


def action_savePlaylist(name=False):
    """Save the current playlist under *name*, or under its current name.

    Returns True on success, False when no name is known or saving fails.
    """
    playlist_name = name if name else current_playlist_name
    if not playlist_name:
        _err("No name found.")
        return False
    try:
        _savePlaylist(playlist_name)
    except (OSError, TypeError, ValueError) as e:
        print("woops:{}".format(e))
        return False
    print(bcolors.OKBLUE + "\nSaved as '{}'.\n".format(playlist_name) + bcolors.ENDC)
    return True


def action_commandHelp():
    """List the scripts in generators/filters/exports and show one's ``-h``.

    Returns True when a help text was requested, False on invalid input.
    """
    # iterate through files
    file_list = []
    for folder in ["generators", "filters", "exports"]:
        folder_path = Path("./" + folder)
        if not folder_path.is_dir():
            continue
        for plFile in sorted(folder_path.iterdir()):
            if plFile.suffix == ".py":
                file_list.append(os.path.join(folder, plFile.name))
    if not file_list:
        _err("Error. No file found.")
        return False
    print("\n Id\tFile")
    for k, filename in enumerate(file_list):
        print(" {}\t{}".format(k, filename))
    print("\nChoose a file:")
    answer = input().strip()
    idx = _parseIndex(answer, len(file_list))
    if idx is None:
        _err("This key '{}' does not exist".format(answer))
        return False
    print("\n-----------------------------------------------\n")
    # Argument list instead of a shell string: no quoting problems.
    subprocess.run(["python3", file_list[idx], "-h"])
    print("\n-----------------------------------------------\n")
    return True


def _setKey(laser=None, scene=None):
    """Update the laser and/or scene and rebuild ``environ["REDIS_KEY"]``.

    A value of None leaves that setting unchanged; 0 is a valid id. The
    chosen values are stored in ``environ`` as strings so that changing one
    setting does not reset the other.
    """
    if laser is not None:
        environ["REDIS_LASER"] = str(laser)
    if scene is not None:
        environ["REDIS_SCENE"] = str(scene)
    new_key = "/pl/{}/{}".format(environ["REDIS_SCENE"], environ["REDIS_LASER"])
    environ["REDIS_KEY"] = new_key
    print("Sending new key '{}'".format(new_key))


def action_laserId():
    """Ask for a laser id (single key) and apply it to the Redis key."""
    k = _askInt("Enter the LJ Laser id [0-3]")
    if k is None:
        return False
    _setKey(laser=k)
    return True


def action_laserScene():
    """Ask for a scene id (single key) and apply it to the Redis key."""
    k = _askInt("Enter the LJ Scene id [0-3]")
    if k is None:
        return False
    _setKey(scene=k)
    return True


def action_killPid():
    """Ask for a pid and kill every process in that pid's session."""
    print("Enter pid to kill")
    kill_pid = input().strip()
    if not kill_pid.isdigit():
        _err("Invalid pid: '{}'".format(kill_pid))
        return False
    # Field 6 of /proc/<pid>/stat is read as the session id handed to pkill.
    subprocess.run(
        "pkill -9 -s $(awk '{print $6}' /proc/$kill_pid/stat)",
        shell=True,
        executable='/bin/bash',
        env={"kill_pid": kill_pid},
    )
    return True


def action_quit():
    """Ask for confirmation, stop the running command and exit.

    Any key other than "n"/"N" confirms. Exits with status 1.
    """
    print("Quit [Y/n]?")
    answer = inkey()
    if answer.lower() != "n":
        _kill(process)
        sys.exit(1)