Skip to content

@utilslib/core/batchAsync


批量执行异步函数,限制并发数


batchAsync

函数签名

typescript
function batchAsync<T>(tasks: (() => Promise<T>)[], concurrency: number = 5): Promise<T[]>

描述

批量执行异步函数,限制并发数

类型参数

参数名约束默认值描述
T--- 任务函数返回值类型

参数

参数名类型可选默认值描述
tasks(() => Promise<T>)[]--
concurrencynumber5-

返回值

Promise<T[]>

点击查看源码
js
/**
 * 批量执行异步函数,限制并发数
 *
 * @template T - 任务函数返回值类型
 * @param {(() => Promise<T>)[]} tasks - 异步任务函数数组
 * @param {number} concurrency - 最大并发数
 * @returns {Promise<T[]>} 所有任务的结果数组,顺序与任务数组一致
 */
export async function batchAsync(tasks, concurrency = 5) {
  if (!tasks.length) return [];
  const results = new Array(tasks.length);
  const executing = [];
  let index = 0;
  const enqueue = async () => {
    if (index >= tasks.length) return;
    const currentIndex = index++;
    const task = tasks[currentIndex];
    try {
      results[currentIndex] = await task();
    } catch (error) {
      results[currentIndex] = error;
    }
    // 从执行队列中移除已完成的任务
    const executingIndex = executing.indexOf(executingPromise);
    if (executingIndex !== -1) {
      executing.splice(executingIndex, 1);
    }
    // 继续执行下一个任务
    return enqueue();
  };
  const executingPromise = enqueue();
  executing.push(executingPromise);
  // 启动初始的并发任务
  for (let i = 1; i < concurrency && i < tasks.length; i++) {
    const executingPromise = enqueue();
    executing.push(executingPromise);
  }
  // 等待所有任务完成
  await Promise.all(executing);
  return results;
}
ts
/**
 * 批量执行异步函数,限制并发数
 *
 * @template T - 任务函数返回值类型
 * @param {(() => Promise<T>)[]} tasks - 异步任务函数数组
 * @param {number} concurrency - 最大并发数
 * @returns {Promise<T[]>} 所有任务的结果数组,顺序与任务数组一致
 */
export async function batchAsync<T>(
  tasks: (() => Promise<T>)[],
  concurrency: number = 5,
): Promise<T[]> {
  if (!tasks.length) return [];

  const results: T[] = new Array(tasks.length);
  const executing: Promise<void>[] = [];
  let index = 0;

  const enqueue = async (): Promise<void> => {
    if (index >= tasks.length) return;

    const currentIndex = index++;
    const task = tasks[currentIndex];

    try {
      results[currentIndex] = await task();
    } catch (error) {
      results[currentIndex] = error as any;
    }

    // 从执行队列中移除已完成的任务
    const executingIndex = executing.indexOf(executingPromise);
    if (executingIndex !== -1) {
      executing.splice(executingIndex, 1);
    }

    // 继续执行下一个任务
    return enqueue();
  };

  const executingPromise = enqueue();
  executing.push(executingPromise);

  // 启动初始的并发任务
  for (let i = 1; i < concurrency && i < tasks.length; i++) {
    const executingPromise = enqueue();
    executing.push(executingPromise);
  }

  // 等待所有任务完成
  await Promise.all(executing);

  return results;
}

如有错误,请提交issue :::