From e1080194467544a17e7d13d19041847e555a3de3 Mon Sep 17 00:00:00 2001
From: "Autumn.home" <rainy-autumn@outlook.com>
Date: Fri, 5 Jul 2024 23:12:50 +0800
Subject: [PATCH] add

---
 api/asset_info.py    |  13 +---
 api/configuration.py | 138 ++++++++++++++++++++++++++++---------------
 core/util.py         |   9 ++-
 3 files changed, 99 insertions(+), 61 deletions(-)

diff --git a/api/asset_info.py b/api/asset_info.py
index 8882b25..3300495 100644
--- a/api/asset_info.py
+++ b/api/asset_info.py
@@ -328,20 +328,11 @@ async def asset_data_statistics(request_data: dict, db=Depends(get_mongo_db), _:
 @router.post("/subdomain/data")
 async def asset_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
     try:
-        search_query = request_data.get("search", "")
         page_index = request_data.get("pageIndex", 1)
         page_size = request_data.get("pageSize", 10)
-        keyword = {
-            'domain': 'host',
-            'ip': 'ip',
-            'type': 'type',
-            'project': 'project',
-            'value': 'value'
-        }
-        query = await search_to_mongodb(search_query, keyword)
-        if query == "" or query is None:
+        query = await get_search_query("subdomain", request_data)
+        if query == "":
             return {"message": "Search condition parsing error", "code": 500}
-        query = query[0]
         total_count = await db['subdomain'].count_documents(query)
         cursor: AsyncIOMotorCursor = ((db['subdomain'].find(query, {"_id": 0,
                                                                     "id": {"$toString": "$_id"},
diff --git a/api/configuration.py b/api/configuration.py
index cb31936..d7c35fc 100644
--- a/api/configuration.py
+++ b/api/configuration.py
@@ -2,6 +2,8 @@
 # @name: configuration
 # @auth: rainy-autumn@outlook.com
 # @version:
+import traceback
+
 from bson import ObjectId
 from fastapi import APIRouter, Depends
 from starlette.background import BackgroundTasks
@@ -12,6 +14,9 @@ from core.db import get_mongo_db
 from core.redis_handler import refresh_config
 from core.config import set_timezone
 from loguru import logger
+
+from core.util import generate_random_string
+
 router = APIRouter()
 
 @router.get("/subfinder/data")
@@ -174,7 +179,7 @@ async def do_asset_deduplication():
     result.pop("flag")
     f_g_k = {
         "DirScanResult": {
-            "filters": [],
+            "filters": {},
             "groups": ["url", "status", "msg"]
         },
         # "PageMonitoring": {
@@ -186,24 +191,24 @@ async def do_asset_deduplication():
         #     "filters": [],
         #     "groups": ["url"]
         # },
         # "SensitiveResult": {
         #     "filters": [],
         #     "groups": ["url"]
         # },
         "SubdoaminTakerResult": {
-            "filters": [],
+            "filters": {},
             "groups": ["input", "value"]
         },
         "UrlScan": {
-            "filters": [],
+            "filters": {},
             "groups": ["output"]
         },
         "asset": {
-            "filters": [],
+            "filters": {},
             "groups": [""]
         },
         "crawler": {
-            "filters": [],
+            "filters": {},
             "groups": ["url", "body"]
         },
         "subdomain": {
-            "filters": [],
-            "groups": ["host", "type", "ip"]
+            "filters": {},
+            "groups": ["host", "type", "sorted_ip"]
         },
         # "vulnerability": {
         #     "filters": [],
         #     "groups": ["url", "vulnid", "matched"]
         # }
     }
     for r in result:
@@ -215,56 +220,91 @@ async def do_asset_deduplication():
         if r in f_g_k:
             if r == "asset":
                 # HTTP asset deduplication
-                http_filter = [{"type": {"$ne": "other"}}]
+                http_filter = {"type": {"$ne": "other"}}
                 http_group = ["url", "statuscode", "hashes.body_mmh3"]
                 await asset_data_dedup(db, r, http_filter, http_group)
-                other_filter = [{"type":"other"}]
+                other_filter = {"type":"other"}
                 other_group = ["host", "ip", "protocol"]
                 await asset_data_dedup(db, r, other_filter, other_group)
+            elif r == "subdomain":
+                await asset_data_dedup(db, r, f_g_k[r]['filters'], f_g_k[r]['groups'], True)
             else:
                 await asset_data_dedup(db, r, f_g_k[r]['filters'], f_g_k[r]['groups'])
 
-async def asset_data_dedup(db, collection_name, filters, groups):
+async def asset_data_dedup(db, collection_name, filters, groups, subdomain = False):
     # db[].update_many({}, {'$set': {'process_flag': timestamp}})
     # deduplicate HTTP assets
-    logger.info(f"{collection_name} dedup started")
-    collection = db[collection_name]
-    timestamp = datetime.datetime.now()
-    collection.update_many({}, {'$set': {'process_flag': timestamp}})
-    filter = {
-        "process_flag": timestamp
-    }
-    for f in filter:
-        filter[f] = filters[f]
-    group = {}
-    for g in groups:
-        group[g.replace(".")] = "$" + g
-
-    pipeline = [
-        {
-            "$match": filter
-        },
-        {
-            '$sort': {'_id': -1}
-        },
-        {
-            '$group': {
-                '_id': group,
-                'latestId': {'$first': '$_id'}
-            }
-        },
-        {
-            '$project': {'_id': 0, 'latestId': 1}
+    try:
+        logger.info(f"{collection_name} dedup started")
+        collection = db[collection_name]
+        timestamp = datetime.datetime.now()
+        process_flag = generate_random_string(12)
+        await collection.update_many(filters, {'$set': {'process_flag': process_flag}})
+        filter = {
+            "process_flag": process_flag
         }
-    ]
-    latest_ids = []
-    for doc in collection.aggregate(pipeline):
-        latest_ids.append(doc['latestId'])
-    collection.update_many({'_id': {'$in': latest_ids}}, {'$set': {'latest': True}})
-    collection.delete_many({'process_flag': timestamp, 'latest': {'$ne': True}})
-    collection.update_many({'process_flag': timestamp}, {'$unset': {'process_flag': "", 'latest': ""}})
-    timestamp2 = datetime.datetime.now()
-    time_difference = timestamp2 - timestamp
-    time_difference_in_seconds = time_difference.total_seconds()
-    logger.info(f"{collection_name} dedup elapsed: {time_difference_in_seconds}")
\ No newline at end of file
+        for f in filters:
+            filter[f] = filters[f]
+        group = {}
+        for g in groups:
+            group[g.replace(".", "")] = "$" + g
+        pipeline = [
+            {
+                "$match": filter
+            },
+            {
+                '$sort': {'_id': -1}
+            },
+            {
+                '$group': {
+                    '_id': group,
+                    'latestId': {'$first': '$_id'}
+                }
+            },
+            {
+                '$project': {'_id': 0, 'latestId': 1}
+            }
+        ]
+        if subdomain:
+            pipeline = [
+                {
+                    "$match": filter
+                },
+                {
+                    "$addFields": {
+                        "sorted_ip": {"$sortArray": {"input": "$ip", "sortBy": 1}}
+                    }
+                },
+                {
+                    '$sort': {'_id': -1}
+                },
+                {
+                    '$group': {
+                        '_id': group,
+                        'latestId': {'$first': '$_id'}
+                    }
+                },
+                {
+                    '$project': {'_id': 0, 'latestId': 1}
+                }
+            ]
+
+        latest_ids = []
+        async for doc in collection.aggregate(pipeline):
+            latest_ids.append(doc['latestId'])
+        logger.info(f"Latest IDs: {len(latest_ids)}")
+        result = await collection.update_many({'_id': {'$in': latest_ids}}, {'$set': {'latest': True}})
+        if result:
+            logger.info(f"Unique records updated: {result.modified_count}")
+        result = await collection.delete_many({'process_flag': process_flag, 'latest': {'$ne': True}})
+        if result:
+            logger.info(f"Duplicates deleted: {result.deleted_count}")
+        await collection.update_many({'process_flag': process_flag}, {'$unset': {'process_flag': "", 'latest': ""}})
+        timestamp2 = datetime.datetime.now()
+        time_difference = timestamp2 - timestamp
+        time_difference_in_seconds = time_difference.total_seconds()
+        logger.info(f"{collection_name} dedup elapsed: {time_difference_in_seconds}")
+    except Exception as e:
+        logger.error(traceback.format_exc())
+        logger.error(f"{collection_name} dedup failed")
diff --git a/core/util.py b/core/util.py
index ae5e8bd..aaa6aa7 100644
--- a/core/util.py
+++ b/core/util.py
@@ -347,6 +347,13 @@ async def get_search_query(name, request_data):
             'request': 'request',
             'response': 'response',
             'level': 'level'
+        },
+        'subdomain': {
+            'domain': 'host',
+            'ip': 'ip',
+            'type': 'type',
+            'project': 'project',
+            'value': 'value'
         }
     }
     keyword = search_key_v[name]
@@ -354,7 +361,7 @@ async def get_search_query(name, request_data):
     if query == "" or query is None:
         return ""
     query = query[0]
-    filter_key = ['color', 'status', 'level']
+    filter_key = ['color', 'status', 'level', 'type']
     filter = request_data.get("filter", {})
     if filter:
         query["$and"] = []
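
Reviewer note, not part of the patch: the rewritten asset_data_dedup tags every document matched by filters with a random process_flag, keeps the newest document per group ($sort on _id descending, then $group with $first), deletes the remaining flagged duplicates, and finally clears the flags. For the subdomain collection it first normalizes the ip array into sorted_ip via $sortArray, so two records whose IP lists differ only in order now fall into the same group. The sketch below reproduces that pipeline in isolation with Motor; it is an illustration under assumptions: the helper name and database/collection names are invented, the $nin-based cleanup is a demo-only simplification of the patch's process_flag scoping, and $sortArray requires MongoDB 5.2+.

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def dedup_keep_newest(coll):
    # Mirror of the subdomain pipeline: normalize the ip array, sort
    # newest-first, keep one _id per (host, type, sorted_ip) group.
    pipeline = [
        {"$addFields": {"sorted_ip": {"$sortArray": {"input": "$ip", "sortBy": 1}}}},
        {"$sort": {"_id": -1}},  # ObjectIds grow over time, so -1 puts the newest first
        {"$group": {
            "_id": {"host": "$host", "type": "$type", "sorted_ip": "$sorted_ip"},
            "latestId": {"$first": "$_id"},
        }},
        {"$project": {"_id": 0, "latestId": 1}},
    ]
    keep = [d["latestId"] async for d in coll.aggregate(pipeline)]
    # Demo-only cleanup: the patch instead deletes within a process_flag
    # scope, which is safer when new documents arrive mid-run.
    result = await coll.delete_many({"_id": {"$nin": keep}})
    return result.deleted_count


async def main():
    # Throwaway database and collection on an assumed local MongoDB 5.2+.
    coll = AsyncIOMotorClient("mongodb://localhost:27017")["demo"]["subdomain"]
    await coll.drop()
    await coll.insert_many([
        {"host": "a.example.com", "type": "A", "ip": ["1.1.1.1", "2.2.2.2"]},
        {"host": "a.example.com", "type": "A", "ip": ["2.2.2.2", "1.1.1.1"]},  # same set, reordered
        {"host": "b.example.com", "type": "A", "ip": ["3.3.3.3"]},
    ])
    print("duplicates deleted:", await dedup_keep_newest(coll))  # expect 1


if __name__ == "__main__":
    asyncio.run(main())

Scoping the delete with process_flag, as the patch does, also means documents inserted while the aggregation is running carry no flag and are never candidates for deletion.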