Browse Source

api savetask done, add cronjob to auto clean expired tasks

master
filesite 8 months ago
parent
commit
cbfbb1aa04
  1. 4
      README.md
  2. 6
      common.mjs
  3. 1
      conf/config.json
  4. 77
      heroUnion.mjs
  5. 53
      router_api.mjs
  6. 17
      test/common.test.mjs
  7. 52
      test/heroUnion.test.mjs

4
README.md

@ -227,9 +227,9 @@ https://herounion.filesite.io/api/savetask/ @@ -227,9 +227,9 @@ https://herounion.filesite.io/api/savetask/
* 请求方法:**POST**
* 请求参数:
```
name
task_id
task_result
timestamp
sign
```
@ -248,7 +248,7 @@ var params = { //参数示例 @@ -248,7 +248,7 @@ var params = { //参数示例
};
var sortObj = function(obj) { //参数排序方法
return Object.keys(obj).sort().reduce(function (result, key) {
return Object.keys(obj).sort().reduce(function(result, key) {
result[key] = obj[key];
return result;
}, {});

6
common.mjs

@ -37,7 +37,7 @@ class Common { @@ -37,7 +37,7 @@ class Common {
}
sortDict(obj) { //dict按key排序
return Object.keys(obj).sort().reduce(function (result, key) {
return Object.keys(obj).sort().reduce(function(result, key) {
result[key] = obj[key];
return result;
}, {});
@ -230,6 +230,10 @@ class Common { @@ -230,6 +230,10 @@ class Common {
return args;
}
byteSize(str) {
return new Blob([str]).size;
}
}
let commonFuns = new Common();

1
conf/config.json

@ -11,6 +11,7 @@ @@ -11,6 +11,7 @@
"reloadConfigFrequence": 60,
"heroHeartCheckFrequence": 60,
"autoCleanTaskFrequence": 300,
"max_list_hero_num": 1000,

77
heroUnion.mjs

@ -33,7 +33,7 @@ class HeroUnion { @@ -33,7 +33,7 @@ class HeroUnion {
//默认配置
//this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录
this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M
this.notify_timeout = 8; //回调通知请求超时时长,单位:秒
this.notify_max_try = 5; //回调通知最多尝试次数
this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒
@ -79,7 +79,7 @@ class HeroUnion { @@ -79,7 +79,7 @@ class HeroUnion {
}
isDataTooLarge(data) {
return JSON.stringify(data).length > this.task_data_max_size * 1024;
return common.byteSize(JSON.stringify(data)) > this.task_data_max_size * 1024;
}
async getConfig(forceReload) {
@ -247,21 +247,25 @@ class HeroUnion { @@ -247,21 +247,25 @@ class HeroUnion {
return searchResult;
}
//保存任务处理结果
saveTaskById(uuid, id, data) {
//保存处理中任务结果
saveTaskById(bot_name, id, data) {
let done = false;
let taskIndex = this.tasks.findIndex((item) => item.id == id);
let taskIndex = this.tasks.findIndex((item) => item.id == id && item.status == 'running');
if (taskIndex > -1) {
if (this.isDataTooLarge(data)) {
//更新统计数据
this.taskStatus.running --;
this.taskStatus.failed ++;
this.tasks[taskIndex].status = 'failed';
this.tasks[taskIndex].error = 'Result is too large to save.';
return false;
}
data.uuid = uuid;
data.provider = bot_name; //记录数据提供者
let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid);
let resIndex = this.tasks[taskIndex].results.findIndex((dataItem) => dataItem.provider == bot_name);
if (resIndex == -1) {
this.tasks[taskIndex].results.push(data);
}else {
@ -269,6 +273,11 @@ class HeroUnion { @@ -269,6 +273,11 @@ class HeroUnion {
}
this.tasks[taskIndex].updated = common.getTimestampInSeconds();
//更新统计数据
this.taskStatus.running --;
this.taskStatus.done ++;
this.tasks[taskIndex].status = 'done';
done = true;
@ -297,6 +306,13 @@ class HeroUnion { @@ -297,6 +306,13 @@ class HeroUnion {
try {
if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) {
//检查任务通知次数是否达到最大尝试次数
if (task.notify_time > this.notify_max_try) {
common.error('[LIMITED] Task %s notify time has reach the max try number %s',
task.id, this.notify_max_try);
return false;
}
let params = {
"task_id": task.id,
"task_result": task.results,
@ -307,14 +323,17 @@ class HeroUnion { @@ -307,14 +323,17 @@ class HeroUnion {
const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000});
if (response.status == 200) {
notified = true;
//TODO: 更新任务notified状态以及notify_time通知次数
}else {
//TODO: 检查任务通知次数是否达到最大尝试次数
common.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s',
notify_url, response.status, response.statusText, response.data);
}
//更新任务notified状态以及notify_time通知次数
let taskIndex = this.tasks.findIndex((item) => item.id == task.id);
if (taskIndex) {
this.tasks[taskIndex].notified = notified;
this.tasks[taskIndex].notify_time ++;
}
}
}catch(err) {
common.error('[ERROR] Notify to %s failed: %s', notify_url, err);
@ -410,6 +429,41 @@ class HeroUnion { @@ -410,6 +429,41 @@ class HeroUnion {
common.log('Cronjob of config auto reload started.');
}
//定期清理过期的任务
autoCleanExpiredTasks() {
let _self = this;
const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined'
&& this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 600; //10 分钟检查一次
const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => {
let timestamp = common.getTimestampInSeconds();
let tasksLeft = _self.tasks.reduce(function(accumulator, item) {
if (
(item.status == 'done' || item.status == 'failed')
&& timestamp - item.created > _self.task_cache_time
) {
_self.taskStatus[item.status] --;
_self.taskStatus.total --;
common.log('Task %s is expired, which is created at %s', item.id, item.created);
}else {
accumulator.push(item);
}
return accumulator;
}, []);
if (tasksLeft) {
_self.tasks = tasksLeft;
}
}, {
scheduled: false
});
cronjob.start();
common.log('Cronjob of auto clean expired tasks started.');
}
//获取联盟状态
getStats() {
this.stats.taskStatus = this.taskStatus;
@ -468,6 +522,7 @@ class HeroUnion { @@ -468,6 +522,7 @@ class HeroUnion {
await this.getConfig();
this.autoReloadConfigs();
this.heroHeartCheck();
this.autoCleanExpiredTasks();
}
}

53
router_api.mjs

@ -26,6 +26,7 @@ router.get('/', async (req, res) => { @@ -26,6 +26,7 @@ router.get('/', async (req, res) => {
"/api/onboard/": "爬虫状态上报到联盟",
"/api/gettask/": "爬虫从联盟获取待处理任务",
"/api/savetask/": "爬虫完成任务后保存结果到联盟",
};
const data = {
@ -134,7 +135,7 @@ router.get('/querytask/', async (req, res) => { @@ -134,7 +135,7 @@ router.get('/querytask/', async (req, res) => {
}else if (common.isUuidOk(uuid) == false) {
data.message = '参数uuid应为6-32位的英文字符串,请联系管理员获得';
}else if (common.isTaskIdOk(task_id) == false) {
data.message = '任务编号task_id格式错误,请使用接口/api/newtask/返回数据里的id属性值';
data.message = '任务编号task_id格式错误,请使用接口/api/newtask/返回数据里的任务id属性值';
}else if (common.isNormalName(sign, 32, 32) == false) {
data.message = '签名sign应为32位的英文字符串';
}
@ -219,13 +220,55 @@ router.get('/gettask/', async (req, res) => { @@ -219,13 +220,55 @@ router.get('/gettask/', async (req, res) => {
/**
* hero爬虫向联盟提交某个任务的抓取结果
*
* 参数
* name: 爬虫名字
* task_id: 任务ID
* task_result: 抓取结果数据
* sign: 参数签名
**/
router.post('/savetask/', async (req, res) => {
//TODO: 签名检查
let name = req.body.name,
task_id = req.body.task_id,
task_result = req.body.task_result,
sign = req.body.sign;
//TODO: 任务状态检查,如果已经完成,则忽略当前请求
return res.send('api/savetask/');
let data = {code: 0, message: ''};
//参数检查
if (!name || !task_id || !task_result || !sign) {
data.message = '必选参数不能为空';
}else if (common.isBotNameOk(name) == false) {
data.message = '爬虫名字必须是6 - 32位英文字母、下划线的组合';
}else if (common.isTaskIdOk(task_id) == false) {
data.message = '任务编号task_id格式错误,请使用接口/api/gettask/返回数据里的任务id属性值';
}
//签名检查,如果通过则保存任务数据
let task = heroUnion.getTaskById(task_id);
if (task) {
let paramsCheck = {
name: name,
task_id: task_id,
task_result: task_result
};
let mySign = common.sign(paramsCheck, task.token);
if (mySign.toLowerCase() != sign.toLowerCase()) {
data.message = `签名 ${sign} 不匹配,请确保token正确及签名方法跟文档一致`;
}else {
let saved = heroUnion.saveTaskById(name, task_id, task_result);
if (saved) {
data.code = 1;
data.message = '保存任务数据完成';
}else {
data.message = `任务${task_id}已经完成,请勿重复提交数据`;
}
}
}else {
data.message = `任务${task_id}不存在`;
}
return res.status(200).json(data);
});
/**

17
test/common.test.mjs

@ -122,3 +122,20 @@ test('Common function isTaskIdOk test', async (t) => { @@ -122,3 +122,20 @@ test('Common function isTaskIdOk test', async (t) => {
let case5 = common.isTaskIdOk('test01_1234567890123');
assert.equal(case5, true);
});
test('Common function byteSize test', async (t) => {
let case1 = common.byteSize('a');
assert.equal(case1, 1);
let case2 = common.byteSize('0');
assert.equal(case2, 1);
let case3 = common.byteSize('你');
assert.equal(case3, 3);
let case4 = common.byteSize('😃');
assert.equal(case4, 4);
let case5 = common.byteSize('hello 你');
assert.equal(case5, 9);
});

52
test/heroUnion.test.mjs

@ -167,6 +167,58 @@ test('HeroUnion get waiting task test', async (t) => { @@ -167,6 +167,58 @@ test('HeroUnion get waiting task test', async (t) => {
assert.ifError(response2.data.task);
});
test('HeroUnion task data save test', async (t) => {
let params = {
platforms: 'douyin,kuaishou,xigua,bilibili',
contracts: 'tajiantv',
data_mode: 'json',
country: 'cn',
lang: 'zh'
};
let api = 'http://127.0.0.1:8080/api/gettask/';
let queryOption = axiosConfig;
queryOption.method = 'get';
queryOption.url = api;
queryOption.params = params;
const response = await axios(queryOption);
console.log(response.data);
assert.equal(response.status, 200);
assert.equal(response.data.code, 1);
assert.ok(response.data.task);
let task = response.data.task;
let task_data = {
"title": "标题:HeroUnion英雄联盟",
"description": "描述内容,联盟简介",
"others": "其它内容"
};
api = 'http://127.0.0.1:8080/api/savetask/';
params = {
name: "heroDemo",
task_id: task.id,
task_result: task_data
};
params.sign = common.sign(params, task.token); //对参数进行签名
//case 1
const response2 = await axios.post(api, params, axiosConfig);
console.log(response2.data);
assert.equal(response2.status, 200);
assert.equal(response2.data.code, 1);
//case 2
const response3 = await axios.post(api, params, axiosConfig);
console.log(response3.data);
assert.equal(response3.status, 200);
assert.equal(response3.data.code, 0);
});
test('HeroUnion stats test', async (t) => {
let api = 'http://127.0.0.1:8080/api/stats/';

Loading…
Cancel
Save