Merge branch 'fix/redis-mutex' into 'develop'

fix: release redis locks

See merge request firefish/firefish!10586

Co-authored-by: Namekuji <nmkj@waah.day>
This commit is contained in:
Kainoa Kanter 2023-09-02 19:22:13 +00:00
commit 6691939ccf
7 changed files with 30 additions and 10 deletions

View File

@ -12,29 +12,38 @@ const retryDelay = 100;
* @param timeout Lock timeout (ms), The timeout releases previous lock.
* @returns Unlock function
*/
export async function getApLock(uri: string, timeout = 30 * 1000) {
export async function getApLock(
uri: string,
timeout = 30 * 1000,
): Promise<Mutex> {
const lock = new Mutex(redisClient, `ap-object:${uri}`, {
lockTimeout: timeout,
retryInterval: retryDelay,
});
await lock.acquire();
return lock;
}
export async function getFetchInstanceMetadataLock(
host: string,
timeout = 30 * 1000,
) {
): Promise<Mutex> {
const lock = new Mutex(redisClient, `instance:${host}`, {
lockTimeout: timeout,
retryInterval: retryDelay,
});
await lock.acquire();
return lock;
}
export async function getChartInsertLock(lockKey: string, timeout = 30 * 1000) {
export async function getChartInsertLock(
lockKey: string,
timeout = 30 * 1000,
): Promise<Mutex> {
const lock = new Mutex(redisClient, `chart-insert:${lockKey}`, {
lockTimeout: timeout,
retryInterval: retryDelay,
});
await lock.acquire();
return lock;
}

View File

@ -32,6 +32,8 @@ export default async function (
// Interrupt if you block the announcement destination
if (await shouldBlockInstance(extractDbHost(uri))) return;
const lock = await getApLock(uri);
try {
// Check if something with the same URI is already registered
const exist = await fetchNote(uri);
@ -78,6 +80,6 @@ export default async function (
uri,
});
} finally {
await getApLock(uri);
await lock.release();
}
}

View File

@ -31,6 +31,8 @@ export default async function (
}
}
const lock = await getApLock(uri);
try {
const exist = await fetchNote(note);
if (exist) return "skip: note exists";
@ -44,6 +46,6 @@ export default async function (
throw e;
}
} finally {
await getApLock(uri);
await lock.release();
}
}

View File

@ -13,6 +13,8 @@ export default async function (
): Promise<string> {
logger.info(`Deleting the Note: ${uri}`);
const lock = await getApLock(uri);
try {
const dbResolver = new DbResolver();
const note = await dbResolver.getNoteFromApId(uri);
@ -37,6 +39,6 @@ export default async function (
await deleteNode(actor, note);
return "ok: note deleted";
} finally {
await getApLock(uri);
await lock.release();
}
}

View File

@ -415,6 +415,8 @@ export async function resolveNote(
`host ${extractDbHost(uri)} is blocked`,
);
const lock = await getApLock(uri);
try {
//#region Returns if already registered with this server
const exist = await fetchNote(uri);
@ -437,7 +439,7 @@ export async function resolveNote(
// Since the attached Note Object may be disguised, always specify the uri and fetch it from the server.
return await createNote(uri, resolver, true);
} finally {
await getApLock(uri);
await lock.release();
}
}

View File

@ -430,6 +430,7 @@ export default abstract class Chart<T extends Schema> {
? `${this.name}:${date}:${span}:${group}`
: `${this.name}:${date}:${span}`;
const lock = await getChartInsertLock(lockKey);
try {
// ロック内でもう1回チェックする
const currentLog = (await repository.findOneBy({
@ -465,7 +466,7 @@ export default abstract class Chart<T extends Schema> {
return log;
} finally {
await getChartInsertLock(lockKey);
await lock.release();
}
}

View File

@ -15,6 +15,8 @@ export async function fetchInstanceMetadata(
instance: Instance,
force = false,
): Promise<void> {
const lock = await getFetchInstanceMetadataLock(instance.host);
if (!force) {
const _instance = await Instances.findOneBy({ host: instance.host });
const now = Date.now();
@ -22,7 +24,7 @@ export async function fetchInstanceMetadata(
_instance?.infoUpdatedAt &&
now - _instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24
) {
await getFetchInstanceMetadataLock(instance.host);
await lock.release();
return;
}
}
@ -78,7 +80,7 @@ export async function fetchInstanceMetadata(
} catch (e) {
logger.error(`Failed to update metadata of ${instance.host}: ${e}`);
} finally {
await getFetchInstanceMetadataLock(instance.host);
await lock.release();
}
}