I have a really annoying lifetime issue that is solved by using tokio_scoped, but I don’t understand why.
First I will lay out the structure of my program so the context is clear. I have an HTTP client, an algebra which wraps the client (`CarDataApiImpl`), a Redis client (`RedisImpl`), and an algebra that calls the API and upserts the results to Redis (`EventSyncImpl`):
// Wire the dependencies bottom-up; every layer borrows the one beneath it,
// so none of these values is `'static`.
let http_requester = TelemetryHttpRequester;
let api = CarDataApiImpl {
    http_requester: &http_requester,
    uri: "https://api.openf1.org",
};
// `RedisImpl::default()` is the inherent constructor returning a `Result`.
let redis_client = RedisImpl::default().expect("unable to connect to redis");
let event_sync = EventSyncImpl {
    api: &api,
    redis: &redis_client,
};
Each of these is a trait, together with a struct that holds the setup parameters and an `impl` of the trait for that struct. For example:
/// Read-only access to the car-data HTTP API.
///
/// Every method returns `Option<Vec<_>>`. In the `CarDataApiImpl`
/// implementation, `None` is returned both when the request fails and when
/// the API answers with an empty list.
pub trait CarDataApi {
    /// Sessions filtered by country name, session name and year.
    fn get_session(
        &self,
        country_name: &str,
        session_name: &str,
        year: u32,
    ) -> Option<Vec<Session>>;

    /// Drivers filtered by session key and driver number.
    fn get_drivers(&self, session_key: u32, driver_number: &DriverNumber) -> Option<Vec<Driver>>;

    /// Car data for a session, optionally filtered by driver number and speed.
    fn get_car_data(
        &self,
        session_key: u32,
        driver_number: Option<DriverNumber>,
        speed: Option<u32>,
    ) -> Option<Vec<CarData>>;

    /// Intervals for a session, optionally filtered by an interval value.
    fn get_intervals(
        &self,
        session_key: u32,
        maybe_interval: Option<f32>,
    ) -> Option<Vec<Interval>>;

    /// A specific lap of one driver in a session.
    fn get_lap(
        &self,
        session_key: u32,
        driver_number: &DriverNumber,
        lap: u32,
    ) -> Option<Vec<Lap>>;

    /// Car locations of one driver in a session between two timestamps.
    fn get_car_location(
        &self,
        session_key: u32,
        driver_number: &DriverNumber,
        start_time: &str,
        end_time: &str,
    ) -> Option<Vec<CarLocation>>;

    /// Meetings filtered by year and country.
    fn get_meeting(&self, year: u32, country: &str) -> Option<Vec<Meeting>>;

    /// Pit entries for a session, optionally filtered by pit duration.
    fn get_pit(&self, session_key: u32, pit_duration: Option<u32>) -> Option<Vec<Pit>>;

    /// Positions of one driver in a meeting, optionally filtered by position.
    fn get_position(
        &self,
        meeting_key: u32,
        driver_number: &DriverNumber,
        position: Option<u32>,
    ) -> Option<Vec<Position>>;

    /// Race-control entries; every filter is optional.
    fn get_race_control(
        &self,
        category: Option<Category>,
        flag: Option<Flag>,
        driver_number: Option<DriverNumber>,
        start_date: Option<String>,
        end_date: Option<String>,
    ) -> Option<Vec<RaceControl>>;

    /// Stints for a session, optionally filtered by tyre age.
    fn get_stints(&self, session_key: u32, tyre_age: Option<u32>) -> Option<Vec<Stint>>;

    /// Team radio entries for a session, optionally filtered by driver number.
    fn get_team_radio(
        &self,
        session_key: u32,
        driver_number: Option<DriverNumber>,
    ) -> Option<Vec<TeamRadio>>;

    /// Weather entries for a meeting, optionally filtered by wind direction
    /// and track temperature.
    fn get_weather(
        &self,
        meeting_key: u32,
        wind_direction: Option<u32>,
        track_temp: Option<u32>,
    ) -> Option<Vec<Weather>>;
}
/// `CarDataApi` implementation backed by a borrowed HTTP requester.
///
/// Both fields are borrowed for `'a`, so a value of this type cannot outlive
/// the requester or the URI string it was built from.
pub struct CarDataApiImpl<'a> {
    /// Client used to perform the GET requests.
    pub http_requester: &'a TelemetryHttpRequester,
    /// Base URI; endpoint paths such as `/v1/sessions` are appended to it.
    pub uri: &'a str,
}
/// HTTP-backed implementation of [`CarDataApi`].
///
/// Every method builds a request URL from `self.uri`, logs it at debug level,
/// performs a GET through `self.http_requester`, and returns `None` when the
/// request fails or the decoded list is empty.
impl CarDataApi for CarDataApiImpl<'_> {
    /// GET `/v1/sessions` filtered by `country_name`, `session_name` and `year`.
    fn get_session(
        &self,
        country_name: &str,
        session_name: &str,
        year: u32,
    ) -> Option<Vec<Session>> {
        let request_url = format!(
            "{}/v1/sessions?country_name={country_name}&session_name={session_name}&year={year}",
            self.uri
        );
        debug!("{request_url}");
        self.http_requester
            .get::<Vec<Session>>(&request_url)
            .ok()
            .filter(|sessions| !sessions.is_empty())
    }

    /// GET `/v1/drivers` filtered by `driver_number` and `session_key`.
    fn get_drivers(&self, session_key: u32, driver_number: &DriverNumber) -> Option<Vec<Driver>> {
        let request_url = format!(
            "{}/v1/drivers?driver_number={driver_number}&session_key={session_key}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Driver>>(&request_url)
            .ok()
            .filter(|drivers| !drivers.is_empty())
    }

    /// GET `/v1/car_data` for `session_key`; optional `driver_number=` and
    /// `speed>=` filters are appended when provided.
    fn get_car_data(
        &self,
        session_key: u32,
        driver_number: Option<DriverNumber>,
        speed: Option<u32>,
    ) -> Option<Vec<CarData>> {
        let driver_filter = driver_number
            .map(|dr| format!("&driver_number={dr}"))
            .unwrap_or_default();
        let speed_filter = speed.map(|s| format!("&speed>={s}")).unwrap_or_default();
        let request_url = format!(
            "{}/v1/car_data?session_key={session_key}{driver_filter}{speed_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<CarData>>(&request_url)
            .ok()
            .filter(|car_data| !car_data.is_empty())
    }

    /// GET `/v1/intervals` for `session_key`; an optional `interval<` filter
    /// is appended when `maybe_interval` is provided.
    fn get_intervals(
        &self,
        session_key: u32,
        maybe_interval: Option<f32>,
    ) -> Option<Vec<Interval>> {
        let interval_filter = maybe_interval
            .map(|i| format!("&interval<{i}"))
            .unwrap_or_default();
        let request_url = format!(
            "{}/v1/intervals?session_key={session_key}{interval_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Interval>>(&request_url)
            .ok()
            .filter(|intervals| !intervals.is_empty())
    }

    /// GET `/v1/laps` filtered by `session_key`, `driver_number` and `lap_number`.
    fn get_lap(
        &self,
        session_key: u32,
        driver_number: &DriverNumber,
        lap: u32,
    ) -> Option<Vec<Lap>> {
        let request_url = format!(
            "{}/v1/laps?session_key={session_key}&driver_number={driver_number}&lap_number={lap}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Lap>>(&request_url)
            .ok()
            .filter(|laps| !laps.is_empty())
    }

    /// GET `/v1/location` for one driver in a session, with `date>start_time`
    /// and `date<end_time` filters.
    fn get_car_location(
        &self,
        session_key: u32,
        driver_number: &DriverNumber,
        start_time: &str,
        end_time: &str,
    ) -> Option<Vec<CarLocation>> {
        let request_url = format!(
            "{}/v1/location?session_key={session_key}&driver_number={driver_number}&date>{start_time}&date<{end_time}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<CarLocation>>(&request_url)
            .ok()
            .filter(|locations| !locations.is_empty())
    }

    /// GET `/v1/meetings` filtered by `year` and `country_name`.
    fn get_meeting(&self, year: u32, country: &str) -> Option<Vec<Meeting>> {
        let request_url = format!(
            "{}/v1/meetings?year={year}&country_name={country}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Meeting>>(&request_url)
            .ok()
            .filter(|meetings| !meetings.is_empty())
    }

    /// GET `/v1/pit` for `session_key`; an optional `pit_duration<` filter is
    /// appended when provided.
    fn get_pit(&self, session_key: u32, pit_duration: Option<u32>) -> Option<Vec<Pit>> {
        let pit_filter = pit_duration
            .map(|p| format!("&pit_duration<{p}"))
            .unwrap_or_default();
        let request_url = format!("{}/v1/pit?session_key={session_key}{pit_filter}", self.uri);
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Pit>>(&request_url)
            .ok()
            .filter(|pits| !pits.is_empty())
    }

    /// GET `/v1/position` for one driver in a meeting; an optional
    /// `position<=` filter is appended when provided.
    fn get_position(
        &self,
        meeting_key: u32,
        driver_number: &DriverNumber,
        position: Option<u32>,
    ) -> Option<Vec<Position>> {
        let position_filter = position
            .map(|p| format!("&position<={p}"))
            .unwrap_or_default();
        let request_url = format!(
            "{}/v1/position?meeting_key={meeting_key}&driver_number={driver_number}{position_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Position>>(&request_url)
            .ok()
            .filter(|positions| !positions.is_empty())
    }

    /// GET `/v1/race_control`; the query string is produced by
    /// `build_query_params` from whichever filters are provided.
    fn get_race_control(
        &self,
        category: Option<Category>,
        flag: Option<Flag>,
        driver_number: Option<DriverNumber>,
        start_date: Option<String>,
        end_date: Option<String>,
    ) -> Option<Vec<RaceControl>> {
        let params = build_query_params(category, flag, driver_number, start_date, end_date);
        let request_url = format!("{}/v1/race_control{}", self.uri, params);
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<RaceControl>>(&request_url)
            .ok()
            .filter(|race_control| !race_control.is_empty())
    }

    /// GET `/v1/stints` for `session_key`; an optional `tyre_age_at_start>=`
    /// filter is appended when provided.
    fn get_stints(&self, session_key: u32, tyre_age: Option<u32>) -> Option<Vec<Stint>> {
        let tyre_age_filter = tyre_age
            .map(|t| format!("&tyre_age_at_start>={t}"))
            .unwrap_or_default();
        let request_url = format!(
            "{}/v1/stints?session_key={session_key}{tyre_age_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Stint>>(&request_url)
            .ok()
            .filter(|stints| !stints.is_empty())
    }

    /// GET `/v1/team_radio` for `session_key`; an optional `driver_number=`
    /// filter is appended when provided.
    fn get_team_radio(
        &self,
        session_key: u32,
        driver_number: Option<DriverNumber>,
    ) -> Option<Vec<TeamRadio>> {
        let driver_filter = driver_number
            .map(|dn| format!("&driver_number={dn}"))
            .unwrap_or_default();
        let request_url = format!(
            "{}/v1/team_radio?session_key={session_key}{driver_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<TeamRadio>>(&request_url)
            .ok()
            .filter(|team_radio| !team_radio.is_empty())
    }

    /// GET `/v1/weather` for `meeting_key`; optional `wind_direction>=` and
    /// `track_temperature>=` filters are appended when provided.
    fn get_weather(
        &self,
        meeting_key: u32,
        wind_direction: Option<u32>,
        track_temp: Option<u32>,
    ) -> Option<Vec<Weather>> {
        let wind_filter = wind_direction
            .map(|wd| format!("&wind_direction>={wd}"))
            .unwrap_or_default();
        let track_temp_filter = track_temp
            .map(|tt| format!("&track_temperature>={tt}"))
            .unwrap_or_default();
        let request_url = format!(
            "{}/v1/weather?meeting_key={meeting_key}{wind_filter}{track_temp_filter}",
            self.uri
        );
        debug!("{:?}", request_url);
        self.http_requester
            .get::<Vec<Weather>>(&request_url)
            .ok()
            .filter(|weather| !weather.is_empty())
    }
}
/// Builds the query string for the race-control endpoint.
///
/// Each provided filter contributes one `&name<op>value` fragment, in the
/// order category, flag, driver number, start date, end date. `category` is
/// rendered with its `Debug` form; the others with `Display`.
///
/// Returns an empty string when no filter is provided; otherwise the
/// fragments joined together with the leading `&` replaced by `?`.
fn build_query_params(
    category: Option<Category>,
    flag: Option<Flag>,
    driver_number: Option<DriverNumber>,
    start_date: Option<String>,
    end_date: Option<String>,
) -> String {
    let mut query = String::new();
    if let Some(c) = category {
        query.push_str(&format!("&category={:?}", c));
    }
    if let Some(f) = flag {
        query.push_str(&format!("&flag={}", f));
    }
    if let Some(d) = driver_number {
        query.push_str(&format!("&driver_number={}", d));
    }
    if let Some(d) = start_date {
        query.push_str(&format!("&date>={}", d));
    }
    if let Some(d) = end_date {
        query.push_str(&format!("&date<{}", d));
    }

    // Every fragment starts with '&', so a non-empty query always has the
    // prefix; swap it for the '?' that introduces a query string.
    match query.strip_prefix('&') {
        Some(rest) => format!("?{}", rest),
        None => query,
    }
}
redis
/// Async JSON-oriented access to Redis.
#[async_trait]
pub trait Redis {
    /// Stores `value` under `key`. `RedisImpl` serializes it to a JSON string.
    async fn set_json<V>(&self, key: &str, value: V) -> Result<(), RedisClientError>
    where
        V: Serialize + Send + Sync;

    /// Reads the value under `key`; `RedisImpl` decodes it from JSON and
    /// yields `Ok(None)` when the key has no value.
    async fn get_json<V, K>(&self, key: K) -> Result<Option<V>, RedisClientError>
    where
        V: DeserializeOwned,
        K: Into<RedisKey> + Send + Display + Clone;

    /// Best-effort write with no result reported to the caller. `RedisImpl`
    /// logs the outcome and skips the write when `maybe_value` is `None`.
    async fn redis_fire_and_forget<V>(&self, maybe_value: Option<Vec<V>>, redis_key: String)
    where
        V: Serialize + Send + Sync;
}
/// `Redis` implementation that owns its client handle.
pub struct RedisImpl {
    /// Underlying Redis client used for all commands.
    pub client: RedisClient,
}
impl RedisImpl {
    /// Builds a client from the default `RedisConfig` with an exponential
    /// reconnect policy and starts connecting.
    ///
    /// As written this always returns `Ok`: the results of `connect` and
    /// `wait_for_connect` are discarded.
    pub fn default() -> Result<RedisImpl, RedisError> {
        info!("Connecting to redis");
        let config = RedisConfig::default();
        let reconnect_policy = ReconnectPolicy::new_exponential(5, 1, 10, 5);
        let client = RedisClient::new(config);
        // NOTE(review): both results are thrown away. If `wait_for_connect`
        // returns a future, it is never awaited, so the "Connected" log below
        // may be emitted before (or without) a live connection — confirm.
        let _ = client.connect(Some(reconnect_policy));
        let _ = client.wait_for_connect();
        info!("Connected to Redis");
        Ok(RedisImpl { client })
    }
}
#[async_trait]
impl Redis for RedisImpl {
async fn set_json<V: Serialize + Send + Sync>(
&self,
key: &str,
value: V,
) -> Result<(), RedisClientError> {
let json = serde_json::to_string(&value)?;
self.client.set(key, json, None, None, false).await?;
Ok(())
}
async fn get_json<V, K>(&self, key: K) -> Result<Option<V>, RedisClientError>
where
V: DeserializeOwned,
K: Into<RedisKey> + Send + Display + Clone,
{
let key_log = key.clone();
let json: Option<String> = self.client.get(key).await?;
if let Some(json_str) = json {
let value: V = serde_json::from_str(&json_str)?;
Ok(Some(value))
} else {
error!("value not found for key: {}", key_log);
Ok(None)
}
}
async fn redis_fire_and_forget<V: Serialize + Send + Sync>(
&self,
maybe_value: Option<Vec<V>>,
redis_key: String,
) {
match maybe_value {
Some(value) => match self.set_json::<Vec<V>>(&redis_key, value).await {
Ok(_) => info!("{redis_key} synced"),
Err(e) => error!("could not Redis write {redis_key}, err: {e}"),
},
None => warn!("no {redis_key} returned from client"),
}
}
}
http client
/// Minimal synchronous HTTP GET abstraction that decodes the response into `T`.
pub trait HttpRequester {
    /// Performs a GET on `url` and deserializes the response into `T`.
    fn get<T>(&self, url: &str) -> Result<T, Box<dyn Error>>
    where
        T: DeserializeOwned;
}
/// Stateless `HttpRequester` implementation; holds no fields.
pub struct TelemetryHttpRequester;
impl HttpRequester for TelemetryHttpRequester {
    /// Performs a blocking GET on `url` and decodes the body as JSON into `T`.
    ///
    /// Fails when the request cannot be sent, the response status is not a
    /// success, the body cannot be read, or the body is not valid JSON for
    /// `T`. Non-success responses and JSON errors are logged before returning.
    fn get<T: DeserializeOwned>(&self, url: &str) -> Result<T, Box<dyn Error>> {
        let response = attohttpc::get(url).send()?;

        // Guard clause: report non-success responses with status and body.
        if !response.is_success() {
            let message = format!(
                "Request Error: Status:{:?}, Body: {:?}",
                response.status(),
                response.text()
            );
            error!("{message}");
            return Err(anyhow!(message).into());
        }

        let body = response.text()?;
        serde_json::from_str::<T>(&body).map_err(|e| {
            error!("Error parsing JSON: {e}");
            Box::new(e) as Box<dyn Error>
        })
    }
}
And finally the `EventSync`, which is the one I'm having issues with.
Note: I've omitted the methods that are not causing errors.
/// Sync operations that pull data from the API and write it to Redis.
///
/// NOTE(review): only part of `run_sync` is shown in the implementation; the
/// individual `*_sync` bodies were omitted, so their exact behaviour should be
/// confirmed against the real source.
#[async_trait]
pub trait EventSync {
    /// Syncs car data for a session, optionally filtered by driver and speed.
    async fn car_data_sync(
        &self,
        session_key: u32,
        driver_number: Option<DriverNumber>,
        speed: Option<u32>,
    );

    /// Syncs intervals for a session.
    async fn intervals_sync(&self, session_key: u32, maybe_interval: Option<f32>);

    /// Syncs team radio for a session, optionally for a single driver.
    async fn team_radio_sync(&self, session_key: u32, driver_number: Option<DriverNumber>);

    /// Syncs one lap of one driver in a session.
    async fn laps_sync(&self, session_key: u32, driver_number: &DriverNumber, lap: u32);

    /// Syncs pit data for a session.
    async fn pit_sync(&self, session_key: u32, pit_duration: Option<u32>);

    /// Syncs positions of one driver in a meeting.
    async fn position_sync(
        &self,
        meeting_key: u32,
        driver_number: &DriverNumber,
        position: Option<u32>,
    );

    /// Syncs stints for a session.
    async fn stints_sync(&self, session_key: u32, tyre_age: Option<u32>);

    /// Entry point that drives the individual sync operations.
    async fn run_sync(
        &self,
        session_key: u32,
        meeting_key: u32,
        speed: Option<u32>,
        maybe_interval: Option<f32>,
        driver_number: DriverNumber,
        lap: u32,
        pit_duration: Option<u32>,
        position: Option<u32>,
        tyre_age: Option<u32>,
    );
}
/// `EventSync` implementation that borrows its collaborators.
///
/// Both fields are references with lifetime `'a`, so this value (and any
/// `&self` derived from it) is not `'static`.
pub struct EventSyncImpl<'a> {
    /// API used to fetch the data.
    pub api: &'a CarDataApiImpl<'a>,
    /// Redis client the fetched data is written to.
    pub redis: &'a RedisImpl,
}
#[async_trait]
impl EventSync for EventSyncImpl<'_> {
/// Spawns a task that drives the individual sync operations.
///
/// This is the snippet that fails to compile with E0521 (the rest of the
/// impl is omitted from the post).
async fn run_sync(
&self,
session_key: u32,
meeting_key: u32,
speed: Option<u32>,
maybe_interval: Option<f32>,
driver_number: DriverNumber,
lap: u32,
pit_duration: Option<u32>,
position: Option<u32>,
tyre_age: Option<u32>,
) {
// The `async move` block captures `self`, a reference that is only valid
// for the duration of this call, while `tokio::spawn` requires its
// argument to outlive `'static` (see the compiler error quoted below).
tokio::spawn(async move {
// NOTE(review): `select!` races its branches; if every sync is meant to
// run to completion, a join is presumably what is wanted — confirm.
tokio::select! {
// NOTE(review): `driver_number` is passed by value here and borrowed again
// below; this assumes `DriverNumber` is `Copy` — confirm.
_ = self.car_data_sync(session_key, Some(driver_number), speed) => {},
_ = self.intervals_sync(session_key, maybe_interval) => {},
_ = self.team_radio_sync(session_key, None) => {},
_ = self.laps_sync(session_key, &driver_number, lap) => {},
_ = self.pit_sync(session_key, pit_duration) => {},
_ = self.position_sync(meeting_key, &driver_number, position) => {},
_ = self.stints_sync(session_key, tyre_age) => {},
}
});
The above gives me the following error:
error[E0521]: borrowed data escapes outside of method
--> src/algebras/event_sync.rs:186:9
|
175 | &self,
| -----
| |
| `self` is a reference that is only valid in the method body
| has type `&EventSyncImpl<'1>`
...
186 | / tokio::spawn(async move {
187 | | tokio::select! {
188 | | _ = self.car_data_sync(session_key, Some(driver_numb
189 | | _ = self.intervals_sync(session_key, maybe_interval)
... |
197 | | }
198 | | });
| | ^
| | |
| |__________`self` escapes the method body here
| argument requires that `'1` must outlive `'static`
error[E0521]: borrowed data escapes outside of method
--> src/algebras/event_sync.rs:186:9
|
175 | &self,
| -----
| |
| `self` is a reference that is only valid in the method body
| lifetime `'life0` defined here
...
186 | / tokio::spawn(async move {
187 | | tokio::select! {
188 | | _ = self.car_data_sync(session_key, Some(driver_numb
189 | | _ = self.intervals_sync(session_key, maybe_interval)
... |
197 | | }
198 | | });
| | ^
| | |
| |__________`self` escapes the method body here
| argument requires that `'life0` must outlive `'static`
help: replace `'life0` with `'static`
That being said, if I replace `tokio::spawn` with `tokio_scoped`, the error goes away. I tried using `std::sync::Arc`, which didn't help, and mutexes were a pain, so I decided not to deal with locking a resource and creating deadlock issues.
error[E0521]: borrowed data escapes outside of method
--> src/algebras/event_sync.rs:186:9
|
175 | &self,
| -----
| |
| `self` is a reference that is only valid in the method body
| has type `&EventSyncImpl<'1>`
...
186 | / tokio::spawn(async move {
187 | | tokio::select! {
188 | | _ = self.car_data_sync(session_key, Some(driver_numb
189 | | _ = self.intervals_sync(session_key, maybe_interval)
... |
197 | | }
198 | | });
| | ^
| | |
| |__________`self` escapes the method body here
| argument requires that `'1` must outlive `'static`
error[E0521]: borrowed data escapes outside of method
--> src/algebras/event_sync.rs:186:9
|
175 | &self,
| -----
| |
| `self` is a reference that is only valid in the method body
| lifetime `'life0` defined here
...
186 | / tokio::spawn(async move {
187 | | tokio::select! {
188 | | _ = self.car_data_sync(session_key, Some(driver_numb
189 | | _ = self.intervals_sync(session_key, maybe_interval)
... |
197 | | }
198 | | });
| | ^
| | |
| |__________`self` escapes the method body here
| argument requires that `'life0` must outlive `'static`
help: replace `'life0` with `'static`
What am I not understanding about how lifetimes work, or what tokio_scoped is doing to combat this resource issue? I thought it could be because tokio is global which would require a static lifetime, and somehow tokio_scoped is not. If I wanted to remove tokio_scoped as a dependency what would I have to do?