Linked Questions

Popular Questions

I have custom objects: CustomKey, CustomValue which I provided coder via Avro: CustomKeyCoder, CustomValueCoder.

Since I need to group by KV[CustomKey, CustomValue], I registered KVCoder.of(new CustomKeyCoder, new CustomValueCoder). Custom coders wraps in/out stream to data in/out stream and uses Avro Datum Writer/Reader.

Issue I am having is in the decode of the KVCoder, when we attempt to decode value part of KV I get Forbidden IOException when reading from InputStream. As noted, key part of decoding works properly, error is thrown when input stream is passed into decoding value. KVCoder reuses same input stream for both key and value I am guessing key decoding reads entire stream. Why would this be happening? Is usage of Avro a problem?

Here is some code to showcase above:

  //Coder
  override def decode(inputStream: InputStream): CustomValue = {
    val dataInputStream = new DataInputStream(inputStream)
    val id = dataInputStream.readShort
    underlying.decode(dataInputStream)
  }

 //Underlying
  override def decode(inputStream: InputStream): CustomValue = {
    val decoder = DecoderFactory.get().binaryDecoder(inputStream, null)
    val record = datumReader.read(null, decoder)
    CustomValue.decode(record)
  }

Related Questions