# -*- coding:utf-8 -*-
# @name: node
# @auth: rainy-autumn@outlook.com
# @version:
import time
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends
from loguru import logger

from core.config import *
from api.users import verify_token
from core.redis_handler import get_redis_pool, refresh_config
from core.util import get_now_time

router = APIRouter()


async def update_redis_data(redis, key):
    # Mark a node hash as timed out (state "3").
    await redis.hmset(key, {'state': '3'})


@router.get("/node/data")
async def node_data(_: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
    async with redis_con as redis:
        # Fetch every key that starts with "node:".
        keys = await redis.keys("node:*")
        result = []
        for key in keys:
            name = key.split(":")[1]
            # Read all fields of the node hash and attach the node name.
            hash_data = await redis.hgetall(key)
            hash_data['name'] = name
            if hash_data.get('state') == '1':
                # A running node is considered timed out when its last heartbeat is older than NODE_TIMEOUT.
                update_time_str = hash_data.get('updateTime')
                if update_time_str:
                    update_time = datetime.strptime(update_time_str, '%Y-%m-%d %H:%M:%S')
                    time_difference = (
                        datetime.strptime(get_now_time(), "%Y-%m-%d %H:%M:%S") - update_time).total_seconds()
                    logger.info(f'Node heartbeat age: {time_difference}s, {get_now_time()}, {update_time}')
                    if time_difference > NODE_TIMEOUT:
                        await update_redis_data(redis, key)
                        hash_data['state'] = '3'
            result.append(hash_data)
    return {
        "code": 200,
        "data": {
            'list': result
        }
    }


@router.get("/node/data/online")
async def node_data_online(_: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
    result = await get_redis_online_data(redis_con)
    return {
        "code": 200,
        "data": {
            'list': result
        }
    }


@router.post("/node/config/update")
async def node_config_update(config_data: dict, _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
    try:
        name = config_data.get("name")
        old_name = config_data.get("oldName", "")
        if old_name == "":
            old_name = name
        max_task_num = config_data.get("maxTaskNum")
        state = config_data.get("state")
        if name is None or max_task_num is None or state is None:
            return {"code": 400, "message": "Invalid request, missing required parameters"}
        async with redis_con as redis:
            key = f"node:{name}"
            if old_name != name:
                # The node was renamed: copy the old hash to the new key, notify the node, then drop the old key.
                old_name_key = f"node:{old_name}"
                value = await redis.hgetall(old_name_key)
                await redis.hmset(key, value)
                await refresh_config(old_name, 'UpdateNodeName', name)
                await redis.delete(old_name_key)
                flag = 0
                # while True:
                #     key_exists = await redis.exists(key)
                #     if flag == 5:
                #         logger.error("Node name update was not detected")
                #         break
                #     if key_exists:
                #         await redis.delete(old_name_key)
                #         break
                #     else:
                #         flag += 1
                #         time.sleep(4)
            # Toggle the stored state: "1" = running, "2" = paused.
            redis_state = await redis.hget(key, "state")
            if state:
                if redis_state == "2":
                    await redis.hset(key, "state", "1")
            else:
                if redis_state == "1":
                    await redis.hset(key, "state", "2")
            # Persist the remaining configuration fields; pop() avoids a KeyError when a field is absent.
            config_data.pop("name", None)
            config_data.pop("state", None)
            config_data.pop("oldName", None)
            for c in config_data:
                await redis.hset(key, c, config_data[c])
            await refresh_config(name, 'nodeConfig')
        return {"code": 200, "message": "Node configuration updated successfully"}
    except Exception as e:
        logger.error(str(e))
        return {"code": 500, "message": f"Internal server error: {str(e)}"}


@router.post("/node/delete")
async def delete_node_rules(request_data: dict, _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
    try:
        node_names = request_data.get("names", [])
        for name in node_names:
            logger.info("delete node:" + name)
            await redis_con.delete("node:" + name)
        return {"message": "Node deleted successfully", "code": 200}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "error", "code": 500}


@router.post("/node/log/data")
async def get_node_logs(request_data: dict, _: dict = Depends(verify_token), redis_con=Depends(get_redis_pool)):
    try:
        node_name = request_data.get("name")
        if not node_name:
            return {"message": "Node name is required", "code": 400}
        # Logs for a node are stored in a Redis list under "log:<node name>".
        log_key = f"log:{node_name}"
        logs = await redis_con.lrange(log_key, 0, -1)
        # Concatenate the individual entries into a single string.
        log_data = "".join(logs)
        return {"code": 200, "logs": log_data}
    except Exception as e:
        logger.error(str(e))
        # Handle exceptions as needed
        return {"message": "Error retrieving logs", "code": 500}


async def get_redis_online_data(redis_con):
    # Return the names of all nodes whose state is "1" (online).
    async with redis_con as redis:
        keys = await redis.keys("node:*")
        result = []
        for key in keys:
            name = key.split(":")[1]
            hash_data = await redis.hgetall(key)
            if hash_data.get('state') == '1':
                # update_time_str = hash_data.get('updateTime')
                # if update_time_str:
                #     update_time = datetime.strptime(update_time_str, '%Y-%m-%d %H:%M:%S')
                #     time_difference = (
                #         datetime.strptime(get_now_time(), "%Y-%m-%d %H:%M:%S") - update_time).total_seconds()
                #     logger.info(f'Node heartbeat age: {time_difference}s, {get_now_time()}, {update_time}')
                #     if time_difference > NODE_TIMEOUT:
                #         await asyncio.create_task(update_redis_data(redis, key))
                #         hash_data['state'] = '3'
                #     else:
                result.append(name)
        return result


async def get_node_all(redis_con):
    # Return the names of all nodes that are not paused (state != "2").
    try:
        result = []
        async with redis_con as redis:
            keys = await redis.keys("node:*")
            for key in keys:
                name = key.split(":")[1]
                hash_data = await redis.hgetall(key)
                if hash_data.get('state') != '2':
                    result.append(name)
        return result
    except Exception as e:
        logger.error(str(e))
        return []