Make erronous locations requeue
This commit is contained in:
parent
4046f018c3
commit
8187f1e184
35
src/api.rs
35
src/api.rs
@ -160,7 +160,7 @@ impl API {
|
|||||||
&mut self,
|
&mut self,
|
||||||
) -> Receiver<Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>> {
|
) -> Receiver<Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>> {
|
||||||
let (sender, receiver) = mpsc::channel();
|
let (sender, receiver) = mpsc::channel();
|
||||||
let locations = self.location_req_que.clone();
|
let mut locations = self.location_req_que.clone();
|
||||||
let config = self.config.clone();
|
let config = self.config.clone();
|
||||||
|
|
||||||
// sleep for a second to make sure no overlaps with previous requests.
|
// sleep for a second to make sure no overlaps with previous requests.
|
||||||
@ -193,7 +193,7 @@ impl API {
|
|||||||
let location = locations[idx].clone();
|
let location = locations[idx].clone();
|
||||||
awaiting += 1;
|
awaiting += 1;
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let response = minreq::get(location)
|
let response = minreq::get(location.clone())
|
||||||
.with_header("content-type", "application/json")
|
.with_header("content-type", "application/json")
|
||||||
.with_header("x-api-key", &config.api_key)
|
.with_header("x-api-key", &config.api_key)
|
||||||
.send()
|
.send()
|
||||||
@ -202,21 +202,29 @@ impl API {
|
|||||||
Err(_) => Ok(Err(r.json()?)),
|
Err(_) => Ok(Err(r.json()?)),
|
||||||
})
|
})
|
||||||
.map_err(GenericError::from);
|
.map_err(GenericError::from);
|
||||||
i_sender.send(response).ok();
|
i_sender.send((location, response)).ok();
|
||||||
});
|
});
|
||||||
timer += interval;
|
timer += interval;
|
||||||
idx += 1;
|
idx += 1;
|
||||||
} else {
|
} else {
|
||||||
while awaiting > 0 {
|
while awaiting > 0 {
|
||||||
if let Ok(rec) = i_receiver.try_recv() {
|
if let Ok((loc, rec)) = i_receiver.try_recv() {
|
||||||
|
if API::add_error_back(loc, &rec, &mut locations) {
|
||||||
|
println!("Sent back to queue");
|
||||||
|
}
|
||||||
awaiting -= 1;
|
awaiting -= 1;
|
||||||
sender.send(rec).ok();
|
sender.send(rec).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if locations.len() <= idx {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if let Ok(rec) = i_receiver.try_recv() {
|
if let Ok((loc, rec)) = i_receiver.try_recv() {
|
||||||
|
if API::add_error_back(loc, &rec, &mut locations) {
|
||||||
|
println!("Sent back to queue");
|
||||||
|
}
|
||||||
awaiting -= 1;
|
awaiting -= 1;
|
||||||
sender.send(rec).ok();
|
sender.send(rec).ok();
|
||||||
}
|
}
|
||||||
@ -227,6 +235,23 @@ impl API {
|
|||||||
receiver
|
receiver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn add_error_back(
|
||||||
|
location: String,
|
||||||
|
rec: &Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>,
|
||||||
|
locations: &mut Vec<String>,
|
||||||
|
) -> bool {
|
||||||
|
if let Ok(inner) = rec {
|
||||||
|
if let Err(_) = inner {
|
||||||
|
locations.push(location);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_between<T: Timestamped + Clone>(
|
pub fn get_between<T: Timestamped + Clone>(
|
||||||
list: &Vec<T>,
|
list: &Vec<T>,
|
||||||
from: Option<NaiveDateTime>,
|
from: Option<NaiveDateTime>,
|
||||||
|
@ -119,7 +119,9 @@ fn run(
|
|||||||
match receiver.try_recv() {
|
match receiver.try_recv() {
|
||||||
Ok(res) => match res {
|
Ok(res) => match res {
|
||||||
Ok(loc) => {
|
Ok(loc) => {
|
||||||
|
if loc.is_ok() {
|
||||||
counter += 1;
|
counter += 1;
|
||||||
|
}
|
||||||
let remaining = exp_time(&api, states.len() as u64 - counter as u64);
|
let remaining = exp_time(&api, states.len() as u64 - counter as u64);
|
||||||
print!(
|
print!(
|
||||||
"Done: {:<5.2}% Remaining: {:<5.2} seconds\r",
|
"Done: {:<5.2}% Remaining: {:<5.2} seconds\r",
|
||||||
|
Loading…
Reference in New Issue
Block a user