Make async work very good

This commit is contained in:
Sofia 2020-08-24 01:36:47 +03:00
parent f45a39b960
commit b765cb3279
3 changed files with 84 additions and 37 deletions

View File

@ -4,4 +4,4 @@ states_url = "https://platform.yepzon.com/tags/{tag}/states"
locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}" locations_url = "https://platform.yepzon.com/tags/{tag}/locations/{state}"
timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ" timestamp_format = "%Y-%m-%dT%H:%M:%S%.fZ"
between_format = "%d.%m.%Y %H:%M:%S" between_format = "%d.%m.%Y %H:%M:%S"
throttle = 10 throttle = 9

View File

@ -105,39 +105,81 @@ impl API {
let url = str::replace(&self.config.locations_url, "{tag}", &tag_id); let url = str::replace(&self.config.locations_url, "{tag}", &tag_id);
let url = str::replace(&url, "{state}", &state_id); let url = str::replace(&url, "{state}", &state_id);
self.location_req_que.push(url); self.location_req_que.push(url);
//let response = self.request(url)?;
//Ok(response.json()?)
} }
pub fn begin_location_fetch(&mut self) -> Receiver<Result<String, GenericError>> { pub fn begin_location_fetch(
&mut self,
) -> Receiver<Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>> {
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
let locations = self.location_req_que.clone(); let locations = self.location_req_que.clone();
let config = self.config.clone(); let config = self.config.clone();
// sleep for a second to make sure no overlaps with previous requests.
thread::sleep(Duration::from_millis(1000));
thread::spawn(move || { thread::spawn(move || {
for location in locations { let (i_sender, i_receiver) = mpsc::channel();
let mut idx = 0;
let mut awaiting = 0;
let mut timer = 0;
let mut last_time = Instant::now();
let interval = 1_000_000_000 / config.throttle as u128;
loop {
let sender = sender.clone(); let sender = sender.clone();
let config = config.clone(); let config = config.clone();
let i_sender = i_sender.clone();
let now = Instant::now();
let delta = (now - last_time).as_nanos();
if timer > 0 {
timer = timer - delta.min(timer);
}
last_time = now;
if awaiting <= config.throttle && timer < (interval * (config.throttle as u128 + 2))
{
if locations.len() > idx {
let location = locations[idx].clone();
awaiting += 1;
thread::spawn(move || { thread::spawn(move || {
println!("Send req!");
let response = minreq::get(location) let response = minreq::get(location)
.with_header("content-type", "application/json") .with_header("content-type", "application/json")
.with_header("x-api-key", &config.api_key) .with_header("x-api-key", &config.api_key)
.send() .send()
.map_err(GenericError::from) .and_then(|r| match r.json() {
.and_then(|r| { Ok(loc) => Ok(Ok(loc)),
String::from_utf8(r.into_bytes()).map_err(GenericError::from) Err(_) => Ok(Err(r.json()?)),
})
.map_err(GenericError::from);
i_sender.send(response).ok();
}); });
sender.send(response).ok(); timer += interval;
}); idx += 1;
thread::sleep_ms(100); } else {
while awaiting > 0 {
if let Ok(rec) = i_receiver.try_recv() {
awaiting -= 1;
sender.send(rec).ok();
}
}
break;
}
} else {
if let Ok(rec) = i_receiver.try_recv() {
awaiting -= 1;
sender.send(rec).ok();
}
}
} }
}); });
self.location_req_que.clear(); self.location_req_que.clear();
receiver receiver
} }
pub fn get_between<T: Timestamped>( pub fn get_between<T: Timestamped + Clone>(
list: Vec<T>, list: &Vec<T>,
from: Option<NaiveDateTime>, from: Option<NaiveDateTime>,
to: Option<NaiveDateTime>, to: Option<NaiveDateTime>,
inclusive_from: bool, inclusive_from: bool,
@ -161,11 +203,11 @@ impl API {
} }
if too_old { if too_old {
if inclusive_from { if inclusive_from {
timestamped.push((t.clone(), item)); timestamped.push((t.clone(), item.clone()));
} }
break; break;
} }
timestamped.push((t.clone(), item)); timestamped.push((t.clone(), item.clone()));
} }
Err(e) => panic!(e), Err(e) => panic!(e),
} }
@ -177,18 +219,10 @@ impl API {
} }
fn request(&mut self, url: String) -> Result<Response, GenericError> { fn request(&mut self, url: String) -> Result<Response, GenericError> {
let before = Instant::now();
let response = minreq::get(url) let response = minreq::get(url)
.with_header("content-type", "application/json") .with_header("content-type", "application/json")
.with_header("x-api-key", &self.config.api_key) .with_header("x-api-key", &self.config.api_key)
.send(); .send();
let after = Instant::now();
let min_time = Duration::new(0, self.config.throttle * 1_000_000);
let duration = after.duration_since(before);
self.last_response_ping = (duration.as_millis() as u32).max(self.config.throttle);
if min_time > duration {
sleep(min_time - duration);
}
response.map_err(GenericError::from) response.map_err(GenericError::from)
} }

View File

@ -31,8 +31,8 @@ fn from_env(env: EnvOpt) -> Result<(), GenericError> {
run( run(
&config, &config,
Some("1.1.2019 00:00:00".to_owned()), Some("1.1.2015 00:00:00".to_owned()),
Some("1.5.2020 00:00:00".to_owned()), Some("1.5.2021 00:00:00".to_owned()),
)?; )?;
Ok(()) Ok(())
} }
@ -56,20 +56,27 @@ fn run(config: &Config, from: Option<String>, to: Option<String>) -> Result<(),
let first_tag = tags[0].id.clone().unwrap(); let first_tag = tags[0].id.clone().unwrap();
let state_list = api.get_states(&first_tag)?; let state_list = api.get_states(&first_tag)?;
let states = API::get_between(state_list.clone(), from, to, true, &config); let states = API::get_between(&state_list, from, to, true, &config);
//let mut locations = Vec::new(); let mut locations = Vec::new();
for (_, state) in states.iter() { for (_, state) in states.iter() {
api.queue_location(&first_tag, state.id.as_ref().unwrap()); api.queue_location(&first_tag, state.id.as_ref().unwrap());
} }
let receiver = api.begin_location_fetch(); let receiver = api.begin_location_fetch();
let mut counter = 0;
loop { loop {
match receiver.try_recv() { match receiver.try_recv() {
Ok(res) => match res { Ok(res) => match res {
Ok(loc) => { Ok(loc) => {
let mut loc = loc; counter += 1;
println!("Received! {}", loc); println!(
//locations.append(&mut loc); "Currently: {:#.2}%",
counter as f32 / states.len() as f32 * 100.
);
match loc {
Ok(mut loc) => locations.append(&mut loc),
Err(e) => println!("Error: {}", e.message.unwrap_or(String::new())),
}
} }
Err(e) => eprintln!("{:?}", e), Err(e) => eprintln!("{:?}", e),
}, },
@ -80,8 +87,14 @@ fn run(config: &Config, from: Option<String>, to: Option<String>) -> Result<(),
} }
} }
} }
locations.sort_by(|loc1, loc2| loc2.timestamp.cmp(&loc1.timestamp));
//dbg!(API::get_between(locations, from, to, false, &config)); let locs = API::get_between(&locations, from, to, false, &config);
dbg!(
&locs.iter().map(|loc| loc.0).collect::<Vec<NaiveDateTime>>(),
locs.len(),
locations.len()
);
Ok(()) Ok(())
} }