如何实现一个 Node 上的并发调度类
如何实现一个 Node 上的并发调度类
/**
* 表示一个并发调度器,用于并发执行多个任务并自带重试功能
* 核心:批量请求中保留尽可能地 fulfilled 接收一定 rejected 状态
* @class
* @constructor
* @param {Object} options - 并发调度器的选项。
* @param {number} options.maxRetries - 失败任务的最大重试次数。默认为3。
* @example
* const scheduler = new ConcurrentScheduler({ maxRetries: 5 });
* const tasks = [task1, task2, task3];
* const initialState = {};
* const resultProcessor = (state, result) => { // process result };
* const finalState = await scheduler.execute(tasks, { initialState, resultProcessor });
*/
class ConcurrentScheduler {
/**
* 创建一个新的 ConcurrentScheduler 实例。
* @param {Object} options - ConcurrentScheduler 的选项。
* @param {number} [options.maxRetries=3] - 每个任务的最大重试次数。
* @constructor
*/
constructor(options) {
const { maxRetries = 3 } = options ?? {};
this.maxRetries = maxRetries
}
/**
* 执行并发任务
* @param {Array<Function>} tasks - 任务列表,每个任务是一个函数
* @param {Object} options - 选项参数
* @param {*} options.initialState - 初始状态
* @param {Function} options.resultProcessor - 结果处理函数
* @returns {*} 执行完任务后的最终状态
* @example
* const tasks = [task1, task2, task3];
* const initialState = {};
* const resultProcessor = (state, result) => { // process result };
* const finalState = await scheduler.execute(tasks, { initialState, resultProcessor });
*/
async execute(tasks, {
initialState,
resultProcessor
}) {
// 逆向思维,先假设所有任务都失败了
let failedTasks = tasks.slice();
let state = initialState;
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
// 没有失败的任务了,直接退出
if (failedTasks.length === 0) break;
try {
const results = await Promise.allSettled(failedTasks.map(task => task()));
failedTasks = [];
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
resultProcessor(state, result.value);
} else {
failedTasks.push(tasks[index]);
}
});
} catch (executionError) {
console.error('An error occurred during task execution:', executionError);
}
}
return state;
}
}
// 使用示例
const asyncTask = (taskName) => {
return () => new Promise((resolve, reject) => {
setTimeout(() => {
Math.random() > 0.5 ? resolve(`Success ${taskName}`) : reject(`Fail ${taskName}`)
}, 300)
})
}
const taskQueue = [
asyncTask('task1'),
asyncTask('task2'),
asyncTask('task3'),
];
const cs = new ConcurrentScheduler();
(async () => {
console.time('execute');
const result = await cs.execute(taskQueue, {
initialState: [],
resultProcessor: (state, value) => state.push(value)
});
console.timeEnd('execute');
console.log(result); // 输出累积的成功结果
})();
module.exports = ConcurrentScheduler;