|
|
|
/**
|
|
|
|
* 对爬虫任务列表目录进行监控
|
|
|
|
* 发现新任务
|
|
|
|
* 删除已完成的任务文件
|
|
|
|
* 内存中保存所有任务,及其状态
|
|
|
|
* 返回当前任务状态
|
|
|
|
* -------------------
|
|
|
|
* 注意:任务清单文件名不能重复,如果一个新任务文件名跟已经处理过的任务重名,则不会被处理
|
|
|
|
* -------------------
|
|
|
|
* task数据结构:{id:'', url: '', status:''}
|
|
|
|
*/
|
|
|
|
import common from './common.mjs';
|
|
|
|
import fs from 'node:fs';
|
|
|
|
import { readdir, readFile } from 'node:fs/promises';
|
|
|
|
import path from 'node:path';
|
|
|
|
import cron from 'node-cron';
|
|
|
|
|
|
|
|
class TaskMoniter {
|
|
|
|
|
|
|
|
constructor(task_list_dir) {
|
|
|
|
this.check_time_gap = 10; //检测间隔时间,单位:秒
|
|
|
|
this.checking = false;
|
|
|
|
|
|
|
|
this.task_dir = task_list_dir; //监控目录:任务列表保存目录
|
|
|
|
this.tasks = {}; //内存中的任务列表
|
|
|
|
this.taskStatus = { //当前任务状态
|
|
|
|
total: 0, //总任务数
|
|
|
|
waiting: 0, //等待执行的任务数
|
|
|
|
running: 0, //正在执行的任务数
|
|
|
|
done: 0, //已完成的任务数
|
|
|
|
failed: 0 //执行失败的任务数
|
|
|
|
};
|
|
|
|
|
|
|
|
this.statusCode = {
|
|
|
|
waiting: 'waiting',
|
|
|
|
running: 'running',
|
|
|
|
done: 'done',
|
|
|
|
failed: 'failed',
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
getTaskFilePath(task_id) {
|
|
|
|
const dirPath = path.resolve(this.task_dir);
|
|
|
|
return `${dirPath}/${task_id}.task`;
|
|
|
|
}
|
|
|
|
|
|
|
|
//注意:任务文件名不能重复,已经用过的文件名不能再使用
|
|
|
|
//推荐以时间戳为任务文件名,如:1694762776985.task
|
|
|
|
getTaskId(filename) {
|
|
|
|
return filename.replace('.task', '');
|
|
|
|
}
|
|
|
|
|
|
|
|
getStatus() {
|
|
|
|
return this.taskStatus;
|
|
|
|
}
|
|
|
|
|
|
|
|
getNewTask() {
|
|
|
|
let task = null;
|
|
|
|
|
|
|
|
for (const id in this.tasks) {
|
|
|
|
if (this.tasks[id].status == this.statusCode.waiting) {
|
|
|
|
task = this.tasks[id];
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return task;
|
|
|
|
}
|
|
|
|
|
|
|
|
setTaskRunning(task_id) {
|
|
|
|
if (typeof(this.tasks[task_id]) == 'undefined') {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.tasks[task_id].status = this.statusCode.running;
|
|
|
|
this.taskStatus[this.statusCode.running] ++;
|
|
|
|
this.taskStatus[this.statusCode.waiting] --;
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
setTaskDone(task_id) {
|
|
|
|
if (typeof(this.tasks[task_id]) == 'undefined') {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.tasks[task_id].status = this.statusCode.done;
|
|
|
|
this.taskStatus[this.statusCode.done] ++;
|
|
|
|
this.taskStatus[this.statusCode.running] --;
|
|
|
|
|
|
|
|
const filepath = this.getTaskFilePath(task_id);
|
|
|
|
common.removeFile(filepath); //async delete
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
setTaskFailed(task_id) {
|
|
|
|
if (typeof(this.tasks[task_id]) == 'undefined') {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.tasks[task_id].status = this.statusCode.failed;
|
|
|
|
this.taskStatus[this.statusCode.failed] ++;
|
|
|
|
this.taskStatus[this.statusCode.running] --;
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
async parseTaskFile(filename, filepath) {
|
|
|
|
let task = {};
|
|
|
|
|
|
|
|
try {
|
|
|
|
task.id = this.getTaskId(filename);
|
|
|
|
task.status = this.statusCode.waiting;
|
|
|
|
|
|
|
|
task.url = await readFile(filepath, { encoding: 'utf8' });
|
|
|
|
if (task.url) {
|
|
|
|
task.url = task.url.replace(/[\r\n]/g, '');
|
|
|
|
}
|
|
|
|
}catch(error) {
|
|
|
|
console.error('Get task file content failed: %s', error);
|
|
|
|
}
|
|
|
|
|
|
|
|
return task;
|
|
|
|
}
|
|
|
|
|
|
|
|
addTask(task) {
|
|
|
|
if (typeof(this.tasks[task.id]) != 'undefined') {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.tasks[task.id] = task;
|
|
|
|
this.taskStatus[task.status] ++;
|
|
|
|
this.taskStatus.total ++;
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
async checkTasks() {
|
|
|
|
if (this.checking == true) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
console.log('[%s] TaskMoniter auto check...', common.getTimeString());
|
|
|
|
|
|
|
|
this.checking = true;
|
|
|
|
|
|
|
|
const dirPath = path.resolve(this.task_dir);
|
|
|
|
const files = await readdir(dirPath);
|
|
|
|
let task = null, task_id = null;
|
|
|
|
for (const filename of files) {
|
|
|
|
if (filename.indexOf('.task') === -1) {continue;} //ignore not *.task files
|
|
|
|
|
|
|
|
task_id = this.getTaskId(filename);
|
|
|
|
if (typeof(this.tasks[task_id]) != 'undefined') { //跳过已经存在的任务
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
task = await this.parseTaskFile(filename, `${dirPath}/${filename}`);
|
|
|
|
this.addTask(task);
|
|
|
|
}
|
|
|
|
|
|
|
|
this.checking = false;
|
|
|
|
}catch(error) {
|
|
|
|
this.checking = false;
|
|
|
|
console.error('Check tasks failed: %s', error);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
run() { //开始监控任务目录,把所有任务缓存到内存
|
|
|
|
console.log('[%s] TaskMoniter started.', common.getTimeString());
|
|
|
|
|
|
|
|
//auto check new tasks
|
|
|
|
const _self = this;
|
|
|
|
const task_check_time = this.check_time_gap;
|
|
|
|
const task_auto_run = cron.schedule(`*/${task_check_time} * * * * *`, async () => {
|
|
|
|
await _self.checkTasks();
|
|
|
|
console.log('Status', _self.getStatus());
|
|
|
|
}, {
|
|
|
|
scheduled: false
|
|
|
|
});
|
|
|
|
|
|
|
|
task_auto_run.start();
|
|
|
|
console.log('[%s] TaskMoniter auto check started.', common.getTimeString());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
export default TaskMoniter;
|