I know that it is a common error, but as a rust (and async) noob, even after reading the documentation I don’t know how to fix after all my tests and I am not sure to understand all the concepts.
Here’s my code :
#[tokio::main]
async fn main() -> Result<(), Error> {
let _ = env_logger::try_init();
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(accept_connection(stream));
}
Ok(())
}
async fn accept_connection(stream: TcpStream) {
let addr = stream.peer_addr().expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let ws_stream = accept_async(stream)
.await
.expect("Error during WebSocket handshake");
info!("New WebSocket connection: {}", addr);
let (mut write, mut read) = ws_stream.split();
let (buffer, mut compressor) = cam_setup();
// Receive and discard messages from the WebSocket client
while let Some(read_content) = read.try_next().await.unwrap() {
if let Ok(text) = read_content.to_text() {
let trimmed_text = text.trim(); // Trim whitespace
if trimmed_text == "one" {
let image_data = get_cam_picture(&buffer, &mut compressor);
let _ = write.send(tokio_tungstenite::tungstenite::Message::Text(image_data)).await;
}
else if trimmed_text == "loop" {
tokio::spawn(acquisition_loop(
Arc::new(Mutex::new(&buffer)),
Arc::new(Mutex::new(&mut compressor)),
&mut read,
&mut write
));
}
}
}
}
fn cam_setup() -> (AcquisitionBuffer, Compressor){
info!("started");
let mut cam = xiapi::open_device(None).unwrap();
info!("cam open");
let mut compressor = Compressor::new().unwrap();
_ = compressor.set_subsamp(turbojpeg::Subsamp::Gray);
_ = compressor.set_quality(70);
cam.set_exposure(10000.0).unwrap();
let buffer = cam.start_acquisition().unwrap();
(buffer, compressor)
}
fn get_cam_picture(buffer : &AcquisitionBuffer, compressor : &mut Compressor) -> String {
let image = buffer.next_image::<u8>(None).unwrap();
let pixel = image.pixel(0, 0);
match pixel {
Some(_) => {
// initialize a Compressor
let imagec = Image {
pixels: image.data(), //vec![0; (image.width() as usize) * (image.height()as usize)],
width: image.width() as usize,
pitch: image.width() as usize, // there is no padding between rows
height: image.height() as usize,
format: PixelFormat::GRAY,
};
// compress the Image to a Vec<u8> of JPEG data
let jpeg_data = compressor.compress_to_vec(imagec.as_deref()).unwrap();
let jpeg_data_encode = BASE64_STANDARD.encode(jpeg_data);
jpeg_data_encode
},
None => unreachable!("Could not get pixel value from image!"),
}
}
async fn acquisition_loop(
buffer_mutex: Arc<Mutex<&AcquisitionBuffer>>,
compressor_mutex: Arc<Mutex<&mut Compressor>>,
read: &'static mut SplitStream<WebSocketStream<TcpStream>>,
write: &'static mut SplitSink<WebSocketStream<TcpStream>, Message>,
) {
let buffer = Arc::clone(&buffer_mutex);
let compressor = Arc::clone(&compressor_mutex);
tokio::spawn(async move {
loop {
let mut buffer_lock = buffer.lock().unwrap();
let mut compressor_lock = compressor.lock().unwrap();
let image_data = get_cam_picture( *buffer_lock, *compressor_lock);
write.send(Message::Text(image_data)).await.unwrap();
}
});
while let Some(read_content) = read.next().await {
match read_content {
Ok(message) => {
if let Some(text) = message.to_text().ok().map(|s| s.trim()) {
if text == "stop" {
break;
}
}
}
Err(err) => {
eprintln!("Error receiving message: {:?}", err);
break;
}
}
}
}
To explain a bit my code, it a server that acquire pictures thanks to a camera and send them to a client. You have two options, obtain only one picture or having a continous flow of images. To do that I made 2 functions : get_picture that encode and return one picture and acquisition_loop that has to send a continous flow while the client does not send “stop”.
My program is not very clean for the moment and can have a lot a problems but the main issue is that in the acquisition_loop I have the following error :
[{
"resource": "/c:/Users/serge/Desktop/CameraControl/Ximea_Camera_With_WebSocket_Rust/web_server/src/main.rs",
"owner": "rustc",
"code": {
"value": "Click for full compiler diagnostic",
"target": {
"$mid": 1,
"path": "/diagnostic message [0]",
"scheme": "rust-analyzer-diagnostics-view",
"query": "0",
"fragment": "file:///c%3A/Users/serge/Desktop/CameraControl/Ximea_Camera_With_WebSocket_Rust/web_server/src/main.rs"
}
},
"severity": 8,
"message": "future cannot be sent between threads safelynwithin `AcquisitionBuffer`, the trait `Sync` is not implemented for `*mut c_void`, which is required by `{async block@src\main.rs:140:18: 147:6}: std::marker::Send`",
"source": "rustc",
"startLineNumber": 140,
"startColumn": 18,
"endLineNumber": 147,
"endColumn": 6,
"relatedInformation": [
{
"startLineNumber": 142,
"startColumn": 35,
"endLineNumber": 142,
"endColumn": 41,
"message": "captured value is not `Send`",
"resource": "/c:/Users/serge/Desktop/CameraControl/Ximea_Camera_With_WebSocket_Rust/web_server/src/main.rs"
},
{
"startLineNumber": 166,
"startColumn": 21,
"endLineNumber": 166,
"endColumn": 25,
"message": "required by a bound in `tokio::spawn`",
"resource": "/c:/Users/serge/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/spawn.rs"
}
]
}]
Even with the mutex, i don’t know how to solve this error.
Do you guys see obvious errors or having some things to explore ? I’m open and available ! Thank you in advance for taking my message into consideration !
I tried differents runtimes, tokio and std mutex’s.
Zeugy is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.