Merged in feat/redis-fix (pull request #3207)
feat/redis-fix

* feat(redis): delete multiple keys in one partition scan
* fix(BOOK-603): make it possible to do multiple deletes in redis at once using one partition scan

Approved-by: Linus Flood
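For orientation, a rough sketch of how a caller might exercise the new bulk endpoint (the host, API key lookup and cache keys are hypothetical placeholders; the DELETE method, x-api-key header, { keys, fuzzy } body and the two response shapes mirror the code in this diff):

// Hypothetical request; exact deletes respond with { deletedKeys: number },
// fuzzy deletes are queued and respond with { status: "queued" }.
const response = await fetch("https://redis-api.internal.example/api/cache/multiple", {
  method: "DELETE",
  headers: { "x-api-key": process.env.REDIS_API_KEY ?? "" },
  body: JSON.stringify({ keys: ["entry:home:en", "entry:about:en"], fuzzy: false }),
})
console.log(await response.json())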
@@ -1,15 +1,20 @@
import * as Sentry from "@sentry/bun";
import { Elysia, t } from "elysia";

import { ModelValidationError } from "@/errors/ModelValidationError";
import { redis } from "@/services/redis";
-import { queueDelete } from "@/services/redis/queueDelete";
+import { queueDelete, queueDeleteMultiple } from "@/services/redis/queueDelete";
import { loggerModule } from "@/utils/logger";
import { truncate } from "@/utils/truncate";
+import { validateKey } from "@/utils/validateKey";

const MIN_LENGTH = 1;

const QUERY_TYPE = t.Object({ key: t.String({ minLength: MIN_LENGTH }) });
+const DELETEMULTIPLE_BODY_TYPE = t.Object({
+  keys: t.Array(t.String({ minLength: MIN_LENGTH })),
+  fuzzy: t.Optional(t.Boolean({ default: false })),
+});

const cacheRouteLogger = loggerModule("cacheRoute");
export const cacheRoutes = new Elysia({ prefix: "/cache" })
  .get(
@@ -68,6 +73,53 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" })
      response: { 204: t.Undefined(), 400: t.String() },
    },
  )
+  .delete(
+    "/multiple",
+    async ({ body: { keys, fuzzy = false } }) => {
+      const validatedKeys = keys.map(validateKey);
+
+      cacheRouteLogger.debug(
+        `DELETE /multiple keys=${validatedKeys.join(",")} ${fuzzy ? "(fuzzy)" : ""}`,
+      );
+
+      // 1. Fuzzy deletes → Single SCAN pass
+      if (fuzzy) {
+        const patterns = validatedKeys.map((k) => `*${k}*`);
+
+        await queueDeleteMultiple({ patterns });
+        return { status: "queued" };
+      }
+
+      // 2. Exact deletes → Batch unlink
+      const now = performance.now();
+
+      // Use UNLINK for async deletes
+      const deletedKeys = await redis.unlink(...validatedKeys);
+
+      const elapsed = performance.now() - now;
+
+      cacheRouteLogger.info(
+        `Deleted ${deletedKeys} keys in ${elapsed}ms`,
+        {
+          deletedKeys,
+          keys: validatedKeys,
+          elapsed,
+        },
+      );
+
+      return { deletedKeys };
+    },
+    {
+      body: DELETEMULTIPLE_BODY_TYPE,
+      response: {
+        200: t.Union([
+          t.Object({ deletedKeys: t.Number() }),
+          t.Object({ status: t.Literal("queued") }),
+        ]),
+        400: t.String(),
+      },
+    },
+  )
  .delete(
    "/",
    async ({ query: { key, fuzzy } }) => {
@@ -105,19 +157,3 @@ export const cacheRoutes = new Elysia({ prefix: "/cache" })
      },
    },
  );
-
-function validateKey(key: string) {
-  const parsedKey = decodeURIComponent(key);
-
-  if (parsedKey.length < MIN_LENGTH) {
-    throw new ModelValidationError(
-      "Key has to be at least 1 character long",
-    );
-  }
-
-  if (parsedKey.includes("*")) {
-    throw new ModelValidationError("Key cannot contain wildcards");
-  }
-
-  return parsedKey;
-}
@@ -11,30 +11,52 @@ import { bullmqredis, redis } from ".";
const DELETE_JOB = "deleteQueueJob";
const deleteQueueLogger = loggerModule("deleteQueue");

-const deleteQueueSchema = z.object({
+const deleteSingleSchema = z.object({
  pattern: z.string().min(1, "Pattern must be at least 1 character long"),
});

+const deleteMultipleSchema = z.object({
+  patterns: z
+    .array(z.string().min(1, "Pattern must be at least 1 character long"))
+    .min(1, "At least one pattern is required"),
+});

const deleteQueue = new Queue(DELETE_JOB, { connection: bullmqredis });
const worker = new Worker(
  DELETE_JOB,
  async (job) => {
+    let patterns: string[] = [];
+
+    // 1. Normalize job input into patterns[]
+    if (job.name === "delete") {
-      const { pattern } = deleteQueueSchema.parse(job.data);
-      deleteQueueLogger.info(
-        `Job: ${job.id} processing. With pattern: ${pattern}`,
-        { pattern, jobId: job.id },
-      );
-
-      const now = performance.now();
-      const deletedCount = await deleteWithPattern(pattern);
-      const elapsed = performance.now() - now;
-
-      deleteQueueLogger.info(
-        `Job: ${job.id} completed. Deleted ${deletedCount} keys for pattern '${pattern}' in ${elapsed.toFixed(2)}ms.`,
-        { deletedCount, pattern, elapsed, jobId: job.id },
-      );
+      const { pattern } = deleteSingleSchema.parse(job.data);
+      patterns = [pattern];
+    }
+
+    if (job.name === "deleteMultiple") {
+      const { patterns: parsedPatterns } = deleteMultipleSchema.parse(
+        job.data,
+      );
+      patterns = parsedPatterns;
+    }
+
+    if (!patterns.length) {
+      throw new Error(`Unknown job name or invalid data: ${job.name}`);
+    }
+
+    deleteQueueLogger.info(`Job: ${job.id} processing.`, {
+      patterns,
+      jobId: job.id,
+    });
+
+    const now = performance.now();
+    const deletedCount = await deleteWithPatterns(patterns);
+    const elapsed = performance.now() - now;
+
+    deleteQueueLogger.info(
+      `Job: ${job.id} completed. Deleted ${deletedCount} keys in ${elapsed.toFixed(2)}ms`,
+      { deletedCount, patterns, elapsed, jobId: job.id },
+    );
  },
  { connection: bullmqredis },
);
@@ -43,7 +65,7 @@ worker.on("failed", (job, error) => {
  deleteQueueLogger.error(`Job failed: ${job?.id} with ${error.message}`, {
    error,
    jobId: job?.id,
-    pattern: job?.data?.pattern,
+    patterns: job?.data?.patterns,
  });

  sentry.captureException(error);
@@ -53,33 +75,68 @@ export async function queueDelete({ pattern }: { pattern: string }) {
  deleteQueue.add("delete", { pattern });
}

-async function deleteWithPattern(pattern: string) {
+export async function queueDeleteMultiple({
+  patterns,
+}: {
+  patterns: string[];
+}) {
+  deleteQueue.add("deleteMultiple", { patterns });
+}
+
+async function deleteWithPatterns(patterns: string[]) {
  let cursor = "0";
  const SCAN_SIZE = env.DELETE_BATCH_SIZE;

  let totalDeleteCount = 0;

  do {
-    const [newCursor, foundKeys] = await redis.scan(
+    const [newCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
-      pattern,
+      "*",
      "COUNT",
      SCAN_SIZE,
    );

    cursor = newCursor;
-    if (foundKeys.length === 0) {
-      continue;
-    }
-
-    const deleteCount = await redis.unlink(foundKeys);
-
-    totalDeleteCount += deleteCount;
+    if (!keys.length) continue;
+
+    const matchedKeys = keys.filter((key) =>
+      patterns.some((pattern) => matchKey(key, pattern)),
+    );
+
+    if (!matchedKeys.length) continue;
+
+    const deleted = await redis.unlink(...matchedKeys);
+    totalDeleteCount += deleted;

    // Rate limiting to avoid overwhelming the Redis server
    await timeout(100);
  } while (cursor !== "0");

  return totalDeleteCount;
}

+function matchKey(key: string, pattern: string): boolean {
+  const startsWithWildcard = pattern.startsWith("*");
+  const endsWithWildcard = pattern.endsWith("*");
+
+  const cleanPattern = pattern.replace(/^\*|\*$/g, ""); // remove outer *
+
+  if (!startsWithWildcard && !endsWithWildcard) {
+    return key === pattern;
+  }
+
+  if (startsWithWildcard && endsWithWildcard) {
+    return key.includes(cleanPattern);
+  }
+
+  if (startsWithWildcard) {
+    return key.endsWith(cleanPattern);
+  }
+
+  if (endsWithWildcard) {
+    return key.startsWith(cleanPattern);
+  }
+
+  return false;
+}
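To illustrate the limited glob support matchKey implements (only a single leading and/or trailing *), a few hypothetical key/pattern combinations and the results the function above would return:

// Hypothetical keys and patterns, evaluated against matchKey as defined above.
matchKey("entry:home:en", "entry:home:en"); // true  – no wildcard → exact equality
matchKey("entry:home:en", "*home*");        // true  – leading and trailing * → substring match
matchKey("entry:home:en", "*:en");          // true  – leading * → endsWith
matchKey("entry:home:en", "entry:*");       // true  – trailing * → startsWith
matchKey("entry:home:en", "ent*en");        // false – inner wildcards are not expanded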
apps/redis-api/src/utils/validateKey.ts (new file, 17 lines)
@@ -0,0 +1,17 @@
import { ModelValidationError } from "@/errors/ModelValidationError";

const MIN_LENGTH = 1;

export function validateKey(key: string) {
  const parsedKey = decodeURIComponent(key);

  if (parsedKey.length < MIN_LENGTH) {
    throw new ModelValidationError(
      "Key has to be at least 1 character long",
    );
  }

  if (parsedKey.includes("*")) {
    throw new ModelValidationError("Key cannot contain wildcards");
  }

  return parsedKey;
}
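A few hypothetical inputs showing how the extracted validateKey helper behaves:

// Hypothetical calls; keys are URL-decoded before validation.
validateKey("entry%3Ahome"); // → "entry:home"
validateKey("*session*");    // throws ModelValidationError: keys may not contain wildcards
validateKey("");             // throws ModelValidationError: key must be at least 1 character long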
@@ -97,44 +97,43 @@ export async function POST(request: NextRequest) {
  const cacheClient = await getCacheClient()

+  const keysToDelete = []
+
  const contentTypeUidTag = generateTag(entryLocale, content_type.uid)
  revalidateLogger.debug(
    `Revalidating tag by content_type_uid: ${contentTypeUidTag}`
  )
  revalidateTag(contentTypeUidTag)
-  await cacheClient.deleteKey(contentTypeUidTag, { fuzzy: true })
+  keysToDelete.push(contentTypeUidTag)

  revalidateLogger.debug(`Revalidating refsTag: ${refsTag}`)
  revalidateTag(refsTag)
-  await cacheClient.deleteKey(refsTag, { fuzzy: true })
+  keysToDelete.push(refsTag)

  revalidateLogger.debug(`Revalidating refTag: ${refTag}`)
  revalidateTag(refTag)
-  await cacheClient.deleteKey(refTag, { fuzzy: true })
+  keysToDelete.push(refTag)

  revalidateLogger.debug(`Revalidating tag: ${tag}`)
  revalidateTag(tag)
-  await cacheClient.deleteKey(tag, { fuzzy: true })
+  keysToDelete.push(tag)

  revalidateLogger.debug(
    `Revalidating language switcher tag: ${languageSwitcherTag}`
  )
  revalidateTag(languageSwitcherTag)
-  await cacheClient.deleteKey(languageSwitcherTag, { fuzzy: true })
+  keysToDelete.push(languageSwitcherTag)

  revalidateLogger.debug(`Revalidating metadataTag: ${metadataTag}`)
  revalidateTag(metadataTag)
-  await cacheClient.deleteKey(metadataTag, { fuzzy: true })
+  keysToDelete.push(metadataTag)

  revalidateLogger.debug(`Revalidating contentEntryTag: ${contentEntryTag}`)
  revalidateTag(contentEntryTag)
-  await cacheClient.deleteKey(contentEntryTag, { fuzzy: true })
+  keysToDelete.push(contentEntryTag)

  if (entry.url) {
    const resolveEntryTag = resolveEntryCacheKey(entryLocale, entry.url)
    revalidateLogger.debug(`Revalidating url: ${resolveEntryTag}`)

-    await cacheClient.deleteKey(resolveEntryTag, { fuzzy: true })
+    keysToDelete.push(resolveEntryTag)
  }

  if (entry.breadcrumbs) {
@@ -153,11 +152,11 @@ export async function POST(request: NextRequest) {
      `Revalidating breadcrumbsRefsTag: ${breadcrumbsRefsTag}`
    )
    revalidateTag(breadcrumbsRefsTag)
-    await cacheClient.deleteKey(breadcrumbsRefsTag, { fuzzy: true })
+    keysToDelete.push(breadcrumbsRefsTag)

    revalidateLogger.debug(`Revalidating breadcrumbsTag: ${breadcrumbsTag}`)
    revalidateTag(breadcrumbsTag)
-    await cacheClient.deleteKey(breadcrumbsTag, { fuzzy: true })
+    keysToDelete.push(breadcrumbsTag)
  }

  if (entry.page_settings) {
@@ -169,7 +168,7 @@ export async function POST(request: NextRequest) {
    revalidateLogger.debug(`Revalidating pageSettingsTag: ${pageSettingsTag}`)
    revalidateTag(pageSettingsTag)
-    await cacheClient.deleteKey(pageSettingsTag, { fuzzy: true })
+    keysToDelete.push(pageSettingsTag)
  }

  if (content_type.uid === "destination_city_page") {
@@ -180,10 +179,12 @@ export async function POST(request: NextRequest) {
        "city_list_data",
        cityIdentifier
      )
-      await cacheClient.deleteKey(cityPageTag, { fuzzy: true })
+      keysToDelete.push(cityPageTag)
    }
  }

+  await cacheClient.deleteKeys(keysToDelete, { fuzzy: true })
+
  return Response.json({ revalidated: true, now: Date.now() })
} catch (error) {
  revalidateLogger.error("Failed to revalidate tag(s)", error)
@@ -79,7 +79,11 @@ export function parseBookingWidgetSearchParams(
    return result
  } catch (error) {
-    logger.error("[URL] Error parsing search params for booking widget:", error)
+    logger.error(
+      "[URL] Error parsing search params for booking widget:",
+      error,
+      searchParams
+    )
    return {}
  }
}
@@ -99,4 +99,12 @@ export type DataCache = {
   * @returns
   */
  deleteKey: (key: string, opts?: { fuzzy?: boolean }) => Promise<void>

+  /**
+   * Deletes multiple keys from the cache
+   * @param keys CacheKeys to delete
+   * @param fuzzy If true, does a wildcard delete. *key*
+   * @returns
+   */
+  deleteKeys: (keys: string[], opts?: { fuzzy?: boolean }) => Promise<void>
}
@@ -1,5 +1,6 @@
import { cacheOrGet } from "./cacheOrGet"
import { deleteKey } from "./deleteKey"
+import { deleteKeys } from "./deleteKeys"
import { get } from "./get"
import { set } from "./set"
@@ -12,5 +13,6 @@ export async function createDistributedCache(): Promise<DataCache> {
    set,
    cacheOrGet,
    deleteKey,
+    deleteKeys,
  } satisfies DataCache
}
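A hypothetical call site for the extended interface, now that the distributed cache exposes deleteKeys (the tag names are placeholders):

// Hypothetical usage: purge several related tags in a single request.
const cache = await createDistributedCache()
await cache.deleteKeys(["entry:home:en", "metadata:home:en"], { fuzzy: true })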
packages/common/dataCache/DistributedCache/deleteKeys.ts (new file, 45 lines)
@@ -0,0 +1,45 @@
import * as Sentry from "@sentry/nextjs"

import { env } from "../../env/server"
import { safeTry } from "../../utils/safeTry"
import { cacheLogger } from "../logger"
import { getDeleteMultipleKeysEndpoint } from "./endpoints"

const API_KEY = env.REDIS_API_KEY ?? ""

export async function deleteKeys(keys: string[], opts?: { fuzzy?: boolean }) {
  const perf = performance.now()
  const endpoint = getDeleteMultipleKeysEndpoint()

  const [response, error] = await safeTry(
    fetch(endpoint, {
      method: "DELETE",
      cache: "no-cache",
      headers: {
        "x-api-key": API_KEY,
      },
      body: JSON.stringify({ keys, fuzzy: opts?.fuzzy ?? false }),
      signal: AbortSignal.timeout(10_000),
    })
  )

  if (!response || !response.ok || error) {
    if (response?.status !== 404) {
      Sentry.captureException(
        error ?? new Error("Unable to DELETE cachekeys"),
        {
          extra: {
            cacheKeys: keys,
            statusCode: response?.status,
            statusText: response?.statusText,
          },
        }
      )
    }

    return undefined
  }

  cacheLogger.debug(
    `Deleted '${keys.join(", ")}' took ${(performance.now() - perf).toFixed(2)}ms`
  )
}
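deleteKeys relies on a safeTry helper that is not part of this diff; a minimal sketch of the tuple shape it is assumed to return (the real implementation in utils/safeTry may differ):

// Assumed shape only: resolve a promise into a [value, error] tuple so callers can branch without try/catch.
async function safeTry<T>(promise: Promise<T>): Promise<[T | undefined, unknown]> {
  try {
    return [await promise, undefined]
  } catch (error) {
    return [undefined, error]
  }
}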
@@ -10,3 +10,13 @@ export function getCacheEndpoint(key: string) {
  return url
}

+export function getDeleteMultipleKeysEndpoint() {
+  if (!env.REDIS_API_HOST) {
+    throw new Error("REDIS_API_HOST is not set")
+  }
+
+  const url = new URL(`/api/cache/multiple`, env.REDIS_API_HOST)
+
+  return url
+}
@@ -0,0 +1,18 @@
import { cacheLogger } from "../../logger"

import { cacheMap } from "./cacheMap"

export async function deleteKeys(keys: string[], opts?: { fuzzy?: boolean }) {
  cacheLogger.debug("Deleting keys", keys)
  keys.forEach((key) => {
    if (opts?.fuzzy) {
      cacheMap.forEach((_, k) => {
        if (k.includes(key)) {
          cacheMap.delete(k)
        }
      })
      return
    }

    cacheMap.delete(key)
  })
}
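A hypothetical illustration of the in-memory variant: fuzzy deletion is a plain substring match over the cacheMap keys, while the exact path deletes a single entry:

// Hypothetical entries; "…" stands in for cached values.
cacheMap.set("entry:home:en", "…")
cacheMap.set("entry:home:sv", "…")
await deleteKeys(["home"], { fuzzy: true }) // removes both entries, since both keys include "home"
await deleteKeys(["entry:home:en"])         // exact path: deletes only that key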
@@ -1,10 +1,11 @@
import { cacheOrGet } from "./cacheOrGet"
import { deleteKey } from "./deleteKey"
+import { deleteKeys } from "./deleteKeys"
import { get } from "./get"
import { set } from "./set"

import type { DataCache } from "../../Cache"

export async function createInMemoryCache(): Promise<DataCache> {
-  return { type: "in-memory", cacheOrGet, deleteKey, get, set }
+  return { type: "in-memory", cacheOrGet, deleteKey, get, set, deleteKeys }
}