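I have this code to PUT and GET a file on S3 using the legacy hyper client (`hyper_util::client::legacy`). Now I want to switch to reqwest, but I don't know what to pass as the request body for the PUT or how to turn the GET response into an `AsyncRead`. Can you help me?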
use futures::{StreamExt, TryStreamExt};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream, StreamBody};
use hyper::{body::Frame, header, Method};
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{self, connect::HttpConnector},
rt::TokioExecutor,
};
use std::{
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::io::AsyncRead;
use tokio_util::{
bytes::Bytes,
io::{ReaderStream, StreamReader},
};
/// Request body type handed to the legacy hyper client.
///
/// A single enum lets one client type parameter cover both requests that
/// carry a payload and requests that carry none.
enum HyperClientCustomBody {
/// A boxed body producing `Bytes` frames; polling it may fail with an I/O error.
Stream(UnsyncBoxBody<Bytes, std::io::Error>),
/// No payload: polling this variant reports end-of-body immediately.
Empty,
}
/// Allows `HyperClientCustomBody` to be used wherever hyper expects a body.
impl hyper::body::Body for HyperClientCustomBody {
    type Data = hyper::body::Bytes;
    type Error = std::io::Error;

    /// Produces the next frame of the body.
    ///
    /// The `Empty` variant is finished from the first poll (`Ready(None)`);
    /// the `Stream` variant forwards the poll to the boxed body it wraps.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        // Both variants are `Unpin`, so the pin can be peeled off to reach the enum.
        let this = self.get_mut();
        match this {
            Self::Empty => Poll::Ready(None),
            Self::Stream(inner) => Pin::new(inner).poll_frame(cx),
        }
    }
}
/// Uploads everything read from `reader` with an HTTP PUT to the URI returned
/// by `get_uri()`, using the legacy hyper client.
///
/// * `filename` - not read by this function; the target comes from `get_uri()`.
/// * `size` - sent verbatim as the `Content-Length` header.
/// * `reader` - source of the bytes to upload; it is streamed, not buffered.
///
/// Only transport-level failures are propagated: the response is dropped
/// without looking at its status code.
///
/// # Panics
///
/// Panics if the request builder rejects the method, URI or header.
async fn put_file(
    &self,
    filename: &str,
    size: i64,
    reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
    let client = legacy::Client::builder(TokioExecutor::new())
        .build::<_, HyperClientCustomBody>(HttpsConnector::new());

    let builder = hyper::Request::builder()
        .method(Method::PUT)
        .uri(get_uri())
        .header(header::CONTENT_LENGTH, size);

    // AsyncRead -> stream of byte chunks -> stream of data frames -> boxed body.
    let frames = ReaderStream::new(reader).map_ok(Frame::data);
    let payload = HyperClientCustomBody::Stream(StreamBody::new(frames).boxed_unsync());

    let request = builder.body(payload).unwrap();
    client.request(request).await?;
    Ok(())
}
/// Issues an HTTP GET against the URI returned by `get_uri()` with the legacy
/// hyper client and exposes the response body as an `AsyncRead`.
///
/// * `filename` - not read by this function; the target comes from `get_uri()`.
///
/// The response status is not inspected. Body errors surface later, as
/// `io::Error`s, when the returned reader is read.
///
/// # Panics
///
/// Panics if the request builder rejects the method or URI.
async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let client = legacy::Client::builder(TokioExecutor::new())
        .build::<_, HyperClientCustomBody>(HttpsConnector::new());

    let request = hyper::Request::builder()
        .method(Method::GET)
        .uri(get_uri())
        .body(HyperClientCustomBody::Empty)
        .unwrap();
    let resp = client.request(request).await?;

    let chunks = BodyStream::new(resp).filter_map(|item| async move {
        match item {
            // Frames that are not data frames yield `None` and are skipped.
            Ok(frame) => frame.into_data().ok().map(Ok),
            Err(err) => Some(Err(io::Error::other(err))),
        }
    });

    Ok(Box::pin(StreamReader::new(chunks)))
}
I tried using reqwest like this:
async fn put_file(
&self,
filename: &str,
size: i64,
reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
let http_client = reqwest::Client::new();
http_client
.put(get_uri())
.header("Content-Length", size)
// -----> I don't know what to use in body here
// .body(StreamBody::new(ReaderStream::new(
// reqwest::Body::wrap_stream(reader),
// )))
// .body(reqwest::Body::wrap_stream(reader))
.send()
.await?;
Ok(())
}
/// Issues an HTTP GET against the URI returned by `get_uri()` with reqwest and
/// exposes the response body as an `AsyncRead`.
///
/// * `filename` - not read by this function; the target comes from `get_uri()`.
///
/// # Errors
///
/// Returns the `reqwest::Error` (converted by `?`) if the request cannot be
/// sent or if the server answers with a 4xx/5xx status. Errors hit while the
/// body is being downloaded surface later, as `io::Error`s, from the returned
/// reader.
async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let http_client = reqwest::Client::new();
    let resp = http_client
        .get(get_uri())
        .send()
        .await?
        // Fail here rather than handing an error document back as file content.
        .error_for_status()?;

    // `reqwest::Response` is not an `http_body::Body`, so `BodyStream` does not
    // apply. `bytes_stream()` (reqwest `stream` feature) already yields
    // `Result<Bytes, reqwest::Error>`; `StreamReader` only needs the error
    // mapped to `io::Error`.
    let body = resp.bytes_stream().map_err(io::Error::other);

    Ok(Box::pin(StreamReader::new(body)))
}