version with a channel
This commit is contained in:
parent
c5f3b5dd87
commit
7dc60cd74a
82
examples/livecoding_channels.rs
Normal file
82
examples/livecoding_channels.rs
Normal file
@ -0,0 +1,82 @@
|
||||
use scraper::{Html,Selector};
|
||||
use url::Url;
|
||||
use tokio::sync::mpsc::{self, Sender}; //, Receiver};
|
||||
|
||||
const WORKERS : usize = 8;
|
||||
|
||||
type SiteStat = (Url, Vec<Url>);
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let addr = std::env::args().nth(1)
|
||||
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
|
||||
|
||||
let links = get_links(addr.as_ref()).await?;
|
||||
let addr = Url::parse(addr.as_ref())?;
|
||||
let mut links : Vec<Url> = links.into_iter()
|
||||
.filter(| url | url.host() != addr.host())
|
||||
.collect();
|
||||
|
||||
let mut to_fetch = links.len();
|
||||
let (tx, mut rx) = mpsc::channel(32);
|
||||
// spawn a pool of workers to get the things started...
|
||||
for _ in 0..WORKERS {
|
||||
if let Some(addr) = links.pop() {
|
||||
spawn_worker(addr, tx.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let mut results = vec![];
|
||||
// gets the results back from the workers through the channel
|
||||
while let Some(res) = rx.recv().await {
|
||||
to_fetch -= 1;
|
||||
let Some(site) = res else {
|
||||
continue
|
||||
};
|
||||
results.push(site);
|
||||
// if there are still urls to fetch, pop one and spawn a new worker
|
||||
// otherwise we want to break this loop if all the urls
|
||||
// have been fetched...
|
||||
if let Some(addr) = links.pop() {
|
||||
spawn_worker(addr, tx.clone());
|
||||
} else if to_fetch == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (url, links) in results {
|
||||
println!("{url} : {} links", links.len())
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// interestingly, this function must not be async...
|
||||
//a worker fetch one url, send it's result back in the channel, and terminates.
|
||||
fn spawn_worker(url: Url, tx: Sender<Option<SiteStat>>) {
|
||||
println!("Start fetching {url}");
|
||||
tokio::spawn(async move {
|
||||
let res = get_links(url.as_ref()).await
|
||||
.ok()
|
||||
.map(| v | (url.clone(), v));
|
||||
println!("Got {url}");
|
||||
tx.send(res).await.unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
async fn get_links(
|
||||
url: &str
|
||||
) -> Result<Vec<Url>, reqwest::Error> {
|
||||
let a_selector = Selector::parse("a[href]").unwrap();
|
||||
|
||||
let body = reqwest::get(url)
|
||||
.await?
|
||||
.text()
|
||||
.await?;
|
||||
|
||||
Ok(Html::parse_document(&body)
|
||||
.select(&a_selector)
|
||||
.filter_map(| link | link.value().attr("href")
|
||||
.and_then(| href | Url::parse(href).ok()))
|
||||
.collect())
|
||||
}
|
@ -9,5 +9,6 @@ Try:
|
||||
> cargo run --release --example livecoding
|
||||
> cargo run --release --example livecoding_cleaner
|
||||
> cargo run --release --example livecoding_simple
|
||||
> cargo run --release --example livecoding_channels
|
||||
"#);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user