Question

I want to implement the single-producer, multiple-consumer pattern using Spring's SubscribableChannel, but I have not been able to solve it.

For example, I have a user as the producer and his friends as the consumers. When the producer does something, it must be published to his friends.

The real problem is that the producer-consumer relationships are dynamic: at any time I can unfriend a user, which means that user should no longer listen to my activities. So I want some kind of Java config where I can subscribe consumers to producers and unsubscribe them again. There must also be a way to detach consumers from a producer when that producer no longer exists.

I am having difficulty designing the channels, because there will be as many channels as there are users. I am also not sure how to design the MessageHandler that comes with Spring Integration.

Are there any examples that point in this direction?

What I have so far is a config bean:

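import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Imports assume Spring Integration 4 or later, where SubscribableChannel lives in spring-messaging.
@Configuration
public class ChannelConfig {

    @Bean
    public SubscribableChannel subscribeChannel() {
        // Inside a @Configuration class this call returns the singleton executor bean.
        return new PublishSubscribeChannel(getAsyncExecutor());
    }

    // Declared as a bean so Spring initializes the pool and shuts it down with the context;
    // an explicit initialize() call is therefore not needed here.
    @Bean
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(10000);
        executor.setThreadNamePrefix("ChannelExecutor-");
        return executor;
    }
}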

which I will inject, but then this will be the only channel. I want a channel for every user.


Solution 2

I am doing something like this

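import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Imports assume Spring Integration 4 or later; computeIfAbsent needs Java 8 or later.
public class ChannelCreator {

    // One pub/sub channel per user, keyed by login.
    private final Map<String, SubscribableChannel> channels = new ConcurrentHashMap<>();
    // Single thread pool shared by all channels.
    private final ThreadPoolTaskExecutor executor = prepareAsyncExecutor();

    // Creates the user's channel if it does not exist yet; an existing channel
    // (and its subscribers) is left untouched.
    public void createChannel(String login) {
        getChannel(login);
    }

    // Returns the user's channel, creating it atomically on first access.
    public SubscribableChannel getChannel(String login) {
        return channels.computeIfAbsent(login, key -> new PublishSubscribeChannel(executor));
    }

    // Drops the user's channel from the registry, e.g. when the user is deleted.
    public void removeChannel(String login) {
        channels.remove(login);
    }

    private static ThreadPoolTaskExecutor prepareAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(10000);
        executor.setThreadNamePrefix("ChannelExecutor-");
        executor.initialize();
        return executor;
    }
}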

and

@Configuration
public class ChannelConfig {

    @Bean
    public ChannelCreator channelCreator() {
        ChannelCreator creator = new ChannelCreator();
        return creator;
    }
}

OTHER TIPS

Why do you want a separate channel for every user? Fanning a message out to many consumers is the whole point of a pub/sub channel: simply subscribe() a new MessageHandler for each user to that channel. Any message sent to the channel is then dispatched to every subscribed consumer.

EDIT (based on the comment below):

In that case, instead of declaring the channel as a @Bean, simply instantiate it programmatically (or set its scope to prototype and get a new instance) for each producer, and subscribe his "friends" to it.
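To make the per-producer idea concrete, here is a minimal, library-free sketch of the bookkeeping involved. It is only a model, not Spring Integration code: FriendFeed and all of its method names are hypothetical, and a plain Consumer<String> stands in for a MessageHandler. In a real setup each map value would presumably be a PublishSubscribeChannel, and the add/remove calls would correspond to SubscribableChannel's subscribe(MessageHandler) and unsubscribe(MessageHandler).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical model of "one pub/sub channel per producer" (no Spring involved).
final class FriendFeed {

    // Producer login -> handlers of the friends currently subscribed to that producer.
    private final Map<String, Set<Consumer<String>>> channels = new ConcurrentHashMap<>();

    // Friend: attach a handler to the producer's channel, creating the channel on first use.
    public void subscribe(String producer, Consumer<String> friend) {
        channels.computeIfAbsent(producer, key -> new CopyOnWriteArraySet<>()).add(friend);
    }

    // Unfriend: detach one handler; returns false if it was not subscribed.
    public boolean unsubscribe(String producer, Consumer<String> friend) {
        Set<Consumer<String>> handlers = channels.get(producer);
        return handlers != null && handlers.remove(friend);
    }

    // Producer no longer exists: dropping the entry detaches every friend at once.
    public void removeProducer(String producer) {
        channels.remove(producer);
    }

    // Deliver an activity to every current subscriber; returns how many handlers were called.
    public int publish(String producer, String activity) {
        Set<Consumer<String>> handlers = channels.get(producer);
        if (handlers == null) {
            return 0;
        }
        int delivered = 0;
        for (Consumer<String> handler : handlers) {
            handler.accept(activity);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        FriendFeed feed = new FriendFeed();
        Consumer<String> alice = activity -> System.out.println("alice sees: " + activity);
        feed.subscribe("bob", alice);
        feed.publish("bob", "posted a photo"); // alice's handler runs
        feed.unsubscribe("bob", alice);        // unfriend
        feed.publish("bob", "posted again");   // nobody is notified
    }
}
```

Note that unsubscribing needs the same handler instance that was subscribed, so the caller has to keep a reference to each friend's handler (for example in a map keyed by the friend's login).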

Licensed under: CC-BY-SA with attribution