diff --git a/api/configuration.py b/api/configuration.py index 2c2d968..b9bb0fe 100644 --- a/api/configuration.py +++ b/api/configuration.py @@ -4,7 +4,10 @@ # @version: from bson import ObjectId from fastapi import APIRouter, Depends +from starlette.background import BackgroundTasks +import datetime from api.users import verify_token +from core.apscheduler_handler import scheduler from core.db import get_mongo_db from core.redis_handler import refresh_config from core.config import set_timezone @@ -118,20 +121,87 @@ async def save_system_data(data: dict, db=Depends(get_mongo_db), _: dict = Depen return {"message": "error", "code": 500} -@router.get("/system/deduplication/config") +@router.get("/deduplication/config") async def get_deduplication_config(_: dict = Depends(verify_token), db=Depends(get_mongo_db)): try: - # 查询所有 type 为 "system" 的文档 - cursor = await db.config.find_one({"name": "deduplication"}) - deduplication_data = {} - - async for document in cursor: - deduplication_data[document["name"]] = document["value"] + job = scheduler.get_job("deduplication") + next_rune_time = "" + if job is not None: + next_rune_time = scheduler.get_job("deduplication").next_run_time.strftime("%Y-%m-%d %H:%M:%S") + result = await db.config.find_one({"name": "deduplication"}) + result["next_run_time"] = next_rune_time + result.pop("_id") return { "code": 200, - "data": deduplication_data + "data": result } except Exception as e: logger.error(str(e)) # 根据需要处理异常 - return {"message": "error", "code": 500} \ No newline at end of file + return {"message": "error", "code": 500} + + +@router.post("/deduplication/save") +async def save_deduplication_config(request_data: dict, _: dict = Depends(verify_token), db=Depends(get_mongo_db), background_tasks: BackgroundTasks = BackgroundTasks()): + try: + run_now = request_data.get("runNow", False) + request_data.pop("runNow") + await db.config.update_one( + {"name": "deduplication"}, + {"$set": request_data}, + upsert=True + ) + job = scheduler.get_job("deduplication") + if job is not None: + scheduler.remove_job("deduplication") + if request_data.get('flag', False): + scheduler.add_job(do_asset_deduplication, 'interval', hours=request_data.get('hour', 3), + id='deduplication', jobstore='mongo') + if run_now: + background_tasks.add_task(do_asset_deduplication) + return {"message": "Data saved successfully", "code": 200} + except Exception as e: + logger.error(str(e)) + return {"message": "error", "code": 500} + + +async def do_asset_deduplication(): + async for db in get_mongo_db(): + result = await db.config.find_one({"name": "deduplication"}) + print(result) + + +async def asset_data_dedup(db, filters, groups): + # db[].update_many({}, {'$set': {'process_flag': timestamp}}) + # 去重http资产 + timestamp = datetime.datetime.now() + db['asset'].update_many({}, {'$set': {'process_flag': timestamp}}) + filter = { + "process_flag": timestamp + } + for f in filter: + filter[f] = filters[f] + group = {} + for g in groups: + group[g] = "$" + groups[g] + + pipeline = [ + { + "$match": filter + }, + { + '$sort': {'_id': -1} + }, + { + '$group': { + '_id': group, + 'latestId': {'$first': '$_id'} + } + }, + { + '$project': {'_id': 0, 'latestId': 1} + } + ] + latest_ids = [] + for doc in db['asset'].aggregate(pipeline): + latest_ids.append(doc['latestId']) \ No newline at end of file diff --git a/main.py b/main.py index e32a986..ed0973d 100644 --- a/main.py +++ b/main.py @@ -136,6 +136,17 @@ async def read_root(): return FileResponse("static/index.html") +# @app.on_event("shutdown") +# async def shutdown_event(): +# global subscriber_task +# if subscriber_task: +# subscriber_task.cancel() +# try: +# await subscriber_task +# except asyncio.CancelledError: +# pass + + class MongoDBQueryTimeMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): start_time = time.time()