How to read data serialized with Chronicle Wire from InputStream?

1k views Asked by At

Some data are serialized to an outputstream via Chronicle Wire.

Object m = ... ;
OutputStream out = ... ;

WireType.RAW                               //
        .apply(Bytes.elasticByteBuffer())  //
        .getValueOut().object(m)           //
        .bytes().copyTo(out)
;

I want to get them back from an Inputstream.

InputStream in = ... ;

WireType.RAW
        .apply(Bytes.elasticByteBuffer())
        .getValueIn()
        ???
;

Object m = ???; // How to initialize m ?

How to read my initial object m from in ?

1

There are 1 answers

6
Peter Lawrey On BEST ANSWER

There is an assumption you will have some idea of how long the data is and read it in one go. It is also assumed you will want to reuse the buffers to avoid creating garbage. To minimise latency data is typical read to/from NIO Channels.

I have raised an issue to create this example, Improve support for Input/OutputStream and non Marshallable objects https://github.com/OpenHFT/Chronicle-Wire/issues/111

This should do what you want efficiently without creating garbage each time.

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class WireToOutputStream {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataOutputStream dos;

    public WireToOutputStream(WireType wireType, OutputStream os) {
        wire = wireType.apply(bytes);
        dos = new DataOutputStream(os);
    }

    public Wire getWire() {
        wire.clear();
        return wire;
    }

    public void flush() throws IOException {
        int length = Math.toIntExact(bytes.readRemaining());
        dos.writeInt(length);
        dos.write(bytes.underlyingObject().array(), 0, length);
    }
}

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;

public class InputStreamToWire {
    private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
    private final Wire wire;
    private final DataInputStream dis;

    public InputStreamToWire(WireType wireType, InputStream is) {
        wire = wireType.apply(bytes);
        dis = new DataInputStream(is);
    }

    public Wire readOne() throws IOException {
        wire.clear();
        int length = dis.readInt();
        if (length < 0) throw new StreamCorruptedException();
        bytes.ensureCapacity(length);
        byte[] array = bytes.underlyingObject().array();
        dis.readFully(array, 0, length);
        bytes.readPositionRemaining(0, length);
        return wire;
    }
}

You can then do the following

package net.openhft.chronicle.wire;

import net.openhft.chronicle.core.util.ObjectUtils;
import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;

import static org.junit.Assert.assertEquals;

public class WireToOutputStreamTest {
    @Test
    public void testVisSocket() throws IOException {
        ServerSocket ss = new ServerSocket(0);
        Socket s = new Socket("localhost", ss.getLocalPort());
        Socket s2 = ss.accept();
        WireToOutputStream wtos = new WireToOutputStream(WireType.RAW, s.getOutputStream());

        Wire wire = wtos.getWire();
        AnObject ao = new AnObject();
        ao.value = 12345;
        ao.text = "Hello";
        // write the type is needed.
        wire.getValueOut().typeLiteral(AnObject.class);
        Wires.writeMarshallable(ao, wire);
        wtos.flush();

        InputStreamToWire istw = new InputStreamToWire(WireType.RAW, s2.getInputStream());
        Wire wire2 = istw.readOne();
        Class type = wire2.getValueIn().typeLiteral();
        Object ao2 = ObjectUtils.newInstance(type);
        Wires.readMarshallable(ao2, wire2, true);
        System.out.println(ao2);
        ss.close();
        s.close();
        s2.close();
        assertEquals(ao.toString(), ao2.toString());
    }

    public static class AnObject implements Serializable {
        long value;
        String text;

        @Override
        public String toString() {
            return "AnObject{" +
                    "value=" + value +
                    ", text='" + text + '\'' +
                    '}';
        }
    }
}

Sample code

 // On Sender side
 Object m = ... ;
 OutputStream out = ... ;

 WireToOutputStream wireToOutputStream = new 
 WireToOutputStream(WireType.TEXT, out);

 Wire wire = wireToOutputStream.getWire();
 wire.getValueOut().typeLiteral(m.getClass());
 Wires.writeMarshallable(m, wire);
 wireToOutputStream.flush();

 // On Receiver side
 InputStream in = ... ;

 InputStreamToWire inputStreamToWire = new InputStreamToWire(WireType.TEXT, in);

 Wire wire2 = inputStreamToWire.readOne();
 Class type = wire2.getValueIn().typeLiteral();
 Object m = ObjectUtils.newInstance(type);
 Wires.readMarshallable(m, wire2, true);

This code is a lot simpler if your DTO extends Marshallable but this will work whether you extend an interface or not. i.e. you don't need to extend Serializable.

Also if you know what the type will be you don't need to write it each time.

The helper classes above have been added to the latest SNAPSHOT