¿Por qué mi implementación de MapReduce (Real World Haskell) usando ITeratee IO también falla con "demasiados archivos abiertos"?

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

Pregunta

Estoy implementando un programa Haskell que compara cada línea de un archivo entre sí en el archivo. Que se puede implementar solo enhebrado como sigue

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

Esto se ejecutará en el tiempo O (n^2) y tiene que mantener la lista completa de enteros en la memoria todo el tiempo. En mi programa real, la línea contiene más números, de los cuales construyo un tipo de datos ligeramente más complejo que int. Esto me dio errores de memoria sobre los datos que tengo que procesar.

Por lo tanto, hay dos mejoras para hacer a la solución de rosca única mencionada anteriormente. Primero, acelera el tiempo de ejecución real. En segundo lugar, encuentre una manera de no mantener toda la lista en la memoria a tiempo completo. Sé que esto requiere analizar el archivo completo n veces. Por lo tanto, habrá comparaciones o (n^2), y o (n^2) líneas analizadas. Esto está bien para mí, ya que preferiría tener un programa lento exitoso que un programa fallido. Cuando el archivo de entrada es lo suficientemente pequeño, siempre puedo residir a una versión más simple.

Para usar múltiples núcleos de CPU, sacé la implementación de MapReduce del mundo real Haskell (capítulo 24, disponible aquí).

Modifiqué la función de fragmentación del libro a, en lugar de dividir el archivo completo en trozos, devolver tantos fragmentos como líneas con cada fragmento que representa un elemento de

tails . lines . readFile

Debido a que quiero que el programa también sea escalable en tamaño de archivo, inicialmente utilicé perezoso io. Sin embargo, esto falla con "demasiados archivos abiertos", sobre los cuales pregunté en un Pregunta anterior (Las manijas del archivo fueron dispuestas demasiado tarde por el GC). La versión Full Lazy IO se publica allí.

Como explica la respuesta aceptada, io estricto podría resolver el problema. Eso de hecho resuelve el problema de "demasiados archivos abiertos" para archivos de línea 2K, pero falla con "fuera de memoria" en un archivo de 50k.

Tenga en cuenta que el primero solo enhebrado La implementación (sin MapReduce) es capaz de manejar un archivo de 50k.

La solución alternativa, que también me atrae más, es usar iteradoe io. Esperaba que esto resolviera tanto el mango del archivo como el agotamiento de los recursos de memoria. Sin embargo, mi implementación aún falla con un error de "demasiados archivos abiertos" en un archivo de línea 2K.

La versión iteradoe io tiene lo mismo Mapa reducido funcionar como en el libro, pero tiene un modificado fortuito para que funcione con un Enumerador.

Así mi pregunta es; ¿Qué hay de malo en la siguiente implementación de la base ITeratee IO? ¿Dónde está la pereza?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

Por cierto, estoy ejecutando Haskellplatform 2011.2.0 en Mac OS X 10.6.7 (Leopardo de Snow)
Con los siguientes paquetes:
Bytestring 0.9.1.10
Paralelo 3.1.0.1
Enumerator 0.4.8, con un manual aquí

¿Fue útil?

Solución

Como dice el error, hay demasiados archivos abiertos. Esperaba que Haskell ejecutara la mayor parte del programa secuencialmente, pero algunas 'chispas' paralelas. Sin embargo, como SCLV mencionó, Haskell siempre provoca las evaluaciones.

Esto generalmente no es un problema en un programa funcional puro, pero es cuando se trata de IO (recursos). Escalé el paralelismo como se describe en el libro de Haskell del mundo real demasiado lejos. Entonces, mi conclusión es hacer paralelismo solo en una escala limitada cuando se trata de recursos de IO dentro de las chispas. En la parte funcional pura, el paralelismo excesivo puede tener éxito.

Por lo tanto, la respuesta a mi publicación es no usar MapReduce en todo el programa, sino dentro de una parte funcional pura interna.

Para mostrar dónde falló el programa, lo configuré con --enable -Ejecutable -Profiling -P, construirlo y ejecutarlo usando +RTS -P -HC -L30. Debido a que el ejecutable falla de inmediato, no hay perfil de asignación de memoria. El perfil de asignación de tiempo resultante en el archivo .prof comienza con lo siguiente:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

Chunkedenum Devuelve IO ([Texto del enumerador MB], [manejar]) y aparentemente recibe 495 entradas. El archivo de entrada era un archivo de línea de 2K, por lo que la entrada única en LineOffsets devolvió una lista de 2000 compensaciones. No hay una sola entrada en distancias para colocar la mapasa, ¡por lo que el trabajo real ni siquiera comenzó!

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top