LJ/libs3/tracer_helios.py

122 lines
3.8 KiB
Python
Raw Permalink Normal View History

2023-02-24 12:37:49 +00:00
import ctypes
2023-02-25 21:18:32 +00:00
import math
2023-02-25 09:07:20 +00:00
import random
2023-02-24 12:37:49 +00:00
from libs3 import gstt
from libs3 import homographyp
from .tracer_common import Tracer, OnePointIterator, ProtocolError, Status
import numpy as np
from pathlib import Path
# Define point structure
class HeliosPoint(ctypes.Structure):
    """One laser sample: a 16-bit position plus four 8-bit channels.

    Field order is ``x, y, r, g, b, i``. ``x`` and ``y`` are unsigned 16-bit
    integers; ``r``, ``g``, ``b`` and ``i`` are unsigned 8-bit integers, so a
    point occupies 8 bytes with default alignment.
    NOTE(review): ``i`` is presumably the overall intensity -- confirm against
    the Helios SDK header.
    """

    # _pack_ = 1
    _fields_ = [
        *((axis, ctypes.c_uint16) for axis in ("x", "y")),
        *((channel, ctypes.c_uint8) for channel in ("r", "g", "b", "i")),
    ]
# Load the Helios shared library that sits next to this module and open
# every attached DAC. This runs once, at import time.
so_path = Path(__file__).absolute().parent / "libHeliosDacAPI.so"
# ctypes only documents path-like library names from Python 3.12 onwards;
# passing a plain string keeps the load working on older interpreters too.
HeliosLib = ctypes.cdll.LoadLibrary(str(so_path))
# OpenDevices() returns the number of DACs the library found.
numDevices = HeliosLib.OpenDevices()
print("Found ", numDevices, "Helios DACs")
2023-06-29 20:44:21 +00:00
2023-02-24 12:37:49 +00:00
class TracerHelios(Tracer):
    """A connection to a DAC.

    Tracer implementation backed by the Helios shared library loaded at
    module level (``HeliosLib``). Every library call in this class addresses
    device number 0.
    """

    def __init__(self, laser_id, PL, redis):
        """Store the laser identity and set the default drawing state.

        Args:
            laser_id: Identifier of this laser; also the key used to look up
                the transform in ``gstt.EDH`` by ``get_warped_point``.
            PL: Stored as ``self.PL``; not used elsewhere in this class.
            redis: Client whose ``get("/clientkey")`` must return bytes
                (the value is decoded as ASCII).
        """
        self.redis = redis
        self.laser_id = laser_id
        self.PL = PL
        self.pl = [[0, 0, 0]]
        self.clientkey = self.redis.get("/clientkey").decode('ascii')
        self.xyrgb = self.xyrgb_prev = (0, 0, 0, 0, 0)
        self.intensity = 65280
        self.intred = 100
        self.intgreen = 100
        self.intblue = 100
        self.prev_x = 0
        self.prev_y = 0
        # Coordinate range accepted by clip(); write() clamps x and y to it.
        self.min_res = 0
        self.max_res = 4095
        # self.newstream = OnePointIterator()
        # "Laser point List" Point generator
        # each points is yielded : Getpoints() call n times OnePoint()

    def clip(self, number):
        """Clamp *number* to ``[self.min_res, self.max_res]`` and return an int.

        A NaN input is not handled here (``int(nan)`` raises ``ValueError``);
        ``write`` filters NaN before calling this method.
        """
        if number < self.min_res:
            return int(self.min_res)
        if number > self.max_res:
            return int(self.max_res)
        return int(number)

    def get_points_capacity(self):
        """Return the number of points a single frame can hold (1000)."""
        return 1000

    # def GetPoints(self, capacity):
    #     a = [2,3]
    #     return a

    def prepare(self):
        """No preparation is needed for this DAC; always returns True."""
        return True

    def begin(self, n, kpps):
        """Accept the start request without using *n* or *kpps*; returns True."""
        return True

    def get_status(self):
        """Map the raw status of Helios device 0 to a tracer state.

        The raw ``GetStatus`` value is 0 while not ready (playing), 1 when
        ready to receive a new frame, and negative when communication failed.

        Returns:
            ``self.lstate["2"]`` when playing, ``self.lstate["0"]`` when
            ready, ``self.lstate["64"]`` for any negative (error) status, and
            ``None`` for any other value.
            NOTE(review): ``lstate`` is not defined in this class --
            presumably inherited from ``Tracer``; confirm.
        """
        # Query the Helios and return the normalised state.
        status = HeliosLib.GetStatus(0)
        if status == 0:
            return self.lstate["2"]  # playing
        if status == 1:
            return self.lstate["0"]  # ready
        if status < 0:
            # Every negative code is an error, not only -1; previously the
            # other error codes fell through and returned None.
            return self.lstate["64"]  # no connection
        return None

    def set_status(self, status: int):
        """Ignore *status*; the state cannot be set from here."""
        return

    def before_loop(self):
        """Nothing to do before the main loop; always returns True."""
        return True

    def write(self, points):
        """Build a Helios frame from *points* and wait for the DAC to be ready.

        Args:
            points: Iterable of ``(x, y, r, g, b)`` tuples. ``x`` and ``y``
                are multiplied by 10 and clamped with ``clip``; NaN
                coordinates become 0. Points beyond ``get_points_capacity()``
                are dropped so the fixed-size frame is never overrun.

        NOTE(review): the actual ``WriteFrame`` call is currently disabled
        (see the commented block at the end), so no frame reaches the
        hardware -- confirm whether this is intentional.
        """
        capacity = self.get_points_capacity()
        frame = (HeliosPoint * capacity)()
        helios_id = 0
        # Materialise once (points may be a generator) and cap at the frame
        # size: indexing past the ctypes array would raise IndexError.
        points = list(points)[:capacity]
        for i, (x, y, r, g, b) in enumerate(points):
            x *= 10
            y *= 10
            x = 0 if math.isnan(x) else self.clip(x)
            y = 0 if math.isnan(y) else self.clip(y)
            frame[i] = HeliosPoint(x, y, int(r), int(g), int(b), 255)
        statusAttempts = 0
        # Make 512 attempts for DAC status to be ready. After that, just give
        # up and try to write the frame anyway.
        while statusAttempts < 512 and HeliosLib.GetStatus(helios_id) != 1:
            statusAttempts += 1
        # int HeliosDac::WriteFrame(unsigned int devNum, unsigned int pps, std::uint8_t flags, HeliosPoint* points, unsigned int numOfPoints)
        # f = ctypes.pointer(frame)
        # ret_helios = HeliosLib.WriteFrame(0, 3000, 0, f, len(frame))
        # # @todo : detect errors
        # if ret_helios != 1:
        #     print(f"ERR ]Helios DAC #{self.laser_id} returned error {ret_helios}")

    def get_warped_point(self, x, y):
        """Apply this laser's ``gstt.EDH`` transform to the point ``(x, y)``.

        The point is passed to ``homographyp.apply`` as a one-row array
        ``[(x, y, 0)]``; the first two components of the first result row are
        returned as ``(x, y)``.
        """
        # All transforms, including the warp, are combined in one matrix.
        np_arr = np.array([(x, y, 0)])
        laser_edh = gstt.EDH[self.laser_id]
        position = homographyp.apply(laser_edh, np_arr)
        return position[0][0], position[0][1]