diff --git a/apps/redis-api/package.json b/apps/redis-api/package.json index 07a31e414..17194cd03 100644 --- a/apps/redis-api/package.json +++ b/apps/redis-api/package.json @@ -13,6 +13,7 @@ "@sentry/bun": "^9.15.0", "@sentry/tracing": "^7.120.3", "@t3-oss/env-core": "^0.13.0", + "bullmq": "^5.54.3", "elysia": "^1.3.0", "ioredis": "^5.6.1", "pino": "^9.6.0" diff --git a/apps/redis-api/src/routes/api/cache.ts b/apps/redis-api/src/routes/api/cache.ts index bf8f4b004..9b8feb898 100644 --- a/apps/redis-api/src/routes/api/cache.ts +++ b/apps/redis-api/src/routes/api/cache.ts @@ -4,9 +4,8 @@ import { Elysia, t } from "elysia"; import { ModelValidationError } from "@/errors/ModelValidationError"; import { redis } from "@/services/redis"; import { loggerModule } from "@/utils/logger"; -import { timeout } from "@/utils/timeout"; import { truncate } from "@/utils/truncate"; -import { env } from "@/env"; +import { queueDelete } from "@/services/redis/queueDelete"; const MIN_LENGTH = 1; @@ -73,19 +72,21 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" }) "/", async ({ query: { key, fuzzy } }) => { key = validateKey(key); + cacheRouteLogger.debug( + `DELETE /cache ${key} ${fuzzy ? "fuzzy" : ""}`, + ); - const keyToDelete = fuzzy ? `*${key}*` : key; - cacheRouteLogger.debug(`DELETE /cache ${keyToDelete}`); - + if (fuzzy) { + await queueDelete({ pattern: `*${key}*` }); + return { status: "queued" }; + } const now = performance.now(); - const deletedKeys: number = fuzzy - ? await deleteWithPattern(keyToDelete) - : await redis.del(keyToDelete); + const deletedKeys = await redis.unlink(key); const elapsed = performance.now() - now; cacheRouteLogger.info( - `Deleted ${deletedKeys} keys for '${keyToDelete}' in ${elapsed}ms`, - { fuzzy, deletedKeys, keyToDelete, elapsed }, + `Deleted ${deletedKeys} keys for '${key}' in ${elapsed}ms`, + { fuzzy, deletedKeys, key, elapsed }, ); return { deletedKeys }; @@ -96,7 +97,10 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" }) ...t.Object({ fuzzy: t.Optional(t.Boolean()) }).properties, }), response: { - 200: t.Object({ deletedKeys: t.Number() }), + 200: t.Union([ + t.Object({ deletedKeys: t.Number() }), + t.Object({ status: t.Literal("queued") }), + ]), 400: t.String(), }, }, @@ -117,32 +121,3 @@ function validateKey(key: string) { return parsedKey; } - -async function deleteWithPattern(pattern: string) { - let cursor = "0"; - const SCAN_SIZE = env.DELETE_BATCH_SIZE; - - let totalDeleteCount = 0; - - do { - const [newCursor, foundKeys] = await redis.scan( - cursor, - "MATCH", - pattern, - "COUNT", - SCAN_SIZE, - ); - - cursor = newCursor; - if (foundKeys.length === 0) { - continue; - } - - const deleteCount = await redis.del(foundKeys); - - cacheRouteLogger.debug(`Deleted ${deleteCount} keys in this batch.`); - totalDeleteCount += deleteCount; - } while (cursor !== "0"); - - return totalDeleteCount; -} diff --git a/apps/redis-api/src/services/redis.ts b/apps/redis-api/src/services/redis/index.ts similarity index 54% rename from apps/redis-api/src/services/redis.ts rename to apps/redis-api/src/services/redis/index.ts index 54f7b280e..76edaa790 100644 --- a/apps/redis-api/src/services/redis.ts +++ b/apps/redis-api/src/services/redis/index.ts @@ -2,7 +2,7 @@ import ioredis from "ioredis"; import { env, redisConfig } from "@/env"; -const redis = new ioredis({ +export const redis = new ioredis({ host: redisConfig.host, port: redisConfig.port, username: redisConfig.username, @@ -18,4 +18,15 @@ const redis = new ioredis({ socketTimeout: 5_000, }); -export { redis }; +export const bullmqredis = new ioredis({ + host: redisConfig.host, + port: redisConfig.port, + username: redisConfig.username, + password: redisConfig.password, + maxRetriesPerRequest: null, // Avoid excessive retries, + tls: !env.IS_DEV + ? { + rejectUnauthorized: true, + } + : undefined, +}); diff --git a/apps/redis-api/src/services/redis/queueDelete.ts b/apps/redis-api/src/services/redis/queueDelete.ts new file mode 100644 index 000000000..d611b8542 --- /dev/null +++ b/apps/redis-api/src/services/redis/queueDelete.ts @@ -0,0 +1,84 @@ +import { redis, bullmqredis } from "."; +import z from "zod"; +import { loggerModule } from "@/utils/logger"; + +import { Worker, Queue } from "bullmq"; +import { timeout } from "@/utils/timeout"; +import { env } from "@/env"; +import { sentry } from "@/server/sentry.server.config"; + +const DELETE_JOB = "deleteQueueJob"; +const deleteQueueLogger = loggerModule("deleteQueue"); + +const deleteQueueSchema = z.object({ + pattern: z.string().min(1, "Pattern must be at least 1 character long"), +}); + +const deleteQueue = new Queue(DELETE_JOB, { connection: bullmqredis }); +const worker = new Worker( + DELETE_JOB, + async (job) => { + if (job.name === "delete") { + const { pattern } = deleteQueueSchema.parse(job.data); + deleteQueueLogger.info( + `Job: ${job.id} processing. With pattern: ${pattern}`, + { pattern, jobId: job.id }, + ); + + const now = performance.now(); + const deletedCount = await deleteWithPattern(pattern); + const elapsed = performance.now() - now; + + deleteQueueLogger.info( + `Job: ${job.id} completed. Deleted ${deletedCount} keys for pattern '${pattern}' in ${elapsed.toFixed(2)}ms.`, + { deletedCount, pattern, elapsed, jobId: job.id }, + ); + } + }, + { connection: bullmqredis }, +); + +worker.on("failed", (job, error) => { + deleteQueueLogger.error(`Job failed: ${job?.id} with ${error.message}`, { + error, + jobId: job?.id, + pattern: job?.data?.pattern, + }); + + sentry.captureException(error); +}); + +export async function queueDelete({ pattern }: { pattern: string }) { + deleteQueue.add("delete", { pattern }); +} + +async function deleteWithPattern(pattern: string) { + let cursor = "0"; + const SCAN_SIZE = env.DELETE_BATCH_SIZE; + + let totalDeleteCount = 0; + + do { + const [newCursor, foundKeys] = await redis.scan( + cursor, + "MATCH", + pattern, + "COUNT", + SCAN_SIZE, + ); + + cursor = newCursor; + if (foundKeys.length === 0) { + continue; + } + + const deleteCount = await redis.unlink(foundKeys); + + totalDeleteCount += deleteCount; + + // Rate limiting to avoid overwhelming the Redis server + await timeout(100); + } while (cursor !== "0"); + + return totalDeleteCount; +} diff --git a/yarn.lock b/yarn.lock index 90fdb98de..f0cf7cf46 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3352,6 +3352,48 @@ __metadata: languageName: node linkType: hard +"@msgpackr-extract/msgpackr-extract-darwin-arm64@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-darwin-arm64@npm:3.0.3" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + +"@msgpackr-extract/msgpackr-extract-darwin-x64@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-darwin-x64@npm:3.0.3" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + +"@msgpackr-extract/msgpackr-extract-linux-arm64@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-linux-arm64@npm:3.0.3" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + +"@msgpackr-extract/msgpackr-extract-linux-arm@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-linux-arm@npm:3.0.3" + conditions: os=linux & cpu=arm + languageName: node + linkType: hard + +"@msgpackr-extract/msgpackr-extract-linux-x64@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-linux-x64@npm:3.0.3" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + +"@msgpackr-extract/msgpackr-extract-win32-x64@npm:3.0.3": + version: 3.0.3 + resolution: "@msgpackr-extract/msgpackr-extract-win32-x64@npm:3.0.3" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + "@netlify/blobs@npm:^8.1.0": version: 8.1.1 resolution: "@netlify/blobs@npm:8.1.1" @@ -7041,6 +7083,7 @@ __metadata: "@types/bun": "npm:latest" "@typescript-eslint/eslint-plugin": "npm:^8.32.0" "@typescript-eslint/parser": "npm:^8.32.0" + bullmq: "npm:^5.54.3" elysia: "npm:^1.3.0" eslint: "npm:^9" eslint-plugin-simple-import-sort: "npm:^10.0.0" @@ -11046,6 +11089,21 @@ __metadata: languageName: node linkType: hard +"bullmq@npm:^5.54.3": + version: 5.54.3 + resolution: "bullmq@npm:5.54.3" + dependencies: + cron-parser: "npm:^4.9.0" + ioredis: "npm:^5.4.1" + msgpackr: "npm:^1.11.2" + node-abort-controller: "npm:^3.1.1" + semver: "npm:^7.5.4" + tslib: "npm:^2.0.0" + uuid: "npm:^9.0.0" + checksum: 10c0/efb5ba7868e80d5b656396ea54bbe12557c0e18ce7fbcb1f19af07960839625cd24946e2b68f33b55ae4302e80d9b456653b5fbc53b7e993e793fdfba0fa7cf4 + languageName: node + linkType: hard + "bun-types@npm:1.2.5": version: 1.2.5 resolution: "bun-types@npm:1.2.5" @@ -11965,6 +12023,15 @@ __metadata: languageName: node linkType: hard +"cron-parser@npm:^4.9.0": + version: 4.9.0 + resolution: "cron-parser@npm:4.9.0" + dependencies: + luxon: "npm:^3.2.1" + checksum: 10c0/348622bdcd1a15695b61fc33af8a60133e5913a85cf99f6344367579e7002896514ba3b0a9d6bb569b02667d6b06836722bf2295fcd101b3de378f71d37bed0b + languageName: node + linkType: hard + "cross-fetch@npm:3.1.5": version: 3.1.5 resolution: "cross-fetch@npm:3.1.5" @@ -12490,6 +12557,13 @@ __metadata: languageName: node linkType: hard +"detect-libc@npm:^2.0.1": + version: 2.0.4 + resolution: "detect-libc@npm:2.0.4" + checksum: 10c0/c15541f836eba4b1f521e4eecc28eefefdbc10a94d3b8cb4c507689f332cc111babb95deda66f2de050b22122113189986d5190be97d51b5a2b23b938415e67c + languageName: node + linkType: hard + "detect-libc@npm:^2.0.3": version: 2.0.3 resolution: "detect-libc@npm:2.0.3" @@ -15279,7 +15353,7 @@ __metadata: languageName: node linkType: hard -"ioredis@npm:^5.6.1": +"ioredis@npm:^5.4.1, ioredis@npm:^5.6.1": version: 5.6.1 resolution: "ioredis@npm:5.6.1" dependencies: @@ -17429,6 +17503,13 @@ __metadata: languageName: node linkType: hard +"luxon@npm:^3.2.1": + version: 3.6.1 + resolution: "luxon@npm:3.6.1" + checksum: 10c0/906d57a9dc4d1de9383f2e9223e378c298607c1b4d17b6657b836a3cd120feb1c1de3b5d06d846a3417e1ca764de8476e8c23b3cd4083b5cdb870adcb06a99d5 + languageName: node + linkType: hard + "lws-basic-auth@npm:^2.0.0": version: 2.0.0 resolution: "lws-basic-auth@npm:2.0.0" @@ -18141,6 +18222,49 @@ __metadata: languageName: node linkType: hard +"msgpackr-extract@npm:^3.0.2": + version: 3.0.3 + resolution: "msgpackr-extract@npm:3.0.3" + dependencies: + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "npm:3.0.3" + "@msgpackr-extract/msgpackr-extract-darwin-x64": "npm:3.0.3" + "@msgpackr-extract/msgpackr-extract-linux-arm": "npm:3.0.3" + "@msgpackr-extract/msgpackr-extract-linux-arm64": "npm:3.0.3" + "@msgpackr-extract/msgpackr-extract-linux-x64": "npm:3.0.3" + "@msgpackr-extract/msgpackr-extract-win32-x64": "npm:3.0.3" + node-gyp: "npm:latest" + node-gyp-build-optional-packages: "npm:5.2.2" + dependenciesMeta: + "@msgpackr-extract/msgpackr-extract-darwin-arm64": + optional: true + "@msgpackr-extract/msgpackr-extract-darwin-x64": + optional: true + "@msgpackr-extract/msgpackr-extract-linux-arm": + optional: true + "@msgpackr-extract/msgpackr-extract-linux-arm64": + optional: true + "@msgpackr-extract/msgpackr-extract-linux-x64": + optional: true + "@msgpackr-extract/msgpackr-extract-win32-x64": + optional: true + bin: + download-msgpackr-prebuilds: bin/download-prebuilds.js + checksum: 10c0/e504fd8bf86a29d7527c83776530ee6dc92dcb0273bb3679fd4a85173efead7f0ee32fb82c8410a13c33ef32828c45f81118ffc0fbed5d6842e72299894623b4 + languageName: node + linkType: hard + +"msgpackr@npm:^1.11.2": + version: 1.11.4 + resolution: "msgpackr@npm:1.11.4" + dependencies: + msgpackr-extract: "npm:^3.0.2" + dependenciesMeta: + msgpackr-extract: + optional: true + checksum: 10c0/171f6e15b628e91969cbb715c076e218886dc505fdac9ce31aa9e8641877cb5cf52d89fe0ca2930520711b1bbc9f792e10d0a9fc08806ad5d543c50abfab322c + languageName: node + linkType: hard + "muggle-string@npm:^0.4.1": version: 0.4.1 resolution: "muggle-string@npm:0.4.1" @@ -18396,6 +18520,13 @@ __metadata: languageName: node linkType: hard +"node-abort-controller@npm:^3.1.1": + version: 3.1.1 + resolution: "node-abort-controller@npm:3.1.1" + checksum: 10c0/f7ad0e7a8e33809d4f3a0d1d65036a711c39e9d23e0319d80ebe076b9a3b4432b4d6b86a7fab65521de3f6872ffed36fc35d1327487c48eb88c517803403eda3 + languageName: node + linkType: hard + "node-addon-api@npm:^7.0.0": version: 7.1.1 resolution: "node-addon-api@npm:7.1.1" @@ -18433,6 +18564,19 @@ __metadata: languageName: node linkType: hard +"node-gyp-build-optional-packages@npm:5.2.2": + version: 5.2.2 + resolution: "node-gyp-build-optional-packages@npm:5.2.2" + dependencies: + detect-libc: "npm:^2.0.1" + bin: + node-gyp-build-optional-packages: bin.js + node-gyp-build-optional-packages-optional: optional.js + node-gyp-build-optional-packages-test: build-test.js + checksum: 10c0/c81128c6f91873381be178c5eddcbdf66a148a6a89a427ce2bcd457593ce69baf2a8662b6d22cac092d24aa9c43c230dec4e69b3a0da604503f4777cd77e282b + languageName: node + linkType: hard + "node-gyp@npm:latest": version: 11.1.0 resolution: "node-gyp@npm:11.1.0"