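/**
 * Hero management and scheduling
 * --Workflow--
 * 1. Start the machete_hero crawler locally; it connects to this union on its own and joins the crawler queue to wait for tasks (the crawler periodically reports its status to the union);
 * 2. When the union receives a new task, it stores the task in the pending queue, where it waits for an online crawler to fetch it;
 * 3. After a crawler fetches a task and finishes processing it, the crawler sends the result back to the union;
 * 4. When the union receives the crawler's result, it triggers the callback notification and sends the result data to the task submitter;
 * 5. The task submitter can also query the union for the task result by task ID;
 * --Concurrency rules--
 * The same task may be assigned to multiple crawlers
 * The same task may accept data returned by different crawlers and complete the callback
 * --Data caching rules--
 * Task result data must not exceed 1 MB; a larger result is treated as a failed task
 * Task data is kept for at most 1 day
 */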
import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises';
import path from 'node:path';
import cron from 'node-cron';
import axios from 'axios';
import common from './common.mjs';
/**
 * In-memory registry of hero crawlers and the tasks they process.
 *
 * The class creates task records, hands a matching task to a caller,
 * stores results posted back for a task, and notifies the task's
 * `notify_url` with a signed payload.
 */
class HeroUnion {

    /**
     * Initializes limits, empty queues, counters and lookup tables.
     */
    constructor() {
        this.config = null;                 // loaded lazily by getConfig()

        //this.task_data_dir = path.resolve('./tmp/data/');   // directory for task data
        this.task_cache_time = 86400;       // longest time task data is cached, in seconds
        this.task_data_max_size = 1024;     // maximum size of a task result, in KB
        this.notify_timeout = 8;            // timeout of the notify callback request, in seconds

        // NOTE: this own property shadows the stats() method on every instance;
        // read it directly or through getStats().
        this.stats = {};

        this.heros = [];                    // hero crawler queue
        this.tasks = [];                    // task queue

        // Task counters. Only createTask() updates them (total and waiting).
        this.taskStatus = {
            'total': 0,
            'waiting': 0,
            'running': 0,
            'done': 0,
            'failed': 0
        };

        // Human-readable label for each task status code.
        this.statusCode = {
            'waiting': '待处理',
            'running': '处理中',
            'done': '完成',
            'failed': '失败'
        };

        this.supportedPlatforms = {
            'douyin': true,
            'kuaishou': true,
            'xigua': true,
            'bilibili': true
        };
    }

    /**
     * @returns {number} Current Unix time in milliseconds.
     */
    getTimestamp() {
        return Math.floor(Date.now());
    }

    /**
     * @returns {number} Current Unix time in whole seconds.
     */
    getTimestampInSeconds() {
        return Math.floor(Date.now() / 1000);
    }

    /**
     * Checks whether the JSON form of `data` exceeds `task_data_max_size` KB.
     *
     * The size is measured in UTF-8 bytes, not in string length, so that
     * multi-byte text (e.g. CJK) is counted at its real size.
     *
     * @param {*} data - Value to measure.
     * @returns {boolean} True when the serialized value is larger than the limit.
     * @throws {TypeError} When `data` cannot be serialized by JSON.stringify
     *   (circular structure, BigInt).
     */
    isDataTooLarge(data) {
        const serialized = JSON.stringify(data);
        if (typeof serialized !== 'string') {
            // JSON.stringify yields undefined for undefined, functions and symbols.
            return false;
        }

        return Buffer.byteLength(serialized, 'utf8') > this.task_data_max_size * 1024;
    }

    /**
     * Returns the configuration, loading it from `config.json` on first use.
     *
     * @param {boolean} [forceReload] - Reload even when a config is already cached.
     * @returns {Promise<*>} Whatever common.getConfigFromJsonFile() resolves to.
     */
    async getConfig(forceReload) {
        if (!this.config || forceReload) {
            this.config = await common.getConfigFromJsonFile('config.json');
        }

        return this.config;
    }

    /**
     * Builds a task id of the form `<uuid>_<millisecond timestamp>`.
     *
     * NOTE(review): two tasks created for the same uuid within the same
     * millisecond get the same id.
     *
     * @param {string} uuid - Identifier the task is created for.
     * @returns {string} The task id.
     */
    generateTaskId(uuid) {
        const timestamp = this.getTimestamp();
        return `${uuid}_${timestamp}`;
    }

    /**
     * Tells whether `platform` is enabled in `supportedPlatforms`.
     *
     * Only own keys are considered, so inherited names such as
     * `constructor` are not reported as supported.
     *
     * @param {string} platform - Platform name.
     * @returns {boolean} True when the platform is listed and enabled.
     */
    isSupportedPlatform(platform) {
        return Object.prototype.hasOwnProperty.call(this.supportedPlatforms, platform)
            && Boolean(this.supportedPlatforms[platform]);
    }

    /**
     * Creates a task record and bumps the `total` and `waiting` counters.
     *
     * Shape of the returned task:
     * {
     *     id: '',
     *     status: '',
     *     uuid: '',
     *     url: '',
     *     platform: '',
     *     data_mode: '',      // e.g. json, html
     *     country: '',
     *     lang: '',
     *     notify_url: '',
     *     results: [],
     *     created: 0,         // timestamp in seconds
     *     updated: 0          // timestamp in seconds
     * }
     * An `error` message is added later if saving a result fails.
     *
     * NOTE(review): the task is returned but not appended to `this.tasks`
     * here - presumably the caller does that; confirm.
     *
     * @param {string} uuid - Identifier the task is created for.
     * @param {string} url - Target url of the task.
     * @param {string} platform - Platform name.
     * @param {string} [data_mode] - Defaults to 'default' when empty.
     * @param {string} [notify_url] - Defaults to '' when empty.
     * @param {string} [country] - Defaults to 'china' when empty.
     * @param {string} [lang] - Defaults to 'zh-CN' when empty.
     * @returns {object} The new task record.
     */
    createTask(uuid, url, platform, data_mode, notify_url, country, lang) {
        const timestamp = this.getTimestampInSeconds();

        // `||` is intentional: empty strings fall back to the defaults too.
        const task = {
            id: this.generateTaskId(uuid),
            status: 'waiting',
            uuid: uuid,
            url: url,
            platform: platform,
            data_mode: data_mode || 'default',
            country: country || 'china',
            lang: lang || 'zh-CN',
            notify_url: notify_url || '',
            results: [],
            created: timestamp,
            updated: timestamp
        };

        this.taskStatus.total ++;
        this.taskStatus.waiting ++;

        return task;
    }

    /**
     * Finds the first queued task matching the given filters and marks it 'running'.
     *
     * All parameters are optional; an empty filter matches every task.
     *
     * NOTE(review): the current task status is not part of the filter, so a
     * task that is already running, done or failed can be returned again and
     * is set back to 'running'. Confirm whether that is intended.
     *
     * @param {string} [platform]
     * @param {string} [country]
     * @param {string} [lang]
     * @param {string} [data_mode]
     * @returns {object|null} The matched task, or null when nothing matches.
     */
    getWaitingTask(platform, country, lang, data_mode) {
        const task = this.tasks.find((item) => {
            if (platform && item.platform !== platform) {
                return false;
            }

            if (country && item.country !== country) {
                return false;
            }

            if (lang && item.lang !== lang) {
                return false;
            }

            if (data_mode && item.data_mode !== data_mode) {
                return false;
            }

            return true;
        });

        if (!task) {
            return null;
        }

        task.status = 'running';
        return task;
    }

    /**
     * Stores a result for the task `id` and marks the task 'done'.
     *
     * Results are keyed by `uuid`: a later result with the same uuid replaces
     * the earlier one, a result with a new uuid is appended. A result larger
     * than the size limit marks the task 'failed' instead.
     *
     * @param {string} uuid - Identifier of the result's producer; it is written
     *   onto `data` as `data.uuid` (the passed object is modified).
     * @param {string} id - Task id.
     * @param {object} data - Result payload.
     * @returns {boolean} True when the result was stored; false when the task is
     *   unknown, `data` is not an object, or the result is too large.
     */
    saveTaskById(uuid, id, data) {
        const task = this.getTaskById(id);
        if (!task) {
            return false;
        }

        if (data === null || typeof data !== 'object') {
            // A uuid cannot be attached to a non-object payload.
            return false;
        }

        if (this.isDataTooLarge(data)) {
            task.status = 'failed';
            task.error = 'Result is too large to save.';
            return false;
        }

        data.uuid = uuid;

        const resIndex = task.results.findIndex((dataItem) => dataItem.uuid === uuid);
        if (resIndex === -1) {
            task.results.push(data);
        } else {
            task.results[resIndex] = data;
        }

        task.updated = this.getTimestampInSeconds();
        task.status = 'done';

        return true;
    }

    /**
     * @param {string} id - Task id.
     * @returns {object|undefined} The task with that id, if it is in the queue.
     */
    getTaskById(id) {
        return this.tasks.find((item) => item.id === id);
    }

    /**
     * Looks up the token configured for `uuid` under `config.tokens`.
     *
     * @param {string} uuid - Identifier to look up.
     * @returns {Promise<string>} The token, or '' when none is configured.
     */
    async getUserToken(uuid) {
        const config = await this.getConfig();
        const tokens = config ? config.tokens : null;

        if (!tokens || !Object.prototype.hasOwnProperty.call(tokens, uuid)) {
            return '';
        }

        return typeof tokens[uuid] !== 'undefined' ? tokens[uuid] : '';
    }

    /**
     * Posts the results of task `id` to the task's `notify_url`.
     *
     * The payload holds the task id, the results and a millisecond timestamp,
     * plus a `sign` computed by common.sign() with the token of the task's uuid.
     * Nothing is sent when the task has no http(s) notify url.
     * Request errors are logged, not thrown.
     *
     * @param {string} id - Task id.
     * @returns {Promise<boolean>} True only when the callback answered with status 200.
     */
    async handleTaskDone(id) {
        const task = this.getTaskById(id);
        if (!task) {
            console.error('[ERROR] Notify skipped, task %s not found.', id);
            return false;
        }

        const notify_url = task.notify_url;
        let notified = false;

        try {
            if (notify_url && /^http(s)?:\/\/[\w\.]+/i.test(notify_url)) {
                const params = {
                    "task_id": task.id,
                    "task_result": task.results,
                    "timestamp": this.getTimestamp(),
                };
                const token = await this.getUserToken(task.uuid);
                params.sign = common.sign(params, token);

                const response = await axios.post(notify_url, params, {timeout: this.notify_timeout*1000});
                if (response.status === 200) {
                    notified = true;
                } else {
                    console.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s',
                        notify_url, response.status, response.statusText, response.data);
                }
            }
        } catch (err) {
            console.error('[ERROR] Notify to %s failed: %s', notify_url, err);
        }

        return notified;
    }

    /**
     * Placeholder for registering a hero crawler.
     * TODO: not implemented yet.
     */
    heroOnboard() {

    }

    /**
     * Returns the stats object.
     *
     * NOTE: the constructor assigns an own property named `stats`, which
     * shadows this method on every instance, so `instance.stats` is the stats
     * object itself and this method cannot be called through an instance.
     * It is kept for compatibility; use getStats() instead.
     *
     * @returns {object} The stats object.
     */
    stats() {
        return this.stats;
    }

    /**
     * @returns {object} The stats object held in `this.stats`.
     */
    getStats() {
        return this.stats;
    }

}
export default HeroUnion;