diff --git a/README.md b/README.md index 37bc851..b137448 100644 --- a/README.md +++ b/README.md @@ -227,9 +227,9 @@ https://herounion.filesite.io/api/savetask/ * 请求方法:**POST** * 请求参数: ``` +name task_id task_result -timestamp sign ``` @@ -248,7 +248,7 @@ var params = { //参数示例 }; var sortObj = function(obj) { //参数排序方法 - return Object.keys(obj).sort().reduce(function (result, key) { + return Object.keys(obj).sort().reduce(function(result, key) { result[key] = obj[key]; return result; }, {}); diff --git a/common.mjs b/common.mjs index e97703f..acb9d8a 100644 --- a/common.mjs +++ b/common.mjs @@ -37,7 +37,7 @@ class Common { } sortDict(obj) { //dict按key排序 - return Object.keys(obj).sort().reduce(function (result, key) { + return Object.keys(obj).sort().reduce(function(result, key) { result[key] = obj[key]; return result; }, {}); @@ -230,6 +230,10 @@ class Common { return args; } + byteSize(str) { + return new Blob([str]).size; + } + } let commonFuns = new Common(); diff --git a/conf/config.json b/conf/config.json index d400eb8..a35730d 100644 --- a/conf/config.json +++ b/conf/config.json @@ -11,6 +11,7 @@ "reloadConfigFrequence": 60, "heroHeartCheckFrequence": 60, + "autoCleanTaskFrequence": 300, "max_list_hero_num": 1000, diff --git a/heroUnion.mjs b/heroUnion.mjs index d74346b..67fe6cd 100644 --- a/heroUnion.mjs +++ b/heroUnion.mjs @@ -33,7 +33,7 @@ class HeroUnion { //默认配置 //this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录 this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒 - this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB + this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M this.notify_timeout = 8; //回调通知请求超时时长,单位:秒 this.notify_max_try = 5; //回调通知最多尝试次数 this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒 @@ -79,7 +79,7 @@ class HeroUnion { } isDataTooLarge(data) { - return JSON.stringify(data).length > this.task_data_max_size * 1024; + return common.byteSize(JSON.stringify(data)) > this.task_data_max_size * 1024; } async getConfig(forceReload) { @@ -247,21 +247,25 @@ class HeroUnion { return searchResult; } - //保存任务处理结果 - saveTaskById(uuid, id, data) { + //保存处理中任务结果 + saveTaskById(bot_name, id, data) { let done = false; - let taskIndex = this.tasks.findIndex((item) => item.id == id); + let taskIndex = this.tasks.findIndex((item) => item.id == id && item.status == 'running'); if (taskIndex > -1) { if (this.isDataTooLarge(data)) { + //更新统计数据 + this.taskStatus.running --; + this.taskStatus.failed ++; + this.tasks[taskIndex].status = 'failed'; this.tasks[taskIndex].error = 'Result is too large to save.'; return false; } - data.uuid = uuid; + data.provider = bot_name; //记录数据提供者 - let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid); + let resIndex = this.tasks[taskIndex].results.findIndex((dataItem) => dataItem.provider == bot_name); if (resIndex == -1) { this.tasks[taskIndex].results.push(data); }else { @@ -269,6 +273,11 @@ class HeroUnion { } this.tasks[taskIndex].updated = common.getTimestampInSeconds(); + + //更新统计数据 + this.taskStatus.running --; + this.taskStatus.done ++; + this.tasks[taskIndex].status = 'done'; done = true; @@ -297,6 +306,13 @@ class HeroUnion { try { if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) { + //检查任务通知次数是否达到最大尝试次数 + if (task.notify_time > this.notify_max_try) { + common.error('[LIMITED] Task %s notify time has reach the max try number %s', + task.id, this.notify_max_try); + return false; + } + let params = { "task_id": task.id, "task_result": task.results, @@ -307,14 +323,17 @@ class HeroUnion { const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000}); if (response.status == 200) { notified = true; - - //TODO: 更新任务notified状态以及notify_time通知次数 }else { - //TODO: 检查任务通知次数是否达到最大尝试次数 - common.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s', notify_url, response.status, response.statusText, response.data); } + + //更新任务notified状态以及notify_time通知次数 + let taskIndex = this.tasks.findIndex((item) => item.id == task.id); + if (taskIndex) { + this.tasks[taskIndex].notified = notified; + this.tasks[taskIndex].notify_time ++; + } } }catch(err) { common.error('[ERROR] Notify to %s failed: %s', notify_url, err); @@ -410,6 +429,41 @@ class HeroUnion { common.log('Cronjob of config auto reload started.'); } + //定期清理过期的任务 + autoCleanExpiredTasks() { + let _self = this; + + const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined' + && this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 600; //10 分钟检查一次 + const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => { + let timestamp = common.getTimestampInSeconds(); + + let tasksLeft = _self.tasks.reduce(function(accumulator, item) { + if ( + (item.status == 'done' || item.status == 'failed') + && timestamp - item.created > _self.task_cache_time + ) { + _self.taskStatus[item.status] --; + _self.taskStatus.total --; + common.log('Task %s is expired, which is created at %s', item.id, item.created); + }else { + accumulator.push(item); + } + + return accumulator; + }, []); + + if (tasksLeft) { + _self.tasks = tasksLeft; + } + }, { + scheduled: false + }); + + cronjob.start(); + common.log('Cronjob of auto clean expired tasks started.'); + } + //获取联盟状态 getStats() { this.stats.taskStatus = this.taskStatus; @@ -468,6 +522,7 @@ class HeroUnion { await this.getConfig(); this.autoReloadConfigs(); this.heroHeartCheck(); + this.autoCleanExpiredTasks(); } } diff --git a/router_api.mjs b/router_api.mjs index 0368363..51046b7 100644 --- a/router_api.mjs +++ b/router_api.mjs @@ -26,6 +26,7 @@ router.get('/', async (req, res) => { "/api/onboard/": "爬虫状态上报到联盟", "/api/gettask/": "爬虫从联盟获取待处理任务", + "/api/savetask/": "爬虫完成任务后保存结果到联盟", }; const data = { @@ -134,7 +135,7 @@ router.get('/querytask/', async (req, res) => { }else if (common.isUuidOk(uuid) == false) { data.message = '参数uuid应为6-32位的英文字符串,请联系管理员获得'; }else if (common.isTaskIdOk(task_id) == false) { - data.message = '任务编号task_id格式错误,请使用接口/api/newtask/返回数据里的id属性值'; + data.message = '任务编号task_id格式错误,请使用接口/api/newtask/返回数据里的任务id属性值'; }else if (common.isNormalName(sign, 32, 32) == false) { data.message = '签名sign应为32位的英文字符串'; } @@ -219,13 +220,55 @@ router.get('/gettask/', async (req, res) => { /** * hero爬虫向联盟提交某个任务的抓取结果 * + * 参数: + * name: 爬虫名字 + * task_id: 任务ID + * task_result: 抓取结果数据 + * sign: 参数签名 **/ router.post('/savetask/', async (req, res) => { - //TODO: 签名检查 + let name = req.body.name, + task_id = req.body.task_id, + task_result = req.body.task_result, + sign = req.body.sign; - //TODO: 任务状态检查,如果已经完成,则忽略当前请求 - - return res.send('api/savetask/'); + let data = {code: 0, message: ''}; + + //参数检查 + if (!name || !task_id || !task_result || !sign) { + data.message = '必选参数不能为空'; + }else if (common.isBotNameOk(name) == false) { + data.message = '爬虫名字必须是6 - 32位英文字母、下划线的组合'; + }else if (common.isTaskIdOk(task_id) == false) { + data.message = '任务编号task_id格式错误,请使用接口/api/gettask/返回数据里的任务id属性值'; + } + + //签名检查,如果通过则保存任务数据 + let task = heroUnion.getTaskById(task_id); + if (task) { + let paramsCheck = { + name: name, + task_id: task_id, + task_result: task_result + }; + + let mySign = common.sign(paramsCheck, task.token); + if (mySign.toLowerCase() != sign.toLowerCase()) { + data.message = `签名 ${sign} 不匹配,请确保token正确及签名方法跟文档一致`; + }else { + let saved = heroUnion.saveTaskById(name, task_id, task_result); + if (saved) { + data.code = 1; + data.message = '保存任务数据完成'; + }else { + data.message = `任务${task_id}已经完成,请勿重复提交数据`; + } + } + }else { + data.message = `任务${task_id}不存在`; + } + + return res.status(200).json(data); }); /** diff --git a/test/common.test.mjs b/test/common.test.mjs index a1293ef..16a3032 100644 --- a/test/common.test.mjs +++ b/test/common.test.mjs @@ -122,3 +122,20 @@ test('Common function isTaskIdOk test', async (t) => { let case5 = common.isTaskIdOk('test01_1234567890123'); assert.equal(case5, true); }); + +test('Common function byteSize test', async (t) => { + let case1 = common.byteSize('a'); + assert.equal(case1, 1); + + let case2 = common.byteSize('0'); + assert.equal(case2, 1); + + let case3 = common.byteSize('你'); + assert.equal(case3, 3); + + let case4 = common.byteSize('😃'); + assert.equal(case4, 4); + + let case5 = common.byteSize('hello 你'); + assert.equal(case5, 9); +}); \ No newline at end of file diff --git a/test/heroUnion.test.mjs b/test/heroUnion.test.mjs index 28e7107..2b6740b 100644 --- a/test/heroUnion.test.mjs +++ b/test/heroUnion.test.mjs @@ -167,6 +167,58 @@ test('HeroUnion get waiting task test', async (t) => { assert.ifError(response2.data.task); }); +test('HeroUnion task data save test', async (t) => { + let params = { + platforms: 'douyin,kuaishou,xigua,bilibili', + contracts: 'tajiantv', + data_mode: 'json', + country: 'cn', + lang: 'zh' + }; + + let api = 'http://127.0.0.1:8080/api/gettask/'; + + let queryOption = axiosConfig; + queryOption.method = 'get'; + queryOption.url = api; + queryOption.params = params; + + const response = await axios(queryOption); + console.log(response.data); + + assert.equal(response.status, 200); + assert.equal(response.data.code, 1); + assert.ok(response.data.task); + + let task = response.data.task; + let task_data = { + "title": "标题:HeroUnion英雄联盟", + "description": "描述内容,联盟简介", + "others": "其它内容" + }; + + api = 'http://127.0.0.1:8080/api/savetask/'; + params = { + name: "heroDemo", + task_id: task.id, + task_result: task_data + }; + params.sign = common.sign(params, task.token); //对参数进行签名 + + //case 1 + const response2 = await axios.post(api, params, axiosConfig); + console.log(response2.data); + + assert.equal(response2.status, 200); + assert.equal(response2.data.code, 1); + + //case 2 + const response3 = await axios.post(api, params, axiosConfig); + console.log(response3.data); + + assert.equal(response3.status, 200); + assert.equal(response3.data.code, 0); +}); test('HeroUnion stats test', async (t) => { let api = 'http://127.0.0.1:8080/api/stats/';