批量执行异步函数,限制并发数
batchAsync
函数签名
typescript
function batchAsync<T>(tasks: (() => Promise<T>)[], concurrency: number = 5): Promise<T[]>描述
批量执行异步函数,限制并发数
类型参数
| 参数名 | 约束 | 默认值 | 描述 |
|---|---|---|---|
T | - | - | - 任务函数返回值类型 |
参数
| 参数名 | 类型 | 可选 | 默认值 | 描述 |
|---|---|---|---|---|
tasks | (() => Promise<T>)[] | 否 | - | - |
concurrency | number | 是 | 5 | - |
返回值
Promise<T[]>
点击查看源码
js
/**
* 批量执行异步函数,限制并发数
*
* @template T - 任务函数返回值类型
* @param {(() => Promise<T>)[]} tasks - 异步任务函数数组
* @param {number} concurrency - 最大并发数
* @returns {Promise<T[]>} 所有任务的结果数组,顺序与任务数组一致
*/
export async function batchAsync(tasks, concurrency = 5) {
if (!tasks.length) return [];
const results = new Array(tasks.length);
const executing = [];
let index = 0;
const enqueue = async () => {
if (index >= tasks.length) return;
const currentIndex = index++;
const task = tasks[currentIndex];
try {
results[currentIndex] = await task();
} catch (error) {
results[currentIndex] = error;
}
// 从执行队列中移除已完成的任务
const executingIndex = executing.indexOf(executingPromise);
if (executingIndex !== -1) {
executing.splice(executingIndex, 1);
}
// 继续执行下一个任务
return enqueue();
};
const executingPromise = enqueue();
executing.push(executingPromise);
// 启动初始的并发任务
for (let i = 1; i < concurrency && i < tasks.length; i++) {
const executingPromise = enqueue();
executing.push(executingPromise);
}
// 等待所有任务完成
await Promise.all(executing);
return results;
}ts
/**
* 批量执行异步函数,限制并发数
*
* @template T - 任务函数返回值类型
* @param {(() => Promise<T>)[]} tasks - 异步任务函数数组
* @param {number} concurrency - 最大并发数
* @returns {Promise<T[]>} 所有任务的结果数组,顺序与任务数组一致
*/
export async function batchAsync<T>(
tasks: (() => Promise<T>)[],
concurrency: number = 5,
): Promise<T[]> {
if (!tasks.length) return [];
const results: T[] = new Array(tasks.length);
const executing: Promise<void>[] = [];
let index = 0;
const enqueue = async (): Promise<void> => {
if (index >= tasks.length) return;
const currentIndex = index++;
const task = tasks[currentIndex];
try {
results[currentIndex] = await task();
} catch (error) {
results[currentIndex] = error as any;
}
// 从执行队列中移除已完成的任务
const executingIndex = executing.indexOf(executingPromise);
if (executingIndex !== -1) {
executing.splice(executingIndex, 1);
}
// 继续执行下一个任务
return enqueue();
};
const executingPromise = enqueue();
executing.push(executingPromise);
// 启动初始的并发任务
for (let i = 1; i < concurrency && i < tasks.length; i++) {
const executingPromise = enqueue();
executing.push(executingPromise);
}
// 等待所有任务完成
await Promise.all(executing);
return results;
}如有错误,请提交issue :::