TcpOutboundGateway - Cannot correlate response - no pending reply
-
21-12-2019 - |
Question
I'm using spring integration to create TCP server and also to test if it works with junit. The problem is that i'm receiving an error: org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply. Please help me to fix it. Here is more info. I have unit test that send some data to server, server has to reply with "success" on each portion of data. but after second portion of read data, TcpOutboundGateway (on unit test side) write error to log.
So Server configuration file:
<int-ip:tcp-connection-factory id="crLfServer"
type="server"
port="5000"
single-use="false"
so-timeout="10000"
/>
<task:executor id="pool" pool-size="16"/>
<int-ip:tcp-inbound-gateway id="gatewayCrLf"
connection-factory="crLfServer"
request-channel="serverBytes2StringChannel"
error-channel="errorChannel"/>
<int:channel id="toSA" >
<int:dispatcher task-executor="pool" />
</int:channel>
<int:service-activator input-channel="toSA"
ref="connectionHandler"
method="handleData" />
<bean id="connectionHandler" class="com.pc.tracker.utils.ConnectionHandler" />
<bean id="objectMapper" class="org.codehaus.jackson.map.ObjectMapper"/>
<int:transformer id="serverBytes2String"
input-channel="serverBytes2StringChannel"
output-channel="toSA"
expression="new String(payload).trim()"/>
<int:transformer id="errorHandler"
input-channel="errorChannel"
expression="payload.failedMessage.payload + ':' + payload.cause.message"/>
Client configuration file:
<int:gateway id="gw"
service-interface="com.pc.tracker.tcp.ConnectionHandlerTestHellperGateway"
default-request-channel="input"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="5000"
single-use="false"
so-timeout="10000"/>
<int:channel id="input" />
<int-ip:tcp-outbound-gateway id="outGateway"
request-channel="input"
reply-channel="clientBytes2StringChannel"
connection-factory="client"
request-timeout="10000"
reply-timeout="10000"/>
<int:transformer id="clientBytes2String"
input-channel="clientBytes2StringChannel"
expression="new String(payload)"/>
and test that produce already mentioned error.
@Test
public void testRecivedBackupedData() {
String testData =
"[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+ "[{\"s\":13,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":24,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+ "[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+"[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n";
String result = gw.send(testData);
here is log with error
2014-04-11 23:12:23,059 [pool-1] ERROR com.pc.tracker.utils.ConnectionHandler - recived data: [{"s":1,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"},{"s":12,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"}]
2014-04-11 23:12:23,263 [pool-1] ERROR com.pc.tracker.utils.ConnectionHandler - parsisted
2014-04-11 23:12:23,265 [pool-2] ERROR com.pc.tracker.utils.ConnectionHandler - recived data: [{"s":13,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"},{"s":24,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"}]
2014-04-11 23:12:23,330 [pool-2] ERROR com.pc.tracker.utils.ConnectionHandler - parsisted
2014-04-11 23:12:23,331 [pool-2-thread-1] ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply
2014-04-11 23:12:23,332 [pool-3] ERROR com.pc.tracker.utils.ConnectionHandler - recived data: [{"s":1,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"},{"s":12,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"}]
2014-04-11 23:12:23,409 [pool-3] ERROR com.pc.tracker.utils.ConnectionHandler - parsisted
2014-04-11 23:12:23,409 [pool-2-thread-1] ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply
2014-04-11 23:12:23,410 [pool-4] ERROR com.pc.tracker.utils.ConnectionHandler - recived data: [{"s":1,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"},{"s":12,"t":0,"b":0,"sp":0,"long":0,"sa":0,"lat":0,"a":0,"i":"device_for_unit_tests"}]
2014-04-11 23:12:23,487 [pool-4] ERROR com.pc.tracker.utils.ConnectionHandler - parsisted
2014-04-11 23:12:23,488 [pool-2-thread-1] ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply
2014-04-11 23:12:23,489 [pool-5] ERROR com.pc.tracker.utils.ConnectionHandler - recived data:
2014-04-11 23:12:23,490 [pool-2-thread-1] ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply
I've spent two days solving the problem. sorry for long question. Thanks.
La solution
You are sending data with embedded \r\n
...
public void testRecivedBackupedData() {
String testData =
"[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+ "[{\"s\":13,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":24,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+ "[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n"
+"[{\"s\":1,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"},"
+ "{\"s\":12,\"t\":0,\"b\":0,\"sp\":0,\"long\":0,\"sa\":0,\"lat\":0,\"a\":0,\"i\":\"device_for_unit_tests\"}]\r\n";
String result = gw.send(testData);
Remove them all; the framework will add one at the end.
Your server is sending multiple replies on the socket for one request. Search for connection id localhost:5000:51420:9f93417a-c879-4753-a61e-0b9485940d14
. You will see that you sent your request at
2014-04-12 11:27:11,307
The reply is received at
2014-04-12 11:27:11,340
(onMessage() call) and sent to the reply channel at
2014-04-12 11:27:11,343
the next activity on that socket was the reception of another message
2014-04-12 11:27:11,346
for which there was nobody waiting hence the error message.
Now, looking at the connection id, we can see that the remote socket is 51420
, so let's take a look on the server side...
The corresponding connection id on the server is localhost:51420:5000:66862ff3-3f40-4291-938f-0694bf3727be
. The reply success
to the original request was sent at
2014-04-12 11:27:11,339
However, that same thread reads another message (without another send) - notice the message Available to read:525
.
So, the bottom line is you are using the default (de)serializer which expects the message to be terminated with \r\n
but you are sending requests that have \r\n
embedded in them so the server side is "seeing" multiple requests when the sender only sent one message.
TCP is a streaming protocol - you need to frame your data somehow so the server side knows when a message is complete. If you need to send data that contains \r\n
you will need to use a different deserializer to detect the end of the data (perhaps the length header implementation, which is the most efficient).
The available standard serializers and information about customizing them are documented here.