Perché la mia implementazione MapReduce (Real World Haskell) usando Iotee IO fallisce anche con "troppi file aperti"

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

Domanda

Sto implementando un programma Haskell che confronta ogni riga di un file tra l'altro nel file. Che può essere implementato singolo filetto come segue

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

Questo funzionerà nel tempo O (n^2) e dovrà mantenere l'elenco completo dei numeri interi in memoria per tutto il tempo. Nel mio programma effettivo la riga contiene più numeri, di cui costruisco un tipo di dati leggermente complesso rispetto a int. Questo mi ha dato errori di memoria sui dati che devo elaborare.

Quindi ci sono due miglioramenti da apportare alla soluzione singola sopra menzionata. Innanzitutto, accelerare il tempo di esecuzione effettivo. In secondo luogo, trova un modo per non mantenere l'intero elenco in memoria a tempo pieno. So che ciò richiede l'analisi del file completo N volte. Quindi ci saranno confronti O (n^2) e o (n^2) linee analizzate. Questo va bene per me perché preferirei avere un programma lento di successo piuttosto che un programma fallito. Quando il file di input è abbastanza piccolo, posso sempre risiedere in una versione più semplice.

Per utilizzare più core della CPU ho eliminato l'implementazione di MapReduce dal mondo reale Haskell (capitolo 24, disponibile qui).

Ho modificato la funzione di Chunking dal libro a, invece di dividere il file completo in blocchi, restituire tanti pezzi quanto le linee con ogni pezzo che rappresentano un elemento di

tails . lines . readFile

Perché voglio che anche il programma sia scalabile nelle dimensioni dei file, inizialmente ho usato pigro io. Questo tuttavia fallisce con "troppi file aperti", di cui ho chiesto in un domanda precedente (Le maniglie dei file sono state eliminate troppo tardi dal GC). La versione IO pigra completa è pubblicata lì.

Come spiega la risposta accettata, Io rigoroso potrebbe risolvere il problema. Ciò risolve davvero il problema "troppi file aperti" per i file di riga 2K, ma non riesce con "fuori memoria" su un file da 50K.

Nota che il primo singolo filetto L'implementazione (senza mapReduce) è in grado di gestire un file da 50k.

La soluzione alternativa, che mi piace anche di più, è usare iteratoe io. Mi aspettavo che questo risolvesse sia il manico del file, sia l'esaurimento delle risorse di memoria. La mia implementazione tuttavia fallisce ancora con un errore "troppi file aperti" su un file di riga 2K.

La versione io Io ha lo stesso Riduci mappa funzionare come nel libro, ma ha una modifica filodfileenum per lasciarlo funzionare con un Enumeratore.

Quindi la mia domanda è; Cosa c'è di sbagliato nella seguente implementazione della base Io Io? Dov'è la pigrizia?.

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

A proposito, sto eseguendo HaskellPlatform 2011.2.0 su Mac OS X 10.6.7 (Snow Leopard)
Con i seguenti pacchetti:
Bytestring 0.9.1.10
parallelo 3.1.0.1
enumerator 0.4.8, con un manuale qui

È stato utile?

Soluzione

Come dice l'errore, ci sono troppi file aperti. Mi aspettavo che Haskell eseguisse la maggior parte del programma in sequenza, ma alcune "scintille" parallele. Tuttavia, come detto SCLV, Haskell suscita sempre le valutazioni.

Questo di solito non è un problema in un programma funzionale puro, ma è quando si tratta di IO (risorse). Ho ridimensionato il parallelismo come descritto nel World Real World Haskell Book troppo lontano. Quindi la mia conclusione è quella di fare parallelismo solo su scala limitata quando si tratta di risorse IO all'interno delle scintille. Nella parte funzionale pura, il parallelismo eccessivo può avere successo.

Pertanto, la risposta al mio post è non usare MapReduce sull'intero programma, ma all'interno di una parte funzionale pura interiore.

Per mostrare dove il programma effettivamente non è riuscito, l'ho configurato con -Enable -eseguibile -profilabile -p, costruirlo e eseguirlo usando +rts -p -hc -l30. Poiché l'eseguibile non riesce immediatamente, non esiste un profilo di allocazione della memoria. Il profilo di allocazione del tempo risultante nel file .Prof inizia con quanto segue:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

Chunkedenum restituisce IO ([Testo enumeratore MB], [handle]) e apparentemente riceve 495 voci. Il file di input era un file di riga 2K, quindi la singola voce su LineOffset ha restituito un elenco di 2000 offset. Non esiste una singola voce in Distancesusing MapReduceit, quindi il lavoro reale non è nemmeno iniziato!

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top