Question

I have a situation where I have to receive requests in a Web API method, queue those requests, and then send them in bulk to a database (a Solr instance).

I am not really sure how to maintain a batch of requests from multiple sources. For now I am writing each request's data in JSON format to a file on disk. Later I will have a Windows service go through the folder, read all the files, update the database, and delete those files.

Here is what I am doing in my Web API:

public void Post(LogEntry value)
{
    value.EventID = Guid.NewGuid();
    value.ServerTime = DateTime.UtcNow;
    string json = JsonConvert.SerializeObject(value);
    using(StreamWriter sw = new StreamWriter(value.EventID.ToString()))
    {
        sw.Write(json);
    }
}

(Here EventID is a GUID.)

This process doesn't look right. There must be a way to maintain a queue of requests, but I am not really sure how to maintain a queue across multiple requests.

The reason I am doing this is that inserting in batches into the Solr instance is faster than inserting a single record at a time through SolrNet. I am expecting at least 100 requests per second on the Web API. I want to create a batch of 1000 requests and update the Solr instance every 10 seconds. Please don't think that I need code; I just need to know what strategy I should adopt to maintain a queue of requests / state.

Solution

You could use a concurrent queue, if you're using .NET 4.0 or higher:

ConcurrentQueue<T> (MSDN)

This is a thread-safe queue, which can then be read whenever you choose.

Edit:

Example:

This would be a wrapper for the queue:

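public static class RequestQueue
{
    // Created once by the type initializer, so two requests can never
    // race on a null check and end up with two different queues.
    private static readonly ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();

    public static ConcurrentQueue<int> Queue
    {
        get { return _queue; }
    }
}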

Then you could set up your web api like this (this example stores integers for the sake of brevity):

public class ValuesController : ApiController
{        
    public string Get()
    {
        var sb = new StringBuilder();
        foreach (var item in RequestQueue.Queue)
        {
            sb.Append(item.ToString());
        }

        return sb.ToString();
    }

    public void Post(int id)
    {
        RequestQueue.Queue.Enqueue(id);
    }        
}

If you use this example you'll see that the queue holds the values across multiple requests. But since it lives in memory, those queued items will be gone if the app pool is recycled (for instance).

Now you could build in a check for when the queue holds 10 items and then save those to the DB, while creating another queue to store incoming values.

Like so:

public static class RequestQueue
{
    private static ConcurrentQueue<int> _queue;

    public static ConcurrentQueue<int> Queue
    {
        get
        {
            if (_queue == null)
            {
                _queue = new ConcurrentQueue<int>();
            }

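            if (_queue.Count >= 10)
            {
                // Note: this check-and-swap is not atomic. Two requests can both
                // see Count >= 10, so guard it with a lock before relying on it.
                SaveToDB(_queue);
                _queue = new ConcurrentQueue<int>();
            }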

            return _queue;
        }
    }

    public static void SaveToDB(ConcurrentQueue<int> queue)
    {
        foreach (var item in queue)
        {
            SaveItemToDB(item); // placeholder for your own persistence call
        }
    }
}

You need to clean this up a bit, but this setup should work. Also, you might need some locking mechanism around the dumping of the queue to the DB and creating a new instance. I would write a Console app with multiple threads that access this Queue to test it.
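
One way to avoid swapping the queue instance at all is to keep a single ConcurrentQueue and let one consumer pull items off it in batches with TryDequeue. The following is only a minimal sketch of that idea: `BatchDrainer`, `DrainBatch` and the batch size of 10 are hypothetical names and values that are not part of the answer above, and the actual save to the DB is left to the caller.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BatchDrainer
{
    // Removes up to maxItems from the queue and returns them as a list.
    // TryDequeue is atomic, so request threads can keep calling Enqueue meanwhile.
    public static List<T> DrainBatch<T>(ConcurrentQueue<T> queue, int maxItems)
    {
        var batch = new List<T>(maxItems);
        T item;
        while (batch.Count < maxItems && queue.TryDequeue(out item))
        {
            batch.Add(item);
        }
        return batch;
    }
}

public static class Program
{
    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < 25; i++)
        {
            queue.Enqueue(i);
        }

        // Hypothetical consumer step: take at most 10 items, then hand them to the DB layer.
        List<int> batch = BatchDrainer.DrainBatch(queue, 10);
        Console.WriteLine("batch: " + batch.Count + ", left: " + queue.Count);
        // prints "batch: 10, left: 15"
    }
}
```

Because each item is removed individually, nothing is lost when new items arrive while a batch is being collected; they simply stay in the queue for the next batch.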

OTHER TIPS

This is a very good scenario for MSMQ. For each request, just post the item to an MSMQ queue. Then, either in the same web app or in another app, read multiple items from the queue and post them to Solr. Even if your app crashes or gets recycled, MSMQ will hold your data safely for you to retrieve later.

MSMQ is robust, reliable and scalable. It is a perfect fit for your problem.

You can enqueue requests into an in-memory queue and send them to the database periodically using Quartz.NET. You can set this up in Global.asax.cs as follows:

public class RequestQueue
{
    // ConcurrentQueue (System.Collections.Concurrent) is safe to use
    // from many request threads at once; a plain Queue is not.
    private readonly ConcurrentQueue<HttpRequest> _requestHistory;
    private RequestQueue()
    {
        _requestHistory = new ConcurrentQueue<HttpRequest>();
    }
    // Created once by the type initializer, so there is no null-check race.
    private static readonly RequestQueue _singleton = new RequestQueue();

    public static RequestQueue Instance()
    {
        return _singleton;
    }

    public void Enqueue(HttpRequest request)
    {
        _requestHistory.Enqueue(request);
    }

    public void Flush()
    {
        HttpRequest request;
        while (_requestHistory.TryDequeue(out request))
        {
            try
            {
                //Write request To Db
            }
            catch (Exception)
            {
                // Put the request back and stop; retrying at once could loop forever.
                _requestHistory.Enqueue(request);
                break;
            }
        }
    }
}

public class WebApiApplication : System.Web.HttpApplication
{
    public WebApiApplication()
    {
        base.BeginRequest += delegate
            {
                RequestQueue.Instance().Enqueue(HttpContext.Current.Request);
            };
    }

    private void InitializeQuartz()
    {
        ISchedulerFactory sf = new StdSchedulerFactory();
        IScheduler sched = sf.GetScheduler();

        DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.UtcNow);
        DateTimeOffset startTime = DateBuilder.NextGivenSecondDate(null, 5);

        IJobDetail job = JobBuilder.Create<QueueConsumer>()
            .WithIdentity("job1", "group1")
            .Build();
        ITrigger trigger = TriggerBuilder.Create()
            .WithIdentity("trigger1", "group1")
            .StartAt(runTime)
            .WithCronSchedule("5 0/1 * * * ?")
            .Build();

        sched.ScheduleJob(job, trigger);

        sched.Start();
    }

    public class QueueConsumer : IJob
    {
        public void Execute(IJobExecutionContext context)
        {
            RequestQueue.Instance().Flush();
        }
    }

    protected void Application_Start()
    {
        InitializeQuartz();
    }
}

Another solution could be to keep the records in a memory queue that does not live in the same process as the Web API. For example: MemcacheQueue https://github.com/coderrr/memcache_queue

Some of these queue implementations support persistence, so you wouldn't lose data if the process goes down.

You could try NServiceBus, which can schedule messages and send messages in the future. According to the service bus documentation, its scheduling capability lets you schedule a task or an action/lambda to be executed repeatedly at a given interval.

That means you can keep an in-memory cache and write the contents of the array to your Solr/Lucene implementation every 10 minutes, for example. That is as easy as:

Schedule.Every(TimeSpan.FromMinutes(10)).Action(() => { < task to be executed > })

If you need more flexibility in setting up a scheduler, you can integrate it with Quartz.NET.

The setup would be as follows:

  • The WCF service (hosted as a Windows service) and NServiceBus should share the same context, or you should implement a cache manager that can be shared between those two parts of your system.
  • Every time you receive a request, you call your WCF service, pass the parameters, and within the service add an in-memory row to the array (it could be a string array holding the same JSON value you are writing to disk).
  • The service bus manages the queues for operations on the array and avoids collisions between them, for example:

    • add items to the array
    • empty the array
    • write to your database

public class ThresholdBuffer<T>
{
    private ConcurrentBag<T> _buffer;

    private int _threshold;

    public ThresholdBuffer(int threshold)
    {
        _threshold = threshold;
        _buffer = new ConcurrentBag<T>();
    }

    public void Add(T item)
    {
        _buffer.Add(item);

        if(_buffer.Count >= _threshold)
        {
            Recycle();
        }
    }

    public void Recycle()
    {
        var value = Interlocked.Exchange<ConcurrentBag<T>>(ref _buffer, new ConcurrentBag<T>());
        // flush value here
    }
}

  1. Create the flushing logic.
  2. In the Application_Start event (Global.asax), create a ThresholdBuffer and store it in Application state, a static field, etc.
  3. Call the Add method for each request.
  4. In Application_End, call Recycle manually to flush whatever is left.

You may add locking logic in Recycle to prevent several ConcurrentBag instances from being created and a nearly empty bag from being flushed. But in my opinion those side effects are a lesser evil than a lock.
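
For comparison, here is a rough sketch of what the locked variant could look like. It is not the code from this answer: `LockedThresholdBuffer` and the `flush` callback are hypothetical names, and a plain `List<T>` stands in for the ConcurrentBag because the lock already serializes access.

```csharp
using System;
using System.Collections.Generic;

public class LockedThresholdBuffer<T>
{
    private readonly object _sync = new object();
    private readonly int _threshold;
    private readonly Action<List<T>> _flush; // hypothetical callback, e.g. a bulk insert
    private List<T> _buffer = new List<T>();

    public LockedThresholdBuffer(int threshold, Action<List<T>> flush)
    {
        _threshold = threshold;
        _flush = flush;
    }

    public void Add(T item)
    {
        List<T> full = null;
        lock (_sync)
        {
            _buffer.Add(item);
            if (_buffer.Count >= _threshold)
            {
                // Swap inside the lock so exactly one caller receives each full buffer.
                full = _buffer;
                _buffer = new List<T>();
            }
        }
        // Flush outside the lock so a slow write does not block other callers.
        if (full != null)
        {
            _flush(full);
        }
    }

    public void Recycle()
    {
        List<T> rest;
        lock (_sync)
        {
            rest = _buffer;
            _buffer = new List<T>();
        }
        if (rest.Count > 0)
        {
            _flush(rest);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var buffer = new LockedThresholdBuffer<int>(3, batch => Console.WriteLine("flushed " + batch.Count));
        for (int i = 0; i < 7; i++)
        {
            buffer.Add(i);
        }
        buffer.Recycle();
        // prints "flushed 3", "flushed 3", "flushed 1"
    }
}
```

The trade-off is the one described above: the lock guarantees that no extra buffers are created and no nearly empty batch is flushed by accident, at the cost of briefly serializing every Add call.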

Update: lock-free, without creating additional ConcurrentBag instances.

public class ThresholdBuffer<T>
{
    private ConcurrentBag<T> _buffer;

    // Number of items added since the last recycle.
    private int _count;

    private int _threshold;

    public ThresholdBuffer(int threshold)
    {
        _threshold = threshold;
        _count = 0;
        _buffer = new ConcurrentBag<T>();
    }

    public void Add(T item)
    {
        _buffer.Add(item);
        // Interlocked.Increment hands each caller a distinct value between resets,
        // so only one thread sees the threshold and triggers the recycle.
        if (Interlocked.Increment(ref _count) == _threshold)
        {
            Recycle();
        }
    }

    public void Recycle()
    {
        var valueForFlushing = Interlocked.Exchange<ConcurrentBag<T>>(ref _buffer, new ConcurrentBag<T>());
        Interlocked.Exchange(ref _count, 0);
        // flush valueForFlushing here
    }
}

P.S. You may use any concurrent collection instead of ConcurrentBag.

Licensed under: CC-BY-SA with attribution