From 9d7315e36b735512f1b98cb40fe1b9f418eac1d6 Mon Sep 17 00:00:00 2001 From: "Autumn.home" Date: Sat, 15 Jun 2024 17:17:55 +0800 Subject: [PATCH] add --- api/page_monitoring.py | 13 ++++------- api/scheduled_tasks.py | 1 + api/task.py | 26 ++++++++++++--------- core/redis_handler.py | 53 +++++++++++++++++++++++++++++++++++++++++- main.py | 1 + 5 files changed, 73 insertions(+), 21 deletions(-) diff --git a/api/page_monitoring.py b/api/page_monitoring.py index f9e147a..eb3c729 100644 --- a/api/page_monitoring.py +++ b/api/page_monitoring.py @@ -52,20 +52,15 @@ async def page_monitoring_result(request_data: dict, db=Depends(get_mongo_db), _ cursor: AsyncIOMotorCursor = db.PageMonitoring.find(query, {"_id": 0, "id": {"$toString": "$_id"}, "url": 1, - "diff": {"$arrayElemAt": ["$diff", -1]}}).sort( + "diff": {"$arrayElemAt": ["$diff", -1]}, + "time": 1 + }).sort( [("time", DESCENDING)]).skip((page_index - 1) * page_size).limit(page_size) result = await cursor.to_list(length=None) - result_list = [] - for r in result: - result_list.append({ - "id": r["id"], - "url": r['url'], - "diff": r['diff'], - }) return { "code": 200, "data": { - 'list': result_list, + 'list': result, 'total': total_count } } diff --git a/api/scheduled_tasks.py b/api/scheduled_tasks.py index a043cd6..dc68464 100644 --- a/api/scheduled_tasks.py +++ b/api/scheduled_tasks.py @@ -173,6 +173,7 @@ async def create_page_monitoring_task(): targetList = await get_page_monitoring_data(db, False) if len(targetList) == 0: return + await redis.delete(f"TaskInfo:page_monitoring") await redis.lpush(f"TaskInfo:page_monitoring", *targetList) add_redis_task_data = { "type": 'page_monitoring', diff --git a/api/task.py b/api/task.py index 1dfe31f..d88bb82 100644 --- a/api/task.py +++ b/api/task.py @@ -14,7 +14,7 @@ from motor.motor_asyncio import AsyncIOMotorCursor from core.apscheduler_handler import scheduler from core.db import get_mongo_db -from core.redis_handler import get_redis_pool +from core.redis_handler import get_redis_pool, check_redis_task_target_is_null from core.util import * from api.node import get_redis_online_data from api.page_monitoring import get_page_monitoring_data @@ -22,7 +22,7 @@ router = APIRouter() @router.post("/task/data") -async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), background_tasks: BackgroundTasks = BackgroundTasks()): +async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), background_tasks: BackgroundTasks = BackgroundTasks(), redis_con=Depends(get_redis_pool)): try: background_tasks.add_task(task_progress) search_query = request_data.get("search", "") @@ -37,16 +37,21 @@ async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = # Perform pagination query cursor: AsyncIOMotorCursor = db.task.find(query).skip((page_index - 1) * page_size).limit(page_size).sort([("creatTime", DESCENDING)]) result = await cursor.to_list(length=None) - if len(result) == 0: - return { - "code": 200, - "data": { - 'list': [], - 'total': 0 - } - } # Process the result as needed response_data = [{"id": str(doc["_id"]), "name": doc["name"], "taskNum": doc["taskNum"], "progress": doc["progress"], "creatTime": doc["creatTime"], "endTime": doc["endTime"]} for doc in result] + # response_data = [] + # for doc in result: + # transformed_doc = { + # "id": str(doc["_id"]), + # "name": doc["name"], + # "taskNum": doc["taskNum"], + # "progress": doc["progress"], + # "creatTime": doc["creatTime"], + # "endTime": doc["endTime"] + # } + # # if doc["progress"] != 100: + # # background_tasks.add_task(check_redis_task_target_is_null, str(doc["_id"]), "", redis_con) + # response_data.append(transformed_doc) return { "code": 200, "data": { @@ -234,7 +239,6 @@ async def retest_task(request_data: dict, db=Depends(get_mongo_db), _: dict = De return {"message": "error", "code": 500} - async def create_scan_task(request_data, id, targetList, redis_con): try: request_data["id"] = str(id) diff --git a/core/redis_handler.py b/core/redis_handler.py index 8b3c33f..d1c5dce 100644 --- a/core/redis_handler.py +++ b/core/redis_handler.py @@ -6,6 +6,7 @@ import asyncio import json from loguru import logger import redis.asyncio as redis + from core.db import * from core.util import * import socket @@ -21,7 +22,8 @@ async def get_redis_pool(): socket.TCP_KEEPINTVL: 10, } } - redis_con = await redis.from_url(f"redis://:{REDIS_PASSWORD}@{REDIS_IP}:{REDIS_PORT}", encoding="utf-8", decode_responses=True, **keep_alive_config) + redis_con = await redis.from_url(f"redis://:{REDIS_PASSWORD}@{REDIS_IP}:{REDIS_PORT}", encoding="utf-8", + decode_responses=True, **keep_alive_config) try: yield redis_con finally: @@ -97,8 +99,57 @@ async def check_node_task(node_name, redis_conn): response_data = [] for doc in result: doc["id"] = str(doc["_id"]) + await check_redis_task_target_is_null(doc["id"], doc["target"], redis_conn) response_data.append(doc) for r in response_data: add_redis_task_data = transform_db_redis(r) await redis_conn.rpush(f"NodeTask:{node_name}", json.dumps(add_redis_task_data)) return + + +async def check_redis_task_target_is_null(id, target, redis_conn): + flag = await redis_conn.exists("TaskInfo:{}".format(id)) + if flag: + return + else: + from_check = False + r = {} + if target == "": + from_check = True + async for mongo_client in get_mongo_db(): + r = await mongo_client.task.find_one({"_id": ObjectId(id)}) + target = r.get("target", "") + task_target = [] + for t in target.split("\n"): + key = f"TaskInfo:progress:{id}:{t}" + res = await redis_conn.hgetall(key) + if "scan_end" in res: + continue + else: + task_target.append(t) + await redis_conn.lpush(f"TaskInfo:{id}", *task_target) + if from_check: + try: + if len(r) != 0: + if r['allNode']: + r["node"] = await get_redis_online_data(redis_conn) + add_redis_task_data = transform_db_redis(r) + for name in r["node"]: + await redis_conn.rpush(f"NodeTask:{name}", json.dumps(add_redis_task_data)) + except Exception as e: + logger.error(str(e)) + return + + +async def get_redis_online_data(redis_con): + async with redis_con as redis: + # 获取所有以 node: 开头的键 + keys = await redis.keys("node:*") + # 构建结果字典 + result = [] + for key in keys: + name = key.split(":")[1] + hash_data = await redis.hgetall(key) + if hash_data.get('state') == '1': + result.append(name) + return result diff --git a/main.py b/main.py index f55e5c2..1571ea2 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +import subprocess import time from loguru import logger