なぜ私のMapReduceの実装(Real World Haskell)がiteratee IOを使用しているのは、「オープンファイルが多すぎる」でも失敗するのか
質問
ファイルの各行をファイル内の互いの行と比較すると、Haskellプログラムを実装しています。実装できます シングルスレッド 次のように
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
これはo(n^2)時間で実行され、整数の完全なリストをメモリにずっと保持する必要があります。私の実際のプログラムでは、ラインにはより多くの数字が含まれており、そのうちINTよりもわずかに複雑なデータ型を作成します。これにより、処理しなければならないデータのメモリエラーがなくなりました。
したがって、上記の単一のスレッドソリューションには2つの改善があります。まず、実際の実行時間をスピードアップします。第二に、リスト全体をフルタイムでメモリに保持しない方法を見つけます。これには、完全なファイルをn回解析する必要があることはわかっています。したがって、o(n^2)比較があり、o(n^2)線が解析されます。失敗したプログラムよりも成功したプログラムが遅いので、これは私にとって問題ありません。入力ファイルが十分に小さい場合、私はいつでもより単純なバージョンに居することができます。
複数のCPUコアを使用するために、MapReduceの実装を実世界のHaskellから取り出しました(第24章、利用可能 ここ).
チャンキング関数を本から、完全なファイルをチャンクで分割する代わりに、各チャンクが1つの要素を表しているラインと同じくらい多くのチャンクを返すように変更しました
tails . lines . readFile
私はプログラムをファイルサイズでスケーラブルにすることを望んでいるので、私は最初に使用しました 怠zyio. 。しかし、これは「Open Filesが多すぎる」で失敗します。 前の質問 (ファイルハンドルはGCによって遅すぎる)。完全なLazy IOバージョンがそこに投稿されています。
受け入れられた答えが説明するように、 厳格なio 問題を解決できます。実際、2Kラインファイルの「Open Filesが多すぎる」問題を解決しますが、50Kファイルの「Out Out Memory」で失敗します。
最初のことに注意してください シングルスレッド 実装(MapReduceなし)は、50Kファイルを処理できます。
私にとって最もアピールする代替ソリューションは、使用することです Ioを繰り返します. 。これにより、ファイルハンドルとメモリリソースの疲労の両方が解決されると予想していました。しかし、私の実装は、2Kラインファイルの「Openファイルが多すぎる」エラーではまだ失敗します。
ITERETE IOバージョンは同じです MapReduce 本のように機能しますが、修正されています Chunkedfileenum で動作させるには 列挙器.
したがって、私の質問は次のとおりです。次のIo base Base実装の何が問題になっていますか?怠lazはどこにありますか?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
chunkOffset :: !Int
, chunkLength :: !Int
} deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
ところで、私はMac OS X 10.6.7(Snow Leopard)でHaskellPlatform 2011.2.0を実行しています
次のパッケージで:
バイテストリング0.9.1.10
平行3.1.0.1
列挙器0.4.8、マニュアル付き ここ
解決
エラーが言うように、開いたファイルが多すぎます。 Haskellがプログラムのほとんどを順番に実行することを期待していましたが、いくつかの「スパーク」並列がありました。ただし、SCLVが述べたように、Haskellは常に評価を引き起こします。
これは通常、純粋な機能プログラムでは問題ではありませんが、IO(リソース)を扱うときです。現実世界のHaskellの本に記載されているように、私は並列性を拡大しました。したがって、私の結論は、スパーク内のIOリソースを扱うときに限られたスケールでのみ並列処理を行うことです。純粋な機能的部分では、過度の並列性が成功する可能性があります。
したがって、私の投稿に対する答えは、プログラム全体でMapReduceを使用せず、内部の純粋な機能部分内で使用することです。
プログラムが実際に失敗した場所を示すために、私はそれを-Enable -Executable -Profiliing -Pで構成し、それを構築し、 +RTS -P -HC -L30を使用して実行しました。実行可能ファイルはすぐに失敗するため、メモリ割り当てプロファイルはありません。 .Profファイルの結果の時間割り当てプロファイルは、次のものから始まります。
individual inherited
COST CENTRE MODULE no. entries %time %alloc %time %alloc
MAIN MAIN 1 0 0.0 0.3 100.0 100.0
main Main 1648 2 0.0 0.0 50.0 98.9
sumOfDistancesOnFileWithIt MapReduceTest 1649 1 0.0 0.0 50.0 98.9
chunkedFileEnum MapReduceTest 1650 1 0.0 0.0 50.0 98.9
chunkedEnum MapReduceTest 1651 495 0.0 24.2 50.0 98.9
lineOffsets MapReduceTest 1652 1 50.0 74.6 50.0 74.6
ChunkedenumはIO([列挙者テキストMB]、[ハンドル])を返し、明らかに495のエントリを受け取ります。入力ファイルは2K行ファイルであったため、LineOffsetsの単一エントリは2000のオフセットのリストを返しました。 distancessingmapreduceitには1つのエントリがないため、実際の作業は開始されませんでした!