From b54a5b0074c87d4c5e470ae95ed262dbaf82b322 Mon Sep 17 00:00:00 2001 From: Shawn Jackson Date: Mon, 29 Jun 2026 14:54:27 -0500 Subject: [PATCH 1/2] RE1-T122 Trying to fix sql server update issue with worker --- Resgrid.sln | 39 +++++ .../Resgrid.Workers.Console.csproj | 2 +- .../SqlServerQuidjiboConfiguration.cs | 48 +++++ .../Extensions/QuidjiboBuilderExtensions.cs | 95 ++++++++++ .../Extensions/SqlCommandExtensions.cs | 71 ++++++++ .../SqlParameterCollectionExtensions.cs | 13 ++ .../Factories/SqlProgressProviderFactory.cs | 59 +++++++ .../Factories/SqlScheduleProviderFactory.cs | 54 ++++++ .../Factories/SqlWorkProviderFactory.cs | 63 +++++++ .../Providers/SqlProgressProvider.cs | 79 +++++++++ .../Providers/SqlScheduleProvider.cs | 164 ++++++++++++++++++ .../Providers/SqlWorkProvider.cs | 160 +++++++++++++++++ .../Quidjibo.SqlServer.csproj | 28 +++ .../Scripts/Progress/Create.sql | 22 +++ .../Scripts/Progress/LoadByCorrelationId.sql | 3 + .../Scripts/Progress/Setup.sql | 28 +++ .../Scripts/Schedule/Complete.sql | 5 + .../Scripts/Schedule/Create.sql | 24 +++ .../Scripts/Schedule/Delete.sql | 1 + .../Scripts/Schedule/Exists.sql | 3 + .../Scripts/Schedule/List.sql | 1 + .../Scripts/Schedule/Load.sql | 3 + .../Scripts/Schedule/LoadByName.sql | 3 + .../Scripts/Schedule/Receive.sql | 12 ++ .../Scripts/Schedule/Setup.sql | 23 +++ .../Scripts/Schema/Setup.sql | 4 + .../Scripts/Work/Complete.sql | 6 + .../Quidjibo.SqlServer/Scripts/Work/Fault.sql | 5 + .../Quidjibo.SqlServer/Scripts/Work/List.sql | 1 + .../Scripts/Work/Receive.sql | 20 +++ .../Quidjibo.SqlServer/Scripts/Work/Renew.sql | 4 + .../Quidjibo.SqlServer/Scripts/Work/Send.sql | 28 +++ .../Quidjibo.SqlServer/Scripts/Work/Setup.sql | 42 +++++ .../Quidjibo.SqlServer/Utils/SqlLoader.cs | 31 ++++ .../Quidjibo.SqlServer/Utils/SqlRunner.cs | 59 +++++++ 35 files changed, 1202 insertions(+), 1 deletion(-) create mode 100644 Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql create mode 100644 Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs create mode 100644 Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs diff --git a/Resgrid.sln b/Resgrid.sln index 8a8961539..390f6643b 100644 --- a/Resgrid.sln +++ b/Resgrid.sln @@ -106,6 +106,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Resgrid.Chatbot.NLU", "Core EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Resgrid.Providers.Chatbot", "Providers\Resgrid.Providers.Chatbot\Resgrid.Providers.Chatbot.csproj", "{D3E4F5A6-B7C8-9012-CDEF-012345678902}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Quidjibo.SqlServer", "Workers\Support\Quidjibo.SqlServer\Quidjibo.SqlServer.csproj", "{93931385-3360-455F-B051-CF7B56C0ED17}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Azure|Any CPU = Azure|Any CPU @@ -1496,6 +1498,42 @@ Global {D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x86.Build.0 = Debug|Any CPU {D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x64.ActiveCfg = Debug|Any CPU {D3E4F5A6-B7C8-9012-CDEF-012345678902}.Staging|x64.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|Any CPU.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|Any CPU.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x86.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x86.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x64.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Azure|x64.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|Any CPU.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|Any CPU.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x86.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x86.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x64.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Cloud|x64.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|Any CPU.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x86.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x86.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x64.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Debug|x64.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|Any CPU.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|Any CPU.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x86.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x86.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x64.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Docker|x64.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|Any CPU.ActiveCfg = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|Any CPU.Build.0 = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x86.ActiveCfg = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x86.Build.0 = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x64.ActiveCfg = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Release|x64.Build.0 = Release|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|Any CPU.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|Any CPU.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x86.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x86.Build.0 = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x64.ActiveCfg = Debug|Any CPU + {93931385-3360-455F-B051-CF7B56C0ED17}.Staging|x64.Build.0 = Debug|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1540,6 +1578,7 @@ Global {B1A2C3D4-E5F6-7890-ABCD-EF1234567890} = {D43D1D6B-66A9-4A57-9EA3-8DECC92FA583} {C2D3E4F5-A6B7-8901-BCDE-F12345678901} = {D43D1D6B-66A9-4A57-9EA3-8DECC92FA583} {D3E4F5A6-B7C8-9012-CDEF-012345678902} = {F06D475C-635C-4DE4-82BA-C49A90BA8FCD} + {93931385-3360-455F-B051-CF7B56C0ED17} = {89331D76-C527-479D-8F30-8033A04C625F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {156116FF-243E-45E8-8717-DB72E95F56AF} diff --git a/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj b/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj index 679fcfce7..07b207feb 100644 --- a/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj +++ b/Workers/Resgrid.Workers.Console/Resgrid.Workers.Console.csproj @@ -29,7 +29,6 @@ - @@ -58,6 +57,7 @@ + diff --git a/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs b/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs new file mode 100644 index 000000000..1c06d67e5 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Configurations/SqlServerQuidjiboConfiguration.cs @@ -0,0 +1,48 @@ +// // Copyright (c) smiggleworth. All rights reserved. +// // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Quidjibo.Configurations; +using Quidjibo.Constants; + +namespace Quidjibo.SqlServer.Configurations +{ + public class SqlServerQuidjiboConfiguration : IQuidjiboConfiguration + { + public int PollingInterval { get; set; } = 10; + public string ConnectionString { get; set; } + + /// + /// The number of days to keep completed/faulted work items. + /// + public int DaysToKeep { get; set; } = 3; + + public int BatchSize { get; set; } = 5; + + /// + public int? WorkPollingInterval { get; set; } + + /// + public bool EnableScheduler { get; set; } = true; + + /// + public int? SchedulePollingInterval { get; set; } + + /// + public bool EnableWorker { get; set; } = true; + + /// + public bool SingleLoop { get; set; } = true; + + /// + public int LockInterval { get; set; } = 30; + + /// + public int MaxAttempts { get; set; } = 5; + + /// + public int Throttle { get; set; } = 10; + + /// + public string[] Queues { get; set; } = Default.Queues; + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs new file mode 100644 index 000000000..a6256bca6 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Extensions/QuidjiboBuilderExtensions.cs @@ -0,0 +1,95 @@ +using System; +using Quidjibo.Constants; +using Quidjibo.SqlServer.Configurations; +using Quidjibo.SqlServer.Factories; + +namespace Quidjibo.SqlServer.Extensions +{ + public static class QuidjiboBuilderExtensions + { + /// + /// Use Sql Server for Work, Progress, an Scheduled Jobs + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, Action sqlServerQuidjiboConfiguration) + { + var config = new SqlServerQuidjiboConfiguration(); + sqlServerQuidjiboConfiguration(config); + return builder.UseSqlServer(config); + } + + /// + /// Use Sql Server for Work, Progress, and Scheduled Jobs + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration) + { + return builder.Configure(sqlServerQuidjiboConfiguration) + .ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration)) + .ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration.ConnectionString)) + .ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration.ConnectionString)); + } + + /// + /// Use Sql Server for Work, Progress, and Scheduled Jobs + /// + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServer(this QuidjiboBuilder builder, string connectionString, params string[] queues) + { + if (queues == null || queues.Length == 0) + { + queues = Default.Queues; + } + + var config = new SqlServerQuidjiboConfiguration + { + ConnectionString = connectionString, + Queues = queues + }; + return builder.Configure(config) + .ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, config)) + .ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, connectionString)) + .ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, connectionString)); + } + + /// + /// Use Sql Server For Work + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServerForWork(this QuidjiboBuilder builder, SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration) + { + return builder.ConfigureWorkProviderFactory(new SqlWorkProviderFactory(builder.LoggerFactory, sqlServerQuidjiboConfiguration)); + } + + /// + /// Use Sql Server For Progress + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServerForProgress(this QuidjiboBuilder builder, string connectionString) + { + return builder.ConfigureProgressProviderFactory(new SqlProgressProviderFactory(builder.LoggerFactory, connectionString)); + } + + /// + /// Use Sql Server For Scheduled Jobs + /// + /// + /// + /// + public static QuidjiboBuilder UseSqlServerForSchedule(this QuidjiboBuilder builder, string connectionString) + { + return builder.ConfigureScheduleProviderFactory(new SqlScheduleProviderFactory(builder.LoggerFactory, connectionString)); + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs new file mode 100644 index 000000000..01062c6b5 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlCommandExtensions.cs @@ -0,0 +1,71 @@ +// Copyright (c) smiggleworth. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using Microsoft.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; +using Quidjibo.Models; +using Quidjibo.SqlServer.Providers; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Extensions +{ + public static class SqlCommandExtensions + { + public static async Task PrepareForSendAsync(this SqlCommand cmd, WorkItem item, int delay, CancellationToken cancellationToken) + { + var createdOn = DateTime.UtcNow; + var visibleOn = createdOn.AddSeconds(delay); + var expireOn = item.ExpireOn == default(DateTime) ? visibleOn.AddDays(SqlWorkProvider.DEFAULT_EXPIRE_DAYS) : item.ExpireOn; + +#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities + cmd.CommandText = await SqlLoader.GetScript("Work.Send"); +#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@ScheduleId", item.ScheduleId); + cmd.AddParameter("@CorrelationId", item.CorrelationId); + cmd.AddParameter("@Name", item.Name); + cmd.AddParameter("@Worker", item.Worker); + cmd.AddParameter("@Queue", item.Queue); + cmd.AddParameter("@Attempts", item.Attempts); + cmd.AddParameter("@CreatedOn", createdOn); + cmd.AddParameter("@ExpireOn", expireOn); + cmd.AddParameter("@VisibleOn", visibleOn); + cmd.AddParameter("@Status", SqlWorkProvider.StatusFlags.New); + cmd.AddParameter("@Payload", item.Payload); + } + + public static async Task PrepareForRenewAsync(this SqlCommand cmd, WorkItem item, DateTime lockExpireOn, CancellationToken cancellationToken) + { +#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities + cmd.CommandText = await SqlLoader.GetScript("Work.Renew"); +#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@VisibleOn", lockExpireOn); + } + + public static async Task PrepareForCompleteAsync(this SqlCommand cmd, WorkItem item, CancellationToken cancellationToken) + { +#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities + cmd.CommandText = await SqlLoader.GetScript("Work.Complete"); +#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@Complete", SqlWorkProvider.StatusFlags.Complete); + } + + public static async Task PrepareForFaultAsync(this SqlCommand cmd, WorkItem item, int visibilityTimeout, CancellationToken cancellationToken) + { + var faultedOn = DateTime.UtcNow; + +#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities + cmd.CommandText = await SqlLoader.GetScript("Work.Fault"); +#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@VisibleOn", faultedOn.AddSeconds(Math.Max(visibilityTimeout, 30))); + cmd.AddParameter("@Faulted", SqlWorkProvider.StatusFlags.Faulted); + } + + } + +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs new file mode 100644 index 000000000..96dc0a90c --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Extensions/SqlParameterCollectionExtensions.cs @@ -0,0 +1,13 @@ +using System; +using Microsoft.Data.SqlClient; + +namespace Quidjibo.SqlServer.Extensions +{ + public static class SqlParameterCollectionExtensions + { + public static SqlParameter AddParameter(this SqlCommand cmd, string name, object value) + { + return cmd.Parameters.AddWithValue(name, value ?? DBNull.Value); + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs new file mode 100644 index 000000000..f3bc6fc44 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs @@ -0,0 +1,59 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Factories; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Providers; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Factories +{ + public class SqlProgressProviderFactory : IProgressProviderFactory + { + private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1); + private readonly string _connectionString; + + private readonly ILoggerFactory _loggerFactory; + private IProgressProvider _provider; + + public SqlProgressProviderFactory( + ILoggerFactory loggerFactory, + string connectionString) + { + _loggerFactory = loggerFactory; + _connectionString = connectionString; + } + + public Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken)) + { + return CreateAsync(queues.Split(','), cancellationToken); + } + + public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) + { + if (_provider != null) + { + return _provider; + } + + try + { + await SyncLock.WaitAsync(cancellationToken); + await SqlRunner.ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Progress.Setup"); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, _connectionString, false, cancellationToken); + + _provider = new SqlProgressProvider( + _loggerFactory.CreateLogger(), + _connectionString); + return _provider; + } + finally + { + SyncLock.Release(); + } + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs new file mode 100644 index 000000000..93211512f --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs @@ -0,0 +1,54 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Factories; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Providers; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Factories +{ + public class SqlScheduleProviderFactory : IScheduleProviderFactory + { + private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1); + private readonly string _connectionString; + + private readonly ILoggerFactory _loggerFactory; + + public SqlScheduleProviderFactory( + ILoggerFactory loggerFactory, + string connectionString) + { + _loggerFactory = loggerFactory; + _connectionString = connectionString; + } + + public async Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken)) + { + return await CreateAsync(queues.Split(','), cancellationToken); + } + + public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) + { + try + { + await SyncLock.WaitAsync(cancellationToken); + await SqlRunner.ExecuteAsync(async cmd => + { + var schemaSetup = await SqlLoader.GetScript("Schema.Setup"); + var scheduleSetup = await SqlLoader.GetScript("Schedule.Setup"); + cmd.CommandText = $"{schemaSetup};\r\n{scheduleSetup}"; + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, _connectionString, false, cancellationToken); + + return await Task.FromResult(new SqlScheduleProvider( + _loggerFactory.CreateLogger(), + _connectionString, queues)); + } + finally + { + SyncLock.Release(); + } + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs new file mode 100644 index 000000000..bde070207 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs @@ -0,0 +1,63 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Factories; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Configurations; +using Quidjibo.SqlServer.Providers; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Factories +{ + public class SqlWorkProviderFactory : IWorkProviderFactory + { + private static readonly SemaphoreSlim SyncLock = new SemaphoreSlim(1, 1); + private readonly ILoggerFactory _loggerFactory; + private readonly SqlServerQuidjiboConfiguration _sqlServerQuidjiboConfiguration; + private bool _initialized; + + public SqlWorkProviderFactory( + ILoggerFactory loggerFactory, + SqlServerQuidjiboConfiguration sqlServerQuidjiboConfiguration) + { + _loggerFactory = loggerFactory; + _sqlServerQuidjiboConfiguration = sqlServerQuidjiboConfiguration; + } + + public Task CreateAsync(string queues, CancellationToken cancellationToken = default(CancellationToken)) + { + return CreateAsync(queues.Split(','), cancellationToken); + } + + public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) + { + try + { + await SyncLock.WaitAsync(cancellationToken); + if (!_initialized) + { + await SqlRunner.ExecuteAsync(async cmd => + { + var schemaSetup = await SqlLoader.GetScript("Schema.Setup"); + var workSetup = await SqlLoader.GetScript("Work.Setup"); + cmd.CommandText = $"{schemaSetup};\r\n{workSetup}"; + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, _sqlServerQuidjiboConfiguration.ConnectionString, false, cancellationToken); + _initialized = true; + } + + return new SqlWorkProvider( + _loggerFactory.CreateLogger(), + _sqlServerQuidjiboConfiguration.ConnectionString, + queues, + _sqlServerQuidjiboConfiguration.LockInterval, + _sqlServerQuidjiboConfiguration.BatchSize, + _sqlServerQuidjiboConfiguration.DaysToKeep); + } + finally + { + SyncLock.Release(); + } + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs new file mode 100644 index 000000000..35e96bd2c --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlProgressProvider.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using Microsoft.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Models; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Extensions; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Providers +{ + public class SqlProgressProvider : IProgressProvider + { + private readonly string _connectionString; + private readonly ILogger _logger; + + public SqlProgressProvider( + ILogger logger, + string connectionString) + { + _logger = logger; + _connectionString = connectionString; + } + + public async Task ReportAsync(ProgressItem item, CancellationToken cancellationToken) + { + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Progress.Create"); + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@WorkId", item.WorkId); + cmd.AddParameter("@CorrelationId", item.CorrelationId); + cmd.AddParameter("@Name", item.Name); + cmd.AddParameter("@Queue", item.Queue); + cmd.AddParameter("@RecordedOn", item.RecordedOn); + cmd.AddParameter("@Value", item.Value); + cmd.AddParameter("@Note", item.Note); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + await Task.CompletedTask; + } + + public async Task> LoadByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken) + { + var items = new List(); + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Progress.LoadByCorrelationId"); + cmd.AddParameter("@CorrelationId", correlationId); + using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken)) + { + while (await rdr.ReadAsync(cancellationToken)) + { + var workItem = new ProgressItem + { + CorrelationId = rdr.Map(nameof(ProgressItem.CorrelationId)), + Id = rdr.Map(nameof(ProgressItem.Id)), + Name = rdr.Map(nameof(ProgressItem.Name)), + Note = rdr.Map(nameof(ProgressItem.Note)), + Queue = rdr.Map(nameof(ProgressItem.Queue)), + RecordedOn = rdr.Map(nameof(ProgressItem.RecordedOn)), + Value = rdr.Map(nameof(ProgressItem.Value)), + WorkId = rdr.Map(nameof(ProgressItem.WorkId)) + }; + items.Add(workItem); + } + } + }, cancellationToken); + return items; + } + + private Task ExecuteAsync(Func func, CancellationToken cancellationToken) + { + return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken); + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs new file mode 100644 index 000000000..feec4411b --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlScheduleProvider.cs @@ -0,0 +1,164 @@ +using System; +using System.Collections.Generic; +using Microsoft.Data.SqlClient; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Models; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Extensions; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Providers +{ + public class SqlScheduleProvider : IScheduleProvider + { + private readonly string _connectionString; + private readonly ILogger _logger; + private readonly string[] _queues; + + private string _receiveSql; + + public SqlScheduleProvider( + ILogger logger, + string connectionString, + string[] queues) + { + _logger = logger; + _connectionString = connectionString; + _queues = queues; + } + + public async Task> ReceiveAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + var receiveOn = DateTime.UtcNow; + + if (_receiveSql == null) + { + _receiveSql = await SqlLoader.GetScript("Schedule.Receive"); + if (_queues.Length > 0) + { + _receiveSql = _receiveSql.Replace("@Queue1", + string.Join(",", _queues.Select((x, i) => $"@Queue{i}"))); + } + } + + var items = new List(100); + await ExecuteAsync(async cmd => + { + cmd.CommandText = _receiveSql; + cmd.AddParameter("@Take", 100); + cmd.AddParameter("@ReceiveOn", receiveOn); + cmd.AddParameter("@VisibleOn", receiveOn.AddSeconds(60)); + + // dynamic parameters + _queues.Select((q, i) => cmd.Parameters.AddWithValue($"@Queue{i}", q)).ToList(); + using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken)) + { + while (await rdr.ReadAsync(cancellationToken)) + { + var item = MapScheduleItem(rdr); + items.Add(item); + } + } + }, cancellationToken); + return items; + } + + public async Task CompleteAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken)) + { + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Schedule.Complete"); + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@EnqueueOn", item.EnqueueOn); + cmd.AddParameter("@EnqueuedOn", item.EnqueuedOn); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + } + + public async Task CreateAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken)) + { + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Schedule.Create"); + cmd.AddParameter("@Id", item.Id); + cmd.AddParameter("@Name", item.Name); + cmd.AddParameter("@Queue", item.Queue); + cmd.AddParameter("@CronExpression", item.CronExpression); + cmd.AddParameter("@CreatedOn", item.CreatedOn); + cmd.AddParameter("@EnqueuedOn", item.EnqueuedOn); + cmd.AddParameter("@EnqueueOn", item.EnqueueOn); + cmd.AddParameter("@VisibleOn", item.VisibleOn); + cmd.AddParameter("@Payload", item.Payload); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + } + + public async Task LoadByNameAsync(string name, CancellationToken cancellationToken = default(CancellationToken)) + { + var item = default(ScheduleItem); + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Schedule.LoadByName"); + cmd.AddParameter("@Name", name); + using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken)) + { + if (await rdr.ReadAsync(cancellationToken)) + { + item = MapScheduleItem(rdr); + } + } + }, cancellationToken); + return item; + } + + public Task UpdateAsync(ScheduleItem item, CancellationToken cancellationToken = default(CancellationToken)) + { + throw new NotImplementedException(); + } + + public async Task DeleteAsync(Guid id, CancellationToken cancellationToken = default(CancellationToken)) + { + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Schedule.Delete"); + cmd.AddParameter("@Id", id); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + } + + public async Task ExistsAsync(string name, CancellationToken cancellationToken = default(CancellationToken)) + { + var count = 0; + await ExecuteAsync(async cmd => + { + cmd.CommandText = await SqlLoader.GetScript("Schedule.Exists"); + cmd.AddParameter("@Name", name); + count = (int)await cmd.ExecuteScalarAsync(cancellationToken); + }, cancellationToken); + return count > 0; + } + + private ScheduleItem MapScheduleItem(SqlDataReader rdr) + { + return new ScheduleItem + { + CreatedOn = rdr.Map(nameof(ScheduleItem.CreatedOn)), + CronExpression = rdr.Map(nameof(ScheduleItem.CronExpression)), + EnqueuedOn = rdr.Map(nameof(ScheduleItem.EnqueuedOn)), + EnqueueOn = rdr.Map(nameof(ScheduleItem.EnqueueOn)), + Id = rdr.Map(nameof(ScheduleItem.Id)), + Name = rdr.Map(nameof(ScheduleItem.Name)), + Payload = rdr.Map(nameof(ScheduleItem.Payload)), + Queue = rdr.Map(nameof(ScheduleItem.Queue)) + }; + } + + private Task ExecuteAsync(Func func, CancellationToken cancellationToken) + { + return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken); + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs b/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs new file mode 100644 index 000000000..edc79f5c3 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Providers/SqlWorkProvider.cs @@ -0,0 +1,160 @@ +// // Copyright (c) smiggleworth. All rights reserved. +// // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using Microsoft.Data.SqlClient; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Quidjibo.Models; +using Quidjibo.Providers; +using Quidjibo.SqlServer.Extensions; +using Quidjibo.SqlServer.Utils; + +namespace Quidjibo.SqlServer.Providers +{ + public class SqlWorkProvider : IWorkProvider + { + public enum StatusFlags + { + Faulted = -1, + New = 0, + InFlight = 1, + Complete = 2 + } + + public const int DEFAULT_EXPIRE_DAYS = 7; + + private readonly int _batchSize; + private readonly string _connectionString; + private readonly int _daysToKeep; + private readonly ILogger _logger; + private readonly int _maxAttempts; + private readonly string[] _queues; + private readonly int _visibilityTimeout; + private string _receiveSql; + + public SqlWorkProvider( + ILogger logger, + string connectionString, + string[] queues, + int visibilityTimeout, + int batchSize, + int daysToKeep + ) + { + _logger = logger; + _queues = queues; + _visibilityTimeout = visibilityTimeout; + _batchSize = batchSize; + _maxAttempts = 10; + _connectionString = connectionString; + _daysToKeep = Math.Abs(daysToKeep); + } + + public async Task SendAsync(WorkItem item, int delay, CancellationToken cancellationToken) + { + await ExecuteAsync(async cmd => + { + await cmd.PrepareForSendAsync(item, delay, cancellationToken); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken).ConfigureAwait(false); + } + + public async Task> ReceiveAsync(string worker, CancellationToken cancellationToken) + { + var receiveOn = DateTime.UtcNow; + var deleteOn = _daysToKeep > 0 ? receiveOn.AddDays(-_daysToKeep) : receiveOn.AddHours(-1); + + if (_receiveSql == null) + { + _receiveSql = await SqlLoader.GetScript("Work.Receive"); + if (_queues.Length > 0) + { + _receiveSql = _receiveSql.Replace("@Queue1", + string.Join(",", _queues.Select((x, i) => $"@Queue{i}"))); + } + } + + var workItems = new List(_batchSize); + await ExecuteAsync(async cmd => + { + cmd.CommandText = _receiveSql; + cmd.AddParameter("@Worker", worker); + cmd.AddParameter("@Take", _batchSize); + cmd.AddParameter("@InFlight", StatusFlags.InFlight); + cmd.AddParameter("@VisibleOn", receiveOn.AddSeconds(Math.Max(_visibilityTimeout, 30))); + cmd.AddParameter("@ReceiveOn", receiveOn); + cmd.AddParameter("@MaxAttempts", _maxAttempts); + cmd.AddParameter("@DeleteOn", deleteOn); + + // dynamic parameters + for (var i = 0; i < _queues.Length; i++) + { + cmd.Parameters.AddWithValue($"@Queue{i}", _queues[i]); + } + + using (var rdr = await cmd.ExecuteReaderAsync(cancellationToken)) + { + while (await rdr.ReadAsync(cancellationToken)) + { + var workItem = new WorkItem + { + Attempts = rdr.Map(nameof(WorkItem.Attempts)), + CorrelationId = rdr.Map(nameof(WorkItem.CorrelationId)), + ExpireOn = rdr.Map(nameof(WorkItem.ExpireOn)), + Id = rdr.Map(nameof(WorkItem.Id)), + Name = rdr.Map(nameof(WorkItem.Name)), + Payload = rdr.Map(nameof(WorkItem.Payload)), + Queue = rdr.Map(nameof(WorkItem.Queue)), + ScheduleId = rdr.Map(nameof(WorkItem.ScheduleId)) + }; + workItem.Token = workItem.Id.ToString(); + workItems.Add(workItem); + } + } + }, cancellationToken); + return workItems; + } + + public async Task RenewAsync(WorkItem item, CancellationToken cancellationToken) + { + var lockExpireOn = (item.VisibleOn ?? DateTime.UtcNow).AddSeconds(Math.Max(_visibilityTimeout, 30)); + await ExecuteAsync(async cmd => + { + await cmd.PrepareForRenewAsync(item, lockExpireOn, cancellationToken); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + return lockExpireOn; + } + + public async Task CompleteAsync(WorkItem item, CancellationToken cancellationToken) + { + await ExecuteAsync(async cmd => + { + await cmd.PrepareForCompleteAsync(item, cancellationToken); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + } + + public async Task FaultAsync(WorkItem item, CancellationToken cancellationToken) + { + await ExecuteAsync(async cmd => + { + await cmd.PrepareForFaultAsync(item, _visibilityTimeout, cancellationToken); + await cmd.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken); + } + + public void Dispose() + { + } + + private Task ExecuteAsync(Func func, CancellationToken cancellationToken) + { + return SqlRunner.ExecuteAsync(func, _connectionString, true, cancellationToken); + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj b/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj new file mode 100644 index 000000000..0735562f0 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Quidjibo.SqlServer.csproj @@ -0,0 +1,28 @@ + + + + net9.0 + Smiggleworth + Copyright © 2017 + Quidjibo delivers flexible async background jobs for .NET applications. Vendored SqlServer provider using Microsoft.Data.SqlClient (the legacy System.Data.SqlClient fails with a SqlGuidCaster TypeLoadException on .NET 8+). + https://quidjibo.com/ + https://github.com/smiggleworth/Quidjibo + Smiggleworth + async tasks workers cron + false + 1.0.0 + + + + + + + + + + + + + + + diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql new file mode 100644 index 000000000..f8ac63068 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Create.sql @@ -0,0 +1,22 @@ +INSERT INTO [Quidjibo].[Progress] +( + [Id], + [WorkId], + [CorrelationId], + [Name], + [Queue], + [Note], + [Value], + [RecordedOn] +) +VALUES +( + @Id, + @WorkId, + @CorrelationId, + @Name, + @Queue, + @Note, + @Value, + @RecordedOn +); \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql new file mode 100644 index 000000000..0debae0ef --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/LoadByCorrelationId.sql @@ -0,0 +1,3 @@ +SELECT [Id], [WorkId], [CorrelationId], [Name], [Queue], [Note], [Value], [RecordedOn] +FROM [Quidjibo].[Progress] WITH (READPAST) +WHERE [CorrelationId] = @CorrelationId \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql new file mode 100644 index 000000000..72b161503 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Progress/Setup.sql @@ -0,0 +1,28 @@ +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Progress]') AND type in (N'U')) +BEGIN +CREATE TABLE [Quidjibo].[Progress]( + [Id] [UNIQUEIDENTIFIER] NOT NULL, + [Sequence] [bigint] IDENTITY(1,1) NOT NULL, + [WorkId] [uniqueidentifier] NULL, + [CorrelationId] [uniqueidentifier] NOT NULL, + [Name] [nvarchar](250) NOT NULL, + [Queue] [nvarchar](250) NOT NULL, + [Note] [nvarchar](250) NOT NULL, + [Value] [int] NOT NULL, + [RecordedOn] [datetime] NULL, +CONSTRAINT [PK_Progress] PRIMARY KEY NONCLUSTERED +( + [Id] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY], + CONSTRAINT [IX_Progress_Sequence] UNIQUE CLUSTERED +( + [Sequence] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] +) ON [PRIMARY] +END + +IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Progress]') AND name = N'IX_Progress_CorrelationId') +CREATE NONCLUSTERED INDEX [IX_Progress_CorrelationId] ON [Quidjibo].[Progress] +( + [CorrelationId] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql new file mode 100644 index 000000000..d03fd9848 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql @@ -0,0 +1,5 @@ +UPDATE schdl +SET [EnqueuedOn] = @EnqueuedOn, + [EnqueueOn] = @EnqueueOn +FROM [Quidjibo].[Schedule] schdl WITH (ROWLOCK, READPAST, UPDLOCK) +WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql new file mode 100644 index 000000000..df588fc61 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Create.sql @@ -0,0 +1,24 @@ +INSERT INTO [Quidjibo].[Schedule] +( + [Id], + [Name], + [Queue], + [CronExpression], + [CreatedOn], + [EnqueueOn], + [EnqueuedOn], + [VisibleOn], + [Payload] +) +VALUES +( + @Id, + @Name, + @Queue, + @CronExpression, + @CreatedOn, + @EnqueueOn, + @EnqueuedOn, + @VisibleOn, + @Payload +); \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql new file mode 100644 index 000000000..96806ed30 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Delete.sql @@ -0,0 +1 @@ +DELETE FROM [Quidjibo].[Schedule] WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql new file mode 100644 index 000000000..506b4cb58 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Exists.sql @@ -0,0 +1,3 @@ +SELECT COUNT(*) +FROM [Quidjibo].[Schedule] +WHERE [Name] = @Name \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql new file mode 100644 index 000000000..9a56d37f1 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/List.sql @@ -0,0 +1 @@ +SELECT TOP(@Take) [Quidjibo].[Schedule].* diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql new file mode 100644 index 000000000..54d6d8225 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Load.sql @@ -0,0 +1,3 @@ +SELECT * +FROM [Quidjibo].[Schedule] +WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql new file mode 100644 index 000000000..ed465b539 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/LoadByName.sql @@ -0,0 +1,3 @@ +SELECT * +FROM [Quidjibo].[Schedule] +WHERE [Name] = @Name \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql new file mode 100644 index 000000000..d02e7e827 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Receive.sql @@ -0,0 +1,12 @@ +WITH schdl AS +( + SELECT TOP(@Take) [Quidjibo].[Schedule].* + FROM [Quidjibo].[Schedule] WITH (ROWLOCK, READPAST, UPDLOCK) + WHERE [VisibleOn] < @ReceiveOn + AND [EnqueueOn] < @ReceiveOn + AND [Queue] IN (@Queue1) + ORDER BY [Sequence] +) +UPDATE schdl +SET [VisibleOn] = @VisibleOn +OUTPUT inserted.* \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql new file mode 100644 index 000000000..e01f07429 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Setup.sql @@ -0,0 +1,23 @@ +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Schedule]') AND type in (N'U')) +BEGIN +CREATE TABLE [Quidjibo].[Schedule]( + [Id] [uniqueidentifier] NOT NULL, + [Sequence] [bigint] IDENTITY(1,1) NOT NULL, + [Name] [nvarchar](250) NOT NULL, + [Queue] [nvarchar](250) NOT NULL, + [CronExpression] [nvarchar](50) NOT NULL, + [CreatedOn] [datetime] NULL, + [EnqueueOn] [datetime] NULL, + [EnqueuedOn] [datetime] NULL, + [VisibleOn] [datetime] NULL, + [Payload] [varbinary](max) NOT NULL, + CONSTRAINT [PK_Schedule] PRIMARY KEY NONCLUSTERED +( + [Id] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY], + CONSTRAINT [IX_Schedule_Sequence] UNIQUE CLUSTERED +( + [Sequence] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] +) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY] +END \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql new file mode 100644 index 000000000..3691c8efe --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schema/Setup.sql @@ -0,0 +1,4 @@ +IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = N'Quidjibo') +BEGIN +EXEC sys.sp_executesql N'CREATE SCHEMA [Quidjibo]' +END \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql new file mode 100644 index 000000000..2209bf670 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql @@ -0,0 +1,6 @@ +UPDATE wrk +SET [ExpireOn] = NULL, + [VisibleOn] = Null, + [Status] = @Complete +FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, READPAST, UPDLOCK) +WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql new file mode 100644 index 000000000..1bd71af48 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Fault.sql @@ -0,0 +1,5 @@ +UPDATE wrk +SET [VisibleOn] = @VisibleOn, + [Status] = @Faulted +FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, READPAST, UPDLOCK) +WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql new file mode 100644 index 000000000..c4ccd9cae --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/List.sql @@ -0,0 +1 @@ +-- List of work items in all states \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql new file mode 100644 index 000000000..d720f802a --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Receive.sql @@ -0,0 +1,20 @@ +DELETE TOP(100) FROM [Quidjibo].[Work] WITH (ROWLOCK, READPAST, UPDLOCK) +WHERE [CreatedOn] < @DeleteOn + AND ([ExpireOn] IS NULL OR [ExpireOn] < @DeleteOn); + +WITH wrk AS +( + SELECT TOP (@Take) [Quidjibo].[Work].* + FROM [Quidjibo].[Work] WITH (ROWLOCK, READPAST, UPDLOCK) + WHERE [Status] < 2 + AND [Attempts] < @MaxAttempts + AND [VisibleOn] < @ReceiveOn + AND [Queue] IN (@Queue1) + ORDER BY [Sequence] +) +UPDATE wrk +SET [Worker] = @Worker, + [VisibleOn] = @VisibleOn, + [Status] = @InFlight, + [Attempts] = [Attempts] + 1 +OUTPUT inserted.* \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql new file mode 100644 index 000000000..31d04b0e5 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Renew.sql @@ -0,0 +1,4 @@ +UPDATE wrk +SET [VisibleOn] = @VisibleOn +FROM [Quidjibo].[Work] wrk +WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql new file mode 100644 index 000000000..ef1bcb143 --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Send.sql @@ -0,0 +1,28 @@ +INSERT INTO [Quidjibo].[Work] +( + [Id], + [ScheduleId], + [CorrelationId], + [Name], + [Queue], + [Attempts], + [CreatedOn], + [ExpireOn], + [VisibleOn], + [Status], + [Payload] +) +VALUES +( + @Id, + @ScheduleId, + @CorrelationId, + @Name, + @Queue, + @Attempts, + @CreatedOn, + @ExpireOn, + @VisibleOn, + @Status, + @Payload +) \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql new file mode 100644 index 000000000..3dca3c58f --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Setup.sql @@ -0,0 +1,42 @@ +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND type in (N'U')) +BEGIN +CREATE TABLE [Quidjibo].[Work]( + [Id] [uniqueidentifier] NOT NULL, + [Sequence] [bigint] IDENTITY(1,1) NOT NULL, + [ScheduleId] [uniqueidentifier] NULL, + [CorrelationId] [uniqueidentifier] NOT NULL, + [Name] [nvarchar](250) NOT NULL, + [Worker] [nvarchar](250) NULL, + [Queue] [nvarchar](250) NOT NULL, + [Status] [int] NOT NULL, + [Attempts] [int] NOT NULL, + [CreatedOn] [datetime] NULL, + [ExpireOn] [datetime] NULL, + [VisibleOn] [datetime] NULL, + [Payload] [varbinary](max) NOT NULL + CONSTRAINT [PK_Work] PRIMARY KEY NONCLUSTERED +( + [Id] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY], + CONSTRAINT [IX_Work_Sequence] UNIQUE CLUSTERED +( + [Sequence] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] +) ON [PRIMARY] +END + +IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND name = N'IX_Work_Receive') +CREATE NONCLUSTERED INDEX [IX_Work_Receive] ON [Quidjibo].[Work] +( + [Queue], + [Status], + [Attempts], + [VisibleOn] +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] + +IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Quidjibo].[Work]') AND name = N'IX_Work_Cleanup') +CREATE NONCLUSTERED INDEX [IX_Work_Cleanup] ON [Quidjibo].[Work] +( + [CreatedOn] ASC, + [ExpireOn] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs new file mode 100644 index 000000000..d7ca6c05d --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs @@ -0,0 +1,31 @@ +using System.Collections.Concurrent; +using System.IO; +using System.Reflection; +using System.Threading.Tasks; + +namespace Quidjibo.SqlServer.Utils +{ + public class SqlLoader + { + private static readonly ConcurrentDictionary Scripts = new ConcurrentDictionary(); + + public static async Task GetScript(string scriptName) + { + if (Scripts.TryGetValue(scriptName, out var script)) + { + return script; + } + + var assembly = typeof(SqlLoader).GetTypeInfo().Assembly; + var fullName = $"Quidjibo.SqlServer.Scripts.{scriptName}.sql"; + using (var stream = assembly.GetManifestResourceStream(fullName) ?? new MemoryStream()) + using (var reader = new StreamReader(stream)) + { + script = await reader.ReadToEndAsync(); + Scripts.TryAdd(scriptName, script); + } + + return script; + } + } +} \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs new file mode 100644 index 000000000..16cfbc97e --- /dev/null +++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs @@ -0,0 +1,59 @@ +using System; +using Microsoft.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; + +namespace Quidjibo.SqlServer.Utils +{ + public static class SqlRunner + { + public static Task ExecuteAsync(Func func, string connectionString, CancellationToken cancellationToken) + { + return ExecuteAsync(func, connectionString, true, cancellationToken); + } + + public static async Task ExecuteAsync(Func func, string connectionString, bool inTransaction, CancellationToken cancellationToken) + { + using (var conn = new SqlConnection(connectionString)) + using (var cmd = conn.CreateCommand()) + { + { + await conn.OpenAsync(cancellationToken); + if (inTransaction) + { + using (var tran = conn.BeginTransaction()) + { + try + { + cmd.Transaction = tran; + await func(cmd); + tran.Commit(); + } + catch + { + tran.Rollback(); + throw; + } + } + + return; + } + + await func(cmd); + } + } + } + + + public static T Map(this SqlDataReader rdr, string name) + { + var val = rdr[name]; + if (val == DBNull.Value) + { + return default(T); + } + + return (T)val; + } + } +} \ No newline at end of file From f1cce4eb007ca594534c436247ab5036bf210ebe Mon Sep 17 00:00:00 2001 From: Shawn Jackson Date: Mon, 29 Jun 2026 16:30:56 -0500 Subject: [PATCH 2/2] RE1-T122 PR#419 fixes --- .../Factories/PostgresProgressProviderFactory.cs | 2 +- .../Factories/PostgresScheduleProviderFactory.cs | 2 +- .../Factories/PostgresWorkProviderFactory.cs | 2 +- .../Factories/SqlProgressProviderFactory.cs | 2 +- .../Factories/SqlScheduleProviderFactory.cs | 2 +- .../Factories/SqlWorkProviderFactory.cs | 2 +- .../Quidjibo.SqlServer/Scripts/Schedule/Complete.sql | 2 +- .../Quidjibo.SqlServer/Scripts/Work/Complete.sql | 2 +- .../Support/Quidjibo.SqlServer/Utils/SqlLoader.cs | 11 +++++++++-- .../Support/Quidjibo.SqlServer/Utils/SqlRunner.cs | 12 +++++++++++- 10 files changed, 28 insertions(+), 11 deletions(-) diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs index cc6af12b3..ff650aa9f 100644 --- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs +++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresProgressProviderFactory.cs @@ -36,9 +36,9 @@ public PostgresProgressProviderFactory( return _provider; } + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); await PostgresRunner.ExecuteAsync(async cmd => { cmd.CommandText = await SqlLoader.GetScript("Progress.Setup"); diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs index 6ac56412c..7a43fd1dc 100644 --- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs +++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresScheduleProviderFactory.cs @@ -30,9 +30,9 @@ public PostgresScheduleProviderFactory( public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) { + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); await PostgresRunner.ExecuteAsync(async cmd => { var schemaSetup = await SqlLoader.GetScript("Schema.Setup"); diff --git a/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs b/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs index 824db74b9..fe56022c6 100644 --- a/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs +++ b/Workers/Support/Quidjibo.Postgres/Factories/PostgresWorkProviderFactory.cs @@ -31,9 +31,9 @@ public PostgresWorkProviderFactory( public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) { + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); if (!_initialized) { await PostgresRunner.ExecuteAsync(async cmd => diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs index f3bc6fc44..302d821c9 100644 --- a/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlProgressProviderFactory.cs @@ -36,9 +36,9 @@ public SqlProgressProviderFactory( return _provider; } + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); await SqlRunner.ExecuteAsync(async cmd => { cmd.CommandText = await SqlLoader.GetScript("Progress.Setup"); diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs index 93211512f..72c78c72c 100644 --- a/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlScheduleProviderFactory.cs @@ -30,9 +30,9 @@ public SqlScheduleProviderFactory( public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) { + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); await SqlRunner.ExecuteAsync(async cmd => { var schemaSetup = await SqlLoader.GetScript("Schema.Setup"); diff --git a/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs index bde070207..87ad8fc33 100644 --- a/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs +++ b/Workers/Support/Quidjibo.SqlServer/Factories/SqlWorkProviderFactory.cs @@ -31,9 +31,9 @@ public SqlWorkProviderFactory( public async Task CreateAsync(string[] queues, CancellationToken cancellationToken = default(CancellationToken)) { + await SyncLock.WaitAsync(cancellationToken); try { - await SyncLock.WaitAsync(cancellationToken); if (!_initialized) { await SqlRunner.ExecuteAsync(async cmd => diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql index d03fd9848..0c6671d18 100644 --- a/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Schedule/Complete.sql @@ -1,5 +1,5 @@ UPDATE schdl SET [EnqueuedOn] = @EnqueuedOn, [EnqueueOn] = @EnqueueOn -FROM [Quidjibo].[Schedule] schdl WITH (ROWLOCK, READPAST, UPDLOCK) +FROM [Quidjibo].[Schedule] schdl WITH (ROWLOCK, UPDLOCK) WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql index 2209bf670..568f5b5be 100644 --- a/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql +++ b/Workers/Support/Quidjibo.SqlServer/Scripts/Work/Complete.sql @@ -2,5 +2,5 @@ SET [ExpireOn] = NULL, [VisibleOn] = Null, [Status] = @Complete -FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, READPAST, UPDLOCK) +FROM [Quidjibo].[Work] wrk WITH (ROWLOCK, UPDLOCK) WHERE [Id] = @Id \ No newline at end of file diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs index d7ca6c05d..532c14bd3 100644 --- a/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs +++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlLoader.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.IO; using System.Reflection; using System.Threading.Tasks; @@ -18,7 +19,13 @@ public static async Task GetScript(string scriptName) var assembly = typeof(SqlLoader).GetTypeInfo().Assembly; var fullName = $"Quidjibo.SqlServer.Scripts.{scriptName}.sql"; - using (var stream = assembly.GetManifestResourceStream(fullName) ?? new MemoryStream()) + var stream = assembly.GetManifestResourceStream(fullName); + if (stream == null) + { + throw new InvalidOperationException($"Embedded SQL script '{fullName}' was not found in assembly '{assembly.GetName().Name}'."); + } + + using (stream) using (var reader = new StreamReader(stream)) { script = await reader.ReadToEndAsync(); diff --git a/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs index 16cfbc97e..e821bbc26 100644 --- a/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs +++ b/Workers/Support/Quidjibo.SqlServer/Utils/SqlRunner.cs @@ -31,7 +31,17 @@ public static async Task ExecuteAsync(Func func, string connec } catch { - tran.Rollback(); + try + { + tran.Rollback(); + } + catch + { + // Rollback can fail if the transaction/connection is already + // aborted; swallow that secondary error so the original + // exception below is preserved rather than masked. + } + throw; } }