我已经创建了一个 WCF 服务并正在利用 netMsmqBinding 绑定。

这是一个简单的服务,它将 Dto 传递给我的服务方法,并且不需要响应。该消息被放置在 MSMQ 中,一旦被拾取就插入到数据库中。

确保不丢失数据的最佳方法是什么?

我尝试过以下2种方法:

  1. 抛出异常

    这会将消息放入死信队列中以供手动阅读。当我的服务启动时我可以处理这个

  2. 在绑定上设置 receiveRetryCount="3"

    经过 3 次尝试(即刻发生),这似乎将消息留在队列中,但使我的服务出现故障。重新启动我的服务会重复此过程。

理想情况下我想执行以下操作:

尝试处理消息

  • 如果失败,请等待 5 分钟以看到该消息,然后重试。
  • 如果该过程失败 3 次,则将消息移至死信队列。
  • 重新启动服务会将死信队列中的所有消息推回到队列中,以便可以对其进行处理。

我能实现这个目标吗?如果是这样怎么办?您能否为我指出有关如何针对我的给定场景最好地利用 WCF 和 MSMQ 的任何好文章。

任何帮助将非常感激。谢谢!

一些附加信息

我在 Windows XP 和 Windows Server 2003 上使用 MSMQ 3.0。不幸的是,我无法使用针对 MSMQ 4.0 和 Vista/2008 的内置有害消息支持。

有帮助吗?

解决方案

SDK 中有一个示例可能对您的情况有用。基本上,它的作用是将 IErrorHandler 实现附加到您的服务,当 WCF 声明消息为“有毒”(即,当所有配置的重试都已用完时)。该示例的作用是将消息移动到另一个队列,然后重新启动与该消息关联的 ServiceHost(因为当发现有害消息时它将出现故障)。

这不是一个非常漂亮的示例,但它可能很有用。但也有一些限制:

1-如果您有多个与您的服务关联的端点(即通过多个队列暴露),无法知道有害消息到达哪个队列。如果您只有一个队列,这不会成为问题。我还没有看到任何官方的解决方法,但我已经尝试了一种可能的替代方案,我已在此处记录了该替代方案: http://winterdom.com/weblog/2008/05/27/NetMSMQAndPoisonMessages.aspx

2- 一旦问题消息被移动到另一个队列,它就成为您的责任,因此您可以在超时完成后将其移回处理队列(或将新服务附加到该队列来处理它)。

老实说,无论哪种情况,您都会在这里看到一些 WCF 本身未涵盖的“手动”工作。

我最近一直在从事一个不同的项目,其中我需要显式控制重试发生的频率,我当前的解决方案是创建一组重试队列,并基于以下内容在重试队列和主处理队列之间手动移动消息一组计时器和一些启发式方法,仅使用原始 System.Messaging 内容来处理 MSMQ 队列。它似乎工作得很好,尽管如果你这样做的话会有一些问题。

其他提示

我认为使用 MSMQ(仅在 Vista 上可用),您也许可以这样做:

<bindings>
    <netMsmqBinding>
        <binding name="PosionMessageHandling"
             receiveRetryCount="3"
             retryCycleDelay="00:05:00"
             maxRetryCycles="3"
             receiveErrorHandling="Move" />
    </netMsmqBinding>
</bindings>

第一次调用失败后,WCF 将立即重试 ReceiveRetryCount 次。批次失败后,消息将移至重试队列。延迟 RetryCycleDelay 分钟后,消息从重试队列移至端点队列,并重试批次。这将是重复的最大旋转时间。如果所有失败的消息都根据接收器处理,可以移动(毒药队列),拒绝,掉落或错误

顺便说一句,关于 WCF 和 MSMQ 的好文章是 Juval Lowy 的 Progammig WCF 书的第 9 章

如果您使用 SQL-Server,那么您应该使用分布式事务,因为 MSMQ 和 SQL-Server 都支持它。发生的情况是,您将数据库写入包装在 TransactionScope 块中,并仅在成功时才调用scope.Complete()。如果失败,那么当您的 WCF 方法返回时,该消息将被放回队列中以再次尝试。这是我使用的代码的精简版本:

    [OperationBehavior(TransactionScopeRequired=true, TransactionAutoComplete=true)]
    public void InsertRecord(RecordType record)
    {
        try
        {
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                SqlConnection InsertConnection = new SqlConnection(ConnectionString);
                InsertConnection.Open();

                // Insert statements go here

                InsertConnection.Close();

                // Vote to commit the transaction if there were no failures
                scope.Complete();
            }
        }
        catch (Exception ex)
        {
            logger.WarnException(string.Format("Distributed transaction failure for {0}", 
                Transaction.Current.TransactionInformation.DistributedIdentifier.ToString()),
                ex);
        }
     }

我通过对大量但已知数量的记录进行排队来测试这一点,让 WCF 启动大量线程来同时处理其中的许多记录(达到 16 个线程 - 一次从队列中取出 16 条消息),然后在操作中间终止进程。当程序重新启动时,消息将从队列中读回并再次处理,就好像什么也没发生一样,并且在测试结束时数据库是一致的并且没有丢失记录。

分布式事务管理器具有环境存在性,当您创建 TransactionScope 的新实例时,它会自动在方法调用范围内搜索当前事务——当 WCF 将消息从队列中弹出时,该事务应该已经由 WCF 创建了并调用了你的方法。

不幸的是,我被困在 Windows XP 和 Windows Server 2003 上,所以这对我来说不是一个选择。-(我将在我的问题中重新澄清这一点,因为我在发布后找到了这个解决方案并意识到我无法使用它)

我发现一种解决方案是设置一个自定义处理程序,它将我的消息移动到另一个队列或有毒队列并重新启动我的服务。这对我来说似乎很疯狂。想象一下我的 Sql Server 宕机了,服务重新启动的频率是多少。

所以我最终所做的就是允许线路出现故障并将消息留在队列中。我还向我的系统日志服务记录了一条致命消息,表明发生了这种情况。一旦我们的问题得到解决,我就会重新启动服务,所有消息都会再次开始处理。

我意识到重新处理此消息或任何其他消息都会失败,所以为什么需要将此消息和其他消息移动到另一个队列。我不妨停止我的服务,并在一切按预期运行时重新启动它。

aogan,您对 MSMQ 4.0 有完美的答案,但不幸的是不适合我

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top