Union of hero bots.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

397 lines
13 KiB

/**
* Hero管理、调度
* --使用流程--
* 1. 本地启动machete_hero爬虫,它会主动连接本联盟,并加入到爬虫队列等待处理任务(爬虫会定时上报自己的状态给联盟);
* 2. 联盟收到新任务时,存入待处理队列,等待在线的爬虫来获取;
* 3. 爬虫获取到新任务处理完成后,将结果回传给联盟;
* 4. 联盟收到爬虫处理结果触发回调通知并将数据结果发送给任务提交者;
* 5. 任务提交者可自行根据任务编号来联盟查询任务结果;
*
* --并发处理规则--
* 同一个任务可以被分配给多个爬虫
* 同一个任务可以接收不同爬虫回传的数据,并完成回调
*
* --数据缓存规则--
* 任务结果数据最大不超过1M,超过的当任务处理失败处理
* 任务数据保存最长 1 天
*/
import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises';
import path from 'node:path';
import cron from 'node-cron';
import axios from 'axios';
import common from './common.mjs';
class HeroUnion {
//构造函数,设置默认配置
constructor() {
this.config = null;
//默认配置
//this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录
this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB
this.notify_timeout = 8; //回调通知请求超时时长,单位:秒
this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒
this.stats = {
start_time: common.getTimestampInSeconds()
};
this.heros = []; //hero爬虫队列
this.tasks = []; //任务队列
//任务相关数据
this.taskStatus = {
'total': 0,
'waiting': 0,
'running': 0,
'done': 0,
'failed': 0
};
this.statusCode = {
'waiting': '待处理',
'running': '处理中',
'done': '完成',
'failed': '失败'
};
this.supportedPlatforms = {
'douyin': true,
'kuaishou': true,
'xigua': true,
'bilibili': true
};
//爬虫相关数据
this.heroStatus = {
'total': 0,
'idle': 0, //空闲
'busy': 0, //繁忙
'offline': 0 //离线
};
}
isDataTooLarge(data) {
return JSON.stringify(data).length > this.task_data_max_size * 1024;
}
async getConfig(forceReload) {
if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) {
this.config = await common.getConfigFromJsonFile('config.json');
//覆盖默认配置
if (typeof(this.config.task_cache_time) != 'undefined' && this.config.task_cache_time) {
this.task_cache_time = this.config.task_cache_time; //任务数据最长缓存时间,单位:秒
}
if (typeof(this.config.task_data_max_size) != 'undefined' && this.config.task_data_max_size) {
this.task_data_max_size = this.config.task_data_max_size; //任务数据最大字节数,单位:KB
}
if (typeof(this.config.notify_timeout) != 'undefined' && this.config.notify_timeout) {
this.notify_timeout = this.config.notify_timeout; //回调通知请求超时时长,单位:秒
}
if (typeof(this.config.heroHeartTimeout) != 'undefined' && this.config.heroHeartTimeout) {
this.heroHeartTimeout = this.config.heroHeartTimeout; //爬虫心跳超时时长,单位:秒
}
}
return this.config;
}
//--任务相关功能--
//根据任务提交者ID和时间戳生成任务ID编号
generateTaskId(uuid) {
let timestamp = common.getTimestamp();
return `${uuid}_${timestamp}`;
}
isSupportedPlatform(platform) {
return typeof(this.supportedPlatforms[platform]) != 'undefined' && this.supportedPlatforms[platform];
}
//提交新任务
/**
* {
* id: '',
* status: '',
* uuid: '',
* country: '',
* lang: '',
* url: '',
* platform: '', //目标网址所属平台,具体参考爬虫所支持的平台
* contract: '', //需要抓取的数据合约,凡是支持此合约的爬虫将根据合约内容抓取数据
* data_mode: '', //json, html
* notify_url: '',
* results: [],
* created: 0, //timestamp in seconds
* updated: 0, //timestamp in seconds
* error: ''
* }
**/
createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) {
let timestamp = common.getTimestampInSeconds();
let task = {
id: this.generateTaskId(uuid),
status: 'waiting',
//必选
uuid: uuid,
url: url,
platform: platform,
contract: contract,
//可选
data_mode: 'json',
country: 'cn',
lang: 'zh',
notify_url: '',
results: [],
created: timestamp,
updated: timestamp
};
if (typeof(data_mode) != 'undefined' && data_mode) {
task.data_mode = data_mode;
}
if (typeof(notify_url) != 'undefined' && notify_url) {
task.notify_url = notify_url;
}
if (typeof(country) != 'undefined' && country) {
task.country = country;
}
if (typeof(lang) != 'undefined' && lang) {
task.lang = lang;
}
this.tasks.push(task);
this.taskStatus.total ++;
this.taskStatus.waiting ++;
return task;
}
//参数均可选,获取 1 个待处理的任务
getWaitingTask(platform, contract, country, lang, data_mode) {
let searchResult = null;
let taskIndex = this.tasks.findIndex(function(item) {
if (typeof(platform) != 'undefined' && platform && item.platform != platform) {
return false;
}
if (typeof(contract) != 'undefined' && contract && item.contract != contract) {
return false;
}
if (typeof(country) != 'undefined' && country && item.country != country) {
return false;
}
if (typeof(lang) != 'undefined' && lang && item.lang != lang) {
return false;
}
if (typeof(data_mode) != 'undefined' && data_mode && task.data_mode != data_mode) {
return false;
}
return true;
});
if (taskIndex > -1) {
this.tasks[taskIndex].status = 'running';
searchResult = this.tasks[taskIndex];
}
return searchResult;
}
//保存任务处理结果
saveTaskById(uuid, id, data) {
let done = false;
let taskIndex = this.tasks.findIndex((item) => item.id == id);
if (taskIndex > -1) {
if (this.isDataTooLarge(data)) {
this.tasks[taskIndex].status = 'failed';
this.tasks[taskIndex].error = 'Result is too large to save.';
return false;
}
data.uuid = uuid;
let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid);
if (resIndex == -1) {
this.tasks[taskIndex].results.push(data);
}else {
this.tasks[taskIndex].results[resIndex] = data;
}
this.tasks[taskIndex].updated = common.getTimestampInSeconds();
this.tasks[taskIndex].status = 'done';
done = true;
}
return done;
}
//查询某个任务的状态及其数据
getTaskById(id) {
return this.tasks.find((item) => item.id == id);
}
//根据uuid获取用户的签名密钥
async getUserToken(uuid) {
let config = await this.getConfig();
return config && typeof(config.tokens[uuid]) != 'undefined' ? config.tokens[uuid] : '';
}
//任务完成触发回调通知
async handleTaskDone(id) {
let notified = false;
let task = this.getTaskById(id);
let notify_url = task.notify_url;
try {
if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) {
let params = {
"task_id": task.id,
"task_result": task.results,
"timestamp": common.getTimestamp(),
};
let token = await this.getUserToken(task.uuid);
params.sign = common.sign(params, token);
const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000});
if (response.status == 200) {
notified = true;
}else {
common.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s',
notify_url, response.status, response.statusText, response.data);
}
}
}catch(err) {
common.error('[ERROR] Notify to %s failed: %s', notify_url, err);
}
return notified;
}
//--爬虫相关功能--
//接收爬虫状态上报
/**
* bot爬虫属性
* name
* description
* status: [idle, busy]
* platforms: [], //支持的平台,可由爬虫定义
* contracts: [], //支持的数据抓取合约,具体内容由爬虫定义
* timestamp
* country
* lang
* contact
*/
heroOnboard(bot) {
let cachedBotIndex = this.heros.findIndex((item) => item.name == bot.name),
cachedBot = cachedBotIndex > -1 ? this.heros[cachedBotIndex] : null;
if (cachedBot) { //如果是已经存在的爬虫
if (cachedBot.status != bot.status) {
common.log('Hero %s status change from %s to %s', cachedBot.name, cachedBot.status, bot.status);
this.heroStatus[cachedBot.status] --;
this.heroStatus[bot.status] ++;
}
this.heros[cachedBotIndex] = bot; //数据更新
common.log('Hero %s is %s at %s', bot.name, bot.status, bot.timestamp);
}else {
this.heros.push(bot); //添加新爬虫
this.heroStatus.total ++;
if (bot.status == 'idle') {
this.heroStatus.idle ++;
}else {
this.heroStatus.busy ++;
}
common.log('Hero %s is onboard at %s', bot.name, bot.timestamp);
}
}
//定期检查爬虫是否在线
//如果上一次上报状态时间在10分钟前,则设置该爬虫已下线
heroHeartCheck() {
let _self = this;
const frequence = typeof(this.config.heroHeartCheckFrequence) != 'undefined'
&& this.config.heroHeartCheckFrequence ? this.config.heroHeartCheckFrequence : 60; //1 分钟检查一次
const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => {
let timestamp = common.getTimestampInSeconds();
_self.heros.forEach(function(item, index) {
if (item.status != 'offline' && timestamp - item.timestamp > _self.heroHeartTimeout) {
_self.heroStatus[item.status] --;
_self.heros[index].status = 'offline';
_self.heroStatus.offline ++;
common.log('Hero %s is offline, last heart beat at %s', item.name, item.timestamp);
}
});
}, {
scheduled: false
});
cronjob.start();
common.log('Cronjob of hero heart check started.');
}
//自动重新加载配置文件
autoReloadConfigs() {
let _self = this;
const frequence = typeof(this.config.reloadConfigFrequence) != 'undefined'
&& this.config.reloadConfigFrequence ? this.config.reloadConfigFrequence : 300; //5 分钟重新加载一次
const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => {
const forceReload = true;
_self.getConfig(forceReload);
}, {
scheduled: false
});
cronjob.start();
common.log('Cronjob of config auto reload started.');
}
//获取联盟状态
getStats() {
this.stats.taskStatus = this.taskStatus;
this.stats.heroStatus = this.heroStatus;
this.stats.run_seconds = common.getTimestampInSeconds() - this.stats.start_time;
return this.stats;
}
//初始化
async init() {
await this.getConfig();
this.autoReloadConfigs();
this.heroHeartCheck();
}
}
export default HeroUnion;