Frage

Der folgende Code liest aus der hbase, konvertiert ihn dann in eine JSON-Struktur und konvertiert ihn in schemaRDD. Das Problem ist jedoch, dass ich es bin using List Speichern Sie den JSON-String und übergeben Sie ihn dann an javaRDD. Bei Daten von etwa 100 GB wird der Master mit Daten in den Speicher geladen.Was ist der richtige Weg, um die Daten von hbase zu laden, dann eine Manipulation durchzuführen und sie dann in JavaRDD zu konvertieren?

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
War es hilfreich?

Lösung

Ein grundlegendes Beispiel, um die HBASE-Daten mit Funken (Scala) zu lesen, können Sie diese auch in Java wrtie:

generasacodicetagpre.

Aktualisiert -2016

Ab Funke 1.0.x +, jetzt können Sie jetzt den Funken-HBase-Anschluss verwenden:

MAVEN-Abhängigkeit, einzuschließen:

generasacodicetagpre.

und finden Sie einen unteren Beispielcode für dasselbe:

generasacodicetagpre.

Aktualisiert - 2017

Ab Funke 1.6.x +, jetzt können Sie SHC Connector auch verwenden (HortonWorks oder HDP-Benutzer):

MAVEN-Abhängigkeit, einzuschließen:

generasacodicetagpre.

Der Hauptvorteil der Verwendung dieses Anschlusses besteht darin, dass es in der Schemadefinition flexibel ist, und benötigt nicht wie im Nerdammer / Spark-HBASE-Anschluss nicht gedruckte Parameter. Denken Sie auch daran, dass er Funken 2.x unterstützt, sodass dieser Anschluss ziemlich flexibel ist und in Fragen und PRS End-to-End-Unterstützung bietet.

Finden Sie den folgenden Repository-Pfad für die neuesten Readme und Proben:

hortonWorks Spark HBASE-Anschluss

Sie können diese RDDs auch in die Datenframes konvertieren und SQL darüber ausführen, oder Sie können diese Datenmenge oder Datenframes an den Benutzer definierten Java Pojo's oder Fallklassen aufnehmen. Es funktioniert brillant.

Bitte kommentieren Sie unten, wenn Sie noch etwas benötigen.

Andere Tipps

Ich lieber lieber von HBase und mache die JSON-Manipulation alles in Funken.
Funken bietet javasparkcontext.newapihadooprdd Funktion, um Daten von der Hadoop-Lagerung einschließlich HBase zu lesen.Sie müssen die HBASE-Konfiguration, den Tabellennamen und den Scannen des Konfigurationsparameters und des Tabelleneingangsformats und des Schlüsselwerts

angeben.

Sie können Tabelleneingabeformat Klasse und den Job-Parameter verwendenSo liefern Sie den Tabellennamen und die Scan-Konfiguration

Beispiel:

generasacodicetagpre.

Dann können Sie die JSON-Manipulation in Funke machen.Da der Funken die Neuberechnung durchführen kann, wenn der Speicher voll ist, lädt er nur die Daten, die für den Neuberechnungsteil (CMIIW) benötigt werden, sodass Sie sich nicht um die Datengröße

kümmern müssen

Nur ein Kommentar zum Hinzufügen von Scan:

hinzu

tableputformat hat die folgenden Attribute:

    .
  1. scan_row_start
  2. scan_row_stop
generasacodicetagpre.

Da die Frage nicht neu ist, gibt es vorerst noch ein paar andere Alternativen:

Ich weiß nicht viel über das erste Projekt, aber es sieht so aus, als würde es Spark 2.x nicht unterstützen.Allerdings gibt es auf RDD-Ebene umfangreiche Unterstützung für Spark 1.6.x.

Spark-on-HBase hingegen verfügt über Zweige für Spark 2.0 und das kommende Spark 2.1.Dieses Projekt ist sehr vielversprechend, da es sich auf Dataset/DataFrame-APIs konzentriert.Unter der Haube implementiert es die Standard-Spark-Datasource-API und nutzt die Spark-Catalyst-Engine zur Abfrageoptimierung.Das behaupten die Entwickler Hier dass es in der Lage ist, Partitionen und Spalten zu bereinigen, Prädikat-Pushdown durchzuführen und Datenlokalität zu erreichen.

Ein einfaches Beispiel, das die verwendet com.hortonworks:shc:1.0.0-2.0-s_2.11 Artefakt daraus Repo und Spark 2.0.2, wird als nächstes vorgestellt:

case class Record(col0: Int, col1: Int, col2: Boolean)

val spark = SparkSession
  .builder()
  .appName("Spark HBase Example")
  .master("local[4]")
  .getOrCreate()

def catalog =
  s"""{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
      |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
      |}
      |}""".stripMargin

val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))

// write
spark
  .createDataFrame(artificialData)
  .write
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .option(HBaseTableCatalog.newTable, "5")
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

// read
val df = spark
  .read
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top