// livecoding_async_rust/examples/livecoding_cleaner.rs
use scraper::{Html,Selector};
use url::{Url,Host};
//use tokio::sync::mpsc::{self, Sender, Receiver};
use std::sync::{Arc,Mutex};
use tokio::task::JoinHandle;
use futures::future;
// Number of concurrent worker tasks draining the shared URL queue.
const WORKERS : usize = 8;
// A crawled page URL paired with the links extracted from that page.
type SiteStat = (Url, Vec<Url>);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = std::env::args().nth(1)
.unwrap_or_else(|| "www.tmplab.org".to_string());
2023-10-13 10:36:23 +00:00
let links = get_links(addr.as_ref()).await?;
2023-10-13 10:30:32 +00:00
let links : Vec<Url> = links.into_iter()
.filter(| url | url.host() != Some(Host::Domain(&addr)))
.collect();
let links = Arc::new(Mutex::new(links));
let joins = (0..WORKERS)
.map(| n | spawn_worker(n, &links));
let results = future::join_all(joins).await
.into_iter()
.filter_map(| r | r.ok())
.flatten();
for (url, links) in results {
println!("{url} : {} links", links.len())
}
Ok(())
}
// interestingly, this function must not be async...
/// Spawn worker task `n`, which drains the shared URL queue until it is empty
/// and resolves to the per-worker crawl results.
fn spawn_worker(
    n: usize,
    links: &Arc<Mutex<Vec<Url>>>
) -> JoinHandle<Vec<SiteStat>> {
    println!("Spawning worker {n}");
    // Give the task its own handle to the shared queue.
    let queue = Arc::clone(links);
    // `looper(..)` is already a future, so no wrapping `async move` block is needed.
    tokio::spawn(looper(queue))
}
/// Worker loop: repeatedly pop a URL from the shared queue, fetch its links,
/// and collect the successful results. Returns when the queue is empty.
async fn looper(links: Arc<Mutex<Vec<Url>>>) -> Vec<SiteStat> {
    let mut collected = Vec::new();
    loop {
        // The guard is a temporary dropped at the end of this statement,
        // so the std mutex is never held across an `.await`.
        let next = links.lock().unwrap().pop();
        let Some(url) = next else { break };
        println!("Start fetching {url}...");
        let outcome = if let Ok(found) = get_links(url.as_ref()).await {
            collected.push((url.clone(), found));
            "YEA!"
        } else {
            "nope"
        };
        println!("Got {url} => {outcome}");
    }
    collected
}
/// Fetch `url` and return every link found in its `<a href=...>` elements.
///
/// Relative hrefs (e.g. `/about`, `page.html`) are resolved against the page
/// URL via `Url::join` instead of being silently dropped; hrefs that still
/// fail to parse are skipped.
///
/// # Errors
/// Returns any `reqwest` error from the HTTP request or body download.
async fn get_links(
    url: &str
) -> Result<Vec<Url>, reqwest::Error> {
    // Selector is a literal; parsing it cannot fail.
    let a_selector = Selector::parse("a[href]").unwrap();
    let body = reqwest::get(url)
        .await?
        .text()
        .await?;
    // Base URL used to resolve relative hrefs; None if `url` itself is relative.
    let base = Url::parse(url).ok();
    Ok(Html::parse_document(&body)
        .select(&a_selector)
        .filter_map(|link| link.value().attr("href"))
        .filter_map(|href| Url::parse(href).ok()
            .or_else(|| base.as_ref().and_then(|b| b.join(href).ok())))
        .collect())
}