/** * Hero管理、调度 * --使用流程-- * 1. 本地启动machete_hero爬虫,它会主动连接本联盟,并加入到爬虫队列等待处理任务(爬虫会定时上报自己的状态给联盟); * 2. 联盟收到新任务时,存入待处理队列,等待在线的爬虫来获取; * 3. 爬虫获取到新任务处理完成后,将结果回传给联盟; * 4. 联盟收到爬虫处理结果触发回调通知并将数据结果发送给任务提交者; * 5. 任务提交者可自行根据任务编号来联盟查询任务结果; * * --并发处理规则-- * 同一个任务可以被分配给多个爬虫 * 同一个任务可以接收不同爬虫回传的数据,并完成回调 * * --数据缓存规则-- * 任务结果数据最大不超过1M,超过的当任务处理失败处理 * 任务数据保存最长 1 天 */ import fs from 'node:fs'; import { readdir, readFile } from 'node:fs/promises'; import path from 'node:path'; import cron from 'node-cron'; import axios from 'axios'; import common from './common.mjs'; import md5 from 'md5'; class HeroUnion { //构造函数,设置默认配置 constructor() { this.config = null; //默认配置 //this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录 this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒 this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB,默认最大1M this.notify_timeout = 8; //回调通知请求超时时长,单位:秒 this.notify_max_try = 5; //回调通知最多尝试次数 this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒 this.max_list_hero_num = 1000; //在接口getHeros()里最多返回的爬虫数量 this.axios_proxy = false; //axios库发送请求时是否使用系统代理 this.stats = { start_time: common.getTimestampInSeconds() }; this.heros = []; //hero爬虫队列 this.tasks = []; //任务队列 //任务相关数据 this.taskStatus = { 'total': 0, 'waiting': 0, 'running': 0, 'done': 0, 'failed': 0 }; this.statusCode = { 'waiting': '待处理', 'running': '处理中', 'done': '完成', 'failed': '失败' }; this.supportedPlatforms = { 'douyin': true, 'kuaishou': true, 'xigua': true, 'bilibili': true }; //爬虫相关数据 this.heroStatus = { 'total': 0, 'idle': 0, //空闲 'busy': 0, //繁忙 'offline': 0 //离线 }; } isDataTooLarge(data) { return common.byteSize(JSON.stringify(data)) > this.task_data_max_size * 1024; } async getConfig(forceReload) { if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) { this.config = await common.getConfigFromJsonFile('config.json'); //覆盖默认配置 if (typeof(this.config.task_cache_time) != 'undefined' && this.config.task_cache_time) { this.task_cache_time = this.config.task_cache_time; //任务数据最长缓存时间,单位:秒 } if (typeof(this.config.task_data_max_size) != 'undefined' && this.config.task_data_max_size) { this.task_data_max_size = this.config.task_data_max_size; //任务数据最大字节数,单位:KB } if (typeof(this.config.notify_timeout) != 'undefined' && this.config.notify_timeout) { this.notify_timeout = this.config.notify_timeout; //回调通知请求超时时长,单位:秒 } if (typeof(this.config.notify_max_try) != 'undefined' && this.config.notify_max_try) { this.notify_max_try = this.config.notify_max_try; //最多回调通知次数 } if (typeof(this.config.heroHeartTimeout) != 'undefined' && this.config.heroHeartTimeout) { this.heroHeartTimeout = this.config.heroHeartTimeout; //爬虫心跳超时时长,单位:秒 } if (typeof(this.config.max_list_hero_num) != 'undefined' && this.config.max_list_hero_num) { this.max_list_hero_num = this.config.max_list_hero_num; //最大返回爬虫数量 } if (typeof(this.config.axios_proxy) != 'undefined' && this.config.axios_proxy) { this.axios_proxy = this.config.axios_proxy; } } return this.config; } //--任务相关功能-- //根据任务提交者ID和时间戳生成任务ID编号 generateTaskId(uuid) { let timestamp = common.getTimestamp(); return `${uuid}_${timestamp}`; } //根据当前时间生成任务的密钥 generateTaskToken(id) { let timestamp = common.getTimestamp(); return md5(`${id}_${timestamp}`); } isSupportedPlatform(platform) { return typeof(this.supportedPlatforms[platform]) != 'undefined' && this.supportedPlatforms[platform]; } //提交新任务 /** * { * id: '', * status: '', * uuid: '', * country: '', * lang: '', * url: '', * platform: '', //目标网址所属平台,具体参考爬虫所支持的平台 * contract: '', //需要抓取的数据合约,凡是支持此合约的爬虫将根据合约内容抓取数据 * data_mode: '', //json, html * notify_url: '', * results: [], * created: 0, //timestamp in seconds * updated: 0, //timestamp in seconds * error: '', * notified: false, //是否成功发送回调通知 * notify_time: 0, //回调通知次数 * token: '' //任务密钥,爬虫完成任务回传数据的时候用它签名 * } **/ createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) { let timestamp = common.getTimestampInSeconds(); let task = { id: this.generateTaskId(uuid), status: 'waiting', notified: false, notify_time: 0, //必选 uuid: uuid, url: url, platform: platform, contract: contract, //可选 data_mode: 'json', country: 'cn', lang: 'zh', notify_url: '', results: [], created: timestamp, updated: timestamp }; if (typeof(data_mode) != 'undefined' && data_mode) { task.data_mode = data_mode; } if (typeof(notify_url) != 'undefined' && notify_url) { task.notify_url = notify_url; } if (typeof(country) != 'undefined' && country) { task.country = country; } if (typeof(lang) != 'undefined' && lang) { task.lang = lang; } this.tasks.push(task); this.taskStatus.total ++; this.taskStatus.waiting ++; return task; } //参数均可选,获取 1 个待处理的任务 getWaitingTask(platforms, contracts, country, lang, data_mode) { let searchResult = null; let taskIndex = this.tasks.findIndex(function(item) { if (item.status != 'waiting') {return false;} if (typeof(platforms) != 'undefined' && platforms && platforms.indexOf(item.platform) == -1) { return false; } if (typeof(contracts) != 'undefined' && contracts && contracts.indexOf(item.contract) == -1) { return false; } if (typeof(country) != 'undefined' && country && item.country != country) { return false; } if (typeof(lang) != 'undefined' && lang && item.lang != lang) { return false; } if (typeof(data_mode) != 'undefined' && data_mode && item.data_mode != data_mode) { return false; } return true; }); if (taskIndex > -1) { this.tasks[taskIndex].status = 'running'; //为task生成一个随机密钥,便于爬虫处理完成后回传的时候对数据进行签名 this.tasks[taskIndex].token = this.generateTaskToken(this.tasks[taskIndex].id); //更新统计数据 this.taskStatus.waiting --; this.taskStatus.running ++; searchResult = this.tasks[taskIndex]; } return searchResult; } //保存处理中任务结果 saveTaskById(bot_name, id, data) { let done = false; let taskIndex = this.tasks.findIndex((item) => item.id == id && item.status == 'running'); if (taskIndex > -1) { if (this.isDataTooLarge(data)) { //更新统计数据 this.taskStatus.running --; this.taskStatus.failed ++; this.tasks[taskIndex].status = 'failed'; this.tasks[taskIndex].error = 'Result is too large to save.'; common.error('Task %s save data failed by bot %s, data is too large.', id, bot_name); return false; } data.provider = bot_name; //记录数据提供者 let resIndex = this.tasks[taskIndex].results.findIndex((dataItem) => dataItem.provider == bot_name); if (resIndex == -1) { this.tasks[taskIndex].results.push(data); }else { this.tasks[taskIndex].results[resIndex] = data; } this.tasks[taskIndex].updated = common.getTimestampInSeconds(); //更新统计数据 this.taskStatus.running --; this.taskStatus.done ++; this.tasks[taskIndex].status = 'done'; common.log('Task %s save data done by bot %s.', id, bot_name); done = true; } return done; } //查询某个任务的状态及其数据 getTaskById(id) { return this.tasks.find((item) => item.id == id); } //根据uuid获取用户的签名密钥 async getUserToken(uuid) { let config = await this.getConfig(); return config && typeof(config.tokens[uuid]) != 'undefined' ? config.tokens[uuid] : ''; } //任务完成触发回调通知 async handleTaskDone(task) { let notified = false; let notify_url = task.notify_url; if (!notify_url || common.isUrlOk(notify_url) == false) { return false; } try { common.log('[%s] Try to notify task %s via %s', task.notify_time, task.id, notify_url); let params = { "task_id": task.id, "task_result": task.results, "timestamp": common.getTimestamp(), }; let token = await this.getUserToken(task.uuid); params.sign = common.sign(params, token); const response = await axios.post(notify_url, params, { timeout: this.notify_timeout*1000, proxy: this.axios_proxy }); if (response.status == 200) { notified = true; common.log('Task %s notify to %s done, response data:', task.id, notify_url, response.data); }else { common.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s', notify_url, response.status, response.statusText, response.data); } }catch(err) { common.error('[ERROR] Notify to %s failed: %s', notify_url, err); } //更新任务notified状态以及notify_time通知次数 let taskIndex = this.tasks.findIndex((item) => item.id == task.id); if (taskIndex > -1) { this.tasks[taskIndex].notified = notified; this.tasks[taskIndex].notify_time ++; } return notified; } //--爬虫相关功能-- //接收爬虫状态上报 /** * bot爬虫属性 * name * description * status: [idle, busy] * platforms: [], //支持的平台,可由爬虫定义 * contracts: [], //支持的数据抓取合约,具体内容由爬虫定义 * timestamp * country * lang * contact */ heroOnboard(bot) { let cachedBotIndex = this.heros.findIndex((item) => item.name == bot.name), cachedBot = cachedBotIndex > -1 ? this.heros[cachedBotIndex] : null; if (cachedBot) { //如果是已经存在的爬虫 if (cachedBot.status != bot.status) { common.log('Hero %s status change from %s to %s', cachedBot.name, cachedBot.status, bot.status); this.heroStatus[cachedBot.status] --; this.heroStatus[bot.status] ++; } this.heros[cachedBotIndex] = bot; //数据更新 common.log('Hero %s is %s at %s', bot.name, bot.status, bot.timestamp); }else { this.heros.push(bot); //添加新爬虫 this.heroStatus.total ++; if (bot.status == 'idle') { this.heroStatus.idle ++; }else { this.heroStatus.busy ++; } common.log('Hero %s is onboard at %s', bot.name, bot.timestamp); } } //定期检查爬虫是否在线 //如果上一次上报状态时间在10分钟前,则设置该爬虫已下线 heroHeartCheck() { let _self = this; const frequence = typeof(this.config.heroHeartCheckFrequence) != 'undefined' && this.config.heroHeartCheckFrequence ? this.config.heroHeartCheckFrequence : 60; //1 分钟检查一次 const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => { let timestamp = common.getTimestampInSeconds(); _self.heros.forEach(function(item, index) { if (item.status != 'offline' && timestamp - item.timestamp > _self.heroHeartTimeout) { _self.heroStatus[item.status] --; _self.heros[index].status = 'offline'; _self.heroStatus.offline ++; common.log('Hero %s is offline, last heart beat at %s', item.name, item.timestamp); } }); }, { scheduled: false }); cronjob.start(); common.log('Cronjob of hero heart check started.'); } //自动重新加载配置文件 autoReloadConfigs() { let _self = this; const frequence = typeof(this.config.reloadConfigFrequence) != 'undefined' && this.config.reloadConfigFrequence ? this.config.reloadConfigFrequence : 300; //5 分钟重新加载一次 const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => { const forceReload = true; _self.getConfig(forceReload); }, { scheduled: false }); cronjob.start(); common.log('Cronjob of config auto reload started.'); } //定期清理过期的任务 autoCleanExpiredTasks() { let _self = this; const frequence = typeof(this.config.autoCleanTaskFrequence) != 'undefined' && this.config.autoCleanTaskFrequence ? this.config.autoCleanTaskFrequence : 600; //10 分钟检查一次 const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => { let timestamp = common.getTimestampInSeconds(); let tasksLeft = _self.tasks.reduce(function(accumulator, item) { if ( (item.status == 'done' || item.status == 'failed') && timestamp - item.created > _self.task_cache_time ) { _self.taskStatus[item.status] --; _self.taskStatus.total --; common.log('Task %s is expired, which is created at %s', item.id, item.created); }else { accumulator.push(item); } return accumulator; }, []); if (tasksLeft) { _self.tasks = tasksLeft; } }, { scheduled: false }); cronjob.start(); common.log('Cronjob of auto clean expired tasks started.'); } //定期尝试给已完成状态的任务notify_url发送通知回调 autoNotifyTasks() { let _self = this; const frequence = typeof(this.config.autoNotifyTaskFrequence) != 'undefined' && this.config.autoNotifyTaskFrequence ? this.config.autoNotifyTaskFrequence : 120; //2 分钟检查一次 const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => { let task = _self.tasks.find((item) => item.status == 'done' && item.notified == false && item.notify_time < _self.notify_max_try); if (task) { _self.handleTaskDone(task); } }, { scheduled: false }); cronjob.start(); common.log('Cronjob of auto notify done tasks started.'); } //获取联盟状态 getStats() { this.stats.taskStatus = this.taskStatus; this.stats.heroStatus = this.heroStatus; this.stats.run_seconds = common.getTimestampInSeconds() - this.stats.start_time; return this.stats; } //获取爬虫列表 getHeros(page, limit) { if (typeof(page) == 'undefined') { page = 1; } if (typeof(limit) == 'undefined') { limit = 20; } if (page < 1) { page = 1; } if (limit > 100) { limit = 100; } let start = (page - 1)*limit, end = start + limit; if (start >= this.heros.length) { return []; } if (end > this.heros.length) { end = this.heros.length; }else if (end > this.max_list_hero_num) { end = this.max_list_hero_num; } //根据心跳时间从新到旧排序 this.heros.sort(function(itemA, itemB) { if (itemA.timestamp > itemB.timestamp) { return -1; }else if (itemA.timestamp < itemB.timestamp){ return 1; } return 0; }); return this.heros.slice(start, end); } getHeroByName(bot_name) { return this.heros.find((item) => item.name == bot_name); } //初始化 async init() { await this.getConfig(); this.autoReloadConfigs(); this.heroHeartCheck(); this.autoCleanExpiredTasks(); this.autoNotifyTasks(); } } export default HeroUnion;