diff --git a/common.mjs b/common.mjs index c120597..692dc1d 100644 --- a/common.mjs +++ b/common.mjs @@ -3,7 +3,7 @@ */ import fs from 'node:fs'; -import { readdir, readFile } from 'node:fs/promises'; +import { readdir, readFile, appendFile } from 'node:fs/promises'; import { resolve } from 'node:path'; import { Buffer } from 'node:buffer'; import md5 from 'md5'; @@ -213,6 +213,22 @@ class Common { return Buffer.byteLength(str, 'utf8'); } + //保存log到指定文件 + async saveLog(filePath, content) { + let saved = false; + + try { + let saveRes = await appendFile(filePath, content); + if (saveRes == undefined) { + saved = true; + } + } catch (err) { + console.error(`Log save to %s failed: %s`, filePath, err.message); + } + + return saved; + } + } let commonFuns = new Common(); diff --git a/conf/config.json b/conf/config.json index f6a6930..df5e873 100644 --- a/conf/config.json +++ b/conf/config.json @@ -9,15 +9,21 @@ "notify_timeout": 8, "notify_max_try": 5, + "task_timeout": 600, + "task_max_try": 5, + "reloadConfigFrequence": 1, "heroHeartCheckFrequence": 1, "autoCleanTaskFrequence": 5, "autoNotifyTaskFrequence": 1, + "autoResetWaitingTaskFrequence": 6, "max_list_hero_num": 1000, "axios_proxy": false, + "systemLogDir": "log/", + "tokens": { "herounion_demo": "hello#world!" } diff --git a/heroUnion.mjs b/heroUnion.mjs index d955927..8f6d3ac 100644 --- a/heroUnion.mjs +++ b/heroUnion.mjs @@ -14,6 +14,10 @@ * --数据缓存规则-- * 任务结果数据最大不超过1M,超过的当任务处理失败处理 * 任务数据保存最长 1 天 + * + * --异常处理规则-- + * 任务处理超时后将进行中的任务状态改为等待中,以便其它爬虫处理 + * 任务处理超过最多尝试次数,则标记为失败 */ import fs from 'node:fs'; @@ -31,9 +35,11 @@ class HeroUnion { this.config = null; //默认配置 - //this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录 + this.systemLogDir = 'log/'; //系统日志保存目录 this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒 this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M + this.task_timeout = 600; //任务处理超时时长,单位:秒 + this.task_max_try = 5; //任务处理最多尝试次数 this.notify_timeout = 8; //回调通知请求超时时长,单位:秒 this.notify_max_try = 5; //回调通知最多尝试次数 this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒 @@ -91,37 +97,19 @@ class HeroUnion { } async getConfig(forceReload) { + const _self = this; + if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) { - this.config = await common.getConfigFromJsonFile('config.json'); + let config = await common.getConfigFromJsonFile('config.json'); //覆盖默认配置 - if (typeof(this.config.task_cache_time) != 'undefined' && this.config.task_cache_time) { - this.task_cache_time = this.config.task_cache_time; //任务数据最长缓存时间,单位:秒 - } - - if (typeof(this.config.task_data_max_size) != 'undefined' && this.config.task_data_max_size) { - this.task_data_max_size = this.config.task_data_max_size; //任务数据最大字节数,单位:KB - } - - if (typeof(this.config.notify_timeout) != 'undefined' && this.config.notify_timeout) { - this.notify_timeout = this.config.notify_timeout; //回调通知请求超时时长,单位:秒 - } - - if (typeof(this.config.notify_max_try) != 'undefined' && this.config.notify_max_try) { - this.notify_max_try = this.config.notify_max_try; //最多回调通知次数 - } - - if (typeof(this.config.heroHeartTimeout) != 'undefined' && this.config.heroHeartTimeout) { - this.heroHeartTimeout = this.config.heroHeartTimeout; //爬虫心跳超时时长,单位:秒 - } - - if (typeof(this.config.max_list_hero_num) != 'undefined' && this.config.max_list_hero_num) { - this.max_list_hero_num = this.config.max_list_hero_num; //最大返回爬虫数量 + for (const key in config) { + if (typeof(_self[key]) != 'undefined') { + _self[key] = config[key]; + } } - if (typeof(this.config.axios_proxy) != 'undefined' && this.config.axios_proxy) { - this.axios_proxy = this.config.axios_proxy; - } + this.config = config; } return this.config; @@ -164,10 +152,11 @@ class HeroUnion { * error: '', * notified: false, //是否成功发送回调通知 * notify_time: 0, //回调通知次数 + * try_time: 0, //任务处理次数 * token: '' //任务密钥,爬虫完成任务回传数据的时候用它签名 * } **/ - createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) { + async createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) { let timestamp = common.getTimestampInSeconds(); let task = { @@ -176,6 +165,7 @@ class HeroUnion { notified: false, notify_time: 0, + try_time: 0, //必选 uuid: uuid, @@ -211,6 +201,13 @@ class HeroUnion { this.taskStatus.total ++; this.taskStatus.waiting ++; + //保存任务日志 + let config = await this.getConfig(); + let logFile = path.resolve(config.systemLogDir) + '/tasks.log'; + common.saveLog(logFile, JSON.stringify(task) + "\n"); + + common.log('Task %s created, url %s, notify url %s.', task.id, url, notify_url); + return task; } @@ -248,6 +245,10 @@ class HeroUnion { this.tasks[taskIndex].status = 'running'; //为task生成一个随机密钥,便于爬虫处理完成后回传的时候对数据进行签名 this.tasks[taskIndex].token = this.generateTaskToken(this.tasks[taskIndex].id); + //任务处理次数计数 + this.tasks[taskIndex].try_time ++; + //更新任务修改时间 + this.tasks[taskIndex].updated = common.getTimestampInSeconds(); //更新统计数据 this.taskStatus.waiting --; @@ -414,7 +415,7 @@ class HeroUnion { //定期检查爬虫是否在线 //如果上一次上报状态时间在10分钟前,则设置该爬虫已下线 heroHeartCheck() { - let _self = this; + const _self = this; const frequence = typeof(this.config.heroHeartCheckFrequence) != 'undefined' && this.config.heroHeartCheckFrequence ? this.config.heroHeartCheckFrequence : 1; //1 分钟检查一次 @@ -439,7 +440,7 @@ class HeroUnion { //自动重新加载配置文件 autoReloadConfigs() { - let _self = this; + const _self = this; const frequence = typeof(this.config.reloadConfigFrequence) != 'undefined' && this.config.reloadConfigFrequence ? this.config.reloadConfigFrequence : 5; //5 分钟重新加载一次 @@ -456,7 +457,7 @@ class HeroUnion { //定期清理过期的任务 autoCleanExpiredTasks() { - let _self = this; + const _self = this; const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined' && this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 10; //10 分钟检查一次 @@ -489,9 +490,45 @@ class HeroUnion { common.log('Cronjob of auto clean expired tasks started.'); } + //定期重置处理过期的任务 + autoResetRunningTimeoutTasks() { + const _self = this; + + const frequence = typeof(this.config.autoResetWaitingTaskFrequence) != 'undefined' + && this.config.autoResetWaitingTaskFrequence ? this.config.autoResetWaitingTaskFrequence : 6; //6 分钟检查一次 + const cronjob = cron.schedule(`*/${frequence} * * * *`, () => { + let timestamp = common.getTimestampInSeconds(); + + _self.tasks.forEach(function(item, index) { + if ( + item.status == 'running' + && item.try_time < _self.task_max_try + && timestamp - item.updated > _self.task_timeout + ) { + _self.taskStatus.running --; + _self.taskStatus.waiting ++; + _self.tasks[index].status = 'waiting'; + common.log('Task %s running timeout, and reset it to waiting list', item.id); + }else if (item.status == 'running' && item.try_time >= _self.task_max_try) { + //设置任务失败 + _self.taskStatus.running --; + _self.taskStatus.failed ++; + _self.tasks[index].status = 'failed'; + _self.tasks[index].error = 'Task max try time got.'; + common.error('Task %s failed, got the max try time.', item.id); + } + }); + }, { + scheduled: false + }); + + cronjob.start(); + common.log('Cronjob of auto reset expired running tasks started.'); + } + //定期尝试给已完成状态的任务notify_url发送通知回调 autoNotifyTasks() { - let _self = this; + const _self = this; const frequence = typeof(this.config.autoNotifyTaskFrequence) != 'undefined' && this.config.autoNotifyTaskFrequence ? this.config.autoNotifyTaskFrequence : 2; //2 分钟检查一次 @@ -573,6 +610,7 @@ class HeroUnion { this.heroHeartCheck(); this.autoCleanExpiredTasks(); this.autoNotifyTasks(); + this.autoResetRunningTimeoutTasks(); } } diff --git a/log/Readme.md b/log/Readme.md new file mode 100644 index 0000000..de4d95f --- /dev/null +++ b/log/Readme.md @@ -0,0 +1,3 @@ +# 系统日志保存目录 + +用来保存任务提交记录等日志。 diff --git a/router_api.mjs b/router_api.mjs index 5315b54..cceab20 100644 --- a/router_api.mjs +++ b/router_api.mjs @@ -106,7 +106,7 @@ router.post('/newtask/', async (req, res) => { } if (!data.message) { - data.task = heroUnion.createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang); + data.task = await heroUnion.createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang); data.code = 1; data.message = '新爬虫任务提交完成'; } diff --git a/test/common.test.mjs b/test/common.test.mjs index 570199e..baca849 100644 --- a/test/common.test.mjs +++ b/test/common.test.mjs @@ -127,4 +127,16 @@ test('Common function byteSize test', async (t) => { let case5 = common.byteSize('hello 你'); assert.equal(case5, 9); +}); + +test('Common function saveLog test', async (t) => { + let data = {a: 1, b:2}; + let filename = './log/test.log'; + let saved = await common.saveLog(filename, JSON.stringify(data)); + assert.equal(saved, true); + + //case 2 + filename = './logs/test.log'; + saved = await common.saveLog(filename, JSON.stringify(data)); + assert.equal(saved, false); }); \ No newline at end of file