Compare commits

...

1 Commits
16632 ... 14181

Author SHA1 Message Date
Martin-Molinero
c9f3115a16 Smaller live warmup history requests
- Smaller live warmup history requests, will keep track of the last point
  we got from the file based enumerator and start our history enumeration from this point
2022-06-30 18:05:53 -03:00

View File

@@ -437,9 +437,13 @@ namespace QuantConnect.Lean.Engine.DataFeeds
historyWarmup = new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
}
// let's keep track of the last point we got from the file based enumerator and start our history enumeration from this point
// this is much more efficient since these duplicated points will be dropped by the filter righ away causing memory usage spikes
var lastPointTracker = new LastPointTracker();
var synchronizedWarmupEnumerator = TryAddFillForwardEnumerator(warmupRequest,
// we concatenate the file based and history based warmup enumerators, dropping duplicate time stamps
new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest), GetHistoryWarmupEnumerator(historyWarmup)) { CanEmitNull = false },
new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull = false },
// if required by the original request, we will fill forward the Synced warmup data
request.Configuration.FillDataForward);
@@ -453,14 +457,25 @@ namespace QuantConnect.Lean.Engine.DataFeeds
/// <summary>
/// File based warmup enumerator
/// </summary>
private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(SubscriptionRequest warmup)
private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
IEnumerator<BaseData> result = null;
try
{
result = new FilterEnumerator<BaseData>(CreateEnumerator(warmup),
// don't let future data past, nor fill forward, that will be handled after merging with the history request response
data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward);
data =>
{
// don't let future data past, nor fill forward, that will be handled after merging with the history request response
if (data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward)
{
if (data != null)
{
lastPointTracker.LastDataPoint = data;
}
return true;
}
return false;
});
}
catch (Exception e)
{
@@ -472,42 +487,60 @@ namespace QuantConnect.Lean.Engine.DataFeeds
/// <summary>
/// History based warmup enumerator
/// </summary>
private IEnumerator<BaseData> GetHistoryWarmupEnumerator(SubscriptionRequest warmup)
private IEnumerator<BaseData> GetHistoryWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
IEnumerator<BaseData> result = null;
try
IEnumerator<BaseData> result;
if (warmup.IsUniverseSubscription)
{
if (warmup.IsUniverseSubscription)
result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req) => GetHistoryWarmupEnumerator(req, lastPointTracker));
}
else
{
// we create an enumerable of which we get the enumerator to defer the creation of the history request until the file based enumeration ended
// and potentially the 'lastPointTracker' is available to adjust our start time
result = new[] { warmup }.SelectMany(_ =>
{
result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: GetHistoryWarmupEnumerator);
}
else
{
var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, warmup.StartTimeUtc, warmup.EndTimeUtc);
result = _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice =>
var startTimeUtc = warmup.StartTimeUtc;
if (lastPointTracker != null && lastPointTracker.LastDataPoint != null)
{
try
var utcLastPointTime = lastPointTracker.LastDataPoint.Time.ConvertToUtc(warmup.ExchangeHours.TimeZone);
if(utcLastPointTime > startTimeUtc)
{
var data = slice.Get(historyRequest.DataType);
return (BaseData)data[warmup.Configuration.Symbol];
if (Log.DebuggingEnabled)
{
Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
}
startTimeUtc = utcLastPointTime;
}
catch (Exception e)
}
var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, startTimeUtc, warmup.EndTimeUtc);
try
{
return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice =>
{
Log.Error(e, $"History warmup: {warmup.Configuration}");
}
return null;
}).GetEnumerator();
}
try
{
var data = slice.Get(historyRequest.DataType);
return (BaseData)data[warmup.Configuration.Symbol];
}
catch (Exception e)
{
Log.Error(e, $"History warmup: {warmup.Configuration}");
}
return null;
});
}
catch
{
// some history providers could throw if they do not support a type
}
return Enumerable.Empty<BaseData>();
}).GetEnumerator();
}
return new FilterEnumerator<BaseData>(result,
// don't let future data past, nor fill forward, that will be handled after merging with the file based enumerator
data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward);
}
catch
{
// some history providers could throw if they do not support a type
}
return result;
return new FilterEnumerator<BaseData>(result,
// don't let future data past, nor fill forward, that will be handled after merging with the file based enumerator
data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward);
}
/// <summary>
@@ -548,5 +581,10 @@ namespace QuantConnect.Lean.Engine.DataFeeds
EnumeratorFinished += (_, _) => enqueueable.Stop();
}
}
private class LastPointTracker
{
public BaseData LastDataPoint { get; set; }
}
}
}