#How the client will handle a large number of requests

tawdry owl

Hello, I have some questions about tokio, reqwest, and Rust.
What I want to achieve:
I want my program to send a request to each address listed in a file and, if the address responds, add the response to an array or Vec. I want this to be as fast as possible, so I decided to spawn a task for each request and run it immediately after adding it, but I have some questions.
General questions:

1. I want it to cancel only the requests that take longer than the timeout.
   I also want it to wait if there isn't enough bandwidth for more requests.
   I assume that kind of limit isn't possible, so it should run, say, 100 requests and start the next batch once they finish, right?
2. Why do verified_urls.sort() and other sorting methods return ()?

Questions about the second version of the code:

1. How do I fix the problem with the "urls" variable?
2. Can I use url in the loop without format!("{}", url)?

I'd be happy if you have any suggestions to make this better.

First version

use reqwest::Client;
use std::time::Duration;
use tokio::sync::mpsc;

fn get_proccent_diff(a: f32, b: f32) -> f32 {
    (((100.0 - 100.0 / (a / b as f32)) * 100.0).round() as i16 as f32 / 100.0) as f32
}

async fn get(client: &Client, url: &String) -> (Result<reqwest::Response, reqwest::Error>, Duration) {
    let time = std::time::Instant::now();
    let res = client.get(url).send().await;
    let time_end = time.elapsed();

    (res, time_end)
}

#[tokio::main]
async fn main() {
    let contents = tokio::fs::read("urls.txt").await.unwrap();
    let utf8 = String::from_utf8_lossy(&contents).to_string();
    let urls_length = utf8.lines().collect::<Vec<&str>>().len() as f32;

    let mut verified_urls = vec![];

    let (tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::new(3, 0))
            .build()
            .unwrap();
        for url in utf8.lines() {
            let client = client.clone();
            let tx = tx.clone();
            let url = format!("{}", url); // I don't think that's the best solution.
            tokio::spawn(async move {
                let res = get(&client, &url).await;
                tx.send(res).await; // How do I fix the "rustc(unused_must_use)" warning here?
            });
        }
    });

    while let Some(res) = rx.recv().await {
        if let Ok(ok) = res.0 {
            verified_urls.push((ok.url().to_string(), res.1));
        }
    }

    println!(
        "||| {:#?} ||| {:?}/{:?} - {:?}% diff",
        verified_urls.sort(), // Why does this function return "()"?
        verified_urls.len(),
        urls_length as i16,
        get_proccent_diff(urls_length, verified_urls.len() as f32) // Percentage of requests that did not return Ok
    );
}
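On the `unused_must_use` warning flagged in the code above: `send` on a channel returns a `Result`, and the compiler warns when that result is dropped without being looked at. A minimal sketch of handling it follows. It uses `std::sync::mpsc` as a stand-in so it runs without extra crates, and `send_or_report` is a hypothetical helper name. With the tokio channel the same idea would presumably look like `if tx.send(res).await.is_err() { ... }`, or `let _ = tx.send(res).await;` if the failure really can be ignored.

```rust
use std::sync::mpsc;

/// Sends `value` and reports whether the receiver was still alive.
/// (Hypothetical helper, for illustration only.)
fn send_or_report(tx: &mpsc::Sender<u32>, value: u32) -> bool {
    // `send` returns a `Result`; dropping it silently is what triggers
    // the `unused_must_use` warning.
    match tx.send(value) {
        Ok(()) => true,
        Err(err) => {
            // The error hands back the value that could not be delivered.
            eprintln!("receiver dropped, could not send {}", err.0);
            false
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    println!("first send ok: {}", send_or_report(&tx, 1));

    // Once the receiver is gone, every further send fails.
    drop(rx);
    println!("second send ok: {}", send_or_report(&tx, 2));
}
```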

Second version

use reqwest::Client;
use std::time::Duration;
use tokio::sync::mpsc;

fn get_proccent_diff(a: f32, b: f32) -> f32 {
    (((100.0 - 100.0 / (a / b as f32)) * 100.0).round() as i16 as f32 / 100.0) as f32
}

async fn get(client: &Client, url: &String) -> (Result<reqwest::Response, reqwest::Error>, Duration) {
    let time = std::time::Instant::now();
    let res = client.get(url).send().await;
    let time_end = time.elapsed();

    (res, time_end)
}

#[tokio::main]
async fn main() {
    let contents = tokio::fs::read("urls.txt").await.unwrap();
    let utf8 = String::from_utf8_lossy(&contents).to_string();
    let urls = utf8.lines().collect::<Vec<&str>>();
    // error[E0597]: `utf8` does not live long enough
    // `utf8` does not live long enough
    // borrowed value does not live long enough

    let mut verified_urls = vec![];

    let (tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::new(3, 0))
            .build()
            .unwrap();
        for url in urls {
            let client = client.clone();
            let tx = tx.clone();
            let url = format!("{}", url); // I don't think that's the best solution.
            tokio::spawn(async move {
                let res = get(&client, &url).await;
                tx.send(res).await;
            });
        }
    });

    while let Some(res) = rx.recv().await {
        if let Ok(ok) = res.0 {
            verified_urls.push((ok.url().to_string(), res.1));
        }
    }

    println!(
        "||| {:#?} ||| {:?}/{:?} - {:?}% diff",
        verified_urls.sort(), // Why does this function return "()"?
        verified_urls.len(),
        urls.len(),
     // ^^^^ value borrowed here after move
        get_proccent_diff(urls.len() as f32, verified_urls.len() as f32) // Percentage of requests that did not return Ok
    );
}
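On the two compiler errors noted in the comments of the second version: a `Vec<&str>` only borrows from `utf8`, while a spawned task must own everything it uses, and `urls` is moved into the `async move` block before `urls.len()` is read. One possible fix, sketched below, is to collect owned `String`s and record the count before the move. With owned strings the `format!("{}", url)` call is no longer needed either (for a `&str`, `url.to_string()` would do the same job). `parse_urls` is a hypothetical helper, skipping blank lines is an added assumption, and `std::thread::spawn` stands in for `tokio::spawn` here since both require owned (`'static`) data.

```rust
/// Turns the file contents into owned `String`s, one per non-blank line.
/// (Hypothetical helper; skipping blank lines is an assumption.)
fn parse_urls(contents: &str) -> Vec<String> {
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .map(String::from)
        .collect()
}

fn main() {
    let utf8 = String::from("https://example.com\n\nhttps://example.org\n");
    let urls = parse_urls(&utf8);

    // Read the length before `urls` is moved into the spawned closure.
    let total = urls.len();

    let handle = std::thread::spawn(move || {
        // Each `url` is already an owned String, so no `format!` is needed.
        for url in urls {
            println!("would request {url}");
        }
    });
    handle.join().unwrap();

    println!("{total} URLs read");
}
```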

proven dust

The timeouts are per request

tawdry owl

Can I be sure that every request in this code will be handled correctly no matter how much data is input?

proven dust

Probably?

You may want to limit how many requests are active at once

You'll probably hit an OS-level limit on open files, and requests will start failing once you do

(an open socket counts as an open file)
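A rough sketch of what "limit how many requests are active at once" can look like. To keep it runnable without extra crates it swaps in plain threads: a fixed pool of `max_in_flight` workers pulls URLs from a shared queue, so no more than that many "requests" are ever open. `fake_fetch` and `check_all` are hypothetical names, and the fetch is a placeholder rather than a real HTTP call. In a tokio program the usual tools for the same job are `tokio::sync::Semaphore` or `buffer_unordered` from the futures crate.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Placeholder for the real HTTP call (hypothetical): treats https URLs as reachable.
fn fake_fetch(url: &str) -> bool {
    url.starts_with("https://")
}

/// Checks `urls` with at most `max_in_flight` fetches running at once
/// and returns the reachable ones in sorted order.
fn check_all(urls: Vec<String>, max_in_flight: usize) -> Vec<String> {
    let queue = Arc::new(Mutex::new(urls.into_iter()));
    let (tx, rx) = mpsc::channel();

    // One worker per allowed in-flight request (at least one).
    let workers: Vec<_> = (0..max_in_flight.max(1))
        .map(|_| {
            let queue = Arc::clone(&queue);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only long enough to take the next URL.
                let next = queue.lock().unwrap().next();
                let Some(url) = next else { break };
                if fake_fetch(&url) {
                    // The receiver outlives the workers, so this send cannot fail here.
                    let _ = tx.send(url);
                }
            })
        })
        .collect();

    // Drop the original sender so the channel closes once the workers exit.
    drop(tx);
    for worker in workers {
        worker.join().unwrap();
    }

    let mut verified: Vec<String> = rx.iter().collect();
    verified.sort();
    verified
}

fn main() {
    let urls = vec![
        "https://example.com".to_string(),
        "ftp://example.net".to_string(),
        "https://example.org".to_string(),
    ];
    let verified = check_all(urls, 2);
    println!("{verified:?}");
}
```

Unlike running fixed batches of 100, a pool like this starts the next request as soon as any slot frees up, so one slow address does not hold up the rest.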

tawdry owl

Okay.
As I understand it, the client creates a container for each request that stores its state (to work with the Future, I guess) and a socket for the request? Once all the data has been received, the state is updated and the socket is closed? It can create many sockets, but how is the bandwidth distributed between them? Is that the OS's job?

proven dust

It returns nothing because it changes the vector itself. Check verified_urls after that call

It's like vec.push(4), that doesn't return a new vector, it mutates (changes) the existing one
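A small sketch of that point: sort as a separate statement, then print the vector. The original `println!` prints `()` because it is handed the return value of `sort()` instead of the vector. Note that a plain `sort()` on `(String, Duration)` pairs orders by URL first. The hypothetical helper `sort_by_elapsed` below orders by response time instead, which is an assumption about what is wanted.

```rust
use std::time::Duration;

/// Sorts (url, elapsed) pairs in place, fastest response first.
/// (Hypothetical helper, for illustration only.)
fn sort_by_elapsed(results: &mut [(String, Duration)]) {
    results.sort_by_key(|(_, elapsed)| *elapsed);
}

fn main() {
    let mut verified_urls = vec![
        ("https://example.org".to_string(), Duration::from_millis(250)),
        ("https://example.com".to_string(), Duration::from_millis(90)),
    ];

    // Sort first, as its own statement; the call itself evaluates to `()`.
    sort_by_elapsed(&mut verified_urls);

    // Then print the vector, not the result of the sort call.
    println!("{:#?}", verified_urls);
}
```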