Compare commits

...

1 Commits
16659 ... 16640

Author SHA1 Message Date
Martin Molinero
9ec60cb994 Improve shutdown 2024-09-19 17:59:04 -03:00
6 changed files with 71 additions and 31 deletions

View File

@@ -18,7 +18,6 @@ using System.Threading;
using System.Threading.Tasks;
using QuantConnect.Logging;
using QuantConnect.Util;
using static QuantConnect.StringExtensions;
namespace QuantConnect
{
@@ -36,22 +35,6 @@ namespace QuantConnect
get; private set;
}
/// <summary>
/// Algo cancellation controls - cancellation token for algorithm thread.
/// </summary>
public CancellationToken CancellationToken
{
get { return CancellationTokenSource.Token; }
}
/// <summary>
/// Check if this task isolator is cancelled, and exit the analysis
/// </summary>
public bool IsCancellationRequested
{
get { return CancellationTokenSource.IsCancellationRequested; }
}
/// <summary>
/// Initializes a new instance of the <see cref="Isolator"/> class
/// </summary>
@@ -117,7 +100,7 @@ namespace QuantConnect
memoryCap *= 1024 * 1024;
var spikeLimit = memoryCap*2;
while (!task.IsCompleted && utcNow < end)
while (!task.IsCompleted && !CancellationTokenSource.IsCancellationRequested && utcNow < end)
{
// if over 80% allocation force GC then sample
var sample = Convert.ToDouble(GC.GetTotalMemory(memoryUsed > memoryCap * 0.8));
@@ -166,15 +149,26 @@ namespace QuantConnect
utcNow = DateTime.UtcNow;
}
if (task.IsCompleted == false && string.IsNullOrEmpty(message))
if (task.IsCompleted == false)
{
message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
if (CancellationTokenSource.IsCancellationRequested)
{
Log.Trace($"Isolator.ExecuteWithTimeLimit(): Operation was canceled");
throw new OperationCanceledException("Operation was canceled");
}
else if (string.IsNullOrEmpty(message))
{
message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
}
}
if (!string.IsNullOrEmpty(message))
{
CancellationTokenSource.Cancel();
if (!CancellationTokenSource.IsCancellationRequested)
{
CancellationTokenSource.Cancel();
}
Log.Error($"Security.ExecuteWithTimeLimit(): {message}");
throw new TimeoutException(message);
}

View File

@@ -48,6 +48,7 @@ namespace QuantConnect.Lean.Engine
private IAlgorithm _algorithm;
private readonly object _lock;
private readonly bool _liveMode;
private CancellationTokenSource _cancellationTokenSource;
/// <summary>
/// Publicly accessible algorithm status
@@ -111,14 +112,17 @@ namespace QuantConnect.Lean.Engine
/// <param name="results">Result handler object</param>
/// <param name="realtime">Realtime processing object</param>
/// <param name="leanManager">ILeanManager implementation that is updated periodically with the IAlgorithm instance</param>
/// <param name="token">Cancellation token</param>
/// <param name="cancellationTokenSource">Cancellation token source to monitor</param>
/// <remarks>Modify with caution</remarks>
public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationToken token)
public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationTokenSource cancellationTokenSource)
{
//Initialize:
DataPoints = 0;
_algorithm = algorithm;
var token = cancellationTokenSource.Token;
_cancellationTokenSource = cancellationTokenSource;
var backtestMode = (job.Type == PacketType.BacktestNode);
var methodInvokers = new Dictionary<Type, MethodInvoker>();
var marginCallFrequency = TimeSpan.FromMinutes(5);
@@ -607,6 +611,15 @@ namespace QuantConnect.Lean.Engine
{
_algorithm.SetStatus(state);
}
if (state == AlgorithmStatus.Deleted)
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
// if the algorithm was deleted or stopped, let's give the algorithm a few seconds to shutdown and cancel it out
_cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
}
}
}
}

View File

@@ -334,7 +334,7 @@ namespace QuantConnect.Lean.Engine
// -> Using this Data Feed,
// -> Send Orders to this TransactionHandler,
// -> Send Results to ResultHandler.
algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationToken);
algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
}
catch (Exception err)
{

View File

@@ -130,6 +130,7 @@ namespace QuantConnect.Tests.Brokerages.Paper
var realTime = new BacktestingRealTimeHandler();
using var nullLeanManager = new AlgorithmManagerTests.NullLeanManager();
using var tokenSource = new CancellationTokenSource();
// run algorithm manager
manager.Run(job,
algorithm,
@@ -138,7 +139,7 @@ namespace QuantConnect.Tests.Brokerages.Paper
results,
realTime,
nullLeanManager,
new CancellationToken()
tokenSource
);
var postDividendCash = algorithm.Portfolio.CashBook[Currencies.USD].Amount;

View File

@@ -1,4 +1,4 @@
/*
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
@@ -21,7 +21,7 @@ using QuantConnect.Util;
namespace QuantConnect.Tests.Common
{
[TestFixture]
[TestFixture, Parallelizable(ParallelScope.All)]
public class IsolatorTests
{
[Test]
@@ -45,6 +45,38 @@ namespace QuantConnect.Tests.Common
}
}
[Test]
public void Cancellation()
{
var isolator = new Isolator();
var executed = false;
var ended = false;
var canceled = false;
var result = false;
isolator.CancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(100));
try
{
result = isolator.ExecuteWithTimeLimit(
TimeSpan.FromSeconds(5),
() => {
executed = true;
Thread.Sleep(5000);
ended = true;
},
5000,
sleepIntervalMillis: 10
);
}
catch (OperationCanceledException)
{
canceled = true;
}
Assert.IsTrue(canceled);
Assert.IsFalse(result);
Assert.IsTrue(executed);
Assert.IsFalse(ended);
}
[TestCase(Language.Python, true)]
[TestCase(Language.Python, false)]
[TestCase(Language.CSharp, true)]
@@ -98,4 +130,4 @@ namespace QuantConnect.Tests.Common
}
}
}
}

View File

@@ -120,7 +120,6 @@ namespace QuantConnect.Tests.Engine
var results = new BacktestingResultHandler();
var realtime = new BacktestingRealTimeHandler();
using var leanManager = new NullLeanManager();
var token = new CancellationToken();
var nullSynchronizer = new NullSynchronizer(algorithm);
algorithm.Initialize();
@@ -136,7 +135,8 @@ namespace QuantConnect.Tests.Engine
Log.Trace("Starting algorithm manager loop to process " + nullSynchronizer.Count + " time slices");
var sw = Stopwatch.StartNew();
algorithmManager.Run(job, algorithm, nullSynchronizer, transactions, results, realtime, leanManager, token);
using var tokenSource = new CancellationTokenSource();
algorithmManager.Run(job, algorithm, nullSynchronizer, transactions, results, realtime, leanManager, tokenSource);
sw.Stop();
realtime.Exit();