Warum fällt meine MapReduce -Implementierung (Real World Haskell) mit ITeratee IO auch mit „zu vielen offenen Dateien“ fehl?

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

Frage

Ich implementiere ein Haskell -Programm, das jede Zeile einer Datei miteinander in der Datei vergleicht. Das kann implementiert werden Einzelfaden folgendermaßen

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

Dies läuft in der Zeit von o (n^2) und muss die vollständige Liste der Ganzzahlen die ganze Zeit im Speicher halten. In meinem tatsächlichen Programm enthält die Zeile mehr Zahlen, aus denen ich einen leicht komplexen Datenatyp als int erstellte. Dies gab mich aus Speicherfehlern über die Daten, die ich verarbeiten muss.

Daher gibt es zwei Verbesserungen an der oben genannten Einzelfadenlösung. Beschleunigen Sie zunächst die tatsächliche Laufzeit. Zweitens finden Sie einen Weg, um die gesamte Liste nicht in Vollzeit in Speicher zu halten. Ich weiß, dass dies das Parsen der vollständigen Datei n -mal erfordert. Somit wird es o (n^2) Vergleiche und O (n^2) -Linien analysiert. Das ist in Ordnung für mich, da ich lieber ein langsam erfolgreiches Programm als ein fehlgeschlagenes Programm habe. Wenn die Eingabedatei so klein ist, kann ich immer in einer einfacheren Version wohnen.

Um mehrere CPU -Kerne zu verwenden, habe ich die MapReduce -Implementierung von Haskell aus der realen Welt herausgenommen (Kapitel 24, verfügbar hier).

Ich habe die Chunking -Funktion vom Buch an, anstatt die vollständige Datei in Stücken zu teilen

tails . lines . readFile

Da ich möchte, dass das Programm auch in der Dateigröße skalierbar ist, habe ich zunächst verwendet faul io. Dies schlägt jedoch mit "zu vielen offenen Dateien" fehl, über die ich in a gefragt habe Vorherige Frage (Die Dateigriffe wurden von der GC zu spät entsorgt). Die vollständige faule IO -Version ist dort gepostet.

Wie die akzeptierte Antwort erklärt, Strenge IO könnte das Problem lösen. Das löst in der Tat das Problem "zu viele geöffnete Dateien" für 2K -Zeilendateien, fällt jedoch mit "Out -Speicher" in einer 50K -Datei fehl.

Beachten Sie, dass der erste Einzelfaden Implementierung (ohne MapReduce) kann eine 50 -km -Datei bearbeiten.

Die alternative Lösung, die mich auch am meisten anspricht, ist zu verwenden Iteratee io. Ich erwartete, dass dies sowohl das Dateihandle als auch die Erschöpfung der Speicherressourcen lösen würde. Meine Implementierung schlägt jedoch mit einem Fehler "zu viele geöffnete Dateien" in einer 2K -Zeilendatei fehl.

Die ITeratee IO -Version hat die gleiche Karte verkleinern Funktion wie im Buch, hat aber eine modifizierte ChunkedFileenum um es mit einem funktionieren zu lassen Aufzähler.

Also ist meine Frage; Was ist falsch mit der folgenden IT -IO -Basisimplementierung? Wo ist die Faulheit?.

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

Übrigens, ich betreibe Haskellplatform 2011.2.0 unter Mac OS X 10.6.7 (Schneeleopard)
mit den folgenden Paketen:
Bytestring 0.9.1.10
Parallele 3.1.0.1
Enumerator 0.4.8, mit einem Handbuch hier

War es hilfreich?

Lösung

Wie der Fehler sagt, gibt es zu viele geöffnete Dateien. Ich erwartete, dass Haskell den größten Teil des Programms nacheinander ausführen würde, aber einige "Sparks" parallel. Wie SCLV jedoch erwähnt, löst Haskell immer die Bewertungen aus.

Dies ist normalerweise kein Problem in einem reinen funktionalen Programm, aber es ist beim Umgang mit IO (Ressourcen). Ich habe die Parallelität, wie im realen Haskell -Buch von Real World Haskell zu weit beschrieben, skaliert. Meine Schlussfolgerung ist also, die Parallelität nur in begrenztem Umfang zu tun, wenn es sich um IO -Ressourcen innerhalb der Funken handelt. Im reinen funktionellen Teil kann eine übermäßige Parallelität erfolgreich sein.

Die Antwort auf meinen Beitrag lautet daher, MapReduce für das gesamte Programm nicht zu verwenden, sondern innerhalb eines inneren reinen Funktionsteils.

Um anzuzeigen, wo das Programm tatsächlich fehlgeschlagen ist, habe ich es mit -durch -durchaus executable -profiliert -p konfiguriert, erstellt und mit +RTS -P -HC -L30 ausgeführt. Da die ausführbare Datei sofort fehlschlägt, gibt es kein Speicherzuweisungsprofil. Das resultierende Zeitallokationsprofil in der .prof -Datei beginnt mit Folgendem:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

ChunkDenum gibt IO ([Enumerator Text MB], [Handle]) zurück und erhält anscheinend 495 Einträge. Die Eingabedatei war eine 2K -Zeilendatei, sodass der einzelnen Eintrag in LineOffsets eine Liste von 2000 Offsets zurückgegeben hat. Es gibt keinen einzigen Eintrag in Entfernungsmapeduceit, sodass die tatsächliche Arbeit nicht einmal begonnen hat!

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top