#[warn(unused_imports)] use log::{debug, info, warn}; use std::net::SocketAddr; use std::thread::sleep; use ether_dream::dac::stream::{CommunicationError, connect}; use ether_dream::dac::{Playback, Stream}; use chrono::{DateTime, Utc}; use std::time; use std::time::{Duration, SystemTime}; use crate::conf::EtherDreamConf; use crate::device::{Device, Status, PlaybackState}; use crate::errors::{LJError, LJResult}; use crate::point::{Color, Point}; use ether_dream::protocol::{DacBroadcast, DacResponse}; use crate::device::PlaybackState::PLAYING; #[warn(dead_code)] pub struct EtherdreamDevice { pub conf: EtherDreamConf, dac: DacBroadcast, stream: Stream, dac_response: u8, } impl EtherdreamDevice { pub fn new(conf: &EtherDreamConf) -> LJResult { let (dac, _source_address, stream) = EtherdreamDevice::connect(conf)?; Ok(Self { conf: (*conf).clone(), dac, stream, dac_response: DacResponse::ACK, }) } fn connect(conf: &EtherDreamConf) -> LJResult<(DacBroadcast, SocketAddr, Stream)> { let ip = &conf.ip; let dac_broadcast = ether_dream::recv_dac_broadcasts()?; dac_broadcast.set_timeout(Some(time::Duration::new(10, 0)))?; info!("Attempting to get DAC broadcast..."); let broadcast = dac_broadcast .take(5) .filter_map(|result| { match result { Err(err) => { warn!( "Failed to find a valid DAC via broadcast. Error: {:?}", err); info!( "Retrying..."); None } Ok((dac, source_addr)) => { info!("Valid broadcast, source_addr: {}", source_addr); if source_addr.is_ipv6() { return None; } if &source_addr.ip().to_string() != ip { return None; } Some(Ok((dac, source_addr))) } } }) .next() .expect("Failed to receive broadcast."); match broadcast { Err(err) => { Err(Box::new(LJError::EtherdreamConnectError(err))) } Ok((dac, source_addr)) => { info!("Trying to open TCP stream..."); let stream = EtherdreamDevice::get_tcp_stream(&dac, &source_addr)?; info!("Finished configuring DAC and TCP stream."); Ok((dac, source_addr, stream)) } } } fn get_tcp_stream(dac: &DacBroadcast, source_address: &SocketAddr) -> LJResult { let mut stream = connect(dac, source_address.ip())?; debug!("Stream dac BEFORE PREPARE {:?}", stream.dac()); debug!("Playback State BEFORE PREPARE {:?}, {:?}", stream.dac().dac.status.playback, Playback::Playing ); if stream.dac().dac.status.playback == Playback::Playing { warn!("DAC was in playback PLAYING, attempting to stop"); match stream .queue_commands() .stop() .submit() { Err(err) => warn!("err occurred when submitting STOP command: {}",err), Ok(_) => info!("Prepared Stream.") } } if stream.dac().dac.status.playback != Playback::Prepared { warn!("DAC was not in playback state PREPARED, attempting to prepare"); match stream .queue_commands() .prepare_stream() .submit() { Err(err) => warn!("err occurred when submitting PREPARE_STREAM command and listening for response: {}",err), Ok(_) => info!("Prepared Stream.") } } // If we want to create an animation (in our case a moving sine wave) we need a frame rate. let frames_per_second = 60.0; // Lets use the DAC at an eighth the maximum scan rate. let points_per_second = 20_000; // let points_per_second = 30_000; debug!("points per second {:?}", points_per_second); // Determine the number of points per frame given our target frame and point rates. let points_per_frame = (points_per_second as f32 / frames_per_second) as u16; let mut sine_wave = SineWave { point: 0, points_per_frame, frames_per_second, }; debug!("Stream dac BEFORE BEGIN {:?}", stream.dac()); match stream .queue_commands() .data(sine_wave.by_ref().take(400)) // .data(begin_list.into_iter().take(400 as usize)) .begin(0, points_per_second) .submit() { Err(err) => warn!("err occurred when submitting first data: {}",err), Ok(_) => info!("Sent first data to Etherdream.") } Ok(stream) } fn points_capacity(&self) -> usize { /*** Determine the number of points needed to fill the DAC. ***/ let cap = self.dac.buffer_capacity as usize; let fullness = self.stream.dac().dac.status.buffer_fullness as usize; // Sometimes we had thread 'main' panicked at 'attempt to subtract with overflow', src/device/etherdream.rs:144:24 let n_points = if cap > fullness { cap - fullness } else { 0 }; n_points } fn ping(&mut self) -> LJResult<()> { Ok(self.stream.queue_commands().ping().submit()?) } } impl Device for EtherdreamDevice { fn status(&mut self) -> Status { let playback_state = match self.stream.dac().dac.status.playback { Playback::Idle => PlaybackState::IDLE, Playback::Prepared => PlaybackState::PREPARE, Playback::Playing => PlaybackState::PLAYING, }; let now = SystemTime::now(); let now: DateTime = now.into(); let now = now.to_rfc3339(); // debug!("Dac Status: {:?} ", status ); // debug!("Etherdream Dac {:?} ", self.dac ); debug!("Stream dac{:?}", self.stream.dac()); Status { last_traced_at: now, properties: vec!["foo".to_string()], playback_state, capacity: self.points_capacity(), lack: self.dac_response.to_string(), } // status } fn draw(&mut self, line: Vec, _speed: u32, ) -> LJResult<()> { let chunk_size = 512; let points_iter = line.into_iter(); for chunk in points_iter.as_slice().chunks(chunk_size) { debug!("New chunk length: {:?}", chunk.len()); let capacity = self.points_capacity(); debug!("capacity : {:?}", capacity); loop { if chunk.len() > capacity as usize { debug!("Sleep"); // Sleep for 1/100th of a sec sleep(Duration::new(0, 100_000_000)); break; // self.ping(); } else { break; } } debug!("Drawing"); match self.stream .queue_commands() .data( chunk.into_iter() .map(|point| (*point).into()) .take(chunk_size as usize) ) .submit() { Err(err) => { // We should account for // 'Broken pipe (os error 32)' // Connection reset by peer (os error 104) self.dac_response = match err { CommunicationError::Io(err) => { warn!("IO ERROR while drawing: '{}'",err); DacResponse::ACK } CommunicationError::Protocol(err) => { warn!("Protocol ERROR while drawing: '{}'",err); DacResponse::ACK } CommunicationError::Response(err) => { warn!("Response ERROR while drawing: '{}'",err); err.response.response } }; } Ok(_) => { self.dac_response = DacResponse::ACK; // debug!("Draw is ok"); } }; } Ok(()) } fn stop(&mut self) -> LJResult<()> { info!("Stopping Etherdream device..."); info!("Stream dac{:?}", self.stream.dac()); match self.stream .queue_commands() .stop() .submit() { Err(err) => { warn!("Failed to stop EtherDream device with error {:?}", err); Err(Box::new(err)) } Ok(_) => { info!("Sucessfully closed EtherDream device."); Ok(()) } } } fn grid(&mut self) -> Vec { let dim_mid = 16000 as f32; let dim_max = 32000 as f32; let col_min = Color { r: 0, g: 0, b: 0 }; let col_max = Color { r: 255, g: 255, b: 255 }; vec![ Point { x: -dim_max, y: dim_max, color: col_min }, Point { x: -dim_max, y: dim_max, color: col_max }, Point { x: dim_max, y: dim_max, color: col_max }, Point { x: dim_max, y: -dim_max, color: col_max }, Point { x: -dim_max, y: -dim_max, color: col_max }, Point { x: -dim_max, y: -dim_mid, color: col_min }, Point { x: -dim_mid, y: dim_mid, color: col_min }, Point { x: -dim_mid, y: dim_mid, color: col_max }, Point { x: dim_mid, y: dim_mid, color: col_max }, Point { x: dim_mid, y: -dim_mid, color: col_max }, Point { x: -dim_mid, y: -dim_mid, color: col_max }, Point { x: -dim_mid, y: -dim_mid, color: col_min }, ] } } // An iterator that endlessly generates a sine wave of DAC points. // // The sine wave oscillates at a rate of once per second. struct SineWave { point: u32, points_per_frame: u16, frames_per_second: f32, } impl Iterator for SineWave { type Item = ether_dream::protocol::DacPoint; fn next(&mut self) -> Option { let coloured_points_per_frame = self.points_per_frame - 1; let i = (self.point % self.points_per_frame as u32) as u16; let hz = 1.0; let fract = i as f32 / coloured_points_per_frame as f32; let phase = (self.point as f32 / coloured_points_per_frame as f32) / self.frames_per_second; let amp = (hz * (fract + phase) * 2.0 * std::f32::consts::PI).sin(); let (r, g, b) = match i { i if i == coloured_points_per_frame || i < 13 => (0, 0, 0), _ => (std::u16::MAX, std::u16::MAX, std::u16::MAX), }; let x_min = std::i16::MIN; let x_max = std::i16::MAX; let x = (x_min as f32 + fract * (x_max as f32 - x_min as f32)) as i16; let y = (amp * x_max as f32) as i16; let control = 0; let (u1, u2) = (0, 0); let p = ether_dream::protocol::DacPoint { control, x, y, i, r, g, b, u1, u2, }; // debug!("{:?}",p); self.point += 1; Some(p) } }