add deduplication
This commit is contained in:
parent
94f59ad438
commit
b31181bd50
|
@ -4,7 +4,10 @@
|
||||||
# @version:
|
# @version:
|
||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
from fastapi import APIRouter, Depends
|
from fastapi import APIRouter, Depends
|
||||||
|
from starlette.background import BackgroundTasks
|
||||||
|
import datetime
|
||||||
from api.users import verify_token
|
from api.users import verify_token
|
||||||
|
from core.apscheduler_handler import scheduler
|
||||||
from core.db import get_mongo_db
|
from core.db import get_mongo_db
|
||||||
from core.redis_handler import refresh_config
|
from core.redis_handler import refresh_config
|
||||||
from core.config import set_timezone
|
from core.config import set_timezone
|
||||||
|
@ -118,20 +121,87 @@ async def save_system_data(data: dict, db=Depends(get_mongo_db), _: dict = Depen
|
||||||
return {"message": "error", "code": 500}
|
return {"message": "error", "code": 500}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/system/deduplication/config")
|
@router.get("/deduplication/config")
|
||||||
async def get_deduplication_config(_: dict = Depends(verify_token), db=Depends(get_mongo_db)):
|
async def get_deduplication_config(_: dict = Depends(verify_token), db=Depends(get_mongo_db)):
|
||||||
try:
|
try:
|
||||||
# 查询所有 type 为 "system" 的文档
|
job = scheduler.get_job("deduplication")
|
||||||
cursor = await db.config.find_one({"name": "deduplication"})
|
next_rune_time = ""
|
||||||
deduplication_data = {}
|
if job is not None:
|
||||||
|
next_rune_time = scheduler.get_job("deduplication").next_run_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
async for document in cursor:
|
result = await db.config.find_one({"name": "deduplication"})
|
||||||
deduplication_data[document["name"]] = document["value"]
|
result["next_run_time"] = next_rune_time
|
||||||
|
result.pop("_id")
|
||||||
return {
|
return {
|
||||||
"code": 200,
|
"code": 200,
|
||||||
"data": deduplication_data
|
"data": result
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(str(e))
|
logger.error(str(e))
|
||||||
# 根据需要处理异常
|
# 根据需要处理异常
|
||||||
return {"message": "error", "code": 500}
|
return {"message": "error", "code": 500}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/deduplication/save")
|
||||||
|
async def save_deduplication_config(request_data: dict, _: dict = Depends(verify_token), db=Depends(get_mongo_db), background_tasks: BackgroundTasks = BackgroundTasks()):
|
||||||
|
try:
|
||||||
|
run_now = request_data.get("runNow", False)
|
||||||
|
request_data.pop("runNow")
|
||||||
|
await db.config.update_one(
|
||||||
|
{"name": "deduplication"},
|
||||||
|
{"$set": request_data},
|
||||||
|
upsert=True
|
||||||
|
)
|
||||||
|
job = scheduler.get_job("deduplication")
|
||||||
|
if job is not None:
|
||||||
|
scheduler.remove_job("deduplication")
|
||||||
|
if request_data.get('flag', False):
|
||||||
|
scheduler.add_job(do_asset_deduplication, 'interval', hours=request_data.get('hour', 3),
|
||||||
|
id='deduplication', jobstore='mongo')
|
||||||
|
if run_now:
|
||||||
|
background_tasks.add_task(do_asset_deduplication)
|
||||||
|
return {"message": "Data saved successfully", "code": 200}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(str(e))
|
||||||
|
return {"message": "error", "code": 500}
|
||||||
|
|
||||||
|
|
||||||
|
async def do_asset_deduplication():
|
||||||
|
async for db in get_mongo_db():
|
||||||
|
result = await db.config.find_one({"name": "deduplication"})
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
|
||||||
|
async def asset_data_dedup(db, filters, groups):
|
||||||
|
# db[].update_many({}, {'$set': {'process_flag': timestamp}})
|
||||||
|
# 去重http资产
|
||||||
|
timestamp = datetime.datetime.now()
|
||||||
|
db['asset'].update_many({}, {'$set': {'process_flag': timestamp}})
|
||||||
|
filter = {
|
||||||
|
"process_flag": timestamp
|
||||||
|
}
|
||||||
|
for f in filter:
|
||||||
|
filter[f] = filters[f]
|
||||||
|
group = {}
|
||||||
|
for g in groups:
|
||||||
|
group[g] = "$" + groups[g]
|
||||||
|
|
||||||
|
pipeline = [
|
||||||
|
{
|
||||||
|
"$match": filter
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'$sort': {'_id': -1}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'$group': {
|
||||||
|
'_id': group,
|
||||||
|
'latestId': {'$first': '$_id'}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'$project': {'_id': 0, 'latestId': 1}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
latest_ids = []
|
||||||
|
for doc in db['asset'].aggregate(pipeline):
|
||||||
|
latest_ids.append(doc['latestId'])
|
11
main.py
11
main.py
|
@ -136,6 +136,17 @@ async def read_root():
|
||||||
return FileResponse("static/index.html")
|
return FileResponse("static/index.html")
|
||||||
|
|
||||||
|
|
||||||
|
# @app.on_event("shutdown")
|
||||||
|
# async def shutdown_event():
|
||||||
|
# global subscriber_task
|
||||||
|
# if subscriber_task:
|
||||||
|
# subscriber_task.cancel()
|
||||||
|
# try:
|
||||||
|
# await subscriber_task
|
||||||
|
# except asyncio.CancelledError:
|
||||||
|
# pass
|
||||||
|
|
||||||
|
|
||||||
class MongoDBQueryTimeMiddleware(BaseHTTPMiddleware):
|
class MongoDBQueryTimeMiddleware(BaseHTTPMiddleware):
|
||||||
async def dispatch(self, request: Request, call_next):
|
async def dispatch(self, request: Request, call_next):
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
Loading…
Reference in New Issue