diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts index 6de21eb05..fb6617e95 100644 --- a/src/queue/processors/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -5,7 +5,7 @@ import PostReaction from '../../../models/post-reaction'; import PostWatching from '../../../models/post-watching'; import Post from '../../../models/post'; -export default async ({ data }) => Promise.all([ +export default ({ data }, done) => Promise.all([ Favorite.remove({ postId: data._id }), Notification.remove({ postId: data._id }), PollVote.remove({ postId: data._id }), @@ -19,4 +19,4 @@ export default async ({ data }) => Promise.all([ }), Post.remove({ repostId: data._id }) ])) -]); +]).then(() => done(), done); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 75838c099..468ec442a 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -4,4 +4,4 @@ const handlers = { deletePostDependents }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts index e743fc5f6..8107c8bf7 100644 --- a/src/queue/processors/http/deliver-post.ts +++ b/src/queue/processors/http/deliver-post.ts @@ -5,17 +5,23 @@ import renderCreate from '../../../remote/activitypub/renderer/create'; import renderNote from '../../../remote/activitypub/renderer/note'; import request from '../../../remote/request'; -export default async ({ data }) => { - const promisedTo = User.findOne({ _id: data.toId }) as Promise; - const [from, post] = await Promise.all([ - User.findOne({ _id: data.fromId }), - Post.findOne({ _id: data.postId }) - ]); - const note = await renderNote(from, post); - const to = await promisedTo; - const create = renderCreate(note); +export default async ({ data }, done) => { + try { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); - create['@context'] = context; + create['@context'] = context; - return request(from, to.account.inbox, create); + await request(from, to.account.inbox, create); + } catch (error) { + done(error); + } + + done(); }; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts index 4cb72828e..ba1cc3118 100644 --- a/src/queue/processors/http/follow.ts +++ b/src/queue/processors/http/follow.ts @@ -7,10 +7,8 @@ import notify from '../../../publishers/notify'; import context from '../../../remote/activitypub/renderer/context'; import render from '../../../remote/activitypub/renderer/follow'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { - const { followerId, followeeId } = await Following.findOne({ _id: data.following }); +export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => { const [follower, followee] = await Promise.all([ User.findOne({ _id: followerId }), User.findOne({ _id: followeeId }) @@ -23,47 +21,46 @@ export default async ({ data }) => { await request(follower, followee.account.inbox, rendered); } - try { - await Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), + return [follower, followee]; +}).then(([follower, followee]) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), - FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: follower.followingCount + 1 - }), + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: follower.followingCount + 1 + }), - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), - FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followee.followersCount + 1 - }), + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: followee.followersCount + 1 + }), - // Publish follow event - isLocalUser(follower) && packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)), + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), - isLocalUser(followee) && Promise.all([ - packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)), + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), - // Notify - isLocalUser(followee) && notify(followeeId, followerId, 'follow') - ]) - ]); - } catch (error) { - Logger.error(error.toString()); - } -}; + // Notify + isLocalUser(followee) && notify(followee._id, follower._id, 'follow') + ]) +]).then(() => done(), error => { + done(); + throw error; +}), done); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts index 8f9aa717c..0ea79305c 100644 --- a/src/queue/processors/http/index.ts +++ b/src/queue/processors/http/index.ts @@ -14,4 +14,4 @@ const handlers = { unfollow }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts index 7b84400d5..ae70c0f0b 100644 --- a/src/queue/processors/http/perform-activitypub.ts +++ b/src/queue/processors/http/perform-activitypub.ts @@ -2,6 +2,7 @@ import User from '../../../models/user'; import act from '../../../remote/activitypub/act'; import Resolver from '../../../remote/activitypub/resolver'; -export default ({ data }) => User.findOne({ _id: data.actor }) +export default ({ data }, done) => User.findOne({ _id: data.actor }) .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); + .then(Promise.all) + .then(() => done(), done); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index de1dbd2f9..88fbb9737 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -5,35 +5,40 @@ import act from '../../../remote/activitypub/act'; import resolvePerson from '../../../remote/activitypub/resolve-person'; import Resolver from '../../../remote/activitypub/resolver'; -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; +export default async ({ data }, done) => { + try { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + done(); + return; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; + + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } } - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; - - if (user === null) { - user = await resolvePerson(data.signature.keyId); + if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + done(); + return; } + + await Promise.all(await act(new Resolver(), user, data.inbox, true)); + } catch (error) { + done(error); + return; } - if (user === null) { - throw 'failed to resolve user'; - } - - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; - } - - await Promise.all(await act(new Resolver(), user, data.inbox, true)); + done(); }; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts index 21683ba3c..af9659bda 100644 --- a/src/queue/processors/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -2,23 +2,30 @@ import * as request from 'request-promise-native'; import User from '../../../models/user'; const createPost = require('../../../server/api/endpoints/posts/create'); -export default async ({ data }) => { - const asyncBot = User.findOne({ _id: data.userId }); +export default async ({ data }, done) => { + try { + const asyncBot = User.findOne({ _id: data.userId }); - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - createPost({ text }, await asyncBot); + createPost({ text }, await asyncBot); + } catch (error) { + done(error); + return; + } + + done(); }; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index 801a3612a..dc50e946c 100644 --- a/src/queue/processors/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -7,24 +7,31 @@ import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderUndo from '../../../remote/activitypub/renderer/undo'; import context from '../../../remote/activitypub/renderer/context'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { +export default async ({ data }, done) => { const following = await Following.findOne({ _id: data.id }); if (following === null) { + done(); return; } - const [follower, followee] = await Promise.all([ - User.findOne({ _id: following.followerId }), - User.findOne({ _id: following.followeeId }) - ]); + let follower, followee; - if (isLocalUser(follower) && isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; + try { + [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); - await request(follower, followee.account.inbox, undo); + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + } catch (error) { + done(error); + return; } try { @@ -57,7 +64,7 @@ export default async ({ data }) => { // Publish follow event stream(follower._id, 'unfollow', promisedPackedUser); - } catch (error) { - Logger.error(error.toString()); + } finally { + done(); } };