Вопрос

Я пытаюсь научиться пользоваться Python API Yelp для MapReduce, MRJOB. Их простой пример счетчика слов имеет смысл, но мне любопытно, как можно было бы обрабатывать приложение, включающее несколько входов. Например, вместо того, чтобы просто подсчитать слова в документе, умножая вектор на матрицу. Я придумал это решение, которое функционирует, но чувствую себя глупо:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

Этот код запускается ./matrix.py < input.txt И причина, по которой он работает, заключается в том, что матрица, хранящаяся в input.txt по столбцам, с соответствующим значением вектора в конце строки.

Итак, следующая матрица и вектор:

enter image description here

представлены как input.txt как:

enter image description here

Короче говоря, как я бы более естественно хранения матрицы и вектора в отдельных файлах и передачи их обоим в MRJOB?

Это было полезно?

Решение

Если вам нужны обработки ваших необработанных данных с другим (или таким же набором данных ROW_I, ROW_J), вы можете либо:

1) Создайте ведро S3 для хранения копии ваших данных. Передайте местоположение этой копии в свой класс заданий, например, Self.options.bucket и Self.options.my_datafile_copy_location в коде ниже. Предостережение: К сожалению, кажется, что весь файл должен быть «загружен» на задача, прежде чем обрабатывать. Если соединения колеблется или занимает слишком много времени, чтобы загрузить, эта задача может потерпеть неудачу. Вот какой -то код Python/MrJob для этого.

Поместите это в свою функцию Mapper:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) Создайте домен SimpleB и храните все ваши данные там. Читайте здесь на Boto и Simpledb:http://code.google.com/p/boto/wiki/simpledbintro

Ваш код Mapper будет выглядеть так:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

Этот второй вариант может работать лучше, если у вас есть очень большие объемы данных, поскольку он может сделать запросы на каждую строку данных, а не всю сумму одновременно. Имейте в виду, что значения SimpleDB могут составлять только максимум 1024 символа, поэтому вам может потребоваться сжатие/декакомпресс с помощью некоторого метода, если значения ваших данных длиннее этого.

Другие советы

Фактический ответ на ваш вопрос заключается в том, что MRJob еще не поддерживает шаблон соединения потоковой передачи Hadoop, который состоит в том, чтобы прочитать переменную среды MAP_INPUT_FILE (которая раскрывает свойство. на его пути и/или названии.

Вы все равно можете справиться с этим, если вы можете легко обнаружить, просто читая сам данные, к которому он принадлежит, как это отображается в этой статье:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Однако это не всегда возможно ...

В противном случае Myjob выглядит фантастически, и я хотел бы, чтобы они добавили поддержку для этого в будущем. До тех пор это в значительной степени нарушает сделки для меня.

Вот как я использую несколько входов и на основе имени файла вносит подходящие изменения в фазе Mapper.

Программа бегуна:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

Класс MRJob:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

В моем понимании, вы не будете использовать MRJOB, если не захотите использовать Hadoop Cluster или Hadoop Services от Amazon, даже если в примере используется работа в локальных файлах.

Mrjob в принципиальном использовании "Потоковая передача Hadoop"Отправить работу.

Это означает, что все входы, указанные как файлы или папки из Hadoop, передаются на Mapper, и последующие результаты для восстановления. Весь Mapper получает кусок ввода и считает, что все входные данные являются схематично одинаковыми, так что он равномерно анализации и процессов, значение для каждого среза данных.

Исходя из этого понимания, входные данные схематично одинаковы для Mapper. Единственный способ включить два разных схематических данных - это переплетать их в одном и том же файле таким образом, что карт может понять, какие данные являются векторными данными, а какие - данные матрицы.

You are actually doing it already.

Вы можете просто улучшить это, имея какой -то спецификатор, если линия является матрицей данных или векторными данными. Как только вы увидите векторные данные, к нему применяются предыдущие данные матрицы.

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

Но процесс, который вы упомянули, работает хорошо. Вы должны иметь все схематические данные в одном файле.

Это все еще есть проблемы, хотя. K, v карта снижение работает лучше, когда полная схема присутствует в одной линии, и содержит полный единичный блок обработки.

На мой взгляд, вы уже делаете это правильно, но я думаю, что карта-восстановление не является подходящим механизмом для такого рода данных. Я надеюсь, что кто -то разъясняет это еще дальше, чем я.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top