From 7535e33c1f1d13ddfb90fc4f140c9e80368860ce Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 16 Jun 2026 14:21:42 -0700 Subject: [PATCH 1/7] =?UTF-8?q?improvement(mothership):=20user=5Ftable=20s?= =?UTF-8?q?peed=20parity=20=E2=80=94=20limit=20bounds,=20async=20import/de?= =?UTF-8?q?lete/update=20jobs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - query_rows / filter ops clamp limit to the contract maxes; query_rows skips execution metadata. - import_file / create_from_file (large CSV/TSV) and delete_rows_by_filter (>1000 unbounded matches) dispatch background table jobs, claiming the per-table job slot; inline paths claim the slot too. - update_rows_by_filter now escalates the same way: >1000 unbounded matches run as a background table job (new 'update' job type + runTableUpdate worker + tableUpdateTask), so a broad update on a huge table no longer loads every row into the request. Best-effort/non-atomic and skips workflow recompute (documented); unique-column patches stay inline. Pagination is limit/offset. Co-Authored-By: Claude Fable 5 --- apps/sim/background/table-update.ts | 38 ++ .../lib/copilot/generated/tool-catalog-v1.ts | 5 +- .../lib/copilot/generated/tool-schemas-v1.ts | 6 +- .../tools/server/table/user-table.test.ts | 472 +++++++++++++++- .../copilot/tools/server/table/user-table.ts | 524 +++++++++++++++--- apps/sim/lib/table/events.ts | 2 +- apps/sim/lib/table/import-runner.test.ts | 117 ++++ apps/sim/lib/table/import-runner.ts | 18 +- apps/sim/lib/table/rows/ordering.ts | 79 +++ apps/sim/lib/table/types.ts | 18 +- apps/sim/lib/table/update-runner.test.ts | 202 +++++++ apps/sim/lib/table/update-runner.ts | 193 +++++++ 12 files changed, 1579 insertions(+), 95 deletions(-) create mode 100644 apps/sim/background/table-update.ts create mode 100644 apps/sim/lib/table/import-runner.test.ts create mode 100644 apps/sim/lib/table/update-runner.test.ts create mode 100644 apps/sim/lib/table/update-runner.ts diff --git a/apps/sim/background/table-update.ts b/apps/sim/background/table-update.ts new file mode 100644 index 00000000000..412c241c535 --- /dev/null +++ b/apps/sim/background/table-update.ts @@ -0,0 +1,38 @@ +import { task } from '@trigger.dev/sdk' +import { + markTableUpdateFailed, + runTableUpdate, + type TableUpdatePayload, +} from '@/lib/table/update-runner' + +/** + * `TableUpdatePayload` with the cutoff as an ISO string — task payloads cross a JSON boundary, so + * the Date is rehydrated in `run` rather than trusting payload serialization. + */ +export interface TableUpdateTaskPayload extends Omit { + cutoff: string +} + +/** + * Trigger.dev wrapper around `runTableUpdate`. Errors propagate out of `run` so the retry policy + * fires; the job is marked failed only in `onFailure`, after the final attempt. Retry-safe: the + * worker keysets by id with a `created_at <= cutoff` floor and the JSONB-merge patch is idempotent + * (re-applying the same patch to an already-patched row is a no-op), so a retried attempt re-walks + * and re-applies whatever remains. The `table_jobs` ownership gate stops a retried run that lost + * the job within one page. + */ +export const tableUpdateTask = task({ + id: 'table-update', + machine: 'small-1x', + retry: { maxAttempts: 3 }, + queue: { + name: 'table-update', + concurrencyLimit: 10, + }, + run: async (payload: TableUpdateTaskPayload) => { + await runTableUpdate({ ...payload, cutoff: new Date(payload.cutoff) }) + }, + onFailure: async ({ payload, error }) => { + await markTableUpdateFailed(payload.tableId, payload.jobId, error) + }, +}) diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index ae4296db336..56d3c137a6c 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3958,7 +3958,8 @@ export const UserTable: ToolCatalogEntry = { }, limit: { type: 'number', - description: 'Maximum rows to return or affect (optional, default 100)', + description: + 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter and update_rows_by_filter, omitting it lets matches above 1000 run as a background job.', }, mapping: { type: 'object', @@ -4011,7 +4012,7 @@ export const UserTable: ToolCatalogEntry = { }, offset: { type: 'number', - description: 'Number of rows to skip (optional for query_rows, default 0)', + description: 'Number of rows to skip for query_rows pagination (optional, default 0).', }, outputColumnNames: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 5ed8d9224d6..107d033bd79 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -3686,7 +3686,8 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, limit: { type: 'number', - description: 'Maximum rows to return or affect (optional, default 100)', + description: + 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter and update_rows_by_filter, omitting it lets matches above 1000 run as a background job.', }, mapping: { type: 'object', @@ -3745,7 +3746,8 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, offset: { type: 'number', - description: 'Number of rows to skip (optional for query_rows, default 0)', + description: + 'Number of rows to skip for query_rows pagination (optional, default 0).', }, outputColumnNames: { type: 'object', diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts index 7d90e9c00a3..b13d44aa6c4 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts @@ -15,6 +15,14 @@ const { mockCreateTable, mockDeleteTable, mockGetWorkspaceTableLimits, + mockMarkTableJobRunning, + mockReleaseJobClaim, + mockQueryRows, + mockDeleteRowsByFilter, + mockUpdateRowsByFilter, + mockRunTableImport, + mockRunTableDelete, + mockRunTableUpdate, fakeEnrichment, } = vi.hoisted(() => ({ mockResolveWorkspaceFileReference: vi.fn(), @@ -26,6 +34,14 @@ const { mockCreateTable: vi.fn(), mockDeleteTable: vi.fn(), mockGetWorkspaceTableLimits: vi.fn(), + mockMarkTableJobRunning: vi.fn(), + mockReleaseJobClaim: vi.fn(), + mockQueryRows: vi.fn(), + mockDeleteRowsByFilter: vi.fn(), + mockUpdateRowsByFilter: vi.fn(), + mockRunTableImport: vi.fn(), + mockRunTableDelete: vi.fn(), + mockRunTableUpdate: vi.fn(), fakeEnrichment: { id: 'work-email', name: 'Work Email', @@ -83,14 +99,33 @@ vi.mock('@/lib/table/rows/service', () => ({ batchInsertRows: mockBatchInsertRows, batchUpdateRows: vi.fn(), deleteRow: vi.fn(), - deleteRowsByFilter: vi.fn(), + deleteRowsByFilter: mockDeleteRowsByFilter, deleteRowsByIds: vi.fn(), getRowById: vi.fn(), insertRow: vi.fn(), - queryRows: vi.fn(), + queryRows: mockQueryRows, replaceTableRows: mockReplaceTableRows, updateRow: vi.fn(), - updateRowsByFilter: vi.fn(), + updateRowsByFilter: mockUpdateRowsByFilter, +})) + +vi.mock('@/lib/table/jobs/service', () => ({ + markTableJobRunning: mockMarkTableJobRunning, + releaseJobClaim: mockReleaseJobClaim, +})) + +vi.mock('@/lib/table/import-runner', () => ({ + runTableImport: mockRunTableImport, +})) + +vi.mock('@/lib/table/delete-runner', () => ({ + markTableDeleteFailed: vi.fn(), + runTableDelete: mockRunTableDelete, +})) + +vi.mock('@/lib/table/update-runner', () => ({ + markTableUpdateFailed: vi.fn(), + runTableUpdate: mockRunTableUpdate, })) vi.mock('@/lib/table/billing', () => ({ @@ -122,15 +157,25 @@ function buildTable(overrides: Partial = {}): TableDefinition { } } +/** Lets a runDetached microtask chain run before asserting on the work it dispatched. */ +async function flushDetached(): Promise { + await Promise.resolve() + await Promise.resolve() +} + describe('userTableServerTool.import_file', () => { beforeEach(() => { vi.clearAllMocks() mockResolveWorkspaceFileReference.mockResolvedValue({ name: 'people.csv', type: 'text/csv', + key: 'workspace/workspace-1/people.csv', + size: 100, }) mockDownloadWorkspaceFile.mockResolvedValue(Buffer.from('name,age\nAlice,30\nBob,40')) mockGetTableById.mockResolvedValue(buildTable()) + mockMarkTableJobRunning.mockResolvedValue(true) + mockReleaseJobClaim.mockResolvedValue(undefined) mockBatchInsertRows.mockImplementation(async (data: { rows: unknown[] }) => data.rows.map((_, i) => ({ id: `row_${i}` })) ) @@ -253,12 +298,95 @@ describe('userTableServerTool.import_file', () => { expect(result.message).toMatch(/missing required columns/i) expect(mockBatchInsertRows).not.toHaveBeenCalled() }) + + it('claims and releases the table job slot around an inline import', async () => { + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', expect.any(String), 'import') + expect(mockReleaseJobClaim).toHaveBeenCalledWith( + 'tbl_1', + mockMarkTableJobRunning.mock.calls[0][1] + ) + }) + + it('rejects an inline import while another job holds the table slot', async () => { + mockMarkTableJobRunning.mockResolvedValueOnce(false) + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockBatchInsertRows).not.toHaveBeenCalled() + expect(mockReleaseJobClaim).not.toHaveBeenCalled() + }) + + it('dispatches a background import for large CSV files', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1', mode: 'replace' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.jobId).toBeDefined() + expect(result.message).toMatch(/background/i) + expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', expect.any(String), 'import') + expect(mockBatchInsertRows).not.toHaveBeenCalled() + expect(mockReplaceTableRows).not.toHaveBeenCalled() + expect(mockDownloadWorkspaceFile).not.toHaveBeenCalled() + expect(mockRunTableImport).toHaveBeenCalledTimes(1) + expect(mockRunTableImport.mock.calls[0][0]).toMatchObject({ + tableId: 'tbl_1', + workspaceId: 'workspace-1', + fileKey: 'workspace/workspace-1/big.csv', + mode: 'replace', + deleteSourceFile: false, + }) + }) + + it('rejects a background import while another job holds the table slot', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + mockMarkTableJobRunning.mockResolvedValueOnce(false) + + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockRunTableImport).not.toHaveBeenCalled() + }) }) describe('userTableServerTool.create_from_file', () => { beforeEach(() => { vi.clearAllMocks() - mockResolveWorkspaceFileReference.mockResolvedValue({ name: 'people.csv', type: 'text/csv' }) + mockResolveWorkspaceFileReference.mockResolvedValue({ + name: 'people.csv', + type: 'text/csv', + key: 'workspace/workspace-1/people.csv', + size: 100, + }) mockDownloadWorkspaceFile.mockResolvedValue(Buffer.from('name,age\nAlice,30\nBob,40')) mockGetWorkspaceTableLimits.mockResolvedValue({ maxRowsPerTable: 1000, maxTables: 3 }) mockCreateTable.mockResolvedValue(buildTable({ id: 'tbl_new', name: 'people' })) @@ -313,6 +441,40 @@ describe('userTableServerTool.create_from_file', () => { expect(result.message).toMatch(/rolled back/i) expect(result.message).toMatch(/must be unique/i) }) + + it('creates a placeholder table and dispatches a background import for large CSV files', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + + const result = await userTableServerTool.execute( + { operation: 'create_from_file', args: { fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.tableId).toBe('tbl_new') + expect(result.data?.jobId).toBeDefined() + expect(mockDownloadWorkspaceFile).not.toHaveBeenCalled() + expect(mockBatchInsertRows).not.toHaveBeenCalled() + const createArgs = mockCreateTable.mock.calls[0][0] as Record + expect(createArgs).toMatchObject({ + jobStatus: 'running', + jobType: 'import', + jobId: result.data?.jobId, + }) + expect(mockRunTableImport).toHaveBeenCalledTimes(1) + expect(mockRunTableImport.mock.calls[0][0]).toMatchObject({ + tableId: 'tbl_new', + mode: 'create', + fileKey: 'workspace/workspace-1/big.csv', + deleteSourceFile: false, + }) + }) }) describe('userTableServerTool.create', () => { @@ -506,3 +668,305 @@ describe('userTableServerTool.add_enrichment', () => { expect(mockAddWorkflowGroup).not.toHaveBeenCalled() }) }) + +describe('userTableServerTool.query_rows', () => { + const queryRow = (i: number) => ({ + id: `row_${i}`, + data: { name: `r${i}` }, + executions: {}, + position: i, + orderKey: `a${i}`, + createdAt: new Date('2024-01-01'), + updatedAt: new Date('2024-01-01'), + }) + + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(buildTable()) + mockQueryRows.mockResolvedValue({ + rows: [queryRow(1), queryRow(2)], + rowCount: 2, + totalCount: 10, + limit: 2, + offset: 0, + }) + }) + + it('rejects limits above MAX_QUERY_LIMIT', async () => { + const result = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', limit: 100000 } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toBe('Limit cannot exceed 1000') + expect(mockQueryRows).not.toHaveBeenCalled() + }) + + it('queries without execution metadata and passes limit/offset through', async () => { + const result = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', limit: 2, offset: 10 } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + const options = mockQueryRows.mock.calls[0][1] as Record + expect(options.withExecutions).toBe(false) + expect(options.offset).toBe(10) + expect(result.data?.nextCursor).toBeUndefined() + }) +}) + +describe('userTableServerTool.delete_rows_by_filter', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(buildTable({ rowCount: 50000, maxRows: 100000 })) + mockMarkTableJobRunning.mockResolvedValue(true) + mockDeleteRowsByFilter.mockResolvedValue({ affectedCount: 5, affectedRowIds: ['r1'] }) + mockQueryRows.mockResolvedValue({ + rows: [], + rowCount: 0, + totalCount: 5, + limit: 1, + offset: 0, + }) + }) + + it('rejects limits above MAX_BULK_OPERATION_SIZE', async () => { + const result = await userTableServerTool.execute( + { + operation: 'delete_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, limit: 5000 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toBe('Limit cannot exceed 1000') + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + }) + + it('deletes inline when the unbounded match count is within the cap', async () => { + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(result.data?.affectedCount).toBe(5) + expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) + // Inline delete still claims (and releases) the table's write-job slot. + expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', expect.any(String), 'delete') + expect(mockReleaseJobClaim).toHaveBeenCalled() + }) + + it('rejects an inline delete while another job holds the table slot', async () => { + mockMarkTableJobRunning.mockResolvedValueOnce(false) + + const result = await userTableServerTool.execute( + { + operation: 'delete_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, limit: 100 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + }) + + it('dispatches a background delete when the unbounded match count exceeds the cap', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.jobId).toBeDefined() + expect(result.data?.doomedCount).toBe(20000) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + const [tableId, jobId, type, payload] = mockMarkTableJobRunning.mock.calls[0] + expect(tableId).toBe('tbl_1') + expect(type).toBe('delete') + expect(payload).toMatchObject({ doomedCount: 20000, cutoff: expect.any(String) }) + expect(mockRunTableDelete).toHaveBeenCalledTimes(1) + expect(mockRunTableDelete.mock.calls[0][0]).toMatchObject({ + jobId, + tableId: 'tbl_1', + workspaceId: 'workspace-1', + cutoff: expect.any(Date), + }) + }) + + it('rejects a background delete while another job holds the table slot', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + mockMarkTableJobRunning.mockResolvedValueOnce(false) + + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + expect(mockRunTableDelete).not.toHaveBeenCalled() + }) + + it('deletes inline with an explicit limit without counting first', async () => { + const result = await userTableServerTool.execute( + { + operation: 'delete_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, limit: 100 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(mockQueryRows).not.toHaveBeenCalled() + expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) + }) +}) + +describe('userTableServerTool.update_rows_by_filter', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(buildTable()) + mockMarkTableJobRunning.mockResolvedValue(true) + mockUpdateRowsByFilter.mockResolvedValue({ affectedCount: 5, affectedRowIds: ['r1'] }) + mockQueryRows.mockResolvedValue({ rows: [], rowCount: 0, totalCount: 5, limit: 1, offset: 0 }) + }) + + it('rejects limits above MAX_BULK_OPERATION_SIZE', async () => { + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, data: { age: 1 }, limit: 5000 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(result.success).toBe(false) + expect(result.message).toBe('Limit cannot exceed 1000') + expect(mockUpdateRowsByFilter).not.toHaveBeenCalled() + }) + + it('updates inline when the unbounded match count is within the cap', async () => { + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, data: { age: 1 } }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(result.success).toBe(true) + expect(result.data?.affectedCount).toBe(5) + expect(mockUpdateRowsByFilter).toHaveBeenCalledTimes(1) + expect(mockMarkTableJobRunning).not.toHaveBeenCalled() + }) + + it('dispatches a background update when the unbounded match count exceeds the cap', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, data: { age: 1 } }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.jobId).toBeDefined() + expect(result.data?.affectedCount).toBe(20000) + expect(mockUpdateRowsByFilter).not.toHaveBeenCalled() + const [tableId, jobId, type, payload] = mockMarkTableJobRunning.mock.calls[0] + expect(tableId).toBe('tbl_1') + expect(type).toBe('update') + expect(payload).toMatchObject({ + affectedCount: 20000, + cutoff: expect.any(String), + data: { age: 1 }, + }) + expect(mockRunTableUpdate).toHaveBeenCalledTimes(1) + expect(mockRunTableUpdate.mock.calls[0][0]).toMatchObject({ + jobId, + tableId: 'tbl_1', + workspaceId: 'workspace-1', + cutoff: expect.any(Date), + }) + }) + + it('keeps a unique-column patch inline even when many rows match', async () => { + mockGetTableById.mockResolvedValue( + buildTable({ schema: { columns: [{ name: 'email', type: 'string', unique: true }] } }) + ) + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { email: 'x' }, data: { email: 'y' } }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(result.success).toBe(true) + expect(mockQueryRows).not.toHaveBeenCalled() + expect(mockMarkTableJobRunning).not.toHaveBeenCalled() + expect(mockUpdateRowsByFilter).toHaveBeenCalledTimes(1) + }) + + it('rejects a background update while another job holds the table slot', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + mockMarkTableJobRunning.mockResolvedValueOnce(false) + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, data: { age: 1 } }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockUpdateRowsByFilter).not.toHaveBeenCalled() + expect(mockRunTableUpdate).not.toHaveBeenCalled() + }) + + it('updates inline with an explicit limit without counting first', async () => { + const result = await userTableServerTool.execute( + { + operation: 'update_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, data: { age: 1 }, limit: 100 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(result.success).toBe(true) + expect(mockQueryRows).not.toHaveBeenCalled() + expect(mockUpdateRowsByFilter).toHaveBeenCalledTimes(1) + }) +}) diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index 7614dc63aea..5ddd58a016b 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -7,9 +7,12 @@ import { type BaseServerTool, type ServerToolContext, } from '@/lib/copilot/tools/server/base-tool' +import { isTriggerDevEnabled } from '@/lib/core/config/env-flags' +import { runDetached } from '@/lib/core/utils/background' import { buildAutoMapping, COLUMN_TYPES, + CSV_ASYNC_IMPORT_THRESHOLD_BYTES, CSV_MAX_BATCH_SIZE, type CsvHeaderMapping, CsvImportValidationError, @@ -17,6 +20,8 @@ import { getWorkspaceTableLimits, inferSchemaFromCsv, parseFileRows, + sanitizeName, + TABLE_LIMITS, validateMapping, } from '@/lib/table' import { @@ -36,6 +41,9 @@ import { updateColumnConstraints, updateColumnType, } from '@/lib/table/columns/service' +import { markTableDeleteFailed, runTableDelete } from '@/lib/table/delete-runner' +import { runTableImport, type TableImportPayload } from '@/lib/table/import-runner' +import { markTableJobRunning, releaseJobClaim } from '@/lib/table/jobs/service' import { batchInsertRows, batchUpdateRows, @@ -52,14 +60,18 @@ import { import { createTable, deleteTable, getTableById, renameTable } from '@/lib/table/service' import type { ColumnDefinition, + Filter, RowData, TableDefinition, + TableDeleteJobPayload, + TableUpdateJobPayload, WorkflowGroup, WorkflowGroupDependencies, WorkflowGroupDeploymentMode, WorkflowGroupInputMapping, WorkflowGroupOutput, } from '@/lib/table/types' +import { markTableUpdateFailed, runTableUpdate } from '@/lib/table/update-runner' import { cancelWorkflowGroupRuns, runWorkflowColumn } from '@/lib/table/workflow-columns' import { addWorkflowGroup, @@ -93,18 +105,126 @@ type UserTableResult = { const MAX_BATCH_SIZE = CSV_MAX_BATCH_SIZE -async function resolveWorkspaceFile( - fileReference: string, - workspaceId: string -): Promise<{ buffer: Buffer; name: string; type: string }> { +async function resolveWorkspaceFileRecordOrThrow(fileReference: string, workspaceId: string) { const record = await resolveWorkspaceFileReference(workspaceId, fileReference) if (!record) { throw new Error( `File not found: "${fileReference}". Use glob("files/**") and read the canonical file path metadata to find workspace files.` ) } - const buffer = await fetchWorkspaceFileBuffer(record) - return { buffer, name: record.name, type: record.type } + return record +} + +/** + * Whether a workspace file should import as a background job instead of inline: + * CSV/TSV at or above the same byte threshold the UI uses. Other formats + * (xlsx/json) aren't supported by the streaming import worker and stay inline. + */ +function shouldImportInBackground(record: { name: string; size: number }): boolean { + const ext = record.name.split('.').pop()?.toLowerCase() + return (ext === 'csv' || ext === 'tsv') && record.size >= CSV_ASYNC_IMPORT_THRESHOLD_BYTES +} + +/** + * Dispatches a background import for an already-claimed job slot, mirroring the + * import-async routes: trigger.dev when enabled (survives deploys, retries), + * detached in-process worker otherwise. A failed dispatch releases the claim so + * a ghost `running` job can't hold the table's one-write-job slot. + */ +async function dispatchImportJob(payload: TableImportPayload): Promise { + if (isTriggerDevEnabled) { + try { + const [{ tableImportTask }, { tasks }] = await Promise.all([ + import('@/background/table-import'), + import('@trigger.dev/sdk'), + ]) + await tasks.trigger('table-import', payload, { + tags: [`tableId:${payload.tableId}`, `jobId:${payload.importId}`], + }) + } catch (error) { + await releaseJobClaim(payload.tableId, payload.importId).catch(() => {}) + throw error + } + } else { + runDetached('table-import', () => runTableImport(payload)) + } +} + +/** + * Dispatches a background filter-delete for an already-claimed job slot, + * mirroring the delete-async route. Same release-on-failed-dispatch guard as + * {@link dispatchImportJob}. + */ +async function dispatchDeleteJob(params: { + jobId: string + tableId: string + workspaceId: string + filter: Filter + cutoff: Date +}): Promise { + const { jobId, tableId, workspaceId, filter, cutoff } = params + if (isTriggerDevEnabled) { + try { + const [{ tableDeleteTask }, { tasks }] = await Promise.all([ + import('@/background/table-delete'), + import('@trigger.dev/sdk'), + ]) + await tasks.trigger( + 'table-delete', + { jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString() }, + { tags: [`tableId:${tableId}`, `jobId:${jobId}`] } + ) + } catch (error) { + await releaseJobClaim(tableId, jobId).catch(() => {}) + throw error + } + } else { + runDetached('table-delete', () => + runTableDelete({ jobId, tableId, workspaceId, filter, cutoff }).catch(async (error) => { + await markTableDeleteFailed(tableId, jobId, error) + throw error + }) + ) + } +} + +/** + * Dispatches a background bulk update for an already-claimed job slot, mirroring + * {@link dispatchDeleteJob}: trigger.dev when enabled, detached worker otherwise, releasing the + * slot on a failed dispatch. + */ +async function dispatchUpdateJob(params: { + jobId: string + tableId: string + workspaceId: string + filter: Filter + data: RowData + cutoff: Date +}): Promise { + const { jobId, tableId, workspaceId, filter, data, cutoff } = params + if (isTriggerDevEnabled) { + try { + const [{ tableUpdateTask }, { tasks }] = await Promise.all([ + import('@/background/table-update'), + import('@trigger.dev/sdk'), + ]) + await tasks.trigger( + 'table-update', + { jobId, tableId, workspaceId, filter, data, cutoff: cutoff.toISOString() }, + { tags: [`tableId:${tableId}`, `jobId:${jobId}`] } + ) + } catch (error) { + await releaseJobClaim(tableId, jobId).catch(() => {}) + throw error + } + } else { + runDetached('table-update', () => + runTableUpdate({ jobId, tableId, workspaceId, filter, data, cutoff }).catch(async (error) => { + await markTableUpdateFailed(tableId, jobId, error) + throw error + }) + ) + } } /** @@ -159,6 +279,21 @@ function parseDeploymentMode(value: unknown): WorkflowGroupDeploymentMode | unde return value === 'live' || value === 'deployed' ? value : undefined } +/** + * Validates an optional row limit against the same bounds the HTTP contracts + * enforce. Returns an error message, or `null` when the limit is acceptable. + */ +function limitError(limit: unknown, max: number): string | null { + if (limit === undefined) return null + if (typeof limit !== 'number' || !Number.isInteger(limit) || limit < 1) { + return 'Limit must be an integer of at least 1' + } + if (limit > max) { + return `Limit cannot exceed ${max}` + } + return null +} + async function batchInsertAll( tableId: string, rows: RowData[], @@ -444,6 +579,11 @@ export const userTableServerTool: BaseServerTool return { success: false, message: 'Workspace ID is required' } } + const queryLimitError = limitError(args.limit, TABLE_LIMITS.MAX_QUERY_LIMIT) + if (queryLimitError) { + return { success: false, message: queryLimitError } + } + const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { return { success: false, message: `Table not found: ${args.tableId}` } @@ -459,6 +599,7 @@ export const userTableServerTool: BaseServerTool sort: args.sort ? sortNamesToIds(args.sort, idByName) : undefined, limit: args.limit, offset: args.offset, + withExecutions: false, }, requestId ) @@ -559,6 +700,10 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } + const updateLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + if (updateLimitError) { + return { success: false, message: updateLimitError } + } const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { @@ -566,13 +711,61 @@ export const userTableServerTool: BaseServerTool } const requestId = generateId().slice(0, 8) - assertNotAborted() const idByName = buildIdByName(table.schema) + const idFilter = filterNamesToIds(args.filter, idByName) + const idData = rowDataNameToId(args.data, idByName) + + // Unbounded "update everything matching": measure the blast radius first and hand + // anything past the inline cap to the background update worker — same escalation as + // delete_rows_by_filter, so a broad update on a huge table doesn't load every matching + // row into this request. A patch touching a unique column stays inline (the service + // rejects bulk-setting a unique value across multiple rows). + const patchTouchesUnique = table.schema.columns.some( + (c) => c.unique === true && (c.id ?? c.name) in idData + ) + if (args.limit === undefined && !patchTouchesUnique) { + const { totalCount } = await queryRows( + table, + { filter: idFilter, limit: 1, withExecutions: false }, + requestId + ) + const matchCount = totalCount ?? 0 + if (matchCount > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { + const cutoff = new Date() + const jobId = generateId() + const payload: TableUpdateJobPayload = { + filter: idFilter, + data: idData, + cutoff: cutoff.toISOString(), + affectedCount: matchCount, + } + assertNotAborted() + const claimed = await markTableJobRunning(table.id, jobId, 'update', payload) + if (!claimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + await dispatchUpdateJob({ + jobId, + tableId: table.id, + workspaceId, + filter: idFilter, + data: idData, + cutoff, + }) + return { + success: true, + message: `Started background update of ${matchCount} matching rows (job ${jobId}). Rows update in the background — query_rows to check progress. Note: background updates don't auto-recompute workflow/enrichment columns; use run_column afterward if needed.`, + data: { jobId, affectedCount: matchCount }, + } + } + } + + assertNotAborted() const result = await updateRowsByFilter( table, { - filter: filterNamesToIds(args.filter, idByName), - data: rowDataNameToId(args.data, idByName), + filter: idFilter, + data: idData, limit: args.limit, actorUserId: context.userId, }, @@ -596,6 +789,10 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } + const deleteLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + if (deleteLimitError) { + return { success: false, message: deleteLimitError } + } const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { @@ -603,16 +800,69 @@ export const userTableServerTool: BaseServerTool } const requestId = generateId().slice(0, 8) - assertNotAborted() const idByName = buildIdByName(table.schema) - const result = await deleteRowsByFilter( - table, - { - filter: filterNamesToIds(args.filter, idByName), - limit: args.limit, - }, - requestId - ) + const idFilter = filterNamesToIds(args.filter, idByName) + + // Unbounded "delete everything matching": measure the blast radius + // first, and hand anything past the inline cap to the background + // delete worker (same path as the UI's select-all delete) instead of + // loading every matching row id into this request. + if (args.limit === undefined) { + const { totalCount } = await queryRows( + table, + { filter: idFilter, limit: 1, withExecutions: false }, + requestId + ) + const matchCount = totalCount ?? 0 + if (matchCount > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { + const doomedCount = Math.min(matchCount, table.rowCount) + const cutoff = new Date() + const jobId = generateId() + const payload: TableDeleteJobPayload = { + filter: idFilter, + cutoff: cutoff.toISOString(), + doomedCount, + } + assertNotAborted() + const claimed = await markTableJobRunning(table.id, jobId, 'delete', payload) + if (!claimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + await dispatchDeleteJob({ + jobId, + tableId: table.id, + workspaceId, + filter: idFilter, + cutoff, + }) + return { + success: true, + message: `Started background delete of ${doomedCount} matching rows (job ${jobId}). The rows are hidden from reads immediately — query_rows already reflects the post-delete view.`, + data: { jobId, doomedCount }, + } + } + } + + // Claim the table's one-write-job slot for the inline delete too, so it + // can't interleave with a running background import/delete. Mask-safe: a + // payload-less delete job is ignored by pendingDeleteMask, and the delete + // completes synchronously within this request before the slot is released. + assertNotAborted() + const inlineDeleteId = generateId() + const deleteClaimed = await markTableJobRunning(table.id, inlineDeleteId, 'delete') + if (!deleteClaimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + let result: Awaited> + try { + result = await deleteRowsByFilter( + table, + { filter: idFilter, limit: args.limit }, + requestId + ) + } finally { + await releaseJobClaim(table.id, inlineDeleteId).catch(() => {}) + } return { success: true, @@ -741,7 +991,71 @@ export const userTableServerTool: BaseServerTool return { success: false, message: 'Workspace ID is required' } } - const file = await resolveWorkspaceFile(fileReference, workspaceId) + const record = await resolveWorkspaceFileRecordOrThrow(fileReference, workspaceId) + + // Large CSV/TSV: create a placeholder table whose creation claims the + // job slot, then let the streaming import worker infer the schema and + // populate rows in the background (mirrors POST /api/table/import-async). + if (shouldImportInBackground(record)) { + const planLimits = await getWorkspaceTableLimits(workspaceId) + const tableName = + args.name || + sanitizeName(record.name.replace(/\.[^.]+$/, ''), 'imported_table').slice( + 0, + TABLE_LIMITS.MAX_TABLE_NAME_LENGTH + ) + const requestId = generateId().slice(0, 8) + const importId = generateId() + assertNotAborted() + const table = await createTable( + { + name: tableName, + description: args.description || `Imported from ${record.name}`, + schema: { columns: [{ name: 'column_1', type: 'string' }] }, + workspaceId, + userId: context.userId, + maxRows: planLimits.maxRowsPerTable, + maxTables: planLimits.maxTables, + jobStatus: 'running', + jobType: 'import', + jobId: importId, + }, + requestId + ) + try { + await dispatchImportJob({ + importId, + tableId: table.id, + workspaceId, + userId: context.userId, + fileKey: record.key, + fileName: record.name, + delimiter: record.name.toLowerCase().endsWith('.tsv') ? '\t' : ',', + mode: 'create', + deleteSourceFile: false, + }) + } catch (dispatchError) { + // The user never saw the placeholder — archive it back out. + await deleteTable(table.id, generateId().slice(0, 8)).catch(() => {}) + throw dispatchError + } + return { + success: true, + message: `Created table "${table.name}" (${table.id}); importing rows from "${record.name}" in the background (job ${importId}). Columns and rows appear as the import progresses — query_rows to check what has landed.`, + data: { + tableId: table.id, + tableName: table.name, + jobId: importId, + sourceFile: record.name, + }, + } + } + + const file = { + buffer: await fetchWorkspaceFileBuffer(record), + name: record.name, + type: record.type, + } const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) if (rows.length === 0) { return { success: false, message: 'File contains no data rows' } @@ -866,95 +1180,143 @@ export const userTableServerTool: BaseServerTool return { success: false, message: `Table is archived: ${tableId}` } } - const file = await resolveWorkspaceFile(fileReference, workspaceId) - const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) - if (rows.length === 0) { - return { success: false, message: 'File contains no data rows' } - } - - const mapping: CsvHeaderMapping = rawMapping ?? buildAutoMapping(headers, table.schema) + const record = await resolveWorkspaceFileRecordOrThrow(fileReference, workspaceId) - let validation: ReturnType - try { - validation = validateMapping({ - csvHeaders: headers, - mapping, - tableSchema: table.schema, + // Large CSV/TSV: claim the table's one-write-job slot and hand the + // file to the streaming import worker (mirrors + // POST /api/table/[tableId]/import-async). + if (shouldImportInBackground(record)) { + const importId = generateId() + assertNotAborted() + const claimed = await markTableJobRunning(table.id, importId, 'import') + if (!claimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + await dispatchImportJob({ + importId, + tableId: table.id, + workspaceId, + userId: context.userId, + fileKey: record.key, + fileName: record.name, + delimiter: record.name.toLowerCase().endsWith('.tsv') ? '\t' : ',', + mode, + mapping: rawMapping, + deleteSourceFile: false, }) - } catch (err) { - if (err instanceof CsvImportValidationError) { - return { success: false, message: err.message } + return { + success: true, + message: `Started background ${mode} import of "${record.name}" into "${table.name}" (job ${importId}). Rows appear as the import progresses — query_rows to check what has landed.`, + data: { tableId: table.id, jobId: importId, mode }, } - throw err } - if (validation.mappedHeaders.length === 0) { - return { - success: false, - message: `No matching columns between file (${headers.join(', ')}) and table (${table.schema.columns.map((c) => c.name).join(', ')})`, - } + // Claim the table's one-write-job slot up front — before the download + // and parse — so the inline import is mutually exclusive with any + // background import/delete for its whole duration, not just the write, + // and contention is detected before the parse work is spent. + const inlineImportId = generateId() + assertNotAborted() + const inlineClaimed = await markTableJobRunning(table.id, inlineImportId, 'import') + if (!inlineClaimed) { + return { success: false, message: 'A job is already in progress for this table' } } + try { + const file = { + buffer: await fetchWorkspaceFileBuffer(record), + name: record.name, + type: record.type, + } + const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) + if (rows.length === 0) { + return { success: false, message: 'File contains no data rows' } + } - const coerced = coerceRowsForTable(rows, table.schema, validation.effectiveMap) + const mapping: CsvHeaderMapping = rawMapping ?? buildAutoMapping(headers, table.schema) - if (mode === 'replace') { - assertNotAborted() - const requestId = generateId().slice(0, 8) - const result = await replaceTableRows( - { tableId: table.id, rows: coerced, workspaceId, userId: context.userId }, - table, - requestId - ) + let validation: ReturnType + try { + validation = validateMapping({ + csvHeaders: headers, + mapping, + tableSchema: table.schema, + }) + } catch (err) { + if (err instanceof CsvImportValidationError) { + return { success: false, message: err.message } + } + throw err + } + + if (validation.mappedHeaders.length === 0) { + return { + success: false, + message: `No matching columns between file (${headers.join(', ')}) and table (${table.schema.columns.map((c) => c.name).join(', ')})`, + } + } + + const coerced = coerceRowsForTable(rows, table.schema, validation.effectiveMap) + + if (mode === 'replace') { + const requestId = generateId().slice(0, 8) + const result = await replaceTableRows( + { tableId: table.id, rows: coerced, workspaceId, userId: context.userId }, + table, + requestId + ) + + logger.info('Rows replaced from file', { + tableId: table.id, + fileName: file.name, + mode, + matchedColumns: validation.mappedHeaders.length, + deleted: result.deletedCount, + inserted: result.insertedCount, + userId: context.userId, + }) + + return { + success: true, + message: `Replaced rows in "${table.name}" from "${file.name}": deleted ${result.deletedCount}, inserted ${result.insertedCount}`, + data: { + tableId: table.id, + tableName: table.name, + mode, + matchedColumns: validation.mappedHeaders, + skippedColumns: validation.skippedHeaders, + deletedCount: result.deletedCount, + insertedCount: result.insertedCount, + sourceFile: file.name, + }, + } + } + + const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info('Rows replaced from file', { + logger.info('Rows imported from file', { tableId: table.id, fileName: file.name, mode, matchedColumns: validation.mappedHeaders.length, - deleted: result.deletedCount, - inserted: result.insertedCount, + rows: inserted, userId: context.userId, }) return { success: true, - message: `Replaced rows in "${table.name}" from "${file.name}": deleted ${result.deletedCount}, inserted ${result.insertedCount}`, + message: `Imported ${inserted} rows into "${table.name}" from "${file.name}" (${validation.mappedHeaders.length} columns matched)`, data: { tableId: table.id, tableName: table.name, mode, matchedColumns: validation.mappedHeaders, skippedColumns: validation.skippedHeaders, - deletedCount: result.deletedCount, - insertedCount: result.insertedCount, + rowCount: inserted, sourceFile: file.name, }, } - } - - const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - - logger.info('Rows imported from file', { - tableId: table.id, - fileName: file.name, - mode, - matchedColumns: validation.mappedHeaders.length, - rows: inserted, - userId: context.userId, - }) - - return { - success: true, - message: `Imported ${inserted} rows into "${table.name}" from "${file.name}" (${validation.mappedHeaders.length} columns matched)`, - data: { - tableId: table.id, - tableName: table.name, - mode, - matchedColumns: validation.mappedHeaders, - skippedColumns: validation.skippedHeaders, - rowCount: inserted, - sourceFile: file.name, - }, + } finally { + await releaseJobClaim(table.id, inlineImportId).catch(() => {}) } } diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts index 86a6f7ec09d..8b8dc6da93c 100644 --- a/apps/sim/lib/table/events.ts +++ b/apps/sim/lib/table/events.ts @@ -122,7 +122,7 @@ export type TableEvent = kind: 'job' tableId: string jobId: string - type: 'import' | 'delete' | 'export' | 'backfill' + type: 'import' | 'delete' | 'export' | 'backfill' | 'update' status: 'running' | 'ready' | 'failed' | 'canceled' /** Rows processed so far (running) or in total (ready). */ progress?: number diff --git a/apps/sim/lib/table/import-runner.test.ts b/apps/sim/lib/table/import-runner.test.ts new file mode 100644 index 00000000000..29b076a1806 --- /dev/null +++ b/apps/sim/lib/table/import-runner.test.ts @@ -0,0 +1,117 @@ +/** + * @vitest-environment node + */ +import { Readable } from 'node:stream' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetTableById, + mockBulkInsertImportBatch, + mockUpdateJobProgress, + mockMarkJobReady, + mockMarkJobFailed, + mockNextImportStartPosition, + mockNextImportStartOrderKey, + mockAppendTableEvent, + mockDeleteFile, + mockDownloadFileStream, + mockHeadObject, +} = vi.hoisted(() => ({ + mockGetTableById: vi.fn(), + mockBulkInsertImportBatch: vi.fn(), + mockUpdateJobProgress: vi.fn(), + mockMarkJobReady: vi.fn(), + mockMarkJobFailed: vi.fn(), + mockNextImportStartPosition: vi.fn(), + mockNextImportStartOrderKey: vi.fn(), + mockAppendTableEvent: vi.fn(), + mockDeleteFile: vi.fn(), + mockDownloadFileStream: vi.fn(), + mockHeadObject: vi.fn(), +})) + +vi.mock('@/lib/table/service', () => ({ + getTableById: mockGetTableById, +})) +vi.mock('@/lib/table/import-data', () => ({ + addImportColumns: vi.fn(), + bulkInsertImportBatch: mockBulkInsertImportBatch, + deleteAllTableRows: vi.fn(), + setTableSchemaForImport: vi.fn(), +})) +vi.mock('@/lib/table/jobs/service', () => ({ + markJobFailed: mockMarkJobFailed, + markJobReady: mockMarkJobReady, + updateJobProgress: mockUpdateJobProgress, +})) +vi.mock('@/lib/table/rows/ordering', () => ({ + nextImportStartOrderKey: mockNextImportStartOrderKey, + nextImportStartPosition: mockNextImportStartPosition, +})) +vi.mock('@/lib/table/events', () => ({ appendTableEvent: mockAppendTableEvent })) +vi.mock('@/lib/posthog/server', () => ({ captureServerEvent: vi.fn() })) +vi.mock('@/lib/uploads/core/storage-service', () => ({ + deleteFile: mockDeleteFile, + downloadFileStream: mockDownloadFileStream, + headObject: mockHeadObject, +})) +vi.mock('@/app/api/table/utils', () => ({ + normalizeColumn: (col: unknown) => col, +})) + +import { runTableImport, type TableImportPayload } from '@/lib/table/import-runner' + +const table = { + id: 'tbl_1', + name: 'People', + workspaceId: 'ws_1', + rowCount: 0, + maxRows: 1000, + schema: { columns: [{ id: 'col_name', name: 'name', type: 'string' }] }, +} + +function buildPayload(overrides: Partial = {}): TableImportPayload { + return { + importId: 'job_1', + tableId: 'tbl_1', + workspaceId: 'ws_1', + userId: 'user_1', + fileKey: 'workspace/ws_1/people.csv', + fileName: 'people.csv', + delimiter: ',', + mode: 'append', + ...overrides, + } +} + +describe('runTableImport source-file cleanup', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(table) + mockHeadObject.mockResolvedValue({ size: 20 }) + mockDownloadFileStream.mockResolvedValue(Readable.from('name\nAlice\nBob\n')) + mockNextImportStartPosition.mockResolvedValue(0) + mockNextImportStartOrderKey.mockResolvedValue(null) + mockUpdateJobProgress.mockResolvedValue(true) + mockBulkInsertImportBatch.mockResolvedValue({ inserted: 2, lastOrderKey: 'a1' }) + mockMarkJobReady.mockResolvedValue(true) + mockDeleteFile.mockResolvedValue(undefined) + }) + + it('deletes the single-use source object by default', async () => { + await runTableImport(buildPayload()) + + expect(mockMarkJobReady).toHaveBeenCalled() + expect(mockDeleteFile).toHaveBeenCalledWith({ + key: 'workspace/ws_1/people.csv', + context: 'workspace', + }) + }) + + it('keeps a persistent workspace file when deleteSourceFile is false', async () => { + await runTableImport(buildPayload({ deleteSourceFile: false })) + + expect(mockMarkJobReady).toHaveBeenCalled() + expect(mockDeleteFile).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/table/import-runner.ts b/apps/sim/lib/table/import-runner.ts index 5391d22a238..f9d2b3f503a 100644 --- a/apps/sim/lib/table/import-runner.ts +++ b/apps/sim/lib/table/import-runner.ts @@ -60,6 +60,13 @@ export interface TableImportPayload { mapping?: CsvHeaderMapping /** (append/replace) CSV headers to auto-create as new columns (types inferred from the sample). */ createColumns?: string[] + /** + * Whether the source object is deleted once the import is terminal. Defaults + * to true (the UI routes upload a single-use temp object per import); pass + * false when importing a persistent workspace file (Mothership) that must + * survive the import. + */ + deleteSourceFile?: boolean } /** @@ -350,9 +357,12 @@ export async function runTableImport(payload: TableImportPayload): Promise // Release the storage stream so its HTTP connection doesn't leak on failure. source?.destroy() // The uploaded source file is single-use (a fresh upload per import) — delete it once the - // import is terminal so the workspace bucket doesn't accumulate. Best-effort. - await deleteFile({ key: fileKey, context: 'workspace' }).catch((err) => { - logger.warn(`[${requestId}] Failed to delete imported file`, { fileKey, err }) - }) + // import is terminal so the workspace bucket doesn't accumulate. Best-effort. Skipped for + // persistent workspace files (deleteSourceFile: false). + if (payload.deleteSourceFile !== false) { + await deleteFile({ key: fileKey, context: 'workspace' }).catch((err) => { + logger.warn(`[${requestId}] Failed to delete imported file`, { fileKey, err }) + }) + } } } diff --git a/apps/sim/lib/table/rows/ordering.ts b/apps/sim/lib/table/rows/ordering.ts index ccddfd51d3f..7646f8c0a1d 100644 --- a/apps/sim/lib/table/rows/ordering.ts +++ b/apps/sim/lib/table/rows/ordering.ts @@ -528,6 +528,51 @@ export async function selectRowIdPage(params: { return rows.map((r) => r.id) } +/** + * Like {@link selectRowIdPage} but returns each row's `data` too, for the bulk-update worker which + * must merge the patch into the existing row to validate the result. Same keyset walk on the + * `(table_id, id)` index, `created_at <= cutoff`, tenant-scoped, seqscan-off for jsonb filters. + * + * `excludeIfPatched` (a JSON patch string) skips rows that already contain the patch + * (`data @> patch`). The update worker passes it so a retried run doesn't re-walk and re-count + * rows an earlier attempt already updated — updated rows still exist (unlike deletes), and they + * still match the filter when the patch doesn't touch a filtered column, so without this a retry + * would double-count progress. It also skips no-op updates of rows that already hold those values. + */ +export async function selectRowDataPage(params: { + tableId: string + workspaceId: string + cutoff: Date + filterClause?: SQL + afterId?: string + limit: number + excludeIfPatched?: string +}): Promise> { + const { tableId, workspaceId, cutoff, filterClause, afterId, limit, excludeIfPatched } = params + const selectPage = (executor: DbExecutor) => + executor + .select({ id: userTableRows.id, data: userTableRows.data }) + .from(userTableRows) + .where( + and( + eq(userTableRows.tableId, tableId), + eq(userTableRows.workspaceId, workspaceId), + lte(userTableRows.createdAt, cutoff), + afterId ? gt(userTableRows.id, afterId) : undefined, + excludeIfPatched + ? sql`NOT (${userTableRows.data} @> ${excludeIfPatched}::jsonb)` + : undefined, + filterClause + ) + ) + .orderBy(asc(userTableRows.id)) + .limit(limit) + const rows = filterClause + ? await withSeqscanOff(async (trx) => selectPage(trx)) + : await selectPage(db) + return rows.map((r) => ({ id: r.id, data: r.data as RowData })) +} + /** * Deletes one page of rows for the async delete-job worker, committing each `DELETE_BATCH_SIZE` * chunk in its own short transaction. One statement per transaction bounds how long the @@ -563,3 +608,37 @@ export async function deletePageByIds( } return deleted } + +/** + * Applies a JSONB-merge patch (`data || patchJson`) to a page of row ids, committed in + * UPDATE_BATCH_SIZE chunks (each its own transaction, 60s timeout) so a large background update + * makes incremental, resumable progress. Returns the number of rows updated. + */ +export async function updatePageByIds( + tableId: string, + workspaceId: string, + rowIds: string[], + patchJson: string +): Promise { + const now = new Date() + let updated = 0 + for (let i = 0; i < rowIds.length; i += TABLE_LIMITS.UPDATE_BATCH_SIZE) { + const batch = rowIds.slice(i, i + TABLE_LIMITS.UPDATE_BATCH_SIZE) + const rows = await db.transaction(async (trx) => { + await setTableTxTimeouts(trx, { statementMs: 60_000 }) + return trx + .update(userTableRows) + .set({ data: sql`${userTableRows.data} || ${patchJson}::jsonb`, updatedAt: now }) + .where( + and( + eq(userTableRows.tableId, tableId), + eq(userTableRows.workspaceId, workspaceId), + inArray(userTableRows.id, batch) + ) + ) + .returning({ id: userTableRows.id }) + }) + updated += rows.length + } + return updated +} diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index cf57842617a..4c4ecbe7c6b 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -199,7 +199,7 @@ export type TableJobStatus = 'running' | 'ready' | 'failed' | 'canceled' * mutate row data and share the single-running-job gate; `export` is read-only and bypasses it * (the partial-unique index excludes it), so an export can run alongside any other job. */ -export type TableJobType = 'import' | 'delete' | 'export' | 'backfill' +export type TableJobType = 'import' | 'delete' | 'export' | 'backfill' | 'update' /** * Persisted scope of a running delete job (`table_jobs.payload`). Defines the doomed row set — @@ -217,6 +217,22 @@ export interface TableDeleteJobPayload { doomedCount?: number } +/** + * Persisted scope of a running bulk-update job (`table_jobs.payload`): the same `data` patch is + * merged into every row matching `filter` with `created_at <= cutoff` (so mid-job inserts are + * spared, matching the delete job's snapshot semantics). `affectedCount` is the kickoff estimate, + * display-only. Unlike delete, reads are not masked — updated rows still exist, so a background + * update is eventually consistent (readers may see a mix of patched/unpatched rows mid-job). + */ +export interface TableUpdateJobPayload { + filter: Filter + /** Column-id-keyed partial patch applied to every matched row (JSONB merge). */ + data: RowData + /** ISO timestamp; rows created after it are not patched. */ + cutoff: string + affectedCount?: number +} + /** * Persisted scope of an export job (`table_jobs.payload`). `resultKey` is merged in by the worker * on completion — the storage key of the generated file, served to the client via a presigned URL diff --git a/apps/sim/lib/table/update-runner.test.ts b/apps/sim/lib/table/update-runner.test.ts new file mode 100644 index 00000000000..2767dc1a2c3 --- /dev/null +++ b/apps/sim/lib/table/update-runner.test.ts @@ -0,0 +1,202 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetTableById, + mockGetJobProgress, + mockSelectRowDataPage, + mockUpdatePageByIds, + mockUpdateJobProgress, + mockMarkJobReady, + mockMarkJobFailed, + mockAppendTableEvent, + mockBuildFilterClause, + mockValidateRowSize, + mockCoerceRowToSchema, + mockCoerceRowValues, +} = vi.hoisted(() => ({ + mockGetTableById: vi.fn(), + mockGetJobProgress: vi.fn(), + mockSelectRowDataPage: vi.fn(), + mockUpdatePageByIds: vi.fn(), + mockUpdateJobProgress: vi.fn(), + mockMarkJobReady: vi.fn(), + mockMarkJobFailed: vi.fn(), + mockAppendTableEvent: vi.fn(), + mockBuildFilterClause: vi.fn(), + mockValidateRowSize: vi.fn(), + mockCoerceRowToSchema: vi.fn(), + mockCoerceRowValues: vi.fn(), +})) + +vi.mock('@/lib/table/service', () => ({ getTableById: mockGetTableById })) +vi.mock('@/lib/table/jobs/service', () => ({ + getJobProgress: mockGetJobProgress, + updateJobProgress: mockUpdateJobProgress, + markJobReady: mockMarkJobReady, + markJobFailed: mockMarkJobFailed, +})) +vi.mock('@/lib/table/rows/ordering', () => ({ + selectRowDataPage: mockSelectRowDataPage, + updatePageByIds: mockUpdatePageByIds, +})) +vi.mock('@/lib/table/events', () => ({ appendTableEvent: mockAppendTableEvent })) +vi.mock('@/lib/table/sql', () => ({ buildFilterClause: mockBuildFilterClause })) +vi.mock('@/lib/table/validation', () => ({ + validateRowSize: mockValidateRowSize, + coerceRowToSchema: mockCoerceRowToSchema, + coerceRowValues: mockCoerceRowValues, +})) +vi.mock('@/lib/table/constants', () => ({ + TABLE_LIMITS: { DELETE_PAGE_SIZE: 2, UPDATE_BATCH_SIZE: 100 }, + USER_TABLE_ROWS_SQL_NAME: 'user_table_rows', +})) + +import { markTableUpdateFailed, runTableUpdate } from '@/lib/table/update-runner' + +const table = { id: 'tbl_1', workspaceId: 'ws_1', schema: { columns: [] } } +const cutoff = new Date('2026-06-05T00:00:00Z') + +function basePayload(overrides = {}) { + return { + jobId: 'job_1', + tableId: 'tbl_1', + workspaceId: 'ws_1', + filter: { status: 'old' }, + data: { flag: true }, + cutoff, + ...overrides, + } +} +const row = (id: string) => ({ id, data: {} }) + +describe('runTableUpdate', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(table) + mockGetJobProgress.mockResolvedValue(0) + mockUpdateJobProgress.mockResolvedValue(true) + mockMarkJobReady.mockResolvedValue(true) + mockMarkJobFailed.mockResolvedValue(undefined) + mockUpdatePageByIds.mockImplementation((_t, _w, ids: string[]) => Promise.resolve(ids.length)) + mockBuildFilterClause.mockReturnValue({}) + mockValidateRowSize.mockReturnValue({ valid: true, errors: [] }) + mockCoerceRowToSchema.mockReturnValue({ valid: true, errors: [] }) + }) + + it('updates every matching page then marks the job ready', async () => { + mockSelectRowDataPage + .mockResolvedValueOnce([row('a'), row('b')]) + .mockResolvedValueOnce([row('c')]) + .mockResolvedValueOnce([]) + + await runTableUpdate(basePayload()) + + expect(mockUpdatePageByIds).toHaveBeenNthCalledWith( + 1, + 'tbl_1', + 'ws_1', + ['a', 'b'], + expect.any(String) + ) + expect(mockUpdatePageByIds).toHaveBeenNthCalledWith( + 2, + 'tbl_1', + 'ws_1', + ['c'], + expect.any(String) + ) + expect(mockMarkJobReady).toHaveBeenCalledWith('tbl_1', 'job_1') + expect(mockAppendTableEvent).toHaveBeenCalledWith( + expect.objectContaining({ kind: 'job', type: 'update', status: 'ready', progress: 3 }) + ) + }) + + it('fails (rethrows) when a merged row is invalid, without writing that page', async () => { + mockSelectRowDataPage.mockResolvedValueOnce([row('a')]) + mockValidateRowSize.mockReturnValueOnce({ valid: false, errors: ['row too large'] }) + + await expect(runTableUpdate(basePayload())).rejects.toThrow(/Row a: row too large/) + expect(mockUpdatePageByIds).not.toHaveBeenCalled() + expect(mockMarkJobFailed).not.toHaveBeenCalled() // caller decides via markTableUpdateFailed + }) + + it('stops without marking ready when the ownership gate is lost', async () => { + mockSelectRowDataPage.mockResolvedValue([row('a'), row('b')]) + mockUpdateJobProgress.mockResolvedValueOnce(true).mockResolvedValueOnce(false) + + await runTableUpdate(basePayload()) + + expect(mockUpdatePageByIds).toHaveBeenCalledTimes(1) + expect(mockMarkJobReady).not.toHaveBeenCalled() + }) + + it('rethrows the root cause so the clean message survives serialization', async () => { + const cause = new Error('canceling statement due to statement timeout') + mockSelectRowDataPage.mockRejectedValue(new Error('Failed query: update ...', { cause })) + + await expect(runTableUpdate(basePayload())).rejects.toThrow( + 'canceling statement due to statement timeout' + ) + expect(mockMarkJobFailed).not.toHaveBeenCalled() + }) + + it('resumes cumulative progress on retry instead of resetting to zero', async () => { + mockGetJobProgress.mockResolvedValue(7) + mockSelectRowDataPage.mockResolvedValueOnce([row('a'), row('b')]).mockResolvedValueOnce([]) + + await runTableUpdate(basePayload()) + + expect(mockUpdateJobProgress).toHaveBeenNthCalledWith(1, 'tbl_1', 7, 'job_1') + expect(mockAppendTableEvent).toHaveBeenCalledWith( + expect.objectContaining({ status: 'ready', progress: 9 }) + ) + }) + + it('stops at the seed read when the job is no longer owned', async () => { + mockGetJobProgress.mockResolvedValue(null) + + await expect(runTableUpdate(basePayload())).resolves.toBeUndefined() + expect(mockSelectRowDataPage).not.toHaveBeenCalled() + expect(mockUpdatePageByIds).not.toHaveBeenCalled() + }) + + it('passes the cutoff and filter clause through to the page query', async () => { + mockSelectRowDataPage.mockResolvedValueOnce([]) + + await runTableUpdate(basePayload()) + + expect(mockBuildFilterClause).toHaveBeenCalledWith( + { status: 'old' }, + 'user_table_rows', + table.schema.columns + ) + expect(mockSelectRowDataPage).toHaveBeenCalledWith( + expect.objectContaining({ + cutoff, + filterClause: {}, + limit: 2, + // Already-patched rows are excluded so a retry doesn't re-walk/double-count. + excludeIfPatched: JSON.stringify({ flag: true }), + }) + ) + }) +}) + +describe('markTableUpdateFailed', () => { + beforeEach(() => { + vi.clearAllMocks() + mockMarkJobFailed.mockResolvedValue(undefined) + }) + + it('marks the job failed and emits the failed event', async () => { + await markTableUpdateFailed('tbl_1', 'job_1', new Error('boom')) + + expect(mockMarkJobFailed).toHaveBeenCalledWith('tbl_1', 'job_1', 'boom') + expect(mockAppendTableEvent).toHaveBeenCalledWith( + expect.objectContaining({ kind: 'job', type: 'update', status: 'failed', error: 'boom' }) + ) + }) +}) diff --git a/apps/sim/lib/table/update-runner.ts b/apps/sim/lib/table/update-runner.ts new file mode 100644 index 00000000000..77a3a5d4231 --- /dev/null +++ b/apps/sim/lib/table/update-runner.ts @@ -0,0 +1,193 @@ +import { createLogger } from '@sim/logger' +import { getErrorMessage, toError } from '@sim/utils/errors' +import { generateId } from '@sim/utils/id' +import { truncate } from '@sim/utils/string' +import type { Filter, RowData } from '@/lib/table' +import { TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants' +import { appendTableEvent } from '@/lib/table/events' +import { + getJobProgress, + markJobFailed, + markJobReady, + updateJobProgress, +} from '@/lib/table/jobs/service' +import { selectRowDataPage, updatePageByIds } from '@/lib/table/rows/ordering' +import { getTableById } from '@/lib/table/service' +import { buildFilterClause } from '@/lib/table/sql' +import { coerceRowToSchema, coerceRowValues, validateRowSize } from '@/lib/table/validation' + +const logger = createLogger('TableUpdateRunner') + +/** Emit a progress event / heartbeat at most every this many rows. */ +const PROGRESS_INTERVAL_ROWS = 5000 + +/** + * Thrown when this worker discovers it no longer owns the table's job (canceled, or the + * stale-job janitor marked it failed and a newer job took over). The worker stops updating. + */ +class JobSupersededError extends Error {} + +export interface TableUpdatePayload { + jobId: string + tableId: string + workspaceId: string + /** Rows matching this filter get the patch. */ + filter: Filter + /** Column-id-keyed partial patch merged into every matched row. */ + data: RowData + /** Only rows created at/before this instant are patched, so mid-job inserts are spared. */ + cutoff: Date +} + +/** + * Background worker for large filtered row updates (trigger.dev task, or detached on the web + * container when trigger.dev is disabled — see the update dispatch in the user_table tool). + * Applies the same `data` patch (JSONB merge) to every row matching `filter` with + * `created_at <= cutoff`, in keyset-paginated pages. Each page validates the merged result per + * row, then commits in batches — **best-effort, not atomic**: committed pages persist even if a + * later page fails validation (unlike the inline `updateRowsByFilter`, which pre-validates all + * rows in one transaction). Reads are not masked: updated rows still exist, so mid-job reads are + * eventually consistent. Ownership-gated per page so a cancel/supersede stops within one page. + * + * Unlike the inline path, the worker does NOT fire per-row table triggers or auto-recompute + * workflow/enrichment columns — that would be a runaway cascade across thousands of rows. Run + * the affected columns explicitly afterward if downstream recompute is needed. + * + * Unexpected errors are rethrown for the caller's retry machinery; the caller marks the job + * failed via `markTableUpdateFailed`. A superseded run returns quietly. + */ +export async function runTableUpdate(payload: TableUpdatePayload): Promise { + const { jobId, tableId, workspaceId, filter, data, cutoff } = payload + const requestId = generateId().slice(0, 8) + + try { + const table = await getTableById(tableId, { includeArchived: true }) + if (!table) throw new Error(`Update target table ${tableId} not found`) + + const filterClause = buildFilterClause(filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns) + if (!filterClause) throw new Error('Filter is required for bulk update') + + // Coerce the patch once to the schema's types — the merged validation below and the persisted + // JSONB merge both use this normalized copy. + coerceRowValues(data, table.schema) + const patchJson = JSON.stringify(data) + + // Resume the persisted count: a retried attempt's earlier pages are already committed, so + // starting at zero would overwrite cumulative progress. Doubles as the initial ownership gate. + const resumed = await getJobProgress(tableId, jobId) + if (resumed === null) throw new JobSupersededError() + + let processed = resumed + let lastReported = resumed + let afterId: string | undefined + + while (true) { + const owns = await updateJobProgress(tableId, processed, jobId) + if (!owns) throw new JobSupersededError() + + const page = await selectRowDataPage({ + tableId, + workspaceId, + cutoff, + filterClause, + afterId, + limit: TABLE_LIMITS.DELETE_PAGE_SIZE, + // Skip rows already carrying the patch so a retried run resumes without re-walking / + // double-counting the rows an earlier attempt updated (updated rows still exist and may + // still match the filter, unlike deletes). + excludeIfPatched: patchJson, + }) + if (page.length === 0) break + afterId = page[page.length - 1].id + + // Validate each merged result before writing the page — a row that would overflow the size + // cap or violate the schema fails the job (earlier pages stay applied; best-effort). + for (const row of page) { + const merged = { ...row.data, ...data } + const sizeValidation = validateRowSize(merged) + if (!sizeValidation.valid) { + throw new Error(`Row ${row.id}: ${sizeValidation.errors.join(', ')}`) + } + const schemaValidation = coerceRowToSchema(merged, table.schema) + if (!schemaValidation.valid) { + throw new Error(`Row ${row.id}: ${schemaValidation.errors.join(', ')}`) + } + } + + processed += await updatePageByIds( + tableId, + workspaceId, + page.map((r) => r.id), + patchJson + ) + + if ( + processed - lastReported >= PROGRESS_INTERVAL_ROWS || + (lastReported === 0 && processed > 0) + ) { + lastReported = processed + void appendTableEvent({ + kind: 'job', + type: 'update', + tableId, + jobId, + status: 'running', + progress: processed, + }) + } + } + + await updateJobProgress(tableId, processed, jobId) + const becameReady = await markJobReady(tableId, jobId) + if (becameReady) { + void appendTableEvent({ + kind: 'job', + type: 'update', + tableId, + jobId, + status: 'ready', + progress: processed, + }) + logger.info(`[${requestId}] Update complete`, { tableId, rows: processed }) + } else { + logger.info( + `[${requestId}] Update finished but no longer owns the run (canceled/superseded)`, + { + tableId, + jobId, + } + ) + } + } catch (err) { + if (err instanceof JobSupersededError) { + logger.info(`[${requestId}] Update superseded by a newer run; stopping`, { tableId, jobId }) + return + } + const cause = toError(err).cause + const error = cause ? toError(cause) : toError(err) + logger.error(`[${requestId}] Update failed for table ${tableId}:`, error) + throw error + } +} + +/** + * Marks the update job failed and emits the failed SSE event. Called once the caller gives up on + * the run (trigger.dev `onFailure` after retries, or the detached fallback). Scoped to jobId — a + * no-op if a newer job has taken over. + */ +export async function markTableUpdateFailed( + tableId: string, + jobId: string, + error: unknown +): Promise { + const message = truncate(getErrorMessage(toError(error).cause ?? error, 'Update failed'), 500) + await markJobFailed(tableId, jobId, message).catch(() => {}) + void appendTableEvent({ + kind: 'job', + type: 'update', + tableId, + jobId, + status: 'failed', + error: message, + }) +} From 4ee39374e93bfad01603eac80995adfe3c897a15 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 14:21:01 -0700 Subject: [PATCH 2/7] docs(mothership): trim user_table catalog copy to the essentials Drop the verbose doomedCount/affectedCount, delete-mask, workflow-recompute, and unique-column asides from the bulk-op descriptions. The model only needs: large ops return { jobId }, limit maxes at 1000, one job per table. Co-Authored-By: Claude Fable 5 --- apps/sim/lib/copilot/generated/tool-catalog-v1.ts | 2 +- apps/sim/lib/copilot/generated/tool-schemas-v1.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index 56d3c137a6c..05d0539c6d4 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3959,7 +3959,7 @@ export const UserTable: ToolCatalogEntry = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter and update_rows_by_filter, omitting it lets matches above 1000 run as a background job.', + 'Maximum rows to return or affect (default 100, max 1000). For update_rows_by_filter / delete_rows_by_filter, omit to act on every match — large match sets run as a background job.', }, mapping: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 107d033bd79..3b1efed51fe 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -3687,7 +3687,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter and update_rows_by_filter, omitting it lets matches above 1000 run as a background job.', + 'Maximum rows to return or affect (default 100, max 1000). For update_rows_by_filter / delete_rows_by_filter, omit to act on every match — large match sets run as a background job.', }, mapping: { type: 'object', From d2c93aef04812c5ee29f75fc43484df8a8e8e301 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 14:54:20 -0700 Subject: [PATCH 3/7] improvement(mothership): make user_table limit cap internal, not model-facing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The model can now pass any limit — no "cannot exceed 1000" rejection. 1000 becomes an internal threshold: query_rows clamps the page to MAX_QUERY_LIMIT (totalCount signals truncation; the model pages with offset), and bulk filter ops above the cap run as background jobs. update_rows_by_filter loads full row data inline, so an explicit limit above the cap escalates to the background worker with a new maxRows budget (the worker stops after maxRows; update has no read mask so the cap is exact). delete only loads ids inline, so an explicit limit (any size) stays inline — only unbounded deletes use the masked background path, which would over-hide a bounded delete. Co-Authored-By: Claude Fable 5 --- .../lib/copilot/generated/tool-catalog-v1.ts | 2 +- .../lib/copilot/generated/tool-schemas-v1.ts | 2 +- .../tools/server/table/user-table.test.ts | 40 +++++++--- .../copilot/tools/server/table/user-table.ts | 75 +++++++++++-------- apps/sim/lib/table/types.ts | 2 + apps/sim/lib/table/update-runner.test.ts | 17 +++++ apps/sim/lib/table/update-runner.ts | 9 ++- 7 files changed, 101 insertions(+), 46 deletions(-) diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index 05d0539c6d4..aaa362440d3 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3959,7 +3959,7 @@ export const UserTable: ToolCatalogEntry = { limit: { type: 'number', description: - 'Maximum rows to return or affect (default 100, max 1000). For update_rows_by_filter / delete_rows_by_filter, omit to act on every match — large match sets run as a background job.', + 'Maximum rows to return or affect (optional, default 100). Any value is allowed — large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 3b1efed51fe..6e3606663e3 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -3687,7 +3687,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { limit: { type: 'number', description: - 'Maximum rows to return or affect (default 100, max 1000). For update_rows_by_filter / delete_rows_by_filter, omit to act on every match — large match sets run as a background job.', + 'Maximum rows to return or affect (optional, default 100). Any value is allowed — large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object', diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts index b13d44aa6c4..9e28dba675d 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts @@ -692,15 +692,15 @@ describe('userTableServerTool.query_rows', () => { }) }) - it('rejects limits above MAX_QUERY_LIMIT', async () => { + it('clamps an over-large query limit to MAX_QUERY_LIMIT instead of rejecting', async () => { const result = await userTableServerTool.execute( { operation: 'query_rows', args: { tableId: 'tbl_1', limit: 100000 } }, { userId: 'user-1', workspaceId: 'workspace-1' } ) - expect(result.success).toBe(false) - expect(result.message).toBe('Limit cannot exceed 1000') - expect(mockQueryRows).not.toHaveBeenCalled() + expect(result.success).toBe(true) + const options = mockQueryRows.mock.calls[0][1] as Record + expect(options.limit).toBe(1000) }) it('queries without execution metadata and passes limit/offset through', async () => { @@ -732,7 +732,7 @@ describe('userTableServerTool.delete_rows_by_filter', () => { }) }) - it('rejects limits above MAX_BULK_OPERATION_SIZE', async () => { + it('runs an explicit large limit inline without escalating (delete loads only ids)', async () => { const result = await userTableServerTool.execute( { operation: 'delete_rows_by_filter', @@ -741,9 +741,11 @@ describe('userTableServerTool.delete_rows_by_filter', () => { { userId: 'user-1', workspaceId: 'workspace-1' } ) - expect(result.success).toBe(false) - expect(result.message).toBe('Limit cannot exceed 1000') - expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + expect(result.success).toBe(true) + // An explicit limit never counts/escalates — it deletes inline, bounded by the limit. + expect(mockQueryRows).not.toHaveBeenCalled() + expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) + expect(mockDeleteRowsByFilter.mock.calls[0][1]).toMatchObject({ limit: 5000 }) }) it('deletes inline when the unbounded match count is within the cap', async () => { @@ -853,7 +855,14 @@ describe('userTableServerTool.update_rows_by_filter', () => { mockQueryRows.mockResolvedValue({ rows: [], rowCount: 0, totalCount: 5, limit: 1, offset: 0 }) }) - it('rejects limits above MAX_BULK_OPERATION_SIZE', async () => { + it('escalates an explicit limit above the cap to a background update with maxRows', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) const result = await userTableServerTool.execute( { operation: 'update_rows_by_filter', @@ -861,9 +870,16 @@ describe('userTableServerTool.update_rows_by_filter', () => { }, { userId: 'user-1', workspaceId: 'workspace-1' } ) - expect(result.success).toBe(false) - expect(result.message).toBe('Limit cannot exceed 1000') + await flushDetached() + + expect(result.success).toBe(true) + // target = min(limit 5000, matchCount 20000) = 5000, above the inline cap → background. + expect(result.data?.affectedCount).toBe(5000) expect(mockUpdateRowsByFilter).not.toHaveBeenCalled() + const [, , type, payload] = mockMarkTableJobRunning.mock.calls[0] + expect(type).toBe('update') + expect(payload).toMatchObject({ affectedCount: 5000, maxRows: 5000 }) + expect(mockRunTableUpdate.mock.calls[0][0]).toMatchObject({ maxRows: 5000 }) }) it('updates inline when the unbounded match count is within the cap', async () => { @@ -909,6 +925,8 @@ describe('userTableServerTool.update_rows_by_filter', () => { cutoff: expect.any(String), data: { age: 1 }, }) + // Unbounded match (no explicit limit) → the worker patches every match, no cap. + expect((payload as { maxRows?: number }).maxRows).toBeUndefined() expect(mockRunTableUpdate).toHaveBeenCalledTimes(1) expect(mockRunTableUpdate.mock.calls[0][0]).toMatchObject({ jobId, diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index 5ddd58a016b..00638fc81c8 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -200,8 +200,9 @@ async function dispatchUpdateJob(params: { filter: Filter data: RowData cutoff: Date + maxRows?: number }): Promise { - const { jobId, tableId, workspaceId, filter, data, cutoff } = params + const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = params if (isTriggerDevEnabled) { try { const [{ tableUpdateTask }, { tasks }] = await Promise.all([ @@ -210,7 +211,7 @@ async function dispatchUpdateJob(params: { ]) await tasks.trigger( 'table-update', - { jobId, tableId, workspaceId, filter, data, cutoff: cutoff.toISOString() }, + { jobId, tableId, workspaceId, filter, data, cutoff: cutoff.toISOString(), maxRows }, { tags: [`tableId:${tableId}`, `jobId:${jobId}`] } ) } catch (error) { @@ -219,10 +220,12 @@ async function dispatchUpdateJob(params: { } } else { runDetached('table-update', () => - runTableUpdate({ jobId, tableId, workspaceId, filter, data, cutoff }).catch(async (error) => { - await markTableUpdateFailed(tableId, jobId, error) - throw error - }) + runTableUpdate({ jobId, tableId, workspaceId, filter, data, cutoff, maxRows }).catch( + async (error) => { + await markTableUpdateFailed(tableId, jobId, error) + throw error + } + ) ) } } @@ -280,17 +283,16 @@ function parseDeploymentMode(value: unknown): WorkflowGroupDeploymentMode | unde } /** - * Validates an optional row limit against the same bounds the HTTP contracts - * enforce. Returns an error message, or `null` when the limit is acceptable. + * Validates an optional row limit. There's no upper bound the caller must respect — the model may + * ask for any number. `MAX_QUERY_LIMIT` / `MAX_BULK_OPERATION_SIZE` are applied internally instead + * (query_rows clamps the page; bulk ops above the bound run as a background job). Returns an error + * message, or `null` when the limit is acceptable. */ -function limitError(limit: unknown, max: number): string | null { +function limitError(limit: unknown): string | null { if (limit === undefined) return null if (typeof limit !== 'number' || !Number.isInteger(limit) || limit < 1) { return 'Limit must be an integer of at least 1' } - if (limit > max) { - return `Limit cannot exceed ${max}` - } return null } @@ -579,7 +581,7 @@ export const userTableServerTool: BaseServerTool return { success: false, message: 'Workspace ID is required' } } - const queryLimitError = limitError(args.limit, TABLE_LIMITS.MAX_QUERY_LIMIT) + const queryLimitError = limitError(args.limit) if (queryLimitError) { return { success: false, message: queryLimitError } } @@ -592,12 +594,18 @@ export const userTableServerTool: BaseServerTool const requestId = generateId().slice(0, 8) const idByName = buildIdByName(table.schema) const nameById = buildNameById(table.schema) + // The model may request any number; we serve at most MAX_QUERY_LIMIT per page so a single + // tool result can't drain a whole table. `totalCount` in the response signals truncation, + // and the model pages with `offset`. const result = await queryRows( table, { filter: args.filter ? filterNamesToIds(args.filter, idByName) : undefined, sort: args.sort ? sortNamesToIds(args.sort, idByName) : undefined, - limit: args.limit, + limit: + args.limit !== undefined + ? Math.min(args.limit, TABLE_LIMITS.MAX_QUERY_LIMIT) + : undefined, offset: args.offset, withExecutions: false, }, @@ -700,7 +708,7 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } - const updateLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + const updateLimitError = limitError(args.limit) if (updateLimitError) { return { success: false, message: updateLimitError } } @@ -715,29 +723,34 @@ export const userTableServerTool: BaseServerTool const idFilter = filterNamesToIds(args.filter, idByName) const idData = rowDataNameToId(args.data, idByName) - // Unbounded "update everything matching": measure the blast radius first and hand - // anything past the inline cap to the background update worker — same escalation as - // delete_rows_by_filter, so a broad update on a huge table doesn't load every matching - // row into this request. A patch touching a unique column stays inline (the service - // rejects bulk-setting a unique value across multiple rows). + // Inline handles up to MAX_BULK_OPERATION_SIZE rows in one request; a larger operation + // (an explicit limit above the cap, or unbounded "update everything matching") runs in the + // background worker so a broad update on a huge table doesn't load every matching row into + // this request. A small explicit limit is the fast path — no count needed. A patch + // touching a unique column always stays inline (the service rejects bulk-setting a unique + // value across multiple rows). const patchTouchesUnique = table.schema.columns.some( (c) => c.unique === true && (c.id ?? c.name) in idData ) - if (args.limit === undefined && !patchTouchesUnique) { + const updateInlineEligible = + args.limit !== undefined && args.limit <= TABLE_LIMITS.MAX_BULK_OPERATION_SIZE + if (!updateInlineEligible && !patchTouchesUnique) { const { totalCount } = await queryRows( table, { filter: idFilter, limit: 1, withExecutions: false }, requestId ) const matchCount = totalCount ?? 0 - if (matchCount > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { + const target = args.limit !== undefined ? Math.min(args.limit, matchCount) : matchCount + if (target > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { const cutoff = new Date() const jobId = generateId() const payload: TableUpdateJobPayload = { filter: idFilter, data: idData, cutoff: cutoff.toISOString(), - affectedCount: matchCount, + affectedCount: target, + maxRows: args.limit, } assertNotAborted() const claimed = await markTableJobRunning(table.id, jobId, 'update', payload) @@ -751,11 +764,12 @@ export const userTableServerTool: BaseServerTool filter: idFilter, data: idData, cutoff, + maxRows: args.limit, }) return { success: true, - message: `Started background update of ${matchCount} matching rows (job ${jobId}). Rows update in the background — query_rows to check progress. Note: background updates don't auto-recompute workflow/enrichment columns; use run_column afterward if needed.`, - data: { jobId, affectedCount: matchCount }, + message: `Started background update of ${target} matching rows (job ${jobId}). Rows update in the background — query_rows to check progress. Note: background updates don't auto-recompute workflow/enrichment columns; use run_column afterward if needed.`, + data: { jobId, affectedCount: target }, } } } @@ -789,7 +803,7 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } - const deleteLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + const deleteLimitError = limitError(args.limit) if (deleteLimitError) { return { success: false, message: deleteLimitError } } @@ -803,10 +817,11 @@ export const userTableServerTool: BaseServerTool const idByName = buildIdByName(table.schema) const idFilter = filterNamesToIds(args.filter, idByName) - // Unbounded "delete everything matching": measure the blast radius - // first, and hand anything past the inline cap to the background - // delete worker (same path as the UI's select-all delete) instead of - // loading every matching row id into this request. + // An explicit limit runs inline (delete loads only row ids, so even a large bounded + // delete is light). Only an unbounded "delete everything matching" measures the blast + // radius and hands off to the background delete worker (same path as the UI's select-all + // delete) — the read-path mask hides exactly the all-matching set, which a bounded delete + // would over-hide. if (args.limit === undefined) { const { totalCount } = await queryRows( table, diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index 4c4ecbe7c6b..9cd74e640ad 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -231,6 +231,8 @@ export interface TableUpdateJobPayload { /** ISO timestamp; rows created after it are not patched. */ cutoff: string affectedCount?: number + /** Stop after updating this many rows (an explicit caller-supplied limit). Omitted = every match. */ + maxRows?: number } /** diff --git a/apps/sim/lib/table/update-runner.test.ts b/apps/sim/lib/table/update-runner.test.ts index 2767dc1a2c3..d220effe0fa 100644 --- a/apps/sim/lib/table/update-runner.test.ts +++ b/apps/sim/lib/table/update-runner.test.ts @@ -163,6 +163,23 @@ describe('runTableUpdate', () => { expect(mockUpdatePageByIds).not.toHaveBeenCalled() }) + it('stops once maxRows is reached and never over-fetches a page', async () => { + // budget 3 with page size 2: first page fills 2, second page is capped to the remaining 1. + mockSelectRowDataPage + .mockResolvedValueOnce([row('a'), row('b')]) + .mockResolvedValueOnce([row('c')]) + + await runTableUpdate(basePayload({ maxRows: 3 })) + + expect(mockSelectRowDataPage).toHaveBeenCalledTimes(2) + expect(mockSelectRowDataPage.mock.calls[0][0]).toMatchObject({ limit: 2 }) + expect(mockSelectRowDataPage.mock.calls[1][0]).toMatchObject({ limit: 1 }) + expect(mockUpdatePageByIds).toHaveBeenCalledTimes(2) + expect(mockAppendTableEvent).toHaveBeenCalledWith( + expect.objectContaining({ status: 'ready', progress: 3 }) + ) + }) + it('passes the cutoff and filter clause through to the page query', async () => { mockSelectRowDataPage.mockResolvedValueOnce([]) diff --git a/apps/sim/lib/table/update-runner.ts b/apps/sim/lib/table/update-runner.ts index 77a3a5d4231..194746830c9 100644 --- a/apps/sim/lib/table/update-runner.ts +++ b/apps/sim/lib/table/update-runner.ts @@ -37,6 +37,8 @@ export interface TableUpdatePayload { data: RowData /** Only rows created at/before this instant are patched, so mid-job inserts are spared. */ cutoff: Date + /** Stop after updating this many rows (an explicit caller-supplied limit). Omitted = every match. */ + maxRows?: number } /** @@ -57,8 +59,9 @@ export interface TableUpdatePayload { * failed via `markTableUpdateFailed`. A superseded run returns quietly. */ export async function runTableUpdate(payload: TableUpdatePayload): Promise { - const { jobId, tableId, workspaceId, filter, data, cutoff } = payload + const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = payload const requestId = generateId().slice(0, 8) + const budget = maxRows ?? Number.POSITIVE_INFINITY try { const table = await getTableById(tableId, { includeArchived: true }) @@ -81,7 +84,7 @@ export async function runTableUpdate(payload: TableUpdatePayload): Promise let lastReported = resumed let afterId: string | undefined - while (true) { + while (processed < budget) { const owns = await updateJobProgress(tableId, processed, jobId) if (!owns) throw new JobSupersededError() @@ -91,7 +94,7 @@ export async function runTableUpdate(payload: TableUpdatePayload): Promise cutoff, filterClause, afterId, - limit: TABLE_LIMITS.DELETE_PAGE_SIZE, + limit: Math.min(TABLE_LIMITS.DELETE_PAGE_SIZE, budget - processed), // Skip rows already carrying the patch so a retried run resumes without re-walking / // double-counting the rows an earlier attempt updated (updated rows still exist and may // still match the filter, unlike deletes). From f1ee3e90683eff1dca805d322c8d9fd8350e53ea Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 15:01:46 -0700 Subject: [PATCH 4/7] improvement(mothership): bounded delete above the cap runs async, not inline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An explicit delete limit now mirrors update: ≤1000 runs inline, above the cap it escalates to the background worker honoring the limit via maxRows — instead of always staying inline. The worker stops after maxRows (per-page fetch capped to the remaining budget). Bounded background deletes skip pendingDeleteMask: the filter-based mask hides every match, which would over-hide the rows beyond the cap the job never deletes. Unmasked, a bounded delete is eventually consistent like a bounded update (rows disappear as deleted), and doomedCount is omitted from the payload so the count isn't double-subtracted. Co-Authored-By: Claude Fable 5 --- .../tools/server/table/user-table.test.ts | 27 ++++++++-- .../copilot/tools/server/table/user-table.ts | 50 +++++++++++-------- apps/sim/lib/table/delete-runner.test.ts | 16 ++++++ apps/sim/lib/table/delete-runner.ts | 13 +++-- apps/sim/lib/table/types.ts | 10 +++- 5 files changed, 87 insertions(+), 29 deletions(-) diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts index 9e28dba675d..abf8c7d6f45 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts @@ -732,7 +732,15 @@ describe('userTableServerTool.delete_rows_by_filter', () => { }) }) - it('runs an explicit large limit inline without escalating (delete loads only ids)', async () => { + it('escalates an explicit limit above the cap to a background delete with maxRows (unmasked)', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + const result = await userTableServerTool.execute( { operation: 'delete_rows_by_filter', @@ -740,12 +748,19 @@ describe('userTableServerTool.delete_rows_by_filter', () => { }, { userId: 'user-1', workspaceId: 'workspace-1' } ) + await flushDetached() expect(result.success).toBe(true) - // An explicit limit never counts/escalates — it deletes inline, bounded by the limit. - expect(mockQueryRows).not.toHaveBeenCalled() - expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) - expect(mockDeleteRowsByFilter.mock.calls[0][1]).toMatchObject({ limit: 5000 }) + // target = min(limit 5000, matchCount 20000) = 5000, above the inline cap → background. + expect(result.data?.doomedCount).toBe(5000) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + const [, , type, payload] = mockMarkTableJobRunning.mock.calls[0] + expect(type).toBe('delete') + // Bounded delete carries maxRows and omits doomedCount so the mask is skipped and the count + // isn't double-subtracted. + expect(payload).toMatchObject({ maxRows: 5000 }) + expect((payload as { doomedCount?: number }).doomedCount).toBeUndefined() + expect(mockRunTableDelete.mock.calls[0][0]).toMatchObject({ maxRows: 5000 }) }) it('deletes inline when the unbounded match count is within the cap', async () => { @@ -801,6 +816,8 @@ describe('userTableServerTool.delete_rows_by_filter', () => { expect(tableId).toBe('tbl_1') expect(type).toBe('delete') expect(payload).toMatchObject({ doomedCount: 20000, cutoff: expect.any(String) }) + // Unbounded delete masks the whole set — no maxRows cap. + expect((payload as { maxRows?: number }).maxRows).toBeUndefined() expect(mockRunTableDelete).toHaveBeenCalledTimes(1) expect(mockRunTableDelete.mock.calls[0][0]).toMatchObject({ jobId, diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index 00638fc81c8..024dda09adb 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -161,8 +161,9 @@ async function dispatchDeleteJob(params: { workspaceId: string filter: Filter cutoff: Date + maxRows?: number }): Promise { - const { jobId, tableId, workspaceId, filter, cutoff } = params + const { jobId, tableId, workspaceId, filter, cutoff, maxRows } = params if (isTriggerDevEnabled) { try { const [{ tableDeleteTask }, { tasks }] = await Promise.all([ @@ -171,7 +172,7 @@ async function dispatchDeleteJob(params: { ]) await tasks.trigger( 'table-delete', - { jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString() }, + { jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString(), maxRows }, { tags: [`tableId:${tableId}`, `jobId:${jobId}`] } ) } catch (error) { @@ -180,10 +181,12 @@ async function dispatchDeleteJob(params: { } } else { runDetached('table-delete', () => - runTableDelete({ jobId, tableId, workspaceId, filter, cutoff }).catch(async (error) => { - await markTableDeleteFailed(tableId, jobId, error) - throw error - }) + runTableDelete({ jobId, tableId, workspaceId, filter, cutoff, maxRows }).catch( + async (error) => { + await markTableDeleteFailed(tableId, jobId, error) + throw error + } + ) ) } } @@ -817,27 +820,31 @@ export const userTableServerTool: BaseServerTool const idByName = buildIdByName(table.schema) const idFilter = filterNamesToIds(args.filter, idByName) - // An explicit limit runs inline (delete loads only row ids, so even a large bounded - // delete is light). Only an unbounded "delete everything matching" measures the blast - // radius and hands off to the background delete worker (same path as the UI's select-all - // delete) — the read-path mask hides exactly the all-matching set, which a bounded delete - // would over-hide. - if (args.limit === undefined) { + // Inline handles up to MAX_BULK_OPERATION_SIZE rows; a larger delete (an explicit limit + // above the cap, or unbounded "delete everything matching") hands off to the background + // delete worker so a broad delete on a huge table doesn't load every matching id into this + // request. A small explicit limit is the fast path. + const deleteInlineEligible = + args.limit !== undefined && args.limit <= TABLE_LIMITS.MAX_BULK_OPERATION_SIZE + if (!deleteInlineEligible) { const { totalCount } = await queryRows( table, { filter: idFilter, limit: 1, withExecutions: false }, requestId ) const matchCount = totalCount ?? 0 - if (matchCount > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { - const doomedCount = Math.min(matchCount, table.rowCount) + const target = args.limit !== undefined ? Math.min(args.limit, matchCount) : matchCount + if (target > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { + const doomedCount = Math.min(target, table.rowCount) const cutoff = new Date() const jobId = generateId() - const payload: TableDeleteJobPayload = { - filter: idFilter, - cutoff: cutoff.toISOString(), - doomedCount, - } + // Unbounded: mask the whole matching set (instant post-delete view), so `doomedCount` + // drives the count adjustment. Bounded (maxRows): no mask — `doomedCount` is omitted so + // the count isn't double-subtracted; rows disappear progressively as they're deleted. + const bounded = args.limit !== undefined + const payload: TableDeleteJobPayload = bounded + ? { filter: idFilter, cutoff: cutoff.toISOString(), maxRows: args.limit } + : { filter: idFilter, cutoff: cutoff.toISOString(), doomedCount } assertNotAborted() const claimed = await markTableJobRunning(table.id, jobId, 'delete', payload) if (!claimed) { @@ -849,10 +856,13 @@ export const userTableServerTool: BaseServerTool workspaceId, filter: idFilter, cutoff, + maxRows: args.limit, }) return { success: true, - message: `Started background delete of ${doomedCount} matching rows (job ${jobId}). The rows are hidden from reads immediately — query_rows already reflects the post-delete view.`, + message: bounded + ? `Started background delete of up to ${doomedCount} matching rows (job ${jobId}). Rows delete in the background — query_rows to check progress.` + : `Started background delete of ${doomedCount} matching rows (job ${jobId}). The rows are hidden from reads immediately — query_rows already reflects the post-delete view.`, data: { jobId, doomedCount }, } } diff --git a/apps/sim/lib/table/delete-runner.test.ts b/apps/sim/lib/table/delete-runner.test.ts index d06285d51a7..6894b59e443 100644 --- a/apps/sim/lib/table/delete-runner.test.ts +++ b/apps/sim/lib/table/delete-runner.test.ts @@ -82,6 +82,22 @@ describe('runTableDelete', () => { ) }) + it('stops once maxRows is reached and caps the final page fetch to the remaining budget', async () => { + // budget 3 with page size 2: first page fills 2, the second is capped to the remaining 1. + mockSelectRowIdPage.mockResolvedValueOnce(['a', 'b']).mockResolvedValueOnce(['c']) + + await runTableDelete(basePayload({ filter: { status: 'old' }, maxRows: 3 })) + + expect(mockSelectRowIdPage).toHaveBeenCalledTimes(2) + expect(mockSelectRowIdPage.mock.calls[0][0]).toMatchObject({ limit: 2 }) + expect(mockSelectRowIdPage.mock.calls[1][0]).toMatchObject({ limit: 1 }) + expect(mockDeletePageByIds).toHaveBeenCalledTimes(2) + expect(mockMarkJobReady).toHaveBeenCalledWith('tbl_1', 'job_1') + expect(mockAppendTableEvent).toHaveBeenCalledWith( + expect.objectContaining({ status: 'ready', progress: 3 }) + ) + }) + it('skips excluded rows but still advances the keyset cursor past them', async () => { mockSelectRowIdPage.mockResolvedValueOnce(['keep', 'x']).mockResolvedValueOnce([]) diff --git a/apps/sim/lib/table/delete-runner.ts b/apps/sim/lib/table/delete-runner.ts index 5fb4a6e3708..024daab6afc 100644 --- a/apps/sim/lib/table/delete-runner.ts +++ b/apps/sim/lib/table/delete-runner.ts @@ -36,6 +36,12 @@ export interface TableDeletePayload { excludeRowIds?: string[] /** Only rows created at/before this instant are deleted, so mid-job inserts survive. */ cutoff: Date + /** + * Stop after deleting this many rows (an explicit caller-supplied limit). Omitted = every match. + * Not combined with `excludeRowIds` (the UI's select-all path uses excludes and no cap; the + * copilot tool uses a cap and no excludes), so the per-page fetch can be bounded directly. + */ + maxRows?: number } /** @@ -52,8 +58,9 @@ export interface TableDeletePayload { * newer job took the table) returns quietly. */ export async function runTableDelete(payload: TableDeletePayload): Promise { - const { jobId, tableId, workspaceId, filter, excludeRowIds, cutoff } = payload + const { jobId, tableId, workspaceId, filter, excludeRowIds, cutoff, maxRows } = payload const requestId = generateId().slice(0, 8) + const budget = maxRows ?? Number.POSITIVE_INFINITY try { const table = await getTableById(tableId, { includeArchived: true }) @@ -74,7 +81,7 @@ export async function runTableDelete(payload: TableDeletePayload): Promise let lastReported = resumed let afterId: string | undefined - while (true) { + while (processed < budget) { // Ownership gate before every page: once this run loses the table (cancel/supersede), // updateJobProgress returns false and we stop before deleting further. const owns = await updateJobProgress(tableId, processed, jobId) @@ -86,7 +93,7 @@ export async function runTableDelete(payload: TableDeletePayload): Promise cutoff, filterClause, afterId, - limit: TABLE_LIMITS.DELETE_PAGE_SIZE, + limit: Math.min(TABLE_LIMITS.DELETE_PAGE_SIZE, budget - processed), }) if (page.length === 0) break // Advance the keyset cursor past the whole page — excluded ids are skipped (not deleted), diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index 9cd74e640ad..badd3356f70 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -213,8 +213,16 @@ export interface TableDeleteJobPayload { /** ISO timestamp; rows created after it are spared. */ cutoff: string /** Doomed-row estimate captured at kickoff — display-only: list/detail counts subtract the - * not-yet-deleted remainder (doomedCount - rows_processed) while the job runs. */ + * not-yet-deleted remainder (doomedCount - rows_processed) while the job runs. Set only for an + * unbounded delete (the masked "delete everything matching" path); omitted when `maxRows` is set. */ doomedCount?: number + /** + * Stop after deleting this many rows (an explicit caller-supplied limit above the inline cap). + * Omitted = delete every match. When set, reads are NOT masked: the delete is eventually + * consistent (rows disappear as they're deleted) like a bounded update, because the filter-based + * mask would over-hide the rows beyond the cap that this job never deletes. + */ + maxRows?: number } /** From 65b12f2a37caf61f5b119c3a8b34bd560070106d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 15:08:00 -0700 Subject: [PATCH 5/7] docs(mothership): tidy user_table limit/offset param copy Drop "Any value is allowed" from the limit description and restore the original offset description. Co-Authored-By: Claude Fable 5 --- apps/sim/lib/copilot/generated/tool-catalog-v1.ts | 4 ++-- apps/sim/lib/copilot/generated/tool-schemas-v1.ts | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index aaa362440d3..ffd92d4a1ee 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3959,7 +3959,7 @@ export const UserTable: ToolCatalogEntry = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional, default 100). Any value is allowed — large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', + 'Maximum rows to return or affect (optional, default 100). Large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object', @@ -4012,7 +4012,7 @@ export const UserTable: ToolCatalogEntry = { }, offset: { type: 'number', - description: 'Number of rows to skip for query_rows pagination (optional, default 0).', + description: 'Number of rows to skip (optional for query_rows, default 0)', }, outputColumnNames: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 6e3606663e3..3429521dc1f 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -3687,7 +3687,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional, default 100). Any value is allowed — large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', + 'Maximum rows to return or affect (optional, default 100). Large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object', @@ -3746,8 +3746,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, offset: { type: 'number', - description: - 'Number of rows to skip for query_rows pagination (optional, default 0).', + description: 'Number of rows to skip (optional for query_rows, default 0)', }, outputColumnNames: { type: 'object', From 27abafbf8331e266d39cc96c8bd76a7f12242fb9 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 15:14:38 -0700 Subject: [PATCH 6/7] fix(tables): skip pendingDeleteMask for bounded background deletes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bounded-delete commit (f1ee3e9) persisted maxRows and omitted doomedCount but the pendingDeleteMask guard that makes it work was left uncommitted, so the shipped mask still hid every filter+cutoff match — over-hiding the rows beyond maxRows that the job never deletes (they vanished from reads until the job ended, then reappeared). Return no mask when maxRows is set: a bounded delete is eventually consistent (rows disappear as deleted), like a bounded update. Co-Authored-By: Claude Fable 5 --- apps/sim/lib/table/rows/service.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/sim/lib/table/rows/service.ts b/apps/sim/lib/table/rows/service.ts index c676b178130..73559851018 100644 --- a/apps/sim/lib/table/rows/service.ts +++ b/apps/sim/lib/table/rows/service.ts @@ -855,6 +855,11 @@ export async function pendingDeleteMask(table: TableDefinition): Promise 0) { try { From b551d90fd21809c42f101a05076b30e21d6899e5 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 15:38:53 -0700 Subject: [PATCH 7/7] docs(mothership): drop redundant background note from limit arg The op descriptions already cover background escalation; the limit arg only needs to say what the param does. Co-Authored-By: Claude Fable 5 --- apps/sim/lib/copilot/generated/tool-catalog-v1.ts | 2 +- apps/sim/lib/copilot/generated/tool-schemas-v1.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index ffd92d4a1ee..0cd92c61c81 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3959,7 +3959,7 @@ export const UserTable: ToolCatalogEntry = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional, default 100). Large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', + 'Maximum rows to return or affect (optional, default 100). Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 3429521dc1f..31171237e7e 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -3687,7 +3687,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { limit: { type: 'number', description: - 'Maximum rows to return or affect (optional, default 100). Large operations run in the background automatically. Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', + 'Maximum rows to return or affect (optional, default 100). Omit on update_rows_by_filter / delete_rows_by_filter to act on every match.', }, mapping: { type: 'object',