From b765cb3279ec11ff02c545fa5b541473d855ba4a Mon Sep 17 00:00:00 2001 From: Teascade Date: Mon, 24 Aug 2020 01:36:47 +0300 Subject: [PATCH] Make async work very good --- config.toml | 2 +- src/api.rs | 90 ++++++++++++++++++++++++++++++++++++----------------- src/main.rs | 29 ++++++++++++----- 3 files changed, 84 insertions(+), 37 deletions(-) diff --git a/config.toml b/config.toml index 3952808..a4270ff 100644 --- a/config.toml +++ b/config.toml @@ -4,4 +4,4 @@ states_url = "https://platform.yepzon.com/tags/{tag}/states" locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}" timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ" between_format = "%d.%m.%Y %H:%M:%S" -throttle = 10 +throttle = 9 diff --git a/src/api.rs b/src/api.rs index af02476..40d50a2 100644 --- a/src/api.rs +++ b/src/api.rs @@ -105,39 +105,81 @@ impl API { let url = str::replace(&self.config.locations_url, "{tag}", &tag_id); let url = str::replace(&url, "{state}", &state_id); self.location_req_que.push(url); - //let response = self.request(url)?; - //Ok(response.json()?) } - pub fn begin_location_fetch(&mut self) -> Receiver> { + pub fn begin_location_fetch( + &mut self, + ) -> Receiver, ErrorModel>, GenericError>> { let (sender, receiver) = mpsc::channel(); let locations = self.location_req_que.clone(); let config = self.config.clone(); + + // sleep for a second to make sure no overlaps with previous requests. + thread::sleep(Duration::from_millis(1000)); + thread::spawn(move || { - for location in locations { + let (i_sender, i_receiver) = mpsc::channel(); + + let mut idx = 0; + let mut awaiting = 0; + + let mut timer = 0; + let mut last_time = Instant::now(); + let interval = 1_000_000_000 / config.throttle as u128; + loop { let sender = sender.clone(); let config = config.clone(); - thread::spawn(move || { - println!("Send req!"); - let response = minreq::get(location) - .with_header("content-type", "application/json") - .with_header("x-api-key", &config.api_key) - .send() - .map_err(GenericError::from) - .and_then(|r| { - String::from_utf8(r.into_bytes()).map_err(GenericError::from) + let i_sender = i_sender.clone(); + + let now = Instant::now(); + let delta = (now - last_time).as_nanos(); + if timer > 0 { + timer = timer - delta.min(timer); + } + last_time = now; + + if awaiting <= config.throttle && timer < (interval * (config.throttle as u128 + 2)) + { + if locations.len() > idx { + let location = locations[idx].clone(); + awaiting += 1; + thread::spawn(move || { + let response = minreq::get(location) + .with_header("content-type", "application/json") + .with_header("x-api-key", &config.api_key) + .send() + .and_then(|r| match r.json() { + Ok(loc) => Ok(Ok(loc)), + Err(_) => Ok(Err(r.json()?)), + }) + .map_err(GenericError::from); + i_sender.send(response).ok(); }); - sender.send(response).ok(); - }); - thread::sleep_ms(100); + timer += interval; + idx += 1; + } else { + while awaiting > 0 { + if let Ok(rec) = i_receiver.try_recv() { + awaiting -= 1; + sender.send(rec).ok(); + } + } + break; + } + } else { + if let Ok(rec) = i_receiver.try_recv() { + awaiting -= 1; + sender.send(rec).ok(); + } + } } }); self.location_req_que.clear(); receiver } - pub fn get_between( - list: Vec, + pub fn get_between( + list: &Vec, from: Option, to: Option, inclusive_from: bool, @@ -161,11 +203,11 @@ impl API { } if too_old { if inclusive_from { - timestamped.push((t.clone(), item)); + timestamped.push((t.clone(), item.clone())); } break; } - timestamped.push((t.clone(), item)); + timestamped.push((t.clone(), item.clone())); } Err(e) => panic!(e), } @@ -177,18 +219,10 @@ impl API { } fn request(&mut self, url: String) -> Result { - let before = Instant::now(); let response = minreq::get(url) .with_header("content-type", "application/json") .with_header("x-api-key", &self.config.api_key) .send(); - let after = Instant::now(); - let min_time = Duration::new(0, self.config.throttle * 1_000_000); - let duration = after.duration_since(before); - self.last_response_ping = (duration.as_millis() as u32).max(self.config.throttle); - if min_time > duration { - sleep(min_time - duration); - } response.map_err(GenericError::from) } diff --git a/src/main.rs b/src/main.rs index e413078..a54f4a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,8 +31,8 @@ fn from_env(env: EnvOpt) -> Result<(), GenericError> { run( &config, - Some("1.1.2019 00:00:00".to_owned()), - Some("1.5.2020 00:00:00".to_owned()), + Some("1.1.2015 00:00:00".to_owned()), + Some("1.5.2021 00:00:00".to_owned()), )?; Ok(()) } @@ -56,20 +56,27 @@ fn run(config: &Config, from: Option, to: Option) -> Result<(), let first_tag = tags[0].id.clone().unwrap(); let state_list = api.get_states(&first_tag)?; - let states = API::get_between(state_list.clone(), from, to, true, &config); + let states = API::get_between(&state_list, from, to, true, &config); - //let mut locations = Vec::new(); + let mut locations = Vec::new(); for (_, state) in states.iter() { api.queue_location(&first_tag, state.id.as_ref().unwrap()); } let receiver = api.begin_location_fetch(); + let mut counter = 0; loop { match receiver.try_recv() { Ok(res) => match res { Ok(loc) => { - let mut loc = loc; - println!("Received! {}", loc); - //locations.append(&mut loc); + counter += 1; + println!( + "Currently: {:#.2}%", + counter as f32 / states.len() as f32 * 100. + ); + match loc { + Ok(mut loc) => locations.append(&mut loc), + Err(e) => println!("Error: {}", e.message.unwrap_or(String::new())), + } } Err(e) => eprintln!("{:?}", e), }, @@ -80,8 +87,14 @@ fn run(config: &Config, from: Option, to: Option) -> Result<(), } } } + locations.sort_by(|loc1, loc2| loc2.timestamp.cmp(&loc1.timestamp)); - //dbg!(API::get_between(locations, from, to, false, &config)); + let locs = API::get_between(&locations, from, to, false, &config); + dbg!( + &locs.iter().map(|loc| loc.0).collect::>(), + locs.len(), + locations.len() + ); Ok(()) }