commit 9d7315e36b (parent 55ee7b7a1c)

    add

api/page_monitoring.py
@@ -52,20 +52,15 @@ async def page_monitoring_result(request_data: dict, db=Depends(get_mongo_db), _
     cursor: AsyncIOMotorCursor = db.PageMonitoring.find(query, {"_id": 0,
                                                                 "id": {"$toString": "$_id"},
                                                                 "url": 1,
-                                                                "diff": {"$arrayElemAt": ["$diff", -1]}}).sort(
+                                                                "diff": {"$arrayElemAt": ["$diff", -1]},
+                                                                "time": 1
+                                                                }).sort(
         [("time", DESCENDING)]).skip((page_index - 1) * page_size).limit(page_size)
     result = await cursor.to_list(length=None)
-    result_list = []
-    for r in result:
-        result_list.append({
-            "id": r["id"],
-            "url": r['url'],
-            "diff": r['diff'],
-        })
     return {
         "code": 200,
         "data": {
-            'list': result_list,
+            'list': result,
             'total': total_count
         }
     }
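The reworked projection shapes each document on the server: $toString converts the ObjectId and $arrayElemAt keeps only the newest diff entry, which is what makes the old Python re-mapping loop redundant. A minimal sketch of the same pattern, assuming MongoDB 4.4+ (which accepts aggregation expressions in find() projections); the connection string and "demo" database are illustrative:

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import DESCENDING

async def main():
    db = AsyncIOMotorClient("mongodb://localhost:27017").demo
    cursor = db.PageMonitoring.find({}, {
        "_id": 0,
        "id": {"$toString": "$_id"},              # ObjectId -> str on the server
        "url": 1,
        "diff": {"$arrayElemAt": ["$diff", -1]},  # keep only the latest diff entry
        "time": 1,
    }).sort([("time", DESCENDING)]).skip(0).limit(10)
    # Each document already has the response shape, so no per-row rebuild is needed.
    print(await cursor.to_list(length=None))

asyncio.run(main())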
@@ -173,6 +173,7 @@ async def create_page_monitoring_task():
     targetList = await get_page_monitoring_data(db, False)
     if len(targetList) == 0:
         return
+    await redis.delete(f"TaskInfo:page_monitoring")
     await redis.lpush(f"TaskInfo:page_monitoring", *targetList)
     add_redis_task_data = {
         "type": 'page_monitoring',
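Deleting the key before repopulating it makes the queue refresh idempotent: without the delete, every scheduler run would prepend the target list onto leftovers from the previous run. A sketch of the pattern against a local Redis (key name as in the diff, target values illustrative; note that lpush needs at least one value, which the early return above guarantees):

import asyncio
import redis.asyncio as aioredis

async def refresh(targets):
    r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    await r.delete("TaskInfo:page_monitoring")   # drop stale entries first
    if targets:                                  # lpush with zero values raises an error
        await r.lpush("TaskInfo:page_monitoring", *targets)
    print(await r.lrange("TaskInfo:page_monitoring", 0, -1))
    await r.aclose()                             # redis-py 5.x

asyncio.run(refresh(["https://a.example/", "https://b.example/"]))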
api/task.py (26 changed lines)
@@ -14,7 +14,7 @@ from motor.motor_asyncio import AsyncIOMotorCursor

 from core.apscheduler_handler import scheduler
 from core.db import get_mongo_db
-from core.redis_handler import get_redis_pool
+from core.redis_handler import get_redis_pool, check_redis_task_target_is_null
 from core.util import *
 from api.node import get_redis_online_data
 from api.page_monitoring import get_page_monitoring_data
@@ -22,7 +22,7 @@ router = APIRouter()


 @router.post("/task/data")
-async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), background_tasks: BackgroundTasks = BackgroundTasks()):
+async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), background_tasks: BackgroundTasks = BackgroundTasks(), redis_con=Depends(get_redis_pool)):
     try:
         background_tasks.add_task(task_progress)
         search_query = request_data.get("search", "")
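The new redis_con parameter is resolved by FastAPI's dependency injection: get_redis_pool (see core/redis_handler.py below) is a yield-based dependency, so each request receives a connection that is cleaned up after the response is sent. A self-contained sketch of that pattern with a stand-in dependency (names are illustrative):

from fastapi import Depends, FastAPI

app = FastAPI()

async def get_fake_pool():
    con = {"connected": True}     # stand-in for an async Redis connection
    try:
        yield con                 # FastAPI injects this value into the endpoint
    finally:
        con["connected"] = False  # cleanup runs after the response is sent

@app.post("/task/data")
async def get_task_data(request_data: dict, redis_con=Depends(get_fake_pool)):
    return {"code": 200, "connected": redis_con["connected"]}

# run with: uvicorn sketch:app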
@@ -37,16 +37,21 @@ async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict =
         # Perform pagination query
         cursor: AsyncIOMotorCursor = db.task.find(query).skip((page_index - 1) * page_size).limit(page_size).sort([("creatTime", DESCENDING)])
         result = await cursor.to_list(length=None)
-        if len(result) == 0:
-            return {
-                "code": 200,
-                "data": {
-                    'list': [],
-                    'total': 0
-                }
-            }
         # Process the result as needed
         response_data = [{"id": str(doc["_id"]), "name": doc["name"], "taskNum": doc["taskNum"], "progress": doc["progress"], "creatTime": doc["creatTime"], "endTime": doc["endTime"]} for doc in result]
+        # response_data = []
+        # for doc in result:
+        #     transformed_doc = {
+        #         "id": str(doc["_id"]),
+        #         "name": doc["name"],
+        #         "taskNum": doc["taskNum"],
+        #         "progress": doc["progress"],
+        #         "creatTime": doc["creatTime"],
+        #         "endTime": doc["endTime"]
+        #     }
+        #     # if doc["progress"] != 100:
+        #     #     background_tasks.add_task(check_redis_task_target_is_null, str(doc["_id"]), "", redis_con)
+        #     response_data.append(transformed_doc)
         return {
             "code": 200,
             "data": {
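The handler derives its window from page_index and page_size and converts each ObjectId to a string because ObjectId is not JSON-serializable. A pure-Python sketch of that arithmetic and shaping (illustrative in-memory docs, no database involved):

from bson import ObjectId

docs = [{"_id": ObjectId(), "name": f"task-{i}", "progress": i} for i in range(25)]

def page(docs, page_index, page_size):
    start = (page_index - 1) * page_size          # same formula as the .skip() call
    rows = docs[start:start + page_size]          # .limit() equivalent
    return [{"id": str(d["_id"]), "name": d["name"], "progress": d["progress"]}
            for d in rows]                        # str() makes the id JSON-safe

print(len(page(docs, 2, 10)))  # 10 -> full second page
print(len(page(docs, 3, 10)))  # 5  -> last, partial page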
@@ -234,7 +239,6 @@ async def retest_task(request_data: dict, db=Depends(get_mongo_db), _: dict = De
         return {"message": "error", "code": 500}


-
 async def create_scan_task(request_data, id, targetList, redis_con):
     try:
         request_data["id"] = str(id)

core/redis_handler.py
@@ -6,6 +6,7 @@ import asyncio
 import json
+from loguru import logger
 import redis.asyncio as redis

 from core.db import *
 from core.util import *
 import socket
@@ -21,7 +22,8 @@ async def get_redis_pool():
             socket.TCP_KEEPINTVL: 10,
         }
     }
-    redis_con = await redis.from_url(f"redis://:{REDIS_PASSWORD}@{REDIS_IP}:{REDIS_PORT}", encoding="utf-8", decode_responses=True, **keep_alive_config)
+    redis_con = await redis.from_url(f"redis://:{REDIS_PASSWORD}@{REDIS_IP}:{REDIS_PORT}", encoding="utf-8",
+                                     decode_responses=True, **keep_alive_config)
     try:
         yield redis_con
     finally:
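The reflowed from_url call forwards keep_alive_config, which bundles TCP keepalive socket options so idle connections to Redis are probed instead of silently dying. A sketch of what such a config typically looks like (option values are illustrative; socket_keepalive and socket_keepalive_options are standard redis-py arguments, and the TCP_KEEP* constants are Linux-specific):

import socket
import redis.asyncio as aioredis

keep_alive_config = {
    "socket_keepalive": True,
    "socket_keepalive_options": {
        socket.TCP_KEEPIDLE: 60,    # idle seconds before the first probe (Linux)
        socket.TCP_KEEPCNT: 3,      # failed probes before dropping the connection
        socket.TCP_KEEPINTVL: 10,   # seconds between probes (matches the diff)
    },
}

r = aioredis.from_url("redis://:password@127.0.0.1:6379", encoding="utf-8",
                      decode_responses=True, **keep_alive_config)

Note that in redis-py's asyncio API, from_url returns a client synchronously; the actual connection is established lazily on the first command.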
@@ -97,8 +99,57 @@ async def check_node_task(node_name, redis_conn):
     response_data = []
     for doc in result:
         doc["id"] = str(doc["_id"])
+        await check_redis_task_target_is_null(doc["id"], doc["target"], redis_conn)
         response_data.append(doc)
     for r in response_data:
         add_redis_task_data = transform_db_redis(r)
         await redis_conn.rpush(f"NodeTask:{node_name}", json.dumps(add_redis_task_data))
     return
+
+
+async def check_redis_task_target_is_null(id, target, redis_conn):
+    flag = await redis_conn.exists("TaskInfo:{}".format(id))
+    if flag:
+        return
+    else:
+        from_check = False
+        r = {}
+        if target == "":
+            from_check = True
+            async for mongo_client in get_mongo_db():
+                r = await mongo_client.task.find_one({"_id": ObjectId(id)})
+                target = r.get("target", "")
+        task_target = []
+        for t in target.split("\n"):
+            key = f"TaskInfo:progress:{id}:{t}"
+            res = await redis_conn.hgetall(key)
+            if "scan_end" in res:
+                continue
+            else:
+                task_target.append(t)
+        await redis_conn.lpush(f"TaskInfo:{id}", *task_target)
+        if from_check:
+            try:
+                if len(r) != 0:
+                    if r['allNode']:
+                        r["node"] = await get_redis_online_data(redis_conn)
+                    add_redis_task_data = transform_db_redis(r)
+                    for name in r["node"]:
+                        await redis_conn.rpush(f"NodeTask:{name}", json.dumps(add_redis_task_data))
+            except Exception as e:
+                logger.error(str(e))
+        return
+
+
+async def get_redis_online_data(redis_con):
+    async with redis_con as redis:
+        # Get all keys that start with "node:"
+        keys = await redis.keys("node:*")
+        # Collect the names of nodes whose state hash marks them online
+        result = []
+        for key in keys:
+            name = key.split(":")[1]
+            hash_data = await redis.hgetall(key)
+            if hash_data.get('state') == '1':
+                result.append(name)
+        return result
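check_redis_task_target_is_null repopulates TaskInfo:{id} with only the targets whose per-target progress hash lacks a scan_end field, so interrupted tasks resume where they stopped. A compact sketch of that filtering step against a local Redis (task id and hostnames are illustrative):

import asyncio
import redis.asyncio as aioredis

async def unfinished_targets(r, task_id, target_block):
    remaining = []
    for t in target_block.split("\n"):
        progress = await r.hgetall(f"TaskInfo:progress:{task_id}:{t}")
        if "scan_end" not in progress:       # no scan_end field -> still pending
            remaining.append(t)
    return remaining

async def main():
    r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    await r.hset("TaskInfo:progress:42:a.example", "scan_end", "1")   # mark one done
    print(await unfinished_targets(r, "42", "a.example\nb.example"))  # ['b.example']
    await r.aclose()

asyncio.run(main())

One design note: get_redis_online_data uses KEYS node:*, which blocks the Redis server while it scans the whole keyspace; SCAN (scan_iter in redis-py) is the usual non-blocking alternative at scale.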