Union of hero bots.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
11 KiB

12 months ago
/**
* Hero管理调度
* --使用流程--
* 1. 本地启动machete_hero爬虫它会主动连接本联盟并加入到爬虫队列等待处理任务爬虫会定时上报自己的状态给联盟
* 2. 联盟收到新任务时存入待处理队列等待在线的爬虫来获取
* 3. 爬虫获取到新任务处理完成后将结果回传给联盟
* 4. 联盟收到爬虫处理结果触发回调通知并将数据结果发送给任务提交者
* 5. 任务提交者可自行根据任务编号来联盟查询任务结果
*
* --并发处理规则--
* 同一个任务可以被分配给多个爬虫
* 同一个任务可以接收不同爬虫回传的数据并完成回调
*
* --数据缓存规则--
* 任务结果数据最大不超过1M超过的当任务处理失败处理
* 任务数据保存最长 1
*/
import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises';
import path from 'node:path';
import cron from 'node-cron';
import axios from 'axios';
import common from './common.mjs';
12 months ago
class HeroUnion {
//构造函数,设置默认配置
constructor() {
this.config = null;
//this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录
this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB
this.notify_timeout = 8; //回调通知请求超时时长,单位:秒
this.stats = {
start_time: common.getTimestampInSeconds()
};
8 months ago
this.heros = []; //hero爬虫队列
this.tasks = []; //任务队列
//任务相关数据
this.taskStatus = {
'total': 0,
'waiting': 0,
'running': 0,
'done': 0,
'failed': 0
};
this.statusCode = {
'waiting': '待处理',
'running': '处理中',
'done': '完成',
'failed': '失败'
};
this.supportedPlatforms = {
'douyin': true,
'kuaishou': true,
'xigua': true,
'bilibili': true
};
//爬虫相关数据
this.heroHeartTimeout = 600; //爬虫心跳超时时长,单位:秒
this.heroStatus = {
'total': 0,
'idle': 0, //空闲
'busy': 0, //繁忙
'offline': 0 //离线
};
}
isDataTooLarge(data) {
return JSON.stringify(data).length > this.task_data_max_size * 1024;
}
async getConfig(forceReload) {
if ( !this.config || (typeof(forceReload) != 'undefined' && forceReload) ) {
this.config = await common.getConfigFromJsonFile('config.json');
}
return this.config;
}
12 months ago
//--任务相关功能--
//根据任务提交者ID和时间戳生成任务ID编号
generateTaskId(uuid) {
let timestamp = common.getTimestamp();
return `${uuid}_${timestamp}`;
}
isSupportedPlatform(platform) {
return typeof(this.supportedPlatforms[platform]) != 'undefined' && this.supportedPlatforms[platform];
}
12 months ago
//提交新任务
/**
* {
* id: '',
* status: '',
* uuid: '',
* country: '',
* lang: '',
* url: '',
* platform: '',
8 months ago
* data_mode: '', //json, html
* notify_url: '',
* results: [],
* created: 0, //timestamp in seconds
11 months ago
* updated: 0, //timestamp in seconds
* error: ''
* }
**/
createTask(uuid, url, platform, data_mode, notify_url, country, lang) {
let timestamp = common.getTimestampInSeconds();
let task = {
id: this.generateTaskId(uuid),
status: 'waiting',
//必选
uuid: uuid,
url: url,
platform: platform,
//可选
data_mode: 'default',
country: 'china',
lang: 'zh-CN',
notify_url: '',
results: [],
created: timestamp,
updated: timestamp
};
if (typeof(data_mode) != 'undefined' && data_mode) {
task.data_mode = data_mode;
}
if (typeof(notify_url) != 'undefined' && notify_url) {
task.notify_url = notify_url;
}
if (typeof(country) != 'undefined' && country) {
task.country = country;
}
if (typeof(lang) != 'undefined' && lang) {
task.lang = lang;
}
this.tasks.push(task);
this.taskStatus.total ++;
this.taskStatus.waiting ++;
return task;
12 months ago
}
//参数均可选,获取 1 个待处理的任务
getWaitingTask(platform, country, lang, data_mode) {
let searchResult = null;
let taskIndex = this.tasks.findIndex(function(item) {
if (typeof(platform) != 'undefined' && platform && item.platform != platform) {
return false;
}
11 months ago
if (typeof(country) != 'undefined' && country && item.country != country) {
return false;
}
if (typeof(lang) != 'undefined' && lang && item.lang != lang) {
return false;
}
if (typeof(data_mode) != 'undefined' && data_mode && task.data_mode != data_mode) {
return false;
}
return true;
});
if (taskIndex > -1) {
11 months ago
this.tasks[taskIndex].status = 'running';
searchResult = this.tasks[taskIndex];
11 months ago
}
12 months ago
return searchResult;
12 months ago
}
//保存任务处理结果
saveTaskById(uuid, id, data) {
let done = false;
let taskIndex = this.tasks.findIndex((item) => item.id == id);
if (taskIndex > -1) {
if (this.isDataTooLarge(data)) {
11 months ago
this.tasks[taskIndex].status = 'failed';
this.tasks[taskIndex].error = 'Result is too large to save.';
return false;
}
data.uuid = uuid;
let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid);
if (resIndex == -1) {
this.tasks[taskIndex].results.push(data);
}else {
this.tasks[taskIndex].results[resIndex] = data;
}
this.tasks[taskIndex].updated = common.getTimestampInSeconds();
11 months ago
this.tasks[taskIndex].status = 'done';
done = true;
}
12 months ago
return done;
12 months ago
}
//查询某个任务的状态及其数据
11 months ago
getTaskById(id) {
return this.tasks.find((item) => item.id == id);
12 months ago
}
//根据uuid获取用户的签名密钥
async getUserToken(uuid) {
let config = await this.getConfig();
return config && typeof(config.tokens[uuid]) != 'undefined' ? config.tokens[uuid] : '';
}
12 months ago
//任务完成触发回调通知
async handleTaskDone(id) {
let notified = false;
let task = this.getTaskById(id);
let notify_url = task.notify_url;
try {
if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) {
let params = {
"task_id": task.id,
"task_result": task.results,
"timestamp": common.getTimestamp(),
};
let token = await this.getUserToken(task.uuid);
params.sign = common.sign(params, token);
const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000});
if (response.status == 200) {
notified = true;
}else {
console.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s',
notify_url, response.status, response.statusText, response.daa);
}
}
}catch(err) {
console.error('[ERROR] Notify to %s failed: %s', notify_url, err);
}
12 months ago
return notified;
12 months ago
}
//--爬虫相关功能--
//接收爬虫状态上报
/**
* bot爬虫属性
* name
* description
* status: [idle, busy]
* timestamp
* country
* lang
*/
heroOnboard(bot) {
let cachedBot = this.heros.find((item) => item.name == bot.name);
if (cachedBot) { //如果是已经存在的爬虫
if (cachedBot.status != bot.status) {
if (bot.status == 'idle') {
this.heroStatus.idle ++;
this.heroStatus.busy --;
}else {
this.heroStatus.busy ++;
this.heroStatus.idle --;
}
}
cachedBot = bot; //数据更新
}else {
this.heros.push(bot); //添加新爬虫
this.heroStatus.total ++;
if (bot.status == 'idle') {
this.heroStatus.idle ++;
}else {
this.heroStatus.busy ++;
}
}
}
//定期检查爬虫是否在线
//如果上一次上报状态时间在10分钟前,则设置该爬虫已下线
heroHeartCheck() {
let _self = this;
const frequence = 60; //1 分钟检查一次
const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => {
let timestamp = common.getTimestampInSeconds();
_self.heros.forEach(function(item, index) {
if (item.status != 'offline' && timestamp - item.timestamp > _self.heroHeartTimeout) {
_self.heroStatus[item.status] --;
_self.heros[index].status = 'offline';
_self.heroStatus.offline ++;
console.log('Hero %s is offline, its last heart beat is %s', item.name, item.timestamp);
}
});
}, {
scheduled: false
});
12 months ago
cronjob.start();
console.log('Cronjob of hero heart check started.');
}
//自动重新加载配置文件
autoReloadConfigs() {
let _self = this;
const frequence = 300; //5 分钟重新加载一次
const cronjob = cron.schedule(`*/${frequence} * * * * *`, () => {
const forceReload = true;
_self.getConfig(forceReload);
}, {
scheduled: false
});
cronjob.start();
console.log('Cronjob of config auto reload started.');
12 months ago
}
//获取联盟状态
getStats() {
this.stats.taskStatus = this.taskStatus;
this.stats.heroStatus = this.heroStatus;
this.stats.run_seconds = common.getTimestampInSeconds() - this.stats.start_time;
12 months ago
return this.stats;
}
//初始化
init() {
this.getConfig();
this.autoReloadConfigs();
this.heroHeartCheck();
}
12 months ago
}
export default HeroUnion;