StreamInsight Get Latest Event within a time frame
07-06-2021
Question
I want to keep only the latest event for each ID in the payload (the ID field of StockTicksData) and ignore any duplicates within a time window.
Payload and Query
var StockTicksData = new[]
{
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" },
};
var stocks = StockTicksData.ToPointStream(Application, t =>
    PointEvent.CreateInsert(t.Timestamp, t),
    AdvanceTimeSettings.IncreasingStartTime);
// For each ID, find the latest timestamp inside every 2-second tumbling window.
var query = (from e in stocks
             group e by e.ID into cusipGroup
             from win in cusipGroup.TumblingWindow(TimeSpan.FromSeconds(2),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 CusipID = cusipGroup.Key,
                 Timestamp = win.Max(e => e.Timestamp),
                 Price = 0
             });
// Join back to the source stream on (ID, Timestamp) to recover the price of the latest event.
var cusipIdGroupCepStream = (from px in query
                             join lz in stocks
                             on new { px.CusipID, px.Timestamp }
                             equals new { CusipID = lz.ID, lz.Timestamp }
                             select new
                             {
                                 CusipId = lz.ID,
                                 Price = lz.Price,
                                 TimeofArrival = lz.Timestamp
                             });
The above query works fine, but when I use an input adapter I have to insert a CTI event to flush the output. Here is the code.
Tick generator: it sets the time of arrival as
priceTick.TimeofArrival = DateTime.Now.AddTicks(1);
Input adapter
.... In a loop
{
    currEvent = CreateInsertEvent();
    currEvent.StartTime = priceTick.TimeofArrival; // each event carries the arrival time from the input, which is t+1
    currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
    pendingEvent = null;
    Enqueue(ref currEvent);
    // Also send a CTI event
    EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1)); // added to flush the output
}
The same query does not give the expected output when it runs with the input adapter and these CTI events.
Any help would be greatly appreciated.
Solution
The above query works in LINQPad because there are no CTI events to generate manually there. The same query does not emit the complete set of results when it is configured with input/output adapters.
In order to flush the output, you need to configure CTI generation via ITypedDeclareAdvanceTimeProperties in the input factory. It takes care of generating the CTI events based on the settings. I am using the configuration below:
public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties(StockTickerInputConfig configInfo, EventShape eventShape)
{
    return new AdapterAdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromTicks(1)),
        AdvanceTimePolicy.Adjust);
}
With point events arriving at a very high rate per second, some events are still not flushed to the output adapter. I have to manually enqueue the LAST CTI event to ensure the results are flushed out to the output adapter.
The current setting in the input factory does not generate a final CTI after my last data event is enqueued. I added a condition in the input adapter that checks whether the event being enqueued is the last one and, if so, enqueues a CTI event via EnqueueCtiEvent.
I can provide the code if needed. Let me know if anyone has a better way of solving this.
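A minimal sketch of the last-event check described above, written without StreamInsight so it can run on its own. RecordingQueue, FinalCtiSketch and Produce are hypothetical names used only for illustration; in a real adapter the two queue methods would correspond to Enqueue(ref currEvent) and EnqueueCtiEvent(...), and the periodic CTIs would still come from the factory settings. The final CTI is stamped one tick after the last event's start time, as in the question's adapter code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the adapter's queue. It only records what was
// enqueued so the ordering of inserts and the final CTI can be inspected.
public sealed class RecordingQueue
{
    public List<string> Log { get; } = new List<string>();

    public void EnqueueInsert(DateTimeOffset startTime, string id, int price) =>
        Log.Add($"INSERT {id} {price} @{startTime.Ticks}");

    public void EnqueueCti(DateTimeOffset time) =>
        Log.Add($"CTI @{time.Ticks}");
}

public static class FinalCtiSketch
{
    // Enqueues every tick as an insert and, after the last tick only,
    // enqueues one extra CTI so the final results can be flushed.
    public static void Produce(
        IReadOnlyList<(DateTimeOffset Time, string Id, int Price)> ticks,
        RecordingQueue queue)
    {
        for (int i = 0; i < ticks.Count; i++)
        {
            var tick = ticks[i];
            queue.EnqueueInsert(tick.Time, tick.Id, tick.Price);

            bool isLastEvent = i == ticks.Count - 1;
            if (isLastEvent)
            {
                // Assumption: one tick past the last start time is enough to
                // cover the final point event, mirroring the question's code.
                queue.EnqueueCti(tick.Time.AddTicks(1));
            }
        }
    }
}
```

This sketch assumes the adapter knows when it has reached the last event (here, the end of a list). With an unbounded feed the same call would have to be made wherever the adapter handles its stop or shutdown signal.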