Hi everyone, I am trying to call a @MessageMapping endpoint using RSocketRequester. The controller class that contains the endpoint I want to call is:
/**
 * RSocket responder: accepts client connections on the {@code handshake} and
 * {@code genericConnection} setup routes and serves the {@code test} request route.
 *
 * <p>NOTE(review): {@code instanceConfig}, {@code itrsUpdater}, {@code validateConnection(...)} and
 * {@code setDisconnectionHandler(...)} are used below but are not declared in this excerpt.
 * {@code consumersMap} is also never assigned in the visible code; confirm it is initialised
 * before the first {@code handshake} connection, otherwise registering a consumer throws a
 * {@code NullPointerException}.
 */
@RestController
public class SomeController {
    private static final Logger LOG = LoggerFactory.getLogger(SomeController.class);
    private TronCache<String, RSocketConsumer> consumersMapWarpper;
    private Map<String, RSocketConsumer> consumersMap;
    private Map<String, CacheMetaInfo> cacheMetaInfoMap = new HashMap<>();
    private static final String GET_METADATA = "getMetadata";

    public SomeController() {
    }

    /** Logs that the controller bean has been constructed. */
    @PostConstruct
    public void init() {
        LOG.info("init in SomeController");
    }

    /**
     * Handles a connection whose setup frame is routed to {@code handshake}.
     *
     * <p>After validating the setup payload it sends a {@code handshake} request back to the
     * connecting client and, once the client replies, registers that client as a consumer.
     *
     * @param requester requester bound to the connecting client, used to call back into it
     * @param payload   setup payload sent by the client
     */
    @ConnectMapping("handshake")
    public void connect(RSocketRequester requester, @Payload RefDataRequest payload) {
        LOG.info("Client Connection Handshake: [{}]", payload.getCallerName());
        this.validateConnection(payload);
        requester.route("handshake")
                .data(RefDataResponse.HANDSHAKE.callerName(this.instanceConfig.getPodName()))
                .retrieveMono(ConsumerNotification.class)
                .subscribe(
                        notification -> registerConsumer(requester, payload, notification),
                        // Without an error consumer a failed handshake is only reported through
                        // Reactor's dropped-error hook; log it with the caller for context.
                        error -> LOG.error("Handshake with client [{}] failed", payload.getCallerName(), error));
    }

    /**
     * Stores the consumer for the caller (unless one is already present), installs its
     * disconnection handler and publishes the "consumer added" update.
     */
    private void registerConsumer(RSocketRequester requester, RefDataRequest payload, ConsumerNotification notification) {
        LOG.info("SOD Status[{}] Consumer[{}]", notification.isSodCompleted(), notification.getCallerName());
        RSocketConsumer consumer = new RSocketConsumer(payload.getCallerName(), notification, requester);
        this.consumersMap.computeIfAbsent(payload.getCallerName(), key -> consumer);
        this.setDisconnectionHandler(consumer);
        this.itrsUpdater.publishAddConsumer(payload.getCallerName());
    }

    /**
     * Handles a connection whose setup frame is routed to {@code genericConnection};
     * only logs and validates the setup payload.
     *
     * @param requester requester bound to the connecting client (unused here)
     * @param payload   setup payload sent by the client
     */
    @ConnectMapping("genericConnection")
    public void connectForProperty(RSocketRequester requester, @Payload RefDataRequest payload) {
        LOG.info("Client Connection : [{}] [{}] ", "Generic [One time] Connection", payload.getCallerName());
        this.validateConnection(payload);
    }

    /**
     * Request-response handler for the {@code test} route.
     *
     * @param s request payload (ignored)
     * @return a Mono emitting the constant {@code "string"}
     */
    @MessageMapping("test")
    public Mono<String> getString(String s) {
        LOG.info("test in RefDataController");
        return Mono.just("string");
    }
}
The class in another application that calls the endpoint is:
public class RSocketConnector {
private static final Logger LOG = LoggerFactory.getLogger(RSocketRefDataConnector.class);
private RSocketRequester requester;
private boolean initialized;
private final String host;
private final int port;
private final String podName;
private final String environment;
private final String setupRoute;
private final String encoders;
private final String decoders;
public String getHost() {
return host;
}
/**
* @param host
* @param port
* @param podName
* @param environment
* @param encoders
* @param decoders
*/
private RSocketRefDataConnector(String host,
int port,
String podName,
String environment,
String setupRoute,
String encoders,
String decoders) {
this.host = host;
this.port = port;
this.podName = podName;
this,setupRoute = setupRoute;
this.environment = environment;
this.encoders=StringUtils.defaultIfBlank(encoders,Jackson2CborEncoder.class.getName());
this.decoders=StringUtils.defaultIfBlank(decoders,Jackson2CborDecoder.class.getName());
}
public void initialize() {
RSocketStrategies strategies;
try {
strategies = new RSocketConfig().rsocketCustomEncoderDecoder(encoders,decoders);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
ClientTransport transport = this.createTransport(host, port);
requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
connector.reconnect(getRetrySpec())
.acceptor(RSocketMessageHandler.responder(strategies, this))
.payloadDecoder(PayloadDecoder.ZERO_COPY);
})
.dataMimeType(MediaType.APPLICATION_CBOR)
.setupRoute(this.setupRoute)
.setupData(RefDataRequest.SIGNAL.handShake(podName, environment, TronRuntimeConfig.hostInfo()))
.rsocketStrategies(strategies)
.transport(transport);
if (!isOneTimeConnection) {
setDisconnectionHandler();
//Heartbeat check
Flux.interval(Duration.ofMinutes(1))
.doOnNext(l -> heartbeatCheck())
.subscribe();
initialized = true;
}
//calling endpoint
requester.route("test").data("hello")
.retrieveMono(String.class)
.doOnNext(o->LOG.info("inbound message"))
.onErrorContinue((e,t)->LOG.info("contine on erro"))
.doOnSuccess(r -> LOG.info("message from test doOnSuccess is " + r))
.subscribe();
}
/**
* @param host
* @param port
* @return
*/
private ClientTransport createTransport(String host, int port) {
if (isProtocolWebsocket()) {
LOG.info("SSL : {}", this.ssl);
if (this.ssl) {
LOG.info("Initializing refdata websocket secure connection [{}]", String.format(WSS_URL_PATTERN, host, port));
return WebsocketClientTransport.create(
HttpClient.create()
.baseUrl(String.format(WSS_URL_PATTERN, host, port))
.secure((s) -> s.sslContext(buildSslContext()))
, WS_PATH);
} else {
LOG.info("Initializing refdata websocket connection [{}]", String.format(WS_URL_PATTERN, host, port));
return WebsocketClientTransport.create(URI.create(String.format(WS_URL_PATTERN, host, port)));
}
} else {
LOG.info("Initializing refdata TCP connection [{}:{}]", host, port);
return TcpClientTransport.create(host, port);
}
}
/**
* @return
*/
private SslContext buildSslContext() {
try {
KeyManager[] keyManagers = SSLUtils.getKeyManagers(ResourceUtils.getURL(trustStore).openStream(), trustStoreSafeNet, trustStoreSafeNet);
TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();//SSLUtils.getTrustManagers(ResourceUtils.getURL(trustStore).openStream(), trustStoreSafeNet);
return SslContextBuilder.forClient()
.clientAuth(ClientAuth.REQUIRE)
.keyManager(keyManagers[0])
.trustManager(trustManagers[0])
.build();
} catch (Exception e) {
LOG.error("Exception occurred while building SSL Context [{}]", ExceptionUtils.getStackTrace(e));
}
return null;
}
/**
*
*/
public void heartbeatCheck() {
requester.route(ROUTE_HEARTBEAT)
.data(heartbeatRequest)
.retrieveMono(RefDataResponse.class)
.subscribe(r -> connectionFailed.set(false));
}
/**
* @return
*/
private Retry getRetrySpec() {
if (retryLimit == 0;
return Retry
.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(retryInterval))
.scheduler(Schedulers.single())
//.doBeforeRetry(e -> connectionFailed.set(true))
.doAfterRetry(e -> {
LOG.warn("doAfterRetry===>{}", e);
this.connectionFailed.set(true);
});
else
return Retry
.fixedDelay(retryLimit, Duration.ofSeconds(retryInterval))
.scheduler(Schedulers.single())
//.doBeforeRetry(e -> connectionFailed.set(true))
.doAfterRetry(e -> {
LOG.warn("doAfterRetry===>{}", e);
this.connectionFailed.set(true);
})
.onRetryExhaustedThrow((rb, rs) -> new RefDataException("Unable to establish connection with RefData server. # of retry attempted [" + retryLimit + "]"));
}
}
I expect to receive "string" after calling the "test" endpoint. Instead, the connection is closed without returning anything.