diff --git a/Resgrid.sln b/Resgrid.sln
index 8a8961539..390f6643b 100644
--- a/Resgrid.sln
+++ b/Resgrid.sln
@@ -106,6 +106,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Resgrid.Chatbot.NLU", "Core
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Resgrid.Providers.Chatbot", "Providers\Resgrid.Providers.Chatbot\Resgrid.Providers.Chatbot.csproj", "{D3E4F5A6-B7C8-9012-CDEF-012345678902}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Quidjibo.SqlServer", "Workers\Support\Quidjibo.SqlServer\Quidjibo.SqlServer.csproj", "{93931385-3360-455F-B051-CF7B56C0ED17}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Azure|Any CPU = Azure|Any CPU
@@ -1496,6 +1498,42 @@ Global
{D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x86.Build.0 = Debug|Any CPU
{D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x64.ActiveCfg = Debug|Any CPU
{D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x64.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|Any CPU.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|Any CPU.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x86.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x86.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x64.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x64.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|Any CPU.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|Any CPU.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x86.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x86.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x64.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x64.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x86.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x64.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|Any CPU.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|Any CPU.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x86.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x86.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x64.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x64.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|Any CPU.Build.0 = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x86.ActiveCfg = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x86.Build.0 = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x64.ActiveCfg = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x64.Build.0 = Release|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|Any CPU.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|Any CPU.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x86.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x86.Build.0 = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x64.ActiveCfg = Debug|Any CPU
+ {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x64.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1540,6 +1578,7 @@ Global
{B1A2C3D4-E5F6-7890-ABCD-EF1234567890} = {D43D1D6B-66A9-4A57-9EA3-8DECC92FA583}
{C2D3E4F5-A6B7-8901-BCDE-F12345678901} = {D43D1D6B-66A9-4A57-9EA3-8DECC92FA583}
{D3E4F5A6-B7C8-9012-CDEF-012345678902} = {F06D475C-635C-4DE4-82BA-C49A90BA8FCD}
+ {93931385-3360-455F-B051-CF7B56C0ED17} = {89331D76-C527-479D-8F30-8033A04C625F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {156116FF-243E-45E8-8717-DB72E95F56AF}
diff --git a/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj b/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj
index 679fcfce7..07b207feb 100644
--- a/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj
+++ b/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj
@@ -29,7 +29,6 @@
-
@@ -58,6 +57,7 @@
+
diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs
index cc6af12b3..ff650aa9f 100644
--- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs
+++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs
@@ -36,9 +36,9 @@ public PostgresProgressProviderFactory(
return _provider;
}
+ await SyncLock.WaitAsync(cancellationToken);
try
{
- await SyncLock.WaitAsync(cancellationToken);
await PostgresRunner.ExecuteAsync(async cmd =>
{
cmd.CommandText = await SqlLoader.GetScript("Progress.Setup");
diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs
index 6ac56412c..7a43fd1dc 100644
--- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs
+++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs
@@ -30,9 +30,9 @@ public PostgresScheduleProviderFactory(
public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken))
{
+ await SyncLock.WaitAsync(cancellationToken);
try
{
- await SyncLock.WaitAsync(cancellationToken);
await PostgresRunner.ExecuteAsync(async cmd =>
{
var schemaSetup = await SqlLoader.GetScript("Schema.Setup");
diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs
index 824db74b9..fe56022c6 100644
--- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs
+++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs
@@ -31,9 +31,9 @@ public PostgresWorkProviderFactory(
public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken))
{
+ await SyncLock.WaitAsync(cancellationToken);
try
{
- await SyncLock.WaitAsync(cancellationToken);
if (!_initialized)
{
await PostgresRunner.ExecuteAsync(async cmd =>
diff --git a/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs b/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs
new file mode 100644
index 000000000..1c06d67e5
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs
@@ -0,0 +1,48 @@
+// // Copyright (c) smiggleworth. All rights reserved.
+// // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using Quidjibo.Configurations;
+using Quidjibo.Constants;
+
+namespace Quidjibo.SqlServer.Configurations
+{
+ public class SqlServerQuidjiboConfiguration : IQuidjiboConfiguration
+ {
+ public int PollingInterval { get; set; } = 10;
+ public string ConnectionString { get; set; }
+
+ ///
+ /// The number of days to keep completed/faulted work items.
+ ///
+ public int DaysToKeep { get; set; } = 3;
+
+ public int BatchSize { get; set; } = 5;
+
+ ///
+ public int? WorkPollingInterval { get; set; }
+
+ ///
+ public bool EnableScheduler { get; set; } = true;
+
+ ///
+ public int? SchedulePollingInterval { get; set; }
+
+ ///
+ public bool EnableWorker { get; set; } = true;
+
+ ///
+ public bool SingleLoop { get; set; } = true;
+
+ ///
+ public int LockInterval { get; set; } = 30;
+
+ ///
+ public int MaxAttempts { get; set; } = 5;
+
+ ///
+ public int Throttle { get; set; } = 10;
+
+ ///
+ public string[] Queues { get; set; } = Default.Queues;
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs
new file mode 100644
index 000000000..a6256bca6
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs
@@ -0,0 +1,95 @@
+using System;
+using Quidjibo.Constants;
+using Quidjibo.SqlServer.Configurations;
+using Quidjibo.SqlServer.Factories;
+
+namespace Quidjibo.SqlServer.Extensions
+{
+ public static class QuidjiboBuilderExtensions
+ {
+ ///
+ /// Use Sql Server for Work, Progress, an Scheduled Jobs
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, Action sqlServerQuidjiboConfiguration)
+ {
+ var config = new SqlServerQuidjiboConfiguration();
+ sqlServerQuidjiboConfiguration(config);
+ return builder.UseSqlServer(config);
+ }
+
+ ///
+ /// Use Sql Server for Work, Progress, and Scheduled Jobs
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration)
+ {
+ return builder.Configure(sqlServerQuidjiboConfiguration)
+ .ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration))
+ .ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration.ConnectionString))
+ .ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration.ConnectionString));
+ }
+
+ ///
+ /// Use Sql Server for Work, Progress, and Scheduled Jobs
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, string connectionString, params string[] queues)
+ {
+ if (queues == null || queues.Length == 0)
+ {
+ queues = Default.Queues;
+ }
+
+ var config = new SqlServerQuidjiboConfiguration
+ {
+ ConnectionString = connectionString,
+ Queues = queues
+ };
+ return builder.Configure(config)
+ .ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, config))
+ .ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, connectionString))
+ .ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, connectionString));
+ }
+
+ ///
+ /// Use Sql Server For Work
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServerForWork(this QuidjiboBuilder builder, SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration)
+ {
+ return builder.ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration));
+ }
+
+ ///
+ /// Use Sql Server For Progress
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServerForProgress(this QuidjiboBuilder builder, string connectionString)
+ {
+ return builder.ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, connectionString));
+ }
+
+ ///
+ /// Use Sql Server For Scheduled Jobs
+ ///
+ ///
+ ///
+ ///
+ public static QuidjiboBuilder UseSqlServerForSchedule(this QuidjiboBuilder builder, string connectionString)
+ {
+ return builder.ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, connectionString));
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs
new file mode 100644
index 000000000..01062c6b5
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs
@@ -0,0 +1,71 @@
+// Copyright (c) smiggleworth. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using Microsoft.Data.SqlClient;
+using System.Threading;
+using System.Threading.Tasks;
+using Quidjibo.Models;
+using Quidjibo.SqlServer.Providers;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Extensions
+{
+ public static class SqlCommandExtensions
+ {
+ public static async Task PrepareForSendAsync(this SqlCommand cmd, WorkItem item, int delay, CancellationToken cancellationToken)
+ {
+ var createdOn = DateTime.UtcNow;
+ var visibleOn = createdOn.AddSeconds(delay);
+ var expireOn = item.ExpireOn == default(DateTime) ? visibleOn.AddDays(SqlWorkProvider.DEFAULT_EXPIRE_DAYS) : item.ExpireOn;
+
+#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
+ cmd.CommandText = await SqlLoader.GetScript("Work.Send");
+#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@ScheduleId", item.ScheduleId);
+ cmd.AddParameter("@CorrelationId", item.CorrelationId);
+ cmd.AddParameter("@Name", item.Name);
+ cmd.AddParameter("@Worker", item.Worker);
+ cmd.AddParameter("@Queue", item.Queue);
+ cmd.AddParameter("@Attempts", item.Attempts);
+ cmd.AddParameter("@CreatedOn", createdOn);
+ cmd.AddParameter("@ExpireOn", expireOn);
+ cmd.AddParameter("@VisibleOn", visibleOn);
+ cmd.AddParameter("@Status", SqlWorkProvider.StatusFlags.New);
+ cmd.AddParameter("@Payload", item.Payload);
+ }
+
+ public static async Task PrepareForRenewAsync(this SqlCommand cmd, WorkItem item, DateTime lockExpireOn, CancellationToken cancellationToken)
+ {
+#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
+ cmd.CommandText = await SqlLoader.GetScript("Work.Renew");
+#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@VisibleOn", lockExpireOn);
+ }
+
+ public static async Task PrepareForCompleteAsync(this SqlCommand cmd, WorkItem item, CancellationToken cancellationToken)
+ {
+#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
+ cmd.CommandText = await SqlLoader.GetScript("Work.Complete");
+#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@Complete", SqlWorkProvider.StatusFlags.Complete);
+ }
+
+ public static async Task PrepareForFaultAsync(this SqlCommand cmd, WorkItem item, int visibilityTimeout, CancellationToken cancellationToken)
+ {
+ var faultedOn = DateTime.UtcNow;
+
+#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
+ cmd.CommandText = await SqlLoader.GetScript("Work.Fault");
+#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@VisibleOn", faultedOn.AddSeconds(Math.Max(visibilityTimeout, 30)));
+ cmd.AddParameter("@Faulted", SqlWorkProvider.StatusFlags.Faulted);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs
new file mode 100644
index 000000000..96dc0a90c
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs
@@ -0,0 +1,13 @@
+using System;
+using Microsoft.Data.SqlClient;
+
+namespace Quidjibo.SqlServer.Extensions
+{
+ public static class SqlParameterCollectionExtensions
+ {
+ public static SqlParameter AddParameter(this SqlCommand cmd, string name, object value)
+ {
+ return cmd.Parameters.AddWithValue(name, value ?? DBNull.Value);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs
new file mode 100644
index 000000000..302d821c9
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs
@@ -0,0 +1,59 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Factories;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Providers;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Factories
+{
+ public class SqlProgressProviderFactory : IProgressProviderFactory
+ {
+ private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1);
+ private readonly string _connectionString;
+
+ private readonly ILoggerFactory _loggerFactory;
+ private IProgressProvider _provider;
+
+ public SqlProgressProviderFactory(
+ ILoggerFactory loggerFactory,
+ string connectionString)
+ {
+ _loggerFactory = loggerFactory;
+ _connectionString = connectionString;
+ }
+
+ public Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return CreateAsync(queues.Split(','), cancellationToken);
+ }
+
+ public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (_provider != null)
+ {
+ return _provider;
+ }
+
+ await SyncLock.WaitAsync(cancellationToken);
+ try
+ {
+ await SqlRunner.ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Progress.Setup");
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, _connectionString, false, cancellationToken);
+
+ _provider = new SqlProgressProvider(
+ _loggerFactory.CreateLogger(),
+ _connectionString);
+ return _provider;
+ }
+ finally
+ {
+ SyncLock.Release();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs
new file mode 100644
index 000000000..72c78c72c
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs
@@ -0,0 +1,54 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Factories;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Providers;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Factories
+{
+ public class SqlScheduleProviderFactory : IScheduleProviderFactory
+ {
+ private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1);
+ private readonly string _connectionString;
+
+ private readonly ILoggerFactory _loggerFactory;
+
+ public SqlScheduleProviderFactory(
+ ILoggerFactory loggerFactory,
+ string connectionString)
+ {
+ _loggerFactory = loggerFactory;
+ _connectionString = connectionString;
+ }
+
+ public async Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return await CreateAsync(queues.Split(','), cancellationToken);
+ }
+
+ public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ await SyncLock.WaitAsync(cancellationToken);
+ try
+ {
+ await SqlRunner.ExecuteAsync(async cmd =>
+ {
+ var schemaSetup = await SqlLoader.GetScript("Schema.Setup");
+ var scheduleSetup = await SqlLoader.GetScript("Schedule.Setup");
+ cmd.CommandText = $"{schemaSetup};\r\n{scheduleSetup}";
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, _connectionString, false, cancellationToken);
+
+ return await Task.FromResult(new SqlScheduleProvider(
+ _loggerFactory.CreateLogger(),
+ _connectionString, queues));
+ }
+ finally
+ {
+ SyncLock.Release();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs
new file mode 100644
index 000000000..87ad8fc33
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs
@@ -0,0 +1,63 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Factories;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Configurations;
+using Quidjibo.SqlServer.Providers;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Factories
+{
+ public class SqlWorkProviderFactory : IWorkProviderFactory
+ {
+ private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1);
+ private readonly ILoggerFactory _loggerFactory;
+ private readonly SqlServerQuidjiboConfiguration _sqlServerQuidjiboConfiguration;
+ private bool _initialized;
+
+ public SqlWorkProviderFactory(
+ ILoggerFactory loggerFactory,
+ SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration)
+ {
+ _loggerFactory = loggerFactory;
+ _sqlServerQuidjiboConfiguration = sqlServerQuidjiboConfiguration;
+ }
+
+ public Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return CreateAsync(queues.Split(','), cancellationToken);
+ }
+
+ public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ await SyncLock.WaitAsync(cancellationToken);
+ try
+ {
+ if (!_initialized)
+ {
+ await SqlRunner.ExecuteAsync(async cmd =>
+ {
+ var schemaSetup = await SqlLoader.GetScript("Schema.Setup");
+ var workSetup = await SqlLoader.GetScript("Work.Setup");
+ cmd.CommandText = $"{schemaSetup};\r\n{workSetup}";
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, _sqlServerQuidjiboConfiguration.ConnectionString, false, cancellationToken);
+ _initialized = true;
+ }
+
+ return new SqlWorkProvider(
+ _loggerFactory.CreateLogger(),
+ _sqlServerQuidjiboConfiguration.ConnectionString,
+ queues,
+ _sqlServerQuidjiboConfiguration.LockInterval,
+ _sqlServerQuidjiboConfiguration.BatchSize,
+ _sqlServerQuidjiboConfiguration.DaysToKeep);
+ }
+ finally
+ {
+ SyncLock.Release();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs
new file mode 100644
index 000000000..35e96bd2c
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs
@@ -0,0 +1,79 @@
+using System;
+using System.Collections.Generic;
+using Microsoft.Data.SqlClient;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Models;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Extensions;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Providers
+{
+ public class SqlProgressProvider : IProgressProvider
+ {
+ private readonly string _connectionString;
+ private readonly ILogger _logger;
+
+ public SqlProgressProvider(
+ ILogger logger,
+ string connectionString)
+ {
+ _logger = logger;
+ _connectionString = connectionString;
+ }
+
+ public async Task ReportAsync(ProgressItem item, CancellationToken cancellationToken)
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Progress.Create");
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@WorkId", item.WorkId);
+ cmd.AddParameter("@CorrelationId", item.CorrelationId);
+ cmd.AddParameter("@Name", item.Name);
+ cmd.AddParameter("@Queue", item.Queue);
+ cmd.AddParameter("@RecordedOn", item.RecordedOn);
+ cmd.AddParameter("@Value", item.Value);
+ cmd.AddParameter("@Note", item.Note);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ await Task.CompletedTask;
+ }
+
+ public async Task> LoadByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken)
+ {
+ var items = new List();
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Progress.LoadByCorrelationId");
+ cmd.AddParameter("@CorrelationId", correlationId);
+ using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken))
+ {
+ while (await rdr.ReadAsync(cancellationToken))
+ {
+ var workItem = new ProgressItem
+ {
+ CorrelationId = rdr.Map(nameof(ProgressItem.CorrelationId)),
+ Id = rdr.Map(nameof(ProgressItem.Id)),
+ Name = rdr.Map(nameof(ProgressItem.Name)),
+ Note = rdr.Map(nameof(ProgressItem.Note)),
+ Queue = rdr.Map(nameof(ProgressItem.Queue)),
+ RecordedOn = rdr.Map(nameof(ProgressItem.RecordedOn)),
+ Value = rdr.Map(nameof(ProgressItem.Value)),
+ WorkId = rdr.Map(nameof(ProgressItem.WorkId))
+ };
+ items.Add(workItem);
+ }
+ }
+ }, cancellationToken);
+ return items;
+ }
+
+ private Task ExecuteAsync(Func func, CancellationToken cancellationToken)
+ {
+ return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs
new file mode 100644
index 000000000..feec4411b
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs
@@ -0,0 +1,164 @@
+using System;
+using System.Collections.Generic;
+using Microsoft.Data.SqlClient;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Models;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Extensions;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Providers
+{
+ public class SqlScheduleProvider : IScheduleProvider
+ {
+ private readonly string _connectionString;
+ private readonly ILogger _logger;
+ private readonly string[] _queues;
+
+ private string _receiveSql;
+
+ public SqlScheduleProvider(
+ ILogger logger,
+ string connectionString,
+ string[] queues)
+ {
+ _logger = logger;
+ _connectionString = connectionString;
+ _queues = queues;
+ }
+
+ public async Task> ReceiveAsync(CancellationToken cancellationToken = default(CancellationToken))
+ {
+ var receiveOn = DateTime.UtcNow;
+
+ if (_receiveSql == null)
+ {
+ _receiveSql = await SqlLoader.GetScript("Schedule.Receive");
+ if (_queues.Length > 0)
+ {
+ _receiveSql = _receiveSql.Replace("@Queue1",
+ string.Join(",", _queues.Select((x, i) => $"@Queue{i}")));
+ }
+ }
+
+ var items = new List(100);
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = _receiveSql;
+ cmd.AddParameter("@Take", 100);
+ cmd.AddParameter("@ReceiveOn", receiveOn);
+ cmd.AddParameter("@VisibleOn", receiveOn.AddSeconds(60));
+
+ // dynamic parameters
+ _queues.Select((q, i) => cmd.Parameters.AddWithValue($"@Queue{i}", q)).ToList();
+ using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken))
+ {
+ while (await rdr.ReadAsync(cancellationToken))
+ {
+ var item = MapScheduleItem(rdr);
+ items.Add(item);
+ }
+ }
+ }, cancellationToken);
+ return items;
+ }
+
+ public async Task CompleteAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Schedule.Complete");
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@EnqueueOn", item.EnqueueOn);
+ cmd.AddParameter("@EnqueuedOn", item.EnqueuedOn);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ }
+
+ public async Task CreateAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Schedule.Create");
+ cmd.AddParameter("@Id", item.Id);
+ cmd.AddParameter("@Name", item.Name);
+ cmd.AddParameter("@Queue", item.Queue);
+ cmd.AddParameter("@CronExpression", item.CronExpression);
+ cmd.AddParameter("@CreatedOn", item.CreatedOn);
+ cmd.AddParameter("@EnqueuedOn", item.EnqueuedOn);
+ cmd.AddParameter("@EnqueueOn", item.EnqueueOn);
+ cmd.AddParameter("@VisibleOn", item.VisibleOn);
+ cmd.AddParameter("@Payload", item.Payload);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ }
+
+ public async Task LoadByNameAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ var item = default(ScheduleItem);
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Schedule.LoadByName");
+ cmd.AddParameter("@Name", name);
+ using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken))
+ {
+ if (await rdr.ReadAsync(cancellationToken))
+ {
+ item = MapScheduleItem(rdr);
+ }
+ }
+ }, cancellationToken);
+ return item;
+ }
+
+ public Task UpdateAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ throw new NotImplementedException();
+ }
+
+ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Schedule.Delete");
+ cmd.AddParameter("@Id", id);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ }
+
+ public async Task ExistsAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ var count = 0;
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = await SqlLoader.GetScript("Schedule.Exists");
+ cmd.AddParameter("@Name", name);
+ count = (int)await cmd.ExecuteScalarAsync(cancellationToken);
+ }, cancellationToken);
+ return count > 0;
+ }
+
+ private ScheduleItem MapScheduleItem(SqlDataReader rdr)
+ {
+ return new ScheduleItem
+ {
+ CreatedOn = rdr.Map(nameof(ScheduleItem.CreatedOn)),
+ CronExpression = rdr.Map(nameof(ScheduleItem.CronExpression)),
+ EnqueuedOn = rdr.Map(nameof(ScheduleItem.EnqueuedOn)),
+ EnqueueOn = rdr.Map(nameof(ScheduleItem.EnqueueOn)),
+ Id = rdr.Map(nameof(ScheduleItem.Id)),
+ Name = rdr.Map(nameof(ScheduleItem.Name)),
+ Payload = rdr.Map(nameof(ScheduleItem.Payload)),
+ Queue = rdr.Map(nameof(ScheduleItem.Queue))
+ };
+ }
+
+ private Task ExecuteAsync(Func func, CancellationToken cancellationToken)
+ {
+ return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs
new file mode 100644
index 000000000..edc79f5c3
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs
@@ -0,0 +1,160 @@
+// // Copyright (c) smiggleworth. All rights reserved.
+// // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using Microsoft.Data.SqlClient;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Quidjibo.Models;
+using Quidjibo.Providers;
+using Quidjibo.SqlServer.Extensions;
+using Quidjibo.SqlServer.Utils;
+
+namespace Quidjibo.SqlServer.Providers
+{
+ public class SqlWorkProvider : IWorkProvider
+ {
+ public enum StatusFlags
+ {
+ Faulted = -1,
+ New = 0,
+ InFlight = 1,
+ Complete = 2
+ }
+
+ public const int DEFAULT_EXPIRE_DAYS = 7;
+
+ private readonly int _batchSize;
+ private readonly string _connectionString;
+ private readonly int _daysToKeep;
+ private readonly ILogger _logger;
+ private readonly int _maxAttempts;
+ private readonly string[] _queues;
+ private readonly int _visibilityTimeout;
+ private string _receiveSql;
+
+ public SqlWorkProvider(
+ ILogger logger,
+ string connectionString,
+ string[] queues,
+ int visibilityTimeout,
+ int batchSize,
+ int daysToKeep
+ )
+ {
+ _logger = logger;
+ _queues = queues;
+ _visibilityTimeout = visibilityTimeout;
+ _batchSize = batchSize;
+ _maxAttempts = 10;
+ _connectionString = connectionString;
+ _daysToKeep = Math.Abs(daysToKeep);
+ }
+
+ public async Task SendAsync(WorkItem item, int delay, CancellationToken cancellationToken)
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ await cmd.PrepareForSendAsync(item, delay, cancellationToken);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task> ReceiveAsync(string worker, CancellationToken cancellationToken)
+ {
+ var receiveOn = DateTime.UtcNow;
+ var deleteOn = _daysToKeep > 0 ? receiveOn.AddDays(-_daysToKeep) : receiveOn.AddHours(-1);
+
+ if (_receiveSql == null)
+ {
+ _receiveSql = await SqlLoader.GetScript("Work.Receive");
+ if (_queues.Length > 0)
+ {
+ _receiveSql = _receiveSql.Replace("@Queue1",
+ string.Join(",", _queues.Select((x, i) => $"@Queue{i}")));
+ }
+ }
+
+ var workItems = new List(_batchSize);
+ await ExecuteAsync(async cmd =>
+ {
+ cmd.CommandText = _receiveSql;
+ cmd.AddParameter("@Worker", worker);
+ cmd.AddParameter("@Take", _batchSize);
+ cmd.AddParameter("@InFlight", StatusFlags.InFlight);
+ cmd.AddParameter("@VisibleOn", receiveOn.AddSeconds(Math.Max(_visibilityTimeout, 30)));
+ cmd.AddParameter("@ReceiveOn", receiveOn);
+ cmd.AddParameter("@MaxAttempts", _maxAttempts);
+ cmd.AddParameter("@DeleteOn", deleteOn);
+
+ // dynamic parameters
+ for (var i = 0; i < _queues.Length; i++)
+ {
+ cmd.Parameters.AddWithValue($"@Queue{i}", _queues[i]);
+ }
+
+ using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken))
+ {
+ while (await rdr.ReadAsync(cancellationToken))
+ {
+ var workItem = new WorkItem
+ {
+ Attempts = rdr.Map(nameof(WorkItem.Attempts)),
+ CorrelationId = rdr.Map(nameof(WorkItem.CorrelationId)),
+ ExpireOn = rdr.Map(nameof(WorkItem.ExpireOn)),
+ Id = rdr.Map(nameof(WorkItem.Id)),
+ Name = rdr.Map(nameof(WorkItem.Name)),
+ Payload = rdr.Map(nameof(WorkItem.Payload)),
+ Queue = rdr.Map(nameof(WorkItem.Queue)),
+ ScheduleId = rdr.Map(nameof(WorkItem.ScheduleId))
+ };
+ workItem.Token = workItem.Id.ToString();
+ workItems.Add(workItem);
+ }
+ }
+ }, cancellationToken);
+ return workItems;
+ }
+
+ public async Task RenewAsync(WorkItem item, CancellationToken cancellationToken)
+ {
+ var lockExpireOn = (item.VisibleOn ?? DateTime.UtcNow).AddSeconds(Math.Max(_visibilityTimeout, 30));
+ await ExecuteAsync(async cmd =>
+ {
+ await cmd.PrepareForRenewAsync(item, lockExpireOn, cancellationToken);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ return lockExpireOn;
+ }
+
+ public async Task CompleteAsync(WorkItem item, CancellationToken cancellationToken)
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ await cmd.PrepareForCompleteAsync(item, cancellationToken);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ }
+
+ public async Task FaultAsync(WorkItem item, CancellationToken cancellationToken)
+ {
+ await ExecuteAsync(async cmd =>
+ {
+ await cmd.PrepareForFaultAsync(item, _visibilityTimeout, cancellationToken);
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }, cancellationToken);
+ }
+
+ public void Dispose()
+ {
+ }
+
+ private Task ExecuteAsync(Func func, CancellationToken cancellationToken)
+ {
+ return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj b/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj
new file mode 100644
index 000000000..0735562f0
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj
@@ -0,0 +1,28 @@
+
+
+
+ net9.0
+ Smiggleworth
+ Copyright © 2017
+ Quidjibo delivers flexible async background jobs for .NET applications. Vendored SqlServer provider using Microsoft.Data.SqlClient (the legacy System.Data.SqlClient fails with a SqlGuidCaster TypeLoadException on .NET 8+).
+ https://quidjibo.com/
+ https://github.com/smiggleworth/Quidjibo
+ Smiggleworth
+ async tasks workers cron
+ false
+ 1.0.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql
new file mode 100644
index 000000000..f8ac63068
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql
@@ -0,0 +1,22 @@
+INSERT INTO [Quidjibo].[Progress]
+(
+ [Id],
+ [WorkId],
+ [CorrelationId],
+ [Name],
+ [Queue],
+ [Note],
+ [Value],
+ [RecordedOn]
+)
+VALUES
+(
+ @Id,
+ @WorkId,
+ @CorrelationId,
+ @Name,
+ @Queue,
+ @Note,
+ @Value,
+ @RecordedOn
+);
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql
new file mode 100644
index 000000000..0debae0ef
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql
@@ -0,0 +1,3 @@
+SELECT [Id], [WorkId], [CorrelationId], [Name], [Queue], [Note], [Value], [RecordedOn]
+FROM [Quidjibo].[Progress] WITH (READPAST)
+WHERE [CorrelationId] = @CorrelationId
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql
new file mode 100644
index 000000000..72b161503
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql
@@ -0,0 +1,28 @@
+IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Progress]') AND type in (N'U'))
+BEGIN
+CREATE TABLE [Quidjibo].[Progress](
+ [Id] [UNIQUEIDENTIFIER] NOT NULL,
+ [Sequence] [bigint] IDENTITY(1,1) NOT NULL,
+ [WorkId] [uniqueidentifier] NULL,
+ [CorrelationId] [uniqueidentifier] NOT NULL,
+ [Name] [nvarchar](250) NOT NULL,
+ [Queue] [nvarchar](250) NOT NULL,
+ [Note] [nvarchar](250) NOT NULL,
+ [Value] [int] NOT NULL,
+ [RecordedOn] [datetime] NULL,
+CONSTRAINT [PK_Progress] PRIMARY KEY NONCLUSTERED
+(
+ [Id] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
+ CONSTRAINT [IX_Progress_Sequence] UNIQUE CLUSTERED
+(
+ [Sequence] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+) ON [PRIMARY]
+END
+
+IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Progress]') AND name = N'IX_Progress_CorrelationId')
+CREATE NONCLUSTERED INDEX [IX_Progress_CorrelationId] ON [Quidjibo].[Progress]
+(
+ [CorrelationId] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql
new file mode 100644
index 000000000..0c6671d18
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql
@@ -0,0 +1,5 @@
+UPDATE schdl
+SET [EnqueuedOn] = @EnqueuedOn,
+ [EnqueueOn] = @EnqueueOn
+FROM [Quidjibo].[Schedule] schdl WITH (ROWLOCK, UPDLOCK)
+WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql
new file mode 100644
index 000000000..df588fc61
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql
@@ -0,0 +1,24 @@
+INSERT INTO [Quidjibo].[Schedule]
+(
+ [Id],
+ [Name],
+ [Queue],
+ [CronExpression],
+ [CreatedOn],
+ [EnqueueOn],
+ [EnqueuedOn],
+ [VisibleOn],
+ [Payload]
+)
+VALUES
+(
+ @Id,
+ @Name,
+ @Queue,
+ @CronExpression,
+ @CreatedOn,
+ @EnqueueOn,
+ @EnqueuedOn,
+ @VisibleOn,
+ @Payload
+);
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql
new file mode 100644
index 000000000..96806ed30
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql
@@ -0,0 +1 @@
+DELETE FROM [Quidjibo].[Schedule] WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql
new file mode 100644
index 000000000..506b4cb58
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql
@@ -0,0 +1,3 @@
+SELECT COUNT(*)
+FROM [Quidjibo].[Schedule]
+WHERE [Name] = @Name
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql
new file mode 100644
index 000000000..9a56d37f1
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql
@@ -0,0 +1 @@
+SELECT TOP(@Take) [Quidjibo].[Schedule].*
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql
new file mode 100644
index 000000000..54d6d8225
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql
@@ -0,0 +1,3 @@
+SELECT *
+FROM [Quidjibo].[Schedule]
+WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql
new file mode 100644
index 000000000..ed465b539
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql
@@ -0,0 +1,3 @@
+SELECT *
+FROM [Quidjibo].[Schedule]
+WHERE [Name] = @Name
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql
new file mode 100644
index 000000000..d02e7e827
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql
@@ -0,0 +1,12 @@
+WITH schdl AS
+(
+ SELECT TOP(@Take) [Quidjibo].[Schedule].*
+ FROM [Quidjibo].[Schedule] WITH (ROWLOCK, READPAST, UPDLOCK)
+ WHERE [VisibleOn] < @ReceiveOn
+ AND [EnqueueOn] < @ReceiveOn
+ AND [Queue] IN (@Queue1)
+ ORDER BY [Sequence]
+)
+UPDATE schdl
+SET [VisibleOn] = @VisibleOn
+OUTPUT inserted.*
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql
new file mode 100644
index 000000000..e01f07429
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql
@@ -0,0 +1,23 @@
+IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Schedule]') AND type in (N'U'))
+BEGIN
+CREATE TABLE [Quidjibo].[Schedule](
+ [Id] [uniqueidentifier] NOT NULL,
+ [Sequence] [bigint] IDENTITY(1,1) NOT NULL,
+ [Name] [nvarchar](250) NOT NULL,
+ [Queue] [nvarchar](250) NOT NULL,
+ [CronExpression] [nvarchar](50) NOT NULL,
+ [CreatedOn] [datetime] NULL,
+ [EnqueueOn] [datetime] NULL,
+ [EnqueuedOn] [datetime] NULL,
+ [VisibleOn] [datetime] NULL,
+ [Payload] [varbinary](max) NOT NULL,
+ CONSTRAINT [PK_Schedule] PRIMARY KEY NONCLUSTERED
+(
+ [Id] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
+ CONSTRAINT [IX_Schedule_Sequence] UNIQUE CLUSTERED
+(
+ [Sequence] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
+END
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql
new file mode 100644
index 000000000..3691c8efe
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql
@@ -0,0 +1,4 @@
+IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = N'Quidjibo')
+BEGIN
+EXEC sys.sp_executesql N'CREATE SCHEMA [Quidjibo]'
+END
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql
new file mode 100644
index 000000000..568f5b5be
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql
@@ -0,0 +1,6 @@
+UPDATE wrk
+SET [ExpireOn] = NULL,
+ [VisibleOn] = Null,
+ [Status] = @Complete
+FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, UPDLOCK)
+WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql
new file mode 100644
index 000000000..1bd71af48
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql
@@ -0,0 +1,5 @@
+UPDATE wrk
+SET [VisibleOn] = @VisibleOn,
+ [Status] = @Faulted
+FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, READPAST, UPDLOCK)
+WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql
new file mode 100644
index 000000000..c4ccd9cae
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql
@@ -0,0 +1 @@
+-- List of work items in all states
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql
new file mode 100644
index 000000000..d720f802a
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql
@@ -0,0 +1,20 @@
+DELETE TOP(100) FROM [Quidjibo].[Work] WITH (ROWLOCK, READPAST, UPDLOCK)
+WHERE [CreatedOn] < @DeleteOn
+ AND ([ExpireOn] IS NULL OR [ExpireOn] < @DeleteOn);
+
+WITH wrk AS
+(
+ SELECT TOP (@Take) [Quidjibo].[Work].*
+ FROM [Quidjibo].[Work] WITH (ROWLOCK, READPAST, UPDLOCK)
+ WHERE [Status] < 2
+ AND [Attempts] < @MaxAttempts
+ AND [VisibleOn] < @ReceiveOn
+ AND [Queue] IN (@Queue1)
+ ORDER BY [Sequence]
+)
+UPDATE wrk
+SET [Worker] = @Worker,
+ [VisibleOn] = @VisibleOn,
+ [Status] = @InFlight,
+ [Attempts] = [Attempts] + 1
+OUTPUT inserted.*
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql
new file mode 100644
index 000000000..31d04b0e5
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql
@@ -0,0 +1,4 @@
+UPDATE wrk
+SET [VisibleOn] = @VisibleOn
+FROM [Quidjibo].[Work] wrk
+WHERE [Id] = @Id
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql
new file mode 100644
index 000000000..ef1bcb143
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql
@@ -0,0 +1,28 @@
+INSERT INTO [Quidjibo].[Work]
+(
+ [Id],
+ [ScheduleId],
+ [CorrelationId],
+ [Name],
+ [Queue],
+ [Attempts],
+ [CreatedOn],
+ [ExpireOn],
+ [VisibleOn],
+ [Status],
+ [Payload]
+)
+VALUES
+(
+ @Id,
+ @ScheduleId,
+ @CorrelationId,
+ @Name,
+ @Queue,
+ @Attempts,
+ @CreatedOn,
+ @ExpireOn,
+ @VisibleOn,
+ @Status,
+ @Payload
+)
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql
new file mode 100644
index 000000000..3dca3c58f
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql
@@ -0,0 +1,42 @@
+IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND type in (N'U'))
+BEGIN
+CREATE TABLE [Quidjibo].[Work](
+ [Id] [uniqueidentifier] NOT NULL,
+ [Sequence] [bigint] IDENTITY(1,1) NOT NULL,
+ [ScheduleId] [uniqueidentifier] NULL,
+ [CorrelationId] [uniqueidentifier] NOT NULL,
+ [Name] [nvarchar](250) NOT NULL,
+ [Worker] [nvarchar](250) NULL,
+ [Queue] [nvarchar](250) NOT NULL,
+ [Status] [int] NOT NULL,
+ [Attempts] [int] NOT NULL,
+ [CreatedOn] [datetime] NULL,
+ [ExpireOn] [datetime] NULL,
+ [VisibleOn] [datetime] NULL,
+ [Payload] [varbinary](max) NOT NULL
+ CONSTRAINT [PK_Work] PRIMARY KEY NONCLUSTERED
+(
+ [Id] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
+ CONSTRAINT [IX_Work_Sequence] UNIQUE CLUSTERED
+(
+ [Sequence] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+) ON [PRIMARY]
+END
+
+IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND name = N'IX_Work_Receive')
+CREATE NONCLUSTERED INDEX [IX_Work_Receive] ON [Quidjibo].[Work]
+(
+ [Queue],
+ [Status],
+ [Attempts],
+ [VisibleOn]
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+
+IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND name = N'IX_Work_Cleanup')
+CREATE NONCLUSTERED INDEX [IX_Work_Cleanup] ON [Quidjibo].[Work]
+(
+ [CreatedOn] ASC,
+ [ExpireOn] ASC
+)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs
new file mode 100644
index 000000000..532c14bd3
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Reflection;
+using System.Threading.Tasks;
+
+namespace Quidjibo.SqlServer.Utils
+{
+ public class SqlLoader
+ {
+ private static readonly ConcurrentDictionary Scripts = new ConcurrentDictionary();
+
+ public static async Task GetScript(string scriptName)
+ {
+ if (Scripts.TryGetValue(scriptName, out var script))
+ {
+ return script;
+ }
+
+ var assembly = typeof(SqlLoader).GetTypeInfo().Assembly;
+ var fullName = $"Quidjibo.SqlServer.Scripts.{scriptName}.sql";
+ var stream = assembly.GetManifestResourceStream(fullName);
+ if (stream == null)
+ {
+ throw new InvalidOperationException($"Embedded SQL script '{fullName}' was not found in assembly '{assembly.GetName().Name}'.");
+ }
+
+ using (stream)
+ using (var reader = new StreamReader(stream))
+ {
+ script = await reader.ReadToEndAsync();
+ Scripts.TryAdd(scriptName, script);
+ }
+
+ return script;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs
new file mode 100644
index 000000000..e821bbc26
--- /dev/null
+++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs
@@ -0,0 +1,69 @@
+using System;
+using Microsoft.Data.SqlClient;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Quidjibo.SqlServer.Utils
+{
+ public static class SqlRunner
+ {
+ public static Task ExecuteAsync(Func func, string connectionString, CancellationToken cancellationToken)
+ {
+ return ExecuteAsync(func, connectionString, true, cancellationToken);
+ }
+
+ public static async Task ExecuteAsync(Func func, string connectionString, bool inTransaction, CancellationToken cancellationToken)
+ {
+ using (var conn = new SqlConnection(connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ {
+ await conn.OpenAsync(cancellationToken);
+ if (inTransaction)
+ {
+ using (var tran = conn.BeginTransaction())
+ {
+ try
+ {
+ cmd.Transaction = tran;
+ await func(cmd);
+ tran.Commit();
+ }
+ catch
+ {
+ try
+ {
+ tran.Rollback();
+ }
+ catch
+ {
+ // Rollback can fail if the transaction/connection is already
+ // aborted; swallow that secondary error so the original
+ // exception below is preserved rather than masked.
+ }
+
+ throw;
+ }
+ }
+
+ return;
+ }
+
+ await func(cmd);
+ }
+ }
+ }
+
+
+ public static T Map(this SqlDataReader rdr, string name)
+ {
+ var val = rdr[name];
+ if (val == DBNull.Value)
+ {
+ return default(T);
+ }
+
+ return (T)val;
+ }
+ }
+}
\ No newline at end of file