diff --git a/examples/populate_redis.rs b/examples/populate_redis.rs index cd95e77..a1d43fe 100644 --- a/examples/populate_redis.rs +++ b/examples/populate_redis.rs @@ -1,24 +1,34 @@ -/// -/// $ cargo run --example populate_redis -/// +use std::io::_print; +/** + +# Populate Redis Example + +**This script simulates the redis content provided by the LJ Python / web tool** + +$ cargo run --example populate_redis + **/ use redis::{ - //RedisResult, - Client, - Commands, - Connection, + //RedisResult, + Client, + Commands, + Connection, }; fn do_something() -> redis::RedisResult<()> { - let client = Client::open("redis://127.0.0.1/")?; - let mut con: Connection = client.get_connection()?; - + let client = Client::open("redis://127.0.0.1/")?; + let mut con: Connection = client.get_connection()?; let _ = con.set("/clientkey", "/pl/0/")?; let _ = con.set("/EDH/0", "[[1.0, 0.0, 0.0],\n [ 0.0, 1.0, 0.0],\n [ 0.0, 0.0, 1.0]]")?; let _ = con.set("/kpps/0", "5000")?; let _ = con.set("/intensity/0", "255")?; - Ok(()) + let _ = con.set("/pl/0/0", "[(-300, 300, 0), (-300, -300, 65280), (300, -300, 65280), (300, 300, 65280), (-300, 300, 65280)]")?; + Ok(()) } + fn main() { - _ = do_something(); + match do_something() { + Err(err) => println!("Something wrong occured: {:?}", err), + Ok(..) => println!("Successfully inserted content in Redis") + } } diff --git a/src/device/etherdream.rs b/src/device/etherdream.rs index d5bc4e8..86b6a6b 100644 --- a/src/device/etherdream.rs +++ b/src/device/etherdream.rs @@ -1,42 +1,48 @@ use std::time; use std::net::SocketAddr; -use ether_dream::dac::stream::connect; +use ether_dream::dac::stream::{CommunicationError, connect}; use ether_dream::dac::{Playback, Stream}; +use chrono::{DateTime, Utc}; +use std::time::SystemTime; use crate::conf::EtherDreamConf; use crate::device::{Device, Status, PlaybackState}; use crate::errors::{LJError, LJResult}; use crate::point::{Color, Point}; -use ether_dream::protocol::{DacBroadcast, DacPoint}; +use ether_dream::protocol::{DacBroadcast, DacPoint, DacResponse}; use log::{debug, info, warn}; #[warn(dead_code)] pub struct EtherdreamDevice { pub conf: EtherDreamConf, dac: DacBroadcast, - // source_address: SocketAddr, stream: Stream, - // sent_points: u16, - lack: String, - last_traced_at: String, -} + // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. + // /// The previous command was accepted. + // pub const ACK: u8 = 0x61; + // /// The write command could not be performed because there was not enough buffer space when it + // /// was received. + // pub const NAK_FULL: u8 = 0x46; + // /// The command contained an invalid `command` byte or parameters. + // pub const NAK_INVALID: u8 = 0x49; + // /// An emergency-stop condition still exists. + // pub const NAK_STOP_CONDITION: u8 = 0x21; + // } + dac_response: u8, +} impl EtherdreamDevice { pub fn new(conf: &EtherDreamConf) -> LJResult { - let (dac, _source_address, stream) = EtherdreamDevice::get_dac(conf)?; - + let (dac, _source_address, stream) = EtherdreamDevice::connect(conf)?; Ok(Self { conf: (*conf).clone(), dac, - // source_address, stream, - // sent_points: 0, - lack: "".to_string(), - last_traced_at: "1985-04-12T23:20:50.52Z".to_string(), + dac_response: DacResponse::ACK, }) } - pub fn get_dac(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { + fn connect(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { let ip = &conf.ip; let dac_broadcast = ether_dream::recv_dac_broadcasts()?; dac_broadcast.set_timeout(Some(time::Duration::new(10, 0)))?; @@ -72,75 +78,34 @@ impl EtherdreamDevice { } } - pub fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { - // Establish the TCP connection. + fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { let mut stream = connect(dac, source_address.ip())?; - EtherdreamDevice::prepare_tcp_stream(&mut stream).unwrap(); - Ok(stream) - } - - fn prepare_tcp_stream(stream: &mut Stream) -> LJResult<()> { - - // Prepare stream match stream .queue_commands() .prepare_stream() .submit() { - Err(err) => { - warn!( - "err occurred when submitting PREPARE_STREAM command and listening for response: {}", - err - ); - } - Ok(_) => { - info!("Prepared Stream.") - } + Err(err) => warn!("err occurred when submitting PREPARE_STREAM command and listening for response: {}",err), + Ok(_) => info!("Prepared Stream.") } - - let control = 0; - let (u1, u2) = (0, 0); - let i = 255; - let point = DacPoint { - control, - x: 0, - y: 0, - i, - r: 0, - g: 0, - b: 0, - u1, - u2, - }; - let begin_list = vec![point]; + let begin_list = vec![ + DacPoint { control: 0, x: 0, y: 0, i: 255, r: 0, g: 0, b: 0, u1: 0, u2: 0 }, + ]; let points_per_second = stream.dac().max_point_rate / 32; match stream .queue_commands() .data(begin_list.into_iter().take(1 as usize)) .begin(0, points_per_second) .submit() { - Err(err) => { - warn!( - "err occurred when submitting first data: {}", - err - ); - } - Ok(_) => { - info!("Sent first data to Etherdream.") - } + Err(err) => warn!("err occurred when submitting first data: {}",err), + Ok(_) => info!("Sent first data to Etherdream.") } - - Ok(()) + Ok(stream) } - - pub fn check_tcp_stream(&mut self) -> LJResult<()> { - // todo Reinit stream if needed - // self.stream = EtherdreamDevice::get_tcp_stream(&self.dac, &self.source_address)? - Ok(()) - } - - // Determine the number of points needed to fill the DAC. fn points_capacity(&self) -> u16 { + /*** + Determine the number of points needed to fill the DAC. + ***/ // Fixme thread 'main' panicked at 'attempt to subtract with overflow', src/device/etherdream.rs:144:24 let n_points = self.dac.buffer_capacity as u16 - self.stream.dac().dac.status.buffer_fullness as u16 - 1; n_points @@ -149,26 +114,25 @@ impl EtherdreamDevice { impl Device for EtherdreamDevice { fn status(&mut self) -> Status { - let _ = self.check_tcp_stream(); - - // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. let playback_state = match self.stream.dac().dac.status.playback { Playback::Idle => PlaybackState::IDLE, Playback::Prepared => PlaybackState::PREPARE, Playback::Playing => PlaybackState::PLAYING, }; + let now = SystemTime::now(); + let now: DateTime = now.into(); + let now = now.to_rfc3339(); let status = Status { - last_traced_at: self.last_traced_at.clone(), + last_traced_at: now, properties: vec!["foo".to_string()], playback_state, capacity: self.points_capacity(), - lack: String::from(&self.lack), + lack: self.dac_response.to_string(), }; // info!("Dac Status: {:?} ", status ); // info!("Etherdream Dac {:?} ", self.dac ); // info!("Stream dac{:?}", self.stream.dac()); - status } @@ -179,27 +143,41 @@ impl Device for EtherdreamDevice { let n_points = self.points_capacity(); // let n_points = &line.len(); debug!("Etherdream::device draw Generating {:?} points", n_points); - return match self.stream + match self.stream .queue_commands() .data( line.into_iter() .map(|point| point.into()) // .take(line.len() as usize) - .take(n_points as usize ) + .take(n_points as usize) ) .submit() { Err(err) => { // We should account for // 'Broken pipe (os error 32)' // Connection reset by peer (os error 104) - warn!("Draw error: '{}'",err); - Ok(()) + self.dac_response = match err { + CommunicationError::Io(err) => { + warn!("IO ERROR while drawing: '{}'",err); + DacResponse::ACK + } + CommunicationError::Protocol(err) => { + warn!("Protocol ERROR while drawing: '{}'",err); + DacResponse::ACK + } + CommunicationError::Response(err) => { + warn!("Response ERROR while drawing: '{}'",err); + err.response.response + } + }; + } Ok(_) => { + self.dac_response = DacResponse::ACK; debug!("Draw is ok"); - Ok(()) } }; + Ok(()) } fn stop(&mut self) -> LJResult<()> { diff --git a/src/point.rs b/src/point.rs index d060fd5..1298905 100644 --- a/src/point.rs +++ b/src/point.rs @@ -57,7 +57,6 @@ impl From for helios_dac::Point { impl From for DacPoint { fn from(pt: Point) -> DacPoint { - print!("."); let control = 0; let (u1, u2) = (0, 0); let i = 255;