Add multithreading

This commit is contained in:
Sofia 2020-08-23 20:54:34 +03:00
parent 96c03ae7ed
commit f45a39b960
6 changed files with 81 additions and 21 deletions

4
Cargo.lock generated
View File

@ -126,9 +126,9 @@ dependencies = [
[[package]] [[package]]
name = "minreq" name = "minreq"
version = "2.2.0" version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab229c252995e9d56cc66857f3ab2c41e3138b1a6c92089f013698388e64d6bd" checksum = "2466d0a7e6bfcd54f69e4a17d4a4318985aaaf7fe3df4cd3b6f11ff551129ca3"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"rustls", "rustls",

View File

@ -1,7 +1,7 @@
api_key = "" api_key = "E3GrOiQAnY61BP623XXzt9Fo87A1IQrS1FFzD57P"
tags_url = "https://platform.yepzon.com/tags" tags_url = "https://platform.yepzon.com/tags"
states_url = "https://platform.yepzon.com/tags/{tag}/states" states_url = "https://platform.yepzon.com/tags/{tag}/states"
locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}" locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}"
timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ" timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ"
between_format = "%d.%m.%Y %H:%M:%S" between_format = "%d.%m.%Y %H:%M:%S"
throttle = 110 throttle = 10

View File

@ -2,9 +2,13 @@ use super::Config;
use super::GenericError; use super::GenericError;
use chrono::format::ParseError; use chrono::format::ParseError;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use minreq::{Error, Response}; use minreq::Response;
use serde::Deserialize; use serde::Deserialize;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::thread;
use std::thread::sleep; use std::thread::sleep;
use std::thread::Thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
pub trait Timestamped { pub trait Timestamped {
@ -67,6 +71,8 @@ impl Timestamped for LocationModel {
pub struct API { pub struct API {
config: Config, config: Config,
pub last_response_ping: u32, pub last_response_ping: u32,
location_req_que: Vec<String>,
} }
impl API { impl API {
@ -74,6 +80,8 @@ impl API {
API { API {
last_response_ping: config.throttle, last_response_ping: config.throttle,
config: config, config: config,
location_req_que: Vec::new(),
} }
} }
@ -93,15 +101,39 @@ impl API {
Ok(response.json()?) Ok(response.json()?)
} }
pub fn get_locations( pub fn queue_location(&mut self, tag_id: &String, state_id: &String) {
&mut self,
tag_id: &String,
state_id: &String,
) -> Result<Vec<LocationModel>, GenericError> {
let url = str::replace(&self.config.locations_url, "{tag}", &tag_id); let url = str::replace(&self.config.locations_url, "{tag}", &tag_id);
let url = str::replace(&url, "{state}", &state_id); let url = str::replace(&url, "{state}", &state_id);
let response = self.request(url)?; self.location_req_que.push(url);
Ok(response.json()?) //let response = self.request(url)?;
//Ok(response.json()?)
}
pub fn begin_location_fetch(&mut self) -> Receiver<Result<String, GenericError>> {
let (sender, receiver) = mpsc::channel();
let locations = self.location_req_que.clone();
let config = self.config.clone();
thread::spawn(move || {
for location in locations {
let sender = sender.clone();
let config = config.clone();
thread::spawn(move || {
println!("Send req!");
let response = minreq::get(location)
.with_header("content-type", "application/json")
.with_header("x-api-key", &config.api_key)
.send()
.map_err(GenericError::from)
.and_then(|r| {
String::from_utf8(r.into_bytes()).map_err(GenericError::from)
});
sender.send(response).ok();
});
thread::sleep_ms(100);
}
});
self.location_req_que.clear();
receiver
} }
pub fn get_between<T: Timestamped>( pub fn get_between<T: Timestamped>(

View File

@ -26,7 +26,7 @@ impl Default for Config {
timestamp_format: "%Y-%m-%dT%H:%M:%S%.fZ".to_owned(), timestamp_format: "%Y-%m-%dT%H:%M:%S%.fZ".to_owned(),
between_format: "%d.%m.%Y %H:%M:%S".to_owned(), between_format: "%d.%m.%Y %H:%M:%S".to_owned(),
throttle: 110, throttle: 10,
} }
} }
} }

View File

@ -8,6 +8,8 @@ pub enum GenericError {
MinreqError(minreq::Error), MinreqError(minreq::Error),
ChronoParseError(chrono::ParseError), ChronoParseError(chrono::ParseError),
IOError(io::Error), IOError(io::Error),
FromUTF8Error(std::string::FromUtf8Error),
OutOfLocations,
} }
impl From<toml::de::Error> for GenericError { impl From<toml::de::Error> for GenericError {
@ -39,3 +41,9 @@ impl From<io::Error> for GenericError {
GenericError::IOError(error) GenericError::IOError(error)
} }
} }
impl From<std::string::FromUtf8Error> for GenericError {
fn from(error: std::string::FromUtf8Error) -> Self {
GenericError::FromUTF8Error(error)
}
}

View File

@ -10,6 +10,7 @@ use config::Config;
use errors::GenericError; use errors::GenericError;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::sync::mpsc::TryRecvError;
fn main() { fn main() {
let env: EnvOpt = argh::from_env(); let env: EnvOpt = argh::from_env();
@ -28,7 +29,11 @@ fn from_env(env: EnvOpt) -> Result<(), GenericError> {
let config: Config = toml::from_str(&string)?; let config: Config = toml::from_str(&string)?;
run(&config, None, None)?; run(
&config,
Some("1.1.2019 00:00:00".to_owned()),
Some("1.5.2020 00:00:00".to_owned()),
)?;
Ok(()) Ok(())
} }
Subcommand::Init(opt) => { Subcommand::Init(opt) => {
@ -52,16 +57,31 @@ fn run(config: &Config, from: Option<String>, to: Option<String>) -> Result<(),
let state_list = api.get_states(&first_tag)?; let state_list = api.get_states(&first_tag)?;
let states = API::get_between(state_list.clone(), from, to, true, &config); let states = API::get_between(state_list.clone(), from, to, true, &config);
let len = states.len();
let mut locations = Vec::new(); //let mut locations = Vec::new();
for (idx, (_, state)) in states.iter().enumerate() { for (_, state) in states.iter() {
println!("Expected {}s left", exp_time(&api, (len - idx) as u32)); api.queue_location(&first_tag, state.id.as_ref().unwrap());
let mut location_list = api.get_locations(&first_tag, state.id.as_ref().unwrap())?; }
locations.append(&mut location_list); let receiver = api.begin_location_fetch();
loop {
match receiver.try_recv() {
Ok(res) => match res {
Ok(loc) => {
let mut loc = loc;
println!("Received! {}", loc);
//locations.append(&mut loc);
}
Err(e) => eprintln!("{:?}", e),
},
Err(e) => {
if let TryRecvError::Disconnected = e {
break;
}
}
}
} }
dbg!(API::get_between(locations, from, to, false, &config)); //dbg!(API::get_between(locations, from, to, false, &config));
Ok(()) Ok(())
} }