Using StartCallAnalyticsStreamTranscription with Java AWS SDK

138 views Asked by At

How to send ConfigurationEvent to StartCallAnalyticsStreamTranscriptionRequest? I've configured following request but I'm unable to figure out how to send ConfigurationEvent object to body of this request.

AudioInputStream audioInputStream = 
AudioSystem.getAudioInputStream(inputFile);
int channelId = 0;
ParticipantRole participantRole = ParticipantRole.AGENT;
ChannelDefinition channelDefinition = ChannelDefinition.builder()
                .channelId(channelId)
                .participantRole(participantRole)
                .build();
ConfigurationEvent configurationEvent = ConfigurationEvent.builder()
                .channelDefinitions(channelDefinition)
                .build();
        AudioFormat audioFormat = audioInputStream.getFormat();
StartCallAnalyticsStreamTranscriptionRequest.builder()
                .mediaSampleRateHertz(8000)
                .enablePartialResultsStabilization(true)
                .partialResultsStability("high")
                .languageCode(CallAnalyticsLanguageCode.EN_US)
                .mediaEncoding(getAwsMediaEncoding(audioFormat))
            .overrideConfiguration(AwsRequestOverrideConfiguration.builder()
                        .putExecutionAttribute(new ExecutionAttribute<>("ConfigurationEvent"), configurationEvent).build())
                .build();

Whenever I'm invoking it, I'm getting following exception:

Caused by: software.amazon.awssdk.services.transcribestreaming.model.BadRequestException: You must provide ConfigurationEvent as the first event in the stream (Service: transcribe, Status Code: 400, Request ID: e66c528c-55ae-4bf4-a0da-aa2d389ec1cf)

Here's complete code:

private static StartCallAnalyticsStreamTranscriptionResponseHandler getAnalyticsResponseHandler(String file) {
        return StartCallAnalyticsStreamTranscriptionResponseHandler.builder()
                .onError(e-> {
                    System.out.println(e.getMessage());
                    StringWriter sw = new StringWriter();
                    e.printStackTrace(new PrintWriter(sw));
                    System.out.println("Error Occurred: " + sw);
                })
                .onComplete(() -> System.out.println("====Analytics Completed.===="))
                .onResponse(res -> System.out.println("contentRedactionTypeAsString----" + res.contentRedactionTypeAsString()))
                .onEventStream(event -> {
                    event.doAfterOnError(e -> {
                        System.out.println(e.getMessage());
                        StringWriter sw = new StringWriter();
                        e.printStackTrace(new PrintWriter(sw));
                        System.out.println("Error Occurred: " + sw);
                    });
                    event.doAfterOnComplete(() -> System.out.println("====Analytics Completed.===="));
                    var eventType = event.map(CallAnalyticsTranscriptResultStream::sdkEventType);
                    event.subscribe(c -> System.out.println(c.sdkEventType()));
                    System.out.println("EventType:" + eventType);
                    //var analyticsSettings = ((ConfigurationEvent)event).postCallAnalyticsSettings();
                    System.out.println("------UtteranceEvent------");
                    System.out.println();
                })
                .build();
    }

private static class CallAnalyticsAudioStreamPublisher implements Publisher<AudioStream> {
        private final ConfigurationEvent configurationEvent;
        private final InputStream inputStream;
        private static Subscription currentSubscription;
        private CallAnalyticsAudioStreamPublisher(String fileName, InputStream inputStream, ConfigurationEvent configurationEvent) {
            this.configurationEvent = configurationEvent;
            this.inputStream = inputStream;
        }
        @Override
        public void subscribe(Subscriber<? super AudioStream> s) {
            if (currentSubscription == null) {
                currentSubscription = new SubscriptionImpl(s, inputStream);
            } else {
                currentSubscription.cancel();
                currentSubscription = new SubscriptionImpl(s, inputStream);
            }
            s.onSubscribe(currentSubscription);
        }
    }

    public static class SubscriptionImpl implements Subscription {
        private static final int CHUNK_SIZE_IN_BYTES = 1024;
        private final Subscriber<? super AudioStream> subscriber;
        private final InputStream inputStream;
        private final ExecutorService executor = Executors.newFixedThreadPool(1);
        private final AtomicLong demand = new AtomicLong(0);

        SubscriptionImpl(Subscriber<? super AudioStream> s, InputStream inputStream) {
            this.subscriber = s;
            this.inputStream = inputStream;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                subscriber.onError(new IllegalArgumentException("Demand must be positive"));
            }
            demand.getAndAdd(n);
            executor.submit(() -> {
                try {
                    do {
                        ByteBuffer audioBuffer = getNextEvent();
                        if (audioBuffer.remaining() > 0) {
                            AudioEvent audioEvent = audioEventFromBuffer(audioBuffer);
                            subscriber.onNext(audioEvent);
                        } else {
                            subscriber.onComplete();
                            break;
                        }
                    } while (demand.decrementAndGet() > 0);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        }

        @Override
        public void cancel() {
            executor.shutdown();
        }

        private ByteBuffer getNextEvent() throws IOException {
            ByteBuffer audioBuffer = null;
            byte[] audioBytes = new byte[CHUNK_SIZE_IN_BYTES];
            int len;
            try {
                len = inputStream.read(audioBytes);

                if (len <= 0) {
                    audioBuffer = ByteBuffer.allocate(0);
                } else {
                    audioBuffer = ByteBuffer.wrap(audioBytes, 0, len);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return audioBuffer;
        }

        private AudioEvent audioEventFromBuffer(ByteBuffer bb) {
            return AudioEvent.builder()
                    .audioChunk(SdkBytes.fromByteBuffer(bb))
                    .build();
        }
    }
private static MediaEncoding getAwsMediaEncoding(AudioFormat audioFormat) {
        final String javaMediaEncoding = audioFormat.getEncoding().toString();
        System.out.println("Java Media Encoding:" + javaMediaEncoding);
        if (PCM_SIGNED.toString().equals(javaMediaEncoding)) {
            return MediaEncoding.PCM;
        } else if (PCM_UNSIGNED.toString().equals(javaMediaEncoding)){
            return MediaEncoding.PCM;
        }
        throw new IllegalArgumentException("Not a recognized media encoding:" + javaMediaEncoding);
    }

    private static Integer getAwsSampleRate(AudioFormat audioFormat) {
        return Math.round(audioFormat.getSampleRate());
    }
private static StartCallAnalyticsStreamTranscriptionRequest getAnalyticsRequest(File inputFile) throws IOException, UnsupportedAudioFileException {
        AudioInputStream audioInputStream = AudioSystem.getAudioInputStream(inputFile);
        int channelId = 0;
        ParticipantRole participantRole = ParticipantRole.AGENT;
        if(inputFile.getName().contains("_R")) {
            channelId = 1;
            participantRole = ParticipantRole.CUSTOMER;
        }
        ChannelDefinition channelDefinition = ChannelDefinition.builder()
                .channelId(channelId)
                .participantRole(participantRole)
                .build();
        ConfigurationEvent configurationEvent = ConfigurationEvent.builder()
                .channelDefinitions(channelDefinition)
                .build();
        AudioFormat audioFormat = audioInputStream.getFormat();
        return StartCallAnalyticsStreamTranscriptionRequest.builder()
                .mediaSampleRateHertz(8000)
                .enablePartialResultsStabilization(true)
                .partialResultsStability("high")
                .languageCode(CallAnalyticsLanguageCode.EN_US)
                .mediaEncoding(getAwsMediaEncoding(audioFormat))
                .overrideConfiguration(AwsRequestOverrideConfiguration.builder()
                        .putExecutionAttribute(new ExecutionAttribute<>("ConfigurationEvent"), configurationEvent).build())
                .build();
    }
private static InputStream getStreamFromFile(String file) {
        try {
            File inputFile = new File(file);
            return new FileInputStream(inputFile);

        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    private static ConfigurationEvent getConfigurationEvent(String inputFile) {
        int channelId = 0;
        ParticipantRole participantRole = ParticipantRole.AGENT;
        if(inputFile.contains("_R")) {
            channelId = 1;
            participantRole = ParticipantRole.CUSTOMER;
        }
        ChannelDefinition channelDefinition = ChannelDefinition.builder()
                .channelId(channelId)
                .participantRole(participantRole)
                .build();
        return ConfigurationEvent.builder()
                .channelDefinitions(channelDefinition)
                .build();
    }

public static void main(String[] args) throws URISyntaxException {
        var resources = Paths.get(...);
        var fileList = resources.toFile().listFiles();
        var wavFiles = Arrays.stream(fileList).filter(it -> it.getName().contains(".wav")).collect(Collectors.toList());
        AwsCredentialsProvider provider = EnvironmentVariableCredentialsProvider.create();
        NettyNioAsyncHttpClient.Builder customHttp = NettyNioAsyncHttpClient.builder()
                .connectionAcquisitionTimeout(Duration.ofMinutes(10))
                .connectionTimeout(Duration.ofMinutes(10))
                .writeTimeout(Duration.ofMinutes(10))
                .tcpKeepAlive(true);
        TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.builder()
                .region(REGION)
                .credentialsProvider(provider)
                .httpClientBuilder(customHttp)
                .build();
        wavFiles.forEach(f -> {
            try {
                String dest = f.getParentFile().getParentFile().getPath().concat("\\processed\\");
                String interimFile = dest.concat("\\interim\\").concat(f.getName().substring(0, f.getName().indexOf(".")).concat("-interim.txt"));
                String file = dest.concat(f.getName().substring(0, f.getName().indexOf(".")).concat(".txt"));
                System.out.println("###############################");
                var completableFuture = client.startCallAnalyticsStreamTranscription(getAnalyticsRequest(new File(f.toURI()))
                        , new CallAnalyticsAudioStreamPublisher(f.getName(), getStreamFromFile(String.valueOf(f)), getConfigurationEvent(f.getName())),
                        getAnalyticsResponseHandler(interimFile));
                completableFuture.get();
            } catch (InterruptedException | ExecutionException | IOException | UnsupportedAudioFileException e) {
                throw new RuntimeException(e);
            }
        });
        client.close();
    }
1

There are 1 answers

0
Diarmuid Leonard On

As per the example at https://docs.aws.amazon.com/lexv2/latest/dg/using-streaming-api.html, class:EventWriter method:writeConfigurationEvent, in the context of the SubscriptionImpl executor thread, you need to send a ConfigurationEvent using

 subscriber.onNext(configurationEvent);

before the first invocation of

 subscriber.onNext(audioEvent);

My solution seems like an awkward workaround, I don't know why it is necessary given that the StartCallAnalyticsStreamTranscriptionRequest can include the ConfigurationEvent that describes the audio stream channel and other settings. ... AWS Java-SDK bug ?