Is there a way to send a Flux or an Observable from the frontend into an RSocket requestChannel connection?

I am new to JavaScript and RSocket. I am trying to initiate a requestChannel from my frontend client (React). On the server side (Spring Boot), I have a MessageMapping as follows:

    @MessageMapping("/sendMessage/{resourceId}")
    public Flux<ResponseDTO> sendMessage(@DestinationVariable String resourceId,
                                         Flux<RequestDTO> requestDto)

Now I am exploring how to pass a Flux from the frontend with rsocket-js. In the API for requestChannel, I was trying to find out whether I can send some sort of Publisher in the payload.

    requestChannel(
      payload: Payload,
      initialRequestN: number,
      isCompleted: boolean,
      responderStream: OnTerminalSubscriber & OnNextSubscriber & OnExtensionSubscriber & Requestable & Cancellable
    ): OnTerminalSubscriber & OnNextSubscriber & OnExtensionSubscriber & Requestable & Cancellable;

The source of the publisher would be a text input on the frontend; whenever a value is entered into it, the requestChannel would be invoked. I tried to use a Subject from RxJS.

I would appreciate it if someone could validate whether this is a sound idea or whether I am misunderstanding rsocket-js. I tried the approach below, and it fails to serialize the data:

    import { RSocketConnector } from "rsocket-core";
    import { WebsocketClientTransport } from "rsocket-websocket-client";
    import { Subject } from "rxjs";

    const connector = new RSocketConnector({
      setup: {
        keepAlive: 100,
        lifetime: 10000,
        dataMimeType: "application/json",
        metadataMimeType: "message/x.rsocket.routing.v0",
      },
      transport: new WebsocketClientTransport({
        url: "ws://localhost:8080/rsocket",
        wsCreator: (url) => new WebSocket(url),
      }),
    });

    //const rsocket = await connector.connect();

    export async function getRsocketClient() {
      const cl = await connector.connect();
      return cl;
    }

    const textInputSubject = new Subject();
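
For reference, the `message/x.rsocket.routing.v0` metadata type named in the setup expects each route as a single length byte followed by the route's UTF-8 bytes. A minimal sketch of that encoding is below; `encodeRoute` is a hypothetical helper name, not part of rsocket-js, and it returns a plain `Uint8Array` only to stay dependency-free (the real client works with `Buffer`, so wrapping the result with `Buffer.from(...)` would likely be needed).

```typescript
// Hypothetical helper (not an rsocket-js API): encodes one route as
// [length byte][UTF-8 route bytes], the layout used by routing metadata.
function encodeRoute(route: string): Uint8Array {
  const routeBytes = new TextEncoder().encode(route);
  if (routeBytes.length > 255) {
    // The length prefix is a single unsigned byte.
    throw new RangeError("route must fit in 255 UTF-8 bytes");
  }
  const metadata = new Uint8Array(1 + routeBytes.length);
  metadata[0] = routeBytes.length; // raw byte, no string round trip
  metadata.set(routeBytes, 1);
  return metadata;
}

const routeMetadata = encodeRoute("/sendMessage/abc");
console.log(routeMetadata.length); // 17: one length byte plus 16 route bytes
```

Writing the length as a raw byte avoids a pitfall of going through `String.fromCharCode`: a length of 128 or more would become two bytes once the string is UTF-8 encoded.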

The code fails when creating the requestChannel as follows:

    rsocketClient.client.requestChannel(
      {
        data: textInputSubject.asObservable().subscribe(), // Buffer.from(JSON.stringify({ sender: "hello" })),
        metadata: Buffer.concat([
          Buffer.from(String.fromCharCode("/sendMessage/abc".length)),
          Buffer.from("/sendMessage/abc"),
        ]),
      },
      0,
      false,
      {
        onError: (e) => console.log(e), //reject(e)
        onNext: (payload, isComplete) => {
        },
        onComplete: () => {
        },
        onExtension: () => {},
        request: (n) => {
        },
        cancel: () => {},
      }
    );

The browser console shows the following error:

    VM134:1 Uncaught ReferenceError: uint8array is not defined
    at eval (eval at checkInt (index.js:1396:1), <anonymous>:1:33)
    at checkInt (index.js:1396:1)
    at Uint8Array.writeInt32BE (index.js:1631:1)
    at writeHeader (Codecs.ts:1096:1)
    at serializeRequestManyFrame (Codecs.ts:788:1)
    at serializeFrame (Codecs.ts:237:1)
    at WebsocketDuplexConnection.send (WebsocketDuplexConnection.ts:78:1)
    at ClientServerInputMultiplexerDemultiplexer.send (ClientServerMultiplexerDemultiplexer.ts:122:1)
    at RequestChannelRequesterStream.handleReady (RequestChannelStream.ts:121:1)
    at ClientServerMultiplexerDemultiplexer.ts:140:1

Thanks in advance for your help!
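
One possible reading of the `requestChannel` signature quoted above is that the `payload` argument carries only the first message as bytes, and that later messages are pushed through the object the call returns. In the failing call, `data` is the rxjs Subscription returned by `subscribe()` rather than bytes, and the initial request-N is 0, while the RSocket protocol requires that value to be above 0. The sketch below works under those assumptions. `Payload`, `ChannelHandle`, and `RequestChannel` are local stand-ins that mirror the quoted signature (with `onExtension` left out), and `openChannel` and `toBytes` are hypothetical names; none of them are imported from rsocket-js, and `Uint8Array` stands in for `Buffer` to keep the block dependency-free.

```typescript
// Local stand-ins mirroring the quoted signature (assumptions, not rsocket-core types).
interface Payload {
  data: Uint8Array | null;
  metadata?: Uint8Array;
}

interface ChannelHandle {
  onNext(payload: Payload, isComplete: boolean): void;
  onComplete(): void;
  onError(error: Error): void;
  request(n: number): void;
  cancel(): void;
}

type RequestChannel = (
  payload: Payload,
  initialRequestN: number,
  isCompleted: boolean,
  responderStream: ChannelHandle
) => ChannelHandle;

const encoder = new TextEncoder();
const decoder = new TextDecoder();
const toBytes = (value: unknown): Uint8Array => encoder.encode(JSON.stringify(value));

// Hypothetical helper: opens the channel with the first message and returns
// functions for pushing later messages over the same channel.
function openChannel(
  requestChannel: RequestChannel,
  routeMetadata: Uint8Array,
  first: unknown,
  onResponse: (json: string) => void
): { send: (value: unknown) => void; complete: () => void } {
  let credits = 0; // payloads the server has requested but not yet received
  const pending: unknown[] = []; // values entered before the server asked for them
  let handle: ChannelHandle | undefined;

  const drain = (): void => {
    const h = handle;
    if (!h) return;
    while (credits > 0 && pending.length > 0) {
      credits -= 1;
      h.onNext({ data: toBytes(pending.shift()) }, false);
    }
  };

  handle = requestChannel(
    { data: toBytes(first), metadata: routeMetadata },
    1, // initial request-N must be above 0
    false,
    {
      onNext: (payload) => {
        if (payload.data) onResponse(decoder.decode(payload.data));
        handle?.request(1); // ask the server for one more response
      },
      onComplete: () => {},
      onError: (error) => console.error(error),
      request: (n) => {
        credits += n; // assumed: the server signals demand through this callback
        drain();
      },
      cancel: () => {
        pending.length = 0;
      },
    }
  );

  return {
    send: (value) => {
      pending.push(value);
      drain();
    },
    complete: () => handle?.onComplete(),
  };
}
```

With something like this in place, the Subject would stay on the sending side instead of inside the payload, for example `textInputSubject.subscribe((value) => channel.send({ sender: value }))`, where `channel` is whatever `openChannel` returned. Queuing values until the server requests them is a design choice to respect backpressure; dropping or debouncing input would be an equally reasonable alternative for a text field.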
