use std::time; use std::net::SocketAddr; use ether_dream::dac::stream::{CommunicationError, connect}; use ether_dream::dac::{Playback, Stream}; use chrono::{DateTime, Utc}; use std::time::SystemTime; use crate::conf::EtherDreamConf; use crate::device::{Device, Status, PlaybackState}; use crate::errors::{LJError, LJResult}; use crate::point::{Color, Point}; use ether_dream::protocol::{DacBroadcast, DacPoint, DacResponse}; use log::{debug, info, warn}; #[warn(dead_code)] pub struct EtherdreamDevice { pub conf: EtherDreamConf, dac: DacBroadcast, stream: Stream, // "a": ACK "F": Full "I": invalid. 64 or 35 for no connection. // /// The previous command was accepted. // pub const ACK: u8 = 0x61; // /// The write command could not be performed because there was not enough buffer space when it // /// was received. // pub const NAK_FULL: u8 = 0x46; // /// The command contained an invalid `command` byte or parameters. // pub const NAK_INVALID: u8 = 0x49; // /// An emergency-stop condition still exists. // pub const NAK_STOP_CONDITION: u8 = 0x21; // } dac_response: u8, } impl EtherdreamDevice { pub fn new(conf: &EtherDreamConf) -> LJResult { let (dac, _source_address, stream) = EtherdreamDevice::connect(conf)?; Ok(Self { conf: (*conf).clone(), dac, stream, dac_response: DacResponse::ACK, }) } fn connect(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { let ip = &conf.ip; let dac_broadcast = ether_dream::recv_dac_broadcasts()?; dac_broadcast.set_timeout(Some(time::Duration::new(10, 0)))?; info!("Attempting to get DAC broadcast..."); let broadcast = dac_broadcast .take(3) .filter_map(|result| { match result { Err(err) => { warn!( "Failed to find a valid DAC via broadcast. Error: {:?}", err); info!( "Retrying..."); None } Ok((dac, source_addr)) => { info!("Valid broadcast, source_addr: {}", source_addr); if source_addr.is_ipv6() { return None; } if &source_addr.ip().to_string() != ip { return None; } Some(Ok((dac, source_addr))) } } }) .next() .expect("Failed to receive broadcast."); match broadcast { Err(err) => { Err(Box::new(LJError::EtherdreamConnectError(err))) } Ok((dac, source_addr)) => { let stream = EtherdreamDevice::get_tcp_stream(&dac, &source_addr)?; info!("Finished configuring DAC and TCP stream."); Ok((dac, source_addr, stream)) } } } fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { let mut stream = connect(dac, source_address.ip())?; match stream .queue_commands() .prepare_stream() .submit() { Err(err) => warn!("err occurred when submitting PREPARE_STREAM command and listening for response: {}",err), Ok(_) => info!("Prepared Stream.") } let begin_list = vec![ DacPoint { control: 0, x: 0, y: 0, i: 255, r: 0, g: 0, b: 0, u1: 0, u2: 0 }, ]; let points_per_second = stream.dac().max_point_rate / 32; match stream .queue_commands() .data(begin_list.into_iter().take(1 as usize)) .begin(0, points_per_second) .submit() { Err(err) => warn!("err occurred when submitting first data: {}",err), Ok(_) => info!("Sent first data to Etherdream.") } Ok(stream) } fn points_capacity(&self) -> u16 { /*** Determine the number of points needed to fill the DAC. ***/ // Fixme thread 'main' panicked at 'attempt to subtract with overflow', src/device/etherdream.rs:144:24 let n_points = self.dac.buffer_capacity as u16 - self.stream.dac().dac.status.buffer_fullness as u16 - 1; n_points } } impl Device for EtherdreamDevice { fn status(&mut self) -> Status { let playback_state = match self.stream.dac().dac.status.playback { Playback::Idle => PlaybackState::IDLE, Playback::Prepared => PlaybackState::PREPARE, Playback::Playing => PlaybackState::PLAYING, }; let now = SystemTime::now(); let now: DateTime = now.into(); let now = now.to_rfc3339(); let status = Status { last_traced_at: now, properties: vec!["foo".to_string()], playback_state, capacity: self.points_capacity(), lack: self.dac_response.to_string(), }; // info!("Dac Status: {:?} ", status ); // info!("Etherdream Dac {:?} ", self.dac ); // info!("Stream dac{:?}", self.stream.dac()); status } fn draw(&mut self, line: Vec, _speed: u32, ) -> LJResult<()> { let n_points = self.points_capacity(); // let n_points = &line.len(); debug!("Etherdream::device draw Generating {:?} points", n_points); match self.stream .queue_commands() .data( line.into_iter() .map(|point| point.into()) // .take(line.len() as usize) .take(n_points as usize) ) .submit() { Err(err) => { // We should account for // 'Broken pipe (os error 32)' // Connection reset by peer (os error 104) self.dac_response = match err { CommunicationError::Io(err) => { warn!("IO ERROR while drawing: '{}'",err); DacResponse::ACK } CommunicationError::Protocol(err) => { warn!("Protocol ERROR while drawing: '{}'",err); DacResponse::ACK } CommunicationError::Response(err) => { warn!("Response ERROR while drawing: '{}'",err); err.response.response } }; } Ok(_) => { self.dac_response = DacResponse::ACK; debug!("Draw is ok"); } }; Ok(()) } fn stop(&mut self) -> LJResult<()> { info!("Stopping Etherdream device..."); match self.stream .queue_commands() .stop() .submit() { Err(err) => { warn!("Failed to stop EtherDream device with error {:?}", err); Err(Box::new(err)) } Ok(_) => { info!("Sucessfully closed EtherDream device."); Ok(()) } } } fn grid(&mut self) -> Vec { let dim_mid = 16000 as f32; let dim_max = 32000 as f32; let col_min = Color { r: 0, g: 0, b: 0 }; let col_max = Color { r: 255, g: 255, b: 255 }; vec![ Point { x: -dim_max, y: dim_max, color: col_min }, Point { x: -dim_max, y: dim_max, color: col_max }, Point { x: dim_max, y: dim_max, color: col_max }, Point { x: dim_max, y: -dim_max, color: col_max }, Point { x: -dim_max, y: -dim_max, color: col_max }, Point { x: -dim_max, y: -dim_mid, color: col_min }, Point { x: -dim_mid, y: dim_mid, color: col_min }, Point { x: -dim_mid, y: dim_mid, color: col_max }, Point { x: dim_mid, y: dim_mid, color: col_max }, Point { x: dim_mid, y: -dim_mid, color: col_max }, Point { x: -dim_mid, y: -dim_mid, color: col_max }, Point { x: -dim_mid, y: -dim_mid, color: col_min }, ] } }