add multi-channels example
This commit is contained in:
parent
7dc60cd74a
commit
f62a84ad07
82
examples/livecoding_channel.rs
Normal file
82
examples/livecoding_channel.rs
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
use scraper::{Html,Selector};
|
||||||
|
use url::Url;
|
||||||
|
use tokio::sync::mpsc::{self, Sender}; //, Receiver};
|
||||||
|
|
||||||
|
const WORKERS : usize = 8;
|
||||||
|
|
||||||
|
type SiteStat = (Url, Vec<Url>);
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let addr = std::env::args().nth(1)
|
||||||
|
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
|
||||||
|
|
||||||
|
let links = get_links(addr.as_ref()).await?;
|
||||||
|
let addr = Url::parse(addr.as_ref())?;
|
||||||
|
let mut links : Vec<Url> = links.into_iter()
|
||||||
|
.filter(| url | url.host() != addr.host())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut to_fetch = links.len();
|
||||||
|
let (tx, mut rx) = mpsc::channel(32);
|
||||||
|
// spawn a pool of workers to get the things started...
|
||||||
|
for _ in 0..WORKERS {
|
||||||
|
if let Some(addr) = links.pop() {
|
||||||
|
spawn_worker(addr, tx.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut results = vec![];
|
||||||
|
// gets the results back from the workers through the channel
|
||||||
|
while let Some(res) = rx.recv().await {
|
||||||
|
to_fetch -= 1;
|
||||||
|
let Some(site) = res else {
|
||||||
|
continue
|
||||||
|
};
|
||||||
|
results.push(site);
|
||||||
|
// if there are still urls to fetch, pop one and spawn a new worker
|
||||||
|
// otherwise we want to break this loop if all the urls
|
||||||
|
// have been fetched...
|
||||||
|
if let Some(addr) = links.pop() {
|
||||||
|
spawn_worker(addr, tx.clone());
|
||||||
|
} else if to_fetch == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (url, links) in results {
|
||||||
|
println!("{url} : {} links", links.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// interestingly, this function must not be async...
|
||||||
|
//a worker fetch one url, send it's result back in the channel, and terminates.
|
||||||
|
fn spawn_worker(url: Url, tx: Sender<Option<SiteStat>>) {
|
||||||
|
println!("Start fetching {url}");
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let res = get_links(url.as_ref()).await
|
||||||
|
.ok()
|
||||||
|
.map(| v | (url.clone(), v));
|
||||||
|
println!("Got {url}");
|
||||||
|
tx.send(res).await.unwrap()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_links(
|
||||||
|
url: &str
|
||||||
|
) -> Result<Vec<Url>, reqwest::Error> {
|
||||||
|
let a_selector = Selector::parse("a[href]").unwrap();
|
||||||
|
|
||||||
|
let body = reqwest::get(url)
|
||||||
|
.await?
|
||||||
|
.text()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Html::parse_document(&body)
|
||||||
|
.select(&a_selector)
|
||||||
|
.filter_map(| link | link.value().attr("href")
|
||||||
|
.and_then(| href | Url::parse(href).ok()))
|
||||||
|
.collect())
|
||||||
|
}
|
@ -10,58 +10,68 @@ type SiteStat = (Url, Vec<Url>);
|
|||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let addr = std::env::args().nth(1)
|
let addr = std::env::args().nth(1)
|
||||||
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
|
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
|
||||||
|
|
||||||
let links = get_links(addr.as_ref()).await?;
|
let links = get_links(addr.as_ref()).await?;
|
||||||
let addr = Url::parse(addr.as_ref())?;
|
let addr = Url::parse(addr.as_ref())?;
|
||||||
let mut links : Vec<Url> = links.into_iter()
|
let mut links : Vec<Url> = links.into_iter()
|
||||||
.filter(| url | url.host() != addr.host())
|
.filter(| url | url.host() != addr.host())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut to_fetch = links.len();
|
|
||||||
let (tx, mut rx) = mpsc::channel(32);
|
let (tx, mut rx) = mpsc::channel(32);
|
||||||
// spawn a pool of workers to get the things started...
|
|
||||||
for _ in 0..WORKERS {
|
|
||||||
if let Some(addr) = links.pop() {
|
|
||||||
spawn_worker(addr, tx.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// spawn a pool of workers
|
||||||
|
let mut workers : Vec<Option<Sender<Url>>> = (0..WORKERS)
|
||||||
|
.map(| n | Some(spawn_worker(n, tx.clone())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// send 1 url to each worker to get things started
|
||||||
|
for tx in &workers {
|
||||||
|
if let Some(url) = links.pop() {
|
||||||
|
tx.as_ref().unwrap().send(url).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
println!("links: {}", links.len());
|
||||||
let mut results = vec![];
|
let mut results = vec![];
|
||||||
// gets the results back from the workers through the channel
|
// recieve the results and send back new url to the workers
|
||||||
while let Some(res) = rx.recv().await {
|
while let Some((idx, res)) = rx.recv().await {
|
||||||
to_fetch -= 1;
|
if let Some(res) = res {
|
||||||
let Some(site) = res else {
|
results.push(res);
|
||||||
continue
|
}
|
||||||
};
|
|
||||||
results.push(site);
|
|
||||||
// if there are still urls to fetch, pop one and spawn a new worker
|
|
||||||
// otherwise we want to break this loop if all the urls
|
|
||||||
// have been fetched...
|
|
||||||
if let Some(addr) = links.pop() {
|
if let Some(addr) = links.pop() {
|
||||||
spawn_worker(addr, tx.clone());
|
workers[idx].as_ref().unwrap().send(addr).await.unwrap();
|
||||||
} else if to_fetch == 0 {
|
} else {
|
||||||
|
workers[idx].take().unwrap();
|
||||||
|
if workers.iter().filter(| tx | tx.is_some()).count() == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (url, links) in results {
|
|
||||||
println!("{url} : {} links", links.len())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (url, links) in &results {
|
||||||
|
println!("{url} : {} links", links.len())
|
||||||
|
}
|
||||||
|
println!("TOTAL: {}", results.len());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// interestingly, this function must not be async...
|
fn spawn_worker(
|
||||||
//a worker fetch one url, send it's result back in the channel, and terminates.
|
id: usize,
|
||||||
fn spawn_worker(url: Url, tx: Sender<Option<SiteStat>>) {
|
tx: Sender<(usize,Option<SiteStat>)>
|
||||||
println!("Start fetching {url}");
|
) -> Sender<Url> {
|
||||||
|
let (tx1, mut rx) = mpsc::channel::<Url>(16);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
println!("Start worker {id}");
|
||||||
|
while let Some(url) = rx.recv().await {
|
||||||
|
println!("Start fetching {url}");
|
||||||
let res = get_links(url.as_ref()).await
|
let res = get_links(url.as_ref()).await
|
||||||
.ok()
|
.map(| v | (url.clone(), v)).ok();
|
||||||
.map(| v | (url.clone(), v));
|
|
||||||
println!("Got {url}");
|
println!("Got {url}");
|
||||||
tx.send(res).await.unwrap()
|
tx.send((id, res)).await.unwrap()
|
||||||
|
}
|
||||||
|
println!("Terminate worker {id}");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tx1
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_links(
|
async fn get_links(
|
||||||
|
@ -11,11 +11,11 @@ type SiteStat = (Url, Vec<Url>);
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let addr = std::env::args().nth(1)
|
let addr = std::env::args().nth(1)
|
||||||
.unwrap_or_else(|| "www.tmplab.org".to_string());
|
.unwrap_or_else(|| "https://www.tmplab.org".to_string());
|
||||||
|
|
||||||
let links = get_links(addr.as_ref()).await?;
|
let links = get_links(addr.as_ref()).await?;
|
||||||
let links : Vec<Url> = links.into_iter()
|
let addr = Url::parse(addr.as_ref())?;
|
||||||
.filter(| url | url.host() != Some(Host::Domain(&addr)))
|
let mut links : Vec<Url> = links.into_iter()
|
||||||
|
.filter(| url | url.host() != addr.host())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let links = Arc::new(Mutex::new(links));
|
let links = Arc::new(Mutex::new(links));
|
||||||
@ -23,15 +23,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let joins = (0..WORKERS)
|
let joins = (0..WORKERS)
|
||||||
.map(| n | spawn_worker(n, &links));
|
.map(| n | spawn_worker(n, &links));
|
||||||
|
|
||||||
let results = future::join_all(joins).await
|
let results : Vec<_>= future::join_all(joins).await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(| r | r.ok())
|
.filter_map(| r | r.ok())
|
||||||
.flatten();
|
.flatten()
|
||||||
|
.collect();
|
||||||
|
|
||||||
for (url, links) in results {
|
for (url, links) in &results {
|
||||||
println!("{url} : {} links", links.len())
|
println!("{url} : {} links", links.len())
|
||||||
}
|
}
|
||||||
|
println!("TOTAL: {}", results.len());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,9 +7,9 @@ type SiteStat = (Url, Vec<Url>);
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let addr = std::env::args().nth(1)
|
let addr = std::env::args().nth(1)
|
||||||
.unwrap_or_else(|| "www.tmplab.org".to_string());
|
.unwrap_or_else(|| "https://tmplab.org".to_string());
|
||||||
|
|
||||||
let links = get_links("https://tmplab.org").await?;
|
let links = get_links(addr.as_ref()).await?;
|
||||||
let links : Vec<Url> = links.into_iter()
|
let links : Vec<Url> = links.into_iter()
|
||||||
.filter(| url | url.host() != Some(Host::Domain(&addr)))
|
.filter(| url | url.host() != Some(Host::Domain(&addr)))
|
||||||
.collect();
|
.collect();
|
||||||
|
@ -9,6 +9,7 @@ Try:
|
|||||||
> cargo run --release --example livecoding
|
> cargo run --release --example livecoding
|
||||||
> cargo run --release --example livecoding_cleaner
|
> cargo run --release --example livecoding_cleaner
|
||||||
> cargo run --release --example livecoding_simple
|
> cargo run --release --example livecoding_simple
|
||||||
|
> cargo run --release --example livecoding_channel
|
||||||
> cargo run --release --example livecoding_channels
|
> cargo run --release --example livecoding_channels
|
||||||
"#);
|
"#);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user