Hello,
I am not sure if this is a bug or a mistake on my end.
I am trying the streaming api in the following:
/// Fetches a streamed report from the local server and appends each chunk to
/// `content` as it arrives.
///
/// * `param` - request payload, serialized to JSON for the POST body.
/// * `content` - shared buffer that accumulates the streamed text.
/// * `toasts` - shared toast queue used to surface errors to the UI.
/// * `boolean_flag` - cleared (set to `false`) when the stream ends or fails.
pub fn retrieve_response_async(
    param: ReportDeps,
    content: Arc<Mutex<String>>,
    toasts: Arc<Mutex<Toasts>>,
    boolean_flag: Arc<AtomicBool>,
) {
    let url = "http://127.0.0.1:8000/reportstream";
    let request = ehttp::Request::post(url, serde_json::to_vec(&param).unwrap());
    ehttp::streaming::fetch(
        request,
        move |result: ehttp::Result<ehttp::streaming::Part>| {
            let part = match result {
                Ok(part) => part,
                Err(err) => {
                    // Transport-level failure: clear the in-flight flag and report it.
                    // No clone needed — `store` only takes `&self` on the shared atomic.
                    boolean_flag.store(false, std::sync::atomic::Ordering::Relaxed);
                    toasts
                        .lock()
                        .error(format!("An error occurred while streaming `{url}`: {err}"));
                    return std::ops::ControlFlow::Break(());
                }
            };
            match part {
                ehttp::streaming::Part::Response(response) => {
                    tracing::info!("RESPONSE");
                    println!("Status code: {:?}", response.status);
                    if response.ok {
                        std::ops::ControlFlow::Continue(())
                    } else {
                        // Non-2xx status: stop streaming and notify the user.
                        boolean_flag.store(false, std::sync::atomic::Ordering::Relaxed);
                        toasts.lock().error(format!(
                            "An error occurred, please try again: {}",
                            response.status_text
                        ));
                        std::ops::ControlFlow::Break(())
                    }
                }
                ehttp::streaming::Part::Chunk(chunk) => {
                    // An empty chunk marks end-of-stream.
                    if chunk.is_empty() {
                        boolean_flag.store(false, std::sync::atomic::Ordering::Relaxed);
                        std::ops::ControlFlow::Break(())
                    } else {
                        // NOTE(review): this panics if a chunk boundary splits a
                        // multi-byte UTF-8 codepoint — consider buffering bytes and
                        // decoding incrementally.
                        let chunk = String::from_utf8(chunk).unwrap();
                        tracing::info!("{:?}", chunk);
                        content.lock().push_str(&chunk);
                        std::ops::ControlFlow::Continue(())
                    }
                }
            }
        },
    );
}
Currently, the callback only resolves after the entire request has completed. Note that I'm using a streaming response from actix:
#[post("/reportstream")]
pub async fn report(payload: web::Payload) -> impl Responder {
let ReportDeps {
doctor_input,
age,
name,
sex,
date,
} = type_cast::<ReportDeps>(payload).await;
let (mut tx, rx) = futures::channel::mpsc::unbounded::<
std::result::Result<actix_web::web::Bytes, std::io::Error>,
>();
let mut result = client.chat().create_stream(request).await.unwrap();
tokio::task::spawn(async move {
while let Some(message) = result.next().await {
let message = match message {
Ok(message) => {
if let Some(message) = message.choices[0].delta.content.clone() {
message.clone()
} else {
String::new()
}
}
Err(e) => e.to_string(),
};
let _ = tx.start_send(Ok(Bytes::from(message)));
}
});
/// This works and results in a streamed response
HttpResponse::Ok().streaming(rx)
}
![image](https://private-user-images.githubusercontent.com/81748812/269555083-db8fa147-c79a-4c01-84d9-97d3e5b59e17.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTgyMzAwNzksIm5iZiI6MTcxODIyOTc3OSwicGF0aCI6Ii84MTc0ODgxMi8yNjk1NTUwODMtZGI4ZmExNDctYzc5YS00YzAxLTg0ZDktOTdkM2U1YjU5ZTE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA2MTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNjEyVDIyMDI1OVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWUyNTM3MzgyYmZkN2I1MWUwNGUxOTJhYmQzYjI1YWM1Y2IzZmFkYjRhMmFkMGQ1NmFlODJkNTlhZDExNDBiNTcmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.S0eB7dg4orXeei3__p9rW0Kb5FFByGzYrtaO2Dz9cCI)
As you can see, the response is coming in, but its chunks are not being handled as they arrive; instead, the streaming API waits until the end of the response.
Note that curl --no-buffer "127.0.0.1:8000/reportstream"
gives a chunked response.