Question

I have a Java application that talks to multiple memcached server nodes through a spymemcached client.

I want to know whether it is possible to add or remove server nodes at runtime without disturbing all of the existing cache nodes (I understand that some nodes will necessarily be affected).

Here is what I know (or understand):

It is possible to set a custom hashing algorithm in DefaultConnectionFactory, which lets us use consistent hashing, or to use the built-in KetamaConnectionFactory directly.

So we should be able to add or remove nodes while affecting only one or a few of the existing nodes.

Is it possible using spymemcached?

If it is, then how?

Can anyone please point me in the right direction?
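To illustrate the property the question relies on, here is a minimal, self-contained sketch of a consistent-hash ring. It is not spymemcached's Ketama implementation; the class name ConsistentHashDemo, the 160 virtual points per node, the node names, and the MD5-based hash are all assumptions chosen for illustration. It counts how many keys change owner when a fourth node joins a three-node ring.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashDemo {

    /** Toy hash ring: each node owns several virtual points on the ring. */
    public static final class Ring {
        // Assumption: 160 virtual points per node, picked only for illustration.
        private static final int POINTS_PER_NODE = 160;
        private final TreeMap<Long, String> ring = new TreeMap<>();

        public void addNode(String node) {
            for (int i = 0; i < POINTS_PER_NODE; i++) {
                ring.put(hash(node + "-" + i), node);
            }
        }

        public void removeNode(String node) {
            // Drop every virtual point that belongs to this node.
            ring.values().removeIf(node::equals);
        }

        /** Returns the owner of the first point at or after the key's hash, wrapping around. */
        public String nodeFor(String key) {
            Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
            return (entry != null ? entry : ring.firstEntry()).getValue();
        }

        /** Uses the first four bytes of the MD5 digest as an unsigned 32-bit ring position. */
        private static long hash(String s) {
            try {
                byte[] d = MessageDigest.getInstance("MD5")
                        .digest(s.getBytes(StandardCharsets.UTF_8));
                return ((long) (d[3] & 0xFF) << 24)
                        | ((long) (d[2] & 0xFF) << 16)
                        | ((long) (d[1] & 0xFF) << 8)
                        | (d[0] & 0xFF);
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("MD5 is not available", e);
            }
        }
    }

    /** Adds newNode to the ring and returns how many of keyCount sample keys changed owner. */
    public static int movedKeys(Ring ring, String newNode, int keyCount) {
        String[] before = new String[keyCount];
        for (int i = 0; i < keyCount; i++) {
            before[i] = ring.nodeFor("key" + i);
        }
        ring.addNode(newNode);
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            if (!before[i].equals(ring.nodeFor("key" + i))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        Ring ring = new Ring();
        ring.addNode("cache1:11211");
        ring.addNode("cache2:11211");
        ring.addNode("cache3:11211");
        int moved = movedKeys(ring, "cache4:11211", 10000);
        System.out.println("Keys remapped after adding a node: " + moved + " of 10000");
    }
}
```

With a ring like this, roughly one key in four should move when going from three nodes to four, and every moved key lands on the new node. A plain modulo scheme (hash % nodeCount) would instead remap most keys.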

Solution

It looks like NodeLocator.updateLocator(List<MemcachedNode> newNodes) should do the job.

But getting hold of the connected MemcachedNode instances is a bit hard. You have to extend MemcachedClient, MemcachedConnection and DefaultConnectionFactory.

It is reasonable to want to add or remove nodes through MemcachedClient, so you add remove(MemcachedNode node) and add(InetSocketAddress nodeAddress) methods to it.

To remove a node, disconnect it (see MemcachedConnection.shutdown() for how that is done), leave it out of the nodes returned by NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes) with the remaining nodes.

To add a node, connect it via MemcachedConnection.createConnections(final Collection a), merge the result with NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes).

I have never tried this, so it may not work. Good luck!

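ExtMemcachedConnection.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;

public class ExtMemcachedConnection extends MemcachedConnection {

  protected final OperationFactory opFact;

  /**
   * Construct a memcached connection.
   *
   * @param bufSize   the size of the buffer used for reading from the server
   * @param f         the factory that will provide an operation queue
   * @param a         the addresses of the servers to connect to
   * @param obs       the initial connection observers
   * @param fm        the failure mode to use
   * @param opfactory the operation factory, kept for redistributing operations
   * @throws java.io.IOException if a connection attempt fails early
   */
  public ExtMemcachedConnection(int bufSize, ConnectionFactory f,
                                List<InetSocketAddress> a,
                                Collection<ConnectionObserver> obs,
                                FailureMode fm, OperationFactory opfactory)
      throws IOException {
    super(bufSize, f, a, obs, fm, opfactory);
    this.opFact = opfactory;
  }

  // Connects the new node and hands the merged node list to the locator.
  public void add(InetSocketAddress nodeAddress) throws IOException {
    final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
    nodeToAdd.add(nodeAddress);
    List<MemcachedNode> newNodesList = new ArrayList<MemcachedNode>(createConnections(nodeToAdd));
    newNodesList.addAll(getLocator().getAll());
    getLocator().updateLocator(newNodesList);
  }

  // The node should be obtained from the locator to ensure currentNode.equals(node) returns true.
  public void remove(MemcachedNode node) throws IOException {
    List<MemcachedNode> remainingNodes = new ArrayList<MemcachedNode>();
    MemcachedNode removedNode = null;
    for (MemcachedNode currentNode : getLocator().getAll()) {
      if (removedNode == null && currentNode.equals(node)) {
        removedNode = currentNode;
      } else {
        remainingNodes.add(currentNode);
      }
    }
    if (removedNode == null) {
      return; // The node is not known to the locator, so there is nothing to do.
    }
    Collection<Operation> notCompletedOperations = removedNode.destroyInputQueue();
    if (removedNode.getChannel() != null) {
      getLogger().debug("Shutting down channel %s", removedNode.getChannel());
      removedNode.getChannel().close();
      removedNode.setSk(null);
      if (removedNode.getBytesRemainingToWrite() > 0) {
        getLogger().warn("Shut down with %d bytes remaining to write",
                         removedNode.getBytesRemainingToWrite());
      }
    }
    // Update the locator first so that redistributed operations are not routed back to the removed node.
    getLocator().updateLocator(remainingNodes);
    redistributeOperations(notCompletedOperations);
  }

  // Unfortunately, redistributeOperations is private in MemcachedConnection, so it cannot be
  // called or overridden. This is a copy of that implementation.
  protected void redistributeOperations(Collection<Operation> ops) {
    for (Operation op : ops) {
      if (op.isCancelled() || op.isTimedOut()) {
        continue;
      }
      if (op instanceof KeyedOperation) {
        KeyedOperation ko = (KeyedOperation) op;
        int added = 0;
        for (String k : ko.getKeys()) {
          for (Operation newop : opFact.clone(ko)) {
            addOperation(k, newop);
            added++;
          }
        }
        assert added > 0 : "Didn't add any new operations when redistributing";
      } else {
        // Cancel things that don't have definite targets.
        op.cancel();
      }
    }
  }

}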

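ExtMemcachedClient.java

  // Delegates to the extended connection; does nothing if another connection type is in use.
  public void add(InetSocketAddress nodeAddress) throws IOException {
    if (mconn instanceof ExtMemcachedConnection) {
      ((ExtMemcachedConnection) mconn).add(nodeAddress);
    }
  }

  // Returns true if the request was passed on to the extended connection, false otherwise.
  public boolean remove(MemcachedNode node) throws IOException {
    if (mconn instanceof ExtMemcachedConnection) {
      ((ExtMemcachedConnection) mconn).remove(node);
      return true;
    }
    return false;
  }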

ExtMemcachedConnectionFactory.java

  // Makes the client use the extended connection instead of the default MemcachedConnection.
  @Override
  public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
    return new ExtMemcachedConnection(getReadBufSize(), this, addrs,
                                      getInitialObservers(), getFailureMode(), getOperationFactory());
  }

Other tips

This question was asked a long time ago and is specific to spymemcached, but this answer might help others who are open to alternatives. There is another memcached client called xmemcached, which supports adding and removing memcached servers dynamically.

https://code.google.com/p/xmemcached/

https://github.com/killme2008/xmemcached

memcachedClient.addServer(host, port);

// removeServer takes a "host:port" string rather than a bare host name
memcachedClient.removeServer(host + ":" + port);
License: CC-BY-SA with attribution
Not affiliated with StackOverflow