なぜ私のMapReduceの実装(Real World Haskell)がiteratee IOを使用しているのは、「オープンファイルが多すぎる」でも失敗するのか

StackOverflow https://stackoverflow.com/questions/5856435

  •  27-10-2019
  •  | 
  •  

質問

ファイルの各行をファイル内の互いの行と比較すると、Haskellプログラムを実装しています。実装できます シングルスレッド 次のように

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

これはo(n^2)時間で実行され、整数の完全なリストをメモリにずっと保持する必要があります。私の実際のプログラムでは、ラインにはより多くの数字が含まれており、そのうちINTよりもわずかに複雑なデータ型を作成します。これにより、処理しなければならないデータのメモリエラーがなくなりました。

したがって、上記の単一のスレッドソリューションには2つの改善があります。まず、実際の実行時間をスピードアップします。第二に、リスト全体をフルタイムでメモリに保持しない方法を見つけます。これには、完全なファイルをn回解析する必要があることはわかっています。したがって、o(n^2)比較があり、o(n^2)線が解析されます。失敗したプログラムよりも成功したプログラムが遅いので、これは私にとって問題ありません。入力ファイルが十分に小さい場合、私はいつでもより単純なバージョンに居することができます。

複数のCPUコアを使用するために、MapReduceの実装を実世界のHaskellから取り出しました(第24章、利用可能 ここ).

チャンキング関数を本から、完全なファイルをチャンクで分割する代わりに、各チャンクが1つの要素を表しているラインと同じくらい多くのチャンクを返すように変更しました

tails . lines . readFile

私はプログラムをファイルサイズでスケーラブルにすることを望んでいるので、私は最初に使用しました 怠zyio. 。しかし、これは「Open Filesが多すぎる」で失敗します。 前の質問 (ファイルハンドルはGCによって遅すぎる)。完全なLazy IOバージョンがそこに投稿されています。

受け入れられた答えが説明するように、 厳格なio 問題を解決できます。実際、2Kラインファイルの「Open Filesが多すぎる」問題を解決しますが、50Kファイルの「Out Out Memory」で失敗します。

最初のことに注意してください シングルスレッド 実装(MapReduceなし)は、50Kファイルを処理できます。

私にとって最もアピールする代替ソリューションは、使用することです Ioを繰り返します. 。これにより、ファイルハンドルとメモリリソースの疲労の両方が解決されると予想していました。しかし、私の実装は、2Kラインファイルの「Openファイルが多すぎる」エラーではまだ失敗します。

ITERETE IOバージョンは同じです MapReduce 本のように機能しますが、修正されています Chunkedfileenum で動作させるには 列挙器.

したがって、私の質問は次のとおりです。次のIo base Base実装の何が問題になっていますか?怠lazはどこにありますか?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence


--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

ところで、私はMac OS X 10.6.7(Snow Leopard)でHaskellPlatform 2011.2.0を実行しています
次のパッケージで:
バイテストリング0.9.1.10
平行3.1.0.1
列挙器0.4.8、マニュアル付き ここ

役に立ちましたか?

解決

エラーが言うように、開いたファイルが多すぎます。 Haskellがプログラムのほとんどを順番に実行することを期待していましたが、いくつかの「スパーク」並列がありました。ただし、SCLVが述べたように、Haskellは常に評価を引き起こします。

これは通常、純粋な機能プログラムでは問題ではありませんが、IO(リソース)を扱うときです。現実世界のHaskellの本に記載されているように、私は並列性を拡大しました。したがって、私の結論は、スパーク内のIOリソースを扱うときに限られたスケールでのみ並列処理を行うことです。純粋な機能的部分では、過度の並列性が成功する可能性があります。

したがって、私の投稿に対する答えは、プログラム全体でMapReduceを使用せず、内部の純粋な機能部分内で使用することです。

プログラムが実際に失敗した場所を示すために、私はそれを-Enable -Executable -Profiliing -Pで構成し、それを構築し、 +RTS -P -HC -L30を使用して実行しました。実行可能ファイルはすぐに失敗するため、メモリ割り当てプロファイルはありません。 .Profファイルの結果の時間割り当てプロファイルは、次のものから始まります。

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

ChunkedenumはIO([列挙者テキストMB]、[ハンドル])を返し、明らかに495のエントリを受け取ります。入力ファイルは2K行ファイルであったため、LineOffsetsの単一エントリは2000のオフセットのリストを返しました。 distancessingmapreduceitには1つのエントリがないため、実際の作業は開始されませんでした!

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top