# -------------------------------------
# @file    : scheduled_tasks.py
# @author  : Autumn
# @contact : rainy-autumn@outlook.com
# @time    : 2024/4/28 20:58
# -------------------------------------------
from apscheduler.events import JobSubmissionEvent, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_SUBMITTED
from apscheduler.executors.base import MaxInstancesReachedError
from bson import ObjectId
from fastapi import APIRouter, Depends
from pytz import utc
from api.users import verify_token
from motor.motor_asyncio import AsyncIOMotorCursor
from core.apscheduler_handler import scheduler
from core.db import get_mongo_db
from core.redis_handler import get_redis_pool
from core.util import *
from api.node import get_redis_online_data
from api.page_monitoring import get_page_monitoring_data

router = APIRouter()


@router.post("/scheduled/task/data")
async def get_scheduled_data(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token)):
    try:
        search_query = request_data.get("search", "")
        page_index = request_data.get("pageIndex", 1)
        page_size = request_data.get("pageSize", 10)

        # Fuzzy search based on the name field
        query = {"name": {"$regex": search_query, "$options": "i"}}

        # Get the total count of documents matching the search criteria
        total_count = await db.ScheduledTasks.count_documents(query)

        # Perform pagination query
        cursor: AsyncIOMotorCursor = db.ScheduledTasks.find(query).skip((page_index - 1) * page_size).limit(page_size)
        result = await cursor.to_list(length=None)
        if len(result) == 0:
            return {
                "code": 200,
                "data": {
                    'list': [],
                    'total': 0
                }
            }
        result_list = []
        for doc in result:
            tmp = {
                "id": doc["id"],
                "name": doc["name"],
                "type": doc["type"],
                "lastTime": doc.get("lastTime", ""),
                "nextTime": doc.get("nextTime", ""),
                "state": doc.get("state"),
                "cycle": doc.get("hour"),
                "node": doc.get("node", []),
                "allNode": doc.get("allNode", True),
                "runner_id": doc.get("runner_id", "")
            }
            result_list.append(tmp)
        return {
            "code": 200,
            "data": {
                'list': result_list,
                'total': total_count
            }
        }
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


# @router.post("/scheduled/task/run")
# async def scheduled_run(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token),
#                         jobstore_alias=None):
#     try:
#         id = request_data.get("id", "")
#         job = scheduler.get_job(id)
#         if job:
#             executor = scheduler._lookup_executor(job.executor)
#             run_times = [datetime.now(utc)]
#             try:
#                 executor.submit_job(job, run_times)
#             except MaxInstancesReachedError:
#                 scheduler._logger.warning(
#                     'Execution of job "%s" skipped: maximum number of running '
#                     'instances reached (%d)', job, job.max_instances)
#                 event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
#                                            jobstore_alias, run_times)
#                 scheduler._dispatch_event(event)
#             except BaseException:
#                 scheduler._logger.exception('Error submitting job "%s" to executor "%s"',
#                                             job, job.executor)
#             else:
#                 event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
#                                            run_times)
#                 scheduler._dispatch_event(event)
#             return {"message": "task run success", "code": 200}
#         else:
#             return {"message": "Not Found Task", "code": 500}
#     except:
#         return {"message": "error", "code": 500}
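
# The disabled handler above drives APScheduler's private internals
# (_lookup_executor, _dispatch_event). A minimal sketch of the same
# "run now" behavior using only the public API is kept below for
# reference; it is an illustrative addition, not part of the original
# module, and assumes the same {"id": ...} request shape.
#
# @router.post("/scheduled/task/run")
# async def scheduled_run(request_data: dict, _: dict = Depends(verify_token)):
#     try:
#         job_id = request_data.get("id", "")
#         job = scheduler.get_job(job_id)
#         if job is None:
#             return {"message": "Not Found Task", "code": 500}
#         # Pull the next run time up to "now"; the scheduler submits the job
#         # on its next wakeup, and the interval trigger then resumes as usual.
#         job.modify(next_run_time=datetime.now(utc))
#         scheduler.wakeup()
#         return {"message": "task run success", "code": 200}
#     except Exception as e:
#         logger.error(str(e))
#         return {"message": "error", "code": 500}
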
@router.post("/scheduled/task/delete")
async def delete_task(request_data: dict, db=Depends(get_mongo_db), _: dict = Depends(verify_token),
                      redis_con=Depends(get_redis_pool)):
    try:
        # Extract the list of IDs from the request_data dictionary
        task_ids = request_data.get("ids", [])

        # Collect deletable ids, skipping the built-in page_monitoring task
        delete_ids = []
        for task_id in task_ids:
            if task_id != "page_monitoring":
                delete_ids.append(task_id)
                job = scheduler.get_job(task_id)
                if job:
                    # Clear the scheduledTasks flag on the owning task or project document
                    function_name = job.func.__name__ if hasattr(job.func, '__name__') else job.func
                    update_document = {
                        "$set": {
                            "scheduledTasks": False
                        }
                    }
                    if function_name == "scheduler_scan_task":
                        await db.task.update_one({"_id": ObjectId(task_id)}, update_document)
                    else:
                        await db.project.update_one({"_id": ObjectId(task_id)}, update_document)
                    scheduler.remove_job(task_id)
        result = await db.ScheduledTasks.delete_many({"id": {"$in": delete_ids}})

        # Check if the deletion was successful
        if result.deleted_count > 0:
            return {"code": 200, "message": "Scheduled Task deleted successfully"}
        else:
            return {"code": 404, "message": "Scheduled Task not found"}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


async def get_page_monitoring_time():
    # Return the configured interval (hours) and enabled state of the page_monitoring task
    async for db in get_mongo_db():
        result = await db.ScheduledTasks.find_one({"id": "page_monitoring"})
        time = result['hour']
        flag = result['state']
        return time, flag


async def create_page_monitoring_task():
    # Scheduled job body: record run times, then dispatch monitoring targets to the scan nodes
    logger.info("create_page_monitoring_task")
    async for db in get_mongo_db():
        async for redis in get_redis_pool():
            name_list = []
            result = await db.ScheduledTasks.find_one({"id": "page_monitoring"})
            next_time = scheduler.get_job("page_monitoring").next_run_time
            formatted_time = next_time.strftime("%Y-%m-%d %H:%M:%S")
            update_document = {
                "$set": {
                    "lastTime": get_now_time(),
                    "nextTime": formatted_time
                }
            }
            await db.ScheduledTasks.update_one({"_id": result['_id']}, update_document)
            if result['allNode']:
                tmp = await get_redis_online_data(redis)
                name_list += tmp
            else:
                name_list += result['node']
            targetList = await get_page_monitoring_data(db, False)
            if len(targetList) == 0:
                return
            await redis.delete("TaskInfo:page_monitoring")
            await redis.lpush("TaskInfo:page_monitoring", *targetList)
            add_redis_task_data = {
                "type": 'page_monitoring',
                "TaskId": "page_monitoring"
            }
            for name in name_list:
                await redis.rpush(f"NodeTask:{name}", json.dumps(add_redis_task_data))
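
# create_page_monitoring_task() above fans work out through Redis lists: the
# monitored URLs are LPUSHed onto "TaskInfo:page_monitoring" and a JSON task
# descriptor is RPUSHed onto each node's "NodeTask:<name>" queue. The sketch
# below shows what a node-side consumer of that protocol could look like; it
# is an illustrative assumption (the real scan nodes live outside this
# module, and this function is never called here), so every name is
# hypothetical.
async def _example_node_consumer(node_name: str, redis_url: str = "redis://localhost"):
    from redis import asyncio as aioredis  # local import: sketch only
    redis = aioredis.from_url(redis_url, decode_responses=True)
    while True:
        # Block until the server queues a task descriptor for this node
        _, raw = await redis.blpop(f"NodeTask:{node_name}")
        task = json.loads(raw)
        if task.get("type") == "page_monitoring":
            # Drain the shared target list; RPOP pairs with the server's LPUSH
            while (target := await redis.rpop(f"TaskInfo:{task['TaskId']}")) is not None:
                logger.info(f"would fetch and diff {target}")
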
@router.post("/scheduled/task/pagemonit/data")
async def get_scheduled_task_pagemonit_data(request_data: dict, db=Depends(get_mongo_db),
                                            _: dict = Depends(verify_token)):
    try:
        search_query = request_data.get("search", "")
        page_index = request_data.get("pageIndex", 1)
        page_size = request_data.get("pageSize", 10)

        # Fuzzy search based on the url field
        query = {"url": {"$regex": search_query, "$options": "i"}}

        # Get the total count of documents matching the search criteria
        total_count = await db.PageMonitoring.count_documents(query)

        # Perform pagination query
        cursor: AsyncIOMotorCursor = db.PageMonitoring.find(query).skip((page_index - 1) * page_size).limit(page_size)
        result = await cursor.to_list(length=None)
        result_list = []
        for doc in result:
            tmp = {
                "id": str(doc["_id"]),
                "url": doc["url"]
            }
            result_list.append(tmp)
        return {
            "code": 200,
            "data": {
                'list': result_list,
                'total': total_count
            }
        }
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/scheduled/task/pagemonit/update")
async def update_scheduled_task_pagemonit_data(request_data: dict, db=Depends(get_mongo_db),
                                               _: dict = Depends(verify_token)):
    try:
        if not request_data:
            return {"message": "Data to update is missing in the request", "code": 400}
        state = request_data.get('state')
        formatted_time = ""
        job = scheduler.get_job('page_monitoring')
        if state:
            # Enable monitoring: (re)create the interval job if it is not scheduled yet
            if job is None:
                scheduler.add_job(create_page_monitoring_task, 'interval', hours=request_data.get('hour', 24),
                                  id='page_monitoring', jobstore='mongo')
            next_time = scheduler.get_job('page_monitoring').next_run_time
            formatted_time = next_time.strftime("%Y-%m-%d %H:%M:%S")
        else:
            # Disable monitoring: drop the job if it exists
            if job:
                scheduler.remove_job('page_monitoring')
        update_document = {
            "$set": {
                "hour": request_data.get('hour', 24),
                "node": request_data.get('node', []),
                "allNode": request_data.get('allNode', True),
                "nextTime": formatted_time,
                "state": request_data.get('state'),
            }
        }
        result = await db.ScheduledTasks.update_one({"id": 'page_monitoring'}, update_document)
        # update_one returns an (always truthy) UpdateResult, so test matched_count instead
        if result.matched_count > 0:
            return {"message": "Data updated successfully", "code": 200}
        else:
            return {"message": "Failed to update data", "code": 404}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/scheduled/task/pagemonit/delete")
async def delete_scheduled_task_pagemonit_data(request_data: dict, db=Depends(get_mongo_db),
                                               _: dict = Depends(verify_token)):
    try:
        # Extract the list of IDs from the request_data dictionary
        url_ids = request_data.get("ids", [])

        # Convert the provided url_ids to ObjectId
        obj_ids = [ObjectId(url_id) for url_id in url_ids]

        # Delete the PageMonitoring documents based on the provided IDs
        result = await db.PageMonitoring.delete_many({"_id": {"$in": obj_ids}})

        # Check if the deletion was successful
        if result.deleted_count > 0:
            return {"code": 200, "message": "URL deleted successfully"}
        else:
            return {"code": 404, "message": "URL not found"}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/scheduled/task/pagemonit/add")
async def add_scheduled_task_pagemonit_data(request_data: dict, db=Depends(get_mongo_db),
                                            _: dict = Depends(verify_token)):
    try:
        if not request_data:
            return {"message": "Data to add is missing in the request", "code": 400}
        url = request_data.get("url")
        result = await db.PageMonitoring.insert_one({
            "url": url,
            "content": [],
            "hash": [],
            "diff": [],
            "state": 1,
            "project": '',
            "time": ''
        })
        if result.inserted_id:
            return {"message": "Data added successfully", "code": 200}
        else:
            return {"message": "Failed to add data", "code": 400}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}
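
# Example request flow for the pagemonit endpoints above (illustrative only:
# the base URL, router prefix, and Authorization header are assumptions that
# depend on how this APIRouter is mounted and on how api.users.verify_token
# validates tokens):
#
# import asyncio
# import httpx
#
# async def demo():
#     headers = {"Authorization": "Bearer <token>"}
#     async with httpx.AsyncClient(base_url="http://127.0.0.1:8000") as client:
#         # Register a URL for page monitoring
#         await client.post("/scheduled/task/pagemonit/add",
#                           json={"url": "https://example.com"}, headers=headers)
#         # Page through the monitored URLs
#         resp = await client.post("/scheduled/task/pagemonit/data",
#                                  json={"search": "", "pageIndex": 1, "pageSize": 10},
#                                  headers=headers)
#         print(resp.json())
#
# asyncio.run(demo())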