diff --git a/examples/populate_redis.rs b/examples/populate_redis.rs index a1d43fe..cd95e77 100644 --- a/examples/populate_redis.rs +++ b/examples/populate_redis.rs @@ -1,34 +1,24 @@ -use std::io::_print; -/** - -# Populate Redis Example - -**This script simulates the redis content provided by the LJ Python / web tool** - -$ cargo run --example populate_redis - **/ +/// +/// $ cargo run --example populate_redis +/// use redis::{ - //RedisResult, - Client, - Commands, - Connection, + //RedisResult, + Client, + Commands, + Connection, }; fn do_something() -> redis::RedisResult<()> { - let client = Client::open("redis://127.0.0.1/")?; - let mut con: Connection = client.get_connection()?; + let client = Client::open("redis://127.0.0.1/")?; + let mut con: Connection = client.get_connection()?; + let _ = con.set("/clientkey", "/pl/0/")?; let _ = con.set("/EDH/0", "[[1.0, 0.0, 0.0],\n [ 0.0, 1.0, 0.0],\n [ 0.0, 0.0, 1.0]]")?; let _ = con.set("/kpps/0", "5000")?; let _ = con.set("/intensity/0", "255")?; - let _ = con.set("/pl/0/0", "[(-300, 300, 0), (-300, -300, 65280), (300, -300, 65280), (300, 300, 65280), (-300, 300, 65280)]")?; - Ok(()) + Ok(()) } - fn main() { - match do_something() { - Err(err) => println!("Something wrong occured: {:?}", err), - Ok(..) => println!("Successfully inserted content in Redis") - } + _ = do_something(); } diff --git a/src/device/etherdream.rs b/src/device/etherdream.rs index 86b6a6b..d5bc4e8 100644 --- a/src/device/etherdream.rs +++ b/src/device/etherdream.rs @@ -1,48 +1,42 @@ use std::time; use std::net::SocketAddr; -use ether_dream::dac::stream::{CommunicationError, connect}; +use ether_dream::dac::stream::connect; use ether_dream::dac::{Playback, Stream}; -use chrono::{DateTime, Utc}; -use std::time::SystemTime; use crate::conf::EtherDreamConf; use crate::device::{Device, Status, PlaybackState}; use crate::errors::{LJError, LJResult}; use crate::point::{Color, Point}; -use ether_dream::protocol::{DacBroadcast, DacPoint, DacResponse}; +use ether_dream::protocol::{DacBroadcast, DacPoint}; use log::{debug, info, warn}; #[warn(dead_code)] pub struct EtherdreamDevice { pub conf: EtherDreamConf, dac: DacBroadcast, + // source_address: SocketAddr, stream: Stream, - - // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. - // /// The previous command was accepted. - // pub const ACK: u8 = 0x61; - // /// The write command could not be performed because there was not enough buffer space when it - // /// was received. - // pub const NAK_FULL: u8 = 0x46; - // /// The command contained an invalid `command` byte or parameters. - // pub const NAK_INVALID: u8 = 0x49; - // /// An emergency-stop condition still exists. - // pub const NAK_STOP_CONDITION: u8 = 0x21; - // } - dac_response: u8, + // sent_points: u16, + lack: String, + last_traced_at: String, } + impl EtherdreamDevice { pub fn new(conf: &EtherDreamConf) -> LJResult { - let (dac, _source_address, stream) = EtherdreamDevice::connect(conf)?; + let (dac, _source_address, stream) = EtherdreamDevice::get_dac(conf)?; + Ok(Self { conf: (*conf).clone(), dac, + // source_address, stream, - dac_response: DacResponse::ACK, + // sent_points: 0, + lack: "".to_string(), + last_traced_at: "1985-04-12T23:20:50.52Z".to_string(), }) } - fn connect(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { + pub fn get_dac(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { let ip = &conf.ip; let dac_broadcast = ether_dream::recv_dac_broadcasts()?; dac_broadcast.set_timeout(Some(time::Duration::new(10, 0)))?; @@ -78,34 +72,75 @@ impl EtherdreamDevice { } } - fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { + pub fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { + // Establish the TCP connection. let mut stream = connect(dac, source_address.ip())?; + EtherdreamDevice::prepare_tcp_stream(&mut stream).unwrap(); + Ok(stream) + } + + fn prepare_tcp_stream(stream: &mut Stream) -> LJResult<()> { + + // Prepare stream match stream .queue_commands() .prepare_stream() .submit() { - Err(err) => warn!("err occurred when submitting PREPARE_STREAM command and listening for response: {}",err), - Ok(_) => info!("Prepared Stream.") + Err(err) => { + warn!( + "err occurred when submitting PREPARE_STREAM command and listening for response: {}", + err + ); + } + Ok(_) => { + info!("Prepared Stream.") + } } - let begin_list = vec![ - DacPoint { control: 0, x: 0, y: 0, i: 255, r: 0, g: 0, b: 0, u1: 0, u2: 0 }, - ]; + + let control = 0; + let (u1, u2) = (0, 0); + let i = 255; + let point = DacPoint { + control, + x: 0, + y: 0, + i, + r: 0, + g: 0, + b: 0, + u1, + u2, + }; + let begin_list = vec![point]; let points_per_second = stream.dac().max_point_rate / 32; match stream .queue_commands() .data(begin_list.into_iter().take(1 as usize)) .begin(0, points_per_second) .submit() { - Err(err) => warn!("err occurred when submitting first data: {}",err), - Ok(_) => info!("Sent first data to Etherdream.") + Err(err) => { + warn!( + "err occurred when submitting first data: {}", + err + ); + } + Ok(_) => { + info!("Sent first data to Etherdream.") + } } - Ok(stream) + + Ok(()) } + + pub fn check_tcp_stream(&mut self) -> LJResult<()> { + // todo Reinit stream if needed + // self.stream = EtherdreamDevice::get_tcp_stream(&self.dac, &self.source_address)? + Ok(()) + } + + // Determine the number of points needed to fill the DAC. fn points_capacity(&self) -> u16 { - /*** - Determine the number of points needed to fill the DAC. - ***/ // Fixme thread 'main' panicked at 'attempt to subtract with overflow', src/device/etherdream.rs:144:24 let n_points = self.dac.buffer_capacity as u16 - self.stream.dac().dac.status.buffer_fullness as u16 - 1; n_points @@ -114,25 +149,26 @@ impl EtherdreamDevice { impl Device for EtherdreamDevice { fn status(&mut self) -> Status { + let _ = self.check_tcp_stream(); + + // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. let playback_state = match self.stream.dac().dac.status.playback { Playback::Idle => PlaybackState::IDLE, Playback::Prepared => PlaybackState::PREPARE, Playback::Playing => PlaybackState::PLAYING, }; - let now = SystemTime::now(); - let now: DateTime = now.into(); - let now = now.to_rfc3339(); let status = Status { - last_traced_at: now, + last_traced_at: self.last_traced_at.clone(), properties: vec!["foo".to_string()], playback_state, capacity: self.points_capacity(), - lack: self.dac_response.to_string(), + lack: String::from(&self.lack), }; // info!("Dac Status: {:?} ", status ); // info!("Etherdream Dac {:?} ", self.dac ); // info!("Stream dac{:?}", self.stream.dac()); + status } @@ -143,41 +179,27 @@ impl Device for EtherdreamDevice { let n_points = self.points_capacity(); // let n_points = &line.len(); debug!("Etherdream::device draw Generating {:?} points", n_points); - match self.stream + return match self.stream .queue_commands() .data( line.into_iter() .map(|point| point.into()) // .take(line.len() as usize) - .take(n_points as usize) + .take(n_points as usize ) ) .submit() { Err(err) => { // We should account for // 'Broken pipe (os error 32)' // Connection reset by peer (os error 104) - self.dac_response = match err { - CommunicationError::Io(err) => { - warn!("IO ERROR while drawing: '{}'",err); - DacResponse::ACK - } - CommunicationError::Protocol(err) => { - warn!("Protocol ERROR while drawing: '{}'",err); - DacResponse::ACK - } - CommunicationError::Response(err) => { - warn!("Response ERROR while drawing: '{}'",err); - err.response.response - } - }; - + warn!("Draw error: '{}'",err); + Ok(()) } Ok(_) => { - self.dac_response = DacResponse::ACK; debug!("Draw is ok"); + Ok(()) } }; - Ok(()) } fn stop(&mut self) -> LJResult<()> { diff --git a/src/main.rs b/src/main.rs index 62067a8..2624273 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,13 +65,19 @@ fn run_all() -> LJResult<()> { let mut tracer = device_factory(&config)?; world_state.grid = tracer.grid(); + // can't work, but we can add + Debug to Device to make it work... + //dbg!(tracer); + // Setup geometry transformers on points lists let transformers = config.get_transformers(); // Dispatch based on redis requests while running.load(Ordering::SeqCst) { rs.set_status(tracer.status())?; + let order = rs.get_order(config.laser_id)?; + + match order { Order::Draw | Order::Black | Order::Grid => { // 0 : Draw Normal point list @@ -104,14 +110,11 @@ fn run_all() -> LJResult<()> { Order::ClientKey => { world_state.client_key = rs.get_client_key()?; } - Order::ColorBalance => { - let (r, g, b) = rs.get_color_balance()?; - world_state.color_balance = Color { r, g, b }; - } - Order::Resampler => { - world_state.resampler = rs.get_resampler()?; - } + // Order::ColorBalance => {}, _ => { + // 4 : Resampler Change (longs and shorts lsteps) + // 5 : Client Key Change = reread redis key /clientkey + // 8 : color balance change = reread redis keys /red /green /blue // 9 : poweroff LJ info!("Order: {:?}", order); } @@ -125,6 +128,7 @@ fn run_all() -> LJResult<()> { fn init_logging(config: &LJResult) { if let Ok(ref config) = config { + let level = if config.debug { LevelFilter::Debug } else { diff --git a/src/point.rs b/src/point.rs index 1298905..d060fd5 100644 --- a/src/point.rs +++ b/src/point.rs @@ -57,6 +57,7 @@ impl From for helios_dac::Point { impl From for DacPoint { fn from(pt: Point) -> DacPoint { + print!("."); let control = 0; let (u1, u2) = (0, 0); let i = 255; diff --git a/src/redis_ctrl.rs b/src/redis_ctrl.rs index 9b4bb4a..74ac5a0 100644 --- a/src/redis_ctrl.rs +++ b/src/redis_ctrl.rs @@ -2,7 +2,7 @@ use redis::{Client, Commands, Connection}; use ron::de::from_str; use crate::device::Status; use crate::errors::{LJError, LJResult}; -use crate::worldstate::{WorldState, EDH}; +use crate::worldstate::{WorldState,EDH}; // use log::info; #[repr(u8)] @@ -10,6 +10,7 @@ use crate::worldstate::{WorldState, EDH}; pub enum Order { Draw = 0, Edh, + //homography Black, Grid, Resampler, @@ -45,7 +46,6 @@ impl TryFrom for Order { } pub type Line = Vec<(f32, f32, u32)>; -pub type Resampler = Vec<(f32,f32)>; pub struct RedisCtrl { pub client: Client, @@ -84,6 +84,11 @@ impl RedisCtrl { Ok(val.try_into()?) } + /** + /lstt/lasernumber etherdream last_status.playback_state (0: idle 1: prepare 2: playing) + /cap/lasernumber number of empty points sent to fill etherdream buffer (up to 1799) + /lack/lasernumber "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. + **/ pub fn set_status(&mut self, status: Status) -> LJResult<()> { let lstt_key = format!("/lstt/{}", self.laser_id); let cap_key = format!("/cap/{}", self.laser_id); @@ -94,48 +99,34 @@ impl RedisCtrl { Ok(()) } - pub fn init_world_state(&mut self) -> LJResult { - Ok(WorldState { - client_key: self.get_client_key().unwrap(), - edh: self.get_edh().unwrap(), - kpps: self.get_int("kpps").unwrap().try_into().unwrap(), - intensity: self.get_int("intensity").unwrap().try_into().unwrap(), - ..WorldState::default() - }) + pub fn init_world_state( &mut self) -> LJResult{ + Ok(WorldState { + client_key: self.get_client_key().unwrap(), + edh: self.get_edh().unwrap(), + kpps: self.get_int("kpps").unwrap().try_into().unwrap(), + intensity: self.get_int("intensity").unwrap().try_into().unwrap(), + ..WorldState::default() + }) } - pub fn get_edh(&mut self) -> LJResult { - // Get new EDH - let edh_key = format!("/EDH/{}", self.laser_id); - let edh: String = self.connection.get(edh_key)?; - let edh: Vec> = from_str(&edh)?; - let edh = EDH::new(edh)?; - Ok(edh) + pub fn get_edh( &mut self ) -> LJResult { + // Get new EDH + let edh_key = format!("/EDH/{}", self.laser_id); + let edh : String = self.connection.get(edh_key)?; + let edh : Vec> = from_str(&edh)?; + let edh = EDH::new(edh)?; + Ok(edh) } - pub fn get_client_key(&mut self) -> LJResult { - let key: String = self.connection.get("/clientkey")?; - Ok(key) + pub fn get_client_key( &mut self ) -> LJResult { + let key : String = self.connection.get("/clientkey")?; + Ok(key) } - pub fn get_color_balance(&mut self) -> LJResult<(u8, u8, u8)> { - Ok(( - self.connection.get("/red")?, - self.connection.get("/green")?, - self.connection.get("/blue")?, - )) - } - - pub fn get_resampler(&mut self ) -> LJResult { - let val: String = self.connection.get(format!("/resampler/{}", self.laser_id))?; - let resampler : Resampler = from_str(&val)?; - Ok(resampler) - } - - pub fn get_int(&mut self, key: &str) -> LJResult { - // Get new Int - let fmt = format!("/{key}/{}", self.laser_id); - let val: u32 = self.connection.get(fmt)?; - Ok(val) + pub fn get_int(&mut self, key: &str ) -> LJResult { + // Get new Int + let fmt = format!("/{key}/{}", self.laser_id); + let val : u32 = self.connection.get(fmt)?; + Ok(val) } } diff --git a/src/worldstate.rs b/src/worldstate.rs index 4c1366c..e03edb1 100644 --- a/src/worldstate.rs +++ b/src/worldstate.rs @@ -2,7 +2,6 @@ use crate::point::{Point, Color}; use nalgebra::base::{Matrix3, Matrix1x3}; use crate::errors::{LJError, LJResult}; use log::debug; -use crate::redis_ctrl::Resampler; #[derive(Debug, Default)] pub struct EDH { @@ -40,7 +39,7 @@ impl EDH { #[derive(Debug, Default)] pub struct WorldState { pub edh: EDH, - pub resampler: Resampler, + pub resampler: Vec, pub client_key: String, pub intensity: u8, pub kpps: u32, @@ -48,7 +47,6 @@ pub struct WorldState { pub draw_black: bool, pub draw_grid: bool, pub grid: Vec, - pub color_balance: Color, } impl WorldState {}