Union of hero bots.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
6.5 KiB

12 months ago
/**
* Hero管理调度
* --使用流程--
* 1. 本地启动machete_hero爬虫它会主动连接本联盟并加入到爬虫队列等待处理任务爬虫会定时上报自己的状态给联盟
* 2. 联盟收到新任务时存入待处理队列等待在线的爬虫来获取
* 3. 爬虫获取到新任务处理完成后将结果回传给联盟
* 4. 联盟收到爬虫处理结果触发回调通知并将数据结果发送给任务提交者
* 5. 任务提交者可自行根据任务编号来联盟查询任务结果
*
* --并发处理规则--
* 同一个任务可以被分配给多个爬虫
* 同一个任务可以接收不同爬虫回传的数据并完成回调
*
* --数据缓存规则--
* 任务结果数据最大不超过1M超过的当任务处理失败处理
* 任务数据保存最长 1
*/
import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises';
import path from 'node:path';
import cron from 'node-cron';
class HeroUnion {
//构造函数,设置默认配置
constructor() {
//this.task_data_dir = path.resolve('./tmp/data/'); //任务数据保存目录
this.task_cache_time = 86400; //任务数据最长缓存时间,单位:秒
this.task_data_max_size = 1024; //任务数据最大字节数,单位:KB
12 months ago
this.stats = {};
this.tasks = [];
this.taskStatus = {
'total': 0,
'waiting': 0,
'running': 0,
'done': 0,
'failed': 0
};
this.statusCode = {
'waiting': '待处理',
'running': '处理中',
'done': '完成',
'failed': '失败'
};
this.supportedPlatforms = {
'douyin': true,
'kuaishou': true,
'xigua': true,
'bilibili': true
};
}
//公用方法
getTimestamp() {
return Math.floor(Date.now());
}
12 months ago
getTimestampInSeconds() {
return Math.floor(Date.now() / 1000);
12 months ago
}
isDataTooLarge(data) {
return JSON.stringify(data).length > this.task_data_max_size * 1024;
}
12 months ago
//--任务相关功能--
//根据任务提交者ID和时间戳生成任务ID编号
generateTaskId(uuid) {
let timestamp = this.getTimestamp();
return `${uuid}_${timestamp}`;
}
isSupportedPlatform(platform) {
return typeof(this.supportedPlatforms[platform]) != 'undefined' && this.supportedPlatforms[platform];
}
12 months ago
//提交新任务
/**
* {
* id: '',
* status: '',
* uuid: '',
* country: '',
* lang: '',
* url: '',
* platform: '',
* data_mode: '',
* notify_url: '',
* results: [],
* created: 0, //timestamp in seconds
* updated: 0 //timestamp in seconds
* }
**/
createTask(uuid, url, platform, data_mode, notify_url, country, lang) {
let timestamp = this.getTimestampInSeconds();
let task = {
id: this.generateTaskId(uuid),
status: 'waiting',
//必选
uuid: uuid,
url: url,
platform: platform,
//可选
data_mode: 'default',
country: 'china',
lang: 'zh-CN',
notify_url: '',
results: [],
created: timestamp,
updated: timestamp
};
if (typeof(data_mode) != 'undefined' && data_mode) {
task.data_mode = data_mode;
}
if (typeof(notify_url) != 'undefined' && notify_url) {
task.notify_url = notify_url;
}
if (typeof(country) != 'undefined' && country) {
task.country = country;
}
if (typeof(lang) != 'undefined' && lang) {
task.lang = lang;
}
this.tasks.push(task);
this.taskStatus.total ++;
this.taskStatus.waiting ++;
return task;
12 months ago
}
//参数均可选,获取 1 个待处理的任务
getWaitingTask(platform, country, lang, data_mode) {
let searchResult = null;
for (const task of this.tasks) {
if (task.status == 'waiting') {
if (typeof(platform) != 'undefined' && platform) {
if (platform == task.platform) {
searchResult = task;
}else {
continue;
}
}
if (typeof(country) != 'undefined' && country) {
if (country == task.country) {
searchResult = task;
}else {
continue;
}
}
if (typeof(lang) != 'undefined' && lang) {
if (lang == task.lang) {
searchResult = task;
}else {
continue;
}
}
if (typeof(data_mode) != 'undefined' && data_mode) {
if (data_mode == task.data_mode) {
searchResult = task;
}else {
continue;
}
}
}
}
12 months ago
return searchResult;
12 months ago
}
//保存任务处理结果
saveTaskById(uuid, id, data) {
let done = false;
let taskIndex = this.tasks.findIndex((item) => item.id == id);
if (taskIndex > -1) {
if (this.isDataTooLarge(data)) {
return false;
}
data.uuid = uuid;
let resIndex = this.tasks[taskIndex].results.findeIndex((dataItem) => dataItem.uuid == uuid);
if (resIndex == -1) {
this.tasks[taskIndex].results.push(data);
}else {
this.tasks[taskIndex].results[resIndex] = data;
}
this.tasks[taskIndex].updated = this.getTimestampInSeconds();
done = true;
}
12 months ago
return done;
12 months ago
}
//查询某个任务的状态及其数据
getTaskById() {
}
//任务完成触发回调通知
async handleTaskDone() {
//当任务完成回传回来的时候调用此方法触发回调通知
}
//--爬虫相关功能--
//接收爬虫状态上报
heroOnboard() {
}
//获取联盟状态
stats() {
return this.stats;
}
}
export default HeroUnion;