# ScopeSentry/api/task.py
# -*- coding:utf-8 -*-  
# @name: task
# @auth: rainy-autumn@outlook.com
# @version:
import asyncio
import json
import traceback
from loguru import logger
from bson import ObjectId
from fastapi import APIRouter, Depends, BackgroundTasks
from pymongo import DESCENDING
from api.users import verify_token
from motor.motor_asyncio import AsyncIOMotorCursor
from core.apscheduler_handler import scheduler
from core.db import get_mongo_db
from core.redis_handler import get_redis_pool, check_redis_task_target_is_null
from core.util import *
from api.node import get_redis_online_data, get_node_all
from api.page_monitoring import get_page_monitoring_data

router = APIRouter()

@router.post("/task/data")
async def get_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), background_tasks: BackgroundTasks = BackgroundTasks(), redis_con=Depends(get_redis_pool)):
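    """
    Paginated task list endpoint: fuzzy-matches task names, kicks off a
    background progress refresh, and returns one page of task summaries
    sorted by creation time (newest first).
    """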
    try:
        background_tasks.add_task(task_progress)
        search_query = request_data.get("search", "")
        page_index = request_data.get("pageIndex", 1)
        page_size = request_data.get("pageSize", 10)
        # Fuzzy search based on the name field
        query = {"name": {"$regex": search_query, "$options": "i"}}
        # Get the total count of documents matching the search criteria
        total_count = await db.task.count_documents(query)
        # Perform the pagination query, newest tasks first
        cursor: AsyncIOMotorCursor = db.task.find(query).skip((page_index - 1) * page_size).limit(page_size).sort([("creatTime", DESCENDING)])
        result = await cursor.to_list(length=None)
        # Shape each document for the frontend task list
        response_data = [{"id": str(doc["_id"]), "name": doc["name"], "taskNum": doc["taskNum"], "progress": doc["progress"], "creatTime": doc["creatTime"], "endTime": doc["endTime"]} for doc in result]
        return {
            "code": 200,
            "data": {
                'list': response_data,
                'total': total_count
            }
        }
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/task/add")
async def add_task(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
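    """
    Create a new scan task: validates name/target/node, deduplicates
    targets, optionally registers an interval job for scheduled re-scans,
    and pushes the task to the node queues via Redis.
    """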
    try:
        name = request_data.get("name")
        cursor = db.task.find({"name": {"$eq": name}}, {"_id": 1})
        results = await cursor.to_list(length=None)
        if len(results) != 0:
            return {"code": 400, "message": "name already exists"}
        target = request_data.get("target", "")
        node = request_data.get("node")
        if name == "" or target == "" or node == []:
            return {"message": "Null", "code": 500}
        scheduledTasks = request_data.get("scheduledTasks", False)
        hour = request_data.get("hour", 1)
        targetList = []
        targetTmp = ""
        for t in target.split("\n"):
            # Strip the scheme and surrounding whitespace, then deduplicate
            t = t.replace("http://", "").replace("https://", "")
            t = t.strip("\n").strip("\r").strip()
            if t != "" and t not in targetList:
                targetList.append(t)
                targetTmp += t + "\n"
        taskNum = len(targetList)
        request_data['taskNum'] = taskNum
        request_data['target'] = targetTmp.strip("\n")
        request_data['progress'] = 0
        request_data["creatTime"] = get_now_time()
        request_data["endTime"] = ""
        if "All Poc" in request_data['vulList']:
            request_data['vulList'] = ["All Poc"]
        result = await db.task.insert_one(request_data)
        # Check if the insertion was successful
        if result.inserted_id:
            if scheduledTasks:
                scheduler.add_job(scheduler_scan_task, 'interval', hours=hour, args=[str(result.inserted_id)],
                                  id=str(result.inserted_id), jobstore='mongo')
                next_time = scheduler.get_job(str(result.inserted_id)).next_run_time
                formatted_time = next_time.strftime("%Y-%m-%d %H:%M:%S")
                await db.ScheduledTasks.insert_one(
                    {"id": str(result.inserted_id), "name": name, 'hour': hour, 'type': 'Scan', 'state': True, 'lastTime': get_now_time(), 'nextTime': formatted_time, 'runner_id': str(result.inserted_id)})
            f = await create_scan_task(request_data, result.inserted_id, targetList, redis_con)
            if f:
                return {"code": 200, "message": "Task added successfully"}
            else:
                return {"code": 400, "message": "Failed to add Task"}
        else:
            return {"code": 400, "message": "Failed to add Task"}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/task/content")
async def task_content(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
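    """
    Return the stored configuration of a single task so the frontend
    can populate the task edit form.
    """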
    try:
        # Get the ID from the request data
        task_id = request_data.get("id")
        # Check if ID is provided
        if not task_id:
            return {"message": "ID is missing in the request data", "code": 400}
        # Query the database for content based on ID
        query = {"_id": ObjectId(task_id)}
        doc = await db.task.find_one(query)
        if not doc:
            return {"message": "Content not found for the provided ID", "code": 404}
        result = {
            "name": doc.get("name", ""),
            "target": doc.get("target", ""),
            "node": doc.get("node", []),
            "subdomainScan": doc.get("subdomainScan", False),
            "subdomainConfig": doc.get("subdomainConfig", []),
            "urlScan": doc.get("urlScan", False),
            "sensitiveInfoScan": doc.get("sensitiveInfoScan", False),
            "pageMonitoring": doc.get("pageMonitoring", ""),
            "crawlerScan": doc.get("crawlerScan", False),
            "vulScan": doc.get("vulScan", False),
            "vulList": doc.get("vulList", []),
            "portScan": doc.get("portScan"),
            "ports": doc.get("ports"),
            "waybackurl": doc.get("waybackurl"),
            "dirScan": doc.get("dirScan"),
            "scheduledTasks": doc.get("scheduledTasks"),
            "hour": doc.get("hour"),
            "duplicates": doc.get("duplicates")
        }
        return {"code": 200, "data": result}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/task/delete")
async def delete_task(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool), background_tasks: BackgroundTasks = BackgroundTasks()):
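    """
    Delete one or more tasks: removes any scheduled jobs and Redis state,
    optionally purges the assets collected by those tasks, then deletes
    the task documents themselves.
    """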
    try:
        # Extract the list of IDs from the request_data dictionary
        task_ids = request_data.get("ids", [])
        delA = request_data.get("delA", False)
        # Convert the provided task IDs to ObjectId and collect Redis keys
        obj_ids = []
        redis_key = []
        for task_id in task_ids:
            obj_ids.append(ObjectId(task_id))
            redis_key.append("TaskInfo:" + task_id)
            # Drop the scheduled job for this task, if one exists
            job = scheduler.get_job(task_id)
            if job:
                scheduler.remove_job(task_id)
        await db.ScheduledTasks.delete_many({"id": {"$in": task_ids}})
        if delA:
            background_tasks.add_task(delete_asset, task_ids)
        if redis_key:
            await redis_con.delete(*redis_key)
        # Delete the task documents based on the provided IDs
        result = await db.task.delete_many({"_id": {"$in": obj_ids}})
        # Check if the deletion was successful
        if result.deleted_count > 0:
            return {"code": 200, "message": "Task deleted successfully"}
        else:
            return {"code": 404, "message": "Task not found"}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/task/retest")
async def retest_task(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
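    """
    Re-run an existing task: clears the task's Redis progress and
    deduplication state, re-enqueues its targets, and resets the
    progress/creation/end-time fields on the task document.
    """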
    try:
        # Get the ID from the request data
        task_id = request_data.get("id")
        # Check if ID is provided
        if not task_id:
            return {"message": "ID is missing in the request data", "code": 400}
        # Query the database for content based on ID
        query = {"_id": ObjectId(task_id)}
        doc = await db.task.find_one(query)
        if not doc:
            return {"message": "Content not found for the provided ID", "code": 404}
        target = doc['target']
        targetList = target.split("\n")
        keys_to_delete = [
            f"TaskInfo:tmp:{task_id}",
            f"TaskInfo:{task_id}",
            f"TaskInfo:time:{task_id}",
            f"duplicates:url:{task_id}",
            f"duplicates:domain:{task_id}",
            f"duplicates:sensresp:{task_id}",
            f"duplicates:craw:{task_id}"
        ]
        progresskeys = await redis_con.keys(f"TaskInfo:progress:{task_id}:*")
        keys_to_delete.extend(progresskeys)
        if keys_to_delete:
            await redis_con.delete(*keys_to_delete)
        f = await create_scan_task(doc, task_id, targetList, redis_con)
        if f:
            update_document = {
                "$set": {
                    "progress": 0,
                    "creatTime": get_now_time(),
                    "endTime": ""
                }
            }
            await db.task.update_one({"_id": ObjectId(task_id)}, update_document)
            return {"code": 200, "message": "Task added successfully"}
        else:
            return {"code": 400, "message": "Failed to add Task"}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


async def create_scan_task(request_data, id, targetList, redis_con):
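    """
    Shared helper for add/retest/scheduled runs: clears stale Redis state
    for the task id, pushes the target list onto TaskInfo:<id>, and
    enqueues the task payload on each selected node's NodeTask queue.
    """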
    try:
        request_data["id"] = str(id)
        if request_data['allNode']:
            request_data["node"] = await get_node_all(redis_con)
        keys_to_delete = [
            f"TaskInfo:tmp:{id}",
            f"TaskInfo:{id}",
            f"TaskInfo:time:{id}",
            f"duplicates:url:{id}",
            f"duplicates:domain:{id}",
            f"duplicates:sensresp:{id}",
            f"duplicates:craw:{id}"
        ]
        progresskeys = await redis_con.keys(f"TaskInfo:progress:{id}:*")
        keys_to_delete.extend(progresskeys)
        if keys_to_delete:
            await redis_con.delete(*keys_to_delete)
        add_redis_task_data = transform_db_redis(request_data)
        async with redis_con as redis:
            await redis.lpush(f"TaskInfo:{id}", *targetList)
            for name in request_data["node"]:
                await redis.rpush(f"NodeTask:{name}", json.dumps(add_redis_task_data))
        return True
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return False


@router.post("/task/update")
async def update_task_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
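    """
    Update a task's configuration and keep its scheduled job in sync:
    toggles the interval job on/off, re-registers it when the interval
    (hour) changes, then persists the new fields on the task document.
    """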
    try:
        # Get the ID from the request data
        task_id = request_data.get("id")
        hour = request_data.get("hour")
        # Check if ID is provided
        if not task_id:
            return {"message": "ID is missing in the request data", "code": 400}
        query = {"id": task_id}
        doc = await db.ScheduledTasks.find_one(query)
        # Tasks created without scheduling have no ScheduledTasks document
        oldScheduledTasks = doc["state"] if doc else False
        old_hour = doc["hour"] if doc else hour
        newScheduledTasks = request_data.get("scheduledTasks")
        if oldScheduledTasks != newScheduledTasks:
            if newScheduledTasks:
                scheduler.add_job(scheduler_scan_task, 'interval', hours=hour, args=[task_id],
                                  id=str(task_id), jobstore='mongo')
                await db.ScheduledTasks.update_one({"id": task_id}, {"$set": {'state': True}})
            else:
                scheduler.remove_job(task_id)
                await db.ScheduledTasks.update_one({"id": task_id}, {"$set": {'state': False}})
        if newScheduledTasks:
            if hour != old_hour:
                await db.ScheduledTasks.update_one({"id": task_id}, {"$set": {'hour': hour}})
                # Re-register the job so the new interval takes effect
                job = scheduler.get_job(task_id)
                if job is not None:
                    scheduler.remove_job(task_id)
                    scheduler.add_job(scheduler_scan_task, 'interval', hours=hour, args=[task_id],
                                      id=str(task_id), jobstore='mongo')
                else:
                    scheduler.add_job(scheduler_scan_task, 'interval', hours=hour, args=[task_id],
                                      id=str(task_id), jobstore='mongo')
        request_data.pop("id")
        update_document = {
            "$set": request_data
        }
        result = await db.task.update_one({"_id": ObjectId(task_id)}, update_document)
        # update_one always returns a result object, so check matched_count
        if result.matched_count > 0:
            return {"message": "Task updated successfully", "code": 200}
        else:
            return {"message": "Failed to update data", "code": 404}
    except Exception as e:
        logger.error(str(e))
        logger.error(traceback.format_exc())
        return {"message": "error", "code": 500}


async def task_progress():
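    """
    Background progress refresh: for every unfinished task, derives the
    completion percentage from the length of the TaskInfo:tmp:<id> list
    in Redis relative to the task's target count, and stamps endTime
    once progress reaches 100.
    """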
    async for db in get_mongo_db():
        async for redis in get_redis_pool():
            query = {"progress": {"$ne": 100}}
            cursor: AsyncIOMotorCursor = db.task.find(query)
            result = await cursor.to_list(length=None)
            if len(result) == 0:
                return True
            for r in result:
                id = str(r["_id"])
                key = f"TaskInfo:tmp:{id}"
                exists = await redis.exists(key)
                if exists:
                    count = await redis.llen(key)
                    progress_tmp = round(count / r['taskNum'], 2)
                    progress_tmp = round(progress_tmp * 100, 1)
                    if progress_tmp > 100:
                        progress_tmp = 100
                    if progress_tmp == 100:
                        time_key = f"TaskInfo:time:{id}"
                        time_value = await redis.get(time_key)
                        await db.task.update_one({"_id": r["_id"]}, {"$set": {"endTime": time_value}})
                    await db.task.update_one({"_id": r["_id"]}, {"$set": {"progress": progress_tmp}})
                else:
                    await db.task.update_one({"_id": r["_id"]}, {"$set": {"progress": 0}})
            return


@router.post("/task/progress/info")
async def progress_info(request_data: dict, _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool),
db=Depends(get_mongo_db)):
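    """
    Per-target progress detail: reads the TaskInfo:progress:<runner|id>:<target>
    hashes from Redis in parallel and maps each stage's *_start/*_end
    timestamps into a fixed result structure per target.
    """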
    task_id = request_data.get("id")
    task_type = request_data.get("type")
    runner = request_data.get("runner")
    if not task_id:
        return {"message": "ID is missing in the request data", "code": 400}
    query = {"_id": ObjectId(task_id)}
    if task_type == "scan":
        doc = await db.task.find_one(query)
    else:
        # Projects keep their targets in a separate collection
        doc, target_data = await asyncio.gather(
            db.project.find_one(query),
            db.ProjectTargetData.find_one({"id": task_id})
        )
        if doc and target_data:
            doc["target"] = target_data["target"]
    if not doc:
        return {"message": "Content not found for the provided ID", "code": 404}
    targets = doc['target'].split("\n")
    # Fetch all per-target progress hashes concurrently
    tasks = [redis_con.hgetall(f"TaskInfo:progress:{runner or task_id}:{t}") for t in targets]
    redis_results = await asyncio.gather(*tasks)
    stages = ["subdomain", "subdomainTakeover", "portScan", "assetMapping",
              "urlScan", "sensitive", "crawler", "dirScan", "vulnerability"]
    result_list = []
    for t, data in zip(targets, redis_results):
        progress_result = {stage: ["", ""] for stage in stages}
        progress_result["all"] = ["", ""]
        progress_result["target"] = t
        if not data:
            result_list.append(progress_result)
            continue
        # Each stage records its timestamps under <stage>_start / <stage>_end
        for stage in stages:
            progress_result[stage][0] = data.get(f"{stage}_start", "")
            progress_result[stage][1] = data.get(f"{stage}_end", "")
        progress_result['all'][0] = data.get("scan_start", "")
        progress_result['all'][1] = data.get("scan_end", "")
        result_list.append(progress_result)
    return {
        "code": 200,
        "data": {
            'list': result_list,
            "total": len(result_list)
        }
    }


async def scheduler_scan_task(id):
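    """
    APScheduler entry point for scheduled scans: rotates the runner_id,
    cleans up progress keys from the previous run, refreshes the
    last/next run times, and re-enqueues the task's targets.
    """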
    logger.info(f"Scheduler scan {id}")
    async for db in get_mongo_db():
        async for redis in get_redis_pool():
            next_time = scheduler.get_job(id).next_run_time
            formatted_time = next_time.strftime("%Y-%m-%d %H:%M:%S")
            doc = await db.ScheduledTasks.find_one({"id": id})
            run_id_last = doc.get("runner_id", "")
            # Clear progress keys left over from the previous run
            if run_id_last != "" and id != run_id_last:
                progresskeys = await redis.keys(f"TaskInfo:progress:{run_id_last}:*")
                for pgk in progresskeys:
                    await redis.delete(pgk)
            task_id = generate_random_string(15)
            update_document = {
                "$set": {
                    "lastTime": get_now_time(),
                    "nextTime": formatted_time,
                    "runner_id": task_id
                }
            }
            await db.ScheduledTasks.update_one({"id": id}, update_document)
            query = {"_id": ObjectId(id)}
            doc = await db.task.find_one(query)
            targetList = []
            for t in doc['target'].split("\n"):
                # Strip the scheme and whitespace, then deduplicate
                t = t.replace("http://", "").replace("https://", "")
                t = t.strip("\n").strip("\r").strip()
                if t != "" and t not in targetList:
                    targetList.append(t)
            await create_scan_task(doc, task_id, targetList, redis)


async def delete_asset(task_ids, is_project = False):
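    """
    Purge all result collections for the given task (or project) ids;
    for projects, documents are matched by either taskId or project.
    """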
    async for db in get_mongo_db():
        key = ["asset", "subdomain", "SubdoaminTakerResult", "UrlScan", "crawler", "SensitiveResult", "DirScanResult", "vulnerability", "PageMonitoring"]
        del_query = {"taskId": {"$in": task_ids}}
        if is_project:
            del_query = {
                "$or": [
                    {"taskId": {"$in": task_ids}},
                    {"project": {"$in": task_ids}}
                ]
            }
        for k in key:
            result = await db[k].delete_many(del_query)
            if result.deleted_count > 0:
                logger.info("Deleted {} {} documents".format(result.deleted_count, k))
            else:
                logger.info("No {} documents deleted".format(k))