pub mod structs; use super::Config; use super::LibError; use chrono::format::ParseError; use chrono::NaiveDateTime; use minreq::Response; use std::fmt::Display; use std::sync::mpsc; use std::sync::mpsc::Receiver; use std::thread; use std::time::{Duration, Instant}; pub use structs::*; pub struct API { pub config: Config, location_req_que: Vec, } impl API { pub fn new(config: Config) -> API { API { config: config, location_req_que: Vec::new(), } } pub fn get_raw_string(&self, url: APIUrl) -> Result { Ok(self .request(API::api_url(url, &self.config))? .as_str()? .to_owned()) } pub fn get_sharetoken(&self, tag_id: &String) -> Result { let response = self.request(API::api_url( APIUrl::Sharetoken(tag_id.clone()), &self.config, ))?; Ok(response.json()?) } pub fn get_tags(&self) -> Result, LibError> { let response = self.request(API::api_url(APIUrl::Tags, &self.config))?; let tags = response.json(); if let Err(_) = tags { let err: ErrorModel = response.json()?; Err(LibError::from(err)) } else { tags.map_err(LibError::from) } } pub fn get_tag(&self, tag_id: &String) -> Result { let response = self.request(API::api_url(APIUrl::Tag(tag_id.clone()), &self.config))?; Ok(response.json()?) } pub fn get_states(&self, tag_id: &String) -> Result, LibError> { let response = self.request(API::api_url(APIUrl::States(tag_id.clone()), &self.config))?; Ok(response.json()?) } pub fn get_current_locations(&self, tag_id: &String) -> Result, LibError> { let response = self.request(API::api_url( APIUrl::CurrLocations(tag_id.clone()), &self.config, ))?; Ok(response.json()?) } pub fn get_locations( &self, tag_id: &String, state_id: &String, ) -> Result, LibError> { let response = self.request(API::api_url( APIUrl::Locations(tag_id.clone(), state_id.clone()), &self.config, ))?; Ok(response.json()?) } pub fn queue_location(&mut self, tag_id: &String, state_id: &String) { self.location_req_que.push(API::api_url( APIUrl::Locations(tag_id.clone(), state_id.clone()), &self.config, )); } pub fn begin_location_fetch( &mut self, ) -> Receiver, ErrorModel>, LibError>> { let (sender, receiver) = mpsc::channel(); let mut locations = self.location_req_que.clone(); let config = self.config.clone(); // sleep for a second to make sure no overlaps with previous requests. thread::sleep(Duration::from_millis(1000)); thread::spawn(move || { let (i_sender, i_receiver) = mpsc::channel(); let mut idx = 0; let mut awaiting = 0; let mut timer = 0; let mut last_time = Instant::now(); let interval = 1_000_000_000 / config.throttle as u128; loop { let sender = sender.clone(); let config = config.clone(); let i_sender = i_sender.clone(); let now = Instant::now(); let delta = (now - last_time).as_nanos(); if timer > 0 { timer = timer - delta.min(timer); } last_time = now; if awaiting <= config.throttle && timer < (interval * (config.throttle as u128 - 2)) { if locations.len() > idx { let location = locations[idx].clone(); awaiting += 1; thread::spawn(move || { let response = minreq::get(location.clone()) .with_header("content-type", "application/json") .with_header("x-api-key", &config.api_key) .send() .and_then(|r| match r.json() { Ok(loc) => Ok(Ok(loc)), Err(_) => Ok(Err(r.json()?)), }) .map_err(LibError::from); i_sender.send((location, response)).ok(); }); timer += interval; idx += 1; } else { while awaiting > 0 { if let Ok((loc, rec)) = i_receiver.try_recv() { API::add_error_back(loc, &rec, &mut locations); awaiting -= 1; sender.send(rec).ok(); } } if locations.len() <= idx { break; } } } else { if let Ok((loc, rec)) = i_receiver.try_recv() { API::add_error_back(loc, &rec, &mut locations); awaiting -= 1; sender.send(rec).ok(); } } } }); self.location_req_que.clear(); receiver } pub(crate) fn get_between( list: &Vec, from: Option, to: Option, inclusive_from: bool, config: &Config, ) -> Vec<(NaiveDateTime, T)> { let mut timestamped = Vec::new(); for item in list { if let Some(res) = API::parse_timestamp(&item.timestamp(), config) { match res { Ok(t) => { if let Some(to) = to { if t > to { continue; } } let mut too_old = false; if let Some(from) = from { if t < from { too_old = true; } } if too_old { if inclusive_from { timestamped.push((t.clone(), item.clone())); } break; } timestamped.push((t.clone(), item.clone())); } Err(e) => panic!(e), } } else { println!("Skipped item, did not have timestmap"); } } timestamped } #[allow(dead_code)] pub fn print_list(list: Result, LibError>) { match list { Ok(items) => { for item in items { println!( "\n{}\n{}", item.timestamp().unwrap_or("No timestamp".to_owned()), item ); } } Err(e) => eprintln!("Could not complete request: {}", e), } } pub fn api_url(url: APIUrl, config: &Config) -> String { match url { APIUrl::CurrLocations(tag) => { str::replace(&config.current_locations_url, "{tag}", &tag) } APIUrl::Locations(tag, state) => { let url = str::replace(&config.locations_url, "{tag}", &tag); str::replace(&url, "{state}", &state) } APIUrl::States(tag) => str::replace(&config.states_url, "{tag}", &tag), APIUrl::Tags => config.tags_url.clone(), APIUrl::Sharetoken(tag) => str::replace(&config.sharetoken_url, "{tag}", &tag), APIUrl::Tag(tag) => str::replace(&config.tag_url, "{tag}", &tag), } } fn add_error_back( location: String, rec: &Result, ErrorModel>, LibError>, locations: &mut Vec, ) -> bool { if let Ok(inner) = rec { if let Err(_) = inner { locations.push(location); true } else { false } } else { false } } fn request(&self, url: String) -> Result { let response = minreq::get(url) .with_header("content-type", "application/json") .with_header("x-api-key", &self.config.api_key) .send(); response.map_err(LibError::from) } fn parse_timestamp( timestamp: &Option, config: &Config, ) -> Option> { if let Some(timestamp) = ×tamp { Some(NaiveDateTime::parse_from_str( ×tamp, &config.timestamp_format, )) } else { None } } } pub enum APIUrl { CurrLocations(String), Locations(String, String), States(String), Tags, #[allow(dead_code)] Tag(String), Sharetoken(String), }