ScopeSentry/core/util.py

497 lines
19 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-  
# @name: util
# @auth: rainy-autumn@outlook.com
# @version:
import hashlib, random
import re
import string
import sys
from loguru import logger
from core.config import TIMEZONE, APP, Project_List
from datetime import timezone
from datetime import datetime, timedelta
import json
from urllib.parse import urlparse
from core.db import get_mongo_db
def calculate_md5_from_content(content):
    """Return the hexadecimal MD5 digest of ``content``.

    :param content: text to hash; it is encoded as UTF-8 before hashing.
    :return: 32-character lowercase hex digest string.
    """
    encoded = content.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
def evaluate_expression(express):
    """Return the string "True" or "False", picked at random.

    :param express: accepted for interface compatibility; it is not inspected.
    :return: ``"True"`` or ``"False"``.
    """
    outcome = random.choice([True, False])
    return "True" if outcome else "False"
def generate_random_string(length):
    """Build a random string made of ASCII letters and digits.

    :param length: number of characters to generate.
    :return: string of ``length`` characters drawn with ``random.choice``.
    """
    # Alphabet: upper- and lower-case ASCII letters plus the ten digits.
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def is_valid_string(s):
    """Tell whether ``s`` is non-empty and made only of ASCII letters and digits.

    :param s: string to validate.
    :return: ``True`` when every character of ``s`` is in
        ``string.ascii_letters + string.digits`` and ``s`` is not empty,
        otherwise ``False``.
    """
    # The set of permitted characters.
    valid_chars = string.ascii_letters + string.digits
    # fullmatch is used instead of match + "$": "$" also matches just before
    # a trailing newline, which would let "abc\n" pass as valid.
    pattern = f"[{re.escape(valid_chars)}]+"
    return re.fullmatch(pattern, s) is not None
def parse_expression(express, eval_expression):
    """Rewrite a ``||`` / ``&&`` expression as a Python boolean expression string.

    The input is scanned character by character. Operands separated by a
    top-level ``||`` or ``&&`` are replaced by the result of
    ``evaluate_expression`` and joined with ``or`` / ``and``. An operand that
    starts with ``(`` is parsed recursively and wrapped in parentheses.

    :param express: expression text to parse.
    :param eval_expression: string built so far; the translation is appended to it.
    :return: ``eval_expression`` with the translated expression appended.
    """
    # NOTE(review): `parts` is never read in this function.
    parts = []
    part = ""  # operand text collected since the last operator
    operator_flag = False  # True right after the first char of an operator was handled
    parentheses_depth = 0
    for i in range(len(express)):
        # Track nesting depth; a parenthesis preceded by a backslash is ignored,
        # and so is any parenthesis at index 0.
        if express[i] == '(':
            if i != 0:
                if express[i - 1] != '\\':
                    parentheses_depth += 1
        elif express[i] == ')':
            if i != 0:
                if express[i - 1] != '\\':
                    parentheses_depth -= 1
        # NOTE(review): express[i + 1] is read without a bounds check, so an
        # expression ending in '|' or '&' looks like it raises IndexError here;
        # part[0] likewise raises IndexError when `part` is empty - confirm.
        if express[i] == '|' and express[i + 1] == '|' and parentheses_depth == 0:
            operator_flag = True
            if part[0] == '(':
                eval_expression += "("
                eval_expression = parse_expression(part.strip("(").strip(")"), eval_expression)
                eval_expression += ") or "
            else:
                eval_expression += evaluate_expression(part) + " or "
            part = ""
        elif express[i] == '&' and express[i + 1] == '&' and parentheses_depth == 0:
            operator_flag = True
            if part[0] == '(':
                eval_expression += "("
                eval_expression = parse_expression(part.strip("(").strip(")"), eval_expression)
                eval_expression += ") and "
            else:
                eval_expression += evaluate_expression(part) + " and "
            part = ""
        else:
            ch = ""
            if operator_flag:
                # This iteration sits on the second operator character: take the
                # character that follows the operator instead.
                # NOTE(review): the next iteration appends that same character
                # again via express[i], so it appears to be added twice - verify.
                ch = express[i + 1]
                operator_flag = False
            else:
                ch = express[i]
            # Whitespace characters are dropped (strip() of a single space is "").
            part += ch.strip()
    # Emit the final operand left over after the last operator.
    if part[0] == '(':
        eval_expression += "("
        eval_expression = parse_expression(part.strip("(").strip(")"), eval_expression)
        eval_expression += ")"
    else:
        eval_expression += evaluate_expression(part)
    return eval_expression
def get_now_time():
    """Return the current time at UTC+8 formatted as ``YYYY-MM-DD HH:MM:SS``.

    The zone is a fixed +08:00 offset labelled with the configured
    ``TIMEZONE`` name; the label does not affect the formatted output.

    :return: formatted timestamp string.
    """
    tz = timezone(timedelta(hours=8), name=TIMEZONE)
    # datetime.now(tz) yields the same instant as the deprecated
    # datetime.utcnow().replace(tzinfo=utc).astimezone(tz) chain.
    return datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
def read_json_file(file_path):
    """Load and return the JSON document stored at ``file_path``.

    :param file_path: path of a UTF-8 encoded JSON file.
    :return: the decoded Python object.
    """
    with open(file_path, encoding='utf-8') as handle:
        return json.load(handle)
def transform_db_redis(request_data):
    """Map a task request onto the payload dict returned for a scan task.

    :param request_data: mapping holding the task fields read below
        (``id``, ``subdomainConfig``, ``subdomainScan``, ...).
    :return: dict with the renamed fields, two booleans telling whether
        ``"Subfinder"`` / ``"Ksubdomain"`` appear in ``subdomainConfig``,
        and ``"type"`` set to ``'scan'``.
    :raises KeyError: if a required field is missing from ``request_data``.
    """
    subdomain_config = request_data["subdomainConfig"]
    use_subfinder = "Subfinder" in subdomain_config
    use_ksubdomain = "Ksubdomain" in subdomain_config
    return {
        "TaskId": request_data["id"],
        "SubdomainScan": request_data["subdomainScan"],
        "Subfinder": use_subfinder,
        "Ksubdomain": use_ksubdomain,
        "UrlScan": request_data["urlScan"],
        "Duplicates": request_data["duplicates"],
        "SensitiveInfoScan": request_data["sensitiveInfoScan"],
        "PageMonitoring": request_data["pageMonitoring"],
        "CrawlerScan": request_data["crawlerScan"],
        "VulScan": request_data["vulScan"],
        "VulList": request_data["vulList"],
        "PortScan": request_data["portScan"],
        "Ports": request_data["ports"],
        "Waybackurl": request_data["waybackurl"],
        "DirScan": request_data["dirScan"],
        "type": 'scan',
    }
def string_to_postfix(expression):
    """Convert a search expression into a postfix (RPN) token list.

    Operands are raw ``key<op>value`` fragments; operators are the strings
    ``"||"`` and ``"&&"``. Parentheses group sub-expressions. A parenthesis or
    double quote preceded by a backslash is treated as literal text, and
    parentheses inside a double-quoted value are not treated as grouping.
    In the returned operands the escapes ``\\(``, ``\\)``, ``\\|\\|`` and
    ``\\&\\&`` are replaced by their unescaped forms.

    Operators are pushed on a stack and only popped at a closing parenthesis
    or at the end of input, so no precedence between ``||`` and ``&&`` is
    applied.

    :param expression: search expression text.
    :return: list of stripped operand/operator strings in postfix order, or
        the empty string ``""`` if any exception occurs during conversion
        (for example unbalanced parentheses).
    """
    try:
        operands_stack = []    # pending operators and "(" markers
        expression_stack = []  # postfix output
        start_char = 0         # index where the current operand starts
        skip_flag = False      # skip the second character of "||" / "&&"
        exp_flag = False       # True while inside a double-quoted value
        last_index = len(expression) - 1
        for index, char in enumerate(expression):
            if skip_flag:
                skip_flag = False
                continue
            # Previous character, or "" at index 0. Indexing [index - 1]
            # directly would wrap around to the last character of the input.
            prev_char = expression[index - 1] if index > 0 else ""
            # Slicing (instead of expression[index + 1]) keeps a lone trailing
            # "|" / "&" from raising IndexError; it is then ordinary text.
            if expression[index:index + 2] == '||':
                skip_flag = True
                operands_stack.append("||")
                key = expression[start_char:index]
                if key != "":
                    expression_stack.append(key)
                start_char = index + 2
            elif expression[index:index + 2] == '&&':
                skip_flag = True
                operands_stack.append("&&")
                key = expression[start_char:index]
                if key != "":
                    expression_stack.append(key)
                start_char = index + 2
            elif char == '(' and prev_char != '\\' and not exp_flag:
                start_char = index + 1
                operands_stack.append('(')
            elif char == ')' and prev_char != '\\' and not exp_flag:
                key = expression[start_char:index]
                if key != "":
                    expression_stack.append(key)
                start_char = index + 1
                # Flush operators back to the matching "(". pop() on an empty
                # stack (unbalanced input) raises and is handled below.
                popped_value = operands_stack.pop()
                while popped_value != '(':
                    if popped_value != "":
                        expression_stack.append(popped_value)
                    popped_value = operands_stack.pop()
            elif char == " ":
                continue
            elif char == "\"" and prev_char != "\\":
                if not exp_flag:
                    exp_flag = True
                else:
                    if index == last_index:
                        # A quote at the very end always closes the value.
                        exp_flag = False
                        continue
                    # A quote only closes the value when what follows (spaces
                    # removed) is an operator, optionally after ")" characters,
                    # or the end of input after one or two ")".
                    tmp = expression[index:].replace(" ", "")
                    if tmp.startswith("\"||") or (tmp.startswith("\"))") and len(tmp) == 3) or tmp.startswith(
                            "\"&&") or tmp.startswith("\")||") or tmp.startswith("\")&&") or (
                            tmp.startswith("\")") and len(tmp) == 2) or re.findall(r"^\"[)]*(\|\||\&\&)", tmp):
                        exp_flag = False
        # Emit the trailing operand, then any operators still pending.
        if start_char != len(expression):
            key = expression[start_char:]
            if key != "":
                expression_stack.append(key)
        while len(operands_stack) != 0:
            expression_stack.append(operands_stack.pop())
        tmp = []
        for key in expression_stack:
            if key != "" and key != " ":
                # Raw strings: "\(" in a normal literal is an invalid escape
                # sequence (SyntaxWarning on Python 3.12+).
                tmp.append(
                    key.strip().replace(r'\(', '(').replace(r'\)', ')').replace(r'\|\|', '||').replace(r'\&\&', '&&'))
        return tmp
    except Exception:
        # Callers rely on the "" sentinel, so keep the broad best-effort catch.
        logger.error(f"后缀表达式转换出错:{expression}")
        return ""
async def search_to_mongodb(expression_raw, keyword):
    """Translate a search expression into MongoDB filter documents.

    The expression is converted to postfix form with ``string_to_postfix`` and
    evaluated with a stack: ``&&`` / ``||`` pop two entries and push a
    ``$and`` / ``$or`` document; every other token is split on ``!=``, ``==``
    or ``=`` (tested in that order) into a key and a value and pushed as a
    filter on the field(s) that ``keyword`` maps the key to. Tokens whose key
    is not in ``keyword`` push nothing.

    :param expression_raw: search expression text; ``""`` matches everything.
    :param keyword: dict mapping a search key to a field name or to a list of
        field names. It is modified in place: ``keyword["task"]`` is set to
        ``"taskId"``.
    :return: ``[{}]`` for an empty expression, otherwise the evaluation stack
        (a list of filter dicts); ``""`` if any exception is raised.
    """
    try:
        keyword["task"] = "taskId"
        if expression_raw == "":
            return [{}]
        if len(APP) == 0:
            logger.error("WebFinger缓存数据为0请排查~")
        expression = string_to_postfix(expression_raw)
        stack = []
        for expr in expression:
            if expr == "&&":
                right = stack.pop()
                left = stack.pop()
                stack.append({"$and": [left, right]})
            elif expr == "||":
                right = stack.pop()
                left = stack.pop()
                stack.append({"$or": [left, right]})
            elif "!=" in expr:
                # Negated match: built as $nor over regex (or $eq for ints).
                key, value = expr.split("!=", 1)
                key = key.strip()
                if key in keyword:
                    value = value.strip("\"")
                    if key == 'statuscode' or key == 'length':
                        value = int(value)
                    if key == 'project':
                        # Replace the value with the Project_List entry when
                        # the lower-cased value is a known key.
                        if value.lower() in Project_List:
                            value = Project_List[value.lower()]
                    if key == 'app':
                        # Collect APP keys whose entry contains the value
                        # (case-insensitive substring test).
                        finger_id = []
                        for ap_key in APP:
                            if value.lower() in APP[ap_key].lower():
                                finger_id.append(ap_key)
                        tmp_nor = {"$nor": []}
                        for f_i in finger_id:
                            tmp_nor['$nor'].append({"webfinger": {"$in": [f_i]}})
                        tmp_nor['$nor'].append({"technologies": {"$regex": value, "$options": "i"}})
                        stack.append(tmp_nor)
                    # NOTE(review): this `if` is not an `elif`, so for key
                    # 'app' it looks like a second entry (on field
                    # keyword['app']) is pushed as well - confirm intended.
                    if type(keyword[key]) is list:
                        tmp_nor = {"$nor": []}
                        for v in keyword[key]:
                            tmp_nor['$nor'].append({v: {"$regex": value, "$options": "i"}})
                        stack.append(tmp_nor)
                    else:
                        tmp_nor = {"$nor": []}
                        if type(value) is int:
                            tmp_nor['$nor'].append({keyword[key]: {"$eq": value}})
                        else:
                            tmp_nor['$nor'].append({keyword[key]: {"$regex": value, "$options": "i"}})
                        stack.append(tmp_nor)
            elif "==" in expr:
                # Exact match: built with $eq.
                key, value = expr.split("==", 1)
                key = key.strip()
                if key in keyword:
                    value = value.strip("\"")
                    if key == "task":
                        # Look the task up by name and, when found, use the
                        # string form of its _id as the value.
                        async for db in get_mongo_db():
                            query = {"name": {"$eq": value}}
                            doc = await db.task.find_one(query)
                            if doc is not None:
                                taskid = str(doc.get("_id"))
                                value = taskid
                    if key == 'statuscode' or key == 'length':
                        value = int(value)
                    if key == 'project':
                        if value.lower() in Project_List:
                            value = Project_List[value.lower()]
                    if key == 'app':
                        # Collect APP keys whose entry equals the value
                        # (case-insensitive).
                        finger_id = []
                        for ap_key in APP:
                            if value.lower() == APP[ap_key].lower():
                                finger_id.append(ap_key)
                        tmp_or = {"$or": []}
                        for f_i in finger_id:
                            tmp_or['$or'].append({"webfinger": {"$in": [f_i]}})
                        tmp_or['$or'].append({"technologies": {"$eq": value}})
                        stack.append(tmp_or)
                    # NOTE(review): same non-`elif` pattern as the "!=" branch.
                    if type(keyword[key]) is list:
                        tmp_or = {"$or": []}
                        for v in keyword[key]:
                            tmp_or['$or'].append({v: {"$eq": value}})
                        stack.append(tmp_or)
                    else:
                        tmp_or = {keyword[key]: {"$eq": value}}
                        stack.append(tmp_or)
            elif "=" in expr:
                # Fuzzy match: built as a case-insensitive $regex.
                key, value = expr.split("=", 1)
                key = key.strip()
                if key in keyword:
                    value = value.strip("\"")
                    if key == 'project':
                        if value.lower() in Project_List:
                            value = Project_List[value.lower()]
                    if key == 'app':
                        finger_id = []
                        for ap_key in APP:
                            if value.lower() in APP[ap_key].lower():
                                finger_id.append(ap_key)
                        tmp_or = {"$or": []}
                        for f_i in finger_id:
                            tmp_or['$or'].append({"webfinger": {"$in": [f_i]}})
                        tmp_or['$or'].append({"technologies": {"$regex": value, "$options": "i"}})
                        stack.append(tmp_or)
                    # NOTE(review): same non-`elif` pattern as the "!=" branch.
                    if type(keyword[key]) is list:
                        tmp_or = {"$or": []}
                        for v in keyword[key]:
                            tmp_or['$or'].append({v: {"$regex": value, "$options": "i"}})
                        stack.append(tmp_or)
                    else:
                        stack.append({keyword[key]: {"$regex": value, "$options": "i"}})
        return stack
    except Exception as e:
        # Any failure (bad int conversion, stack underflow, DB error, ...) is
        # logged and reported to the caller as "".
        logger.error(e)
        return ""
async def get_search_query(name, request_data):
    """Build the MongoDB query for a list/search request.

    Three optional parts of ``request_data`` are combined:

    * ``"search"``: expression translated by ``search_to_mongodb`` using the
      keyword map selected by ``name``;
    * ``"filter"``: mapping of filter name to a list of accepted values; each
      filter becomes one ``$or`` clause appended to ``$and``;
    * ``"fq"``: mapping of fuzzy-query name to a pattern; each becomes one
      ``$or`` of ``$regex`` clauses appended to ``$and``.

    :param name: result type; must be a key of the keyword table below
        (``'sens'``, ``'dir'``, ``'vul'``, ``'subdomain'``, ``'asset'``,
        ``'subdomainTaker'``, ``'url'``, ``'page'``, ``'crawler'``).
    :param request_data: request payload (dict).
    :return: the query dict, or ``""`` when the search expression could not
        be translated into a query.
    :raises KeyError: if ``name`` is not a known result type.
    """
    search_query = request_data.get("search", "")
    # Per result type: search key -> document field (or list of fields).
    search_key_v = {
        'sens': {
            'url': 'url',
            'sname': 'sid',
            "body": "body",
            "info": "match",
            'project': 'project',
            'md5': 'md5'
        },
        'dir': {
            'project': 'project',
            'statuscode': 'status',
            'url': 'url',
            'redirect': 'msg',
            'length': 'length'
        },
        'vul': {
            'url': 'url',
            'vulname': 'vulname',
            'project': 'project',
            'matched': 'matched',
            'request': 'request',
            'response': 'response',
            'level': 'level'
        },
        'subdomain': {
            'domain': 'host',
            'ip': 'ip',
            'type': 'type',
            'project': 'project',
            'value': 'value'
        },
        'asset': {
            'app': '',
            'body': 'responsebody',
            'header': 'rawheaders',
            'project': 'project',
            'title': 'title',
            'statuscode': 'statuscode',
            'icon': 'faviconmmh3',
            'ip': ['host', 'ip'],
            'domain': ['host', 'url', 'domain'],
            'port': 'port',
            'protocol': ['protocol', 'type'],
            'banner': 'raw',
        },
        'subdomainTaker': {
            'domain': 'input',
            'value': 'value',
            'type': 'cname',
            'response': 'response',
            'project': 'project',
        },
        'url': {
            'url': 'output',
            'project': 'project',
            'input': 'input',
            'source': 'source',
            "type": "outputtype"
        },
        'page': {
            'url': 'url',
            'project': 'project',
            'hash': 'hash',
            'diff': 'diff',
            'response': 'response'
        },
        'crawler': {
            'url': 'url',
            'method': 'method',
            'body': 'body',
            'project': 'project'
        }
    }
    keyword = search_key_v[name]
    query = await search_to_mongodb(search_query, keyword)
    # "" / None signal a translation error; an empty list means the expression
    # produced no clause (e.g. only unknown keys). Indexing [0] on the empty
    # list would raise IndexError, so all three are reported as "".
    if not query:
        return ""
    query = query[0]

    # Filter name -> document field (or list of fields).
    filter_key = {'app': 'app', 'color': 'color', 'status': 'status', 'level': 'level', 'type': 'type',
                  'project': 'project', 'port': 'port', 'protocol': ['protocol', 'type'],
                  'icon': 'faviconmmh3', "statuscode": "statuscode", "sname": "sid"}
    selected_filters = request_data.get("filter", {})
    if selected_filters:
        # setdefault keeps a "$and" already produced by the search expression
        # instead of overwriting it and dropping the search conditions.
        and_clauses = query.setdefault("$and", [])
        for f_name in selected_filters:
            if f_name not in filter_key:
                continue
            field = filter_key[f_name]
            tmp_or = []
            for v in selected_filters[f_name]:
                if v == "":
                    continue
                if f_name == 'app':
                    # Match both the fingerprint ids whose APP entry equals
                    # the value and the technologies field.
                    for ap_key in APP:
                        if v == APP[ap_key]:
                            tmp_or.append({'webfinger': ap_key})
                    tmp_or.append({'technologies': v})
                elif isinstance(field, list):
                    tmp_or.extend({li: v} for li in field)
                else:
                    tmp_or.append({field: v})
            if tmp_or:
                and_clauses.append({"$or": tmp_or})

    fuzzy_query = request_data.get("fq", {})
    # Fuzzy-query name -> document field (or list of fields).
    fuzzy_query_key = {"sub_host": 'host', "sub_value": "value", "sub_ip": "ip", "port_port": "port",
                       "port_domain": ['domain', 'host'], 'port_ip': ['ip', 'host'],
                       'port_protocol': ['type', 'protocol'],
                       "service_service": ['type', 'webServer', 'protocol'],
                       "service_domain": ['domain', 'host'], "service_port": "port",
                       "service_ip": ['ip', 'host']}
    if fuzzy_query:
        and_clauses = query.setdefault("$and", [])
        for q in fuzzy_query:
            pattern = fuzzy_query[q]
            if pattern == "" or q not in fuzzy_query_key:
                continue
            fields = fuzzy_query_key[q]
            if not isinstance(fields, list):
                fields = [fields]
            # Built fresh per entry (previously a module-level global that
            # could carry clauses over from an earlier entry or request).
            and_clauses.append({"$or": [{field: {"$regex": pattern}} for field in fields]})

    # An empty "$and" array is not a valid MongoDB query; drop it.
    if "$and" in query and not query["$and"]:
        del query["$and"]
    return query
# Two-label public suffixes under which the registrable domain has three labels.
_COMPOUND_SUFFIXES = frozenset({
    'com.cn', 'net.cn', 'org.cn', 'gov.cn', 'edu.cn', 'ac.cn', 'mil.cn',
    'co.uk', 'org.uk', 'net.uk', 'gov.uk', 'ac.uk', 'sch.uk',
    'co.jp', 'ne.jp', 'or.jp', 'go.jp', 'ac.jp', 'ad.jp',
    'com.de', 'org.de', 'net.de', 'gov.de',
    'com.ca', 'net.ca', 'org.ca', 'gov.ca',
    'com.au', 'net.au', 'org.au', 'gov.au', 'edu.au',
    'com.fr', 'net.fr', 'org.fr', 'gov.fr',
    'com.br', 'com.mx', 'com.ar', 'com.ru',
    'co.in', 'co.za',
    'co.kr', 'com.tw',
})


def _is_ip_literal(value):
    """Return True when ``value`` parses as an IPv4 or IPv6 address."""
    from ipaddress import ip_address
    try:
        ip_address(value)
    except ValueError:
        return False
    return True


def get_root_domain(url):
    """Return the root (registrable) domain of a URL or bare host.

    ``http://`` is prepended when the input has no http/https scheme. Userinfo
    and a numeric ``:port`` suffix are ignored. IP addresses are returned
    unchanged. Otherwise the last two labels are returned, or the last three
    when the last two form one of the known compound suffixes (``co.uk``,
    ``com.cn``, ...).

    :param url: URL or host string.
    :return: root domain, or the IP address for IP hosts.
    """
    # Add a default scheme so urlparse puts the host into netloc.
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    netloc = urlparse(url).netloc
    # Bare IP (including an unbracketed IPv6 literal): return as-is.
    if _is_ip_literal(netloc):
        return netloc
    # Drop "user:pass@" and a numeric ":port" so that e.g. "1.2.3.4:80" is
    # recognised as an IP and "a.example.com:8080" yields "example.com".
    host = netloc.rpartition('@')[2]
    head, sep, port = host.rpartition(':')
    if sep and port.isdigit():
        host = head
    if _is_ip_literal(host.strip('[]')):
        return host
    labels = host.split('.')
    keep = 3 if '.'.join(labels[-2:]) in _COMPOUND_SUFFIXES else 2
    return '.'.join(labels[-keep:])