// livecoding_async_rust/examples/livecoding_channels.rs
use scraper::{Html,Selector};
use url::Url;
use tokio::sync::mpsc::{self, Sender}; //, Receiver};
/// Number of worker tasks spawned in the fetch pool.
const WORKERS: usize = 8;

/// Outcome of crawling one page: the page's URL paired with the links found on it.
type SiteStat = (Url, Vec<Url>);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = std::env::args().nth(1)
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
let links = get_links(addr.as_ref()).await?;
let addr = Url::parse(addr.as_ref())?;
let mut links : Vec<Url> = links.into_iter()
.filter(| url | url.host() != addr.host())
.collect();
let (tx, mut rx) = mpsc::channel(32);
2023-10-14 21:22:29 +00:00
// spawn a pool of workers
let mut workers : Vec<Option<Sender<Url>>> = (0..WORKERS)
.map(| n | Some(spawn_worker(n, tx.clone())))
.collect();
// send 1 url to each worker to get things started
for tx in &workers {
if let Some(url) = links.pop() {
tx.as_ref().unwrap().send(url).await.unwrap();
2023-10-13 11:42:16 +00:00
}
}
2023-10-14 21:23:56 +00:00
2023-10-13 11:42:16 +00:00
let mut results = vec![];
2023-10-14 21:22:29 +00:00
// recieve the results and send back new url to the workers
while let Some((idx, res)) = rx.recv().await {
if let Some(res) = res {
results.push(res);
}
2023-10-13 11:42:16 +00:00
if let Some(addr) = links.pop() {
2023-10-14 21:22:29 +00:00
workers[idx].as_ref().unwrap().send(addr).await.unwrap();
} else {
workers[idx].take().unwrap();
if workers.iter().filter(| tx | tx.is_some()).count() == 0 {
break;
}
2023-10-13 11:42:16 +00:00
}
}
2023-10-14 21:22:29 +00:00
for (url, links) in &results {
2023-10-13 11:42:16 +00:00
println!("{url} : {} links", links.len())
}
2023-10-14 21:22:29 +00:00
println!("TOTAL: {}", results.len());
2023-10-13 11:42:16 +00:00
Ok(())
}
2023-10-14 21:22:29 +00:00
fn spawn_worker(
id: usize,
tx: Sender<(usize,Option<SiteStat>)>
) -> Sender<Url> {
let (tx1, mut rx) = mpsc::channel::<Url>(16);
2023-10-13 11:42:16 +00:00
tokio::spawn(async move {
2023-10-14 21:22:29 +00:00
println!("Start worker {id}");
while let Some(url) = rx.recv().await {
println!("Start fetching {url}");
let res = get_links(url.as_ref()).await
.map(| v | (url.clone(), v)).ok();
println!("Got {url}");
tx.send((id, res)).await.unwrap()
}
println!("Terminate worker {id}");
2023-10-13 11:42:16 +00:00
});
2023-10-14 21:22:29 +00:00
tx1
2023-10-13 11:42:16 +00:00
}
/// Downloads the page at `url` and collects the targets of its `<a href>` anchors.
///
/// Only hrefs that `Url::parse` accepts are kept. `Url::parse` rejects relative
/// references (paths, fragments, ...) because no base URL is supplied, so those
/// hrefs are skipped.
///
/// # Errors
///
/// Returns the `reqwest` error if the request fails or the body cannot be read.
async fn get_links(
    url: &str,
) -> Result<Vec<Url>, reqwest::Error> {
    let anchors = Selector::parse("a[href]").unwrap();
    let response = reqwest::get(url).await?;
    let html = response.text().await?;
    let document = Html::parse_document(&html);
    let found: Vec<Url> = document
        .select(&anchors)
        .filter_map(|element| element.value().attr("href"))
        .filter_map(|href| Url::parse(href).ok())
        .collect();
    Ok(found)
}