import ctypes from libs3 import gstt from libs3 import homographyp from .tracer_common import Tracer, OnePointIterator, ProtocolError, Status import numpy as np from pathlib import Path # Define point structure class HeliosPoint(ctypes.Structure): # _pack_=1 _fields_ = [('x', ctypes.c_uint16), ('y', ctypes.c_uint16), ('r', ctypes.c_uint8), ('g', ctypes.c_uint8), ('b', ctypes.c_uint8), ('i', ctypes.c_uint8)] # Load and initialize library so_path = Path(__file__).absolute().parent.joinpath("libHeliosDacAPI.so") HeliosLib = ctypes.cdll.LoadLibrary(so_path) numDevices = HeliosLib.OpenDevices() print("Found ", numDevices, "Helios DACs") class TracerHelios(Tracer): """A connection to a DAC.""" def __init__(self, laser_id, PL, redis): self.redis = redis self.laser_id = laser_id self.PL = PL self.pl = [[0,0,0]] self.clientkey = self.redis.get("/clientkey").decode('ascii') self.xyrgb = self.xyrgb_prev = (0, 0, 0, 0, 0) self.intensity = 65280 self.intred = 100 self.intgreen = 100 self.intblue = 100 self.prev_x = 0 self.prev_y = 0 # self.newstream = OnePointIterator() # "Laser point List" Point generator # each points is yielded : Getpoints() call n times OnePoint() pass def get_points_capacity(self): return 30000 # def GetPoints(self, capacity): # a = [2,3] # return a def prepare(self): return True def begin(self, n, kpps): return True def get_status(self): """ Return 0 if not ready (playing), 1 if ready to receive new frame,-1 if communication failed """ # va chercher dans le helios et renvoie la normalisée status = HeliosLib.GetStatus(0) if status == 0 : return self.lstate["2"] # playing if status == 1 : return self.lstate["0"] # ready if status == -1 : return self.lstate["64"] # no connection def set_status(self, status: int): return def before_loop(self): return True def write(self, points): # status_attempts = 0 # j = 0 # while (status_attempts < 512 and HeliosLib.GetStatus(j) != 1): # status_attempts += 1 # print("attempt {}".format(status_attempts)) # HeliosLib.WriteFrame(j, 3000, 0, ctypes.pointer(frames[i % 30]), 1000) # Send the frame try : points = [ *i for i in items ] frames = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] frame_type = HeliosPoint * 1000 x = 0 y = 0 for i in range(30): y = round(i * 0xFFF / 30) frames[i] = frame_type() for j in range(1000): if (j < 500): x = round(j * 0xFFF / 500) else: x = round(0xFFF - ((j - 500) * 0xFFF / 500)) frames[i][j] = HeliosPoint(int(x), int(y), 255, 255, 255, 255) for i in range(150): for j in range(numDevices): statusAttempts = 0 # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway while (statusAttempts < 512 and HeliosLib.GetStatus(j) != 1): statusAttempts += 1 HeliosLib.WriteFrame(j, 30000, 0, ctypes.pointer(frames[i % 30]), 1000) # Send the frame except Exception as exc : print (exc) def get_warped_point(self, x, y ): # transform in one matrix, with warp !! # Etherpoint all transform in one matrix, with warp !! # position = homographyp.apply(gstt.EDH[self.laser_id], np.array([(x, y)])) return x, y # return position[0][0], position[0][1] # # # Create sample frames # frames = [0 for x in range(30)] # frameType = HeliosPoint * 1000 # x = 0 # y = 0 # for i in range(30): # y = round(i * 0xFFF / 30) # frames[i] = frameType() # for j in range(1000): # if (j < 500): # x = round(j * 0xFFF / 500) # else: # x = round(0xFFF - ((j - 500) * 0xFFF / 500)) # # frames[i][j] = HeliosPoint(int(x), int(y), 255, 255, 255, 255) # # # Play frames on DAC # for i in range(150): # for j in range(numDevices): # statusAttempts = 0 # # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway # while (statusAttempts < 512 and HeliosLib.GetStatus(j) != 1): # statusAttempts += 1 # HeliosLib.WriteFrame(j, 30000, 0, ctypes.pointer(frames[i % 30]), 1000) # Send the frame # # HeliosLib.CloseDevices()