Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c42a266be | ||
|
|
8b7da686ea | ||
|
|
d3280c5e60 | ||
|
|
fe1f22543f | ||
|
|
57ddbbbf05 | ||
|
|
9579c4263b | ||
|
|
f082a40f34 | ||
|
|
d739873daa | ||
|
|
0f7dfe8ec8 |
129
Algorithm.CSharp/InternalSubscriptionHistoryRequestAlgorithm.cs
Normal file
129
Algorithm.CSharp/InternalSubscriptionHistoryRequestAlgorithm.cs
Normal file
@@ -0,0 +1,129 @@
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Linq;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Interfaces;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace QuantConnect.Algorithm.CSharp
|
||||
{
|
||||
/// <summary>
|
||||
/// Regression algorithm asserting data returned by a history requests uses internal subscriptions correctly
|
||||
/// </summary>
|
||||
public class InternalSubscriptionHistoryRequestAlgorithm : QCAlgorithm, IRegressionAlgorithmDefinition
|
||||
{
|
||||
/// <summary>
|
||||
/// Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.
|
||||
/// </summary>
|
||||
public override void Initialize()
|
||||
{
|
||||
SetStartDate(2013, 10, 07);
|
||||
SetEndDate(2013, 10, 11);
|
||||
|
||||
AddEquity("AAPL", Resolution.Hour);
|
||||
SetBenchmark("SPY");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
|
||||
/// </summary>
|
||||
/// <param name="data">Slice object keyed by symbol containing the stock data</param>
|
||||
public override void OnData(Slice data)
|
||||
{
|
||||
if (!Portfolio.Invested)
|
||||
{
|
||||
SetHoldings("AAPL", 1);
|
||||
|
||||
var spy = QuantConnect.Symbol.Create("SPY", SecurityType.Equity, Market.USA);
|
||||
|
||||
var history = History(new[] { spy }, TimeSpan.FromDays(10));
|
||||
if (!history.Any() || !history.All(slice => slice.Bars.All(pair => pair.Value.Period == TimeSpan.FromHours(1))))
|
||||
{
|
||||
throw new Exception("Unexpected history result for internal subscription");
|
||||
}
|
||||
|
||||
// we add SPY using Daily > default benchmark using hourly
|
||||
AddEquity("SPY", Resolution.Daily);
|
||||
|
||||
history = History(new[] { spy }, TimeSpan.FromDays(10));
|
||||
if (!history.Any() || !history.All(slice => slice.Bars.All(pair => pair.Value.Period == TimeSpan.FromDays(1))))
|
||||
{
|
||||
throw new Exception("Unexpected history result for user subscription");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This is used by the regression test system to indicate if the open source Lean repository has the required data to run this algorithm.
|
||||
/// </summary>
|
||||
public bool CanRunLocally { get; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// This is used by the regression test system to indicate which languages this algorithm is written in.
|
||||
/// </summary>
|
||||
public Language[] Languages { get; } = { Language.CSharp };
|
||||
|
||||
/// <summary>
|
||||
/// This is used by the regression test system to indicate what the expected statistics are from running the algorithm
|
||||
/// </summary>
|
||||
public Dictionary<string, string> ExpectedStatistics => new Dictionary<string, string>
|
||||
{
|
||||
{"Total Trades", "1"},
|
||||
{"Average Win", "0%"},
|
||||
{"Average Loss", "0%"},
|
||||
{"Compounding Annual Return", "32.114%"},
|
||||
{"Drawdown", "2.300%"},
|
||||
{"Expectancy", "0"},
|
||||
{"Net Profit", "0.382%"},
|
||||
{"Sharpe Ratio", "4.667"},
|
||||
{"Probabilistic Sharpe Ratio", "60.815%"},
|
||||
{"Loss Rate", "0%"},
|
||||
{"Win Rate", "0%"},
|
||||
{"Profit-Loss Ratio", "0"},
|
||||
{"Alpha", "-0.04"},
|
||||
{"Beta", "0.55"},
|
||||
{"Annual Standard Deviation", "0.156"},
|
||||
{"Annual Variance", "0.024"},
|
||||
{"Information Ratio", "-4.654"},
|
||||
{"Tracking Error", "0.144"},
|
||||
{"Treynor Ratio", "1.326"},
|
||||
{"Total Fees", "$32.11"},
|
||||
{"Estimated Strategy Capacity", "$66000000.00"},
|
||||
{"Lowest Capacity Asset", "AAPL R735QTJ8XC9X"},
|
||||
{"Fitness Score", "0.189"},
|
||||
{"Kelly Criterion Estimate", "0"},
|
||||
{"Kelly Criterion Probability Value", "0"},
|
||||
{"Sortino Ratio", "4.037"},
|
||||
{"Return Over Maximum Drawdown", "15.534"},
|
||||
{"Portfolio Turnover", "0.2"},
|
||||
{"Total Insights Generated", "0"},
|
||||
{"Total Insights Closed", "0"},
|
||||
{"Total Insights Analysis Completed", "0"},
|
||||
{"Long Insight Count", "0"},
|
||||
{"Short Insight Count", "0"},
|
||||
{"Long/Short Ratio", "100%"},
|
||||
{"Estimated Monthly Alpha Value", "$0"},
|
||||
{"Total Accumulated Estimated Alpha Value", "$0"},
|
||||
{"Mean Population Estimated Insight Value", "$0"},
|
||||
{"Mean Population Direction", "0%"},
|
||||
{"Mean Population Magnitude", "0%"},
|
||||
{"Rolling Averaged Population Direction", "0%"},
|
||||
{"Rolling Averaged Population Magnitude", "0%"},
|
||||
{"OrderListHash", "b7b8e83e4456e143c2c4c11fa31a1cf2"}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -660,17 +660,42 @@ namespace QuantConnect.Algorithm
|
||||
private IEnumerable<SubscriptionDataConfig> GetMatchingSubscriptions(Symbol symbol, Type type, Resolution? resolution = null)
|
||||
{
|
||||
var matchingSubscriptions = SubscriptionManager.SubscriptionDataConfigService
|
||||
// we add internal subscription so that history requests are covered, this allows us to warm them up too
|
||||
.GetSubscriptionDataConfigs(symbol, includeInternalConfigs:true)
|
||||
// find all subscriptions matching the requested type with a higher resolution than requested
|
||||
.OrderByDescending(s => s.Resolution)
|
||||
// lets make sure to respect the order of the data types
|
||||
.ThenByDescending(config => GetTickTypeOrder(config.SecurityType, config.TickType))
|
||||
.ThenBy(config => config.IsInternalFeed ? 1 : 0)
|
||||
.Where(s => SubscriptionDataConfigTypeFilter(type, s.Type))
|
||||
.ToList();
|
||||
.Where(s => SubscriptionDataConfigTypeFilter(type, s.Type));
|
||||
|
||||
var internalConfig = new List<SubscriptionDataConfig>();
|
||||
var userConfig = new List<SubscriptionDataConfig>();
|
||||
foreach (var config in matchingSubscriptions)
|
||||
{
|
||||
if (config.IsInternalFeed)
|
||||
{
|
||||
internalConfig.Add(config);
|
||||
}
|
||||
else
|
||||
{
|
||||
userConfig.Add(config);
|
||||
}
|
||||
}
|
||||
|
||||
// if we have any user defined subscription configuration we use it, else we use internal ones if any
|
||||
List<SubscriptionDataConfig> configs = null;
|
||||
if(userConfig.Count != 0)
|
||||
{
|
||||
configs = userConfig;
|
||||
}
|
||||
else if (internalConfig.Count != 0)
|
||||
{
|
||||
configs = internalConfig;
|
||||
}
|
||||
|
||||
// we use the subscription manager registered configurations here, we can not rely on the Securities collection
|
||||
// since this might be called when creating a security and warming it up
|
||||
if (matchingSubscriptions.Count != 0)
|
||||
if (configs != null && configs.Count != 0)
|
||||
{
|
||||
if (resolution.HasValue
|
||||
&& (resolution == Resolution.Daily || resolution == Resolution.Hour)
|
||||
@@ -679,10 +704,10 @@ namespace QuantConnect.Algorithm
|
||||
// for Daily and Hour resolution, for equities, we have to
|
||||
// filter out any existing subscriptions that could be of Quote type
|
||||
// This could happen if they were Resolution.Minute/Second/Tick
|
||||
return matchingSubscriptions.Where(s => s.TickType != TickType.Quote);
|
||||
return configs.Where(s => s.TickType != TickType.Quote);
|
||||
}
|
||||
|
||||
return matchingSubscriptions;
|
||||
return configs;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -49,6 +49,11 @@ namespace QuantConnect.Brokerages
|
||||
/// </summary>
|
||||
public event EventHandler<OrderEvent> OptionPositionAssigned;
|
||||
|
||||
/// <summary>
|
||||
/// Event that fires each time an option position has changed
|
||||
/// </summary>
|
||||
public event EventHandler<OptionNotificationEventArgs> OptionNotification;
|
||||
|
||||
/// <summary>
|
||||
/// Event that fires each time a user's brokerage account is changed
|
||||
/// </summary>
|
||||
@@ -157,6 +162,24 @@ namespace QuantConnect.Brokerages
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the OptionNotification event
|
||||
/// </summary>
|
||||
/// <param name="e">The OptionNotification event arguments</param>
|
||||
protected virtual void OnOptionNotification(OptionNotificationEventArgs e)
|
||||
{
|
||||
try
|
||||
{
|
||||
Log.Debug("Brokerage.OnOptionNotification(): " + e);
|
||||
|
||||
OptionNotification?.Invoke(this, e);
|
||||
}
|
||||
catch (Exception err)
|
||||
{
|
||||
Log.Error(err);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the AccountChanged event
|
||||
/// </summary>
|
||||
|
||||
@@ -79,5 +79,14 @@ namespace QuantConnect.Brokerages.InteractiveBrokers.Client
|
||||
RealisedPnl = realisedPnl;
|
||||
AccountName = accountName;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a string that represents the current object.
|
||||
/// </summary>
|
||||
/// <returns>A string that represents the current object.</returns>
|
||||
public override string ToString()
|
||||
{
|
||||
return $"Contract: {Contract}, ConId: {Contract.ConId}, Position: {Position}, MarketPrice: {MarketPrice}, MarketValue: {MarketValue}, AverageCost: {AverageCost}, UnrealisedPnl: {UnrealisedPnl}, RealisedPnl: {RealisedPnl}, AccountName: {AccountName}";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -998,7 +998,8 @@ namespace QuantConnect.Brokerages.InteractiveBrokers
|
||||
|
||||
if (order.Type == OrderType.OptionExercise)
|
||||
{
|
||||
_client.ClientSocket.exerciseOptions(ibOrderId, contract, 1, decimal.ToInt32(order.Quantity), _account, 0);
|
||||
// IB API requires exerciseQuantity to be positive
|
||||
_client.ClientSocket.exerciseOptions(ibOrderId, contract, 1, decimal.ToInt32(order.AbsoluteQuantity), _account, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1749,6 +1750,16 @@ namespace QuantConnect.Brokerages.InteractiveBrokers
|
||||
{
|
||||
try
|
||||
{
|
||||
Log.Trace($"InteractiveBrokersBrokerage.HandlePortfolioUpdates(): {e}");
|
||||
|
||||
// notify the transaction handler about all option position updates
|
||||
if (e.Contract.SecType is IB.SecurityType.Option or IB.SecurityType.FutureOption)
|
||||
{
|
||||
var symbol = MapSymbol(e.Contract);
|
||||
|
||||
OnOptionNotification(new OptionNotificationEventArgs(symbol, e.Position));
|
||||
}
|
||||
|
||||
_accountHoldingsResetEvent.Reset();
|
||||
if (_loadExistingHoldings)
|
||||
{
|
||||
@@ -3544,4 +3555,5 @@ namespace QuantConnect.Brokerages.InteractiveBrokers
|
||||
1100, 1101, 1102, 2103, 2104, 2105, 2106, 2107, 2108, 2119, 2157, 2158, 10197
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@
|
||||
</PackageReference>
|
||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
|
||||
<PackageReference Include="NodaTime" Version="3.0.5" />
|
||||
<PackageReference Include="QuantConnect.IBAutomater" Version="2.0.63" />
|
||||
<PackageReference Include="QuantConnect.IBAutomater" Version="2.0.64" />
|
||||
<PackageReference Include="RestSharp" Version="106.12.0" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
|
||||
47
Common/Brokerages/OptionNotificationEventArgs.cs
Normal file
47
Common/Brokerages/OptionNotificationEventArgs.cs
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
using System;
|
||||
using QuantConnect.Interfaces;
|
||||
|
||||
namespace QuantConnect.Brokerages
|
||||
{
|
||||
/// <summary>
|
||||
/// Event arguments class for the <see cref="IBrokerage.OptionNotification"/> event
|
||||
/// </summary>
|
||||
public sealed class OptionNotificationEventArgs : EventArgs
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets the option symbol which has received a notification
|
||||
/// </summary>
|
||||
public Symbol Symbol { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the new option position (positive for long, zero for flat, negative for short)
|
||||
/// </summary>
|
||||
public decimal Position { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="OptionNotificationEventArgs"/> class
|
||||
/// </summary>
|
||||
/// <param name="symbol">The symbol</param>
|
||||
/// <param name="position">The new option position</param>
|
||||
public OptionNotificationEventArgs(Symbol symbol, decimal position)
|
||||
{
|
||||
Symbol = symbol;
|
||||
Position = position;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -154,7 +154,7 @@ namespace QuantConnect.Data.Custom.Tiingo
|
||||
|
||||
var tiingoTicker = TiingoSymbolMapper.GetTiingoTicker(config.Symbol);
|
||||
var source = Invariant($"https://api.tiingo.com/tiingo/daily/{tiingoTicker}/prices?startDate={startDate:yyyy-MM-dd}&token={Tiingo.AuthCode}");
|
||||
return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile, FileFormat.Collection);
|
||||
return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile, FileFormat.UnfoldingCollection);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -38,11 +38,19 @@ namespace QuantConnect.Data
|
||||
/// <summary>
|
||||
/// Reader returns a BaseDataCollection object.
|
||||
/// </summary>
|
||||
Collection,
|
||||
/// <remarks>Lean will unfold the collection and consume it as individual data points</remarks>
|
||||
UnfoldingCollection,
|
||||
|
||||
/// <summary>
|
||||
/// Data stored using an intermediate index source
|
||||
/// </summary>
|
||||
Index
|
||||
Index,
|
||||
|
||||
/// <summary>
|
||||
/// Data type inherits from BaseDataCollection.
|
||||
/// Reader method can return a non BaseDataCollection type which will be folded, based on unique time,
|
||||
/// into an instance of the data type.
|
||||
/// </summary>
|
||||
FoldingCollection
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,15 +15,16 @@
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace QuantConnect.Data.UniverseSelection
|
||||
{
|
||||
/// <summary>
|
||||
/// This type exists for transport of data as a single packet
|
||||
/// </summary>
|
||||
public class BaseDataCollection : BaseData
|
||||
public class BaseDataCollection : BaseData, IEnumerable<BaseData>
|
||||
{
|
||||
private DateTime _endTime;
|
||||
|
||||
@@ -98,5 +99,23 @@ namespace QuantConnect.Data.UniverseSelection
|
||||
{
|
||||
return new BaseDataCollection(Time, EndTime, Symbol, Data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns an IEnumerator for this enumerable Object. The enumerator provides
|
||||
/// a simple way to access all the contents of a collection.
|
||||
/// </summary>
|
||||
public IEnumerator<BaseData> GetEnumerator()
|
||||
{
|
||||
return (Data ?? Enumerable.Empty<BaseData>()).GetEnumerator();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns an IEnumerator for this enumerable Object. The enumerator provides
|
||||
/// a simple way to access all the contents of a collection.
|
||||
/// </summary>
|
||||
IEnumerator IEnumerable.GetEnumerator()
|
||||
{
|
||||
return GetEnumerator();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,11 @@ namespace QuantConnect.Interfaces
|
||||
/// </summary>
|
||||
event EventHandler<OrderEvent> OptionPositionAssigned;
|
||||
|
||||
/// <summary>
|
||||
/// Event that fires each time an option position has changed
|
||||
/// </summary>
|
||||
event EventHandler<OptionNotificationEventArgs> OptionNotification;
|
||||
|
||||
/// <summary>
|
||||
/// Event that fires each time a user's brokerage account is changed
|
||||
/// </summary>
|
||||
@@ -124,4 +129,4 @@ namespace QuantConnect.Interfaces
|
||||
/// <returns>An enumerable of bars covering the span specified in the request</returns>
|
||||
IEnumerable<BaseData> GetHistory(HistoryRequest request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
@@ -103,7 +103,7 @@ namespace QuantConnect.Securities.Option
|
||||
/// <returns>True if the option contract is expired at the specified time, false otherwise</returns>
|
||||
public static bool IsOptionContractExpired(Symbol symbol, DateTime currentTimeUtc)
|
||||
{
|
||||
if (symbol.SecurityType != SecurityType.Option)
|
||||
if (!symbol.SecurityType.IsOption())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
89
Engine/DataFeeds/BaseDataCollectionAggregatorReader.cs
Normal file
89
Engine/DataFeeds/BaseDataCollectionAggregatorReader.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
using System;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Interfaces;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Data.UniverseSelection;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
/// <summary>
|
||||
/// Data source reader that will aggregate data points into a base data collection
|
||||
/// </summary>
|
||||
public class BaseDataCollectionAggregatorReader : TextSubscriptionDataSourceReader
|
||||
{
|
||||
private readonly Type _collectionType;
|
||||
private BaseDataCollection _collection;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
|
||||
/// </summary>
|
||||
/// <param name="dataCacheProvider">This provider caches files if needed</param>
|
||||
/// <param name="config">The subscription's configuration</param>
|
||||
/// <param name="date">The date this factory was produced to read data for</param>
|
||||
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
|
||||
public BaseDataCollectionAggregatorReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode)
|
||||
: base(dataCacheProvider, config, date, isLiveMode)
|
||||
{
|
||||
_collectionType = config.Type;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads the specified <paramref name="source"/>
|
||||
/// </summary>
|
||||
/// <param name="source">The source to be read</param>
|
||||
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
|
||||
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
|
||||
{
|
||||
foreach (var point in base.Read(source))
|
||||
{
|
||||
if (point is BaseDataCollection)
|
||||
{
|
||||
// if underlying already is returning a collection let it through as is
|
||||
yield return point;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_collection != null && _collection.EndTime != point.EndTime)
|
||||
{
|
||||
// when we get a new time we flush current collection instance, if any
|
||||
yield return _collection;
|
||||
_collection = null;
|
||||
}
|
||||
|
||||
if (_collection == null)
|
||||
{
|
||||
_collection = (BaseDataCollection)Activator.CreateInstance(_collectionType);
|
||||
_collection.Time = point.Time;
|
||||
_collection.Symbol = point.Symbol;
|
||||
_collection.EndTime = point.EndTime;
|
||||
}
|
||||
// aggregate the data points
|
||||
_collection.Data.Add(point);
|
||||
}
|
||||
}
|
||||
|
||||
// underlying reader ended, flush current collection instance if any
|
||||
if (_collection != null)
|
||||
{
|
||||
yield return _collection;
|
||||
_collection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,10 +14,10 @@
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.ComponentModel;
|
||||
using QuantConnect.Data;
|
||||
using System.ComponentModel;
|
||||
using QuantConnect.Interfaces;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Lean.Engine.DataFeeds.Transport;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
@@ -41,7 +41,7 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
/// Event fired when the specified source is considered invalid, this may
|
||||
/// be from a missing file or failure to download a remote source
|
||||
/// </summary>
|
||||
public abstract event EventHandler<InvalidSourceEventArgs> InvalidSource;
|
||||
public event EventHandler<InvalidSourceEventArgs> InvalidSource;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new instance
|
||||
@@ -66,40 +66,55 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
/// <returns>A new instance of <see cref="IStreamReader"/> to read the source, or null if there was an error</returns>
|
||||
protected IStreamReader CreateStreamReader(SubscriptionDataSource subscriptionDataSource)
|
||||
{
|
||||
IStreamReader reader;
|
||||
switch (subscriptionDataSource.TransportMedium)
|
||||
IStreamReader reader = null;
|
||||
try
|
||||
{
|
||||
case SubscriptionTransportMedium.LocalFile:
|
||||
reader = HandleLocalFileSource(subscriptionDataSource);
|
||||
break;
|
||||
switch (subscriptionDataSource.TransportMedium)
|
||||
{
|
||||
case SubscriptionTransportMedium.LocalFile:
|
||||
reader = new LocalFileSubscriptionStreamReader(DataCacheProvider, subscriptionDataSource.Source);
|
||||
break;
|
||||
|
||||
case SubscriptionTransportMedium.RemoteFile:
|
||||
reader = HandleRemoteSourceFile(subscriptionDataSource);
|
||||
break;
|
||||
case SubscriptionTransportMedium.RemoteFile:
|
||||
reader = HandleRemoteSourceFile(subscriptionDataSource);
|
||||
break;
|
||||
|
||||
case SubscriptionTransportMedium.Rest:
|
||||
reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
|
||||
break;
|
||||
case SubscriptionTransportMedium.Rest:
|
||||
reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
|
||||
default:
|
||||
throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
OnInvalidSource(subscriptionDataSource, e);
|
||||
return reader;
|
||||
}
|
||||
|
||||
if (reader == null || reader.EndOfStream)
|
||||
{
|
||||
OnInvalidSource(subscriptionDataSource, new Exception($"The reader was empty for source: ${subscriptionDataSource.Source}"));
|
||||
return null;
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Opens up an IStreamReader for a local file source
|
||||
/// Event invocator for the <see cref="InvalidSource"/> event
|
||||
/// </summary>
|
||||
protected IStreamReader HandleLocalFileSource(SubscriptionDataSource source)
|
||||
/// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
|
||||
/// <param name="exception">The exception if one was raised, otherwise null</param>
|
||||
protected void OnInvalidSource(SubscriptionDataSource source, Exception exception)
|
||||
{
|
||||
// handles zip or text files
|
||||
return new LocalFileSubscriptionStreamReader(DataCacheProvider, source.Source);
|
||||
InvalidSource?.Invoke(this, new InvalidSourceEventArgs(source, exception));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Opens up an IStreamReader for a remote file source
|
||||
/// </summary>
|
||||
protected IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
|
||||
private IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
|
||||
{
|
||||
SubscriptionDataSourceReader.CheckRemoteFileCache();
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Data.UniverseSelection;
|
||||
using QuantConnect.Util;
|
||||
using QuantConnect.Interfaces;
|
||||
using QuantConnect.Lean.Engine.DataFeeds.Transport;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Data.UniverseSelection;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
@@ -25,14 +25,11 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
/// Collection Subscription Factory takes a BaseDataCollection from BaseData factories
|
||||
/// and yields it one point at a time to the algorithm
|
||||
/// </summary>
|
||||
public class CollectionSubscriptionDataSourceReader : ISubscriptionDataSourceReader
|
||||
public class CollectionSubscriptionDataSourceReader : BaseSubscriptionDataSourceReader
|
||||
{
|
||||
|
||||
private readonly DateTime _date;
|
||||
private readonly bool _isLiveMode;
|
||||
private readonly BaseData _factory;
|
||||
private readonly SubscriptionDataConfig _config;
|
||||
private readonly IDataCacheProvider _dataCacheProvider;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="CollectionSubscriptionDataSourceReader"/> class
|
||||
@@ -42,20 +39,13 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
/// <param name="date">The date this factory was produced to read data for</param>
|
||||
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
|
||||
public CollectionSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode)
|
||||
:base(dataCacheProvider, isLiveMode)
|
||||
{
|
||||
_dataCacheProvider = dataCacheProvider;
|
||||
_date = date;
|
||||
_config = config;
|
||||
_isLiveMode = isLiveMode;
|
||||
_factory = _config.GetBaseDataInstance();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when the specified source is considered invalid, this may
|
||||
/// be from a missing file or failure to download a remote source
|
||||
/// </summary>
|
||||
public event EventHandler<InvalidSourceEventArgs> InvalidSource;
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when an exception is thrown during a call to
|
||||
/// <see cref="BaseData.Reader(SubscriptionDataConfig, string, DateTime, bool)"/>
|
||||
@@ -67,38 +57,16 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
/// </summary>
|
||||
/// <param name="source">The source to be read</param>
|
||||
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
|
||||
public IEnumerable<BaseData> Read(SubscriptionDataSource source)
|
||||
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
|
||||
{
|
||||
SubscriptionDataSourceReader.CheckRemoteFileCache();
|
||||
|
||||
IStreamReader reader = null;
|
||||
try
|
||||
{
|
||||
try
|
||||
reader = CreateStreamReader(source);
|
||||
if (reader == null)
|
||||
{
|
||||
switch (source.TransportMedium)
|
||||
{
|
||||
default:
|
||||
case SubscriptionTransportMedium.Rest:
|
||||
reader = new RestSubscriptionStreamReader(source.Source, source.Headers, _isLiveMode);
|
||||
break;
|
||||
case SubscriptionTransportMedium.LocalFile:
|
||||
reader = new LocalFileSubscriptionStreamReader(_dataCacheProvider, source.Source);
|
||||
break;
|
||||
case SubscriptionTransportMedium.RemoteFile:
|
||||
reader = new RemoteFileSubscriptionStreamReader(_dataCacheProvider, source.Source, Globals.Cache, source.Headers);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
OnInvalidSource(source, e);
|
||||
yield break;
|
||||
}
|
||||
|
||||
if (reader.EndOfStream)
|
||||
{
|
||||
OnInvalidSource(source, new Exception($"The reader was empty for source: ${source.Source}"));
|
||||
yield break;
|
||||
}
|
||||
|
||||
@@ -109,7 +77,7 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
try
|
||||
{
|
||||
raw = reader.ReadLine();
|
||||
var result = _factory.Reader(_config, raw, _date, _isLiveMode);
|
||||
var result = _factory.Reader(_config, raw, _date, IsLiveMode);
|
||||
instances = result as BaseDataCollection;
|
||||
if (instances == null && !reader.ShouldBeRateLimited)
|
||||
{
|
||||
@@ -126,11 +94,13 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
}
|
||||
}
|
||||
|
||||
if (_isLiveMode
|
||||
if (IsLiveMode
|
||||
// this shouldn't happen, rest reader is the only one to be rate limited
|
||||
// and in live mode, but just in case...
|
||||
|| instances == null && reader.ShouldBeRateLimited)
|
||||
{
|
||||
// in live trading these data points will be unrolled at the
|
||||
// 'LiveCustomDataSubscriptionEnumeratorFactory' level
|
||||
yield return instances;
|
||||
}
|
||||
else
|
||||
@@ -147,8 +117,7 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (reader != null)
|
||||
reader.Dispose();
|
||||
reader.DisposeSafely();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,16 +131,5 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
var handler = ReaderError;
|
||||
if (handler != null) handler(this, new ReaderErrorEventArgs(line, exception));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the <see cref="InvalidSource"/> event
|
||||
/// </summary>
|
||||
/// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
|
||||
/// <param name="exception">The exception if one was raised, otherwise null</param>
|
||||
private void OnInvalidSource(SubscriptionDataSource source, Exception exception)
|
||||
{
|
||||
var handler = InvalidSource;
|
||||
if (handler != null) handler(this, new InvalidSourceEventArgs(source, exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories
|
||||
enumerator = new RateLimitEnumerator<BaseData>(enumerator, _timeProvider, minimumTimeBetweenCalls);
|
||||
}
|
||||
|
||||
if (source.Format == FileFormat.Collection)
|
||||
if (source.Format == FileFormat.UnfoldingCollection)
|
||||
{
|
||||
// unroll collections into individual data points after fast forward/rate limiting applied
|
||||
enumerator = enumerator.SelectMany(data =>
|
||||
|
||||
@@ -15,10 +15,10 @@
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Interfaces;
|
||||
using QuantConnect.Util;
|
||||
using QuantConnect.Interfaces;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
@@ -35,12 +35,6 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
private IDataProvider _dataProvider;
|
||||
private readonly IndexedBaseData _factory;
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when the specified source is considered invalid, this may
|
||||
/// be from a missing file or failure to download a remote source
|
||||
/// </summary>
|
||||
public override event EventHandler<InvalidSourceEventArgs> InvalidSource;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new instance of this <see cref="ISubscriptionDataSourceReader"/>
|
||||
/// </summary>
|
||||
@@ -121,16 +115,5 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the <see cref="InvalidSource"/> event
|
||||
/// </summary>
|
||||
/// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
|
||||
/// <param name="exception">The exception if one was raised, otherwise null</param>
|
||||
private void OnInvalidSource(SubscriptionDataSource source, Exception exception)
|
||||
{
|
||||
var handler = InvalidSource;
|
||||
if (handler != null) handler(this, new InvalidSourceEventArgs(source, exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -300,11 +300,11 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
_customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
|
||||
enumerator = enqueueable;
|
||||
}
|
||||
else if (config.Type == typeof (CoarseFundamental))
|
||||
else if (config.Type == typeof (CoarseFundamental) || config.Type == typeof (ETFConstituentData))
|
||||
{
|
||||
Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating coarse universe: {config.Symbol.ID}");
|
||||
Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
|
||||
|
||||
// Will try to pull coarse data from the data folder every 10min, file with today's date.
|
||||
// Will try to pull data from the data folder every 10min, file with yesterdays date.
|
||||
// If lean is started today it will trigger initial coarse universe selection
|
||||
var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider,
|
||||
// we adjust time to the previous tradable date
|
||||
@@ -320,8 +320,7 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
var enqueable = new EnqueueableEnumerator<BaseData>();
|
||||
_customExchange.SetDataHandler(config.Symbol, data =>
|
||||
{
|
||||
var coarseData = data as BaseDataCollection;
|
||||
enqueable.Enqueue(new BaseDataCollection(coarseData.Time, config.Symbol, coarseData.Data));
|
||||
enqueable.Enqueue(data);
|
||||
subscription.OnNewDataAvailable();
|
||||
});
|
||||
enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
|
||||
|
||||
@@ -475,12 +475,18 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
|
||||
private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader, SubscriptionDataSource source)
|
||||
{
|
||||
// NOTE: There seems to be some overlap in InvalidSource and CreateStreamReaderError
|
||||
// this may be worthy of further investigation and potential consolidation of events.
|
||||
|
||||
// handle missing files
|
||||
dataSourceReader.InvalidSource += (sender, args) =>
|
||||
{
|
||||
if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
|
||||
{
|
||||
OnDownloadFailed(
|
||||
new DownloadFailedEventArgs(_config.Symbol,
|
||||
"We could not fetch the requested data. " +
|
||||
"This may not be valid data, or a failed download of custom data. " +
|
||||
$"Skipping source ({args.Source.Source})."));
|
||||
return;
|
||||
}
|
||||
|
||||
switch (args.Source.TransportMedium)
|
||||
{
|
||||
case SubscriptionTransportMedium.LocalFile:
|
||||
@@ -507,18 +513,6 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
// handle empty files/instantiation errors
|
||||
var textSubscriptionFactory = (TextSubscriptionDataSourceReader)dataSourceReader;
|
||||
textSubscriptionFactory.CreateStreamReaderError += (sender, args) =>
|
||||
{
|
||||
if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
|
||||
{
|
||||
OnDownloadFailed(
|
||||
new DownloadFailedEventArgs(_config.Symbol,
|
||||
"We could not fetch the requested data. " +
|
||||
"This may not be valid data, or a failed download of custom data. " +
|
||||
$"Skipping source ({args.Source.Source})."));
|
||||
}
|
||||
};
|
||||
|
||||
// handle parser errors
|
||||
textSubscriptionFactory.ReaderError += (sender, args) =>
|
||||
{
|
||||
|
||||
@@ -16,10 +16,10 @@
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using QuantConnect.Configuration;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Interfaces;
|
||||
using QuantConnect.Logging;
|
||||
using QuantConnect.Interfaces;
|
||||
using QuantConnect.Configuration;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
@@ -44,14 +44,13 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
public static ISubscriptionDataSourceReader ForSource(SubscriptionDataSource source, IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode, BaseData factory, IDataProvider dataProvider)
|
||||
{
|
||||
ISubscriptionDataSourceReader reader;
|
||||
TextSubscriptionDataSourceReader textReader = null;
|
||||
switch (source.Format)
|
||||
{
|
||||
case FileFormat.Csv:
|
||||
reader = textReader = new TextSubscriptionDataSourceReader(dataCacheProvider, config, date, isLiveMode);
|
||||
reader = new TextSubscriptionDataSourceReader(dataCacheProvider, config, date, isLiveMode);
|
||||
break;
|
||||
|
||||
case FileFormat.Collection:
|
||||
case FileFormat.UnfoldingCollection:
|
||||
reader = new CollectionSubscriptionDataSourceReader(dataCacheProvider, config, date, isLiveMode);
|
||||
break;
|
||||
|
||||
@@ -62,6 +61,10 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
case FileFormat.Index:
|
||||
return new IndexSubscriptionDataSourceReader(dataCacheProvider, config, date, isLiveMode, dataProvider);
|
||||
|
||||
case FileFormat.FoldingCollection:
|
||||
reader = new BaseDataCollectionAggregatorReader(dataCacheProvider, config, date, isLiveMode);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new NotImplementedException("SubscriptionFactory.ForSource(" + source + ") has not been implemented yet.");
|
||||
}
|
||||
@@ -72,10 +75,6 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
if (!factory.IsSparseData())
|
||||
{
|
||||
reader.InvalidSource += (sender, args) => Log.Error($"SubscriptionDataSourceReader.InvalidSource(): File not found: {args.Source.Source}");
|
||||
if (textReader != null)
|
||||
{
|
||||
textReader.CreateStreamReaderError += (sender, args) => Log.Error($"SubscriptionDataSourceReader.CreateStreamReaderError(): File not found: {args.Source.Source}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,12 +17,12 @@ using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Logging;
|
||||
using QuantConnect.Interfaces;
|
||||
using QuantConnect.Data.Market;
|
||||
using System.Collections.Generic;
|
||||
using QuantConnect.Data.Fundamental;
|
||||
using QuantConnect.Data.UniverseSelection;
|
||||
using QuantConnect.Logging;
|
||||
|
||||
namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
{
|
||||
@@ -43,24 +43,12 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
private static volatile Dictionary<string, List<BaseData>> BaseDataSourceCache = new Dictionary<string, List<BaseData>>(100);
|
||||
private static Queue<string> CacheKeys = new Queue<string>(100);
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when the specified source is considered invalid, this may
|
||||
/// be from a missing file or failure to download a remote source
|
||||
/// </summary>
|
||||
public override event EventHandler<InvalidSourceEventArgs> InvalidSource;
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when an exception is thrown during a call to
|
||||
/// <see cref="BaseData.Reader(SubscriptionDataConfig,string,DateTime,bool)"/>
|
||||
/// </summary>
|
||||
public event EventHandler<ReaderErrorEventArgs> ReaderError;
|
||||
|
||||
/// <summary>
|
||||
/// Event fired when there's an error creating an <see cref="IStreamReader"/> or the
|
||||
/// instantiated <see cref="IStreamReader"/> has no data.
|
||||
/// </summary>
|
||||
public event EventHandler<CreateStreamReaderErrorEventArgs> CreateStreamReaderError;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
|
||||
/// </summary>
|
||||
@@ -116,10 +104,9 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
cache = _shouldCacheDataPoints ? new List<BaseData>(30000) : null;
|
||||
using (var reader = CreateStreamReader(source))
|
||||
{
|
||||
// if the reader doesn't have data then we're done with this subscription
|
||||
if (reader == null || reader.EndOfStream)
|
||||
if (reader == null)
|
||||
{
|
||||
OnCreateStreamReaderError(_date, source);
|
||||
// if the reader doesn't have data then we're done with this subscription
|
||||
yield break;
|
||||
}
|
||||
|
||||
@@ -216,17 +203,6 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the <see cref="InvalidSource"/> event
|
||||
/// </summary>
|
||||
/// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
|
||||
/// <param name="exception">The exception if one was raised, otherwise null</param>
|
||||
private void OnInvalidSource(SubscriptionDataSource source, Exception exception)
|
||||
{
|
||||
var handler = InvalidSource;
|
||||
if (handler != null) handler(this, new InvalidSourceEventArgs(source, exception));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the <see cref="ReaderError"/> event
|
||||
/// </summary>
|
||||
@@ -238,17 +214,6 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
if (handler != null) handler(this, new ReaderErrorEventArgs(line, exception));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event invocator for the <see cref="CreateStreamReaderError"/> event
|
||||
/// </summary>
|
||||
/// <param name="date">The date of the source</param>
|
||||
/// <param name="source">The source that caused the error</param>
|
||||
private void OnCreateStreamReaderError(DateTime date, SubscriptionDataSource source)
|
||||
{
|
||||
var handler = CreateStreamReaderError;
|
||||
if (handler != null) handler(this, new CreateStreamReaderErrorEventArgs(date, source));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set the cache size to use
|
||||
/// </summary>
|
||||
@@ -263,4 +228,4 @@ namespace QuantConnect.Lean.Engine.DataFeeds
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ using QuantConnect.Logging;
|
||||
using QuantConnect.Orders;
|
||||
using QuantConnect.Orders.Fees;
|
||||
using QuantConnect.Securities;
|
||||
using QuantConnect.Securities.Option;
|
||||
using QuantConnect.Securities.Positions;
|
||||
using QuantConnect.Util;
|
||||
|
||||
@@ -166,6 +167,11 @@ namespace QuantConnect.Lean.Engine.TransactionHandlers
|
||||
HandlePositionAssigned(fill);
|
||||
};
|
||||
|
||||
_brokerage.OptionNotification += (sender, e) =>
|
||||
{
|
||||
HandleOptionNotification(e);
|
||||
};
|
||||
|
||||
IsActive = true;
|
||||
|
||||
_algorithm = algorithm;
|
||||
@@ -1131,6 +1137,101 @@ namespace QuantConnect.Lean.Engine.TransactionHandlers
|
||||
_algorithm.OnAssignmentOrderEvent(fill);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Option notification event is received and new order events are generated
|
||||
/// </summary>
|
||||
private void HandleOptionNotification(OptionNotificationEventArgs e)
|
||||
{
|
||||
if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
|
||||
{
|
||||
if (OptionSymbol.IsOptionContractExpired(e.Symbol, CurrentTimeUtc))
|
||||
{
|
||||
if (e.Position == 0)
|
||||
{
|
||||
Log.Trace(
|
||||
"BrokerageTransactionHandler.HandleOptionNotification(): clearing position for expired option holding: " +
|
||||
$"Symbol: {e.Symbol.Value}, " +
|
||||
$"Quantity: {security.Holdings.Quantity}");
|
||||
|
||||
var quantity = -security.Holdings.Quantity;
|
||||
|
||||
var exerciseOrder = GenerateOptionExerciseOrder(security, quantity);
|
||||
|
||||
EmitOptionNotificationEvents(security, exerciseOrder);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.Error("BrokerageTransactionHandler.HandleOptionNotification(): " +
|
||||
$"unexpected position ({e.Position} instead of zero) " +
|
||||
$"for expired option contract: {e.Symbol.Value}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// if position is reduced, could be an early exercise or early assignment
|
||||
if (Math.Abs(e.Position) < security.Holdings.AbsoluteQuantity)
|
||||
{
|
||||
// if we are long the option and there is an open exercise order, assume it's an early exercise
|
||||
if (security.Holdings.IsLong)
|
||||
{
|
||||
if (GetOpenOrders(x =>
|
||||
x.Symbol == e.Symbol &&
|
||||
x.Type == OrderType.OptionExercise)
|
||||
.FirstOrDefault() is OptionExerciseOrder exerciseOrder)
|
||||
{
|
||||
EmitOptionNotificationEvents(security, exerciseOrder);
|
||||
}
|
||||
}
|
||||
|
||||
// if we are short the option and there are no buy orders, assume it's an early assignment
|
||||
else if (security.Holdings.IsShort)
|
||||
{
|
||||
if (!GetOpenOrders(x =>
|
||||
x.Symbol == e.Symbol &&
|
||||
x.Direction == OrderDirection.Buy)
|
||||
.Any())
|
||||
{
|
||||
var quantity = e.Position - security.Holdings.Quantity;
|
||||
|
||||
var exerciseOrder = GenerateOptionExerciseOrder(security, quantity);
|
||||
|
||||
EmitOptionNotificationEvents(security, exerciseOrder);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private OptionExerciseOrder GenerateOptionExerciseOrder(Security security, decimal quantity)
|
||||
{
|
||||
// generate new exercise order and ticket for the option
|
||||
var order = new OptionExerciseOrder(security.Symbol, quantity, CurrentTimeUtc)
|
||||
{
|
||||
Id = _algorithm.Transactions.GetIncrementOrderId()
|
||||
};
|
||||
|
||||
var ticket = order.ToOrderTicket(_algorithm.Transactions);
|
||||
|
||||
AddOpenOrder(order, ticket);
|
||||
|
||||
Interlocked.Increment(ref _totalOrderCount);
|
||||
|
||||
return order;
|
||||
}
|
||||
|
||||
private void EmitOptionNotificationEvents(Security security, OptionExerciseOrder order)
|
||||
{
|
||||
// generate the order events reusing the option exercise model
|
||||
var option = (Option)security;
|
||||
var orderEvents = option.OptionExerciseModel.OptionExercise(option, order);
|
||||
|
||||
foreach (var orderEvent in orderEvents)
|
||||
{
|
||||
HandleOrderEvent(orderEvent);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the amount of time since the last call to algorithm.Portfolio.ProcessFill(fill)
|
||||
/// </summary>
|
||||
|
||||
@@ -68,7 +68,7 @@ namespace QuantConnect.Indicators
|
||||
WarmUpPeriod = period;
|
||||
|
||||
//Initialise ATR and SMA
|
||||
AverageTrueRange = new AverageTrueRange(name + "_AverageTrueRange", period, MovingAverageType.Simple);
|
||||
AverageTrueRange = new AverageTrueRange(name + "_AverageTrueRange", period, movingAverageType);
|
||||
MiddleBand = movingAverageType.AsIndicator(name + "_MiddleBand", period);
|
||||
|
||||
//Compute Lower Band
|
||||
|
||||
@@ -167,9 +167,9 @@
|
||||
// Required for IEX history requests
|
||||
"iex-cloud-api-key": "",
|
||||
|
||||
// Required for market data from Coin API
|
||||
// Required for market data from Coin API
|
||||
"coinapi-api-key": "",
|
||||
"coinapi-product": "free", // free, startup, streamer, professional, enterprise
|
||||
"coinapi-product": "free", // free, startup, streamer, professional, enterprise
|
||||
|
||||
// Required for streaming Polygon.io data
|
||||
// To get your access token go to https://polygon.io
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
@@ -76,6 +76,7 @@ namespace QuantConnect.Tests.Algorithm
|
||||
#pragma warning disable 0067 // NullBrokerage doesn't use any of these so we will just ignore them
|
||||
public event EventHandler<OrderEvent> OrderStatusChanged;
|
||||
public event EventHandler<OrderEvent> OptionPositionAssigned;
|
||||
public event EventHandler<OptionNotificationEventArgs> OptionNotification;
|
||||
public event EventHandler<AccountEvent> AccountChanged;
|
||||
public event EventHandler<BrokerageMessageEvent> Message;
|
||||
#pragma warning restore 0067
|
||||
|
||||
@@ -94,4 +94,15 @@ namespace QuantConnect.Tests.Brokerages
|
||||
|
||||
public override bool ExpectedCancellationResult => true;
|
||||
}
|
||||
|
||||
// to be used with brokerages which do not support UpdateOrder
|
||||
public class NonUpdateableLimitIfTouchedOrderTestParameters : LimitIfTouchedOrderTestParameters
|
||||
{
|
||||
public NonUpdateableLimitIfTouchedOrderTestParameters(Symbol symbol, decimal highLimit, decimal lowLimit, IOrderProperties properties = null)
|
||||
: base(symbol, highLimit, lowLimit, properties)
|
||||
{
|
||||
}
|
||||
|
||||
public override bool ModifyUntilFilled => false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading;
|
||||
using Moq;
|
||||
using NodaTime;
|
||||
@@ -1234,6 +1235,218 @@ namespace QuantConnect.Tests.Engine.BrokerageTransactionHandlerTests
|
||||
Assert.AreEqual(_algorithm.OrderEvents.Count(orderEvent => orderEvent.Status == OrderStatus.Invalid), 1);
|
||||
}
|
||||
|
||||
// Short Call --> OTM (expired worthless)
|
||||
[TestCase(-1, OptionRight.Call, 455, 100, 450, 1, 0, 100, "OTM")]
|
||||
// Short Put --> OTM (expired worthless)
|
||||
[TestCase(-1, OptionRight.Put, 455, 100, 460, 1, 0, 100, "OTM")]
|
||||
// Long Call --> OTM (expired worthless)
|
||||
[TestCase(1, OptionRight.Call, 455, 100, 450, 1, 0, 100, "OTM")]
|
||||
// Long Put --> OTM (expired worthless)
|
||||
[TestCase(1, OptionRight.Put, 455, 100, 460, 1, 0, 100, "OTM")]
|
||||
// Short Call --> ITM (assigned)
|
||||
[TestCase(-1, OptionRight.Call, 450, 100, 455, 2, 0, 0, "Automatic Assignment")]
|
||||
// Short Put --> ITM (assigned)
|
||||
[TestCase(-1, OptionRight.Put, 455, 100, 450, 2, 0, 200, "Automatic Assignment")]
|
||||
// Long Call --> ITM (auto-exercised)
|
||||
[TestCase(1, OptionRight.Call, 450, 100, 455, 2, 0, 200, "Automatic Exercise")]
|
||||
// Long Put --> ITM (auto-exercised)
|
||||
[TestCase(1, OptionRight.Put, 455, 100, 450, 2, 0, 0, "Automatic Exercise")]
|
||||
public void OptionExpirationEmitsOrderEvents(
|
||||
int initialOptionPosition,
|
||||
OptionRight optionRight,
|
||||
decimal strikePrice,
|
||||
int initialUnderlyingPosition,
|
||||
decimal underlyingPrice,
|
||||
int expectedOrderEvents,
|
||||
int expectedOptionPosition,
|
||||
int expectedUnderlyingPosition,
|
||||
string expectedMessage
|
||||
)
|
||||
{
|
||||
var algorithm = new TestAlgorithm();
|
||||
algorithm.SubscriptionManager.SetDataManager(new DataManagerStub(algorithm));
|
||||
|
||||
var equity = algorithm.AddEquity("SPY");
|
||||
var optionSymbol = Symbol.CreateOption(equity.Symbol, equity.Symbol.ID.Market, OptionStyle.American, optionRight, strikePrice,
|
||||
new DateTime(2021, 9, 8));
|
||||
var option = algorithm.AddOptionContract(optionSymbol);
|
||||
|
||||
algorithm.Portfolio[equity.Symbol].SetHoldings(underlyingPrice, initialUnderlyingPosition);
|
||||
algorithm.Portfolio[option.Symbol].SetHoldings(0.01m, initialOptionPosition);
|
||||
|
||||
equity.SetMarketPrice(new Tick { Value = underlyingPrice });
|
||||
algorithm.SetFinishedWarmingUp();
|
||||
|
||||
using var brokerage = new NoSubmitTestBrokerage(algorithm);
|
||||
var transactionHandler = new TestBrokerageTransactionHandler();
|
||||
transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler());
|
||||
|
||||
// 9 PM ET
|
||||
transactionHandler.TestCurrentTimeUtc = new DateTime(2021, 9, 9, 1, 0, 0);
|
||||
|
||||
algorithm.Transactions.SetOrderProcessor(transactionHandler);
|
||||
|
||||
var method = typeof(BrokerageTransactionHandler).GetMethod("HandleOptionNotification", BindingFlags.NonPublic | BindingFlags.Instance);
|
||||
Assert.IsNotNull(method);
|
||||
|
||||
var parameters = new object[] { new OptionNotificationEventArgs(optionSymbol, 0) };
|
||||
method.Invoke(transactionHandler, parameters);
|
||||
|
||||
transactionHandler.Exit();
|
||||
|
||||
var tickets = algorithm.Transactions.GetOrderTickets().ToList();
|
||||
Assert.AreEqual(1, tickets.Count);
|
||||
|
||||
var ticket = tickets.First();
|
||||
Assert.IsTrue(ticket.HasOrder);
|
||||
|
||||
Assert.AreEqual(expectedOrderEvents, ticket.OrderEvents.Count);
|
||||
Assert.AreEqual(1, ticket.OrderEvents.Count(x => x.Message.Contains(expectedMessage, StringComparison.InvariantCulture)));
|
||||
|
||||
Assert.AreEqual(expectedUnderlyingPosition, algorithm.Portfolio[equity.Symbol].Quantity);
|
||||
Assert.AreEqual(expectedOptionPosition, algorithm.Portfolio[optionSymbol].Quantity);
|
||||
}
|
||||
|
||||
// Long Call --> ITM (exercised early - full)
|
||||
[TestCase(1, OptionRight.Call, 450, 100, 455, 2, 0, 200, "Automatic Exercise")]
|
||||
// Long Put --> ITM (exercised early - full)
|
||||
[TestCase(1, OptionRight.Put, 455, 100, 450, 2, 0, 0, "Automatic Exercise")]
|
||||
// Long Call --> ITM (exercised early - partial)
|
||||
[TestCase(3, OptionRight.Call, 450, 100, 455, 2, 1, 300, "Automatic Exercise")]
|
||||
// Long Put --> ITM (exercised early - partial)
|
||||
[TestCase(3, OptionRight.Put, 455, 300, 450, 2, 1, 100, "Automatic Exercise")]
|
||||
public void EarlyExerciseEmitsOrderEvents(
|
||||
int initialOptionPosition,
|
||||
OptionRight optionRight,
|
||||
decimal strikePrice,
|
||||
int initialUnderlyingPosition,
|
||||
decimal underlyingPrice,
|
||||
int expectedOrderEvents,
|
||||
int expectedOptionPosition,
|
||||
int expectedUnderlyingPosition,
|
||||
string expectedMessage
|
||||
)
|
||||
{
|
||||
var algorithm = new TestAlgorithm();
|
||||
algorithm.SubscriptionManager.SetDataManager(new DataManagerStub(algorithm));
|
||||
|
||||
var equity = algorithm.AddEquity("SPY");
|
||||
var optionSymbol = Symbol.CreateOption(equity.Symbol, equity.Symbol.ID.Market, OptionStyle.American, optionRight, strikePrice,
|
||||
new DateTime(2021, 9, 8));
|
||||
var option = algorithm.AddOptionContract(optionSymbol);
|
||||
|
||||
algorithm.Portfolio[equity.Symbol].SetHoldings(underlyingPrice, initialUnderlyingPosition);
|
||||
algorithm.Portfolio[option.Symbol].SetHoldings(0.01m, initialOptionPosition);
|
||||
|
||||
equity.SetMarketPrice(new Tick { Value = underlyingPrice });
|
||||
algorithm.SetFinishedWarmingUp();
|
||||
|
||||
using var brokerage = new NoSubmitTestBrokerage(algorithm);
|
||||
var transactionHandler = new TestBrokerageTransactionHandler();
|
||||
transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler());
|
||||
|
||||
// 10 AM ET
|
||||
transactionHandler.TestCurrentTimeUtc = new DateTime(2021, 9, 8, 14, 0, 0);
|
||||
|
||||
algorithm.Transactions.SetOrderProcessor(transactionHandler);
|
||||
|
||||
// Creates an exercise order
|
||||
var exerciseQuantity = initialOptionPosition - expectedOptionPosition;
|
||||
var orderRequest = new SubmitOrderRequest(OrderType.OptionExercise, option.Type, option.Symbol, -exerciseQuantity, 0, 0, transactionHandler.TestCurrentTimeUtc, "");
|
||||
|
||||
// Submit and process the exercise order
|
||||
var orderTicket = transactionHandler.Process(orderRequest);
|
||||
transactionHandler.HandleOrderRequest(orderRequest);
|
||||
Assert.IsTrue(orderRequest.Response.IsProcessed);
|
||||
Assert.IsTrue(orderRequest.Response.IsSuccess);
|
||||
Assert.AreEqual(orderTicket.Status, OrderStatus.New);
|
||||
|
||||
var method = typeof(BrokerageTransactionHandler).GetMethod("HandleOptionNotification", BindingFlags.NonPublic | BindingFlags.Instance);
|
||||
Assert.IsNotNull(method);
|
||||
|
||||
var parameters = new object[] { new OptionNotificationEventArgs(optionSymbol, expectedOptionPosition) };
|
||||
method.Invoke(transactionHandler, parameters);
|
||||
|
||||
transactionHandler.Exit();
|
||||
|
||||
var tickets = algorithm.Transactions.GetOrderTickets().ToList();
|
||||
Assert.AreEqual(1, tickets.Count);
|
||||
|
||||
var ticket = tickets.First();
|
||||
Assert.IsTrue(ticket.HasOrder);
|
||||
|
||||
Assert.AreEqual(expectedOrderEvents, ticket.OrderEvents.Count);
|
||||
Assert.AreEqual(1, ticket.OrderEvents.Count(x => x.Message.Contains(expectedMessage, StringComparison.InvariantCulture)));
|
||||
|
||||
Assert.AreEqual(expectedUnderlyingPosition, algorithm.Portfolio[equity.Symbol].Quantity);
|
||||
Assert.AreEqual(expectedOptionPosition, algorithm.Portfolio[optionSymbol].Quantity);
|
||||
}
|
||||
|
||||
// Short Call --> ITM (assigned early - full)
|
||||
[TestCase(-1, OptionRight.Call, 450, 100, 455, 2, 0, 0, "Automatic Assignment")]
|
||||
// Short Put --> ITM (assigned early - full)
|
||||
[TestCase(-1, OptionRight.Put, 455, 100, 450, 2, 0, 200, "Automatic Assignment")]
|
||||
// Short Call --> ITM (assigned early - partial)
|
||||
[TestCase(-3, OptionRight.Call, 450, 300, 455, 2, -1, 100, "Automatic Assignment")]
|
||||
// Short Put --> ITM (assigned early - partial)
|
||||
[TestCase(-3, OptionRight.Put, 455, 100, 450, 2, -1, 300, "Automatic Assignment")]
|
||||
public void EarlyAssignmentEmitsOrderEvents(
|
||||
int initialOptionPosition,
|
||||
OptionRight optionRight,
|
||||
decimal strikePrice,
|
||||
int initialUnderlyingPosition,
|
||||
decimal underlyingPrice,
|
||||
int expectedOrderEvents,
|
||||
int expectedOptionPosition,
|
||||
int expectedUnderlyingPosition,
|
||||
string expectedMessage
|
||||
)
|
||||
{
|
||||
var algorithm = new TestAlgorithm();
|
||||
algorithm.SubscriptionManager.SetDataManager(new DataManagerStub(algorithm));
|
||||
|
||||
var equity = algorithm.AddEquity("SPY");
|
||||
var optionSymbol = Symbol.CreateOption(equity.Symbol, equity.Symbol.ID.Market, OptionStyle.American, optionRight, strikePrice,
|
||||
new DateTime(2021, 9, 8));
|
||||
var option = algorithm.AddOptionContract(optionSymbol);
|
||||
|
||||
algorithm.Portfolio[equity.Symbol].SetHoldings(underlyingPrice, initialUnderlyingPosition);
|
||||
algorithm.Portfolio[option.Symbol].SetHoldings(0.01m, initialOptionPosition);
|
||||
|
||||
equity.SetMarketPrice(new Tick { Value = underlyingPrice });
|
||||
algorithm.SetFinishedWarmingUp();
|
||||
|
||||
using var brokerage = new NoSubmitTestBrokerage(algorithm);
|
||||
var transactionHandler = new TestBrokerageTransactionHandler();
|
||||
transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler());
|
||||
|
||||
// 10 AM ET
|
||||
transactionHandler.TestCurrentTimeUtc = new DateTime(2021, 9, 8, 14, 0, 0);
|
||||
|
||||
algorithm.Transactions.SetOrderProcessor(transactionHandler);
|
||||
|
||||
var method = typeof(BrokerageTransactionHandler).GetMethod("HandleOptionNotification", BindingFlags.NonPublic | BindingFlags.Instance);
|
||||
Assert.IsNotNull(method);
|
||||
|
||||
var parameters = new object[] { new OptionNotificationEventArgs(optionSymbol, expectedOptionPosition) };
|
||||
method.Invoke(transactionHandler, parameters);
|
||||
|
||||
transactionHandler.Exit();
|
||||
|
||||
var tickets = algorithm.Transactions.GetOrderTickets().ToList();
|
||||
Assert.AreEqual(1, tickets.Count);
|
||||
|
||||
var ticket = tickets.First();
|
||||
Assert.IsTrue(ticket.HasOrder);
|
||||
|
||||
Assert.AreEqual(expectedOrderEvents, ticket.OrderEvents.Count);
|
||||
Assert.AreEqual(1, ticket.OrderEvents.Count(x => x.Message.Contains(expectedMessage, StringComparison.InvariantCulture)));
|
||||
|
||||
Assert.AreEqual(expectedUnderlyingPosition, algorithm.Portfolio[equity.Symbol].Quantity);
|
||||
Assert.AreEqual(expectedOptionPosition, algorithm.Portfolio[optionSymbol].Quantity);
|
||||
}
|
||||
|
||||
|
||||
internal class TestIncrementalOrderIdAlgorithm : OrderTicketDemoAlgorithm
|
||||
{
|
||||
public static readonly Dictionary<int, int> OrderEventIds = new Dictionary<int, int>();
|
||||
@@ -1361,6 +1574,7 @@ namespace QuantConnect.Tests.Engine.BrokerageTransactionHandlerTests
|
||||
base.RoundOrderPrices(order, security);
|
||||
}
|
||||
}
|
||||
|
||||
private class TestNonShortableProvider : IShortableProvider
|
||||
{
|
||||
public Dictionary<Symbol, long> AllShortableSymbols(DateTime localTime)
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
|
||||
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Linq;
|
||||
using NUnit.Framework;
|
||||
using QuantConnect.Util;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Data.Market;
|
||||
using QuantConnect.Lean.Engine.DataFeeds;
|
||||
using QuantConnect.Data.UniverseSelection;
|
||||
|
||||
namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
{
|
||||
[TestFixture, Parallelizable(ParallelScope.Fixtures)]
|
||||
public class BaseDataCollectionAggregatorReaderTests
|
||||
{
|
||||
[TestCase(Resolution.Daily, 1, true)]
|
||||
[TestCase( Resolution.Hour, 1, true)]
|
||||
[TestCase(Resolution.Daily, 5849, false)]
|
||||
[TestCase(Resolution.Hour, 40832, false)]
|
||||
public void AggregatesDataPerTime(Resolution resolution, int expectedCount, bool singleDate)
|
||||
{
|
||||
var reader = Initialize(false, resolution, out var dataSource);
|
||||
TestBaseDataCollection.SingleDate = singleDate;
|
||||
|
||||
var result = reader.Read(dataSource).ToList();
|
||||
|
||||
Assert.AreEqual(expectedCount, result.Count);
|
||||
Assert.IsTrue(result.All(data => data is TestBaseDataCollection));
|
||||
|
||||
if (expectedCount == 1)
|
||||
{
|
||||
var collection = result[0] as TestBaseDataCollection;
|
||||
Assert.IsNotNull(collection);
|
||||
Assert.GreaterOrEqual(collection.Data.Count, 5000);
|
||||
Assert.AreEqual(expectedCount, collection.Data.DistinctBy(data => data.Time).Count());
|
||||
}
|
||||
}
|
||||
|
||||
private static ISubscriptionDataSourceReader Initialize(bool liveMode, Resolution resolution, out SubscriptionDataSource source)
|
||||
{
|
||||
using var dataProvider = new DefaultDataProvider();
|
||||
using var cache = new ZipDataCacheProvider(dataProvider);
|
||||
var config = new SubscriptionDataConfig(typeof(TestBaseDataCollection),
|
||||
Symbols.SPY,
|
||||
resolution,
|
||||
TimeZones.NewYork,
|
||||
TimeZones.NewYork,
|
||||
false,
|
||||
false,
|
||||
false);
|
||||
var date = DateTime.MinValue;
|
||||
var path = LeanData.GenerateZipFilePath(Globals.DataFolder, config.Symbol, date, resolution, TickType.Trade);
|
||||
source = new SubscriptionDataSource(path, SubscriptionTransportMedium.LocalFile);
|
||||
return new BaseDataCollectionAggregatorReader(cache, config, date, liveMode);
|
||||
}
|
||||
|
||||
private class TestBaseDataCollection : BaseDataCollection
|
||||
{
|
||||
public static volatile bool SingleDate;
|
||||
private static readonly TradeBar _factory = new TradeBar();
|
||||
public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
|
||||
{
|
||||
var dataPoint = _factory.Reader(config, line, date, isLiveMode);
|
||||
if (SingleDate)
|
||||
{
|
||||
// single day
|
||||
dataPoint.Time = date;
|
||||
}
|
||||
return dataPoint;
|
||||
}
|
||||
|
||||
public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
|
||||
{
|
||||
return _factory.GetSource(config, date, isLiveMode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,7 +120,7 @@ namespace QuantConnect.Tests.Engine.DataFeeds.Enumerators.Factories
|
||||
_dataSourceReader.Setup(dsr => dsr.Read(It.Is<SubscriptionDataSource>(sds =>
|
||||
sds.Source == "rest.collection.source" &&
|
||||
sds.TransportMedium == SubscriptionTransportMedium.Rest &&
|
||||
sds.Format == FileFormat.Collection))
|
||||
sds.Format == FileFormat.UnfoldingCollection))
|
||||
)
|
||||
.Returns(Enumerable.Range(0, 100)
|
||||
.Select(i => new BaseDataCollection(_referenceLocal.AddSeconds(i), Symbols.SPY, Enumerable.Range(0, DataPerTimeStep)
|
||||
@@ -169,7 +169,7 @@ namespace QuantConnect.Tests.Engine.DataFeeds.Enumerators.Factories
|
||||
Assert.IsTrue(_enumerator.MoveNext());
|
||||
Assert.IsNull(_enumerator.Current);
|
||||
|
||||
VerifyGetSourceInvocationCount(_dataSourceReader, 1, "rest.collection.source", SubscriptionTransportMedium.Rest, FileFormat.Collection);
|
||||
VerifyGetSourceInvocationCount(_dataSourceReader, 1, "rest.collection.source", SubscriptionTransportMedium.Rest, FileFormat.UnfoldingCollection);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,7 +477,7 @@ namespace QuantConnect.Tests.Engine.DataFeeds.Enumerators.Factories
|
||||
{
|
||||
public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
|
||||
{
|
||||
return new SubscriptionDataSource("rest.collection.source", SubscriptionTransportMedium.Rest, FileFormat.Collection);
|
||||
return new SubscriptionDataSource("rest.collection.source", SubscriptionTransportMedium.Rest, FileFormat.UnfoldingCollection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,18 +39,21 @@ namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
public void CoarseUniverseRotatesActiveSecurity()
|
||||
{
|
||||
var startDate = new DateTime(2014, 3, 24);
|
||||
var endDate = new DateTime(2014, 3, 29);
|
||||
var endDate = new DateTime(2014, 3, 31);
|
||||
|
||||
var timeProvider = new ManualTimeProvider(TimeZones.NewYork);
|
||||
timeProvider.SetCurrentTime(startDate);
|
||||
|
||||
var coarseTimes = new List<DateTime>
|
||||
{
|
||||
// coarse files go from the 24th (Monday) to the 28th (Friday)
|
||||
// they are emitted in the next day, excluding saturday
|
||||
new DateTime(2014, 3, 25, 5, 0, 0, 0),
|
||||
new DateTime(2014, 3, 26, 5, 0, 0, 0),
|
||||
new DateTime(2014, 3, 27, 5, 0, 0, 0),
|
||||
new DateTime(2014, 3, 28, 5, 0, 0, 0),
|
||||
new DateTime(2014, 3, 29, 5, 0, 0, 0)
|
||||
// 29th is Saturday
|
||||
new DateTime(2014, 3, 30, 5, 0, 0, 0)
|
||||
}.ToHashSet();
|
||||
|
||||
var coarseSymbols = new List<Symbol> { Symbols.SPY, Symbols.AAPL, Symbols.MSFT };
|
||||
|
||||
@@ -24,7 +24,6 @@ using NodaTime;
|
||||
using NUnit.Framework;
|
||||
using QuantConnect.Algorithm;
|
||||
using QuantConnect.Data;
|
||||
using QuantConnect.Data.Auxiliary;
|
||||
using QuantConnect.Data.Custom.IconicTypes;
|
||||
using QuantConnect.Data.Fundamental;
|
||||
using QuantConnect.Data.Market;
|
||||
@@ -584,11 +583,11 @@ namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
}
|
||||
|
||||
[TestCase(FileFormat.Csv, true, false)]
|
||||
[TestCase(FileFormat.Collection, true, false)]
|
||||
[TestCase(FileFormat.UnfoldingCollection, true, false)]
|
||||
[TestCase(FileFormat.Csv, false, false)]
|
||||
[TestCase(FileFormat.Collection, false, false)]
|
||||
[TestCase(FileFormat.UnfoldingCollection, false, false)]
|
||||
[TestCase(FileFormat.Csv, false, true)]
|
||||
[TestCase(FileFormat.Collection, false, true)]
|
||||
[TestCase(FileFormat.UnfoldingCollection, false, true)]
|
||||
public void RestCustomDataReturningNullDoesNotInfinitelyPoll(FileFormat fileFormat, bool returnsNull, bool throwsException)
|
||||
{
|
||||
TestCustomData.FileFormat = fileFormat;
|
||||
@@ -739,34 +738,41 @@ namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
Assert.AreEqual(1, receivedDelisted, $"Did not receive {DelistingType.Delisted}");
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void CoarseFundamentalDataIsHoldUntilTimeIsRight()
|
||||
[TestCase("20140325", typeof(CoarseFundamental))]
|
||||
[TestCase("20201202", typeof(ETFConstituentData))]
|
||||
public void UniverseDataIsHoldUntilTimeIsRight(string dateTime, Type universeData)
|
||||
{
|
||||
_startDate = new DateTime(2014, 3, 25);
|
||||
_startDate = Time.ParseDate(dateTime);
|
||||
CustomMockedFileBaseData.StartDate = _startDate;
|
||||
_manualTimeProvider.SetCurrentTimeUtc(_startDate);
|
||||
|
||||
Log.Trace($"StartTime {_manualTimeProvider.GetUtcNow()}");
|
||||
Log.Debug($"StartTime {_manualTimeProvider.GetUtcNow()}");
|
||||
|
||||
// we just want to emit one single coarse data packet
|
||||
var feed = RunDataFeed(getNextTicksFunction: fdqh => Enumerable.Empty<BaseData>());
|
||||
|
||||
_algorithm.AddUniverse(coarse => coarse.Take(10).Select(x => x.Symbol));
|
||||
if (universeData == typeof(CoarseFundamental))
|
||||
{
|
||||
_algorithm.AddUniverse(coarse => coarse.Take(10).Select(x => x.Symbol));
|
||||
}
|
||||
else
|
||||
{
|
||||
_algorithm.AddUniverse(_algorithm.Universe.ETF("SPY", Market.USA, _algorithm.UniverseSettings,
|
||||
constituentData => constituentData.Take(10).Select(x => x.Symbol)));
|
||||
}
|
||||
// will add the universe
|
||||
_algorithm.OnEndOfTimeStep();
|
||||
|
||||
var receivedCoarseData = false;
|
||||
var receivedUniverseData = false;
|
||||
ConsumeBridge(feed, TimeSpan.FromSeconds(5), ts =>
|
||||
{
|
||||
if (ts.UniverseData.Count > 0 &&
|
||||
ts.UniverseData.First().Value.Data.First() is CoarseFundamental)
|
||||
ts.UniverseData.First().Value.Data.First().GetType() == universeData)
|
||||
{
|
||||
var now = _manualTimeProvider.GetUtcNow();
|
||||
Log.Trace($"Received BaseDataCollection {now}");
|
||||
|
||||
// Assert data got hold until time was right
|
||||
Assert.IsTrue(now.Hour < 23 && now.Hour > 5, $"Unexpected now value: {now}");
|
||||
receivedCoarseData = true;
|
||||
receivedUniverseData = true;
|
||||
|
||||
// we got what we wanted, end unit test
|
||||
_manualTimeProvider.SetCurrentTimeUtc(DateTime.UtcNow);
|
||||
@@ -776,9 +782,9 @@ namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
secondsTimeStep: 3600,
|
||||
endDate: _startDate.AddDays(1));
|
||||
|
||||
Log.Trace($"EndTime {_manualTimeProvider.GetUtcNow()}");
|
||||
Log.Debug($"EndTime {_manualTimeProvider.GetUtcNow()}");
|
||||
|
||||
Assert.IsTrue(receivedCoarseData, "Did not receive Coarse data.");
|
||||
Assert.IsTrue(receivedUniverseData, "Did not receive universe data.");
|
||||
}
|
||||
|
||||
[Test]
|
||||
@@ -2433,7 +2439,7 @@ namespace QuantConnect.Tests.Engine.DataFeeds
|
||||
Symbol = config.Symbol
|
||||
};
|
||||
|
||||
if (FileFormat == FileFormat.Collection)
|
||||
if (FileFormat == FileFormat.UnfoldingCollection)
|
||||
{
|
||||
return new BaseDataCollection
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user