Почему моя реализация MapReduce (Real World Haskell) с использованием ITERATE IO также терпит неудачу с «Слишком много открытых файлов»

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

Вопрос

Я внедряю программу Haskell Wich, которая сравнивает каждую строку файла друг с другом в файле. Который может быть реализован Одиночная резьба следующим образом

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

Это будет работать во время O (n^2), и все время сохраняет полный список целых чисел в памяти. В моей реальной программе линия содержит больше чисел, из которых я строю слегка комплексный данных, чем int. Это дало мне ошибки памяти на данные, которые я должен обрабатывать.

Таким образом, есть два улучшения, которые должны быть сделаны в вышеупомянутом одноободном решении. Во -первых, ускорить фактическое время работы. Во -вторых, найдите способ не хранить весь список в памяти на полный рабочий день. Я знаю, что это требует анализа полного файла n раз. Таким образом, будут сравниваться O (n^2), а O (n^2) проанализированы. Это нормально для меня, так как я предпочел бы иметь медленную успешную программу, чем программа неудачной. Когда входной файл достаточно маленький, я всегда могу прожить более простую версию.

Чтобы использовать несколько ядер ЦП, я взял реализацию MapReduce из реального мира Haskell (глава 24, доступна здесь).

Я изменил функцию кункинга от книги, вместо того, чтобы делить полный файл на куски, вернуть столько кусков, сколько линий с каждым кусочком, представляющим один элемент

tails . lines . readFile

Поскольку я хочу, чтобы программа также была масштабируемой в размере файлов, я изначально использовал ленивый io. Анкет Это, однако, терпит неудачу с «слишком много открытых файлов», о которых я спросил в Предыдущий вопрос (Ручки файла были утилизированы слишком поздно GC). Там размещена полная ленивая версия IO.

Как объясняется, объясняется, строгого io может решить проблему. Это действительно решает проблему «слишком много открытых файлов» для файлов линейных линий 2K, но не сбои с «вне памяти» в файле 50K.

Обратите внимание, что первый Одиночная резьба Реализация (без MapReduce) способна обрабатывать файл 50K.

Альтернативное решение, которое также больше всего привлекает меня, - использовать ИТИТЕРИ IO. Анкет Я ожидал, что это решит как дескриптор файла, так и истощение ресурса памяти. Однако моя реализация по -прежнему терпит неудачу с ошибкой «слишком много открытых файлов» в файле линии 2K.

Версия Iteratere IO имеет то же самое уменьшение карты функционировать, как в книге, но имеет модифицированную Chunkedfileenum чтобы позволить ему работать с Перечисление.

Таким образом, мой вопрос; Что не так со следующей базовой реализацией IOTAREE? Где лень?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

Кстати, я использую HaskellPlatform 2011.2.0 на Mac OS X 10.6.7 (Snow Leopard)
со следующими пакетами:
Постанивание 0.9.1.10
Параллель 3.1.0.1
Перечисление 0.4.8, с ручным здесь

Это было полезно?

Решение

Как говорится в ошибке, слишком много открытых файлов. Я ожидал, что Haskell запустит большую часть программы последовательно, но некоторые «искры» параллельны. Однако, как упоминал SCLV, Хаскелл всегда вызывает оценки.

Обычно это не проблема в чистой функциональной программе, но это происходит при работе с IO (ресурсами). Я масштабировал параллелизм, как описано в реальной книге Haskell, слишком далеко. Поэтому мой вывод состоит в том, чтобы делать параллелизм только в ограниченном масштабе при работе с ресурсами IO в искры. В чистой функциональной части чрезмерный параллелизм может быть успешным.

Таким образом, ответ на мой пост заключается не в том, чтобы не использовать MapReduce для всей программы, а во внутренней чистой функциональной части.

Чтобы показать, где программа на самом деле не сработала, я настроил ее с помощью -enable -excecutable -profiling -p -p, создать ее и запустил с помощью +rts -p -hc -L30. Поскольку исполняемый файл сразу не работает, нет профиля распределения памяти. Полученный профиль распределения времени в файле .prof начинается со следующего:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

ChunkedEnum возвращает IO ([текст перечисления MB], [Ручка]) и, по -видимому, получает 495 записей. Входной файл представлял собой линейный файл 2K, поэтому единственная запись в LineOffsets вернула список из 2000 смещений. В DistancesusingMapreduceit нет ни одной записи, поэтому фактическая работа даже не началась!

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top