Frage

Ich versuche zu lernen, Yelps Python -API für MapReduce, Mrjob, zu verwenden. Ihr einfaches Wortzählerbeispiel ist sinnvoll, aber ich bin gespannt, wie man mit einer Anwendung mit mehreren Eingaben umgehen würde. Wenn Sie beispielsweise die Wörter in einem Dokument nur zählen, multiplizieren Sie einen Vektor mit einer Matrix. Ich habe mich auf diese Lösung ausgedacht, die funktioniert, aber albern fühlt:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

Dieser Code wird ausgeführt ./matrix.py < input.txt und der Grund, warum es funktioniert, ist, dass die in input.txt gespeicherte Matrix durch Spalten mit dem entsprechenden Vektorwert am Ende der Linie gespeichert ist.

Die folgende Matrix und Vektor:

enter image description here

werden als input.txt als:

enter image description here

Kurz gesagt, wie würde ich die Matrix und den Vektor in separaten Dateien natürlicher speichern und beide an MRJOB übergeben?

War es hilfreich?

Lösung

Wenn Sie Ihre Rohdaten mit einem anderen Datensatz (oder demselben row_i, row_j) verarbeiten möchten, können Sie entweder:

1) Erstellen Sie einen S3 -Bucket, um eine Kopie Ihrer Daten zu speichern. Übergeben Sie den Standort dieser Kopie an Ihre Aufgabenklasse, z. Vorbehalt: Leider scheint es, dass die gesamte Datei vor der Verarbeitung "heruntergeladen" werden muss. Wenn die Verbindungen ins Stocken geraten oder zu lange dauern, kann dieser Job fehlschlagen. Hier ist ein Python/MRJOB -Code, um dies zu tun.

Setzen Sie dies in Ihre Mapper -Funktion:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) Erstellen Sie eine SimpledB -Domäne und speichern Sie alle Ihre Daten dort. Lesen Sie hier auf Boto und SimpledB:http://code.google.com/p/boto/wiki/SimpledBinTro

Ihr Mapper -Code würde so aussehen:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

Diese zweite Option kann besser abschneiden, wenn Sie über sehr große Datenmengen verfügen, da die Anforderungen für jede Datenzeile und nicht den gesamten Betrag gleichzeitig erfolgen können. Beachten Sie, dass SimpledB -Werte nur maximal 1024 Zeichen lang sein können, sodass Sie möglicherweise über eine Methode komprimieren/dekomprimieren müssen, wenn Ihre Datenwerte länger sind.

Andere Tipps

Die tatsächliche Antwort auf Ihre Frage lautet, dass MRJOB das Hadoop -Streaming -Join -Muster noch nicht ganz unterstützt, das die Umgebungsvariable map_input_file (die die Eigenschaft map.input.file enthüllt) lesen soll, um zu bestimmen, mit welcher Art von Datei Sie sich befassen, mit basiert auf seinem Weg und/oder Namen.

Möglicherweise können Sie es weiterhin schaffen, wenn Sie leicht aus dem Lesen der Daten selbst erkennen können, zu welcher Typ sie gehört, wie in diesem Artikel angezeigt wird:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins--keys-with-python/

Das ist jedoch nicht immer möglich ...

Ansonsten sieht Myjob fantastisch aus und ich wünschte, sie könnten in Zukunft Unterstützung dafür hinzufügen. Bis dahin ist dies so ziemlich ein Deal Breaker für mich.

Auf diese Weise verwende ich mehrere Eingänge und vor dem Dateinamen geeignete Änderungen in der Mapper -Phase.

Läuferprogramm:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

Die MRJOB -Klasse:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

Nach meinem Verständnis würden Sie MRJOB nicht verwenden, es sei denn, Sie wollten Hadoop -Cluster- oder Hadoop -Dienste von Amazon nutzen, auch wenn das Beispiel auf lokalen Dateien ausgeführt wird.

Mrjob in Principal verwendet "Hadoop -Streaming"Um den Job einzureichen.

Dies bedeutet, dass alle als Dateien oder Ordner von Hadoop angegebenen Eingaben in Mapper und nachfolgende Ergebnisse zum Reduzierer gestreamt werden. Alle Mapper erhalten eine Eingabescheibe und berücksichtigen alle Eingaben als schematisch gleich, so dass es den Schlüssel für jedes Datenschicht gleichmäßig analysiert und verarbeitet.

Aus diesem Verständnis stammen die Eingaben schematisch mit dem Mapper gleich. Nur möglich, zwei verschiedene schematische Daten einzuschließen, besteht darin, sie in derselben Datei so zu verschieben, dass der Mapper verstehen kann, welche Vektordaten und welche Matrixdaten sind.

You are actually doing it already.

Sie können dies einfach verbessern, indem Sie einen Spezifizierer haben, wenn eine Zeile Matrixdaten oder Vektordaten ist. Sobald Sie eine Vektordaten sehen, werden die vorhergehenden Matrixdaten darauf angewendet.

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

Aber der Prozess, den Sie erwähnt haben, funktioniert gut. Sie müssen alle schematischen Daten in einer einzelnen Datei haben.

Dies hat jedoch immer noch Probleme. K, V Map reduzieren funktioniert besser, wenn das vollständige Schema in einer einzigen Linie vorhanden ist und eine vollständige Einzelverarbeitungseinheit enthält.

Nach meinem Verständnis machen Sie es bereits richtig, aber ich denke, Map-Reduce ist kein geeigneter Mechanismus für diese Art von Daten. Ich hoffe, jemand klärt dies noch weiter als ich konnte.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top