How to send ConfigurationEvent
to StartCallAnalyticsStreamTranscriptionRequest
? I've configured following request but I'm unable to figure out how to send ConfigurationEvent
object to body of this request.
AudioInputStream audioInputStream =
AudioSystem.getAudioInputStream(inputFile);
int channelId = 0;
ParticipantRole participantRole = ParticipantRole.AGENT;
ChannelDefinition channelDefinition = ChannelDefinition.builder()
.channelId(channelId)
.participantRole(participantRole)
.build();
ConfigurationEvent configurationEvent = ConfigurationEvent.builder()
.channelDefinitions(channelDefinition)
.build();
AudioFormat audioFormat = audioInputStream.getFormat();
StartCallAnalyticsStreamTranscriptionRequest.builder()
.mediaSampleRateHertz(8000)
.enablePartialResultsStabilization(true)
.partialResultsStability("high")
.languageCode(CallAnalyticsLanguageCode.EN_US)
.mediaEncoding(getAwsMediaEncoding(audioFormat))
.overrideConfiguration(AwsRequestOverrideConfiguration.builder()
.putExecutionAttribute(new ExecutionAttribute<>("ConfigurationEvent"), configurationEvent).build())
.build();
Whenever I'm invoking it, I'm getting following exception:
Caused by: software.amazon.awssdk.services.transcribestreaming.model.BadRequestException: You must provide ConfigurationEvent as the first event in the stream (Service: transcribe, Status Code: 400, Request ID: e66c528c-55ae-4bf4-a0da-aa2d389ec1cf)
Here's complete code:
private static StartCallAnalyticsStreamTranscriptionResponseHandler getAnalyticsResponseHandler(String file) {
return StartCallAnalyticsStreamTranscriptionResponseHandler.builder()
.onError(e-> {
System.out.println(e.getMessage());
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
System.out.println("Error Occurred: " + sw);
})
.onComplete(() -> System.out.println("====Analytics Completed.===="))
.onResponse(res -> System.out.println("contentRedactionTypeAsString----" + res.contentRedactionTypeAsString()))
.onEventStream(event -> {
event.doAfterOnError(e -> {
System.out.println(e.getMessage());
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
System.out.println("Error Occurred: " + sw);
});
event.doAfterOnComplete(() -> System.out.println("====Analytics Completed.===="));
var eventType = event.map(CallAnalyticsTranscriptResultStream::sdkEventType);
event.subscribe(c -> System.out.println(c.sdkEventType()));
System.out.println("EventType:" + eventType);
//var analyticsSettings = ((ConfigurationEvent)event).postCallAnalyticsSettings();
System.out.println("------UtteranceEvent------");
System.out.println();
})
.build();
}
private static class CallAnalyticsAudioStreamPublisher implements Publisher<AudioStream> {
private final ConfigurationEvent configurationEvent;
private final InputStream inputStream;
private static Subscription currentSubscription;
private CallAnalyticsAudioStreamPublisher(String fileName, InputStream inputStream, ConfigurationEvent configurationEvent) {
this.configurationEvent = configurationEvent;
this.inputStream = inputStream;
}
@Override
public void subscribe(Subscriber<? super AudioStream> s) {
if (currentSubscription == null) {
currentSubscription = new SubscriptionImpl(s, inputStream);
} else {
currentSubscription.cancel();
currentSubscription = new SubscriptionImpl(s, inputStream);
}
s.onSubscribe(currentSubscription);
}
}
public static class SubscriptionImpl implements Subscription {
private static final int CHUNK_SIZE_IN_BYTES = 1024;
private final Subscriber<? super AudioStream> subscriber;
private final InputStream inputStream;
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private final AtomicLong demand = new AtomicLong(0);
SubscriptionImpl(Subscriber<? super AudioStream> s, InputStream inputStream) {
this.subscriber = s;
this.inputStream = inputStream;
}
@Override
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Demand must be positive"));
}
demand.getAndAdd(n);
executor.submit(() -> {
try {
do {
ByteBuffer audioBuffer = getNextEvent();
if (audioBuffer.remaining() > 0) {
AudioEvent audioEvent = audioEventFromBuffer(audioBuffer);
subscriber.onNext(audioEvent);
} else {
subscriber.onComplete();
break;
}
} while (demand.decrementAndGet() > 0);
} catch (Exception e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
executor.shutdown();
}
private ByteBuffer getNextEvent() throws IOException {
ByteBuffer audioBuffer = null;
byte[] audioBytes = new byte[CHUNK_SIZE_IN_BYTES];
int len;
try {
len = inputStream.read(audioBytes);
if (len <= 0) {
audioBuffer = ByteBuffer.allocate(0);
} else {
audioBuffer = ByteBuffer.wrap(audioBytes, 0, len);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return audioBuffer;
}
private AudioEvent audioEventFromBuffer(ByteBuffer bb) {
return AudioEvent.builder()
.audioChunk(SdkBytes.fromByteBuffer(bb))
.build();
}
}
private static MediaEncoding getAwsMediaEncoding(AudioFormat audioFormat) {
final String javaMediaEncoding = audioFormat.getEncoding().toString();
System.out.println("Java Media Encoding:" + javaMediaEncoding);
if (PCM_SIGNED.toString().equals(javaMediaEncoding)) {
return MediaEncoding.PCM;
} else if (PCM_UNSIGNED.toString().equals(javaMediaEncoding)){
return MediaEncoding.PCM;
}
throw new IllegalArgumentException("Not a recognized media encoding:" + javaMediaEncoding);
}
private static Integer getAwsSampleRate(AudioFormat audioFormat) {
return Math.round(audioFormat.getSampleRate());
}
private static StartCallAnalyticsStreamTranscriptionRequest getAnalyticsRequest(File inputFile) throws IOException, UnsupportedAudioFileException {
AudioInputStream audioInputStream = AudioSystem.getAudioInputStream(inputFile);
int channelId = 0;
ParticipantRole participantRole = ParticipantRole.AGENT;
if(inputFile.getName().contains("_R")) {
channelId = 1;
participantRole = ParticipantRole.CUSTOMER;
}
ChannelDefinition channelDefinition = ChannelDefinition.builder()
.channelId(channelId)
.participantRole(participantRole)
.build();
ConfigurationEvent configurationEvent = ConfigurationEvent.builder()
.channelDefinitions(channelDefinition)
.build();
AudioFormat audioFormat = audioInputStream.getFormat();
return StartCallAnalyticsStreamTranscriptionRequest.builder()
.mediaSampleRateHertz(8000)
.enablePartialResultsStabilization(true)
.partialResultsStability("high")
.languageCode(CallAnalyticsLanguageCode.EN_US)
.mediaEncoding(getAwsMediaEncoding(audioFormat))
.overrideConfiguration(AwsRequestOverrideConfiguration.builder()
.putExecutionAttribute(new ExecutionAttribute<>("ConfigurationEvent"), configurationEvent).build())
.build();
}
private static InputStream getStreamFromFile(String file) {
try {
File inputFile = new File(file);
return new FileInputStream(inputFile);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}
private static ConfigurationEvent getConfigurationEvent(String inputFile) {
int channelId = 0;
ParticipantRole participantRole = ParticipantRole.AGENT;
if(inputFile.contains("_R")) {
channelId = 1;
participantRole = ParticipantRole.CUSTOMER;
}
ChannelDefinition channelDefinition = ChannelDefinition.builder()
.channelId(channelId)
.participantRole(participantRole)
.build();
return ConfigurationEvent.builder()
.channelDefinitions(channelDefinition)
.build();
}
public static void main(String[] args) throws URISyntaxException {
var resources = Paths.get(...);
var fileList = resources.toFile().listFiles();
var wavFiles = Arrays.stream(fileList).filter(it -> it.getName().contains(".wav")).collect(Collectors.toList());
AwsCredentialsProvider provider = EnvironmentVariableCredentialsProvider.create();
NettyNioAsyncHttpClient.Builder customHttp = NettyNioAsyncHttpClient.builder()
.connectionAcquisitionTimeout(Duration.ofMinutes(10))
.connectionTimeout(Duration.ofMinutes(10))
.writeTimeout(Duration.ofMinutes(10))
.tcpKeepAlive(true);
TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.builder()
.region(REGION)
.credentialsProvider(provider)
.httpClientBuilder(customHttp)
.build();
wavFiles.forEach(f -> {
try {
String dest = f.getParentFile().getParentFile().getPath().concat("\\processed\\");
String interimFile = dest.concat("\\interim\\").concat(f.getName().substring(0, f.getName().indexOf(".")).concat("-interim.txt"));
String file = dest.concat(f.getName().substring(0, f.getName().indexOf(".")).concat(".txt"));
System.out.println("###############################");
var completableFuture = client.startCallAnalyticsStreamTranscription(getAnalyticsRequest(new File(f.toURI()))
, new CallAnalyticsAudioStreamPublisher(f.getName(), getStreamFromFile(String.valueOf(f)), getConfigurationEvent(f.getName())),
getAnalyticsResponseHandler(interimFile));
completableFuture.get();
} catch (InterruptedException | ExecutionException | IOException | UnsupportedAudioFileException e) {
throw new RuntimeException(e);
}
});
client.close();
}
As per the example at https://docs.aws.amazon.com/lexv2/latest/dg/using-streaming-api.html, class:EventWriter method:writeConfigurationEvent, in the context of the SubscriptionImpl executor thread, you need to send a ConfigurationEvent using
before the first invocation of
My solution seems like an awkward workaround, I don't know why it is necessary given that the StartCallAnalyticsStreamTranscriptionRequest can include the ConfigurationEvent that describes the audio stream channel and other settings. ... AWS Java-SDK bug ?