Skip to content

Conversation

@ianmacartney
Copy link
Contributor

@ianmacartney ianmacartney commented Dec 5, 2025

Allow batching pending writes to reduce the amount of calls to the component during mutations that may be using triggers


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Summary by CodeRabbit

  • New Features

    • Added a batch mutation to apply multiple write operations in one pass (namespace-aware batching, mixed inserts/updates/deletes).
    • Added buffering controls with buffer() and flush(), plus auto-flush-before-read behavior and performance comparison tooling.
  • Documentation

    • Added comprehensive example demonstrating buffering patterns, onSuccess callbacks, namespaced batches, and perf comparisons.
  • Tests

    • Added tests verifying buffering, manual flush, and auto-flush behavior.
  • Chores

    • Registered the new batching example in configuration.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 5, 2025

Walkthrough

Adds client-side buffering to Aggregate (buffer/flush), implements a server-side batch mutation that groups operations by namespace and reuses fetched trees, updates btree handlers to accept optional injected tree, and includes examples and tests demonstrating buffering and batched writes.

Changes

Cohort / File(s) Summary
Core Buffering Implementation
src/client/index.ts
Adds buffer(enabled): this and async flush(ctx): Promise<void>, internal isBuffering state and operationQueue, flushBeforeRead helper, and changes write paths to queue operations when buffering is enabled.
Batch Mutation & Tree Injection
src/component/public.ts, src/component/btree.ts
Adds a new batch mutation that accepts multiple operations, groups them by namespace, fetches one tree per group, and applies operations in order. Updates insertHandler and deleteHandler signatures to accept optional treeArg?: Doc<"btree"> for tree injection and reuse.
Example Demonstrations
example/convex/batchedWrites.ts, example/convex/convex.config.ts
Adds example/convex/batchedWrites.ts with multiple exported mutations demonstrating buffering patterns, auto-flush, performance comparisons, and namespace-aware batching. Registers the new aggregate in example/convex/convex.config.ts.
Buffering Test Coverage
src/client/buffer.test.ts
New tests covering DirectAggregate buffering: auto-flush within mutation contexts and manual flush behavior with assertions on inserted item counts.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Aggregate
    participant Buffer as BufferQueue
    participant Batch as BatchMutation
    participant BTree as B-TreeHandlers

    Client->>Aggregate: buffer(true)
    Aggregate-->>Client: buffering enabled

    Client->>Aggregate: insert(ctx, op1)
    Aggregate->>Buffer: enqueue op1

    Client->>Aggregate: insert(ctx, op2)
    Aggregate->>Buffer: enqueue op2

    Client->>Aggregate: flush(ctx)
    Aggregate->>Batch: call batch({ operations: [op1, op2] })

    Batch->>Batch: group operations by namespace
    Batch->>BTree: fetch tree for namespace A
    Batch->>BTree: insertHandler(op1, treeArg)
    BTree-->>Batch: success
    Batch->>BTree: insertHandler(op2, treeArg)
    BTree-->>Batch: success

    Batch-->>Aggregate: batch applied
    Aggregate->>Buffer: clear queue
    Aggregate-->>Client: flush complete
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Attention areas:
    • src/client/index.ts: correctness of buffering state transitions, concurrency/context assumptions for flush(ctx), and that all read paths reliably call flushBeforeRead.
    • src/component/public.ts: correctness of namespace grouping, sentinel encoding/decoding for undefined namespaces, and preservation of per-operation error semantics and ordering.
    • src/component/btree.ts: behavior when treeArg is provided vs undefined; ensure backward compatibility.
    • src/client/buffer.test.ts and examples: confirm tests and examples exercise both auto-flush and manual-flush code paths under realistic contexts.

Poem

🐰 I queued my hops in rows so neat,

Flush day came and beats did meet,
Trees reused, no extra race,
Batched and tidy — what a place! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title "batch writes" directly and clearly summarizes the primary change in the changeset—adding support for batching pending writes to reduce component calls during mutations.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ian/batch-writes

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 538ed4a and 2c05f07.

📒 Files selected for processing (1)
  • src/client/buffer.test.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/client/buffer.test.ts

Comment @coderabbitai help to get the list of available commands and usage tips.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Dec 5, 2025

Open in StackBlitz

npm i https://pkg.pr.new/get-convex/aggregate/@convex-dev/aggregate@167

commit: 2c05f07

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (5)
src/client/buffer.test.ts (1)

20-53: Consider test isolation for the aggregate instance.

The aggregate instance is shared across both t.run blocks. If the first test block throws before reaching aggregate.buffer(false), the buffering state remains enabled for subsequent tests. Consider creating a fresh instance per test block or wrapping in try/finally to ensure cleanup.

src/client/index.ts (2)

39-81: Consider stronger typing for BufferedOperation.

Using any for keys, values, and namespaces loses type safety. While this may be intentional to avoid complex generics, it could allow type mismatches that would only be caught at runtime.


101-124: Buffering state is instance-level - document thread-safety expectations.

The isBuffering and operationQueue are instance fields. If the same Aggregate instance is used across concurrent mutations (e.g., multiple requests sharing a singleton), the buffering state could interfere. Consider documenting that buffered aggregates should not be shared across concurrent mutation contexts, or make the state context-scoped.

example/convex/batchedWrites.ts (2)

23-50: Missing return validators on all mutations.

Per coding guidelines, all Convex functions must include return validators. Each mutation in this file returns a value but lacks a returns: property.

Example fix for basicBatchedWrites:

 export const basicBatchedWrites = mutation({
   args: {
     count: v.number(),
   },
+  returns: v.object({
+    inserted: v.number(),
+    total: v.number(),
+  }),
   handler: async (ctx, { count }) => {

Apply similar changes to:

  • batchedWritesWithOnSuccess: returns: v.object({ queued: v.number() })
  • complexBatchedOperations: returns: v.object({ operations: v.object({ inserts: v.number(), deletes: v.number(), updates: v.number() }) })
  • comparePerformance: returns: v.object({ method: v.string(), count: v.number(), durationMs: v.number() })
  • autoFlushOnRead: returns: v.object({ queued: v.number(), totalInRange: v.number() })
  • batchedWritesWithNamespaces: returns: v.object({ operations: v.number(), namespaces: v.number(), message: v.string() })

Also applies to: 82-107, 112-180, 185-226, 231-264, 273-321


40-43: Inconsistent buffer/flush ordering.

Here buffering is disabled before flushing (lines 41, 43), but in complexBatchedOperations (lines 167, 170) the order is reversed (flush first, then disable). For clarity and consistency, consider adopting a uniform pattern—typically flush while still buffered, then disable.

-    // Disable buffering after we're done
-    aggregate.buffer(false);
     // Flush all buffered operations in a single batch
     await aggregate.flush(ctx);
+    // Disable buffering after flush
+    aggregate.buffer(false);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 75b3a2f and 538ed4a.

⛔ Files ignored due to path filters (2)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
  • src/component/_generated/component.ts is excluded by !**/_generated/**
📒 Files selected for processing (6)
  • example/convex/batchedWrites.ts (1 hunks)
  • example/convex/convex.config.ts (1 hunks)
  • src/client/buffer.test.ts (1 hunks)
  • src/client/index.ts (17 hunks)
  • src/component/btree.ts (2 hunks)
  • src/component/public.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
example/convex/**/*.{ts,tsx}

📄 CodeRabbit inference engine (example/.cursor/rules/convex_rules.mdc)

example/convex/**/*.{ts,tsx}: ALWAYS use the new function syntax for Convex functions with query, mutation, action decorators and handler property
Use array validators with v.array() to specify element types in Convex function arguments and schemas
Use v.null() validator when returning or defining fields that can be null values in Convex functions
Use v.int64() instead of deprecated v.bigint() for representing signed 64-bit integers
Use v.record() for defining record types; v.map() and v.set() are not supported in Convex
ALWAYS include argument and return validators for all Convex functions including query, internalQuery, mutation, internalMutation, action, and internalAction; use returns: v.null() if function doesn't return anything
Index fields must be queried in the same order they are defined; create separate indexes if you need different query orders
Use Id<'tableName'> TypeScript helper type imported from './_generated/dataModel' for strict typing of document IDs
Use as const for string literals in discriminated union types in TypeScript
Always define arrays as const array: Array<T> = [...] with explicit type annotation
Always define records as const record: Record<KeyType, ValueType> = {...} with explicit type annotation
Use internalQuery, internalMutation, and internalAction to register private functions that are only callable by other Convex functions
Use query, mutation, and action to register public functions exposed to the API; do NOT use these for sensitive internal functions
Use ctx.runQuery to call a query from a query, mutation, or action
Use ctx.runMutation to call a mutation from a mutation or action
Use ctx.runAction to call an action from another action; otherwise pull out shared code into a helper async function
When using ctx.runQuery, ctx.runMutation, or ctx.runAction to call a function in the same file, specify a type annotation on the return value
Organize files with publi...

Files:

  • example/convex/convex.config.ts
  • example/convex/batchedWrites.ts
🧠 Learnings (17)
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `ctx.runMutation` to call a mutation from a mutation or action

Applied to files:

  • src/component/public.ts
  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Organize files with public query, mutation, or action functions thoughtfully within the `convex/` directory using file-based routing

Applied to files:

  • example/convex/convex.config.ts
  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `internalQuery`, `internalMutation`, and `internalAction` to register private functions that are only callable by other Convex functions

Applied to files:

  • example/convex/convex.config.ts
  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Add `'use node';` to the top of files containing actions that use Node.js built-in modules

Applied to files:

  • example/convex/convex.config.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : By default Convex returns documents in ascending `_creationTime` order; use `.order('asc')` or `.order('desc')` to specify order

Applied to files:

  • example/convex/convex.config.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : ALWAYS use the new function syntax for Convex functions with `query`, `mutation`, `action` decorators and `handler` property

Applied to files:

  • example/convex/convex.config.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `query`, `mutation`, and `action` to register public functions exposed to the API; do NOT use these for sensitive internal functions

Applied to files:

  • example/convex/convex.config.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/crons.{ts,tsx} : Define crons by declaring the top-level `crons` object, calling methods on it, and exporting it as default from `convex/crons.ts` or similar

Applied to files:

  • example/convex/convex.config.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `ctx.db.patch` to shallow merge updates into an existing document; this method throws an error if the document does not exist

Applied to files:

  • example/convex/convex.config.ts
  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `v.record()` for defining record types; `v.map()` and `v.set()` are not supported in Convex

Applied to files:

  • example/convex/convex.config.ts
  • src/client/buffer.test.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : When using `ctx.runQuery`, `ctx.runMutation`, or `ctx.runAction` to call a function in the same file, specify a type annotation on the return value

Applied to files:

  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Convex queries do NOT support `.delete()`; instead `.collect()` the results and call `ctx.db.delete(row._id)` on each result

Applied to files:

  • src/client/buffer.test.ts
  • src/client/index.ts
  • src/component/btree.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `ctx.runQuery` to call a query from a query, mutation, or action

Applied to files:

  • src/client/buffer.test.ts
  • src/client/index.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/schema.{ts,tsx} : Always define schema in `convex/schema.ts` and import schema definition functions from `convex/server`

Applied to files:

  • src/client/buffer.test.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `ctx.db.replace` to fully replace an existing document; this method throws an error if the document does not exist

Applied to files:

  • src/client/buffer.test.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `for await (const row of query)` syntax for async iteration; do not use `.collect()` or `.take(n)` on query results

Applied to files:

  • src/client/index.ts
  • example/convex/batchedWrites.ts
📚 Learning: 2025-12-03T18:51:23.434Z
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `paginationOptsValidator` with `numItems` and `cursor` properties; paginated queries return objects with `page`, `isDone`, and `continueCursor` properties

Applied to files:

  • src/client/index.ts
🧬 Code graph analysis (6)
src/component/public.ts (1)
src/component/btree.ts (4)
  • getOrCreateTree (937-973)
  • DEFAULT_MAX_NODE_SIZE (25-25)
  • insertHandler (45-81)
  • deleteHandler (83-113)
example/convex/convex.config.ts (1)
src/component/schema.ts (1)
  • aggregate (25-28)
src/client/buffer.test.ts (3)
src/client/setup.test.ts (2)
  • componentSchema (6-6)
  • componentModules (7-7)
src/component/schema.ts (1)
  • aggregate (25-28)
src/client/index.ts (2)
  • DirectAggregate (749-867)
  • count (160-173)
src/client/index.ts (1)
src/client/positions.ts (1)
  • keyToPosition (67-72)
src/component/btree.ts (1)
example/convex/_generated/dataModel.d.ts (1)
  • Doc (30-33)
example/convex/batchedWrites.ts (2)
src/component/schema.ts (1)
  • aggregate (25-28)
src/client/index.ts (2)
  • DirectAggregate (749-867)
  • count (160-173)
🪛 GitHub Actions: Test and lint
src/client/buffer.test.ts

[error] 26-26: TypeScript error TS2339: Property 'aggregateDirect' does not exist on type '{ aggregate: ComponentApi; }'.

🪛 GitHub Check: Test and lint
src/client/buffer.test.ts

[failure] 26-26:
Property 'aggregateDirect' does not exist on type '{ aggregate: ComponentApi; }'.

🔇 Additional comments (9)
example/convex/convex.config.ts (1)

12-12: LGTM!

The new aggregate registration for batchedWrites follows the established pattern and is correctly placed with other aggregate registrations.

src/client/index.ts (2)

131-140: Operations are discarded if flush fails.

The queue is cleared before the mutation is awaited. If ctx.runMutation throws, the buffered operations are lost and cannot be retried. This may be intentional (prevents double-processing), but the caller has no way to recover.

Consider either:

  1. Documenting this behavior in the method's docstring
  2. Restoring the queue on failure:
   async flush(ctx: RunMutationCtx): Promise<void> {
     if (this.operationQueue.length === 0) {
       return;
     }
     const operations = this.operationQueue;
     this.operationQueue = [];
-    await ctx.runMutation(this.component.public.batch, {
-      operations,
-    });
+    try {
+      await ctx.runMutation(this.component.public.batch, {
+        operations,
+      });
+    } catch (e) {
+      // Restore operations on failure so caller can retry
+      this.operationQueue = [...operations, ...this.operationQueue];
+      throw e;
+    }
   }

164-164: Good coverage of flush-before-read across all query paths.

All read operations (count, countBatch, sum, sumBatch, at, atBatch, indexOf, paginate, paginateNamespaces) correctly call flushBeforeRead. Derived methods like min, max, random, and iter also benefit through their use of these primitives.

src/component/btree.ts (2)

45-57: LGTM!

The optional treeArg parameter cleanly enables tree reuse for batch operations while preserving existing behavior when not provided.


83-95: LGTM!

Consistent pattern with insertHandler for the optional tree injection.

src/component/public.ts (2)

229-240: Operation order not preserved across namespaces.

Grouping by namespace means operations are reordered. For example, [insert ns:A, insert ns:B, delete ns:A] becomes [insert A, delete A] then [insert B]. If this reordering is intentional for performance, consider documenting it. If original order must be preserved, process operations in input order while caching trees.

If order must be preserved:

   handler: async (ctx, { operations }) => {
-    // Group operations by namespace to fetch each tree once
-    const namespaceGroups = new Map<string, typeof operations>();
-    for (const op of operations) {
-      ...
-    }
-
-    // Process each namespace group
-    for (const [namespaceKey, ops] of namespaceGroups.entries()) {
-      ...
-    }
+    // Cache trees by namespace to avoid repeated fetches
+    const treeCache = new Map<string, Doc<"btree">>();
+    
+    for (const op of operations) {
+      const namespace = "namespace" in op ? op.namespace : undefined;
+      const key = namespace === undefined ? "__undefined__" : JSON.stringify(namespace);
+      
+      if (!treeCache.has(key)) {
+        treeCache.set(key, await getOrCreateTree(ctx.db, namespace, DEFAULT_MAX_NODE_SIZE, true));
+      }
+      const tree = treeCache.get(key)!;
+      
+      // Process operation with cached tree...
+    }
   },

336-363: insertIfDoesNotExist replaces existing items rather than no-op.

The comment notes it's "implemented as replaceOrInsert", which means if the key exists, it gets deleted and re-inserted. This is consistent with the non-batch _insertIfDoesNotExist implementation, but the name suggests it should be a no-op if the item exists.

Verify this semantic is intentional. If true "insert if not exists" is needed:

         } else if (op.type === "insertIfDoesNotExist") {
-          // insertIfDoesNotExist is implemented as replaceOrInsert
-          try {
-            await deleteHandler(...);
-          } catch (e) { ... }
-          await insertHandler(...);
+          // True insert-if-not-exists: try insert, ignore duplicate key error
+          try {
+            await insertHandler(
+              ctx,
+              {
+                key: op.key,
+                value: op.value,
+                summand: op.summand,
+                namespace: op.namespace,
+              },
+              tree,
+            );
+          } catch (e) {
+            if (!(e instanceof ConvexError && e.data?.code === "DUPLICATE_KEY")) {
+              throw e;
+            }
+          }
example/convex/batchedWrites.ts (2)

286-290: Good pattern: local aggregate instance.

Creating the DirectAggregate inside the handler avoids shared mutable state issues. This is the recommended approach compared to the module-level instance used elsewhere in this file.


15-18: Move DirectAggregate instantiation into each mutation handler.

Module-level mutable state violates Convex's execution model. The aggregate instance maintains state via this.isBuffering and this.operationQueue, which are not reliably preserved across function invocations. Convex executes mutations in isolated contexts and may re-run or move functions between runtimes, causing module-level state to be lost and breaking transactional guarantees. Instantiate DirectAggregate inside each handler instead, as done in batchedWritesWithNamespaces (lines 286–290), so each invocation has its own isolated state.

Comment on lines +60 to +72
const mutationWithBuffering = customMutation(mutation, {
args: {},
input: async () => {
aggregate.buffer(true);
return {
ctx: {},
args: {},
onSuccess: async ({ ctx }) => {
await aggregate.flush(ctx);
},
};
},
});
Copy link

@coderabbitai coderabbitai bot Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Buffering not disabled on handler error.

If the handler throws before onSuccess runs, aggregate.buffer(true) remains set on the shared instance. Subsequent mutations using this aggregate may unintentionally inherit buffering mode. Consider wrapping in try/finally or resetting state defensively.

 const mutationWithBuffering = customMutation(mutation, {
   args: {},
   input: async () => {
     aggregate.buffer(true);
     return {
       ctx: {},
       args: {},
       onSuccess: async ({ ctx }) => {
         await aggregate.flush(ctx);
+        aggregate.buffer(false);
       },
     };
   },
 });

Note: This still doesn't handle the error case. A more robust approach would be to use a local aggregate instance per invocation or ensure cleanup on error paths.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const mutationWithBuffering = customMutation(mutation, {
args: {},
input: async () => {
aggregate.buffer(true);
return {
ctx: {},
args: {},
onSuccess: async ({ ctx }) => {
await aggregate.flush(ctx);
},
};
},
});
const mutationWithBuffering = customMutation(mutation, {
args: {},
input: async () => {
aggregate.buffer(true);
return {
ctx: {},
args: {},
onSuccess: async ({ ctx }) => {
await aggregate.flush(ctx);
aggregate.buffer(false);
},
};
},
});
🤖 Prompt for AI Agents
In example/convex/batchedWrites.ts around lines 60 to 72, calling
aggregate.buffer(true) in input may leave the shared aggregate stuck in
buffering mode if the handler throws; wrap the buffering lifecycle so it is
always disabled on exit (use try/finally to call aggregate.buffer(false) or
aggregate.flush/cleanup in the finally block), or better yet create and use a
local aggregate instance per invocation so state isn't shared across handlers;
ensure any early-throw paths perform the same cleanup to avoid leaking buffering
state to subsequent mutations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutations are all run in isolated environments - globals are not shared across them

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ianmacartney Thank you for the clarification! I misunderstood the Convex execution model. If each mutation runs in an isolated environment where globals aren't shared across invocations, then my concern about state leaking between mutations is indeed invalid. The buffering state would be fresh for each mutation call, so there's no risk of one mutation affecting another.

I appreciate you taking the time to explain this!


✏️ Learnings added
Learnt from: ianmacartney
Repo: get-convex/aggregate PR: 167
File: example/convex/batchedWrites.ts:60-72
Timestamp: 2025-12-05T03:45:23.412Z
Learning: In Convex, mutations are run in isolated environments where globals are not shared across mutation invocations. A global variable set in one mutation invocation will not affect subsequent mutation invocations.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `ctx.runMutation` to call a mutation from a mutation or action

Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `for await (const row of query)` syntax for async iteration; do not use `.collect()` or `.take(n)` on query results

Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Convex queries do NOT support `.delete()`; instead `.collect()` the results and call `ctx.db.delete(row._id)` on each result

Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Use `internalQuery`, `internalMutation`, and `internalAction` to register private functions that are only callable by other Convex functions

Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : Organize files with public query, mutation, or action functions thoughtfully within the `convex/` directory using file-based routing

Learnt from: CR
Repo: get-convex/aggregate PR: 0
File: example/.cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-12-03T18:51:23.434Z
Learning: Applies to example/convex/**/*.{ts,tsx} : When using `ctx.runQuery`, `ctx.runMutation`, or `ctx.runAction` to call a function in the same file, specify a type annotation on the return value

Comment on lines +274 to +292
} else if (op.type === "replace") {
await deleteHandler(
ctx,
{
key: op.currentKey,
namespace: op.namespace,
},
tree,
);
await insertHandler(
ctx,
{
key: op.newKey,
value: op.value,
summand: op.summand,
namespace: op.newNamespace,
},
tree,
);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Cross-namespace replace uses wrong tree for insert.

When op.namespace !== op.newNamespace, the delete correctly uses the tree for op.namespace, but the insert also uses the same tree even though it should target op.newNamespace. This will insert into the wrong tree.

The insert needs to use the tree for newNamespace:

         } else if (op.type === "replace") {
           await deleteHandler(
             ctx,
             {
               key: op.currentKey,
               namespace: op.namespace,
             },
             tree,
           );
+          // If newNamespace differs, fetch the correct tree
+          const insertTree = op.newNamespace === op.namespace || op.newNamespace === undefined
+            ? tree
+            : await getOrCreateTree(ctx.db, op.newNamespace, DEFAULT_MAX_NODE_SIZE, true);
           await insertHandler(
             ctx,
             {
               key: op.newKey,
               value: op.value,
               summand: op.summand,
               namespace: op.newNamespace,
             },
-            tree,
+            insertTree,
           );

The same issue affects replaceOrInsert at lines 309-335.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} else if (op.type === "replace") {
await deleteHandler(
ctx,
{
key: op.currentKey,
namespace: op.namespace,
},
tree,
);
await insertHandler(
ctx,
{
key: op.newKey,
value: op.value,
summand: op.summand,
namespace: op.newNamespace,
},
tree,
);
} else if (op.type === "replace") {
await deleteHandler(
ctx,
{
key: op.currentKey,
namespace: op.namespace,
},
tree,
);
// If newNamespace differs, fetch the correct tree
const insertTree = op.newNamespace === op.namespace || op.newNamespace === undefined
? tree
: await getOrCreateTree(ctx.db, op.newNamespace, DEFAULT_MAX_NODE_SIZE, true);
await insertHandler(
ctx,
{
key: op.newKey,
value: op.value,
summand: op.summand,
namespace: op.newNamespace,
},
insertTree,
);
🤖 Prompt for AI Agents
In src/component/public.ts around lines 274 to 292 (and similarly for the
replaceOrInsert block around lines 309 to 335), the replace branch deletes using
the tree for op.namespace but then calls insertHandler with the same tree even
when op.newNamespace differs; update the insertHandler call to look up and pass
the tree for op.newNamespace (not the original tree) so the new key/value is
inserted into the correct namespace tree, and apply the same fix in the
replaceOrInsert section where an insert into newNamespace must use the tree for
newNamespace.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants