Pregunta

Using TPL / Parallel.ForEach is there an out-of-the-box way to limit the number of times a method is called per unit of time (i.e. no more than 50 calls per second). This is different than limiting the number of threads. Perhaps there's some simple hack to make this work?

¿Fue útil?

Solución

One solution is to make a thread-safe version of the following https://stackoverflow.com/a/7728872/356790

/// <summary>
/// This class limits the number of requests (method calls, events fired, etc.) that can occur in a given unit of time.
/// </summary>
class RequestLimiter
{

    #region Constructors

    /// <summary>
    /// Initializes an instance of the RequestLimiter class.
    /// </summary>
    /// <param name="maxRequests">The maximum number of requests that can be made in a given unit of time.</param>
    /// <param name="timeSpan">The unit of time that the maximum number of requests is limited to.</param>
    /// <exception cref="ArgumentException">maxRequests &lt;= 0</exception>
    /// <exception cref="ArgumentException">timeSpan.TotalMilliseconds &lt;= 0</exception>
    public RequestLimiter( int maxRequests , TimeSpan timeSpan )
    {
        // check parameters
        if ( maxRequests <= 0 )
        {
            throw new ArgumentException( "maxRequests <= 0" , "maxRequests" );
        }
        if ( timeSpan.TotalMilliseconds <= 0 )
        {
            throw new ArgumentException( "timeSpan.TotalMilliseconds <= 0" , "timeSpan" );
        }

        // initialize instance vars
        _maxRequests = maxRequests;
        _timeSpan = timeSpan;
        _requestTimes = new Queue<DateTime>( maxRequests );

        // sleep for 1/10th timeSpan
        _sleepTimeInMs = Convert.ToInt32( Math.Ceiling( timeSpan.TotalMilliseconds / 10 ) );
    }

    #endregion

    /// <summary>
    /// Waits until an request can be made
    /// </summary>
    public void WaitUntilRequestCanBeMade()
    {
        while ( !TryEnqueueRequest() )
        {
            Thread.Sleep( _sleepTimeInMs );
        }
    }

    #region Private Members

    private readonly Queue<DateTime> _requestTimes;
    private readonly object _requestTimesLock = new object();
    private readonly int _maxRequests;
    private readonly TimeSpan _timeSpan;
    private readonly int _sleepTimeInMs;

    /// <summary>
    /// Remove requests that are older than _timeSpan
    /// </summary>
    private void SynchronizeQueue()
    {
        while ( ( _requestTimes.Count > 0 ) && ( _requestTimes.Peek().Add( _timeSpan ) < DateTime.Now ) )
        {
            _requestTimes.Dequeue();
        }
    }

    /// <summary>
    /// Attempts to enqueue a request.
    /// </summary>
    /// <returns>
    /// Returns true if the request was successfully enqueued.  False if not.
    /// </returns>
    private bool TryEnqueueRequest()
    {
        lock ( _requestTimesLock )
        {
            SynchronizeQueue();
            if ( _requestTimes.Count < _maxRequests )
            {
                _requestTimes.Enqueue( DateTime.Now );
                return true;
            }
            return false;
        }
    }

    #endregion

}

Otros consejos

The ready code samples using Timer:

The code samples/examples using Reactive Extensions (Rx):

This solution enforces a delay between the start of each thread and could be used to fulfill your requirements.

    private SemaphoreSlim CooldownLock = new SemaphoreSlim(1, 1);
    private DateTime lastAction;

    private void WaitForCooldown(TimeSpan delay)
    {
        CooldownLock.Wait();

        var waitTime = delay - (DateTime.Now - lastAction);

        if (waitTime > TimeSpan.Zero)
        {
            Task.Delay(waitTime).Wait();
            lastAction = DateTime.Now;
        }

        lastAction = DateTime.Now;

        CooldownLock.Release();
    }

    public void Execute(Action[] actions, int concurrentThreadLimit, TimeSpan threadDelay)
    {
        if (actions.Any())
        {
            Parallel.ForEach(actions, 
                             new ParallelOptions() { MaxDegreeOfParallelism = concurrentThreadLimit}, 
                            (currentAction) =>
                            {
                                WaitForCooldown(threadDelay);
                                currentAction();
                            });
        }
    }
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top