ScopeSentry/api/vulnerability.py

65 lines
2.2 KiB
Python

# -------------------------------------
# @file : vulnerability.py
# @author : Autumn
# @contact : rainy-autumn@outlook.com
# @time : 2024/4/27 13:25
# -------------------------------------------
from fastapi import APIRouter, Depends
from motor.motor_asyncio import AsyncIOMotorCursor
from pymongo import DESCENDING
from api.users import verify_token
from core.db import get_mongo_db
from core.util import search_to_mongodb, get_search_query
from loguru import logger
# Router on which this module registers its vulnerability endpoint(s).
router = APIRouter()
def _positive_int(value, default):
    """Return ``int(value)`` when it is convertible and >= 1, otherwise ``default``.

    Used to sanitise client-supplied paging values before they reach
    ``skip()``/``limit()``: a negative skip is rejected by the driver and a
    limit of 0 means "no limit" in MongoDB, which would return every match.
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, containers, inf/nan, ...
        return default
    return number if number >= 1 else default


@router.post("/vul/data")
async def get_vul_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
    """Return one page of vulnerability records matching the search request.

    Args:
        request_data: Request body. ``pageIndex`` (1-based, default 1) and
            ``pageSize`` (default 10) control paging; values that are missing,
            not convertible to an integer, or smaller than 1 fall back to
            those defaults. The whole dict is also handed to
            ``get_search_query`` to build the MongoDB filter.
        db: Database handle injected by ``get_mongo_db``; the
            ``vulnerability`` collection is queried.
        _: Result of the ``verify_token`` dependency. The value itself is
            unused; the parameter exists only so the dependency runs.

    Returns:
        On success ``{"code": 200, "data": {"list": [...], "total": n}}``,
        where ``total`` is the number of documents matching the filter (not
        just the returned page). If ``get_search_query`` yields an empty
        string, ``{"message": "Search condition parsing error", "code": 500}``.
        On any exception (including a stored document missing one of the
        expected fields), the error is logged and
        ``{"message": "error", "code": 500}`` is returned.
    """
    try:
        # Never trust client paging values: pageSize == 0 would turn into
        # limit(0), i.e. "no limit", and pageIndex < 1 into a negative skip.
        page_index = _positive_int(request_data.get("pageIndex", 1), 1)
        page_size = _positive_int(request_data.get("pageSize", 10), 10)

        query = await get_search_query("vul", request_data)
        if query == "":
            # An empty string is treated as "the search expression could not
            # be parsed".
            return {"message": "Search condition parsing error", "code": 500}

        # Total number of documents matching the search criteria.
        total_count = await db.vulnerability.count_documents(query)
        if total_count == 0:
            return {
                "code": 200,
                "data": {
                    'list': [],
                    'total': 0
                }
            }

        # Newest first. MongoDB applies the sort before skip/limit regardless
        # of the order in which the cursor methods are chained.
        cursor: AsyncIOMotorCursor = (
            db.vulnerability.find(query)
            .skip((page_index - 1) * page_size)
            .limit(page_size)
            .sort([("time", DESCENDING)])
        )
        # length=None is safe here: the cursor is already bounded by limit().
        result = await cursor.to_list(length=None)

        # Project each stored document onto the response shape; ``_id`` is
        # stringified and ``vulname`` is exposed as ``vulnerability``.
        response_data = [
            {
                "id": str(doc["_id"]),
                "url": doc["url"],
                "vulnerability": doc["vulname"],
                "matched": doc["matched"],
                "time": doc["time"],
                "request": doc["request"],
                "response": doc["response"],
                "level": doc['level']
            }
            for doc in result
        ]
        return {
            "code": 200,
            "data": {
                'list': response_data,
                'total': total_count
            }
        }
    except Exception as e:
        # Endpoint boundary: log the failure and report a generic error
        # instead of letting the exception propagate to the client.
        logger.error(str(e))
        return {"message": "error","code":500}