Introduction
So I was back in .NET land the other day at work, where I had to schedule some code to run periodically.
The business also needed this schedule to be adjustable, so that they could dial it up when things were busier and wind it down when they were not.
The schedule would be adjusted via a setting in the App.Config, where the App.Config is monitored for changes. If there is a change then we use the new schedule value from the App.Config to run the job. Ideally the app should not have to go down to pick up the new schedule.
There are some good job / scheduling libraries out there, but for this I just wanted to use something lightweight, so I went with Quartz.net.
It's easy to set up and use, has a fairly nice API, and supports IOC and CRON schedules. In short, it fits the bill.
In a nutshell this post will simply talk about how you can adjust the schedule of an ALREADY scheduled job. There are also some special caveats that I personally had to deal with in my requirements, which may or may not be an issue for you.
Some Weird Issues That I Needed To Cope With
So let me just talk about some of the issues that I had to deal with.
The guts of the job code that I run on my schedule writes to Azure Blob Storage and then to Azure SQL DW tables, so each run makes several writes to several components, one after another.
So the current run of the job MUST be allowed to complete in FULL (or fail via exception handling, that's ok too). It would not be acceptable to just stop the Quartz job while there is work in flight.
I guess some folk may be thinking of some sort of transaction here, one that must either commit or roll back. Unfortunately that doesn't work with Azure Blob Storage uploads.
So I had to think of another plan.
Here is what I came up with. I would use a threading primitive, namely an AutoResetEvent, to control when the Quartz.net job could be changed to use a new schedule.
If a change in the App.Config is seen, then we know that we "should" be using a new schedule time, however the scheduled job MAY have work in flight. So we need to wait for that work to complete (or fail) before we can think about swapping the Quartz.net schedule time.
So that is what I went for. There are a few other things to be aware of, such as the fact that I needed threading primitives that work with Async/Await code. Luckily Stephen Toub from the TPL team has done that for us: AsyncAutoResetEvent
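To make the handshake described above concrete, here is a minimal sketch of it. This is not the code from this post: RescheduleGate and its method names are hypothetical, and a plain SemaphoreSlim stands in for the AsyncAutoResetEvent purely to keep the sketch self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the shared state between config watcher and job.
public class RescheduleGate
{
    // Assumption: SemaphoreSlim(0, 1) is used in place of AsyncAutoResetEvent.
    // Release() plays the role of Set(), WaitAsync() waits for the signal.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(0, 1);
    private int _requested; // 0 = no change pending, 1 = change pending

    // Called by the config watcher: flag the change, then wait for the job.
    public Task RequestRescheduleAsync()
    {
        Interlocked.Exchange(ref _requested, 1);
        return _gate.WaitAsync();
    }

    // Called by the job at the start of each tick.
    // Returns true if this tick should do its normal work.
    public bool TryBeginWork()
    {
        if (Interlocked.CompareExchange(ref _requested, 0, 1) == 1)
        {
            _gate.Release(); // nothing in flight, so let the watcher proceed
            return false;
        }
        return true;
    }
}
```

The point of the design is that only the job ever opens the gate, and it only does so at the very start of a tick, so the watcher can never get through while a run is half way through its writes.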
There is also the well known issue that the FileSystemWatcher class can fire its events twice for a single change: http://lmgtfy.com/?q=filesystemwatcher+firing+twice
So as we go through the code you will see how I dealt with both of those.
The Code
Ok, so now that we have talked about the problem, let's go through the code.
There are several NuGet packages I am using to make my life easier:
- Quartz.Net scheduling library
- Topshelf : Simple way to make Windows services
- SimpleConfig : Much easier way to create custom config sections
So let's start with the entry point, which for me is the simple Program class shown below
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Principal;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using SachaBarber.QuartzJobUpdate.Services;
using Topshelf;

namespace SachaBarber.QuartzJobUpdate
{
    static class Program
    {
        private static ILogger _log = null;

        [STAThread]
        public static void Main()
        {
            try
            {
                var container = ContainerOperations.Container;
                _log = container.Resolve<ILogger>();
                _log.Log("Starting");

                AppDomain.CurrentDomain.UnhandledException += AppDomainUnhandledException;
                TaskScheduler.UnobservedTaskException += TaskSchedulerUnobservedTaskException;
                Thread.CurrentPrincipal = new WindowsPrincipal(WindowsIdentity.GetCurrent());

                //host the service using Topshelf
                HostFactory.Run(c =>
                {
                    c.Service<SomeWindowsService>(s =>
                    {
                        s.ConstructUsing(() => container.Resolve<SomeWindowsService>());
                        s.WhenStarted(tc => tc.Start());
                        s.WhenStopped(tc => tc.Stop());
                    });
                    c.RunAsLocalSystem();
                    c.SetDescription("Uploads Calc Payouts/Summary data into Azure blob storage for RiskStore DW ingestion");
                    c.SetDisplayName("SachaBarber.QuartzJobUpdate");
                    c.SetServiceName("SachaBarber.QuartzJobUpdate");
                });
            }
            catch (Exception ex)
            {
                //_log is still null if the container itself failed to build
                _log?.Log(ex.Message);
            }
            finally
            {
                _log?.Log("Closing");
            }
        }

        private static void AppDomainUnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            ProcessUnhandledException((Exception)e.ExceptionObject);
        }

        private static void TaskSchedulerUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            ProcessUnhandledException(e.Exception);
            e.SetObserved();
        }

        private static void ProcessUnhandledException(Exception ex)
        {
            //unwrap reflection wrappers to get at the real failure
            if (ex is TargetInvocationException && ex.InnerException != null)
            {
                ProcessUnhandledException(ex.InnerException);
                return;
            }
            //log the actual message rather than just "Error"
            _log?.Log($"Error: {ex?.Message}");
        }
    }
}
All this does is host the actual Windows service class for me using Topshelf. The actual service class looks like this
using System;
using System.Configuration;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Xml.Linq;
using Autofac;
using SachaBarber.QuartzJobUpdate.Async;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Jobs;
using SachaBarber.QuartzJobUpdate.Services;
//logging
using Quartz;

namespace SachaBarber.QuartzJobUpdate
{
    public class SomeWindowsService
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;
        private readonly IRxSchedulerService _rxSchedulerService;
        private readonly IObservableFileSystemWatcher _observableFileSystemWatcher;
        private IScheduler _quartzScheduler;
        private readonly AsyncLock _lock = new AsyncLock();
        private readonly SerialDisposable _configWatcherDisposable = new SerialDisposable();
        private static readonly JobKey _someScheduledJobKey = new JobKey("SomeScheduledJobKey");
        private static readonly TriggerKey _someScheduledJobTriggerKey = new TriggerKey("SomeScheduledJobTriggerKey");

        public SomeWindowsService(
            ILogger log,
            ISchedulingAssistanceService schedulingAssistanceService,
            IRxSchedulerService rxSchedulerService,
            IObservableFileSystemWatcher observableFileSystemWatcher)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
            _rxSchedulerService = rxSchedulerService;
            _observableFileSystemWatcher = observableFileSystemWatcher;
        }

        public void Start()
        {
            try
            {
                var assembly = typeof(SomeWindowsService).Assembly;
                var configFile = $"{assembly.Location}.config";
                CreateConfigWatcher(new FileInfo(configFile));

                _log.Log("Starting SomeWindowsService");
                _quartzScheduler = ContainerOperations.Container.Resolve<IScheduler>();
                _quartzScheduler.JobFactory = new AutofacJobFactory(ContainerOperations.Container);
                _quartzScheduler.Start();

                //create the Job
                CreateScheduledJob();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
            }
        }

        public void Stop()
        {
            _log.Log("Stopping SomeWindowsService");
            _quartzScheduler?.Shutdown();
            _configWatcherDisposable.Dispose();
            _observableFileSystemWatcher.Dispose();
        }

        private void CreateConfigWatcher(FileInfo configFileInfo)
        {
            FileSystemWatcher watcher = new FileSystemWatcher();
            watcher.Path = configFileInfo.DirectoryName;
            watcher.NotifyFilter = NotifyFilters.LastAccess | NotifyFilters.LastWrite
                | NotifyFilters.FileName | NotifyFilters.DirectoryName;
            watcher.Filter = configFileInfo.Name;
            _observableFileSystemWatcher.SetFile(watcher);

            //FSW is notorious for firing twice see here :
            //http://stackoverflow.com/questions/1764809/filesystemwatcher-changed-event-is-raised-twice
            //so lets use Rx to Throttle it a bit
            _configWatcherDisposable.Disposable = _observableFileSystemWatcher.Changed
                .SubscribeOn(_rxSchedulerService.TaskPool)
                .Throttle(TimeSpan.FromMilliseconds(500))
                .Subscribe(
                    async x =>
                    {
                        //at this point the config has changed, start a critical section
                        using (var releaser = await _lock.LockAsync())
                        {
                            //tell current scheduled job that we need to read new config, and wait for it
                            //to signal us that we may continue
                            _log.Log($"Config file {configFileInfo.Name} has changed, attempting to read new config data");
                            _schedulingAssistanceService.RequiresNewSchedulerSetup = true;
                            //await the gate rather than blocking a thread pool thread on it
                            await _schedulingAssistanceService.SchedulerRestartGate.WaitAsync();

                            //re-read the SchedulingConfiguration, and recreate the job using new settings
                            ConfigurationManager.RefreshSection("schedulingConfiguration");
                            var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();
                            _log.Log($"SchedulingConfiguration section is now : {newSchedulingConfiguration}");
                            ContainerOperations.ReInitialiseSchedulingConfiguration(newSchedulingConfiguration);
                            ReScheduleJob();
                        }
                    },
                    ex =>
                    {
                        _log.Log($"Error encountered attempting to read new config data from config file {configFileInfo.Name}");
                    });
        }

        private void CreateScheduledJob(IJobDetail existingJobDetail = null)
        {
            var azureBlobConfiguration = ContainerOperations.Container.Resolve<SchedulingConfiguration>();

            IJobDetail job = JobBuilder.Create<SomeQuartzJob>()
                .WithIdentity(_someScheduledJobKey)
                .Build();

            //NOTE : the setting is called ScheduleTimerInMins but is applied as SECONDS here,
            //which is handy when trying things out. Use WithIntervalInMinutes/AddMinutes
            //if you want the value to really mean minutes
            ITrigger trigger = TriggerBuilder.Create()
                .WithIdentity(_someScheduledJobTriggerKey)
                .WithSimpleSchedule(x => x
                    .RepeatForever()
                    .WithIntervalInSeconds(azureBlobConfiguration.ScheduleTimerInMins)
                )
                .StartAt(DateTimeOffset.Now.AddSeconds(azureBlobConfiguration.ScheduleTimerInMins))
                .Build();

            _quartzScheduler.ScheduleJob(job, trigger);
        }

        private void ReScheduleJob()
        {
            if (_quartzScheduler != null)
            {
                //deleting the job also removes its trigger, so we can recreate both
                _quartzScheduler.DeleteJob(_someScheduledJobKey);
                CreateScheduledJob();
            }
        }
    }
}
There is a fair bit going on here, so let's list some of the work this code does:
- It creates the initial Quartz.Net job and schedules it using the values from a custom config section, which are read into an object
- It watches the config file for changes (we will go through that in a moment) and waits for the AsyncAutoResetEvent to be signalled, at which point it recreates the Quartz.net job
So let's have a look at some of the small helper parts.
This is a simple Rx based file system watcher. The reason Rx is good here is that you can Throttle the events (see this post: FileSystemWatcher raises 2 events)
using System;
using System.IO;
using System.Reactive.Linq;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class ObservableFileSystemWatcher : IObservableFileSystemWatcher
    {
        private FileSystemWatcher _watcher;

        public void SetFile(FileSystemWatcher watcher)
        {
            _watcher = watcher;

            Changed = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                    (h => _watcher.Changed += h, h => _watcher.Changed -= h)
                .Select(x => x.EventArgs);

            Renamed = Observable
                .FromEventPattern<RenamedEventHandler, RenamedEventArgs>
                    (h => _watcher.Renamed += h, h => _watcher.Renamed -= h)
                .Select(x => x.EventArgs);

            Deleted = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                    (h => _watcher.Deleted += h, h => _watcher.Deleted -= h)
                .Select(x => x.EventArgs);

            Errors = Observable
                .FromEventPattern<ErrorEventHandler, ErrorEventArgs>
                    (h => _watcher.Error += h, h => _watcher.Error -= h)
                .Select(x => x.EventArgs);

            Created = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                    (h => _watcher.Created += h, h => _watcher.Created -= h)
                .Select(x => x.EventArgs);

            All = Changed.Merge(Renamed).Merge(Deleted).Merge(Created);
            _watcher.EnableRaisingEvents = true;
        }

        public void Dispose()
        {
            _watcher.EnableRaisingEvents = false;
            _watcher.Dispose();
        }

        public IObservable<FileSystemEventArgs> Changed { get; private set; }
        public IObservable<RenamedEventArgs> Renamed { get; private set; }
        public IObservable<FileSystemEventArgs> Deleted { get; private set; }
        public IObservable<ErrorEventArgs> Errors { get; private set; }
        public IObservable<FileSystemEventArgs> Created { get; private set; }
        public IObservable<FileSystemEventArgs> All { get; private set; }
    }
}
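If you don't want to pull in Rx just to tame the duplicate events, the same idea can be roughed out by hand. The sketch below is an alternative to the Rx Throttle used in this post, not a copy of it: EventDebouncer is a hypothetical helper, and it accepts the first event and drops any followers inside the window, whereas Rx's Throttle waits for a quiet period and then lets the last event through.

```csharp
using System;

// Hypothetical helper: suppresses events that arrive within a short window
// of the last accepted event (for example the second Changed event that
// FileSystemWatcher raises for one save).
public class EventDebouncer
{
    private readonly TimeSpan _window;
    private readonly object _sync = new object();
    private DateTime? _lastAccepted;

    public EventDebouncer(TimeSpan window)
    {
        _window = window;
    }

    // Returns true if the event raised at 'timestamp' should be handled.
    public bool ShouldHandle(DateTime timestamp)
    {
        lock (_sync)
        {
            if (_lastAccepted.HasValue && timestamp - _lastAccepted.Value < _window)
            {
                return false; // too close to the previous event, treat as a duplicate
            }
            _lastAccepted = timestamp;
            return true;
        }
    }
}
```

In a plain FileSystemWatcher.Changed handler you would call ShouldHandle(DateTime.UtcNow) first and simply return when it says false.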
And this is a small utility class that holds the values of the custom config section, which is read using SimpleConfig
namespace SachaBarber.QuartzJobUpdate.Configuration
{
    public class SchedulingConfiguration
    {
        public int ScheduleTimerInMins { get; set; }

        public override string ToString()
        {
            return $"ScheduleTimerInMins: {ScheduleTimerInMins}";
        }
    }
}
Which you read from the App.Config like this
var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();
And this is the Async/Await compatible AutoResetEvent that I took from Stephen Toub's blog
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    public class AsyncAutoResetEvent
    {
        private static readonly Task Completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _waits =
            new Queue<TaskCompletionSource<bool>>();
        private bool _signaled;

        public Task WaitAsync()
        {
            lock (_waits)
            {
                if (_signaled)
                {
                    //already signalled, consume the signal and carry straight on
                    _signaled = false;
                    return Completed;
                }
                else
                {
                    //not signalled yet, hand back a task that Set() will complete
                    var tcs = new TaskCompletionSource<bool>();
                    _waits.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        public void Set()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_waits)
            {
                if (_waits.Count > 0)
                    toRelease = _waits.Dequeue();
                else if (!_signaled)
                    _signaled = true;
            }
            //complete the waiter outside of the lock
            toRelease?.SetResult(true);
        }
    }
}
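The building block that makes this class awaitable is TaskCompletionSource: WaitAsync hands out a task that stays incomplete until somebody completes its source. Here is a stripped down sketch of just that idea. OneShotSignal is a hypothetical name, and unlike the AsyncAutoResetEvent it can only ever be signalled once (there is no queue of waiters and no reset).

```csharp
using System.Threading.Tasks;

// Hypothetical one-shot version of the wait/signal idea.
public class OneShotSignal
{
    private readonly TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>();

    // The returned task only completes once Set() has been called.
    public Task WaitAsync()
    {
        return _tcs.Task;
    }

    // Completes the task; calling it more than once is harmless.
    public void Set()
    {
        _tcs.TrySetResult(true);
    }
}
```

The AsyncAutoResetEvent is roughly this plus a queue of sources (one per waiter) and a flag that remembers a signal that arrived when nobody was waiting.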
So the last part of the puzzle is how does the AsyncAutoResetEvent get signalled?
Well, as we said above, we need to wait for any in-progress work to complete first. So the way I tackled that was that within the job code that gets run on every Quartz.Net scheduler tick, we just check whether we have been requested to swap out the current schedule time. If so, we signal the code that is waiting on the (shared) AsyncAutoResetEvent, otherwise we just carry on and do the regular job work.
The way that we get the AsyncAutoResetEvent that is used by both the waiting code and the job code (to signal it) is via a singleton registration in an IOC container. I am using Autofac, which I set up like this, but you could use your own singleton, or your IOC container of choice.
The trick is to make sure that both classes that need to access the AsyncAutoResetEvent use a single instance.
using System;
using System.Reflection;
using Autofac;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Services;
using Quartz;
using Quartz.Impl;

namespace SachaBarber.QuartzJobUpdate
{
    public class ContainerOperations
    {
        private static Lazy<IContainer> _containerSingleton =
            new Lazy<IContainer>(CreateContainer);

        public static IContainer Container => _containerSingleton.Value;

        public static void ReInitialiseSchedulingConfiguration(
            SchedulingConfiguration newSchedulingConfiguration)
        {
            //update the single registered instance in place
            var currentSchedulingConfiguration = Container.Resolve<SchedulingConfiguration>();
            currentSchedulingConfiguration.ScheduleTimerInMins =
                newSchedulingConfiguration.ScheduleTimerInMins;
        }

        private static IContainer CreateContainer()
        {
            var builder = new ContainerBuilder();
            builder.RegisterType<ObservableFileSystemWatcher>()
                .As<IObservableFileSystemWatcher>().ExternallyOwned();
            builder.RegisterType<RxSchedulerService>()
                .As<IRxSchedulerService>().ExternallyOwned();
            builder.RegisterType<Logger>().As<ILogger>().ExternallyOwned();
            builder.RegisterType<SomeWindowsService>();

            //single shared instance, used by both the service and the job
            builder.RegisterInstance(new SchedulingAssistanceService())
                .As<ISchedulingAssistanceService>();
            builder.RegisterInstance(
                SimpleConfig.Configuration.Load<SchedulingConfiguration>());

            // Quartz/jobs
            builder.Register(c => new StdSchedulerFactory().GetScheduler())
                .As<Quartz.IScheduler>();
            builder.RegisterAssemblyTypes(Assembly.GetExecutingAssembly())
                .Where(x => typeof(IJob).IsAssignableFrom(x));

            return builder.Build();
        }
    }
}
Where the shared instance in my case is this class
using SachaBarber.QuartzJobUpdate.Async;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SchedulingAssistanceService : ISchedulingAssistanceService
    {
        public SchedulingAssistanceService()
        {
            SchedulerRestartGate = new AsyncAutoResetEvent();
            RequiresNewSchedulerSetup = false;
        }

        public AsyncAutoResetEvent SchedulerRestartGate { get; }
        public bool RequiresNewSchedulerSetup { get; set; }
    }
}
Here is the actual job code that checks whether a change in the App.Config has been detected, in which case it signals the waiting code that it may continue.
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Quartz;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SomeQuartzJob : IJob
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;

        public SomeQuartzJob(
            ILogger log,
            ISchedulingAssistanceService schedulingAssistanceService)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
        }

        public void Execute(IJobExecutionContext context)
        {
            try
            {
                ExecuteAsync(context).GetAwaiter().GetResult();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
                throw;
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
                throw;
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
                throw;
            }
            catch (ArgumentNullException anex)
            {
                _log.Log(anex.Message);
                throw;
            }
            catch (OperationCanceledException ocex)
            {
                _log.Log(ocex.Message);
                throw;
            }
            catch (IOException ioex)
            {
                _log.Log(ioex.Message);
                throw;
            }
        }

        /// <summary>
        /// This is called every time the Quartz.net trigger for this job fires
        /// </summary>
        public async Task ExecuteAsync(IJobExecutionContext context)
        {
            await Task.Run(async () =>
            {
                if (_schedulingAssistanceService.RequiresNewSchedulerSetup)
                {
                    //signal the waiting scheduler restart code that it can now restart the scheduler
                    _schedulingAssistanceService.RequiresNewSchedulerSetup = false;
                    _log.Log("Job has been asked to stop, to allow job reschedule due to change in config");
                    _schedulingAssistanceService.SchedulerRestartGate.Set();
                }
                else
                {
                    //stand in for the real (uninterruptible) work
                    await Task.Delay(1000);
                    _log.Log("Doing the uninterruptible work now");
                }
            });
        }
    }
}
So when the AsyncAutoResetEvent is signalled, the waiting code (inside the Subscribe code of the Rx file system watcher in SomeWindowsService.cs) will proceed to swap out the Quartz.Net schedule time.
It can do this safely as we know there is NO work in flight: the job has told the waiting code to proceed, which it can only do if there is no work in flight.
This swapping over of the schedule time to use the newly read App.Config values is also protected by an AsyncLock (again taken from Stephen Toub)
using System;
using System.Threading;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
    /// from the fabulous Stephen Toub
    /// </summary>
    public class AsyncLock
    {
        private readonly AsyncSemaphore m_semaphore;
        private readonly Task<Releaser> m_releaser;

        public AsyncLock()
        {
            m_semaphore = new AsyncSemaphore(1);
            m_releaser = Task.FromResult(new Releaser(this));
        }

        public Task<Releaser> LockAsync()
        {
            var wait = m_semaphore.WaitAsync();
            return wait.IsCompleted
                ? m_releaser
                : wait.ContinueWith(
                    (_, state) => new Releaser((AsyncLock)state),
                    this,
                    CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
        }

        public struct Releaser : IDisposable
        {
            private readonly AsyncLock m_toRelease;

            internal Releaser(AsyncLock toRelease)
            {
                m_toRelease = toRelease;
            }

            public void Dispose()
            {
                if (m_toRelease != null)
                    m_toRelease.m_semaphore.Release();
            }
        }
    }
}
Where this relies on AsyncSemaphore
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266983.aspx
    /// from the fabulous Stephen Toub
    /// </summary>
    public class AsyncSemaphore
    {
        private static readonly Task s_completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _mWaiters =
            new Queue<TaskCompletionSource<bool>>();
        private int _mCurrentCount;

        public AsyncSemaphore(int initialCount)
        {
            if (initialCount < 0)
                throw new ArgumentOutOfRangeException("initialCount");
            _mCurrentCount = initialCount;
        }

        public Task WaitAsync()
        {
            lock (_mWaiters)
            {
                if (_mCurrentCount > 0)
                {
                    --_mCurrentCount;
                    return s_completed;
                }
                else
                {
                    var waiter = new TaskCompletionSource<bool>();
                    _mWaiters.Enqueue(waiter);
                    return waiter.Task;
                }
            }
        }

        public void Release()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_mWaiters)
            {
                if (_mWaiters.Count > 0)
                    toRelease = _mWaiters.Dequeue();
                else
                    ++_mCurrentCount;
            }
            if (toRelease != null)
                toRelease.SetResult(true);
        }
    }
}
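As an aside, if you would rather not carry these two helper classes around, the framework's own SemaphoreSlim has a WaitAsync method and can do the same job when created with a count of 1. The sketch below shows that alternative, it is not what this post uses, and GuardedCounter is a hypothetical class that exists only to have something to protect.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example: SemaphoreSlim(1, 1) used as an async-friendly lock.
public class GuardedCounter
{
    private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1);
    private int _value;

    public int Value
    {
        get { return _value; }
    }

    public async Task IncrementAsync()
    {
        await _mutex.WaitAsync(); // enter the critical section without blocking a thread
        try
        {
            int snapshot = _value;
            await Task.Yield();   // stand in for async work done while holding the lock
            _value = snapshot + 1;
        }
        finally
        {
            _mutex.Release();     // always release, even if the work throws
        }
    }
}
```

The try/finally plays the part of the using block around the Releaser in the AsyncLock version. Without the lock, the read, yield, write sequence would happily lose updates when several callers overlap.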
Just for completeness this is how you get an App.Config section to refresh at runtime
ConfigurationManager.RefreshSection("schedulingConfiguration");
Anyway this works fine for me. I now have a reactive app that responds to changes in the App.Config without the need to restart the app, and it does so while allowing in-flight work to complete.
Hope it helps someone out there.
Where Is The Code?
The code can be found here: https://github.com/sachabarber/SachaBarber.QuartzJobUpdate