/** * Hero管理、调度 * --使用流程-- * 1. 本地启动machete_hero爬虫,它会主动连接本联盟,并加入到爬虫队列等待处理任务(爬虫会定时上报自己的状态给联盟); * 2. 联盟收到新任务时,存入待处理队列,等待在线的爬虫来获取; * 3. 爬虫获取到新任务处理完成后,将结果回传给联盟; * 4. 联盟收到爬虫处理结果触发回调通知并将数据结果发送给任务提交者; * 5. 任务提交者可自行根据任务编号来联盟查询任务结果; * * --并发处理规则-- * 同一个任务可以被分配给多个爬虫 * 同一个任务可以接收不同爬虫回传的数据,并完成回调 * * --数据缓存规则-- * 任务结果数据最大不超过1M,超过的当任务处理失败处理 * 任务数据保存最长 1 天 */ import fs from 'node:fs'; import { readdir, readFile } from 'node:fs/promises'; import path from 'node:path'; import cron from 'node-cron'; import axios from 'axios'; import common from './common.mjs'; class HeroUnion { //构造函数,设置默认配置 constructor() { //this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录 this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒 this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB this.notify_timeout = 8; //回调通知请求超时时长,单位:秒 this.stats = {}; this.heros = []; //hero爬虫队列 this.tasks = []; //任务队列 this.taskStatus = { 'total': 0, 'waiting': 0, 'running': 0, 'done': 0, 'failed': 0 }; this.statusCode = { 'waiting': '待处理', 'running': '处理中', 'done': '完成', 'failed': '失败' }; this.supportedPlatforms = { 'douyin': true, 'kuaishou': true, 'xigua': true, 'bilibili': true }; } //公用方法 getTimestamp() { return Math.floor(Date.now()); } getTimestampInSeconds() { return Math.floor(Date.now() / 1000); } isDataTooLarge(data) { return JSON.stringify(data).length > this.task_data_max_size * 1024; } //--任务相关功能-- //根据任务提交者ID和时间戳生成任务ID编号 generateTaskId(uuid) { let timestamp = this.getTimestamp(); return `${uuid}_${timestamp}`; } isSupportedPlatform(platform) { return typeof(this.supportedPlatforms[platform]) != 'undefined' && this.supportedPlatforms[platform]; } //提交新任务 /** * { * id: '', * status: '', * uuid: '', * country: '', * lang: '', * url: '', * platform: '', * data_mode: '', //json, html * notify_url: '', * results: [], * created: 0, //timestamp in seconds * updated: 0, //timestamp in seconds * error: '' * } **/ createTask(uuid, url, platform, data_mode, notify_url, country, lang) { let timestamp = this.getTimestampInSeconds(); let task = { id: this.generateTaskId(uuid), status: 'waiting', //必选 uuid: uuid, url: url, platform: platform, //可选 data_mode: 'default', country: 'china', lang: 'zh-CN', notify_url: '', results: [], created: timestamp, updated: timestamp }; if (typeof(data_mode) != 'undefined' && data_mode) { task.data_mode = data_mode; } if (typeof(notify_url) != 'undefined' && notify_url) { task.notify_url = notify_url; } if (typeof(country) != 'undefined' && country) { task.country = country; } if (typeof(lang) != 'undefined' && lang) { task.lang = lang; } this.tasks.push(task); this.taskStatus.total ++; this.taskStatus.waiting ++; return task; } //参数均可选,获取 1 个待处理的任务 getWaitingTask(platform, country, lang, data_mode) { let searchResult = null; let taskIndex = this.tasks.findIndex(function(item) { if (typeof(platform) != 'undefined' && platform && item.platform != platform) { return false; } if (typeof(country) != 'undefined' && country && item.country != country) { return false; } if (typeof(lang) != 'undefined' && lang && item.lang != lang) { return false; } if (typeof(data_mode) != 'undefined' && data_mode && task.data_mode != data_mode) { return false; } return true; }); if (taskIndex > -1) { this.tasks[taskIndex].status = 'running'; searchResult = this.tasks[taskIndex]; } return searchResult; } //保存任务处理结果 saveTaskById(uuid, id, data) { let done = false; let taskIndex = this.tasks.findIndex((item) => item.id == id); if (taskIndex > -1) { if (this.isDataTooLarge(data)) { this.tasks[taskIndex].status = 'failed'; this.tasks[taskIndex].error = 'Result is too large to save.'; return false; } data.uuid = uuid; let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid); if (resIndex == -1) { this.tasks[taskIndex].results.push(data); }else { this.tasks[taskIndex].results[resIndex] = data; } this.tasks[taskIndex].updated = this.getTimestampInSeconds(); this.tasks[taskIndex].status = 'done'; done = true; } return done; } //查询某个任务的状态及其数据 getTaskById(id) { return this.tasks.find((item) => item.id == id); } //TODO: 根据uuid获取用户的签名密钥 getUserToken(uuid) { return 'token'; } //任务完成触发回调通知 async handleTaskDone(id) { let notified = false; let task = this.getTaskById(id); let notify_url = task.notify_url; try { if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) { let params = { "task_id": task.id, "task_result": task.results, "timestamp": this.getTimestamp(), }; params.sign = common.sign(params, this.getUserToken(task.uuid)); const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000}); if (response.status == 200) { notified = true; }else { console.error('Notify to %s failed, response status: %s, status text: %s, result: %s', notify_url, response.status, response.statusText, response.daa); } } }catch(err) { console.error('Notify to %s failed: %s', notify_url, err); } return notified; } //--爬虫相关功能-- //接收爬虫状态上报 heroOnboard() { } //获取联盟状态 stats() { return this.stats; } } export default HeroUnion;