Convert the streaming server to ESM (#29389)
Co-authored-by: Claire <claire.github-309c@sitedethib.com>
This commit is contained in:
		
							parent
							
								
									bc4c5ed918
								
							
						
					
					
						commit
						036f5a05e3
					
				| 
						 | 
				
			
			@ -1,4 +1,8 @@
 | 
			
		|||
/* eslint-disable import/no-commonjs */
 | 
			
		||||
 | 
			
		||||
// @ts-check
 | 
			
		||||
 | 
			
		||||
// @ts-ignore - This needs to be a CJS file (eslint does not yet support ESM configs), and TS is complaining we use require
 | 
			
		||||
const { defineConfig } = require('eslint-define-config');
 | 
			
		||||
 | 
			
		||||
module.exports = defineConfig({
 | 
			
		||||
| 
						 | 
				
			
			@ -22,22 +26,18 @@ module.exports = defineConfig({
 | 
			
		|||
    // to maintain.
 | 
			
		||||
    'no-delete-var': 'off',
 | 
			
		||||
 | 
			
		||||
    // The streaming server is written in commonjs, not ESM for now:
 | 
			
		||||
    'import/no-commonjs': 'off',
 | 
			
		||||
 | 
			
		||||
    // This overrides the base configuration for this rule to pick up
 | 
			
		||||
    // dependencies for the streaming server from the correct package.json file.
 | 
			
		||||
    'import/no-extraneous-dependencies': [
 | 
			
		||||
      'error',
 | 
			
		||||
      {
 | 
			
		||||
        devDependencies: [
 | 
			
		||||
          'streaming/.eslintrc.js',
 | 
			
		||||
        ],
 | 
			
		||||
        devDependencies: ['streaming/.eslintrc.cjs'],
 | 
			
		||||
        optionalDependencies: false,
 | 
			
		||||
        peerDependencies: false,
 | 
			
		||||
        includeTypes: true,
 | 
			
		||||
        packageDir: __dirname,
 | 
			
		||||
      },
 | 
			
		||||
    ],
 | 
			
		||||
    'import/extensions': ['error', 'always'],
 | 
			
		||||
  },
 | 
			
		||||
});
 | 
			
		||||
| 
						 | 
				
			
			@ -5,15 +5,14 @@
 | 
			
		|||
 * override it in let statements.
 | 
			
		||||
 * @type {string}
 | 
			
		||||
 */
 | 
			
		||||
const UNEXPECTED_ERROR_MESSAGE = 'An unexpected error occurred';
 | 
			
		||||
exports.UNKNOWN_ERROR_MESSAGE = UNEXPECTED_ERROR_MESSAGE;
 | 
			
		||||
export const UNEXPECTED_ERROR_MESSAGE = 'An unexpected error occurred';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Extracts the status and message properties from the error object, if
 | 
			
		||||
 * available for public use. The `unknown` is for catch statements
 | 
			
		||||
 * @param {Error | AuthenticationError | RequestError | unknown} err
 | 
			
		||||
 */
 | 
			
		||||
exports.extractStatusAndMessage = function(err) {
 | 
			
		||||
export function extractStatusAndMessage(err) {
 | 
			
		||||
  let statusCode = 500;
 | 
			
		||||
  let errorMessage = UNEXPECTED_ERROR_MESSAGE;
 | 
			
		||||
  if (err instanceof AuthenticationError || err instanceof RequestError) {
 | 
			
		||||
| 
						 | 
				
			
			@ -22,9 +21,9 @@ exports.extractStatusAndMessage = function(err) {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  return { statusCode, errorMessage };
 | 
			
		||||
};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class RequestError extends Error {
 | 
			
		||||
export class RequestError extends Error {
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {string} message
 | 
			
		||||
   */
 | 
			
		||||
| 
						 | 
				
			
			@ -35,9 +34,7 @@ class RequestError extends Error {
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.RequestError = RequestError;
 | 
			
		||||
 | 
			
		||||
class AuthenticationError extends Error {
 | 
			
		||||
export class AuthenticationError extends Error {
 | 
			
		||||
  /**
 | 
			
		||||
   * @param {string} message
 | 
			
		||||
   */
 | 
			
		||||
| 
						 | 
				
			
			@ -47,5 +44,3 @@ class AuthenticationError extends Error {
 | 
			
		|||
    this.status = 401;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.AuthenticationError = AuthenticationError;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,32 +1,36 @@
 | 
			
		|||
// @ts-check
 | 
			
		||||
 | 
			
		||||
const fs = require('fs');
 | 
			
		||||
const http = require('http');
 | 
			
		||||
const path = require('path');
 | 
			
		||||
const url = require('url');
 | 
			
		||||
import fs from 'node:fs';
 | 
			
		||||
import http from 'node:http';
 | 
			
		||||
import path from 'node:path';
 | 
			
		||||
import url from 'node:url';
 | 
			
		||||
 | 
			
		||||
const cors = require('cors');
 | 
			
		||||
const dotenv = require('dotenv');
 | 
			
		||||
const express = require('express');
 | 
			
		||||
const { Redis } = require('ioredis');
 | 
			
		||||
const { JSDOM } = require('jsdom');
 | 
			
		||||
const pg = require('pg');
 | 
			
		||||
const dbUrlToConfig = require('pg-connection-string').parse;
 | 
			
		||||
const WebSocket = require('ws');
 | 
			
		||||
import cors from 'cors';
 | 
			
		||||
import dotenv from 'dotenv';
 | 
			
		||||
import express from 'express';
 | 
			
		||||
import { Redis } from 'ioredis';
 | 
			
		||||
import { JSDOM } from 'jsdom';
 | 
			
		||||
import pg from 'pg';
 | 
			
		||||
import pgConnectionString from 'pg-connection-string';
 | 
			
		||||
import WebSocket from 'ws';
 | 
			
		||||
 | 
			
		||||
const errors = require('./errors');
 | 
			
		||||
const { AuthenticationError, RequestError } = require('./errors');
 | 
			
		||||
const { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } = require('./logging');
 | 
			
		||||
const { setupMetrics } = require('./metrics');
 | 
			
		||||
const { isTruthy, normalizeHashtag, firstParam } = require("./utils");
 | 
			
		||||
import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js';
 | 
			
		||||
import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js';
 | 
			
		||||
import { setupMetrics } from './metrics.js';
 | 
			
		||||
import { isTruthy, normalizeHashtag, firstParam } from './utils.js';
 | 
			
		||||
 | 
			
		||||
const environment = process.env.NODE_ENV || 'development';
 | 
			
		||||
 | 
			
		||||
// Correctly detect and load .env or .env.production file based on environment:
 | 
			
		||||
const dotenvFile = environment === 'production' ? '.env.production' : '.env';
 | 
			
		||||
const dotenvFilePath = path.resolve(
 | 
			
		||||
  url.fileURLToPath(
 | 
			
		||||
    new URL(path.join('..', dotenvFile), import.meta.url)
 | 
			
		||||
  )
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
dotenv.config({
 | 
			
		||||
  path: path.resolve(__dirname, path.join('..', dotenvFile))
 | 
			
		||||
  path: dotenvFilePath
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
initializeLogLevel(process.env, environment);
 | 
			
		||||
| 
						 | 
				
			
			@ -143,7 +147,7 @@ const pgConfigFromEnv = (env) => {
 | 
			
		|||
  let baseConfig = {};
 | 
			
		||||
 | 
			
		||||
  if (env.DATABASE_URL) {
 | 
			
		||||
    const parsedUrl = dbUrlToConfig(env.DATABASE_URL);
 | 
			
		||||
    const parsedUrl = pgConnectionString.parse(env.DATABASE_URL);
 | 
			
		||||
 | 
			
		||||
    // The result of dbUrlToConfig from pg-connection-string is not type
 | 
			
		||||
    // compatible with pg.PoolConfig, since parts of the connection URL may be
 | 
			
		||||
| 
						 | 
				
			
			@ -326,7 +330,7 @@ const startServer = async () => {
 | 
			
		|||
      // Unfortunately for using the on('upgrade') setup, we need to manually
 | 
			
		||||
      // write a HTTP Response to the Socket to close the connection upgrade
 | 
			
		||||
      // attempt, so the following code is to handle all of that.
 | 
			
		||||
      const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
 | 
			
		||||
      const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
 | 
			
		||||
 | 
			
		||||
      /** @type {Record<string, string | number | import('pino-http').ReqId>} */
 | 
			
		||||
      const headers = {
 | 
			
		||||
| 
						 | 
				
			
			@ -748,7 +752,7 @@ const startServer = async () => {
 | 
			
		|||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
 | 
			
		||||
    const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
 | 
			
		||||
 | 
			
		||||
    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
 | 
			
		||||
    res.end(JSON.stringify({ error: errorMessage }));
 | 
			
		||||
| 
						 | 
				
			
			@ -1155,7 +1159,7 @@ const startServer = async () => {
 | 
			
		|||
      // @ts-ignore
 | 
			
		||||
      streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering);
 | 
			
		||||
    }).catch(err => {
 | 
			
		||||
      const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
 | 
			
		||||
      const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
 | 
			
		||||
 | 
			
		||||
      res.log.info({ err }, 'Eventsource subscription error');
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1353,7 +1357,7 @@ const startServer = async () => {
 | 
			
		|||
        stopHeartbeat,
 | 
			
		||||
      };
 | 
			
		||||
    }).catch(err => {
 | 
			
		||||
      const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
 | 
			
		||||
      const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
 | 
			
		||||
 | 
			
		||||
      logger.error({ err }, 'Websocket subscription error');
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1482,13 +1486,15 @@ const startServer = async () => {
 | 
			
		|||
      // Decrement the metrics for connected clients:
 | 
			
		||||
      connectedClients.labels({ type: 'websocket' }).dec();
 | 
			
		||||
 | 
			
		||||
      // We need to delete the session object as to ensure it correctly gets
 | 
			
		||||
      // We need to unassign the session object as to ensure it correctly gets
 | 
			
		||||
      // garbage collected, without doing this we could accidentally hold on to
 | 
			
		||||
      // references to the websocket, the request, and the logger, causing
 | 
			
		||||
      // memory leaks.
 | 
			
		||||
      //
 | 
			
		||||
      // @ts-ignore
 | 
			
		||||
      delete session;
 | 
			
		||||
 | 
			
		||||
      // This is commented out because `delete` only operated on object properties
 | 
			
		||||
      // It needs to be replaced by `session = undefined`, but it requires every calls to
 | 
			
		||||
      // `session` to check for it, thus a significant refactor
 | 
			
		||||
      // delete session;
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    // Note: immediately after the `error` event is emitted, the `close` event
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
const { pino } = require('pino');
 | 
			
		||||
const { pinoHttp, stdSerializers: pinoHttpSerializers } = require('pino-http');
 | 
			
		||||
const uuid = require('uuid');
 | 
			
		||||
import { pino } from 'pino';
 | 
			
		||||
import { pinoHttp, stdSerializers as pinoHttpSerializers } from 'pino-http';
 | 
			
		||||
import * as uuid from 'uuid';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Generates the Request ID for logging and setting on responses
 | 
			
		||||
| 
						 | 
				
			
			@ -36,7 +36,7 @@ function sanitizeRequestLog(req) {
 | 
			
		|||
  return log;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const logger = pino({
 | 
			
		||||
export const logger = pino({
 | 
			
		||||
  name: "streaming",
 | 
			
		||||
  // Reformat the log level to a string:
 | 
			
		||||
  formatters: {
 | 
			
		||||
| 
						 | 
				
			
			@ -59,7 +59,7 @@ const logger = pino({
 | 
			
		|||
  }
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
const httpLogger = pinoHttp({
 | 
			
		||||
export const httpLogger = pinoHttp({
 | 
			
		||||
  logger,
 | 
			
		||||
  genReqId: generateRequestId,
 | 
			
		||||
  serializers: {
 | 
			
		||||
| 
						 | 
				
			
			@ -71,7 +71,7 @@ const httpLogger = pinoHttp({
 | 
			
		|||
 * Attaches a logger to the request object received by http upgrade handlers
 | 
			
		||||
 * @param {http.IncomingMessage} request
 | 
			
		||||
 */
 | 
			
		||||
function attachWebsocketHttpLogger(request) {
 | 
			
		||||
export function attachWebsocketHttpLogger(request) {
 | 
			
		||||
  generateRequestId(request);
 | 
			
		||||
 | 
			
		||||
  request.log = logger.child({
 | 
			
		||||
| 
						 | 
				
			
			@ -84,7 +84,7 @@ function attachWebsocketHttpLogger(request) {
 | 
			
		|||
 * @param {http.IncomingMessage} request
 | 
			
		||||
 * @param {import('./index.js').ResolvedAccount} resolvedAccount
 | 
			
		||||
 */
 | 
			
		||||
function createWebsocketLogger(request, resolvedAccount) {
 | 
			
		||||
export function createWebsocketLogger(request, resolvedAccount) {
 | 
			
		||||
  // ensure the request.id is always present.
 | 
			
		||||
  generateRequestId(request);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -98,17 +98,12 @@ function createWebsocketLogger(request, resolvedAccount) {
 | 
			
		|||
  });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.logger = logger;
 | 
			
		||||
exports.httpLogger = httpLogger;
 | 
			
		||||
exports.attachWebsocketHttpLogger = attachWebsocketHttpLogger;
 | 
			
		||||
exports.createWebsocketLogger = createWebsocketLogger;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Initializes the log level based on the environment
 | 
			
		||||
 * @param {Object<string, any>} env
 | 
			
		||||
 * @param {string} environment
 | 
			
		||||
 */
 | 
			
		||||
exports.initializeLogLevel = function initializeLogLevel(env, environment) {
 | 
			
		||||
export function initializeLogLevel(env, environment) {
 | 
			
		||||
  if (env.LOG_LEVEL && Object.keys(logger.levels.values).includes(env.LOG_LEVEL)) {
 | 
			
		||||
    logger.level = env.LOG_LEVEL;
 | 
			
		||||
  } else if (environment === 'development') {
 | 
			
		||||
| 
						 | 
				
			
			@ -116,4 +111,4 @@ exports.initializeLogLevel = function initializeLogLevel(env, environment) {
 | 
			
		|||
  } else {
 | 
			
		||||
    logger.level = 'info';
 | 
			
		||||
  }
 | 
			
		||||
};
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
// @ts-check
 | 
			
		||||
 | 
			
		||||
const metrics = require('prom-client');
 | 
			
		||||
import metrics from 'prom-client';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @typedef StreamingMetrics
 | 
			
		||||
| 
						 | 
				
			
			@ -18,7 +18,7 @@ const metrics = require('prom-client');
 | 
			
		|||
 * @param {import('pg').Pool} pgPool
 | 
			
		||||
 * @returns {StreamingMetrics}
 | 
			
		||||
 */
 | 
			
		||||
function setupMetrics(channels, pgPool) {
 | 
			
		||||
export function setupMetrics(channels, pgPool) {
 | 
			
		||||
  // Collect metrics from Node.js
 | 
			
		||||
  metrics.collectDefaultMetrics();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -101,5 +101,3 @@ function setupMetrics(channels, pgPool) {
 | 
			
		|||
    messagesSent,
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.setupMetrics = setupMetrics;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,6 +7,7 @@
 | 
			
		|||
  },
 | 
			
		||||
  "description": "Mastodon's Streaming Server",
 | 
			
		||||
  "private": true,
 | 
			
		||||
  "type": "module",
 | 
			
		||||
  "repository": {
 | 
			
		||||
    "type": "git",
 | 
			
		||||
    "url": "https://github.com/mastodon/mastodon.git"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,11 +2,11 @@
 | 
			
		|||
  "extends": "../tsconfig.json",
 | 
			
		||||
  "compilerOptions": {
 | 
			
		||||
    "target": "esnext",
 | 
			
		||||
    "module": "CommonJS",
 | 
			
		||||
    "moduleResolution": "node",
 | 
			
		||||
    "module": "NodeNext",
 | 
			
		||||
    "moduleResolution": "NodeNext",
 | 
			
		||||
    "noUnusedParameters": false,
 | 
			
		||||
    "tsBuildInfoFile": "../tmp/cache/streaming/tsconfig.tsbuildinfo",
 | 
			
		||||
    "paths": {},
 | 
			
		||||
  },
 | 
			
		||||
  "include": ["./*.js", "./.eslintrc.js"],
 | 
			
		||||
  "include": ["./*.js", "./.eslintrc.cjs"],
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,11 +16,9 @@ const FALSE_VALUES = [
 | 
			
		|||
 * @param {any} value
 | 
			
		||||
 * @returns {boolean}
 | 
			
		||||
 */
 | 
			
		||||
const isTruthy = value =>
 | 
			
		||||
  value && !FALSE_VALUES.includes(value);
 | 
			
		||||
 | 
			
		||||
exports.isTruthy = isTruthy;
 | 
			
		||||
 | 
			
		||||
export function isTruthy(value) {
 | 
			
		||||
  return value && !FALSE_VALUES.includes(value);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * See app/lib/ascii_folder.rb for the canon definitions
 | 
			
		||||
| 
						 | 
				
			
			@ -33,7 +31,7 @@ const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEe
 | 
			
		|||
 * @param {string} str
 | 
			
		||||
 * @returns {string}
 | 
			
		||||
 */
 | 
			
		||||
function foldToASCII(str) {
 | 
			
		||||
export function foldToASCII(str) {
 | 
			
		||||
  const regex = new RegExp(NON_ASCII_CHARS.split('').join('|'), 'g');
 | 
			
		||||
 | 
			
		||||
  return str.replace(regex, function(match) {
 | 
			
		||||
| 
						 | 
				
			
			@ -42,28 +40,22 @@ function foldToASCII(str) {
 | 
			
		|||
  });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.foldToASCII = foldToASCII;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @param {string} str
 | 
			
		||||
 * @returns {string}
 | 
			
		||||
 */
 | 
			
		||||
function normalizeHashtag(str) {
 | 
			
		||||
export function normalizeHashtag(str) {
 | 
			
		||||
  return foldToASCII(str.normalize('NFKC').toLowerCase()).replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.normalizeHashtag = normalizeHashtag;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @param {string|string[]} arrayOrString
 | 
			
		||||
 * @returns {string}
 | 
			
		||||
 */
 | 
			
		||||
function firstParam(arrayOrString) {
 | 
			
		||||
export function firstParam(arrayOrString) {
 | 
			
		||||
  if (Array.isArray(arrayOrString)) {
 | 
			
		||||
    return arrayOrString[0];
 | 
			
		||||
  } else {
 | 
			
		||||
    return arrayOrString;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
exports.firstParam = firstParam;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue