Twitter Stream API с агентами в F #
Вопрос
От Don Syme Blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-deporting-press-with-vents-plus-twitter-sample. aspx.) Я пытался реализовать поток Twitter Leam. Моя цель состоит в том, чтобы следовать руководству документации API в Твиттере, которая говорит «что твиты часто должны быть сохранены или очередны до обработки при построении системы высокой надежности».
Таким образом, мой код должен иметь два компонента:
- Очередь, которая складывается и обрабатывает каждый статус / Tweet JSON
- Что-то, чтобы прочитать поток Twitter, который сбрасывает в очередь Tweet в струнах JSON
Я выбираю следующее:
- Агент, к которому я публикую каждый твит, который декодирует JSON и сбрасывает его в базу данных
- Простой HTTP WebRequest
Я также хотел бы выбросить в текстовый файл любые ошибки от вставки в базу данных. (Я, вероятно, перейдем к агенту супервизора для всех ошибок).
Две проблемы:
- Моя стратегия здесь хорошая? Если я правильно понимаю, агент ведет себя как интеллектуальную очередь и обрабатывает свои сообщения асинхронно (если у него есть 10 парней в очередь, она будет обработать их в то время, вместо того, чтобы ждать 1-е годы, чтобы закончить, то 2-й и т. Д. ...), правильный ?
- По словам Don Syme's Post все до того, как то время изолирован так, чтобы обтенить и дамп базы данных изолированы. Но потому что мне это нужно, я никогда не закрываю связь с базой данных ...?
Код выглядит что-то вроде:
let dumpToDatabase databaseName =
//opens databse connection
fun tweet -> inserts tweet in database
type Agent<'T> = MailboxProcessor<'T>
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
async{
use w2 = new StreamWriter(@"\Errors.txt")
let dumpError =fun (error:string) -> w2.WriteLine( error )
let dumpTweet = dumpToDatabase "stream"
while true do
let! msg = inbox.Receive()
try
let tw = decode msg
dumpTweet tw
with
| :? MySql.Data.MySqlClient.MySqlException as ex ->
dumpError (msg+ex.ToString() )
| _ as ex -> ()
}
)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
Большое спасибо !
Редактировать код с агентом процессора:
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
let agentProcessor =
Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
async{
while true do
let! msg = inbox.Receive()
try
msg
|> List.map(decode)
|> dumpToDatabase
with
| _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
}
)
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
let rec loop messageList count = async{
try
let! newMsg = inbox.Receive()
let newMsgList = newMsg::messageList
if count = 10 then
agentProcessor.Post( newMsgList )
return! loop [] 0
else
return! loop newMsgList (count+1)
with
| _ as ex -> Console.WriteLine("Dump "+ex.ToString())
}
loop [] 0)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
Решение
Я думаю, что лучший способ описать агент в том, что это Продолжительный процесс, который сохраняет некоторое состояние и может общаться с другими агентами (или веб-страницы или база данных). При написании приложения на основе агента вы часто можете использовать несколько агентов, которые отправляют сообщения друг другу.
Я думаю, что идея создать агент, который читает твиты из Интернета и хранит их в базе данных, является хорошим выбором (хотя вы также можете сохранить твиты в памяти как состояние агента).
Я бы не сохранил соединение с базой данных все время - MSSQL (и MySQL, вероятно, также) реализует объединение соединения, поэтому он не закроет соединение автоматически при его выпуске. Это означает, что он безопаснее и аналогично эффективно открыть соединение каждый раз, когда вам нужно писать данные в базу данных.
Если вы не ожидаете получить большое количество сообщений об ошибках, я бы, вероятно, сделаю то же самое для потока файлов, а также (при написании, вы можете открыть его, чтобы новый контент добавлен к концу).
Производительность очереди F # Agents работает в том, что он обрабатывает сообщения один за другим (в вашем примере, вы ждете сообщения, используя inbox.Receive()
. Отказ Когда очередь содержит несколько сообщений, вы получите их один за другим (в цикле).
Если вы хотели обрабатывать несколько сообщений одновременно, вы можете написать агент, который ждет, скажем, 10 сообщений, а затем отправляет их в виде списка другому агенту (который затем выполнил бы объемную обработку).
Вы также можете указать
timeout
параметр на тоReceive
Метод, так что вы могли бы дождаться не более 10 сообщений, пока все они поступают в течение одной секунды - таким образом, вы можете довольно элегантно реализовать объемную обработку, которая не содержит сообщения в течение длительного времени.