Question

I want to keep only the latest event for each payload ID (the ID field of StockTicksData) and ignore any duplicates within a time window.

Payload and Query

var StockTicksData = new[]
{
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" },
};
var stocks = StockTicksData.ToPointStream(Application, t =>
    PointEvent.CreateInsert(t.Timestamp, t),
    AdvanceTimeSettings.IncreasingStartTime);

var query = (from e in stocks
        group e by e.ID into cusipGroup
        from win in cusipGroup.TumblingWindow(TimeSpan.FromSeconds(2),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new
        {
             CusipID = cusipGroup.Key,
             // latest timestamp seen for this ID within the window
             Timestamp = win.Max(e => e.Timestamp),
             Price = 0
        });

var cusipIdGroupCepStream = (from px in query
                         join lz in stocks
                         on new { px.CusipID, px.Timestamp }
                         // the stock payload exposes ID, so name it CusipID to match the left-hand key
                         equals new { CusipID = lz.ID, lz.Timestamp }
                         select new
                           {
                              CusipId = lz.ID,
                              Price = lz.Price,
                              TimeofArrival = lz.Timestamp
                           });
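For reference, the intended result ("latest tick per ID per two-second window") can be sketched with plain LINQ to Objects, without StreamInsight. This is only an illustration of the logic: the `Tick` class and `LatestTickSketch.LatestPerWindow` are hypothetical names, and the sketch assumes windows are aligned to tick zero of `DateTime`, which may not match the engine's window alignment in every configuration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical type standing in for the anonymous payload used in the question.
public sealed class Tick
{
    public DateTime Timestamp { get; set; }
    public int Price { get; set; }
    public string ID { get; set; }
}

public static class LatestTickSketch
{
    // Returns the latest tick for each ID inside each tumbling window.
    // Assumption: windows are aligned to DateTime tick zero.
    public static List<Tick> LatestPerWindow(IEnumerable<Tick> ticks, TimeSpan window)
    {
        return ticks
            // one group per (ID, window index)
            .GroupBy(t => new { t.ID, Bucket = t.Timestamp.Ticks / window.Ticks })
            // keep only the newest tick of each group
            .Select(g => g.OrderByDescending(t => t.Timestamp).First())
            .OrderBy(t => t.Timestamp)
            .ToList();
    }
}
```

Calling `LatestTickSketch.LatestPerWindow(ticks, TimeSpan.FromSeconds(2))` on the sample data gives a baseline to compare the streaming output against.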

The above query works fine, but when I use an input adapter I have to insert a CTI event to flush the output. Here is the code.

The tick generator sets the time of arrival as follows:

priceTick.TimeofArrival = DateTime.Now.AddTicks(1);

Input adapter

.... In a loop
{
    currEvent = CreateInsertEvent();
    // each event carries the arrival time set by the generator (t + 1 tick)
    currEvent.StartTime = priceTick.TimeofArrival;
    currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
    pendingEvent = null;

    Enqueue(ref currEvent);

    // Also send a CTI event, added to flush the output
    EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1));
}

The same query does not give the expected output with an input adapter and CTI events.

Any help would be greatly appreciated.

Solution

The above query works in LINQPad because no CTI events have to be generated by hand there. The same query does not emit the complete set of results when it is configured with input/output adapters.

To flush the output, configure automatic CTI generation by implementing ITypedDeclareAdvanceTimeProperties in the input adapter factory. CTI events are then generated according to those settings. I am using the configuration below:

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties(StockTickerInputConfig configInfo, EventShape eventShape)
{
    return new AdapterAdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromTicks(1)),
        AdvanceTimePolicy.Adjust);
}

With point events arriving at a very high rate per second, this configuration drops, or at least does not flush, some events to the output adapter. I have to enqueue the LAST CTI event manually to make sure the results are flushed to the output adapter.

The current settings in the input factory do not generate a final CTI after my last data event is enqueued. I added a condition in the input adapter that checks whether the event being enqueued is the last one and, if so, enqueues a CTI event via EnqueueCtiEvent.
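Why a final CTI matters can be illustrated with a toy model. The sketch below is not StreamInsight code: `ToyWindowBuffer` is a hypothetical class that mimics only one aspect of the engine, namely that a window's result is held back until a CTI at or beyond the window's end has arrived. It also assumes windows aligned to tick zero of `DateTime`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model (hypothetical, not part of StreamInsight): counts events per
// tumbling window and releases a window only when a CTI reaches its end.
public sealed class ToyWindowBuffer
{
    private readonly long _windowTicks;
    // window start (in ticks) -> number of events buffered for that window
    private readonly SortedDictionary<long, int> _open = new SortedDictionary<long, int>();

    public ToyWindowBuffer(TimeSpan window)
    {
        _windowTicks = window.Ticks;
    }

    // Buffers a point event in the window that contains its start time.
    public void Insert(DateTime startTime)
    {
        long windowStart = startTime.Ticks / _windowTicks * _windowTicks;
        int count;
        _open.TryGetValue(windowStart, out count);
        _open[windowStart] = count + 1;
    }

    // Applies a CTI and returns the event counts of the windows it closes,
    // i.e. the windows whose end time is at or before the CTI timestamp.
    public List<int> Cti(DateTime ctiTime)
    {
        var closed = _open.Keys.Where(s => s + _windowTicks <= ctiTime.Ticks).ToList();
        var output = closed.Select(s => _open[s]).ToList();
        foreach (var s in closed)
        {
            _open.Remove(s);
        }
        return output;
    }
}
```

In this toy model, a CTI stamped one tick after the last event leaves that event's window open, so nothing is emitted for it; only a CTI at or past the end of the window releases the result. That is consistent with having to enqueue one more CTI after the last data event, though the real engine's behavior depends on its own window and advance-time settings.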

I can provide the code if needed. Let me know if anyone has a better way of solving this.

Licensed under: CC-BY-SA with attribution