Request/response socket with Spring Integration


I have a Spring Boot server application in which I need to create an SSL socket and let a client connect to it. I want to be able to send the client a message (for example, upon a REST API request to my server) and also read the response that it sends back.

What I managed to do so far is:

  1. Create the socket and allow the clients to connect (and save the connection id from the TcpEvent)
  2. Send a message on the socket upon request
  3. The client receives the request and sends a response back

At this point I am not able to read the client's reply (I can see in Wireshark that a reply is sent), even though I configured a TcpReceivingChannelAdapter that uses the same connection factory as the TcpSendingMessageHandler.

From that point onward I can still send requests, but the client does not receive them. I suspect this is because its first response was not processed on my side (I have no access to the client code to verify that).

Here is my code:

Config.java

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class SocketConfiguration implements ApplicationListener<TcpConnectionEvent> {

   private static final org.slf4j.Logger log = LoggerFactory.getLogger("SocketConfiguration");

   @Bean
   public AbstractServerConnectionFactory serverConnectionFactory() {
      TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(40003);
      DefaultTcpNetSSLSocketFactorySupport tcpNetSSLSocketFactory = tcpSocketFactorySupport();
      tcpNetServerConnectionFactory.setTcpSocketFactorySupport(tcpNetSSLSocketFactory);
      return tcpNetServerConnectionFactory;
   }


   @Bean
   public DefaultTcpNetSSLSocketFactorySupport tcpSocketFactorySupport() {
      DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("keystore.jks",
            "trustStore.jks", "123456", "123456");
      sslContextSupport.setProtocol("TLSv1.2");
      DefaultTcpNetSSLSocketFactorySupport tcpSocketFactorySupport = new DefaultTcpNetSSLSocketFactorySupport(sslContextSupport);
      return tcpSocketFactorySupport;
   }

   @Bean
   public static MessageChannel getResponseChannel() {
      DirectChannel directChannel = new DirectChannel();
      directChannel.setComponentName("getResponseChannel");
      directChannel.setLoggingEnabled(true);
      return directChannel;
   }

   @Bean
   public static MessageChannel getInputMessageChannel() {
      DirectChannel directChannel = new DirectChannel();
      directChannel.setComponentName("inputMessageChannel");
      directChannel.setLoggingEnabled(true);
      return directChannel;
   }

   @Bean
   public MessageChannel invokeChannel() {
      return new DirectChannel();
   }

   @Bean  
   public TcpReceivingChannelAdapter in(AbstractServerConnectionFactory connectionFactory) {
      TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
      adapter.setOutputChannel(getInputMessageChannel());
      adapter.setConnectionFactory(connectionFactory);
      adapter.setSendTimeout(5000);
      return adapter;
   }

   @ServiceActivator(inputChannel="toClientChannel")
   @Bean
   public TcpSendingMessageHandler out(AbstractServerConnectionFactory connectionFactory) {
      TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
      tcpSendingMessageHandler.setConnectionFactory(connectionFactory);
      tcpSendingMessageHandler.setLoggingEnabled(true);
      return tcpSendingMessageHandler;
   }

   @Transformer(inputChannel = "invokeChannel", outputChannel = "toClientChannel")
   public Message<String> headerBeforeSend(String message) throws Exception {
      log.debug("send message to socket: {}", message);
      // GetConnectionEntry() is not shown here; it returns one entry from the tcpConnections map below
      Map.Entry<String, TcpConnection> connectionEntry = GetConnectionEntry();
      log.debug("connection id is: {}", connectionEntry.getKey());
      return MessageBuilder.withPayload(message)
         .setHeader(IpHeaders.CONNECTION_ID,connectionEntry.getKey())
         .build();
   }

   private static ConcurrentHashMap<String, TcpConnection> tcpConnections = new ConcurrentHashMap<>();

   @Override
   public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
      TcpConnection source = (TcpConnection) tcpEvent.getSource();
      if (tcpEvent instanceof TcpConnectionOpenEvent) {
         log.info("Socket Opened {}", source.getConnectionId());
         tcpConnections.put(tcpEvent.getConnectionId(), source);
      } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
         log.info("Socket Closed {}", source.getConnectionId());
         // remove() is a no-op when the key is absent, so no containsKey check is needed
         tcpConnections.remove(source.getConnectionId());
      } else if (tcpEvent instanceof TcpConnectionExceptionEvent) {
         log.error("Error {}", tcpEvent.getCause().getMessage());
         tcpConnections.remove(source.getConnectionId());
      }
   }
}

Controller.java

@RestController
@ControllerAdvice
@RequestMapping("/socket")
public class SocketController {
   private final Logger log = LoggerFactory.getLogger(getClass());

   @Inject
   MessageChannel invokeChannel;

   @LogAspect
   @PostMapping
   public ResponseEntity<Void> sendMessage(@RequestBody SendMessageRequest request) throws Exception {
      log.debug("Message is {}",request.get_message());
      String msg = "Some test message";
      MessagingTemplate template = new MessagingTemplate();
      template.send(invokeChannel, new GenericMessage<>(msg));
      return new ResponseEntity<>(HttpStatus.OK);
   } 
}

Can you please check my configuration and direct me to the right setup?

Thanks

There is 1 answer

Artem Bilan

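Make sure that the messages you send and receive are properly serialized and deserialized. By default the connection factory uses ByteArrayCrlfSerializer, which relies on \r\n as the message terminator. If the client's reply does not end with \r\n, the deserializer keeps waiting for the terminator and the message is not delivered to your channel. See https://docs.spring.io/spring-integration/docs/4.3.11.RELEASE/reference/html/ip.html#connection-factories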
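
To make the \r\n terminator point concrete, here is a minimal plain-Java sketch of CRLF-delimited framing. It is an illustration only, not Spring Integration's actual implementation; the class name CrlfFraming and its helper methods are hypothetical. It shows that a reply without the terminator never yields a complete message. The sketch returns null at end of stream, whereas on a live socket the read would simply keep waiting.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustration only: CRLF-delimited framing over a byte stream (hypothetical helper class). */
class CrlfFraming {

    /**
     * Reads bytes up to the next \r\n and returns the payload without the terminator.
     * Returns null if the stream ends before a terminator arrives (incomplete frame).
     */
    static String readFrame(InputStream in) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            int previous = -1;
            int current;
            while ((current = in.read()) != -1) {
                if (previous == '\r' && current == '\n') {
                    byte[] bytes = buffer.toByteArray();
                    // Drop the '\r' that was buffered before the '\n' arrived.
                    return new String(bytes, 0, bytes.length - 1, StandardCharsets.US_ASCII);
                }
                buffer.write(current);
                previous = current;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null; // no terminator seen: nothing to hand to the application
    }

    /** Wraps a string in an in-memory stream, standing in for the socket's input stream. */
    static InputStream stream(String data) {
        return new ByteArrayInputStream(data.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        System.out.println(readFrame(stream("REPLY OK\r\n"))); // complete frame
        System.out.println(readFrame(stream("REPLY OK\n")));   // LF only: no frame
        System.out.println(readFrame(stream("REPLY OK")));     // no terminator: no frame
    }
}
```

If the capture shows that the client terminates its replies differently, for example with \n only or with a length header, the connection factory's setSerializer and setDeserializer methods accept other implementations, such as ByteArrayLfSerializer or ByteArrayLengthHeaderSerializer. Which one fits depends on the client's wire format, which the question does not show.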