Question

So my general question is "Is it possible to have an Accumulo BatchScanner only pull back the first result per Range I give it?"

Now some details about my use case, as there may be a better way to approach this anyway. I have data representing messages from different systems, and there are several types of messages. My users want to be able to ask the system questions such as "give me the most recent message of a certain type, as of a certain time, for all these systems".

My table layout looks like this:

  rowid: system_name, family: message_type, qualifier: masked_timestamp, value: message_text

The idea is that the user gives me a list of the systems they care about, the message type, and a timestamp. I store a masked timestamp in the qualifier so that the table sorts the most recent messages first. That way, when I scan starting at the masked form of a timestamp, the first result is the most recent message as of that time. I am using a BatchScanner because each query searches multiple systems. Can I make the BatchScanner fetch only the first result for each Range? I can't specify an exact Key, because the timestamp of the most recent message may not match the datetime the user gives.
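The masking scheme is not spelled out above. One common choice, assumed here, is to store `Long.MAX_VALUE - epochMillis`, zero-padded to a fixed width so that Accumulo's lexicographic byte ordering matches numeric ordering. The sketch below uses a plain `java.util.TreeMap` as a stand-in for one row/family of the table (no Accumulo classes) to show that the first key at or after the masked query time is the newest message as of that time. The `mask` and `latestAsOf` names are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class MaskedTimestampDemo {
    // Hypothetical masking scheme: Long.MAX_VALUE - epochMillis, zero-padded to 19 digits
    // so that lexicographic (byte) order matches numeric order. Newer times sort first.
    // Assumes epochMillis >= 0.
    static String mask(long epochMillis) {
        return String.format(Locale.ROOT, "%019d", Long.MAX_VALUE - epochMillis);
    }

    // Newest message at or before queryMillis, or null if there is none. The first key
    // >= mask(queryMillis) is what a scan starting at that masked qualifier returns first.
    static String latestAsOf(TreeMap<String, String> maskedToMessage, long queryMillis) {
        Map.Entry<String, String> e = maskedToMessage.ceilingEntry(mask(queryMillis));
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        // Stand-in for one row/family of the table, sorted by masked qualifier
        TreeMap<String, String> msgs = new TreeMap<>();
        msgs.put(mask(1_000L), "m1000");
        msgs.put(mask(2_000L), "m2000");
        msgs.put(mask(3_000L), "m3000");

        System.out.println(msgs.firstEntry().getValue()); // m3000 (newest sorts first)
        System.out.println(latestAsOf(msgs, 2_500L));      // m2000
        System.out.println(latestAsOf(msgs, 500L));        // null (nothing that old)
    }
}
```

The fixed width matters: without padding, `"9"` would sort after `"10"` and the newest-first order would break.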

Currently, I am using the BatchScanner and ignoring all but the first result per Range. It works, but it seems wasteful to pull back all the data for a given system/type over the network when I only care about the first result for each.
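One caveat with filtering on the client: the BatchScanner Javadoc notes that it may return entries out of order, so "the first entry received" for a system/type is not guaranteed to be the first in sort order. A defensive client-side reduction keeps, per row and column family, the entry with the smallest qualifier, which under a newest-first masked timestamp is the most recent message. This is a minimal stdlib sketch; the `Entry` class is a hypothetical stand-in for Accumulo's `Key`/`Value`, and the qualifiers are assumed to be fixed-width masked timestamps.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FirstPerRangeReducer {
    // Hypothetical stand-in for an Accumulo Key/Value pair (not the Accumulo classes).
    static final class Entry {
        final String row, family, qualifier, value;

        Entry(String row, String family, String qualifier, String value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    // Keeps, per (row, family), the entry with the smallest qualifier. With fixed-width,
    // newest-first masked timestamps that is the most recent message, whatever the arrival order.
    static Map<String, Entry> firstPerRowAndFamily(List<Entry> arrivals) {
        Map<String, Entry> best = new HashMap<>();
        for (Entry e : arrivals) {
            // NUL separator so ("ab", "c") and ("a", "bc") land in different groups
            String group = e.row + "\u0000" + e.family;
            best.merge(group, e, (kept, incoming) ->
                    kept.qualifier.compareTo(incoming.qualifier) <= 0 ? kept : incoming);
        }
        return best;
    }

    public static void main(String[] args) {
        // Arrival order as an unordered BatchScanner might deliver it
        List<Entry> arrivals = List.of(
                new Entry("sysA", "status", "0002", "older A"),
                new Entry("sysB", "status", "0005", "only B"),
                new Entry("sysA", "status", "0001", "newest A"));
        Map<String, Entry> best = firstPerRowAndFamily(arrivals);
        System.out.println(best.get("sysA\u0000status").value); // newest A
        System.out.println(best.get("sysB\u0000status").value); // only B
    }
}
```

This still transfers every entry over the network; it only makes the client-side choice independent of arrival order.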

EDIT

My attempt using the FirstEntryInRowIterator:

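import java.util.Map;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.FirstEntryInRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.junit.Assert;
import org.junit.Test;

public class FirstEntryIteratorTest
{
    @Test
    public void testFirstEntryIterator() throws Exception
    {
        Connector connector = new MockInstance("inst").getConnector("user", new PasswordToken("password"));
        connector.tableOperations().create("testing");

        // Three cells in one row (replaces the writer(...) and mutation(...) helpers)
        BatchWriter writer = connector.createBatchWriter("testing", new BatchWriterConfig());
        Mutation m = new Mutation("row");
        m.put("fam", "qual1", new Value("val1".getBytes()));
        m.put("fam", "qual2", new Value("val2".getBytes()));
        m.put("fam", "qual3", new Value("val3".getBytes()));
        writer.addMutation(m);
        writer.close();

        Scanner scanner = connector.createScanner("testing", new Authorizations());
        scanner.addScanIterator(new IteratorSetting(50, FirstEntryInRowIterator.class));

        Key begin = new Key("row", "fam", "qual2");
        scanner.setRange(new Range(begin, begin.followingKey(PartialKey.ROW_COLFAM_COLQUAL)));

        int numResults = 0;
        for (Map.Entry<Key, Value> entry : scanner)
        {
            Assert.assertEquals("qual2", entry.getKey().getColumnQualifier().toString());
            numResults++;
        }

        Assert.assertEquals(1, numResults);
    }
}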

My goal is for the returned entry to be ("row", "fam", "qual2", "val2"), but I get 0 results. It almost seems as if the iterator is being applied before the Range. I haven't dug into this yet.


Solution

This sounds like a good use case for one of Accumulo's SortedKeyValueIterators, specifically the FirstEntryInRowIterator (contained in the accumulo-core artifact).

Create an IteratorSetting with the FirstEntryInRowIterator and add it to your BatchScanner. This will return the first Key/Value for each system_name (row) and then stop, avoiding the overhead of your client receiving and ignoring all the other results.
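Skipping the rest of a row is where the server-side savings come from. The `scansBeforeSeek` option in the iterator code (default 10) controls a trade-off: step with `next()` a few times, which is cheap for short rows, and fall back to a single seek past the row when it turns out to be long. The following is a toy model of that behaviour over a `TreeMap`, not Accumulo code; the `K` key class and the call counters are hypothetical, and the seek target mirrors `Key.followingKey(PartialKey.ROW)`, which appends a zero byte to the row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class FirstEntryPerRowModel {
    // Hypothetical stand-in for an Accumulo Key: compare rows first, then the rest of the key.
    static final class K implements Comparable<K> {
        final String row, rest;

        K(String row, String rest) {
            this.row = row;
            this.rest = rest;
        }

        @Override
        public int compareTo(K o) {
            int c = row.compareTo(o.row);
            return c != 0 ? c : rest.compareTo(o.rest);
        }
    }

    static int nextCalls = 0;
    static int seekCalls = 0;

    // Emits the first value of each row. The rest of the row is skipped by stepping up to
    // scansBeforeSeek times and then jumping to the following row in a single seek.
    static List<String> firstPerRow(TreeMap<K, String> data, int scansBeforeSeek) {
        List<String> out = new ArrayList<>();
        Map.Entry<K, String> top = data.firstEntry();
        while (top != null) {
            String row = top.getKey().row;
            out.add(top.getValue()); // first entry of this row
            int count = 0;
            while (top != null && top.getKey().row.equals(row)) {
                if (count < scansBeforeSeek) {
                    count++;
                    nextCalls++;
                    top = data.higherEntry(top.getKey()); // like source.next()
                } else {
                    count = 0;
                    seekCalls++;
                    // like seeking to followingKey(PartialKey.ROW): row + NUL sorts after every key in row
                    top = data.ceilingEntry(new K(row + "\u0000", ""));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<K, String> data = new TreeMap<>();
        for (int i = 0; i < 100; i++) {
            data.put(new K("sysA", String.format(Locale.ROOT, "status %03d", i)), "A" + i);
        }
        data.put(new K("sysB", "status 000"), "B0");

        System.out.println(firstPerRow(data, 10)); // [A0, B0]
        System.out.println(nextCalls + " next calls, " + seekCalls + " seek"); // 11 next calls, 1 seek
    }
}
```

With 100 entries in `sysA`, ten `next()` calls followed by one seek reach `sysB`, so the row's remaining 89 entries are never visited.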

A quick modification of the FirstEntryInRowIterator might get you what you want:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class FirstEntryInRangeIterator extends SkippingIterator implements OptionDescriber {

  // options
  static final String NUM_SCANS_STRING_NAME = "scansBeforeSeek";

  // iterator predecessor seek options to pass through
  private Range latestRange;
  private Collection<ByteSequence> latestColumnFamilies;
  private boolean latestInclusive;

  // private fields
  private Text lastRowFound;
  private int numscans;

  /**
   * convenience method to set the option to optimize the frequency of scans vs. seeks
   */
  public static void setNumScansBeforeSeek(IteratorSetting cfg, int num) {
    cfg.addOption(NUM_SCANS_STRING_NAME, Integer.toString(num));
  }

  // this must be public for OptionDescriber
  public FirstEntryInRangeIterator() {
    super();
  }

  public FirstEntryInRangeIterator(FirstEntryInRangeIterator other, IteratorEnvironment env) {
    super();
    setSource(other.getSource().deepCopy(env));
  }

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    return new FirstEntryInRangeIterator(this, env);
  }

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    String o = options.get(NUM_SCANS_STRING_NAME);
    numscans = o == null ? 10 : Integer.parseInt(o);
  }

  // this is only ever called immediately after getting "next" entry
  @Override
  protected void consume() throws IOException {
    if (finished || lastRowFound == null)
      return;
    int count = 0;
    while (getSource().hasTop() && lastRowFound.equals(getSource().getTopKey().getRow())) {

      // try to efficiently jump to the next matching key
      if (count < numscans) {
        ++count;
        getSource().next(); // scan
      } else {
        // too many scans, just seek
        count = 0;

        // determine where to seek to, but don't go beyond the user-specified range
        Key nextKey = getSource().getTopKey().followingKey(PartialKey.ROW);
        if (!latestRange.afterEndKey(nextKey))
          getSource().seek(new Range(nextKey, true, latestRange.getEndKey(), latestRange.isEndKeyInclusive()), latestColumnFamilies, latestInclusive);
        else {
          finished = true;
          break;
        }
      }
    }
    lastRowFound = getSource().hasTop() ? getSource().getTopKey().getRow(lastRowFound) : null;
  }

  private boolean finished = true;

  @Override
  public boolean hasTop() {
    return !finished && getSource().hasTop();
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    // save parameters for future internal seeks
    latestRange = range;
    latestColumnFamilies = columnFamilies;
    latestInclusive = inclusive;
    lastRowFound = null;

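    // Seek to the Range exactly as given, so the first entry returned is the
    // first one inside the Range rather than the first one in its row.
    super.seek(range, columnFamilies, inclusive);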
    finished = false;

    if (getSource().hasTop()) {
      lastRowFound = getSource().getTopKey().getRow();
      if (range.beforeStartKey(getSource().getTopKey()))
        consume();
    }
  }

  @Override
  public IteratorOptions describeOptions() {
    String name = "firstEntry";
    String desc = "Only allows iteration over the first entry per range";
    HashMap<String,String> namedOptions = new HashMap<String,String>();
    namedOptions.put(NUM_SCANS_STRING_NAME, "Number of scans to try before seeking [10]");
    return new IteratorOptions(name, desc, namedOptions, null);
  }

  @Override
  public boolean validateOptions(Map<String,String> options) {
    try {
      String o = options.get(NUM_SCANS_STRING_NAME);
      if (o != null)
        Integer.parseInt(o);
    } catch (Exception e) {
      throw new IllegalArgumentException("bad integer " + NUM_SCANS_STRING_NAME + ":" + options.get(NUM_SCANS_STRING_NAME), e);
    }
    return true;
  }

}
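The substantive change relative to the stock `FirstEntryInRowIterator` appears to be in `seek`: this modified iterator seeks to the Range exactly as given, whereas the stock one seems to start its seek at the beginning of the row and then consume that row whenever its first key falls before the Range. That would account for the 0 results in the test above, since the row's first key, `qual1`, precedes a Range starting at `qual2`. The toy model below uses plain Java, with a `TreeMap` standing in for the test's single row and a hypothetical `firstEmitted` helper; it ignores the end of the Range for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

class SeekStartDemo {
    // The test's single row/family: qualifier -> value, sorted as Accumulo stores them.
    static final TreeMap<String, String> ROW =
            new TreeMap<>(Map.of("qual1", "val1", "qual2", "val2", "qual3", "val3"));

    // First value an iterator would emit for a Range starting at rangeStart (end ignored).
    // widenToRowStart = true models seeking from the start of the row and then consuming
    // the whole row when its first key lies before the Range.
    static String firstEmitted(String rangeStart, boolean widenToRowStart) {
        String seekFrom = widenToRowStart ? ROW.firstKey() : rangeStart;
        Map.Entry<String, String> top = ROW.ceilingEntry(seekFrom);
        if (top == null || top.getKey().compareTo(rangeStart) < 0) {
            return null; // nothing emitted for this row
        }
        return top.getValue();
    }

    public static void main(String[] args) {
        System.out.println(firstEmitted("qual2", true));  // null: matches the 0 results
        System.out.println(firstEmitted("qual2", false)); // val2: seek straight to the Range
    }
}
```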
Licensed under: CC-BY-SA with attribution