#!/usr/bin/python3 # -*- coding: utf-8 -*- # -*- mode: Python -*- ''' redilysis colors v0.1.0 A complex effect that depends on redis keys for audio analysis see https://git.interhacker.space/teamlase/redilysis for more informations about the redilysis project Licensed under GNU GPLv3 by cocoa ''' from __future__ import print_function import argparse import ast import os import math import random import redis import sys import time name = "filters::redilysis_colors" def debug(*args, **kwargs): if( verbose == False ): return print(*args, file=sys.stderr, **kwargs) def msNow(): return time.time() # The list of available modes => redis keys each requires to run oModeList = { } def rgb2int(rgb): return int('0x%02x%02x%02x' % tuple(rgb),0) def int2rgb(intcode): #hexcode = hex(intcode)[2:] hexcode = '{0:06X}'.format(intcode) return tuple(int(hexcode[i:i+2], 16) for i in (0, 2, 4)) #return tuple(map(ord,hexcode[1:].decode('hex'))) CHAOS = 1 REDIS_FREQ = 100 # General Args argsparser = argparse.ArgumentParser(description="Redilysis filter") argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int) argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose") # Redis Args argsparser.add_argument("-i","--ip",help="IP address of the Redis server ",default="127.0.0.1",type=str) argsparser.add_argument("-p","--port",help="Port of the Redis server ",default="6379",type=str) argsparser.add_argument("-s","--redis-freq",help="Query Redis every x (in milliseconds). Default:{}".format(REDIS_FREQ),default=REDIS_FREQ,type=int) # Modes And Common Modes Parameters #argsparser.add_argument("-m","--modelist",required=False,help="Comma separated list of modes to use from: {}".format("i, ".join(oModeList.keys())),type=str) argsparser.add_argument("-c","--chaos",help="How much disorder to bring. High value = More chaos. Default {}".format(CHAOS), default=CHAOS, type=float) args = argsparser.parse_args() fps = args.fps ip = args.ip port = args.port redisFreq = args.redis_freq / 1000 verbose = args.verbose chaos = float(args.chaos) optimal_looptime = 1 / fps max_width = 800 max_height = 800 redisKeys = ["rms","spectrum_10","spectrum_120"] debug(name,"Redis Keys:{}".format(redisKeys)) redisData = {} redisLastHit = msNow() - 99999 r = redis.Redis( host=ip, port=port) def refreshRedis(): global redisData for key in redisKeys: try: redisData[key] = ast.literal_eval(r.get(key).decode('ascii')) except : debug("Error when reading redis key '{}".format(key)) def gauss(x, mu, sigma): return( math.exp(-math.pow((x-mu),2)/(2*math.pow(sigma,2))/math.sqrt(2*math.pi*math.pow(sigma,2)))) spect10Correct = [ 6.0, 1.5, 1.0, 1.0, 1.0, 1.0, 1.0, 0.8, 0.6, 0.5, ] def default( pl ): global redisData spect = redisData["spectrum_10"] debug(name, "spect:{}".format(spect)) new_list = [] # We want to color points that are on left and right when high is strong # i.e. the farther the distance from spectrum, the higher notes have influence # power = 0-1 # x = 800 spec[2]= 6.0 spec[7]=0.0 power=0.0 # x = 0 spec[2]= 6.0 spec[7]=0.0 power=0.0 # x = 0 spec[2]= 1.0 spec[7]=0.5 power=1.0 # dist 0 = 1 # 400 - 400 : maxW/2 -x # 399 = -1 : x - 400 # 401 = 1 # x = 400 spec[2]= 6.0 spec[7]=0.0 power=1.0 # x = 400 spec[2]= 1.0 spec[7]=0.5 power=0.0 for i, point in enumerate(pl): ocolor = pl[i][2] if ocolor == 0 : new_list.append(point) continue colorTuple = int2rgb(ocolor) x = point[0] dist = abs(x - max_width/2) key = int(2* dist / max_width * 7) try: power = spect[key] / spect10Correct[key] * chaos except: pass color = [] for i in colorTuple: new_color = int(i * power) if new_color > 255 : new_color = 255 if new_color < 0 : new_color = 0 color.append( new_color ) color = rgb2int(tuple(color)) point[2] = color new_list.append(point) #debug(name,"x:{}\t dist:{}\t key:{}\t power:{}\t ocolor:{}\t color:{}".format(point[0], dist, key,power, ocolor, pl[i][2])) debug( name,"rms_noise output:{}".format(new_list)) return new_list try: while True: refreshRedis() start = time.time() line = sys.stdin.readline() if line == "": time.sleep(0.01) line = line.rstrip('\n') pointsList = ast.literal_eval(line) # Do the filter pointsList = default(pointsList) print( pointsList, flush=True ) looptime = time.time() - start # debug(name+" looptime:"+str(looptime)) if( looptime < optimal_looptime ): time.sleep( optimal_looptime - looptime) # debug(name+" micro sleep:"+str( optimal_looptime - looptime)) except EOFError: debug(name+" break")# no more information