Merged in feature/redis-queue-deletes (pull request #2397)

Feature/redis queue deletes

* feat: add queuing for deletes

* merge

* .

* .

* .


Approved-by: Linus Flood
This commit is contained in:
Joakim Jäderberg
2025-06-19 09:20:38 +00:00
parent 3af994b0a9
commit 105c4d9cf3
5 changed files with 258 additions and 43 deletions

View File

@@ -13,6 +13,7 @@
"@sentry/bun": "^9.15.0",
"@sentry/tracing": "^7.120.3",
"@t3-oss/env-core": "^0.13.0",
"bullmq": "^5.54.3",
"elysia": "^1.3.0",
"ioredis": "^5.6.1",
"pino": "^9.6.0"

View File

@@ -4,9 +4,8 @@ import { Elysia, t } from "elysia";
import { ModelValidationError } from "@/errors/ModelValidationError";
import { redis } from "@/services/redis";
import { loggerModule } from "@/utils/logger";
import { timeout } from "@/utils/timeout";
import { truncate } from "@/utils/truncate";
import { env } from "@/env";
import { queueDelete } from "@/services/redis/queueDelete";
const MIN_LENGTH = 1;
@@ -73,19 +72,21 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" })
"/",
async ({ query: { key, fuzzy } }) => {
key = validateKey(key);
cacheRouteLogger.debug(
`DELETE /cache ${key} ${fuzzy ? "fuzzy" : ""}`,
);
const keyToDelete = fuzzy ? `*${key}*` : key;
cacheRouteLogger.debug(`DELETE /cache ${keyToDelete}`);
if (fuzzy) {
await queueDelete({ pattern: `*${key}*` });
return { status: "queued" };
}
const now = performance.now();
const deletedKeys: number = fuzzy
? await deleteWithPattern(keyToDelete)
: await redis.del(keyToDelete);
const deletedKeys = await redis.unlink(key);
const elapsed = performance.now() - now;
cacheRouteLogger.info(
`Deleted ${deletedKeys} keys for '${keyToDelete}' in ${elapsed}ms`,
{ fuzzy, deletedKeys, keyToDelete, elapsed },
`Deleted ${deletedKeys} keys for '${key}' in ${elapsed}ms`,
{ fuzzy, deletedKeys, key, elapsed },
);
return { deletedKeys };
@@ -96,7 +97,10 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" })
...t.Object({ fuzzy: t.Optional(t.Boolean()) }).properties,
}),
response: {
200: t.Object({ deletedKeys: t.Number() }),
200: t.Union([
t.Object({ deletedKeys: t.Number() }),
t.Object({ status: t.Literal("queued") }),
]),
400: t.String(),
},
},
@@ -117,32 +121,3 @@ function validateKey(key: string) {
return parsedKey;
}
async function deleteWithPattern(pattern: string) {
let cursor = "0";
const SCAN_SIZE = env.DELETE_BATCH_SIZE;
let totalDeleteCount = 0;
do {
const [newCursor, foundKeys] = await redis.scan(
cursor,
"MATCH",
pattern,
"COUNT",
SCAN_SIZE,
);
cursor = newCursor;
if (foundKeys.length === 0) {
continue;
}
const deleteCount = await redis.del(foundKeys);
cacheRouteLogger.debug(`Deleted ${deleteCount} keys in this batch.`);
totalDeleteCount += deleteCount;
} while (cursor !== "0");
return totalDeleteCount;
}

View File

@@ -2,7 +2,7 @@ import ioredis from "ioredis";
import { env, redisConfig } from "@/env";
const redis = new ioredis({
export const redis = new ioredis({
host: redisConfig.host,
port: redisConfig.port,
username: redisConfig.username,
@@ -18,4 +18,15 @@ const redis = new ioredis({
socketTimeout: 5_000,
});
export { redis };
export const bullmqredis = new ioredis({
host: redisConfig.host,
port: redisConfig.port,
username: redisConfig.username,
password: redisConfig.password,
maxRetriesPerRequest: null, // Avoid excessive retries,
tls: !env.IS_DEV
? {
rejectUnauthorized: true,
}
: undefined,
});

View File

@@ -0,0 +1,84 @@
import { redis, bullmqredis } from ".";
import z from "zod";
import { loggerModule } from "@/utils/logger";
import { Worker, Queue } from "bullmq";
import { timeout } from "@/utils/timeout";
import { env } from "@/env";
import { sentry } from "@/server/sentry.server.config";
const DELETE_JOB = "deleteQueueJob";
const deleteQueueLogger = loggerModule("deleteQueue");
const deleteQueueSchema = z.object({
pattern: z.string().min(1, "Pattern must be at least 1 character long"),
});
const deleteQueue = new Queue(DELETE_JOB, { connection: bullmqredis });
const worker = new Worker(
DELETE_JOB,
async (job) => {
if (job.name === "delete") {
const { pattern } = deleteQueueSchema.parse(job.data);
deleteQueueLogger.info(
`Job: ${job.id} processing. With pattern: ${pattern}`,
{ pattern, jobId: job.id },
);
const now = performance.now();
const deletedCount = await deleteWithPattern(pattern);
const elapsed = performance.now() - now;
deleteQueueLogger.info(
`Job: ${job.id} completed. Deleted ${deletedCount} keys for pattern '${pattern}' in ${elapsed.toFixed(2)}ms.`,
{ deletedCount, pattern, elapsed, jobId: job.id },
);
}
},
{ connection: bullmqredis },
);
worker.on("failed", (job, error) => {
deleteQueueLogger.error(`Job failed: ${job?.id} with ${error.message}`, {
error,
jobId: job?.id,
pattern: job?.data?.pattern,
});
sentry.captureException(error);
});
export async function queueDelete({ pattern }: { pattern: string }) {
deleteQueue.add("delete", { pattern });
}
async function deleteWithPattern(pattern: string) {
let cursor = "0";
const SCAN_SIZE = env.DELETE_BATCH_SIZE;
let totalDeleteCount = 0;
do {
const [newCursor, foundKeys] = await redis.scan(
cursor,
"MATCH",
pattern,
"COUNT",
SCAN_SIZE,
);
cursor = newCursor;
if (foundKeys.length === 0) {
continue;
}
const deleteCount = await redis.unlink(foundKeys);
totalDeleteCount += deleteCount;
// Rate limiting to avoid overwhelming the Redis server
await timeout(100);
} while (cursor !== "0");
return totalDeleteCount;
}