Mule ESB 3.4.0CE Sending message to multiple outbound endpoints that are added/removed dynamically

StackOverflow https://stackoverflow.com/questions/22973631

  •  30-06-2023

Question

I am working on a part of a Mule ESB application and the issue I am having is very analogous to a chat room.

The chat room (the server, Mule ESB) listens for many chatters (the clients, external applications). In a typical chat room, when a single chatter (c1) sends a message, the server echoes it to all the other chatters except c1. In my case, I also need to echo it back to c1.

I am having a hard time wrapping my mind around some detailed aspects of how the message processing works in Mule ESB. I read the documentation about transports and believe I may need to work with Message Receivers and Message Dispatchers.

Consider the following configuration:

<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" clientSoTimeout="0" keepAlive="true" keepSendSocketOpen="true" receiveBacklog="0" receiveBufferSize="0" reuseAddress="true" sendBufferSize="0" serverSoTimeout="0" socketSoLinger="0" validateConnections="true">
        <reconnect-forever />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <tcp:endpoint exchange-pattern="request-response" host="localhost" port="8090" name="ListenEndpoint" responseTimeout="10000" doc:name="TCP"/>
    <flow name="ChatroomExampleFlow1" doc:name="ChatroomExampleFlow1">
        <tcp:inbound-endpoint exchange-pattern="request-response" ref="ListenEndpoint" responseTimeout="10000" doc:name="Client Message"/>
        <logger level="INFO" doc:name="Logger"/>
        <all doc:name="All">
            <tcp:outbound-endpoint host="${client1.host}" port="${client1.port}" responseTimeout="10000" doc:name="TCP"/>
            <tcp:outbound-endpoint host="${client2.host}" port="${client2.port}" responseTimeout="10000" doc:name="TCP"/>
            <tcp:outbound-endpoint host="${client3.host}" port="${client3.port}" responseTimeout="10000" doc:name="TCP"/>
        </all>
    </flow>
</mule>

If I know the host and port of every client at configuration time, this configuration works. However, my goal is to send messages to whichever clients have currently established a connection, which could be up to n clients. As clients connect and disconnect, I need to add them to or remove them from the list of recipients.

There is a <recipient-list expression="#[app.registry.remoteEndpoints]"/> router whose expression should evaluate to a List of endpoints; the flow then sends the message to every endpoint in that collection (see the documentation for recipient list). However, this does not work for my purposes (see the answer below). This is the configuration I tried:

<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" clientSoTimeout="0" keepAlive="true" keepSendSocketOpen="true" receiveBacklog="0" receiveBufferSize="0" reuseAddress="true" sendBufferSize="0" serverSoTimeout="0" socketSoLinger="0" validateConnections="true">
        <reconnect-forever />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <spring:beans>
        <spring:bean name="remoteEndpoints" scope="singleton" class="java.util.ArrayList"/>
    </spring:beans>
    <tcp:endpoint host="localhost" exchange-pattern="one-way" port="8090" name="ListenEndpoint" responseTimeout="10000" doc:name="TCP"/>
    <flow name="ChatroomExampleFlow1" doc:name="ChatroomExampleFlow1">
        <tcp:inbound-endpoint ref="ListenEndpoint" responseTimeout="10000" doc:name="Client Message"/>
        <echo-component doc:name="Echo"/>
        <component class="EndpointStorer" doc:name="Java"/>
        <recipient-list expression="#[app.registry.remoteEndpoints]"/>
    </flow>
</mule>

The onCall method of EndpointStorer looks like this:

public Object onCall(MuleEventContext eventContext) throws Exception
{
    Registry registry = eventContext.getMuleContext().getRegistry();
    MuleMessage message = eventContext.getMessage();

    // Shared singleton list declared as the "remoteEndpoints" Spring bean
    ArrayList arrList = (ArrayList) registry.get("remoteEndpoints");
    Object remoteAddress = message.getOutboundProperty("MULE_REMOTE_CLIENT_ADDRESS");

    // substring(1) because the remote client address looks like /127.0.0.1:4943
    String tcpAddr = "tcp://" + remoteAddress.toString().substring(1);
    if (!arrList.contains(tcpAddr))
    {
        arrList.add(tcpAddr);
    }

    return message;
}
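
As an aside on the bookkeeping in EndpointStorer: the list is a shared singleton, so several connections may modify it at once, and nothing ever removes an address when a client disconnects. A minimal sketch of a thread-safe alternative follows, assuming Java 7 or later. ClientAddressBook and its methods are hypothetical names, not part of Mule, and this only tidies the address tracking; it does not by itself make the addresses reachable from an outbound endpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical helper (not Mule API): tracks client addresses as tcp:// URIs. */
final class ClientAddressBook {
    // CopyOnWriteArraySet tolerates concurrent add/remove and rejects duplicates.
    private final CopyOnWriteArraySet<String> addresses = new CopyOnWriteArraySet<String>();

    /** Converts an address such as "/127.0.0.1:4943" into "tcp://127.0.0.1:4943". */
    static String toTcpUri(String remoteAddress) {
        if (remoteAddress == null || remoteAddress.isEmpty()) {
            throw new IllegalArgumentException("remote address is missing");
        }
        // A socket address may print as "/ip:port" or "hostname/ip:port";
        // keep only the part after the slash, if there is one.
        int slash = remoteAddress.indexOf('/');
        String hostAndPort = slash >= 0 ? remoteAddress.substring(slash + 1) : remoteAddress;
        return "tcp://" + hostAndPort;
    }

    /** Returns true if the address was not already known. */
    boolean add(String remoteAddress) {
        return addresses.add(toTcpUri(remoteAddress));
    }

    /** Returns true if the address was known and has been removed. */
    boolean remove(String remoteAddress) {
        return addresses.remove(toTcpUri(remoteAddress));
    }

    /** Returns a copy that is safe to hand to other code, e.g. as a recipient list. */
    List<String> snapshot() {
        return new ArrayList<String>(addresses);
    }
}
```

A component could call add(...) when a message arrives and remove(...) when a disconnect is detected, then pass snapshot() to whatever does the routing.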

Solution

You can't reach established inbound TCP connections, at least not using TCP outbound endpoints. It may be possible to extend the TCP connector to expose the Socket in http://github.com/mulesoft/mule/blob/mule-3.x/transports/tcp/src/main/… but it's no small feat.
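
To illustrate the limitation outside Mule: replying to a client that is already connected means writing to the Socket the server obtained when it accepted that client, whereas an outbound endpoint, as far as I understand it, opens its own connection to a host and port. Below is a minimal sketch in plain java.net of the bookkeeping an extended connector would need. SocketBroadcaster and its methods are hypothetical names, not Mule API, and a real server would also need an accept loop and message framing.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical sketch (plain java.net, not Mule): broadcast over accepted sockets. */
final class SocketBroadcaster {
    // Sockets returned by ServerSocket.accept(); safe to iterate while modified.
    private final CopyOnWriteArraySet<Socket> clients = new CopyOnWriteArraySet<Socket>();

    /** Call once for every socket the server accepts. */
    void register(Socket client) {
        clients.add(client);
    }

    /** Number of clients currently registered. */
    int size() {
        return clients.size();
    }

    /**
     * Writes the payload to every registered client, the sender included.
     * Clients whose socket fails are closed and dropped.
     * Returns the number of clients the payload was written to.
     */
    int broadcast(byte[] payload) {
        int delivered = 0;
        for (Socket client : clients) {
            try {
                OutputStream out = client.getOutputStream();
                out.write(payload);
                out.flush();
                delivered++;
            } catch (IOException e) {
                // Treat a failed write as a disconnect.
                clients.remove(client);
                try {
                    client.close();
                } catch (IOException ignored) {
                    // Nothing more to do for a socket that is already broken.
                }
            }
        }
        return delivered;
    }
}
```

The point of the sketch is that the server must hold on to the accepted sockets themselves; a list of "tcp://host:port" strings is not enough, because the client's remote port is not something it is listening on.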

You could consider using the AJAX transport.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow