I have a Node.js application that interacts with a PostgreSQL database containing millions of users. For one feature, I need to fetch around 100,000 users that match a criterion (for example, users that have a particular channel id in their tracking configuration) and then process each of them (e.g., creating notification or autotrade tasks).

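For illustration, the lookup behind getUsersByTrackedTelegramChannel could be a query of roughly this shape (a sketch only; the users table and a JSONB tracking_config column keyed by channel id are assumptions, not my exact schema):

const { Pool } = require('pg');

const pool = new Pool(); // connection settings come from the usual PG* environment variables

// Sketch: fetch every user whose tracking config contains this channel id.
// All matching rows are returned at once, so ~100,000 objects end up in memory.
async function getUsersByTrackedTelegramChannel(channelId) {
  const { rows } = await pool.query(
    'SELECT * FROM users WHERE tracking_config ->> $1 IS NOT NULL',
    [channelId]
  );
  return rows;
}
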
Currently, my implementation fetches all matching users into memory and then processes them in chunks, like so:

async function processMessageForSubscribers(channelId, channelName, message, addresses) {
  try {
    // Fetch all ~100,000 matching users into memory at once
    const users = await getUsersByTrackedTelegramChannel(channelId);
    const CHUNK_SIZE = 500;
    const notifyTasks = [];
    const autotradeTasks = [];

    // Split users into chunks for parallel processing
    const processUserChunk = async (userChunk) => {
      await Promise.all(
        userChunk.map(async (user) => {
          const config = user.trackingConfig[channelId];
          const autotradeAmount = config?.autotradeAmount;

          // Optional chaining here as well: config can be undefined if the
          // user's trackingConfig no longer contains this channel
          if (config?.newPost === 'NOTIFY') {
            // Build a notification task and collect it into notifyTasks
            createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
          }
        })
      );
    };

    // Process users in chunks
    for (let i = 0; i < users.length; i += CHUNK_SIZE) {
      const chunk = users.slice(i, i + CHUNK_SIZE);
      await processUserChunk(chunk);
    }

    await queueTasks(notifyTasks, autotradeTasks);
  } catch (error) {
    console.error('Error processing subscribers:', error);
    throw error;
  }
}
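
What I'm considering instead is streaming the rows with a server-side cursor so that only one batch is in memory at a time. A rough sketch using the pg-cursor package (assuming a recent version with promise-based read(), and reusing the same hypothetical pool and query from above; the per-chunk logic would have to be factored out of processMessageForSubscribers so it can be shared):

const Cursor = require('pg-cursor');

// Sketch: stream matching users in batches instead of loading all of them.
async function processSubscribersStreamed(channelId, processUserChunk) {
  const client = await pool.connect();
  try {
    const cursor = client.query(
      new Cursor('SELECT * FROM users WHERE tracking_config ->> $1 IS NOT NULL', [channelId])
    );
    const BATCH_SIZE = 500;
    let rows = await cursor.read(BATCH_SIZE);
    while (rows.length > 0) {
      await processUserChunk(rows); // only one batch held in memory at a time
      rows = await cursor.read(BATCH_SIZE);
    }
    await cursor.close();
  } finally {
    client.release();
  }
}

Compared with the current version, this keeps memory usage flat at roughly BATCH_SIZE rows, at the cost of holding a client connection open for the duration of the scan.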
