From 8e10e0d82ed43fbfe72b40284594adbba572d2a0 Mon Sep 17 00:00:00 2001 From: alban Date: Wed, 19 Jul 2023 21:39:25 +0200 Subject: [PATCH 1/2] fix: /etherdream some cleanup and better error management --- examples/populate_redis.rs | 34 ++++++---- src/device/etherdream.rs | 134 ++++++++++++++++--------------------- src/point.rs | 1 - 3 files changed, 78 insertions(+), 91 deletions(-) diff --git a/examples/populate_redis.rs b/examples/populate_redis.rs index cd95e77..a1d43fe 100644 --- a/examples/populate_redis.rs +++ b/examples/populate_redis.rs @@ -1,24 +1,34 @@ -/// -/// $ cargo run --example populate_redis -/// +use std::io::_print; +/** + +# Populate Redis Example + +**This script simulates the redis content provided by the LJ Python / web tool** + +$ cargo run --example populate_redis + **/ use redis::{ - //RedisResult, - Client, - Commands, - Connection, + //RedisResult, + Client, + Commands, + Connection, }; fn do_something() -> redis::RedisResult<()> { - let client = Client::open("redis://127.0.0.1/")?; - let mut con: Connection = client.get_connection()?; - + let client = Client::open("redis://127.0.0.1/")?; + let mut con: Connection = client.get_connection()?; let _ = con.set("/clientkey", "/pl/0/")?; let _ = con.set("/EDH/0", "[[1.0, 0.0, 0.0],\n [ 0.0, 1.0, 0.0],\n [ 0.0, 0.0, 1.0]]")?; let _ = con.set("/kpps/0", "5000")?; let _ = con.set("/intensity/0", "255")?; - Ok(()) + let _ = con.set("/pl/0/0", "[(-300, 300, 0), (-300, -300, 65280), (300, -300, 65280), (300, 300, 65280), (-300, 300, 65280)]")?; + Ok(()) } + fn main() { - _ = do_something(); + match do_something() { + Err(err) => println!("Something wrong occured: {:?}", err), + Ok(..) => println!("Successfully inserted content in Redis") + } } diff --git a/src/device/etherdream.rs b/src/device/etherdream.rs index d5bc4e8..86b6a6b 100644 --- a/src/device/etherdream.rs +++ b/src/device/etherdream.rs @@ -1,42 +1,48 @@ use std::time; use std::net::SocketAddr; -use ether_dream::dac::stream::connect; +use ether_dream::dac::stream::{CommunicationError, connect}; use ether_dream::dac::{Playback, Stream}; +use chrono::{DateTime, Utc}; +use std::time::SystemTime; use crate::conf::EtherDreamConf; use crate::device::{Device, Status, PlaybackState}; use crate::errors::{LJError, LJResult}; use crate::point::{Color, Point}; -use ether_dream::protocol::{DacBroadcast, DacPoint}; +use ether_dream::protocol::{DacBroadcast, DacPoint, DacResponse}; use log::{debug, info, warn}; #[warn(dead_code)] pub struct EtherdreamDevice { pub conf: EtherDreamConf, dac: DacBroadcast, - // source_address: SocketAddr, stream: Stream, - // sent_points: u16, - lack: String, - last_traced_at: String, -} + // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. + // /// The previous command was accepted. + // pub const ACK: u8 = 0x61; + // /// The write command could not be performed because there was not enough buffer space when it + // /// was received. + // pub const NAK_FULL: u8 = 0x46; + // /// The command contained an invalid `command` byte or parameters. + // pub const NAK_INVALID: u8 = 0x49; + // /// An emergency-stop condition still exists. + // pub const NAK_STOP_CONDITION: u8 = 0x21; + // } + dac_response: u8, +} impl EtherdreamDevice { pub fn new(conf: &EtherDreamConf) -> LJResult { - let (dac, _source_address, stream) = EtherdreamDevice::get_dac(conf)?; - + let (dac, _source_address, stream) = EtherdreamDevice::connect(conf)?; Ok(Self { conf: (*conf).clone(), dac, - // source_address, stream, - // sent_points: 0, - lack: "".to_string(), - last_traced_at: "1985-04-12T23:20:50.52Z".to_string(), + dac_response: DacResponse::ACK, }) } - pub fn get_dac(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { + fn connect(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { let ip = &conf.ip; let dac_broadcast = ether_dream::recv_dac_broadcasts()?; dac_broadcast.set_timeout(Some(time::Duration::new(10, 0)))?; @@ -72,75 +78,34 @@ impl EtherdreamDevice { } } - pub fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { - // Establish the TCP connection. + fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { let mut stream = connect(dac, source_address.ip())?; - EtherdreamDevice::prepare_tcp_stream(&mut stream).unwrap(); - Ok(stream) - } - - fn prepare_tcp_stream(stream: &mut Stream) -> LJResult<()> { - - // Prepare stream match stream .queue_commands() .prepare_stream() .submit() { - Err(err) => { - warn!( - "err occurred when submitting PREPARE_STREAM command and listening for response: {}", - err - ); - } - Ok(_) => { - info!("Prepared Stream.") - } + Err(err) => warn!("err occurred when submitting PREPARE_STREAM command and listening for response: {}",err), + Ok(_) => info!("Prepared Stream.") } - - let control = 0; - let (u1, u2) = (0, 0); - let i = 255; - let point = DacPoint { - control, - x: 0, - y: 0, - i, - r: 0, - g: 0, - b: 0, - u1, - u2, - }; - let begin_list = vec![point]; + let begin_list = vec![ + DacPoint { control: 0, x: 0, y: 0, i: 255, r: 0, g: 0, b: 0, u1: 0, u2: 0 }, + ]; let points_per_second = stream.dac().max_point_rate / 32; match stream .queue_commands() .data(begin_list.into_iter().take(1 as usize)) .begin(0, points_per_second) .submit() { - Err(err) => { - warn!( - "err occurred when submitting first data: {}", - err - ); - } - Ok(_) => { - info!("Sent first data to Etherdream.") - } + Err(err) => warn!("err occurred when submitting first data: {}",err), + Ok(_) => info!("Sent first data to Etherdream.") } - - Ok(()) + Ok(stream) } - - pub fn check_tcp_stream(&mut self) -> LJResult<()> { - // todo Reinit stream if needed - // self.stream = EtherdreamDevice::get_tcp_stream(&self.dac, &self.source_address)? - Ok(()) - } - - // Determine the number of points needed to fill the DAC. fn points_capacity(&self) -> u16 { + /*** + Determine the number of points needed to fill the DAC. + ***/ // Fixme thread 'main' panicked at 'attempt to subtract with overflow', src/device/etherdream.rs:144:24 let n_points = self.dac.buffer_capacity as u16 - self.stream.dac().dac.status.buffer_fullness as u16 - 1; n_points @@ -149,26 +114,25 @@ impl EtherdreamDevice { impl Device for EtherdreamDevice { fn status(&mut self) -> Status { - let _ = self.check_tcp_stream(); - - // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. let playback_state = match self.stream.dac().dac.status.playback { Playback::Idle => PlaybackState::IDLE, Playback::Prepared => PlaybackState::PREPARE, Playback::Playing => PlaybackState::PLAYING, }; + let now = SystemTime::now(); + let now: DateTime = now.into(); + let now = now.to_rfc3339(); let status = Status { - last_traced_at: self.last_traced_at.clone(), + last_traced_at: now, properties: vec!["foo".to_string()], playback_state, capacity: self.points_capacity(), - lack: String::from(&self.lack), + lack: self.dac_response.to_string(), }; // info!("Dac Status: {:?} ", status ); // info!("Etherdream Dac {:?} ", self.dac ); // info!("Stream dac{:?}", self.stream.dac()); - status } @@ -179,27 +143,41 @@ impl Device for EtherdreamDevice { let n_points = self.points_capacity(); // let n_points = &line.len(); debug!("Etherdream::device draw Generating {:?} points", n_points); - return match self.stream + match self.stream .queue_commands() .data( line.into_iter() .map(|point| point.into()) // .take(line.len() as usize) - .take(n_points as usize ) + .take(n_points as usize) ) .submit() { Err(err) => { // We should account for // 'Broken pipe (os error 32)' // Connection reset by peer (os error 104) - warn!("Draw error: '{}'",err); - Ok(()) + self.dac_response = match err { + CommunicationError::Io(err) => { + warn!("IO ERROR while drawing: '{}'",err); + DacResponse::ACK + } + CommunicationError::Protocol(err) => { + warn!("Protocol ERROR while drawing: '{}'",err); + DacResponse::ACK + } + CommunicationError::Response(err) => { + warn!("Response ERROR while drawing: '{}'",err); + err.response.response + } + }; + } Ok(_) => { + self.dac_response = DacResponse::ACK; debug!("Draw is ok"); - Ok(()) } }; + Ok(()) } fn stop(&mut self) -> LJResult<()> { diff --git a/src/point.rs b/src/point.rs index d060fd5..1298905 100644 --- a/src/point.rs +++ b/src/point.rs @@ -57,7 +57,6 @@ impl From for helios_dac::Point { impl From for DacPoint { fn from(pt: Point) -> DacPoint { - print!("."); let control = 0; let (u1, u2) = (0, 0); let i = 255; From 4719dcc4305fa85f6b668dd856a6017e8ae9df37 Mon Sep 17 00:00:00 2001 From: alban Date: Thu, 20 Jul 2023 00:54:24 +0200 Subject: [PATCH 2/2] feat: more dispatch actions and cosmetic --- src/main.rs | 18 +++++-------- src/redis_ctrl.rs | 69 ++++++++++++++++++++++++++--------------------- src/worldstate.rs | 4 ++- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2624273..62067a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,19 +65,13 @@ fn run_all() -> LJResult<()> { let mut tracer = device_factory(&config)?; world_state.grid = tracer.grid(); - // can't work, but we can add + Debug to Device to make it work... - //dbg!(tracer); - // Setup geometry transformers on points lists let transformers = config.get_transformers(); // Dispatch based on redis requests while running.load(Ordering::SeqCst) { rs.set_status(tracer.status())?; - let order = rs.get_order(config.laser_id)?; - - match order { Order::Draw | Order::Black | Order::Grid => { // 0 : Draw Normal point list @@ -110,11 +104,14 @@ fn run_all() -> LJResult<()> { Order::ClientKey => { world_state.client_key = rs.get_client_key()?; } - // Order::ColorBalance => {}, + Order::ColorBalance => { + let (r, g, b) = rs.get_color_balance()?; + world_state.color_balance = Color { r, g, b }; + } + Order::Resampler => { + world_state.resampler = rs.get_resampler()?; + } _ => { - // 4 : Resampler Change (longs and shorts lsteps) - // 5 : Client Key Change = reread redis key /clientkey - // 8 : color balance change = reread redis keys /red /green /blue // 9 : poweroff LJ info!("Order: {:?}", order); } @@ -128,7 +125,6 @@ fn run_all() -> LJResult<()> { fn init_logging(config: &LJResult) { if let Ok(ref config) = config { - let level = if config.debug { LevelFilter::Debug } else { diff --git a/src/redis_ctrl.rs b/src/redis_ctrl.rs index 74ac5a0..9b4bb4a 100644 --- a/src/redis_ctrl.rs +++ b/src/redis_ctrl.rs @@ -2,7 +2,7 @@ use redis::{Client, Commands, Connection}; use ron::de::from_str; use crate::device::Status; use crate::errors::{LJError, LJResult}; -use crate::worldstate::{WorldState,EDH}; +use crate::worldstate::{WorldState, EDH}; // use log::info; #[repr(u8)] @@ -10,7 +10,6 @@ use crate::worldstate::{WorldState,EDH}; pub enum Order { Draw = 0, Edh, - //homography Black, Grid, Resampler, @@ -46,6 +45,7 @@ impl TryFrom for Order { } pub type Line = Vec<(f32, f32, u32)>; +pub type Resampler = Vec<(f32,f32)>; pub struct RedisCtrl { pub client: Client, @@ -84,11 +84,6 @@ impl RedisCtrl { Ok(val.try_into()?) } - /** - /lstt/lasernumber etherdream last_status.playback_state (0: idle 1: prepare 2: playing) - /cap/lasernumber number of empty points sent to fill etherdream buffer (up to 1799) - /lack/lasernumber "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. - **/ pub fn set_status(&mut self, status: Status) -> LJResult<()> { let lstt_key = format!("/lstt/{}", self.laser_id); let cap_key = format!("/cap/{}", self.laser_id); @@ -99,34 +94,48 @@ impl RedisCtrl { Ok(()) } - pub fn init_world_state( &mut self) -> LJResult{ - Ok(WorldState { - client_key: self.get_client_key().unwrap(), - edh: self.get_edh().unwrap(), - kpps: self.get_int("kpps").unwrap().try_into().unwrap(), - intensity: self.get_int("intensity").unwrap().try_into().unwrap(), - ..WorldState::default() - }) + pub fn init_world_state(&mut self) -> LJResult { + Ok(WorldState { + client_key: self.get_client_key().unwrap(), + edh: self.get_edh().unwrap(), + kpps: self.get_int("kpps").unwrap().try_into().unwrap(), + intensity: self.get_int("intensity").unwrap().try_into().unwrap(), + ..WorldState::default() + }) } - pub fn get_edh( &mut self ) -> LJResult { - // Get new EDH - let edh_key = format!("/EDH/{}", self.laser_id); - let edh : String = self.connection.get(edh_key)?; - let edh : Vec> = from_str(&edh)?; - let edh = EDH::new(edh)?; - Ok(edh) + pub fn get_edh(&mut self) -> LJResult { + // Get new EDH + let edh_key = format!("/EDH/{}", self.laser_id); + let edh: String = self.connection.get(edh_key)?; + let edh: Vec> = from_str(&edh)?; + let edh = EDH::new(edh)?; + Ok(edh) } - pub fn get_client_key( &mut self ) -> LJResult { - let key : String = self.connection.get("/clientkey")?; - Ok(key) + pub fn get_client_key(&mut self) -> LJResult { + let key: String = self.connection.get("/clientkey")?; + Ok(key) } - pub fn get_int(&mut self, key: &str ) -> LJResult { - // Get new Int - let fmt = format!("/{key}/{}", self.laser_id); - let val : u32 = self.connection.get(fmt)?; - Ok(val) + pub fn get_color_balance(&mut self) -> LJResult<(u8, u8, u8)> { + Ok(( + self.connection.get("/red")?, + self.connection.get("/green")?, + self.connection.get("/blue")?, + )) + } + + pub fn get_resampler(&mut self ) -> LJResult { + let val: String = self.connection.get(format!("/resampler/{}", self.laser_id))?; + let resampler : Resampler = from_str(&val)?; + Ok(resampler) + } + + pub fn get_int(&mut self, key: &str) -> LJResult { + // Get new Int + let fmt = format!("/{key}/{}", self.laser_id); + let val: u32 = self.connection.get(fmt)?; + Ok(val) } } diff --git a/src/worldstate.rs b/src/worldstate.rs index e03edb1..4c1366c 100644 --- a/src/worldstate.rs +++ b/src/worldstate.rs @@ -2,6 +2,7 @@ use crate::point::{Point, Color}; use nalgebra::base::{Matrix3, Matrix1x3}; use crate::errors::{LJError, LJResult}; use log::debug; +use crate::redis_ctrl::Resampler; #[derive(Debug, Default)] pub struct EDH { @@ -39,7 +40,7 @@ impl EDH { #[derive(Debug, Default)] pub struct WorldState { pub edh: EDH, - pub resampler: Vec, + pub resampler: Resampler, pub client_key: String, pub intensity: u8, pub kpps: u32, @@ -47,6 +48,7 @@ pub struct WorldState { pub draw_black: bool, pub draw_grid: bool, pub grid: Vec, + pub color_balance: Color, } impl WorldState {}