From 7dc60cd74af5c9226669f67cfb51e237bdbec6ca Mon Sep 17 00:00:00 2001
From: Marc Planard
Date: Fri, 13 Oct 2023 13:42:16 +0200
Subject: [PATCH] version with a channel

---
 examples/livecoding_channels.rs | 82 +++++++++++++++++++++++++++++++++
 src/main.rs                     |  1 +
 2 files changed, 83 insertions(+)
 create mode 100644 examples/livecoding_channels.rs

diff --git a/examples/livecoding_channels.rs b/examples/livecoding_channels.rs
new file mode 100644
index 0000000..701a967
--- /dev/null
+++ b/examples/livecoding_channels.rs
@@ -0,0 +1,82 @@
+use scraper::{Html, Selector};
+use url::Url;
+use tokio::sync::mpsc::{self, Sender}; //, Receiver};
+
+const WORKERS: usize = 8;
+
+type SiteStat = (Url, Vec<Url>);
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let addr = std::env::args().nth(1)
+        .unwrap_or_else(|| "https://www.tmplab.org".to_string());
+
+    let links = get_links(addr.as_ref()).await?;
+    let addr = Url::parse(addr.as_ref())?;
+    let mut links: Vec<Url> = links.into_iter()
+        .filter(|url| url.host() != addr.host())
+        .collect();
+
+    let mut to_fetch = links.len();
+    let (tx, mut rx) = mpsc::channel(32);
+    // spawn a pool of workers to get things started...
+    for _ in 0..WORKERS {
+        if let Some(addr) = links.pop() {
+            spawn_worker(addr, tx.clone());
+        }
+    }
+
+    let mut results = vec![];
+    // get the results back from the workers through the channel
+    while let Some(res) = rx.recv().await {
+        to_fetch -= 1;
+        let Some(site) = res else {
+            continue
+        };
+        results.push(site);
+        // if there are still URLs to fetch, pop one and spawn a new worker;
+        // otherwise, break out of this loop once all the URLs
+        // have been fetched...
+        if let Some(addr) = links.pop() {
+            spawn_worker(addr, tx.clone());
+        } else if to_fetch == 0 {
+            break;
+        }
+    }
+
+    for (url, links) in results {
+        println!("{url} : {} links", links.len())
+    }
+
+    Ok(())
+}
+
+// interestingly, this function must not be async...
+// a worker fetches one URL, sends its result back through the channel, and terminates.
+fn spawn_worker(url: Url, tx: Sender<Option<SiteStat>>) {
+    println!("Start fetching {url}");
+    tokio::spawn(async move {
+        let res = get_links(url.as_ref()).await
+            .ok()
+            .map(|v| (url.clone(), v));
+        println!("Got {url}");
+        tx.send(res).await.unwrap()
+    });
+}
+
+async fn get_links(
+    url: &str
+) -> Result<Vec<Url>, reqwest::Error> {
+    let a_selector = Selector::parse("a[href]").unwrap();
+
+    let body = reqwest::get(url)
+        .await?
+        .text()
+        .await?;
+
+    Ok(Html::parse_document(&body)
+        .select(&a_selector)
+        .filter_map(|link| link.value().attr("href")
+            .and_then(|href| Url::parse(href).ok()))
+        .collect())
+}
diff --git a/src/main.rs b/src/main.rs
index 131ac0e..eeb2beb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,5 +9,6 @@ Try:
  > cargo run --release --example livecoding
  > cargo run --release --example livecoding_cleaner
  > cargo run --release --example livecoding_simple
+ > cargo run --release --example livecoding_channels
 "#);
 }
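
To try it out, apply the patch in the repository root and run the new example. This is only a sketch: the patch filename below is assumed from the subject line, and the trailing URL is optional (the example falls back to https://www.tmplab.org when no argument is given):

 > git am 0001-version-with-a-channel.patch
 > cargo run --release --example livecoding_channels -- https://www.tmplab.org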