From f45a39b9603fd7ad18b3e8271e9f4228770cb249 Mon Sep 17 00:00:00 2001 From: Teascade Date: Sun, 23 Aug 2020 20:54:34 +0300 Subject: [PATCH] Add multithreading --- Cargo.lock | 4 ++-- config.toml | 4 ++-- src/api.rs | 48 ++++++++++++++++++++++++++++++++++++++++-------- src/config.rs | 2 +- src/errors.rs | 8 ++++++++ src/main.rs | 36 ++++++++++++++++++++++++++++-------- 6 files changed, 81 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f2ea5d..5005905 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,9 +126,9 @@ dependencies = [ [[package]] name = "minreq" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab229c252995e9d56cc66857f3ab2c41e3138b1a6c92089f013698388e64d6bd" +checksum = "2466d0a7e6bfcd54f69e4a17d4a4318985aaaf7fe3df4cd3b6f11ff551129ca3" dependencies = [ "lazy_static", "rustls", diff --git a/config.toml b/config.toml index 9163a5e..3952808 100644 --- a/config.toml +++ b/config.toml @@ -1,7 +1,7 @@ -api_key = "" +api_key = "E3GrOiQAnY61BP623XXzt9Fo87A1IQrS1FFzD57P" tags_url = "https://platform.yepzon.com/tags" states_url = "https://platform.yepzon.com/tags/{tag}/states" locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}" timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ" between_format = "%d.%m.%Y %H:%M:%S" -throttle = 110 +throttle = 10 diff --git a/src/api.rs b/src/api.rs index e91ad38..af02476 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,9 +2,13 @@ use super::Config; use super::GenericError; use chrono::format::ParseError; use chrono::NaiveDateTime; -use minreq::{Error, Response}; +use minreq::Response; use serde::Deserialize; +use std::sync::mpsc; +use std::sync::mpsc::Receiver; +use std::thread; use std::thread::sleep; +use std::thread::Thread; use std::time::{Duration, Instant}; pub trait Timestamped { @@ -67,6 +71,8 @@ impl Timestamped for LocationModel { pub struct API { config: Config, pub last_response_ping: u32, + + location_req_que: Vec, } impl API { @@ -74,6 +80,8 @@ impl API { API { last_response_ping: config.throttle, config: config, + + location_req_que: Vec::new(), } } @@ -93,15 +101,39 @@ impl API { Ok(response.json()?) } - pub fn get_locations( - &mut self, - tag_id: &String, - state_id: &String, - ) -> Result, GenericError> { + pub fn queue_location(&mut self, tag_id: &String, state_id: &String) { let url = str::replace(&self.config.locations_url, "{tag}", &tag_id); let url = str::replace(&url, "{state}", &state_id); - let response = self.request(url)?; - Ok(response.json()?) + self.location_req_que.push(url); + //let response = self.request(url)?; + //Ok(response.json()?) + } + + pub fn begin_location_fetch(&mut self) -> Receiver> { + let (sender, receiver) = mpsc::channel(); + let locations = self.location_req_que.clone(); + let config = self.config.clone(); + thread::spawn(move || { + for location in locations { + let sender = sender.clone(); + let config = config.clone(); + thread::spawn(move || { + println!("Send req!"); + let response = minreq::get(location) + .with_header("content-type", "application/json") + .with_header("x-api-key", &config.api_key) + .send() + .map_err(GenericError::from) + .and_then(|r| { + String::from_utf8(r.into_bytes()).map_err(GenericError::from) + }); + sender.send(response).ok(); + }); + thread::sleep_ms(100); + } + }); + self.location_req_que.clear(); + receiver } pub fn get_between( diff --git a/src/config.rs b/src/config.rs index a7b9e23..9743f23 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,7 +26,7 @@ impl Default for Config { timestamp_format: "%Y-%m-%dT%H:%M:%S%.fZ".to_owned(), between_format: "%d.%m.%Y %H:%M:%S".to_owned(), - throttle: 110, + throttle: 10, } } } diff --git a/src/errors.rs b/src/errors.rs index ab709cb..bb42763 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -8,6 +8,8 @@ pub enum GenericError { MinreqError(minreq::Error), ChronoParseError(chrono::ParseError), IOError(io::Error), + FromUTF8Error(std::string::FromUtf8Error), + OutOfLocations, } impl From for GenericError { @@ -39,3 +41,9 @@ impl From for GenericError { GenericError::IOError(error) } } + +impl From for GenericError { + fn from(error: std::string::FromUtf8Error) -> Self { + GenericError::FromUTF8Error(error) + } +} diff --git a/src/main.rs b/src/main.rs index 84d6ee4..e413078 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use config::Config; use errors::GenericError; use std::fs::File; use std::io::prelude::*; +use std::sync::mpsc::TryRecvError; fn main() { let env: EnvOpt = argh::from_env(); @@ -28,7 +29,11 @@ fn from_env(env: EnvOpt) -> Result<(), GenericError> { let config: Config = toml::from_str(&string)?; - run(&config, None, None)?; + run( + &config, + Some("1.1.2019 00:00:00".to_owned()), + Some("1.5.2020 00:00:00".to_owned()), + )?; Ok(()) } Subcommand::Init(opt) => { @@ -52,16 +57,31 @@ fn run(config: &Config, from: Option, to: Option) -> Result<(), let state_list = api.get_states(&first_tag)?; let states = API::get_between(state_list.clone(), from, to, true, &config); - let len = states.len(); - let mut locations = Vec::new(); - for (idx, (_, state)) in states.iter().enumerate() { - println!("Expected {}s left", exp_time(&api, (len - idx) as u32)); - let mut location_list = api.get_locations(&first_tag, state.id.as_ref().unwrap())?; - locations.append(&mut location_list); + //let mut locations = Vec::new(); + for (_, state) in states.iter() { + api.queue_location(&first_tag, state.id.as_ref().unwrap()); + } + let receiver = api.begin_location_fetch(); + loop { + match receiver.try_recv() { + Ok(res) => match res { + Ok(loc) => { + let mut loc = loc; + println!("Received! {}", loc); + //locations.append(&mut loc); + } + Err(e) => eprintln!("{:?}", e), + }, + Err(e) => { + if let TryRecvError::Disconnected = e { + break; + } + } + } } - dbg!(API::get_between(locations, from, to, false, &config)); + //dbg!(API::get_between(locations, from, to, false, &config)); Ok(()) }