Question

Task definition: I need to test custom concurrent collection or some container which manipulates with collections in concurrent environment. More precisely - I've read-API and write-API. I should test if there is any scenarios where I can get inconsistent data.

Problem: All concurrent test frameworks (like MultiThreadedTC, look at MultiThreadedTc section of my question) just provides you an ability to control the asynchronous code execution sequence. I mean you should suppose a critical scenarios by your own.

Broad question: Is there frameworks that can take annotations like @SharedResource, @readAPI, @writeAPI and check if your data will always be consistent? Is that impossible or I just leak a startup idea?

Annotation: If there is no such framework, but you find the idea attractive, you are welcome to contact me or propose your ideas.

Narrow question: I'm new in concurrency. So can you suggest which scenarios should I test in the code below? (look at PeerContainer class)

PeerContainer:

public class PeersContainer {

    public class DaemonThreadFactory implements ThreadFactory {

        private int counter = 1;
        private final String prefix = "Daemon";

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, prefix + "-" + counter);
            thread.setDaemon(true);
            counter++;
            return thread;
        }
    }

    private static class CacheCleaner implements Runnable {

        private final Cache<Long, BlockingDeque<Peer>> cache;

        public CacheCleaner(Cache<Long, BlockingDeque<Peer>> cache) {
            this.cache = cache;
            Thread.currentThread().setDaemon(true);
        }

        @Override
        public void run() {
            cache.cleanUp();
        }
    }

    private final static int MAX_CACHE_SIZE = 100;
    private final static int STRIPES_AMOUNT = 10;
    private final static int PEER_ACCESS_TIMEOUT_MIN = 30;
    private final static int CACHE_CLEAN_FREQUENCY_MIN = 1;

    private final static PeersContainer INSTANCE;

    private final Cache<Long, BlockingDeque<Peer>> peers = CacheBuilder.newBuilder()
            .maximumSize(MAX_CACHE_SIZE)
            .expireAfterWrite(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
            .removalListener(new RemovalListener<Long, BlockingDeque<Peer>>() {
                public void onRemoval(RemovalNotification<Long, BlockingDeque<Peer>> removal) {
                    if (removal.getCause() == RemovalCause.EXPIRED) {
                        for (Peer peer : removal.getValue()) {
                            peer.sendLogoutResponse(peer);
                        }
                    }
                }
            })
            .build();
    private final Striped<Lock> stripes = Striped.lock(STRIPES_AMOUNT);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());

    private PeersContainer() {
        scheduledExecutorService.schedule(new CacheCleaner(peers), CACHE_CLEAN_FREQUENCY_MIN, TimeUnit.MINUTES);
    }

    static {
        INSTANCE = new PeersContainer();
    }

    public static PeersContainer getInstance() {
        return INSTANCE;
    }

    private final Cache<Long, UserAuthorities> authToRestore = CacheBuilder.newBuilder()
            .maximumSize(MAX_CACHE_SIZE)
            .expireAfterWrite(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
            .build();

    public Collection<Peer> getPeers(long sessionId) {
        return Collections.unmodifiableCollection(peers.getIfPresent(sessionId));
    }

    public Collection<Peer> getAllPeers() {
        BlockingDeque<Peer> result = new LinkedBlockingDeque<Peer>();
        for (BlockingDeque<Peer> deque : peers.asMap().values()) {
            result.addAll(deque);
        }
        return Collections.unmodifiableCollection(result);
    }

    public boolean addPeer(Peer peer) {
        long key = peer.getSessionId();
        Lock lock = stripes.get(key);
        lock.lock();
        try {
            BlockingDeque<Peer> userPeers = peers.getIfPresent(key);
            if (userPeers == null) {
                userPeers = new LinkedBlockingDeque<Peer>();
                peers.put(key, userPeers);
            }
            UserAuthorities authorities = restoreSession(key);
            if (authorities != null) {
                peer.setAuthorities(authorities);
            }
            return userPeers.offer(peer);
        } finally {
            lock.unlock();
        }
    }

    public void removePeer(Peer peer) {
        long sessionId = peer.getSessionId();
        Lock lock = stripes.get(sessionId);
        lock.lock();
        try {
            BlockingDeque<Peer> userPeers = peers.getIfPresent(sessionId);
            if (userPeers != null && !userPeers.isEmpty()) {
                UserAuthorities authorities = userPeers.getFirst().getAuthorities();
                authToRestore.put(sessionId, authorities);
                userPeers.remove(peer);
            }
        } finally {
            lock.unlock();
        }
    }

     void removePeers(long sessionId) {
        Lock lock = stripes.get(sessionId);
        lock.lock();
        try {
            peers.invalidate(sessionId);
            authToRestore.invalidate(sessionId);
        } finally {
            lock.unlock();
        }
    }

    private UserAuthorities restoreSession(long sessionId) {
        BlockingDeque<Peer> activePeers = peers.getIfPresent(sessionId);
        return (activePeers != null && !activePeers.isEmpty()) ? activePeers.getFirst().getAuthorities() : authToRestore.getIfPresent(sessionId);
    }

    public void resetAccessedTimeout(long sessionId) {
        Lock lock = stripes.get(sessionId);
        lock.lock();
        try {
            BlockingDeque<Peer> deque = peers.getIfPresent(sessionId);
            peers.invalidate(sessionId);
            peers.put(sessionId, deque);
        } finally {
            lock.unlock();
        }
    }
}

MultiThreadedTC test case sample: [optional section of question]

public class ProducerConsumerTest extends MultithreadedTestCase {
    private LinkedTransferQueue<String> queue;

    @Override
    public void initialize() {
        super.initialize();
        queue = new LinkedTransferQueue<String>();
    }

    public void thread1() throws InterruptedException {
        String ret = queue.take();
    }

    public void thread2() throws InterruptedException {
        waitForTick(1);
        String ret = queue.take();
    }

    public void thread3() {
        waitForTick(1);
        waitForTick(2);
        queue.put("Event 1");
        queue.put("Event 2");
    }

    @Override
    public void finish() {
        super.finish();
        assertEquals(true, queue.size() == 0);
    }
}
Was it helpful?

Solution

Sounds like a job for static analysis, not testing, unless you have time to run multiple trillions of test cases. You pretty much can't test multithreaded behaviour - test behaviour in a single thread, then prove the abscence of threading bugs.

Try:

http://www.contemplateltd.com/threadsafe

http://checkthread.org/

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top