293 lines
9.5 KiB
Rust
293 lines
9.5 KiB
Rust
pub mod structs;
|
|
|
|
use super::Config;
|
|
use super::GenericError;
|
|
use chrono::format::ParseError;
|
|
use chrono::NaiveDateTime;
|
|
use minreq::Response;
|
|
use std::fmt::Display;
|
|
use std::sync::mpsc;
|
|
use std::sync::mpsc::Receiver;
|
|
use std::thread;
|
|
use std::time::{Duration, Instant};
|
|
pub use structs::*;
|
|
|
|
pub struct API {
|
|
pub config: Config,
|
|
location_req_que: Vec<String>,
|
|
}
|
|
|
|
impl API {
|
|
pub fn new(config: Config) -> API {
|
|
API {
|
|
config: config,
|
|
location_req_que: Vec::new(),
|
|
}
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn get_raw_string(&self, url: APIUrl) -> Result<String, GenericError> {
|
|
Ok(self
|
|
.request(API::api_url(url, &self.config))?
|
|
.as_str()?
|
|
.to_owned())
|
|
}
|
|
|
|
pub fn get_sharetoken(&self, tag_id: &String) -> Result<SharetokenModel, GenericError> {
|
|
let response = self.request(API::api_url(
|
|
APIUrl::Sharetoken(tag_id.clone()),
|
|
&self.config,
|
|
))?;
|
|
Ok(response.json()?)
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn get_tags(&self) -> Result<Vec<TagModel>, GenericError> {
|
|
let response = self.request(API::api_url(APIUrl::Tags, &self.config))?;
|
|
let tags = response.json();
|
|
if let Err(_) = tags {
|
|
let err: ErrorModel = response.json()?;
|
|
Err(GenericError::from(err))
|
|
} else {
|
|
tags.map_err(GenericError::from)
|
|
}
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn get_tag(&self, tag_id: &String) -> Result<TagModel, GenericError> {
|
|
let response = self.request(API::api_url(APIUrl::Tag(tag_id.clone()), &self.config))?;
|
|
Ok(response.json()?)
|
|
}
|
|
|
|
pub fn get_states(&self, tag_id: &String) -> Result<Vec<StateModel>, GenericError> {
|
|
let response = self.request(API::api_url(APIUrl::States(tag_id.clone()), &self.config))?;
|
|
Ok(response.json()?)
|
|
}
|
|
|
|
pub fn get_current_locations(
|
|
&self,
|
|
tag_id: &String,
|
|
) -> Result<Vec<LocationModel>, GenericError> {
|
|
let response = self.request(API::api_url(
|
|
APIUrl::CurrLocations(tag_id.clone()),
|
|
&self.config,
|
|
))?;
|
|
Ok(response.json()?)
|
|
}
|
|
|
|
pub fn get_locations(
|
|
&self,
|
|
tag_id: &String,
|
|
state_id: &String,
|
|
) -> Result<Vec<LocationModel>, GenericError> {
|
|
let response = self.request(API::api_url(
|
|
APIUrl::Locations(tag_id.clone(), state_id.clone()),
|
|
&self.config,
|
|
))?;
|
|
Ok(response.json()?)
|
|
}
|
|
|
|
pub fn queue_location(&mut self, tag_id: &String, state_id: &String) {
|
|
self.location_req_que.push(API::api_url(
|
|
APIUrl::Locations(tag_id.clone(), state_id.clone()),
|
|
&self.config,
|
|
));
|
|
}
|
|
|
|
pub fn begin_location_fetch(
|
|
&mut self,
|
|
) -> Receiver<Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>> {
|
|
let (sender, receiver) = mpsc::channel();
|
|
let mut locations = self.location_req_que.clone();
|
|
let config = self.config.clone();
|
|
|
|
// sleep for a second to make sure no overlaps with previous requests.
|
|
thread::sleep(Duration::from_millis(1000));
|
|
|
|
thread::spawn(move || {
|
|
let (i_sender, i_receiver) = mpsc::channel();
|
|
|
|
let mut idx = 0;
|
|
let mut awaiting = 0;
|
|
|
|
let mut timer = 0;
|
|
let mut last_time = Instant::now();
|
|
let interval = 1_000_000_000 / config.throttle as u128;
|
|
loop {
|
|
let sender = sender.clone();
|
|
let config = config.clone();
|
|
let i_sender = i_sender.clone();
|
|
|
|
let now = Instant::now();
|
|
let delta = (now - last_time).as_nanos();
|
|
if timer > 0 {
|
|
timer = timer - delta.min(timer);
|
|
}
|
|
last_time = now;
|
|
|
|
if awaiting <= config.throttle && timer < (interval * (config.throttle as u128 - 2))
|
|
{
|
|
if locations.len() > idx {
|
|
let location = locations[idx].clone();
|
|
awaiting += 1;
|
|
thread::spawn(move || {
|
|
let response = minreq::get(location.clone())
|
|
.with_header("content-type", "application/json")
|
|
.with_header("x-api-key", &config.api_key)
|
|
.send()
|
|
.and_then(|r| match r.json() {
|
|
Ok(loc) => Ok(Ok(loc)),
|
|
Err(_) => Ok(Err(r.json()?)),
|
|
})
|
|
.map_err(GenericError::from);
|
|
i_sender.send((location, response)).ok();
|
|
});
|
|
timer += interval;
|
|
idx += 1;
|
|
} else {
|
|
while awaiting > 0 {
|
|
if let Ok((loc, rec)) = i_receiver.try_recv() {
|
|
API::add_error_back(loc, &rec, &mut locations);
|
|
awaiting -= 1;
|
|
sender.send(rec).ok();
|
|
}
|
|
}
|
|
if locations.len() <= idx {
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
if let Ok((loc, rec)) = i_receiver.try_recv() {
|
|
API::add_error_back(loc, &rec, &mut locations);
|
|
awaiting -= 1;
|
|
sender.send(rec).ok();
|
|
}
|
|
}
|
|
}
|
|
});
|
|
self.location_req_que.clear();
|
|
receiver
|
|
}
|
|
|
|
pub fn get_between<T: Timestamped + Clone>(
|
|
list: &Vec<T>,
|
|
from: Option<NaiveDateTime>,
|
|
to: Option<NaiveDateTime>,
|
|
inclusive_from: bool,
|
|
config: &Config,
|
|
) -> Vec<(NaiveDateTime, T)> {
|
|
let mut timestamped = Vec::new();
|
|
for item in list {
|
|
if let Some(res) = API::parse_timestamp(&item.timestamp(), config) {
|
|
match res {
|
|
Ok(t) => {
|
|
if let Some(to) = to {
|
|
if t > to {
|
|
continue;
|
|
}
|
|
}
|
|
let mut too_old = false;
|
|
if let Some(from) = from {
|
|
if t < from {
|
|
too_old = true;
|
|
}
|
|
}
|
|
if too_old {
|
|
if inclusive_from {
|
|
timestamped.push((t.clone(), item.clone()));
|
|
}
|
|
break;
|
|
}
|
|
timestamped.push((t.clone(), item.clone()));
|
|
}
|
|
Err(e) => panic!(e),
|
|
}
|
|
} else {
|
|
println!("Skipped item, did not have timestmap");
|
|
}
|
|
}
|
|
timestamped
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn print_list<T: Display + Timestamped>(list: Result<Vec<T>, GenericError>) {
|
|
match list {
|
|
Ok(items) => {
|
|
for item in items {
|
|
println!(
|
|
"\n{}\n{}",
|
|
item.timestamp().unwrap_or("No timestamp".to_owned()),
|
|
item
|
|
);
|
|
}
|
|
}
|
|
Err(e) => eprintln!("Could not complete request: {}", e),
|
|
}
|
|
}
|
|
|
|
pub fn api_url(url: APIUrl, config: &Config) -> String {
|
|
match url {
|
|
APIUrl::CurrLocations(tag) => {
|
|
str::replace(&config.current_locations_url, "{tag}", &tag)
|
|
}
|
|
APIUrl::Locations(tag, state) => {
|
|
let url = str::replace(&config.locations_url, "{tag}", &tag);
|
|
str::replace(&url, "{state}", &state)
|
|
}
|
|
APIUrl::States(tag) => str::replace(&config.states_url, "{tag}", &tag),
|
|
APIUrl::Tags => config.tags_url.clone(),
|
|
APIUrl::Sharetoken(tag) => str::replace(&config.sharetoken_url, "{tag}", &tag),
|
|
APIUrl::Tag(tag) => str::replace(&config.tag_url, "{tag}", &tag),
|
|
}
|
|
}
|
|
|
|
fn add_error_back(
|
|
location: String,
|
|
rec: &Result<Result<Vec<LocationModel>, ErrorModel>, GenericError>,
|
|
locations: &mut Vec<String>,
|
|
) -> bool {
|
|
if let Ok(inner) = rec {
|
|
if let Err(_) = inner {
|
|
locations.push(location);
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
|
|
fn request(&self, url: String) -> Result<Response, GenericError> {
|
|
let response = minreq::get(url)
|
|
.with_header("content-type", "application/json")
|
|
.with_header("x-api-key", &self.config.api_key)
|
|
.send();
|
|
response.map_err(GenericError::from)
|
|
}
|
|
|
|
fn parse_timestamp(
|
|
timestamp: &Option<String>,
|
|
config: &Config,
|
|
) -> Option<Result<NaiveDateTime, ParseError>> {
|
|
if let Some(timestamp) = ×tamp {
|
|
Some(NaiveDateTime::parse_from_str(
|
|
×tamp,
|
|
&config.timestamp_format,
|
|
))
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
pub enum APIUrl {
|
|
CurrLocations(String),
|
|
Locations(String, String),
|
|
States(String),
|
|
Tags,
|
|
#[allow(dead_code)]
|
|
Tag(String),
|
|
Sharetoken(String),
|
|
}
|