Реализуйте CCR Interleave Arbiter в F#
Вопрос
Я хочу реализовать концепцию порта структуры CCR в F# (поскольку CCR официально не поддерживается для .NET 4.0). Я знаю, что можно использовать Почтовый ящик класс в f# для этого. Это отлично работает для простого Получать Арбитры, но мне нужна концепция Переплетать Арбитр, то есть я хочу контролировать, какие сообщения обрабатываются исключительно, а какие обрабатываются одновременно. До сих пор я понятия не имею, чтобы реализовать это в F#, и я был бы благодарен за вашу помощь.
Решение
Я не очень знаком с CCR, но я постараюсь ответить - мое понимание в том, что переплетать арбитр ведет себя немного как ReaderWriterLock
. Анкет То есть вы можете указать некоторые операции, которые могут работать параллельно (чтения), и некоторые операции, которые являются исключительными (записывает).
Следующий агент является одним из способов его реализации (не протестированные, но типа проверки :-)). Агент раскрывает две операции, которые предназначены для общественного использования. Последний внутренний:
type Message<'T> =
| PerformReadOperation of ('T -> Async<unit>)
| PerformWriteOperation of ('T -> Async<'T>)
| ReadOperationCompleted
Отправив агента
PerformReadOperation
, Вы даете ему операцию, которая должна выполняться (один раз) с использованием состояния и, возможно, параллельно с другими операциями чтения.Отправив агента
PerformWriteOperation
, Вы даете ему операцию, которая вычисляет новое состояние и должно выполняться после завершения всех операций чтения. (Если бы вы работали с неизменным состоянием, это было бы проще - вам не придется ждать, пока читатели завершит! Но реализация ниже реализует ожидание).
Агент начинается с некоторого начального состояния:
let initial = // initial state
И остальная часть агента реализована с использованием двух петлей:
let interleaver = MailboxProcessor.Start(fun mbox ->
// Asynchronously wait until all read operations complete
let rec waitUntilReadsComplete reads =
if reads = 0 then async { return () }
else mbox.Scan(fun msg ->
match msg with
| ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
| _ -> None)
let rec readingLoop state reads = async {
let! msg = mbox.Receive()
match msg with
| ReadOperationCompleted ->
// Some read operation completed - decrement counter
return! readingLoop state (reads - 1)
| PerformWriteOperation(op) ->
do! waitUntilReadsComplete reads
let! newState = op state
return! readingLoop newState 0
| PerformReadOperation(op) ->
// Start the operation in background & increment counter
async { do! op state
mbox.Post(ReadOperationCompleted) }
|> Async.Start
return! readingLoop state (reads + 1) }
readingLoop initial 0)
Другие советы
Просто чтобы добавить в предложенное решение Томаса, если вы не хотите разоблачить сообщение «чтения comperation -completed» потребителю почтового ящика (поскольку это сообщение является внутренним, и в текущей реализации может быть отправлен любой потребитель почтового ящика) Отдельный почтовый ящик может быть создан внутри функции основного почтового процессора, которая примет два сообщения: readoPerationCompleted и waitForreadCompleted (этот будет использоваться с PostandAsyncreply в основном почтовом ящике), так как ответ на это сообщение будет появляться только тогда, когда все прочитано Операции завершены. Также «чтение», представленное «чтением», будет перемещен в этот новый внутренний почтовый ящик, поскольку это состояние будет заключено в этом внутреннем почтовом ящике.