# Source listing metadata: 497 lines, 19 KiB, Python
# -*- coding:utf-8 -*-
|
||
# @name: util
|
||
# @auth: rainy-autumn@outlook.com
|
||
# @version:
|
||
import hashlib, random
|
||
import re
|
||
import string
|
||
import sys
|
||
from loguru import logger
|
||
from core.config import TIMEZONE, APP, Project_List
|
||
from datetime import timezone
|
||
from datetime import datetime, timedelta
|
||
import json
|
||
from urllib.parse import urlparse
|
||
|
||
from core.db import get_mongo_db
|
||
|
||
|
||
def calculate_md5_from_content(content):
    """Return the hexadecimal MD5 digest of ``content`` encoded as UTF-8."""
    return hashlib.md5(content.encode("utf-8")).hexdigest()
|
||
|
||
|
||
def evaluate_expression(express):
    """Return the string ``"True"`` or ``"False"``, chosen at random.

    ``express`` is accepted but never inspected.
    """
    return str(random.choice([True, False]))
|
||
|
||
|
||
def generate_random_string(length):
    """Build a random string of ``length`` characters.

    Characters are drawn from ASCII upper/lower-case letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picked)
|
||
|
||
|
||
def is_valid_string(s):
    """Return True when ``s`` is non-empty and contains only ASCII letters and digits.

    ``re.fullmatch`` is used instead of ``re.match`` with ``^...$`` because
    ``$`` also matches just before a trailing newline, which let inputs such
    as ``"abc\\n"`` through.
    """
    return re.fullmatch(r"[A-Za-z0-9]+", s) is not None
|
||
|
||
def parse_expression(express, eval_expression):
    """Append a Python-style boolean rendering of ``express`` to ``eval_expression``.

    ``express`` is scanned character by character. A ``||`` or ``&&`` found at
    parenthesis depth zero ends the current operand and is rendered as
    `` or `` / `` and ``. Each operand is rendered with
    ``evaluate_expression``; an operand whose first character is ``(`` has its
    surrounding parentheses stripped and is rendered recursively between
    ``(`` and ``)``.

    Returns the extended string. Raises ``IndexError`` for an empty operand or
    when the input ends in a lone ``|`` / ``&`` at depth zero.
    """

    def _render(operand, rendered, joiner):
        # operand[0] deliberately raises IndexError for an empty operand.
        if operand[0] == '(':
            rendered += "("
            rendered = parse_expression(operand.strip("(").strip(")"), rendered)
            return rendered + ")" + joiner
        return rendered + evaluate_expression(operand) + joiner

    operand = ""
    after_operator = False
    depth = 0
    for pos, symbol in enumerate(express):
        # A parenthesis at index 0, or one preceded by a backslash, does not
        # change the nesting depth.
        if pos != 0 and express[pos - 1] != '\\':
            if symbol == '(':
                depth += 1
            elif symbol == ')':
                depth -= 1

        if symbol == '|' and express[pos + 1] == '|' and depth == 0:
            after_operator = True
            eval_expression = _render(operand, eval_expression, " or ")
            operand = ""
        elif symbol == '&' and express[pos + 1] == '&' and depth == 0:
            after_operator = True
            eval_expression = _render(operand, eval_expression, " and ")
            operand = ""
        else:
            if after_operator:
                # Sitting on the second character of the operator: take the
                # character that follows it instead.
                piece = express[pos + 1]
                after_operator = False
            else:
                piece = symbol
            operand += piece.strip()

    return _render(operand, eval_expression, "")
|
||
|
||
|
||
def get_now_time():
    """Return the current time formatted as ``YYYY-MM-DD HH:MM:SS``.

    The time is expressed in a fixed UTC+08:00 zone that is labelled with the
    configured ``TIMEZONE`` name; the label does not appear in the output.
    """
    tz = timezone(timedelta(hours=8), name=TIMEZONE)
    # datetime.utcnow() is deprecated since Python 3.12; request an aware
    # datetime in the target zone directly.
    return datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def read_json_file(file_path):
    """Load and return the JSON document stored at ``file_path`` (read as UTF-8)."""
    with open(file_path, encoding='utf-8') as handle:
        return json.load(handle)
|
||
|
||
|
||
def transform_db_redis(request_data):
    """Convert a task request dict into the payload dict returned for the scan queue.

    ``Subfinder`` / ``Ksubdomain`` are booleans telling whether that name
    appears in ``request_data["subdomainConfig"]``; every other field is
    copied from the request under a capitalised key, and ``type`` is always
    ``'scan'``. A missing request key raises ``KeyError``.
    """
    enabled_tools = request_data["subdomainConfig"]
    uses_subfinder = "Subfinder" in enabled_tools
    uses_ksubdomain = "Ksubdomain" in enabled_tools

    payload = {
        "TaskId": request_data["id"],
        "SubdomainScan": request_data["subdomainScan"],
        "Subfinder": uses_subfinder,
        "Ksubdomain": uses_ksubdomain,
    }
    # (output key, request key) pairs, in the order they appear in the payload.
    copied_fields = (
        ("UrlScan", "urlScan"),
        ("Duplicates", "duplicates"),
        ("SensitiveInfoScan", "sensitiveInfoScan"),
        ("PageMonitoring", "pageMonitoring"),
        ("CrawlerScan", "crawlerScan"),
        ("VulScan", "vulScan"),
        ("VulList", "vulList"),
        ("PortScan", "portScan"),
        ("Ports", "ports"),
        ("Waybackurl", "waybackurl"),
        ("DirScan", "dirScan"),
    )
    for target_key, source_key in copied_fields:
        payload[target_key] = request_data[source_key]
    payload["type"] = 'scan'
    return payload
|
||
|
||
|
||
def string_to_postfix(expression):
    """Convert a search expression into a postfix (RPN) token list.

    Operands are the text between ``||`` / ``&&`` operators and unescaped
    parentheses, e.g. ``title="x"``. Operators are not ranked by precedence:
    they are held on a stack and emitted when a closing parenthesis or the end
    of input is reached, so an unparenthesised chain groups from the right
    (``a&&b||c`` becomes ``a b c || &&``).

    Parentheses inside a double-quoted value are treated as literal text.
    ``||`` and ``&&`` are always treated as operators, even inside quotes; to
    use them (or parentheses outside quotes) literally, escape them as
    ``\\|\\|``, ``\\&\\&``, ``\\(``, ``\\)``. The escapes are removed from the
    returned operands.

    Returns a list of operand/operator strings, or ``""`` (after logging an
    error) when the expression cannot be converted, e.g. on unbalanced
    parentheses.
    """
    try:
        operator_stack = []
        output = []
        token_start = 0
        skip_next = False
        in_quotes = False
        for index, char in enumerate(expression):
            if skip_next:
                # Second character of a two-character operator.
                skip_next = False
                continue
            # Index 0 has no predecessor; expression[-1] would wrap to the end.
            previous = expression[index - 1] if index > 0 else ""
            # Slicing cannot raise on a trailing single '|' or '&'.
            pair = expression[index:index + 2]
            if pair == "||" or pair == "&&":
                skip_next = True
                operator_stack.append(pair)
                token = expression[token_start:index]
                if token != "":
                    output.append(token)
                token_start = index + 2
            elif char == '(' and previous != '\\' and not in_quotes:
                token_start = index + 1
                operator_stack.append('(')
            elif char == ')' and previous != '\\' and not in_quotes:
                token = expression[token_start:index]
                if token != "":
                    output.append(token)
                token_start = index + 1
                # Emit operators back to the matching '('; popping an empty
                # stack (unbalanced ')') raises and is reported below.
                popped = operator_stack.pop()
                while popped != '(':
                    output.append(popped)
                    popped = operator_stack.pop()
            elif char == '"' and previous != '\\':
                if not in_quotes:
                    in_quotes = True
                else:
                    # A quote closes the value only when, ignoring spaces, it
                    # is followed by any number of ')' and then an operator or
                    # the end of the expression.
                    rest = expression[index:].replace(" ", "")
                    if re.match(r'"\)*(?:\|\||&&|\Z)', rest):
                        in_quotes = False

        tail = expression[token_start:]
        if tail != "":
            output.append(tail)
        while operator_stack:
            output.append(operator_stack.pop())

        return [
            token.strip()
            .replace(r'\(', '(')
            .replace(r'\)', ')')
            .replace(r'\|\|', '||')
            .replace(r'\&\&', '&&')
            for token in output
            if token.strip()
        ]
    except Exception:
        logger.error(f"后缀表达式转换出错:{expression}")
        return ""
|
||
|
||
|
||
def _app_fingerprint_ids(value, exact=False):
    """Return the keys of ``APP`` whose name equals (``exact``) or contains ``value``, ignoring case."""
    needle = value.lower()
    if exact:
        return [finger_id for finger_id in APP if needle == APP[finger_id].lower()]
    return [finger_id for finger_id in APP if needle in APP[finger_id].lower()]


async def search_to_mongodb(expression_raw, keyword):
    """Translate a search expression into MongoDB filter documents.

    ``expression_raw`` is converted to postfix with ``string_to_postfix`` and
    evaluated on a stack: ``&&`` / ``||`` combine the two topmost entries with
    ``$and`` / ``$or``; an operand of the form ``key!=value``, ``key==value``
    or ``key=value`` pushes one filter when ``key`` is in ``keyword`` and is
    ignored otherwise.

    * ``!=`` – ``$nor`` of case-insensitive regex matches (``$eq`` for ints).
    * ``==`` – exact ``$eq`` match; a ``task`` value is first replaced by the
      id of the task document with that name, when one exists.
    * ``=``  – case-insensitive regex match.

    ``keyword`` maps search keys to a field name or a list of field names; the
    entry ``"task": "taskId"`` is added to it in place. ``statuscode`` and
    ``length`` values are converted to ``int`` for ``!=`` and ``==``;
    ``project`` values found in ``Project_List`` are replaced by the mapped
    value; ``app`` matches fingerprint ids from ``APP`` plus the
    ``technologies`` field.

    Returns the final stack (a list whose first element is the filter),
    ``[{}]`` for an empty expression, or ``""`` after logging when anything
    raises.
    """
    try:
        keyword["task"] = "taskId"
        if expression_raw == "":
            return [{}]
        if len(APP) == 0:
            logger.error("WebFinger缓存数据为0,请排查~")
        stack = []
        for expr in string_to_postfix(expression_raw):
            if expr == "&&" or expr == "||":
                right = stack.pop()
                left = stack.pop()
                stack.append({"$and" if expr == "&&" else "$or": [left, right]})
            elif "!=" in expr:
                key, value = expr.split("!=", 1)
                key = key.strip()
                if key not in keyword:
                    continue
                value = value.strip("\"")
                if key == 'statuscode' or key == 'length':
                    value = int(value)
                if key == 'project' and value.lower() in Project_List:
                    value = Project_List[value.lower()]
                field = keyword[key]
                # Exactly one stack entry per operand: the branches must be
                # mutually exclusive or '&&' / '||' would combine the wrong
                # entries.
                if key == 'app':
                    clauses = [{"webfinger": {"$in": [finger_id]}}
                               for finger_id in _app_fingerprint_ids(value)]
                    clauses.append({"technologies": {"$regex": value, "$options": "i"}})
                elif isinstance(field, list):
                    clauses = [{name: {"$regex": value, "$options": "i"}} for name in field]
                elif isinstance(value, int):
                    clauses = [{field: {"$eq": value}}]
                else:
                    clauses = [{field: {"$regex": value, "$options": "i"}}]
                stack.append({"$nor": clauses})
            elif "==" in expr:
                key, value = expr.split("==", 1)
                key = key.strip()
                if key not in keyword:
                    continue
                value = value.strip("\"")
                if key == "task":
                    # Resolve a task name to the string form of its _id.
                    async for db in get_mongo_db():
                        doc = await db.task.find_one({"name": {"$eq": value}})
                        if doc is not None:
                            value = str(doc.get("_id"))
                if key == 'statuscode' or key == 'length':
                    value = int(value)
                if key == 'project' and value.lower() in Project_List:
                    value = Project_List[value.lower()]
                field = keyword[key]
                if key == 'app':
                    clauses = [{"webfinger": {"$in": [finger_id]}}
                               for finger_id in _app_fingerprint_ids(value, exact=True)]
                    clauses.append({"technologies": {"$eq": value}})
                    stack.append({"$or": clauses})
                elif isinstance(field, list):
                    stack.append({"$or": [{name: {"$eq": value}} for name in field]})
                else:
                    stack.append({field: {"$eq": value}})
            elif "=" in expr:
                key, value = expr.split("=", 1)
                key = key.strip()
                if key not in keyword:
                    continue
                value = value.strip("\"")
                if key == 'project' and value.lower() in Project_List:
                    value = Project_List[value.lower()]
                field = keyword[key]
                if key == 'app':
                    clauses = [{"webfinger": {"$in": [finger_id]}}
                               for finger_id in _app_fingerprint_ids(value)]
                    clauses.append({"technologies": {"$regex": value, "$options": "i"}})
                    stack.append({"$or": clauses})
                elif isinstance(field, list):
                    stack.append({"$or": [{name: {"$regex": value, "$options": "i"}}
                                          for name in field]})
                else:
                    stack.append({field: {"$regex": value, "$options": "i"}})
        return stack
    except Exception as e:
        logger.error(e)
        return ""
|
||
|
||
|
||
async def get_search_query(name, request_data):
    """Build the MongoDB filter for a list/search request.

    ``name`` selects which search-key table applies (``'sens'``, ``'dir'``,
    ``'vul'``, ``'subdomain'``, ``'asset'``, ``'subdomainTaker'``, ``'url'``,
    ``'page'`` or ``'crawler'``); an unknown name raises ``KeyError``.
    ``request_data`` may carry:

    * ``search`` – expression passed to ``search_to_mongodb``;
    * ``filter`` – mapping of filter key to a list of accepted values; each
      recognised key adds one ``$or`` group to the top-level ``$and``;
    * ``fq``     – mapping of fuzzy-query key to a regex string; each
      recognised, non-empty entry adds one ``$or`` group to ``$and``.

    Returns the filter dict, or ``""`` when the search expression could not be
    turned into a filter.
    """
    search_query = request_data.get("search", "")
    search_key_v = {
        'sens': {
            'url': 'url',
            'sname': 'sid',
            "body": "body",
            "info": "match",
            'project': 'project',
            'md5': 'md5'
        },
        'dir': {
            'project': 'project',
            'statuscode': 'status',
            'url': 'url',
            'redirect': 'msg',
            'length': 'length'
        },
        'vul': {
            'url': 'url',
            'vulname': 'vulname',
            'project': 'project',
            'matched': 'matched',
            'request': 'request',
            'response': 'response',
            'level': 'level'
        },
        'subdomain': {
            'domain': 'host',
            'ip': 'ip',
            'type': 'type',
            'project': 'project',
            'value': 'value'
        },
        'asset': {
            'app': '',
            'body': 'responsebody',
            'header': 'rawheaders',
            'project': 'project',
            'title': 'title',
            'statuscode': 'statuscode',
            'icon': 'faviconmmh3',
            'ip': ['host', 'ip'],
            'domain': ['host', 'url', 'domain'],
            'port': 'port',
            'protocol': ['protocol', 'type'],
            'banner': 'raw',
        },
        'subdomainTaker': {
            'domain': 'input',
            'value': 'value',
            'type': 'cname',
            'response': 'response',
            'project': 'project',
        },
        'url': {
            'url': 'output',
            'project': 'project',
            'input': 'input',
            'source': 'source',
            "type": "outputtype"
        },
        'page': {
            'url': 'url',
            'project': 'project',
            'hash': 'hash',
            'diff': 'diff',
            'response': 'response'
        },
        'crawler': {
            'url': 'url',
            'method': 'method',
            'body': 'body',
            'project': 'project'
        }
    }
    keyword = search_key_v[name]
    query = await search_to_mongodb(search_query, keyword)
    # "" signals a parse error; an empty list means no operand produced a
    # filter. Indexing either would be wrong, so both are reported as "".
    if not query:
        return ""
    query = query[0]

    filter_key = {'app': 'app', 'color': 'color', 'status': 'status', 'level': 'level', 'type': 'type',
                  'project': 'project', 'port': 'port', 'protocol': ['protocol', 'type'],
                  'icon': 'faviconmmh3', "statuscode": "statuscode", "sname": "sid"}
    filters = request_data.get("filter", {})
    if filters:
        # Keep any top-level $and produced by the search expression instead of
        # overwriting it.
        query.setdefault("$and", [])
        for f in filters:
            if f not in filter_key:
                continue
            tmp_or = []
            for v in filters[f]:
                if v == "":
                    continue
                if f == 'app':
                    for ap_key in APP:
                        if v == APP[ap_key]:
                            tmp_or.append({'webfinger': ap_key})
                    tmp_or.append({'technologies': v})
                elif isinstance(filter_key[f], list):
                    for li in filter_key[f]:
                        tmp_or.append({li: v})
                else:
                    tmp_or.append({filter_key[f]: v})
            if tmp_or:
                query["$and"].append({"$or": tmp_or})

    fuzzy_query = request_data.get("fq", {})
    fuzzy_query_key = {"sub_host": 'host', "sub_value": "value", "sub_ip": "ip", "port_port": "port",
                       "port_domain": ['domain', 'host'], 'port_ip': ['ip', 'host'],
                       'port_protocol': ['type', 'protocol'],
                       "service_service": ['type', 'webServer', 'protocol'],
                       "service_domain": ['domain', 'host'], "service_port": "port",
                       "service_ip": ['ip', 'host']}
    if fuzzy_query:
        query.setdefault("$and", [])
        for q in fuzzy_query:
            if fuzzy_query[q] == "" or q not in fuzzy_query_key:
                continue
            target = fuzzy_query_key[q]
            fields = target if isinstance(target, list) else [target]
            query["$and"].append({"$or": [{field: {"$regex": fuzzy_query[q]}} for field in fields]})

    # Drop an empty $and group rather than returning it.
    if "$and" in query and len(query["$and"]) == 0:
        query.pop("$and")
    return query
|
||
|
||
|
||
def get_root_domain(url):
    """Return the registrable (root) domain of ``url``, or the address itself for an IP.

    ``url`` may be a full URL or a bare host; ``http://`` is prepended when it
    does not start with ``http://`` or ``https://``. The host is taken from
    ``urlparse(...).hostname``, so port, userinfo and IPv6 brackets are
    removed and the result is lower-cased. For hosts ending in one of the
    listed two-label suffixes (``com.cn``, ``co.uk``, ...) the last three
    labels are returned, otherwise the last two.
    """
    from ipaddress import ip_address

    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url

    # hostname (not netloc) so that "host:8080" or "user@host" do not leak
    # into the result or break the IP check.
    host = urlparse(url).hostname or ""

    try:
        ip_address(host)
        return host  # IP addresses have no root domain; return as-is.
    except ValueError:
        pass

    # Two-label public suffixes under which the registrable name has three labels.
    compound_domains = frozenset((
        'com.cn', 'net.cn', 'org.cn', 'gov.cn', 'edu.cn', 'ac.cn', 'mil.cn',
        'co.uk', 'org.uk', 'net.uk', 'gov.uk', 'ac.uk', 'sch.uk',
        'co.jp', 'ne.jp', 'or.jp', 'go.jp', 'ac.jp', 'ad.jp',
        'com.de', 'org.de', 'net.de', 'gov.de',
        'com.ca', 'net.ca', 'org.ca', 'gov.ca',
        'com.au', 'net.au', 'org.au', 'gov.au', 'edu.au',
        'com.fr', 'net.fr', 'org.fr', 'gov.fr',
        'com.br', 'com.mx', 'com.ar', 'com.ru',
        'co.in', 'co.za',
        'co.kr', 'com.tw',
    ))

    labels = host.split('.')
    keep = 3 if '.'.join(labels[-2:]) in compound_domains else 2
    return '.'.join(labels[-keep:])
|