How do I perform a half-close on a gRPC bidirectional stream using Tonic?


I am implementing a Rust library for Google's Dialogflow streaming detect intent, using gRPC via the Tonic library.

Streaming works fine, but I need to send a half-close once all the audio data have been pushed into the stream. Without it, the detect-intent stream never runs intent detection on the final transcript and eventually times out, complaining that no audio data have been provided for more than 11 seconds.

How do I send a half-close via Tonic? I need something like this Golang example.

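As far as I can tell, plain Tonic has no explicit CloseSend() equivalent: the request side of a bidirectional call is an ordinary Stream, and the stream ending is what should map to the HTTP/2 half-close. A minimal sketch of that pattern, assuming tokio and tokio-stream (the generated client method is only referenced in a comment):

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Build the request stream for a bidirectional call. Once the last
// Sender is dropped, the ReceiverStream yields None, i.e. the stream
// terminates, which Tonic should translate into the HTTP/2 half-close
// (END_STREAM). Must be called from within a Tokio runtime.
fn request_stream<T: Send + 'static>(messages: Vec<T>) -> ReceiverStream<T> {
    let (tx, rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for msg in messages {
            if tx.send(msg).await.is_err() {
                break; // receiver hung up
            }
        }
        // tx is dropped here: no senders remain, so the stream ends.
    });

    // Hand this to a generated client method that accepts an
    // impl Stream of request messages.
    ReceiverStream::new(rx)
}
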
My Rust code is below. Note that explicitly dropping the sender, as in the pattern above, does not seem to make any difference: the result is the same timeout, because the API never learns that no more data will arrive and that it should go ahead with NLP.

use google_cognitive_apis::api::grpc::google::cloud::dialogflow::v2beta1::{
    query_input::Input, InputAudioConfig, QueryInput, StreamingDetectIntentRequest,
};
use google_cognitive_apis::dialogflow::sessions_client::SessionsClient;

use log::*;
use std::env;
use std::fs::{self, File};
use std::io::Read;

#[tokio::main]
async fn main() {
    env::set_var("RUST_LOG", "info");
    env_logger::init();
    info!("sessions_client_streaming_detect_intent example");

    let credentials = fs::read_to_string("/tmp/gcp-cred.json").unwrap();

    let guid = "10db5977-7f28-4a57-92fb-88459ff8c239";
    let session_id = SessionsClient::get_session_string("gcp-proj-id", guid);

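    // The first request carries only the session and the audio configuration;
    // the raw audio bytes are streamed in the follow-up messages.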
    #[allow(deprecated)]
    let streaming_detect_intent_req = StreamingDetectIntentRequest {
        session: session_id.to_owned(),
        query_params: None,
        query_input: Some(QueryInput {
            input: Some(Input::AudioConfig(InputAudioConfig {
                audio_encoding: 1, // linear16
                sample_rate_hertz: 8000,
                language_code: "en".to_owned(),
                enable_word_info: false,
                phrase_hints: vec![],
                speech_contexts: vec![],
                model: "".to_string(),
                model_variant: 0,
                single_utterance: false,
                disable_no_speech_recognized_event: false,
            })),
        }),
        single_utterance: false,
        output_audio_config: None,
        output_audio_config_mask: None,
        input_audio: vec![],
    };

    let mut sessions_client =
        SessionsClient::create_async(credentials, streaming_detect_intent_req, None)
            .await
            .unwrap();

    let audio_sender = sessions_client.get_audio_sink().unwrap();

    let mut result_receiver = sessions_client.get_streaming_result_receiver(None);

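    // Run the gRPC call on its own task; interim and final results are
    // delivered through result_receiver.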
    tokio::spawn(async move {
        let recognition_result = sessions_client.streaming_detect_intent().await;

        match recognition_result {
            Err(err) => error!("streaming_detect_intent error {:?}", err),
            Ok(_) => info!("streaming_detect_intent ok!"),
        }
    });

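    // Feed the WAV file into the audio sink in 1 KiB chunks on a second task.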
    tokio::spawn(async move {
        let mut file = File::open("/tmp/hello_rust_8.wav").unwrap();
        let chunk_size = 1024;

        loop {
            let mut chunk = Vec::with_capacity(chunk_size);
            let n = file
                .by_ref()
                .take(chunk_size as u64)
                .read_to_end(&mut chunk)
                .unwrap();
            if n == 0 {
                break;
            }

            let streaming_request =
                SessionsClient::streaming_request_from_bytes(session_id.to_string(), chunk);

            audio_sender.send(streaming_request).await.unwrap();

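            // A short read means end of file: drop the sender in the hope of
            // half-closing the request stream.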
            if n < chunk_size {
                drop(audio_sender);
                info!("audio sender dropped");
                break;
            }
        }
    });

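    // Print recognition results until the response stream closes.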
    while let Some(reco_result) = result_receiver.recv().await {
        info!("recognition result {:?}", reco_result);
    }
}

Sample log:

[2021-05-21T10:15:15Z INFO sessions_client_streaming_detect_intent] sessions_client_streaming_detect_intent example

[2021-05-21T10:15:16Z INFO sessions_client_streaming_detect_intent] audio sender dropped

[2021-05-21T10:15:16Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello", is_final: false, confidence: 0.0, stability: 0.01, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 0, nanos: 480000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello Ray", is_final: false, confidence: 0.0, stability: 0.01, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 0, nanos: 930000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello Russ", is_final: false, confidence: 0.0, stability: 0.01, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 0, nanos: 960000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello Russ", is_final: false, confidence: 0.0, stability: 0.01, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 1, nanos: 50000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello rust", is_final: false, confidence: 0.0, stability: 0.01, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 1, nanos: 260000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:27Z INFO sessions_client_streaming_detect_intent] recognition result StreamingDetectIntentResponse { response_id: "", recognition_result: Some(StreamingRecognitionResult { message_type: Transcript, transcript: "hello rust", is_final: true, confidence: 0.60010684, stability: 0.0, speech_word_info: [], speech_end_offset: Some(Duration { seconds: 1, nanos: 380000000 }), dtmf_digits: None }), query_result: None, alternative_query_results: [], webhook_status: None, output_audio: [], output_audio_config: None }

[2021-05-21T10:15:27Z ERROR sessions_client_streaming_detect_intent] streaming_detect_intent error Error { message: "status: OutOfRange, message: "While calling Cloud Speech API: Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time.", details: [], metadata: MetadataMap { headers: {"grpc-server-stats-bin": "AAAhxq1zAgAAAA"} }", code: None }

1 Answer

Answered by jayy-lmao:

I've been following your project as I'm attempting to do something similar.

Now I'm not 100% certain, but I believe the issue is that EncodeBody<S> in src/codec/encode.rs has a method called is_end_stream that always returns false. I think this is the method that decides whether or not the END_STREAM flag gets set on the HTTP/2 DATA frame.

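For context, this is roughly what that hook looks like; a simplified, self-contained sketch of the http_body Body contract as I understand it, with names abbreviated (this is not Tonic's actual source):

use std::pin::Pin;
use std::task::{Context, Poll};

// Simplified stand-in for the http_body::Body trait. is_end_stream()
// is the hint that lets the HTTP/2 layer set END_STREAM on the final
// DATA frame instead of waiting for the body to be polled to
// completion.
trait SketchBody {
    type Data;
    type Error;

    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>>;

    // Defaulting to false mirrors what EncodeBody<S> appears to do:
    // the encoder never claims the stream is finished.
    fn is_end_stream(&self) -> bool {
        false
    }
}
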
This might be a bit difficult to modify, as the stream currently only handles items of Result<Bytes, Status> (also seen in the encode.rs above), which doesn't leave much room for adding a conditional end-of-stream flag. Maybe another alternative would be to send an EmptyBody (which defaults is_end_stream to true), or to handle empty bytes by setting the flag to true; a rough sketch of the conditional-flag idea follows.

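To illustrate what I mean by a conditional flag, here is a hand-rolled sketch (hypothetical types, not a drop-in patch for Tonic): wrap the message stream and remember when it has finished, so that an EncodeBody-like type built on top of it could answer is_end_stream truthfully once the last chunk is out.

use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::Status;

// Hypothetical wrapper: forwards the encoded message stream and
// records when the inner stream has completed.
struct TrackedStream<S> {
    inner: S,
    finished: bool,
}

impl<S> TrackedStream<S> {
    fn new(inner: S) -> Self {
        Self { inner, finished: false }
    }

    // What a custom body could return from is_end_stream() instead
    // of a hard-coded `false`.
    fn is_end_stream(&self) -> bool {
        self.finished
    }
}

impl<S> Stream for TrackedStream<S>
where
    S: Stream<Item = Result<Bytes, Status>> + Unpin,
{
    type Item = Result<Bytes, Status>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = Pin::new(&mut self.inner).poll_next(cx);
        if let Poll::Ready(None) = poll {
            // The inner stream is exhausted: from now on the body
            // could report end-of-stream so END_STREAM gets set.
            self.finished = true;
        }
        poll
    }
}
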
The place this stream gets passed in is pub async fn streaming<S, M1, M2, C> in Tonic's src/client/grpc.rs.

I've spent a couple of hours digging into it and probably can't spare much more in work hours. Letting you know in case it's of any help.