Autumn.home 2024-07-05 23:12:50 +08:00
parent 0618ef2dc7
commit e108019446
3 changed files with 99 additions and 61 deletions

View File

@@ -328,20 +328,11 @@ async def asset_data_statistics(request_data: dict, db=Depends(get_mongo_db), _:
 @router.post("/subdomain/data")
 async def asset_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
     try:
-        search_query = request_data.get("search", "")
         page_index = request_data.get("pageIndex", 1)
         page_size = request_data.get("pageSize", 10)
-        keyword = {
-            'domain': 'host',
-            'ip': 'ip',
-            'type': 'type',
-            'project': 'project',
-            'value': 'value'
-        }
-        query = await search_to_mongodb(search_query, keyword)
-        if query == "" or query is None:
+        query = await get_search_query("subdomain", request_data)
+        if query == "":
             return {"message": "Search condition parsing error", "code": 500}
-        query = query[0]
         total_count = await db['subdomain'].count_documents(query)
         cursor: AsyncIOMotorCursor = ((db['subdomain'].find(query, {"_id": 0,
                                                                     "id": {"$toString": "$_id"},

View File

@@ -2,6 +2,8 @@
 # @name: configuration
 # @auth: rainy-autumn@outlook.com
 # @version:
+import traceback
 from bson import ObjectId
 from fastapi import APIRouter, Depends
 from starlette.background import BackgroundTasks
@@ -12,6 +14,9 @@ from core.db import get_mongo_db
 from core.redis_handler import refresh_config
 from core.config import set_timezone
 from loguru import logger
+from core.util import generate_random_string
 
 router = APIRouter()
 
 @router.get("/subfinder/data")
@@ -174,7 +179,7 @@ async def do_asset_deduplication():
         result.pop("flag")
     f_g_k = {
         "DirScanResult": {
-            "filters": [],
+            "filters": {},
             "groups": ["url", "status", "msg"]
         },
         # "PageMonitoring": {
@@ -186,24 +191,24 @@ async def do_asset_deduplication():
         #     "groups": ["url"]
         # },
         "SubdoaminTakerResult": {
-            "filters": [],
+            "filters": {},
             "groups": ["input", "value"]
         },
         "UrlScan": {
-            "filters": [],
+            "filters": {},
             "groups": ["output"]
         },
         "asset": {
-            "filters": [],
+            "filters": {},
             "groups": [""]
         },
         "crawler": {
-            "filters": [],
+            "filters": {},
             "groups": ["url", "body"]
         },
         "subdomain": {
-            "filters": [],
-            "groups": ["host", "type", "ip"]
+            "filters": {},
+            "groups": ["host", "type", "sorted_ip"]
         },
         # "vulnerability": {
         #     "filters": [],
@@ -215,56 +220,91 @@ async def do_asset_deduplication():
         if r in f_g_k:
             if r == "asset":
                 # deduplicate HTTP assets
-                http_filter = [{"type": {"$ne": "other"}}]
+                http_filter = {"type": {"$ne": "other"}}
                 http_group = ["url", "statuscode", "hashes.body_mmh3"]
                 await asset_data_dedup(db, r, http_filter, http_group)
-                other_filter = [{"type": "other"}]
+                other_filter = {"type": "other"}
                 other_group = ["host", "ip", "protocol"]
                 await asset_data_dedup(db, r, other_filter, other_group)
+            elif r == "subdomain":
+                await asset_data_dedup(db, r, f_g_k[r]['filters'], f_g_k[r]['groups'], True)
             else:
                 await asset_data_dedup(db, r, f_g_k[r]['filters'], f_g_k[r]['groups'])
 
 
-async def asset_data_dedup(db, collection_name, filters, groups):
+async def asset_data_dedup(db, collection_name, filters, groups, subdomain=False):
     # db[].update_many({}, {'$set': {'process_flag': timestamp}})
     # dedup HTTP assets
-    logger.info(f"{collection_name} dedup started")
-    collection = db[collection_name]
-    timestamp = datetime.datetime.now()
-    collection.update_many({}, {'$set': {'process_flag': timestamp}})
-    filter = {
-        "process_flag": timestamp
-    }
-    for f in filter:
-        filter[f] = filters[f]
-    group = {}
-    for g in groups:
-        group[g.replace(".")] = "$" + g
-    pipeline = [
-        {
-            "$match": filter
-        },
-        {
-            '$sort': {'_id': -1}
-        },
-        {
-            '$group': {
-                '_id': group,
-                'latestId': {'$first': '$_id'}
-            }
-        },
-        {
-            '$project': {'_id': 0, 'latestId': 1}
-        }
-    ]
-    latest_ids = []
-    for doc in collection.aggregate(pipeline):
-        latest_ids.append(doc['latestId'])
-    collection.update_many({'_id': {'$in': latest_ids}}, {'$set': {'latest': True}})
-    collection.delete_many({'process_flag': timestamp, 'latest': {'$ne': True}})
-    collection.update_many({'process_flag': timestamp}, {'$unset': {'process_flag': "", 'latest': ""}})
-    timestamp2 = datetime.datetime.now()
-    time_difference = timestamp2 - timestamp
-    time_difference_in_seconds = time_difference.total_seconds()
-    logger.info(f"{collection_name} dedup time elapsed: {time_difference_in_seconds}")
+    try:
+        logger.info(f"{collection_name} dedup started")
+        collection = db[collection_name]
+        timestamp = datetime.datetime.now()
+        process_flag = generate_random_string(12)
+        await collection.update_many(filters, {'$set': {'process_flag': process_flag}})
+        filter = {
+            "process_flag": process_flag
+        }
+        for f in filters:
+            filter[f] = filters[f]
+        group = {}
+        for g in groups:
+            group[g.replace(".", "")] = "$" + g
+        pipeline = [
+            {
+                "$match": filter
+            },
+            {
+                '$sort': {'_id': -1}
+            },
+            {
+                '$group': {
+                    '_id': group,
+                    'latestId': {'$first': '$_id'}
+                }
+            },
+            {
+                '$project': {'_id': 0, 'latestId': 1}
+            }
+        ]
+        if subdomain:
+            pipeline = [
+                {
+                    "$match": filter
+                },
+                {
+                    "$addFields": {
+                        "sorted_ip": {"$sortArray": {"input": "$ip", "sortBy": 1}}
+                    }
+                },
+                {
+                    '$sort': {'_id': -1}
+                },
+                {
+                    '$group': {
+                        '_id': group,
+                        'latestId': {'$first': '$_id'}
+                    }
+                },
+                {
+                    '$project': {'_id': 0, 'latestId': 1}
+                }
+            ]
+        latest_ids = []
+        async for doc in collection.aggregate(pipeline):
+            latest_ids.append(doc['latestId'])
+        logger.info(f"Latest IDs: {len(latest_ids)}")
+        result = await collection.update_many({'_id': {'$in': latest_ids}}, {'$set': {'latest': True}})
+        if result:
+            logger.info(f"Unique records updated: {result.modified_count}")
+        result = await collection.delete_many({'process_flag': process_flag, 'latest': {'$ne': True}})
+        if result:
+            logger.info(f"Duplicates deleted: {result.deleted_count}")
+        await collection.update_many({'process_flag': process_flag}, {'$unset': {'process_flag': "", 'latest': ""}})
+        timestamp2 = datetime.datetime.now()
+        time_difference = timestamp2 - timestamp
+        time_difference_in_seconds = time_difference.total_seconds()
+        logger.info(f"{collection_name} dedup time elapsed: {time_difference_in_seconds}")
+    except Exception as e:
+        logger.error(traceback.format_exc())
+        logger.error(f"{collection_name} dedup error")

View File

@@ -347,6 +347,13 @@ async def get_search_query(name, request_data):
             'request': 'request',
             'response': 'response',
             'level': 'level'
+        },
+        'subdomain': {
+            'domain': 'host',
+            'ip': 'ip',
+            'type': 'type',
+            'project': 'project',
+            'value': 'value'
         }
     }
     keyword = search_key_v[name]
@@ -354,7 +361,7 @@ async def get_search_query(name, request_data):
     if query == "" or query is None:
         return ""
     query = query[0]
-    filter_key = ['color', 'status', 'level']
+    filter_key = ['color', 'status', 'level', 'type']
     filter = request_data.get("filter", {})
     if filter:
         query["$and"] = []