Question

I am using memcached version 1.4.7 with spymemcached 2.8.4 as the client to set and get key/value pairs. In a multi-threaded, high-load environment the spymemcached client is unable to set some of the values in the cache.

I am running my load-test program with 40M Long keys, divided equally among worker threads. Each worker thread tries to set 1M keys in the cache, so there are 40 worker threads running.

In my DefaultCache.java file, I have made a connection pool of 20 spymemcached clients. Every time a worker thread tries to set a key in the cache, DefaultCache.java returns a random client, as shown in the getCache() method.

When my program exits, it prints

Total no of keys loaded = 40000000

However, when I check the memcached telnet console, it is always missing a few thousand records. I have also verified this by randomly fetching keys, some of which return null. There is no eviction, and cmd_set, curr_items, and total_items are each equal to 39.5M.

What could be the reason behind these missing keys in the cache?
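
Side note for anyone reproducing this: spymemcached's set() is asynchronous and returns an OperationFuture&lt;Boolean&gt;, and my code below never inspects it, so a failed or cancelled store passes silently. A minimal sketch of a checked write (the checkedSet helper is illustrative, not part of my code):

import java.util.concurrent.ExecutionException;

import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;

public class CheckedSet {
    // Illustrative helper: store a value and report whether memcached
    // acknowledged it, instead of ignoring the returned future.
    static boolean checkedSet(MemcachedClient client, String key, Object value)
            throws InterruptedException, ExecutionException {
        OperationFuture<Boolean> future = client.set(key, 0, value);
        // get() blocks for the result; a cancelled or timed-out store
        // surfaces here as false or as an exception instead of vanishing.
        return future.get();
    }
}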

Here is the code for reference.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCacheLoader {
    public static final Long TOTAL_RECORDS = 40000000L;
    public static final Long LIMIT = 1000000L;

    public static void main(String[] args) {
        long keyCount = loadKeyCacheData();
        System.out.println("Total no of keys loaded = " + keyCount);
    }

    public static long loadKeyCacheData() {
        DefaultCache cache = new DefaultCache();
        List<Future<Long>> futureList = new ArrayList<Future<Long>>();
        ExecutorService executorThread = Executors.newFixedThreadPool(40);
        long offset = 0;
        long keyCount = 0;
        long workerCount = 0;
        try {
            // Split the key space into batches of LIMIT keys, one worker per batch.
            do {
                List<Long> keyList = new ArrayList<Long>(LIMIT.intValue());
                for (long counter = offset; counter < (offset + LIMIT) && counter < TOTAL_RECORDS; counter++) {
                    keyList.add(counter);
                }
                if (!keyList.isEmpty()) {
                    System.out.println("Initiating a new worker thread " + workerCount++);
                    KeyCacheThread keyCacheThread = new KeyCacheThread(keyList, cache);
                    futureList.add(executorThread.submit(keyCacheThread));
                }
                offset += LIMIT;
            } while (offset < TOTAL_RECORDS);
            // Wait for every worker to finish and sum the per-worker counts.
            for (Future<Long> future : futureList) {
                keyCount += future.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorThread.shutdown(); // otherwise the idle pool keeps the JVM alive
            cache.shutdown();
        }
        return keyCount;
    }
}

import java.util.List;
import java.util.concurrent.Callable;

class KeyCacheThread implements Callable<Long> {
    private final List<Long> keyList;
    private final DefaultCache cache;

    public KeyCacheThread(List<Long> keyList, DefaultCache cache) {
        this.keyList = keyList;
        this.cache = cache;
    }

    public Long call() {
        return createKeyCache();
    }

    public Long createKeyCache() {
        String compoundKey = "";
        long keyCounter = 0;
        System.out.println(Thread.currentThread() + " started to process " + keyList.size() + " keys");
        for (Long key : keyList) {
            keyCounter++;
            compoundKey = key.toString();
            cache.set(compoundKey, 0, key); // expiry 0 = never expire
        }
        System.out.println(Thread.currentThread() + " processed = " + keyCounter + " keys");
        return keyCounter;
    }
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.util.KetamaNodeLocator;

import org.apache.log4j.Logger;

public class DefaultCache {
    private static final Logger LOGGER = Logger.getLogger(DefaultCache.class);

    // These fields were missing from the original paste; declared here so the class compiles.
    private final String cacheNamespace;
    private final String cacheName;
    private final String addresses;
    private final int cacheLookupTimeout;
    private final int numberOfClients;

    private MemcachedClient[] clients;

    public DefaultCache() {
        this.cacheNamespace = "";
        this.cacheName = "keyCache";
        this.addresses = "127.0.0.1:11211";
        this.cacheLookupTimeout = 3000;
        this.numberOfClients = 20;

        try {
            LOGGER.debug("Cache initialization started for the cache : " + cacheName);
            ConnectionFactory connectionFactory = new DefaultConnectionFactory(DefaultConnectionFactory.DEFAULT_OP_QUEUE_LEN,
                    DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE, DefaultHashAlgorithm.KETAMA_HASH) {
                public NodeLocator createLocator(List<MemcachedNode> list) {
                    return new KetamaNodeLocator(list, DefaultHashAlgorithm.KETAMA_HASH);
                }
            };

            clients = new MemcachedClient[numberOfClients];

            for (int i = 0; i < numberOfClients; i++) {
                clients[i] = new MemcachedClient(connectionFactory, AddrUtil.getAddresses(addresses));
            }
            LOGGER.debug("Cache initialization ended for the cache : " + cacheName);
        } catch (IOException e) {
            LOGGER.error("Exception occurred while initializing cache : " + cacheName, e);
            // CacheException is an application-specific RuntimeException (not shown).
            throw new CacheException("Exception occurred while initializing cache : " + cacheName, e);
        }
    }

    public Object get(String key) {
        try {
            return getCache().get(cacheNamespace + key);
        } catch (Exception e) {
            return null;
        }
    }

    public void set(String key, Integer expiryTime, final Object value) {
        // NB: spymemcached's set() is asynchronous; the returned OperationFuture
        // is ignored here, so a failed or cancelled store goes unnoticed.
        getCache().set(cacheNamespace + key, expiryTime, value);
    }

    public Object delete(String key) {
        return getCache().delete(cacheNamespace + key);
    }

    public void shutdown() {
        for (MemcachedClient client : clients) {
            client.shutdown();
        }
    }

    public void flush() {
        for (MemcachedClient client : clients) {
            client.flush();
        }
    }

    // Picks a random client from the pool for each operation.
    private MemcachedClient getCache() {
        int i = (int) (Math.random() * numberOfClients);
        return clients[i];
    }

    // Joins host:port pairs into the whitespace-separated form AddrUtil expects
    // (unused with the hard-coded address string above).
    private String getServerAddresses(List<InetSocketAddress> addresses) {
        StringBuilder addressStr = new StringBuilder();
        for (InetSocketAddress address : addresses) {
            addressStr.append(address.getHostName()).append(":").append(address.getPort()).append(" ");
        }
        return addressStr.toString().trim();
    }
}


Solution 2

I am not sure, but it seems to be an issue with the spymemcached library itself. I changed the implementation of DefaultCache.java to use xmemcached, and everything started working fine. Now I am not missing any records, and the telnet stats show a matching number of set commands.
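
Roughly, the xmemcached-based setup looks like this (a minimal sketch, not my exact DefaultCache rewrite; the pool size of 20 just mirrors the original client array):

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;

public class XMemcachedExample {
    public static void main(String[] args) throws Exception {
        XMemcachedClientBuilder builder =
                new XMemcachedClientBuilder(AddrUtil.getAddresses("127.0.0.1:11211"));
        // One xmemcached client can multiplex several connections per server,
        // replacing the hand-rolled array of 20 spymemcached clients.
        builder.setConnectionPoolSize(20);
        MemcachedClient client = builder.build();
        try {
            // set() here is synchronous: it returns false or throws on failure,
            // so a dropped write cannot pass silently.
            boolean stored = client.set("12345", 0, 12345L);
            System.out.println("stored = " + stored);
        } finally {
            client.shutdown();
        }
    }
}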

Thanks for your patience though.

Other tips

I saw the same thing. The reason is the reactor pattern they use for asynchronous operations, which means one worker thread per connection. That single thread is a bottleneck under high load on multi-core machines: one thread can load only one CPU while the remaining 23 sit idle.

We came up with a pool of connections, which increased the number of worker threads and allowed more of the hardware to be utilized. Check out the 3levelmemcache project on GitHub; a rough sketch of the idea follows.
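
As an illustration only (a minimal sketch, not the actual 3levelmemcache code): round-robin over several independent spymemcached clients spreads the work across several I/O threads.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;

// Sketch: each MemcachedClient owns one I/O (reactor) thread, so a pool of
// clients lets a heavy load use more than one CPU core.
public class MemcachedClientPool {
    private final MemcachedClient[] clients;
    private final AtomicInteger counter = new AtomicInteger();

    public MemcachedClientPool(String addresses, int poolSize) throws IOException {
        clients = new MemcachedClient[poolSize];
        for (int i = 0; i < poolSize; i++) {
            clients[i] = new MemcachedClient(AddrUtil.getAddresses(addresses));
        }
    }

    // Round-robin keeps the distribution even under contention, unlike
    // picking a client with Math.random(); the mask keeps the index
    // non-negative when the counter overflows.
    public MemcachedClient next() {
        return clients[(counter.getAndIncrement() & Integer.MAX_VALUE) % clients.length];
    }

    public void shutdown() {
        for (MemcachedClient client : clients) {
            client.shutdown();
        }
    }
}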
