Refactor notifications to go through a separate stream in streaming API (#16765)
Eliminate need to have custom notifications filtering logic in the streaming API code by publishing notifications into a separate stream and then simply using the multi-stream capability to subscribe to that stream when necessary
This commit is contained in:
		
							parent
							
								
									52e5c07948
								
							
						
					
					
						commit
						a0d4129893
					
				| 
						 | 
				
			
			@ -127,7 +127,7 @@ class NotifyService < BaseService
 | 
			
		|||
  def push_notification!
 | 
			
		||||
    return if @notification.activity.nil?
 | 
			
		||||
 | 
			
		||||
    Redis.current.publish("timeline:#{@recipient.id}", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification)))
 | 
			
		||||
    Redis.current.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification)))
 | 
			
		||||
    send_push_notifications!
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -282,6 +282,14 @@ const startWorker = (workerId) => {
 | 
			
		|||
    next();
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {any} req
 | 
			
		||||
   * @param {string[]} necessaryScopes
 | 
			
		||||
   * @return {boolean}
 | 
			
		||||
   */
 | 
			
		||||
  const isInScope = (req, necessaryScopes) =>
 | 
			
		||||
    req.scopes.some(scope => necessaryScopes.includes(scope));
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {string} token
 | 
			
		||||
   * @param {any} req
 | 
			
		||||
| 
						 | 
				
			
			@ -314,7 +322,6 @@ const startWorker = (workerId) => {
 | 
			
		|||
        req.scopes = result.rows[0].scopes.split(' ');
 | 
			
		||||
        req.accountId = result.rows[0].account_id;
 | 
			
		||||
        req.chosenLanguages = result.rows[0].chosen_languages;
 | 
			
		||||
        req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope));
 | 
			
		||||
        req.deviceId = result.rows[0].device_id;
 | 
			
		||||
 | 
			
		||||
        resolve();
 | 
			
		||||
| 
						 | 
				
			
			@ -580,14 +587,12 @@ const startWorker = (workerId) => {
 | 
			
		|||
   * @param {function(string, string): void} output
 | 
			
		||||
   * @param {function(string[], function(string): void): void} attachCloseHandler
 | 
			
		||||
   * @param {boolean=} needsFiltering
 | 
			
		||||
   * @param {boolean=} notificationOnly
 | 
			
		||||
   * @return {function(string): void}
 | 
			
		||||
   */
 | 
			
		||||
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
 | 
			
		||||
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
 | 
			
		||||
    const accountId  = req.accountId || req.remoteAddress;
 | 
			
		||||
    const streamType = notificationOnly ? ' (notification)' : '';
 | 
			
		||||
 | 
			
		||||
    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
 | 
			
		||||
    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 | 
			
		||||
 | 
			
		||||
    const listener = message => {
 | 
			
		||||
      const json = parseJSON(message);
 | 
			
		||||
| 
						 | 
				
			
			@ -605,14 +610,6 @@ const startWorker = (workerId) => {
 | 
			
		|||
        output(event, encodedPayload);
 | 
			
		||||
      };
 | 
			
		||||
 | 
			
		||||
      if (notificationOnly && event !== 'notification') {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (event === 'notification' && !req.allowNotifications) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // Only messages that may require filtering are statuses, since notifications
 | 
			
		||||
      // are already personalized and deletes do not matter
 | 
			
		||||
      if (!needsFiltering || event !== 'update') {
 | 
			
		||||
| 
						 | 
				
			
			@ -759,7 +756,7 @@ const startWorker = (workerId) => {
 | 
			
		|||
      const onSend = streamToHttp(req, res);
 | 
			
		||||
      const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 | 
			
		||||
 | 
			
		||||
      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly);
 | 
			
		||||
      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
 | 
			
		||||
    }).catch(err => {
 | 
			
		||||
      log.verbose(req.requestId, 'Subscription error:', err.toString());
 | 
			
		||||
      httpNotFound(res);
 | 
			
		||||
| 
						 | 
				
			
			@ -775,74 +772,92 @@ const startWorker = (workerId) => {
 | 
			
		|||
   * @property {string} [only_media]
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {any} req
 | 
			
		||||
   * @return {string[]}
 | 
			
		||||
   */
 | 
			
		||||
  const channelsForUserStream = req => {
 | 
			
		||||
    const arr = [`timeline:${req.accountId}`];
 | 
			
		||||
 | 
			
		||||
    if (isInScope(req, ['crypto']) && req.deviceId) {
 | 
			
		||||
      arr.push(`timeline:${req.accountId}:${req.deviceId}`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (isInScope(req, ['read', 'read:notifications'])) {
 | 
			
		||||
      arr.push(`timeline:${req.accountId}:notifications`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return arr;
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {any} req
 | 
			
		||||
   * @param {string} name
 | 
			
		||||
   * @param {StreamParams} params
 | 
			
		||||
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
 | 
			
		||||
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
 | 
			
		||||
   */
 | 
			
		||||
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
 | 
			
		||||
    switch(name) {
 | 
			
		||||
    case 'user':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
 | 
			
		||||
        options: { needsFiltering: false, notificationOnly: false },
 | 
			
		||||
        channelIds: channelsForUserStream(req),
 | 
			
		||||
        options: { needsFiltering: false },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'user:notification':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: [`timeline:${req.accountId}`],
 | 
			
		||||
        options: { needsFiltering: false, notificationOnly: true },
 | 
			
		||||
        channelIds: [`timeline:${req.accountId}:notifications`],
 | 
			
		||||
        options: { needsFiltering: false },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public:local':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public:local'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public:remote':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public:remote'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public:media':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public:media'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public:local:media':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public:local:media'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'public:remote:media':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: ['timeline:public:remote:media'],
 | 
			
		||||
        options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: true },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
    case 'direct':
 | 
			
		||||
      resolve({
 | 
			
		||||
        channelIds: [`timeline:direct:${req.accountId}`],
 | 
			
		||||
        options: { needsFiltering: false, notificationOnly: false },
 | 
			
		||||
        options: { needsFiltering: false },
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      break;
 | 
			
		||||
| 
						 | 
				
			
			@ -852,7 +867,7 @@ const startWorker = (workerId) => {
 | 
			
		|||
      } else {
 | 
			
		||||
        resolve({
 | 
			
		||||
          channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
 | 
			
		||||
          options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
          options: { needsFiltering: true },
 | 
			
		||||
        });
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -863,7 +878,7 @@ const startWorker = (workerId) => {
 | 
			
		|||
      } else {
 | 
			
		||||
        resolve({
 | 
			
		||||
          channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
 | 
			
		||||
          options: { needsFiltering: true, notificationOnly: false },
 | 
			
		||||
          options: { needsFiltering: true },
 | 
			
		||||
        });
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -872,7 +887,7 @@ const startWorker = (workerId) => {
 | 
			
		|||
      authorizeListAccess(params.list, req).then(() => {
 | 
			
		||||
        resolve({
 | 
			
		||||
          channelIds: [`timeline:list:${params.list}`],
 | 
			
		||||
          options: { needsFiltering: false, notificationOnly: false },
 | 
			
		||||
          options: { needsFiltering: false },
 | 
			
		||||
        });
 | 
			
		||||
      }).catch(() => {
 | 
			
		||||
        reject('Not authorized to stream this list');
 | 
			
		||||
| 
						 | 
				
			
			@ -919,7 +934,7 @@ const startWorker = (workerId) => {
 | 
			
		|||
 | 
			
		||||
      const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
 | 
			
		||||
      const stopHeartbeat = subscriptionHeartbeat(channelIds);
 | 
			
		||||
      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly);
 | 
			
		||||
      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
 | 
			
		||||
 | 
			
		||||
      subscriptions[channelIds.join(';')] = {
 | 
			
		||||
        listener,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue