Browse Source

add task log, add task handle timeout and max try time

master
filesite 7 months ago
parent
commit
f88aa13131
  1. 18
      common.mjs
  2. 6
      conf/config.json
  3. 102
      heroUnion.mjs
  4. 3
      log/Readme.md
  5. 2
      router_api.mjs
  6. 12
      test/common.test.mjs

18
common.mjs

@ -3,7 +3,7 @@
*/ */
import fs from 'node:fs'; import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises'; import { readdir, readFile, appendFile } from 'node:fs/promises';
import { resolve } from 'node:path'; import { resolve } from 'node:path';
import { Buffer } from 'node:buffer'; import { Buffer } from 'node:buffer';
import md5 from 'md5'; import md5 from 'md5';
@ -213,6 +213,22 @@ class Common {
return Buffer.byteLength(str, 'utf8'); return Buffer.byteLength(str, 'utf8');
} }
//保存log到指定文件
async saveLog(filePath, content) {
let saved = false;
try {
let saveRes = await appendFile(filePath, content);
if (saveRes == undefined) {
saved = true;
}
} catch (err) {
console.error(`Log save to %s failed: %s`, filePath, err.message);
}
return saved;
}
} }
let commonFuns = new Common(); let commonFuns = new Common();

6
conf/config.json

@ -9,15 +9,21 @@
"notify_timeout": 8, "notify_timeout": 8,
"notify_max_try": 5, "notify_max_try": 5,
"task_timeout": 600,
"task_max_try": 5,
"reloadConfigFrequence": 1, "reloadConfigFrequence": 1,
"heroHeartCheckFrequence": 1, "heroHeartCheckFrequence": 1,
"autoCleanTaskFrequence": 5, "autoCleanTaskFrequence": 5,
"autoNotifyTaskFrequence": 1, "autoNotifyTaskFrequence": 1,
"autoResetWaitingTaskFrequence": 6,
"max_list_hero_num": 1000, "max_list_hero_num": 1000,
"axios_proxy": false, "axios_proxy": false,
"systemLogDir": "log/",
"tokens": { "tokens": {
"herounion_demo": "hello#world!" "herounion_demo": "hello#world!"
} }

102
heroUnion.mjs

@ -14,6 +14,10 @@
* --数据缓存规则-- * --数据缓存规则--
* 任务结果数据最大不超过1M超过的当任务处理失败处理 * 任务结果数据最大不超过1M超过的当任务处理失败处理
* 任务数据保存最长 1 * 任务数据保存最长 1
*
* --异常处理规则--
* 任务处理超时后将进行中的任务状态改为等待中以便其它爬虫处理
* 任务处理超过最多尝试次数则标记为失败
*/ */
import fs from 'node:fs'; import fs from 'node:fs';
@ -31,9 +35,11 @@ class HeroUnion {
this.config = null; this.config = null;
//默认配置 //默认配置
//this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录 this.systemLogDir = 'log/'; //系统日志保存目录
this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒 this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M
this.task_timeout = 600; //任务处理超时时长,单位:秒
this.task_max_try = 5; //任务处理最多尝试次数
this.notify_timeout = 8; //回调通知请求超时时长,单位:秒 this.notify_timeout = 8; //回调通知请求超时时长,单位:秒
this.notify_max_try = 5; //回调通知最多尝试次数 this.notify_max_try = 5; //回调通知最多尝试次数
this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒 this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒
@ -91,37 +97,19 @@ class HeroUnion {
} }
async getConfig(forceReload) { async getConfig(forceReload) {
const _self = this;
if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) { if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) {
this.config = await common.getConfigFromJsonFile('config.json'); let config = await common.getConfigFromJsonFile('config.json');
//覆盖默认配置 //覆盖默认配置
if (typeof(this.config.task_cache_time) != 'undefined' && this.config.task_cache_time) { for (const key in config) {
this.task_cache_time = this.config.task_cache_time; //任务数据最长缓存时间,单位:秒 if (typeof(_self[key]) != 'undefined') {
} _self[key] = config[key];
}
if (typeof(this.config.task_data_max_size) != 'undefined' && this.config.task_data_max_size) {
this.task_data_max_size = this.config.task_data_max_size; //任务数据最大字节数,单位:KB
}
if (typeof(this.config.notify_timeout) != 'undefined' && this.config.notify_timeout) {
this.notify_timeout = this.config.notify_timeout; //回调通知请求超时时长,单位:秒
}
if (typeof(this.config.notify_max_try) != 'undefined' && this.config.notify_max_try) {
this.notify_max_try = this.config.notify_max_try; //最多回调通知次数
}
if (typeof(this.config.heroHeartTimeout) != 'undefined' && this.config.heroHeartTimeout) {
this.heroHeartTimeout = this.config.heroHeartTimeout; //爬虫心跳超时时长,单位:秒
}
if (typeof(this.config.max_list_hero_num) != 'undefined' && this.config.max_list_hero_num) {
this.max_list_hero_num = this.config.max_list_hero_num; //最大返回爬虫数量
} }
if (typeof(this.config.axios_proxy) != 'undefined' && this.config.axios_proxy) { this.config = config;
this.axios_proxy = this.config.axios_proxy;
}
} }
return this.config; return this.config;
@ -164,10 +152,11 @@ class HeroUnion {
* error: '', * error: '',
* notified: false, //是否成功发送回调通知 * notified: false, //是否成功发送回调通知
* notify_time: 0, //回调通知次数 * notify_time: 0, //回调通知次数
* try_time: 0, //任务处理次数
* token: '' //任务密钥,爬虫完成任务回传数据的时候用它签名 * token: '' //任务密钥,爬虫完成任务回传数据的时候用它签名
* } * }
**/ **/
createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) { async createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) {
let timestamp = common.getTimestampInSeconds(); let timestamp = common.getTimestampInSeconds();
let task = { let task = {
@ -176,6 +165,7 @@ class HeroUnion {
notified: false, notified: false,
notify_time: 0, notify_time: 0,
try_time: 0,
//必选 //必选
uuid: uuid, uuid: uuid,
@ -211,6 +201,13 @@ class HeroUnion {
this.taskStatus.total ++; this.taskStatus.total ++;
this.taskStatus.waiting ++; this.taskStatus.waiting ++;
//保存任务日志
let config = await this.getConfig();
let logFile = path.resolve(config.systemLogDir) + '/tasks.log';
common.saveLog(logFile, JSON.stringify(task) + "\n");
common.log('Task %s created, url %s, notify url %s.', task.id, url, notify_url);
return task; return task;
} }
@ -248,6 +245,10 @@ class HeroUnion {
this.tasks[taskIndex].status = 'running'; this.tasks[taskIndex].status = 'running';
//为task生成一个随机密钥,便于爬虫处理完成后回传的时候对数据进行签名 //为task生成一个随机密钥,便于爬虫处理完成后回传的时候对数据进行签名
this.tasks[taskIndex].token = this.generateTaskToken(this.tasks[taskIndex].id); this.tasks[taskIndex].token = this.generateTaskToken(this.tasks[taskIndex].id);
//任务处理次数计数
this.tasks[taskIndex].try_time ++;
//更新任务修改时间
this.tasks[taskIndex].updated = common.getTimestampInSeconds();
//更新统计数据 //更新统计数据
this.taskStatus.waiting --; this.taskStatus.waiting --;
@ -414,7 +415,7 @@ class HeroUnion {
//定期检查爬虫是否在线 //定期检查爬虫是否在线
//如果上一次上报状态时间在10分钟前,则设置该爬虫已下线 //如果上一次上报状态时间在10分钟前,则设置该爬虫已下线
heroHeartCheck() { heroHeartCheck() {
let _self = this; const _self = this;
const frequence = typeof(this.config.heroHeartCheckFrequence) != 'undefined' const frequence = typeof(this.config.heroHeartCheckFrequence) != 'undefined'
&& this.config.heroHeartCheckFrequence ? this.config.heroHeartCheckFrequence : 1; //1 分钟检查一次 && this.config.heroHeartCheckFrequence ? this.config.heroHeartCheckFrequence : 1; //1 分钟检查一次
@ -439,7 +440,7 @@ class HeroUnion {
//自动重新加载配置文件 //自动重新加载配置文件
autoReloadConfigs() { autoReloadConfigs() {
let _self = this; const _self = this;
const frequence = typeof(this.config.reloadConfigFrequence) != 'undefined' const frequence = typeof(this.config.reloadConfigFrequence) != 'undefined'
&& this.config.reloadConfigFrequence ? this.config.reloadConfigFrequence : 5; //5 分钟重新加载一次 && this.config.reloadConfigFrequence ? this.config.reloadConfigFrequence : 5; //5 分钟重新加载一次
@ -456,7 +457,7 @@ class HeroUnion {
//定期清理过期的任务 //定期清理过期的任务
autoCleanExpiredTasks() { autoCleanExpiredTasks() {
let _self = this; const _self = this;
const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined' const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined'
&& this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 10; //10 分钟检查一次 && this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 10; //10 分钟检查一次
@ -489,9 +490,45 @@ class HeroUnion {
common.log('Cronjob of auto clean expired tasks started.'); common.log('Cronjob of auto clean expired tasks started.');
} }
//定期重置处理过期的任务
autoResetRunningTimeoutTasks() {
const _self = this;
const frequence = typeof(this.config.autoResetWaitingTaskFrequence) != 'undefined'
&& this.config.autoResetWaitingTaskFrequence ? this.config.autoResetWaitingTaskFrequence : 6; //6 分钟检查一次
const cronjob = cron.schedule(`*/${frequence} * * * *`, () => {
let timestamp = common.getTimestampInSeconds();
_self.tasks.forEach(function(item, index) {
if (
item.status == 'running'
&& item.try_time < _self.task_max_try
&& timestamp - item.updated > _self.task_timeout
) {
_self.taskStatus.running --;
_self.taskStatus.waiting ++;
_self.tasks[index].status = 'waiting';
common.log('Task %s running timeout, and reset it to waiting list', item.id);
}else if (item.status == 'running' && item.try_time >= _self.task_max_try) {
//设置任务失败
_self.taskStatus.running --;
_self.taskStatus.failed ++;
_self.tasks[index].status = 'failed';
_self.tasks[index].error = 'Task max try time got.';
common.error('Task %s failed, got the max try time.', item.id);
}
});
}, {
scheduled: false
});
cronjob.start();
common.log('Cronjob of auto reset expired running tasks started.');
}
//定期尝试给已完成状态的任务notify_url发送通知回调 //定期尝试给已完成状态的任务notify_url发送通知回调
autoNotifyTasks() { autoNotifyTasks() {
let _self = this; const _self = this;
const frequence = typeof(this.config.autoNotifyTaskFrequence) != 'undefined' const frequence = typeof(this.config.autoNotifyTaskFrequence) != 'undefined'
&& this.config.autoNotifyTaskFrequence ? this.config.autoNotifyTaskFrequence : 2; //2 分钟检查一次 && this.config.autoNotifyTaskFrequence ? this.config.autoNotifyTaskFrequence : 2; //2 分钟检查一次
@ -573,6 +610,7 @@ class HeroUnion {
this.heroHeartCheck(); this.heroHeartCheck();
this.autoCleanExpiredTasks(); this.autoCleanExpiredTasks();
this.autoNotifyTasks(); this.autoNotifyTasks();
this.autoResetRunningTimeoutTasks();
} }
} }

3
log/Readme.md

@ -0,0 +1,3 @@
# 系统日志保存目录
用来保存任务提交记录等日志。

2
router_api.mjs

@ -106,7 +106,7 @@ router.post('/newtask/', async (req, res) => {
} }
if (!data.message) { if (!data.message) {
data.task = heroUnion.createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang); data.task = await heroUnion.createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang);
data.code = 1; data.code = 1;
data.message = '新爬虫任务提交完成'; data.message = '新爬虫任务提交完成';
} }

12
test/common.test.mjs

@ -127,4 +127,16 @@ test('Common function byteSize test', async (t) => {
let case5 = common.byteSize('hello 你'); let case5 = common.byteSize('hello 你');
assert.equal(case5, 9); assert.equal(case5, 9);
});
test('Common function saveLog test', async (t) => {
let data = {a: 1, b:2};
let filename = './log/test.log';
let saved = await common.saveLog(filename, JSON.stringify(data));
assert.equal(saved, true);
//case 2
filename = './logs/test.log';
saved = await common.saveLog(filename, JSON.stringify(data));
assert.equal(saved, false);
}); });
Loading…
Cancel
Save