From f62a84ad075664348442ac215ef428bbadc6068d Mon Sep 17 00:00:00 2001 From: Marc Planard Date: Sat, 14 Oct 2023 23:22:29 +0200 Subject: [PATCH] add multi-channels example --- examples/livecoding_channel.rs | 82 +++++++++++++++++++++++++++++++++ examples/livecoding_channels.rs | 72 ++++++++++++++++------------- examples/livecoding_cleaner.rs | 17 +++---- examples/livecoding_simple.rs | 4 +- src/main.rs | 1 + 5 files changed, 135 insertions(+), 41 deletions(-) create mode 100644 examples/livecoding_channel.rs diff --git a/examples/livecoding_channel.rs b/examples/livecoding_channel.rs new file mode 100644 index 0000000..701a967 --- /dev/null +++ b/examples/livecoding_channel.rs @@ -0,0 +1,82 @@ +use scraper::{Html,Selector}; +use url::Url; +use tokio::sync::mpsc::{self, Sender}; //, Receiver}; + +const WORKERS : usize = 8; + +type SiteStat = (Url, Vec); + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = std::env::args().nth(1) + .unwrap_or_else(|| "https://www.tmplab.org".to_string()); + + let links = get_links(addr.as_ref()).await?; + let addr = Url::parse(addr.as_ref())?; + let mut links : Vec = links.into_iter() + .filter(| url | url.host() != addr.host()) + .collect(); + + let mut to_fetch = links.len(); + let (tx, mut rx) = mpsc::channel(32); + // spawn a pool of workers to get the things started... + for _ in 0..WORKERS { + if let Some(addr) = links.pop() { + spawn_worker(addr, tx.clone()); + } + } + + let mut results = vec![]; + // gets the results back from the workers through the channel + while let Some(res) = rx.recv().await { + to_fetch -= 1; + let Some(site) = res else { + continue + }; + results.push(site); + // if there are still urls to fetch, pop one and spawn a new worker + // otherwise we want to break this loop if all the urls + // have been fetched... 
+ if let Some(addr) = links.pop() { + spawn_worker(addr, tx.clone()); + } else if to_fetch == 0 { + break; + } + } + + for (url, links) in results { + println!("{url} : {} links", links.len()) + } + + Ok(()) +} + +// interestingly, this function must not be async... +//a worker fetch one url, send it's result back in the channel, and terminates. +fn spawn_worker(url: Url, tx: Sender>) { + println!("Start fetching {url}"); + tokio::spawn(async move { + let res = get_links(url.as_ref()).await + .ok() + .map(| v | (url.clone(), v)); + println!("Got {url}"); + tx.send(res).await.unwrap() + }); +} + +async fn get_links( + url: &str +) -> Result, reqwest::Error> { + let a_selector = Selector::parse("a[href]").unwrap(); + + let body = reqwest::get(url) + .await? + .text() + .await?; + + Ok(Html::parse_document(&body) + .select(&a_selector) + .filter_map(| link | link.value().attr("href") + .and_then(| href | Url::parse(href).ok())) + .collect()) +} diff --git a/examples/livecoding_channels.rs b/examples/livecoding_channels.rs index 701a967..f33e56f 100644 --- a/examples/livecoding_channels.rs +++ b/examples/livecoding_channels.rs @@ -10,58 +10,68 @@ type SiteStat = (Url, Vec); async fn main() -> Result<(), Box> { let addr = std::env::args().nth(1) .unwrap_or_else(|| "https://www.tmplab.org".to_string()); - let links = get_links(addr.as_ref()).await?; let addr = Url::parse(addr.as_ref())?; let mut links : Vec = links.into_iter() .filter(| url | url.host() != addr.host()) .collect(); - let mut to_fetch = links.len(); let (tx, mut rx) = mpsc::channel(32); - // spawn a pool of workers to get the things started... 
- for _ in 0..WORKERS { - if let Some(addr) = links.pop() { - spawn_worker(addr, tx.clone()); + + // spawn a pool of workers + let mut workers : Vec>> = (0..WORKERS) + .map(| n | Some(spawn_worker(n, tx.clone()))) + .collect(); + + // send 1 url to each worker to get things started + for tx in &workers { + if let Some(url) = links.pop() { + tx.as_ref().unwrap().send(url).await.unwrap(); } } - + println!("links: {}", links.len()); let mut results = vec![]; - // gets the results back from the workers through the channel - while let Some(res) = rx.recv().await { - to_fetch -= 1; - let Some(site) = res else { - continue - }; - results.push(site); - // if there are still urls to fetch, pop one and spawn a new worker - // otherwise we want to break this loop if all the urls - // have been fetched... + // receive the results and send back new url to the workers + while let Some((idx, res)) = rx.recv().await { + if let Some(res) = res { + results.push(res); + } if let Some(addr) = links.pop() { - spawn_worker(addr, tx.clone()); - } else if to_fetch == 0 { - break; + workers[idx].as_ref().unwrap().send(addr).await.unwrap(); + } else { + workers[idx].take().unwrap(); + if workers.iter().filter(| tx | tx.is_some()).count() == 0 { + break; + } } } - for (url, links) in results { + for (url, links) in &results { println!("{url} : {} links", links.len()) } - + println!("TOTAL: {}", results.len()); Ok(()) } -// interestingly, this function must not be async... -//a worker fetch one url, send it's result back in the channel, and terminates. 
-fn spawn_worker(url: Url, tx: Sender>) { - println!("Start fetching {url}"); +fn spawn_worker( + id: usize, + tx: Sender<(usize,Option)> +) -> Sender { + let (tx1, mut rx) = mpsc::channel::(16); + tokio::spawn(async move { - let res = get_links(url.as_ref()).await - .ok() - .map(| v | (url.clone(), v)); - println!("Got {url}"); - tx.send(res).await.unwrap() + println!("Start worker {id}"); + while let Some(url) = rx.recv().await { + println!("Start fetching {url}"); + let res = get_links(url.as_ref()).await + .map(| v | (url.clone(), v)).ok(); + println!("Got {url}"); + tx.send((id, res)).await.unwrap() + } + println!("Terminate worker {id}"); }); + + tx1 } async fn get_links( diff --git a/examples/livecoding_cleaner.rs b/examples/livecoding_cleaner.rs index 2503a7b..ca5f0de 100644 --- a/examples/livecoding_cleaner.rs +++ b/examples/livecoding_cleaner.rs @@ -11,11 +11,11 @@ type SiteStat = (Url, Vec); #[tokio::main] async fn main() -> Result<(), Box> { let addr = std::env::args().nth(1) - .unwrap_or_else(|| "www.tmplab.org".to_string()); - + .unwrap_or_else(|| "https://www.tmplab.org".to_string()); let links = get_links(addr.as_ref()).await?; - let links : Vec = links.into_iter() - .filter(| url | url.host() != Some(Host::Domain(&addr))) + let addr = Url::parse(addr.as_ref())?; + let mut links : Vec = links.into_iter() + .filter(| url | url.host() != addr.host()) .collect(); let links = Arc::new(Mutex::new(links)); @@ -23,15 +23,16 @@ async fn main() -> Result<(), Box> { let joins = (0..WORKERS) .map(| n | spawn_worker(n, &links)); - let results = future::join_all(joins).await + let results : Vec<_>= future::join_all(joins).await .into_iter() .filter_map(| r | r.ok()) - .flatten(); + .flatten() + .collect(); - for (url, links) in results { + for (url, links) in &results { println!("{url} : {} links", links.len()) } - + println!("TOTAL: {}", results.len()); Ok(()) } diff --git a/examples/livecoding_simple.rs b/examples/livecoding_simple.rs index 6d89303..4dfc017 
100644 --- a/examples/livecoding_simple.rs +++ b/examples/livecoding_simple.rs @@ -7,9 +7,9 @@ type SiteStat = (Url, Vec); #[tokio::main] async fn main() -> Result<(), Box> { let addr = std::env::args().nth(1) - .unwrap_or_else(|| "www.tmplab.org".to_string()); + .unwrap_or_else(|| "https://tmplab.org".to_string()); - let links = get_links("https://tmplab.org").await?; + let links = get_links(addr.as_ref()).await?; let links : Vec = links.into_iter() .filter(| url | url.host() != Some(Host::Domain(&addr))) .collect(); diff --git a/src/main.rs b/src/main.rs index eeb2beb..83303de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ Try: > cargo run --release --example livecoding > cargo run --release --example livecoding_cleaner > cargo run --release --example livecoding_simple + > cargo run --release --example livecoding_channel > cargo run --release --example livecoding_channels "#); }