This commit is contained in:
alban 2020-11-11 17:31:08 +01:00
commit 6d70ca0c32
39 changed files with 13735 additions and 0 deletions

200
filters/anaglyph.py Executable file
View file

@ -0,0 +1,200 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
anaglyph
v0.1.0
Attempts to create a valid 3D-glasses structure
LICENCE : CC
by cocoa
'''
from __future__ import print_function
import argparse
import ast
import math
import os
import random
import sys
import time
name = "filters::cycle"
maxDist = 300
argsparser = argparse.ArgumentParser(description="Redis exporter LJ")
argsparser.add_argument("-x","--centerX",help="geometrical center X position",default=400,type=int)
argsparser.add_argument("-y","--centerY",help="geometrical center Y position",default=400,type=int)
argsparser.add_argument("-m","--min",help="Minimal displacement (default:2) ",default=1,type=int)
argsparser.add_argument("-M","--max",help="Maximal displacement (default:20) ",default=5,type=int)
argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int)
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose")
args = argsparser.parse_args()
fps = args.fps
minVal = args.min
maxVal = args.max
centerX = args.centerX
centerY = args.centerY
verbose = args.verbose
optimal_looptime = 1 / fps
name = "filters::anaglyph"
def debug(*args, **kwargs):
if( verbose == False ):
return
print(*args, file=sys.stderr, **kwargs)
def rgb2int(rgb):
#debug(name,"::rgb2int rbg:{}".format(rgb))
return int('0x%02x%02x%02x' % tuple(rgb),0)
def isValidColor( color, intensityColThreshold ):
if color[0] + color[1] + color[2] > intensityColThreshold:
return True
return False
# These are paper colors
red = (41,24,24)
white = (95,95,95)
blue = (0,41,64)
red = (127,0,0)
blue = (0,128,128)
white = (128,128,128)
def anaglyph( pl ):
debug(name,'--------------- new loop ------------------')
# We will send one list after the other to optimize color change
blueList = list()
redList = list()
whiteList = list()
out = []
out1 = []
out2 = []
out3 = []
# The anaglyphic effect will be optained by :
# * having close objects appear as white
# * having distant objects appear as blue + red
# * having in between objects appear as distanceDecreased(white) + blue + red
for i, point in enumerate(pl):
ref_x = point[0]-centerX
ref_y = point[1]-centerY
ref_color = point[2]
angle = math.atan2( ref_x , ref_y )
dist = ref_y / math.cos(angle)
white_rvb = (0,0,0)
blue_rvb = (0,0,0)
red_rvb = (0,0,0)
# Calculate the point's spread factor (0.0 to 1.0)
# The spread is high if the point is close to center
"""
dist = 0 : spread = 1.0
dist = maxDist spread = 0.0
"""
if dist == 0:
spread = 1.0
else :
spread =( maxDist - dist ) / maxDist
if spread < 0.0:
spread = 0.0
#debug(name,"dist:{} spread:{}".format(dist,spread))
# White color is high if spread is low, i.e. point away from center
"""
spread = 1.0 : white_c = 0.0
spread = 0.0 : whice_c = 1.0
"""
if point[2] == 0:
white_color = 0
else:
white_factor = 1.0 - math.pow(spread,0.5)
white_rvb = tuple(map( lambda a: int(white_factor* a), white))
white_color = rgb2int( white_rvb)
#debug(name,"spread:{}\t white_rvb:{}\t white_color:{}".format(spread, white_rvb, white_color))
# Blue and Red colors are high if spread is high, i.e. close to center
"""
spread = 1.0 : red_c = 1.0
spread = 0.0 : red_c = 0.0
"""
color_factor = math.pow(spread,1)
if point[2] == 0:
blue_color = 0
red_color = 0
else:
blue_rvb = tuple(map( lambda a: int(color_factor * a), blue))
blue_color = rgb2int( blue_rvb)
red_rvb = tuple(map( lambda a: int(color_factor * a), red))
red_color = rgb2int( red_rvb)
#debug(name,"color_factor:{}\t\t blue_color:{}\t\t red_color:{}".format(color_factor,blue_color,red_color))
# Blue-to-Red spatial spread is high when spread is high, i.e. point close to center
"""
spread = 1.0 : spatial_spread = maxVal
spread = 0.0 : spatial_spread = minVal
"""
spatial_spread = minVal + spread * (maxVal - minVal)
#debug(name,"spatial_spread:{}".format(spatial_spread))
red_x = int(point[0] + spatial_spread)
blue_x = int(point[0] - spatial_spread )
red_y = int(point[1] )
blue_y = int(point[1])
white_point = [point[0], point[1], white_color]
blue_point = [blue_x,blue_y,blue_color]
red_point = [red_x,red_y,red_color]
#debug(name,"white[x,y,c]:{}".format(white_point))
#debug(name,"blue[x,y,c]:{}".format(blue_point))
#debug(name,"red[x,y,c]:{}".format(red_point))
# Do not append "black lines" i.e. a color where each composent is below X
# if isValidColor(white_rvb, 150):
# out1.append(white_point)
# if isValidColor(blue_rvb, 50):
# out2.append(blue_point)
# if isValidColor(red_rvb, 30):
# out3.append(red_point)
out1.append(white_point)
out2.append(blue_point)
out3.append(red_point)
#debug(name,"source pl:{}".format(pl))
debug(name,"whiteList:{}".format(out1))
debug(name,"blueList:{}".format(out2))
debug(name,"redList:{}".format(out3))
return out1 + out3 + out2
#return out1 + out2 + out3
try:
while True:
start = time.time()
line = sys.stdin.readline()
if line == "":
time.sleep(0.01)
line = line.rstrip('\n')
pointsList = ast.literal_eval(line)
# Do the filter
result = anaglyph( pointsList )
print( result, flush=True )
looptime = time.time() - start
# debug(name+" looptime:"+str(looptime))
if( looptime < optimal_looptime ):
time.sleep( optimal_looptime - looptime)
# debug(name+" micro sleep:"+str( optimal_looptime - looptime))
except EOFError:
debug(name+" break")# no more information

108
filters/colorcycle.py Executable file
View file

@ -0,0 +1,108 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
colorcycle
v0.1.0
A simple effect : cycle colors
LICENCE : CC
by cocoa
'''
from __future__ import print_function
import sys
import ast
import os
import argparse
import random
import time
name = "filters::cycle"
argsparser = argparse.ArgumentParser(description="Redis exporter LJ")
argsparser.add_argument("-x","--centerX",help="geometrical center X position",default=300,type=int)
argsparser.add_argument("-y","--centerY",help="geometrical center Y position",default=300,type=int)
argsparser.add_argument("-m","--min",help="Lowest value in the range 0-255",default=10,type=int)
argsparser.add_argument("-M","--max",help="Highest value in the range 0-255",default=255,type=int)
argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int)
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose")
args = argsparser.parse_args()
fps = args.fps
minVal = args.min
maxVal = args.max
centerX = args.centerX
centerY = args.centerY
verbose = args.verbose
optimal_looptime = 1 / fps
UP = 5
DOWN = -5
currentColor = [0,0,0]
composant = 0
currentDirection = UP
def debug(*args, **kwargs):
if( verbose == False ):
return
print(*args, file=sys.stderr, **kwargs)
def rgb2int(rgb):
return int('0x%02x%02x%02x' % tuple(rgb),0)
def cycleColor( pl ):
global composant
global currentDirection
# debug(name,"pl:{}".format(pl))
value = currentColor[composant]
if currentDirection == UP:
target = maxVal
else:
target = minVal
value += currentDirection
currentColor[composant] = value
debug(name,"currentColor:{}".format(currentColor))
for i in range( 0, len(pl)):
if pl[i][2] != 0:
pl[i][2] = rgb2int( currentColor)
# change the composant if target reached
if value <= target and currentDirection == DOWN or value >= target and currentDirection == UP :
composant = random.randint( 0,2)
value = currentColor[composant]
if value == 0 :
currentDirection = UP
else:
currentDirection = DOWN
#debug( "pl:{}".format(pl))
return pl
try:
while True:
start = time.time()
line = sys.stdin.readline()
if line == "":
time.sleep(0.01)
line = line.rstrip('\n')
pointsList = ast.literal_eval(line)
# Do the filter
result = cycleColor( pointsList )
print( result, flush=True )
looptime = time.time() - start
# debug(name+" looptime:"+str(looptime))
if( looptime < optimal_looptime ):
time.sleep( optimal_looptime - looptime)
# debug(name+" micro sleep:"+str( optimal_looptime - looptime))
except EOFError:
debug(name+" break")# no more information

174
filters/kaleidoscope.py Executable file
View file

@ -0,0 +1,174 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
kaleidoscop
v0.1.0
A simple effect : mirror a quadrant of the input
LICENCE : CC
by Sam Neurohack
'''
from __future__ import print_function
import sys
import ast
import os
import argparse
ljpath = r'%s' % os.getcwd().replace('\\','/')
sys.path.append(ljpath +'/../libs/')
sys.path.append(ljpath +'/libs/')
import time
name = "filters::kaleidoscope"
argsparser = argparse.ArgumentParser(description="Redis exporter LJ")
argsparser.add_argument("-x","--centerX",help="geometrical center X position",default=400,type=int)
argsparser.add_argument("-y","--centerY",help="geometrical center Y position",default=400,type=int)
argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int)
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose")
args = argsparser.parse_args()
fps = args.fps
centerX = args.centerX
centerY = args.centerY
verbose = args.verbose
optimal_looptime = 1 / fps
def debug(*args, **kwargs):
if( verbose == False ):
return
print(*args, file=sys.stderr, **kwargs)
def kaleidoscope( pl ):
# Stage 1: Crop points in single quadrant
quad1 = []
# Iterate trough the segments
for i in range( 0, len(pl) ):
#debug(name+" point #", i)
currentpoint = cp = pl[i]
cx,cy,cc = [cp[0],cp[1],cp[2]]
# Exception: escape early if last point
if i == len(pl) - 1:
if cx >= centerX and cy >= centerY :
quad1.append( currentpoint )
break
# Search for the couple of points
nextpoint = pl[i+1]
nx,ny,nc = [nextpoint[0],nextpoint[1],nextpoint[2]]
rect=[[cx,cy],[cx,ny],[nx,ny],[nx,cy]]
right = wrong = 0
#debug(name+" rect: ", rect,"curr",currentpoint,"next",nextpoint )
# Enumerate the points in rectangle to see
# how many right / wrong there are
# either to add or skip early
for iterator, p in enumerate(rect):
if p[0] >= centerX and p[1] >= centerY:
right += 1
else:
#if p[0] <= centerX and p[1] <= centerY:
wrong += 1
# If all rectangle points are in the right quadrant, Add and Skip
if right == 4:
quad1.append(pl[i])
#debug(name+" found valid point", pl[i])
continue
# If all rectangle points in wrong quadrant, Skip
if wrong == 4:
#debug(name+" found bad point", pl[i])
continue
# Find the (x,y) intersections
#
#debug(name+" Looking for crossing point between ("+str(cx)+","+str(cy)+") and ("+str(nx)+","+str(ny)+")")
delta=[ nx - cx, ny - cy ]
#debug(name+" delta:",delta)
crossX = None
crossY = None
absnewX = 0
absnewY = 0
# If one point has negative x, search y axis crossing
if cx < centerX or nx < centerX:
if delta[0] == 0 :
delta[0] = 0.0000001
v=[ delta[0]/abs(delta[0]), delta[1]/abs(delta[0]) ]
absnewX = abs( centerX - cx )
#print("on y axis, v=",str(v)," and absnewX=",str(absnewX))
crossX = [( absnewX*v[0] + cx ),( absnewX*v[1]+cy ), nc]
# If one point has negative y, search x axis crossing
if cy < centerY or ny < centerY:
if delta[1] == 0 :
delta[1] = 0.0000001
v=[ delta[0]/abs(delta[1]), delta[1]/abs(delta[1])]
absnewY = abs( centerY - cy )
#print("on x axis, v=",str(v)," and absnewY=",str(absnewY))
crossY = [( absnewY*v[0] + cy ),( absnewY*v[1]+cy ), nc]
# Inject in order
# If current point is the quadrant, add it
if cx >= centerX and cy >= centerY :
quad1.append( currentpoint )
# If absnewX smaller, it is closest to currentPoint
if absnewX < absnewY:
if None != crossX : quad1.append( crossX )
if None != crossY : quad1.append( crossY )
else :
if None != crossY : quad1.append( crossY )
if None != crossX : quad1.append( crossX )
# Add a black point at the end
#lastQuad1Point = quad1[-1]
#quad1.append( [lastQuad1Point[0],lastQuad1Point[1],0] )
## Stage 2 : Mirror points
#
quad2 = []
# quad2 = vertical symetric of quad1
for iterator in range( len(quad1) -1 , -1, -1):
point = quad1[iterator]
quad2.append([ point[0], 2*centerY - point[1], point[2] ])
# quad3 is the merge of 1 and 2
quad3 = quad1 + quad2
# quad4 is the horizontal symetric of quad3
quad4 = []
for iterator in range( len(quad3) -1, -1, -1):
point = quad3[iterator]
quad4.append([ 2*centerX - point[0], point[1], point[2] ])
#debug(name+" quad1:",quad1)
#debug(name+" quad2:", quad2 )
#debug(name+" quad3:", quad3 )
#debug(name+" quad4:", quad4 )
return quad3+quad4
try:
while True:
start = time.time()
line = sys.stdin.readline()
if line == "":
time.sleep(0.01)
line = line.rstrip('\n')
pointsList = ast.literal_eval(line)
# Do the filter
result = kaleidoscope( pointsList )
print( result, flush=True )
looptime = time.time() - start
# debug(name+" looptime:"+str(looptime))
if( looptime < optimal_looptime ):
time.sleep( optimal_looptime - looptime)
# debug(name+" micro sleep:"+str( optimal_looptime - looptime))
except EOFError:
debug(name+" break")# no more information

300
filters/redilysis.py Executable file
View file

@ -0,0 +1,300 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
redilysis
v0.1.0
A complex effect that depends on redis keys for audio analysis
see https://git.interhacker.space/teamlase/redilysis for more informations
about the redilysis project
LICENCE : CC
by cocoa
'''
from __future__ import print_function
import argparse
import ast
import os
import math
import random
import redis
import sys
import time
name = "filters::redilysis"
def debug(*args, **kwargs):
if( verbose == False ):
return
print(*args, file=sys.stderr, **kwargs)
def msNow():
return time.time()
# The list of available modes => redis keys each requires to run
oModeList = {
"rms_noise": ["rms"],
"rms_size": ["rms"],
"bpm_size": ["bpm"],
"bpm_detect_size": ["bpm","bpm_delay","bpm_sample_interval","beats"]
}
CHAOS = 1
REDISLATENCY = 30
REDIS_FREQ = 300
# General Args
argsparser = argparse.ArgumentParser(description="Redilysis filter")
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose")
# Redis Args
argsparser.add_argument("-i","--ip",help="IP address of the Redis server ",default="127.0.0.1",type=str)
argsparser.add_argument("-p","--port",help="Port of the Redis server ",default="6379",type=str)
argsparser.add_argument("-s","--redis-freq",help="Query Redis every x (in milliseconds). Default:{}".format(REDIS_FREQ),default=REDIS_FREQ,type=int)
# General args
argsparser.add_argument("-x","--centerX",help="geometrical center X position",default=400,type=int)
argsparser.add_argument("-y","--centerY",help="geometrical center Y position",default=400,type=int)
argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int)
# Modes And Common Modes Parameters
argsparser.add_argument("-l","--redisLatency",help="Latency in ms to substract. Default:{}".format(REDISLATENCY),default=REDISLATENCY,type=float)
argsparser.add_argument("-m","--modelist",required=True,help="Comma separated list of modes to use from: {}".format("i, ".join(oModeList.keys())),type=str)
argsparser.add_argument("--chaos",help="How much disorder to bring. High value = More chaos. Default {}".format(CHAOS), default=CHAOS, type=str)
args = argsparser.parse_args()
ip = args.ip
port = args.port
redisFreq = args.redis_freq / 1000
verbose = args.verbose
fps = args.fps
centerX = args.centerX
centerY = args.centerY
redisLatency = args.redisLatency
chaos = float(args.chaos)
optimal_looptime = 1 / fps
modeList = args.modelist.split(",")
redisKeys = []
for mode in modeList:
if not mode in oModeList:
print("Mode '{}' is invalid. Exiting.".format(mode))
sys.exit(2)
redisKeys += oModeList[mode]
redisKeys = list(set(redisKeys))
debug(name,"Redis Keys:{}".format(redisKeys))
redisData = {}
redisLastHit = msNow() - 99999
r = redis.Redis(
host=ip,
port=port)
# Records the last bpm
tsLastBeat = time.time()
def gauss(x, mu, sigma):
return( math.exp(-math.pow((x-mu),2)/(2*math.pow(sigma,2))/math.sqrt(2*math.pi*math.pow(sigma,2))))
previousPTTL = 0
tsNextBeatsList = []
def bpmDetect( ):
"""
An helper to compute the next beat time in milliseconds
Returns True if the cache was updated
"""
global tsNextBeatsList
global previousPTTL
global redisLastHit
global redisLatency
# Get the redis PTTL value for bpm
PTTL = redisData["bpm_pttl"]
# Skip early if PTTL < 0
if PTTL < 0 :
debug(name,"bpmDetect skip detection : PTTL expired for 'bpm' key")
return False
# Skip early if the record hasn't been rewritten
if PTTL <= previousPTTL :
previousPTTL = PTTL
#debug(name,"bpmDetect skip detection : {} <= {}".format(PTTL, previousPTTL))
return False
debug(name,"bpmDetect running detection : {} > {}".format(PTTL, previousPTTL))
previousPTTL = PTTL
# Skip early if beat list is empty
beatsList = ast.literal_eval(redisData["beats"])
tsNextBeatsList = []
if( len(beatsList) == 0 ):
return True
# Read from redis
bpm = float(redisData["bpm"])
msBpmDelay = float(redisData["bpm_delay"])
samplingInterval = float(redisData["bpm_sample_interval"])
# Calculate some interpolations
lastBeatTiming = float(beatsList[len(beatsList) - 1])
msPTTLDelta = 2 * samplingInterval - float(PTTL)
sPerBeat = 60 / bpm
lastBeatDelay = msBpmDelay - lastBeatTiming*1000 + msPTTLDelta
countBeatsPast = math.floor( (lastBeatDelay / 1000) / sPerBeat)
#debug(name,"bpmDetect lastBeatTiming:{}\tmsPTTLDelta:{}\tsPerBeat:{}".format(lastBeatTiming,msPTTLDelta,sPerBeat))
#debug(name,"lastBeatDelay:{}\t countBeatsPast:{}".format(lastBeatDelay, countBeatsPast))
for i in range( countBeatsPast, 1000):
beatTime = i * sPerBeat - lastBeatTiming
if beatTime < 0:
continue
if beatTime * 1000 > 2 * samplingInterval :
break
#debug(name, "bpmDetect beat add beatTime:{} redisLastHit:{}".format(beatTime, redisLastHit))
tsNextBeatsList.append( redisLastHit + beatTime - redisLatency/1000)
debug(name, "bpmDetect new tsNextBeatsList:{}".format(tsNextBeatsList))
return True
def bpm_detect_size( pl ):
bpmDetect()
# Find the next beat in the list
tsNextBeat = 0
now = time.time()
msNearestBeat = None
msRelativeNextBTList = list(map( lambda a: abs(now - a) * 1000, tsNextBeatsList))
msToBeat = min( msRelativeNextBTList)
#debug(name,"bpm_detect_size msRelativeNextBTList:{} msToBeat:{}".format(msRelativeNextBTList,msToBeat))
# Calculate the intensity based on bpm coming/leaving
# The curb is a gaussian
mu = 15
intensity = gauss( msToBeat, 0 , mu)
#debug(name,"bpm_size","mu:{}\t msToBeat:{}\tintensity:{}".format(mu, msToBeat, intensity))
if msToBeat < 20:
debug(name,"bpm_detect_size kick:{}".format(msToBeat))
pass
for i, point in enumerate(pl):
ref_x = point[0]-centerX
ref_y = point[1]-centerY
#debug(name,"In new ref x:{} y:{}".format(point[0]-centerX,point[1]-centerY))
angle=math.atan2( point[0] - centerX , point[1] - centerY )
l = ref_y / math.cos(angle)
new_l = l * intensity
#debug(name,"bpm_size","angle:{} l:{} new_l:{}".format(angle,l,new_l))
new_x = math.sin(angle) * new_l + centerX
new_y = math.cos(angle) * new_l + centerY
#debug(name,"x,y:({},{}) x',y':({},{})".format(point[0],point[1],new_x,new_y))
pl[i][0] = new_x
pl[i][1] = new_y
#debug( name,"bpm_detect_size output:{}".format(pl))
return( pl );
def bpm_size( pl ):
global tsLastBeat
bpm = float(redisData["bpm"])
# msseconds ber beat
msPerBeat = int(60 / bpm * 1000)
# Calculate the intensity based on bpm coming/leaving
# The curb is a gaussian
mu = math.sqrt(msPerBeat)
msTimeToLastBeat = (time.time() - tsLastBeat) * 1000
msTimeToNextBeat = (msPerBeat - msTimeToLastBeat)
intensity = gauss( msTimeToNextBeat, 0 , mu)
debug(name,"bpm_size","msPerBeat:{}\tmu:{}".format(msPerBeat, mu))
debug(name,"bpm_size","msTimeToLastBeat:{}\tmsTimeToNextBeat:{}\tintensity:{}".format(msTimeToLastBeat, msTimeToNextBeat, intensity))
if msTimeToNextBeat <= 0 :
tsLastBeat = time.time()
for i, point in enumerate(pl):
ref_x = point[0]-centerX
ref_y = point[1]-centerY
#debug(name,"In new ref x:{} y:{}".format(point[0]-centerX,point[1]-centerY))
angle=math.atan2( point[0] - centerX , point[1] - centerY )
l = ref_y / math.cos(angle)
new_l = l * intensity
#debug(name,"bpm_size","angle:{} l:{} new_l:{}".format(angle,l,new_l))
new_x = math.sin(angle) * new_l + centerX
new_y = math.cos(angle) * new_l + centerY
#debug(name,"x,y:({},{}) x',y':({},{})".format(point[0],point[1],new_x,new_y))
pl[i][0] = new_x
pl[i][1] = new_y
#debug( name,"bpm_noise output:{}".format(pl))
return pl
def rms_size( pl ):
rms = float(redisData["rms"])
for i, point in enumerate(pl):
ref_x = point[0]-centerX
ref_y = point[1]-centerY
debug(name,"In new ref x:{} y:{}".format(point[0]-centerX,point[1]-centerY))
angle=math.atan2( point[0] - centerX , point[1] - centerY )
l = ref_y / math.cos(angle)
debug(name,"angle:{} l:{}".format(angle,l))
new_l = l + rms * chaos
new_x = math.sin(angle) * new_l + centerX
new_y = math.cos(angle) * new_l + centerY
debug(name,"x,y:({},{}) x',y':({},{})".format(point[0],point[1],new_x,new_y))
pl[i][0] = new_x
pl[i][1] = new_y
#debug( name,"rms_noise output:{}".format(pl))
return pl
def rms_noise( pl ):
rms = float(redisData["rms"])
debug(name, "pl:{}".format(pl))
for i, point in enumerate(pl):
#debug(name,"rms_noise chaos:{} rms:{}".format(chaos, rms))
xRandom = random.uniform(-1,1) * rms * chaos
yRandom = random.uniform(-1,1) * rms * chaos
#debug(name,"rms_noise xRandom:{} yRandom:{}".format(xRandom, yRandom))
pl[i][0] += xRandom
pl[i][1] += yRandom
#debug( name,"rms_noise output:{}".format(pl))
return pl
def refreshRedis():
global redisLastHit
global redisData
# Skip if cache is sufficent
diff = msNow() - redisLastHit
if diff < redisFreq :
#debug(name, "refreshRedis not updating redis, {} < {}".format(diff, redisFreq))
pass
else:
#debug(name, "refreshRedis updating redis, {} > {}".format(diff, redisFreq))
redisLastHit = msNow()
for key in redisKeys:
redisData[key] = r.get(key).decode('ascii')
#debug(name,"refreshRedis key:{} value:{}".format(key,redisData[key]))
# Only update the TTLs
if 'bpm' in redisKeys:
redisData['bpm_pttl'] = r.pttl('bpm')
#debug(name,"refreshRedis key:bpm_ttl value:{}".format(redisData["bpm_pttl"]))
#debug(name,"redisData:{}".format(redisData))
return True
try:
while True:
refreshRedis()
start = time.time()
line = sys.stdin.readline()
if line == "":
time.sleep(0.01)
line = line.rstrip('\n')
pointsList = ast.literal_eval(line)
# Do the filter
for mode in modeList:
pointsList = locals()[mode](pointsList)
print( pointsList, flush=True )
looptime = time.time() - start
# debug(name+" looptime:"+str(looptime))
if( looptime < optimal_looptime ):
time.sleep( optimal_looptime - looptime)
# debug(name+" micro sleep:"+str( optimal_looptime - looptime))
except EOFError:
debug(name+" break")# no more information

186
filters/redilysis_colors.py Normal file
View file

@ -0,0 +1,186 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
redilysis colors
v0.1.0
A complex effect that depends on redis keys for audio analysis
see https://git.interhacker.space/teamlase/redilysis for more informations
about the redilysis project
LICENCE : CC
by cocoa
'''
from __future__ import print_function
import argparse
import ast
import os
import math
import random
import redis
import sys
import time
name = "filters::redilysis_colors"
def debug(*args, **kwargs):
if( verbose == False ):
return
print(*args, file=sys.stderr, **kwargs)
def msNow():
return time.time()
# The list of available modes => redis keys each requires to run
oModeList = {
}
def rgb2int(rgb):
return int('0x%02x%02x%02x' % tuple(rgb),0)
def int2rgb(intcode):
#hexcode = hex(intcode)[2:]
hexcode = '{0:06X}'.format(intcode)
return tuple(int(hexcode[i:i+2], 16) for i in (0, 2, 4))
#return tuple(map(ord,hexcode[1:].decode('hex')))
CHAOS = 1
REDIS_FREQ = 100
# General Args
argsparser = argparse.ArgumentParser(description="Redilysis filter")
argsparser.add_argument("-f","--fps",help="Frame Per Second",default=30,type=int)
argsparser.add_argument("-v","--verbose",action="store_true",help="Verbose")
# Redis Args
argsparser.add_argument("-i","--ip",help="IP address of the Redis server ",default="127.0.0.1",type=str)
argsparser.add_argument("-p","--port",help="Port of the Redis server ",default="6379",type=str)
argsparser.add_argument("-s","--redis-freq",help="Query Redis every x (in milliseconds). Default:{}".format(REDIS_FREQ),default=REDIS_FREQ,type=int)
# Modes And Common Modes Parameters
#argsparser.add_argument("-m","--modelist",required=False,help="Comma separated list of modes to use from: {}".format("i, ".join(oModeList.keys())),type=str)
argsparser.add_argument("-c","--chaos",help="How much disorder to bring. High value = More chaos. Default {}".format(CHAOS), default=CHAOS, type=float)
args = argsparser.parse_args()
fps = args.fps
ip = args.ip
port = args.port
redisFreq = args.redis_freq / 1000
verbose = args.verbose
chaos = float(args.chaos)
optimal_looptime = 1 / fps
max_width = 800
max_height = 800
redisKeys = ["rms","spectrum_10","spectrum_120"]
debug(name,"Redis Keys:{}".format(redisKeys))
redisData = {}
redisLastHit = msNow() - 99999
r = redis.Redis(
host=ip,
port=port)
def refreshRedis():
global redisData
for key in redisKeys:
try:
redisData[key] = ast.literal_eval(r.get(key).decode('ascii'))
except :
debug("Error when reading redis key '{}".format(key))
def gauss(x, mu, sigma):
return( math.exp(-math.pow((x-mu),2)/(2*math.pow(sigma,2))/math.sqrt(2*math.pi*math.pow(sigma,2))))
spect10Correct = [
6.0,
1.5,
1.0,
1.0,
1.0,
1.0,
1.0,
0.8,
0.6,
0.5,
]
def default( pl ):
global redisData
spect = redisData["spectrum_10"]
debug(name, "spect:{}".format(spect))
new_list = []
# We want to color points that are on left and right when high is strong
# i.e. the farther the distance from spectrum, the higher notes have influence
# power = 0-1
# x = 800 spec[2]= 6.0 spec[7]=0.0 power=0.0
# x = 0 spec[2]= 6.0 spec[7]=0.0 power=0.0
# x = 0 spec[2]= 1.0 spec[7]=0.5 power=1.0
# dist 0 = 1
# 400 - 400 : maxW/2 -x
# 399 = -1 : x - 400
# 401 = 1
# x = 400 spec[2]= 6.0 spec[7]=0.0 power=1.0
# x = 400 spec[2]= 1.0 spec[7]=0.5 power=0.0
for i, point in enumerate(pl):
ocolor = pl[i][2]
if ocolor == 0 :
new_list.append(point)
continue
colorTuple = int2rgb(ocolor)
x = point[0]
dist = abs(x - max_width/2)
key = int(2* dist / max_width * 8)
power = spect[key] / spect10Correct[key] * chaos
color = []
for i in colorTuple:
new_color = int(i * power)
if new_color > 255 :
new_color = 255
if new_color < 0 :
new_color = 0
color.append( new_color )
color = rgb2int(tuple(color))
point[2] = color
new_list.append(point)
#debug(name,"x:{}\t dist:{}\t key:{}\t power:{}\t ocolor:{}\t color:{}".format(point[0], dist, key,power, ocolor, pl[i][2]))
debug( name,"rms_noise output:{}".format(new_list))
return new_list
try:
while True:
refreshRedis()
start = time.time()
line = sys.stdin.readline()
if line == "":
time.sleep(0.01)
line = line.rstrip('\n')
pointsList = ast.literal_eval(line)
# Do the filter
pointsList = default(pointsList)
print( pointsList, flush=True )
looptime = time.time() - start
# debug(name+" looptime:"+str(looptime))
if( looptime < optimal_looptime ):
time.sleep( optimal_looptime - looptime)
# debug(name+" micro sleep:"+str( optimal_looptime - looptime))
except EOFError:
debug(name+" break")# no more information