Pourquoi ma mise en œuvre MapReduce (réel haskell mondial) en utilisant iteratee IO échoue aussi avec « Trop de fichiers ouverts »

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

Question

Je suis œuvre un programme de haskell Wich compare chaque ligne d'un fichier avec l'autre ligne dans le fichier. Ce qui peut être mis en œuvre un seul thread comme suit

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

se déroulera en O temps (n ^ 2), et doit conserver la liste complète des entiers en mémoire tout le temps. Dans mon programme actuel de la ligne contient plus de chiffres, dont je construis un type légèrement complexer que Int. Cela m'a donné des erreurs de mémoire sur les données que j'ai à traiter.

Donc, il y a deux améliorations à apporter à la solution unique filetée mentionnée ci-dessus. Tout d'abord, accélérer le temps de fonctionnement réel. En second lieu, trouver un moyen de garder pas toute la liste en mémoire le temps plein. Je sais que cela nécessite l'analyse du dossier complet n fois. Ainsi, il y aura des comparaisons O (n ^ 2), et les lignes O (n ^ 2) analysable. Ceci est bien pour moi que je préfère avoir un programme réussi lent qu'un programme défaillant. Lorsque le fichier d'entrée est assez petit, je peux toujours Reside, pour une version plus simple.

Pour utiliser plusieurs cœurs cpu je pris la mise en œuvre MapReduce sur Real World Haskell (chapitre 24, disponible ici ).

Je l'ai modifié la fonction Chunking du livre, au lieu de diviser le dossier complet en morceaux, retour autant de morceaux que les lignes avec chaque morceau représentant un élément de

tails . lines . readFile

Parce que je veux que le programme soit également évolutif dans le fichier de taille, j'ai d'abord utilisé paresseux IO . Cela ne cependant avec « Trop de fichiers ouverts », dont je posais dans un précédente question (les descripteurs de fichiers ont été éliminés trop tard par le GC). La version complète IO paresseux y est affiché.

Comme la réponse acceptée explique, stricte IO pourrait résoudre le problème. Cela résout bien le problème « Trop de fichiers ouverts » pour 2k fichiers en ligne, mais échoue avec « de mémoire » sur un fichier 50k.

Notez que la première un seul thread mise en œuvre (sans MapReduce) est capable de gérer un fichier 50k.

La solution alternative, qui fait appel le plus pour moi, est également d'utiliser iteratee IO . Je m'y attendais à résoudre à la fois la poignée de fichiers, et l'épuisement des ressources mémoire. Ma mise en œuvre ne cependant toujours avec une erreur « Trop de fichiers ouverts » sur un fichier ligne 2k.

La iteratee la version IO a le même MapReduce fonction comme dans le livre, mais a une version modifiée chunkedFileEnum pour le laisser travailler avec recenseur .

Ainsi, ma question est; ce qui ne va pas avec le iteratee suivant IO mise en œuvre de base? Où se trouve la Paresse?.

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

BTW, je suis en HaskellPlatform 2011.2.0 sous Mac OS X 10.6.7 (de léopard des neiges)
avec les packages suivants:
bytestring 0.9.1.10
parallèle 3.1.0.1
recenseur 0.4.8, avec un manuel

Était-ce utile?

La solution

Comme l'erreur dit, il y a trop de fichiers ouverts. Je me attendais Haskell à exécuter la plupart du programme de façon séquentielle, mais le parallèle des «étincelles de. Cependant, comme mentionné SCLV, Haskell déclenche toujours les évaluations.

Ceci est généralement pas un problème dans un programme fonctionnel pur, mais il est en matière de IO (ressources). Je mis à l'échelle du parallélisme tel que décrit dans le livre Real World Haskell trop loin. Donc, ma conclusion est de faire le parallélisme que sur une échelle limitée lorsque les ressources IO au sein des étincelles. Dans la partie fonctionnelle pur, le parallélisme excessif peut réussir.

Ainsi, la réponse à mon message est de ne pas utiliser MapReduce sur l'ensemble du programme, mais à l'intérieur d'une partie fonctionnelle intérieure pure.

Pour montrer où le programme effectivement échoué, je configuré avec --enable-exécutable-profilage -p, construire, et il a couru à l'aide + RTS -p -HC -L30. Parce que l'exécutable échoue immédiatement, il n'y a pas de profil d'allocation de mémoire. Le profil d'attribution de temps entraînant le début de fichier .prof avec ce qui suit:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

renvoie chunkedEnum IO ([recenseur Text m b], [poignée]), et reçoit apparemment 495 entrées. Le fichier d'entrée est un fichier de ligne 2k, de sorte que l'entrée unique sur lineOffsets retourné une liste de 2000 compensations. Il n'y a pas une seule entrée dans distancesUsingMapReduceIt, de sorte que le travail réel n'a même pas commencer!

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top