Вопрос

От Don Syme Blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-deporting-press-with-vents-plus-twitter-sample. aspx.) Я пытался реализовать поток Twitter Leam. Моя цель состоит в том, чтобы следовать руководству документации API в Твиттере, которая говорит «что твиты часто должны быть сохранены или очередны до обработки при построении системы высокой надежности».

Таким образом, мой код должен иметь два компонента:

  • Очередь, которая складывается и обрабатывает каждый статус / Tweet JSON
  • Что-то, чтобы прочитать поток Twitter, который сбрасывает в очередь Tweet в струнах JSON

Я выбираю следующее:

  • Агент, к которому я публикую каждый твит, который декодирует JSON и сбрасывает его в базу данных
  • Простой HTTP WebRequest

Я также хотел бы выбросить в текстовый файл любые ошибки от вставки в базу данных. (Я, вероятно, перейдем к агенту супервизора для всех ошибок).

Две проблемы:

  • Моя стратегия здесь хорошая? Если я правильно понимаю, агент ведет себя как интеллектуальную очередь и обрабатывает свои сообщения асинхронно (если у него есть 10 парней в очередь, она будет обработать их в то время, вместо того, чтобы ждать 1-е годы, чтобы закончить, то 2-й и т. Д. ...), правильный ?
  • По словам Don Syme's Post все до того, как то время изолирован так, чтобы обтенить и дамп базы данных изолированы. Но потому что мне это нужно, я никогда не закрываю связь с базой данных ...?

Код выглядит что-то вроде:

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

Большое спасибо !

Редактировать код с агентом процессора:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())
Это было полезно?

Решение

Я думаю, что лучший способ описать агент в том, что это Продолжительный процесс, который сохраняет некоторое состояние и может общаться с другими агентами (или веб-страницы или база данных). При написании приложения на основе агента вы часто можете использовать несколько агентов, которые отправляют сообщения друг другу.

Я думаю, что идея создать агент, который читает твиты из Интернета и хранит их в базе данных, является хорошим выбором (хотя вы также можете сохранить твиты в памяти как состояние агента).

  • Я бы не сохранил соединение с базой данных все время - MSSQL (и MySQL, вероятно, также) реализует объединение соединения, поэтому он не закроет соединение автоматически при его выпуске. Это означает, что он безопаснее и аналогично эффективно открыть соединение каждый раз, когда вам нужно писать данные в базу данных.

  • Если вы не ожидаете получить большое количество сообщений об ошибках, я бы, вероятно, сделаю то же самое для потока файлов, а также (при написании, вы можете открыть его, чтобы новый контент добавлен к концу).

Производительность очереди F # Agents работает в том, что он обрабатывает сообщения один за другим (в вашем примере, вы ждете сообщения, используя inbox.Receive(). Отказ Когда очередь содержит несколько сообщений, вы получите их один за другим (в цикле).

  • Если вы хотели обрабатывать несколько сообщений одновременно, вы можете написать агент, который ждет, скажем, 10 сообщений, а затем отправляет их в виде списка другому агенту (который затем выполнил бы объемную обработку).

  • Вы также можете указать timeout параметр на то Receive Метод, так что вы могли бы дождаться не более 10 сообщений, пока все они поступают в течение одной секунды - таким образом, вы можете довольно элегантно реализовать объемную обработку, которая не содержит сообщения в течение длительного времени.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top