initial commit

새로운 repository 를 만들었습니다.
This commit is contained in:
David Ko
2025-04-29 14:11:00 +09:00
commit 82f8b93a2c
127 changed files with 7053422 additions and 0 deletions

97
backend/README.md Normal file
View File

@@ -0,0 +1,97 @@
# URL Malicious Detection API
This FastAPI application provides an API for detecting malicious URLs using two different machine learning models.
## Features
- Two independent ML models for URL analysis
- RESTful API for easy integration
- Fast, reproducible dependency management with `uv`
## Installation
1. Clone the repository
2. Create and activate a virtual environment using `uv`:
```bash
cd backend
uv venv
source .venv/bin/activate
```
3. Install dependencies using `uv pip`:
```bash
uv pip install -r requirements.txt
```
## Running the Application
Start the FastAPI server:
```bash
uvicorn app.main:app --reload
```
The API will be available at http://localhost:8000
## API Endpoints
### GET /
Health check endpoint that confirms the API is running.
**Response**:
```json
{
"message": "URL 악성 판별기 FastAPI 서버 정상 작동 중!"
}
```
### POST /predict
Analyzes a URL to determine if it's malicious.
**Request Body**:
```json
{
"url": "http://example.com"
}
```
**Response**:
```json
{
"url": "http://example.com",
"model1": 0.2048, // Lower value means less likely to be malicious
"model2": {
"url": "http://example.com",
"malicious_probability": 0.1076, // Probability of being malicious
"is_malicious": false, // Boolean classification result
"threshold": 0.4034 // Classification threshold
}
}
```
## Models
The application uses two different models for URL analysis:
1. **XGBoost Ensemble** (model1): Ensemble of 4 XGBoost models
2. **Neural Network** (model2): TensorFlow/Keras model
## Dependencies
Main dependencies:
- FastAPI
- TensorFlow
- XGBoost
- scikit-learn
- pandas
- numpy
- tldextract
- uvicorn
## License
This project is licensed under the MIT License.

14
backend/app/.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,14 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Run FastAPI with uvicorn",
"type": "shell",
"command": "uvicorn main:app --host 0.0.0.0 --port 8000 --reload",
"args": [],
"group": "none",
"isBackground": true,
"problemMatcher": []
}
]
}

381
backend/app/PreP.py Normal file
View File

@@ -0,0 +1,381 @@
import numpy as np
import pandas as pd
import string
import re
from urllib.parse import urlparse, parse_qs
import tldextract
from collections import Counter
import math
import zlib
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
### Helper data/definitions used by the preprocessing pipeline
## Reference data
# Keywords frequently seen in phishing/malicious URLs.
suspicious_keywords = ["PayPal", "login", "signin", "bank", "account", "update", "free", "lucky", \
                       "service", "bonus", "ebayisapi", "webscr", "verify", "secure", "banking", \
                       "paypal", "confirm", "auth", "redirect", "admin", "support", "server", \
                       "password", "click", "urgent", "immediate", "alert", "security", "prompt"]
# Additional suspicious keywords
additional_keywords = [
    'verify', 'wallet', 'cryptocurrency', 'bitcoin', 'ethereum',
    'validation', 'authenticate', 'reset', 'recover', 'access',
    'limited', 'offer', 'prize', 'win', 'winner', 'payment',
    'bank', 'credit', 'debit', 'card', 'expire', 'suspension',
    'unusual', 'activity', 'verify', 'document', 'invoice'
]
# Full, de-duplicated keyword list
all_keywords = list(set(suspicious_keywords + additional_keywords))
# Brands commonly impersonated in phishing URLs
popular_brands = [
    'paypal', 'apple', 'microsoft', 'amazon', 'netflix', 'google',
    'facebook', 'instagram', 'twitter', 'linkedin', 'chase', 'wellsfargo',
    'bankofamerica', 'citibank', 'amex', 'americanexpress', 'dropbox',
    'yahoo', 'outlook', 'office365', 'onedrive', 'icloud', 'gmail'
]
# Two-letter country-code TLDs
ccTLDs = [
    'ac', 'ad', 'ae', 'af', 'ag', 'ai', 'al', 'am', 'ao', 'aq', 'ar', 'as', 'at', 'au', 'aw', 'ax', 'az',
    'ba', 'bb', 'bd', 'be', 'bf', 'bg', 'bh', 'bi', 'bj', 'bm', 'bn', 'bo', 'br', 'bs', 'bt', 'bv', 'bw', 'by', 'bz',
    'ca', 'cc', 'cd', 'cf', 'cg', 'ch', 'ci', 'ck', 'cl', 'cm', 'cn', 'co', 'cr', 'cu', 'cv', 'cw', 'cx', 'cy', 'cz',
    'de', 'dj', 'dk', 'dm', 'do', 'dz', 'ec', 'ee', 'eg', 'eh', 'er', 'es', 'et', 'eu',
    'fi', 'fj', 'fk', 'fm', 'fo', 'fr',
    'ga', 'gb', 'gd', 'ge', 'gf', 'gg', 'gh', 'gi', 'gl', 'gm', 'gn', 'gp', 'gq', 'gr', 'gt', 'gu', 'gw', 'gy',
    'hk', 'hm', 'hn', 'hr', 'ht', 'hu',
    'id', 'ie', 'il', 'im', 'in', 'io', 'iq', 'ir', 'is', 'it',
    'je', 'jm', 'jo', 'jp',
    'ke', 'kg', 'kh', 'ki', 'km', 'kn', 'kp', 'kr', 'kw', 'ky', 'kz',
    'la', 'lb', 'lc', 'li', 'lk', 'lr', 'ls', 'lt', 'lu', 'lv', 'ly',
    'ma', 'mc', 'md', 'me', 'mg', 'mh', 'mk', 'ml', 'mm', 'mn', 'mo', 'mp', 'mq', 'mr', 'ms', 'mt', 'mu', 'mv', 'mw', 'mx', 'my', 'mz',
    'na', 'nc', 'ne', 'nf', 'ng', 'ni', 'nl', 'no', 'np', 'nr', 'nu', 'nz',
    'om',
    'pa', 'pe', 'pf', 'pg', 'ph', 'pk', 'pl', 'pm', 'pn', 'pr', 'pt', 'pw', 'py',
    'qa',
    're', 'ro', 'rs', 'ru', 'rw',
    'sa', 'sb', 'sc', 'sd', 'se', 'sg', 'sh', 'si', 'sj', 'sk', 'sl', 'sm', 'sn', 'so', 'sr', 'ss', 'st', 'sv', 'sx', 'sy', 'sz',
    'tc', 'td', 'tf', 'tg', 'th', 'tj', 'tk', 'tl', 'tm', 'tn', 'to', 'tr', 'tt', 'tv', 'tw', 'tz',
    'ua', 'ug', 'uk', 'um', 'us', 'uy', 'uz',
    'va', 'vc', 've', 'vg', 'vi', 'vn', 'vu',
    'wf', 'ws',
    'ye', 'yt',
    'za', 'zm', 'zw'
]
# Feature columns that get min-max scaling applied in preprocess_single_url
nor_col = ['subdomain_count', 'digits_count', 'special_chars_count',
           'path_depth', 'max_numeric_sequence', 'suspicious_keyword_count',
           'repeated', 'num_underbar', 'query_length', 'query_param_count']
# Country-code TLDs frequently seen in malicious URLs
suspicious_tlds = ['ru', 'cn', 'br', 'np', 'tk', 'ml', 'ga', 'cf', 'ro', 'su']
## Helper functions
# Character counting helpers
def count_letters(url):
    """Return the number of alphabetic characters in *url*."""
    return sum(1 for ch in url if ch.isalpha())
def count_digits(url):
    """Return the number of decimal digits in *url*."""
    return sum(1 for ch in url if ch.isdigit())
def count_special_chars(url):
    """Return how many characters of *url* are ASCII punctuation."""
    punctuation = set(string.punctuation)
    return sum(1 for ch in url if ch in punctuation)
# Does the URL embed a raw IP address?
def having_ip_address(url):
    """Return 1 if *url* contains a raw IP address (dotted IPv4, hex IPv4,
    or IPv6), else 0.

    Bug fix: the original pattern was missing the ``|`` between the
    hex-IPv4 alternative and the IPv6 alternative, so a lone hex IPv4 or a
    lone IPv6 address never matched.
    """
    match = re.search(
        r'(([01]?\d\d?|2[0-4]\d|25[0-5])\.([01]?\d\d?|2[0-4]\d|25[0-5])\.([01]?\d\d?|2[0-4]\d|25[0-5])\.'
        r'([01]?\d\d?|2[0-4]\d|25[0-5])\/)|'  # IPv4
        r'((0x[0-9a-fA-F]{1,2})\.(0x[0-9a-fA-F]{1,2})\.(0x[0-9a-fA-F]{1,2})\.(0x[0-9a-fA-F]{1,2})\/)|'  # IPv4 in hexadecimal (fix: '|' added)
        r'(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}', url)  # IPv6
    if match:
        return 1
    else:
        return 0
# Does the URL contain a known file extension?
def file_ext(url):
    """Return 1 if *url* contains one of the known file extensions, else 0.

    Bug fix: the original pattern used unescaped dots, so ``.js`` matched
    ANY character followed by ``js`` (e.g. ``ajs``); the dot is now escaped
    and the alternatives grouped.
    """
    match = re.search(r"\.(php|html|htm|hwp|hwpx|pptx|docx|iso|js|lnk|vbs|xls|xml|zip|xlsx)", url)
    if match:
        return 1
    else:
        return 0
# Bucket a URL length into an ordinal category 0-3 (short ... long).
def categorize_url_length(length):
    """Map *length* to bucket 0 (<=12), 1 (<=16), 2 (<=22) or 3 (longer)."""
    for bucket, upper_bound in enumerate((12, 16, 22)):
        if length <= upper_bound:
            return bucket
    return 3
# Total occurrences of phishing-related keywords in the text.
def count_suspicious_keywords(text: str) -> int:
    """Sum the case-insensitive occurrence counts of every entry of the
    module-level ``suspicious_keywords`` list found in *text*."""
    total = 0
    for keyword in suspicious_keywords:
        total += len(re.findall(keyword, text, flags=re.IGNORECASE))
    return total
# Flag: does the URL contain the same digit repeated consecutively?
def repeated_num(url):
    """Return 1 if *url* has a run of the same digit (e.g. '11'), else 0."""
    return 1 if re.search(r'(\d)\1+', url) else 0
# Count of repeated-digit runs in the URL.
def repeated(url):
    """Return how many runs of a consecutively repeated digit *url* has."""
    return len(re.findall(r'(\d)\1+', url))
# Flag: does the URL carry a query string?
def has_query_f(url):
    """Return 1 if *url* has a non-empty query component, else 0."""
    return 1 if urlparse(url).query else 0
# Length of the URL's query string.
def query_length_f(url):
    """Return the character length of the query component (0 if absent)."""
    return len(urlparse(url).query)
# Number of query parameters in the URL.
def query_params_f(url):
    """Return the number of distinct query-parameter names in *url*."""
    return len(parse_qs(urlparse(url).query))
# Shannon entropy of the URL's character distribution.
# Malicious URLs often use random subdomains, Base64-like encodings and
# complex character mixes, which push the entropy up.
def entropy_f(url):
    """Return the Shannon entropy of *url*'s characters (0 for empty)."""
    if not url:
        return 0
    total = len(url)
    return -sum((count / total) * math.log2(count / total)
                for count in Counter(url).values())
# Flag: does the URL have a subdomain containing at least one digit?
def subdomain_f(url):
    """Return 1 when the extracted subdomain exists and holds a digit."""
    subdomain = tldextract.extract(url).subdomain
    if subdomain and any(ch.isdigit() for ch in subdomain):
        return 1
    return 0
# Fraction of the URL made of long consonant-only runs.
def consonant_ratio(url):
    """Return the share of *url* covered by runs of 5+ consecutive
    consonants — a proxy for meaningless/random strings."""
    lowered = url.lower()
    if not lowered:
        return 0
    runs = re.findall(r'[^aeiou\W\d_]{5,}', lowered)
    return sum(map(len, runs)) / len(lowered)
# Explicit port number in the URL?
def has_port_number(url):
    """Return 1 if *url* contains an explicit port like ':8080', else 0."""
    # A colon followed by 2-5 digits that end the string or precede '/'.
    return int(re.search(r':\d{2,5}(?=/|$)', url) is not None)
# Number of character runs repeated 3+ times (ignoring runs of '0').
def repeated_char_count(url):
    """Count runs where one character appears 3 or more times in a row,
    excluding runs of the character '0'."""
    run_chars = re.findall(r'(.)\1{2,}', url)
    return sum(1 for ch in run_chars if ch != '0')
# Normalized Shannon entropy over character n-grams (default 3-grams).
def ngram_entropy_norm(text, n=3):
    """Return the n-gram Shannon entropy of *text* normalized to [0, 1];
    0.0 for empty or too-short input."""
    if not text or len(text) < n:
        return 0.0
    counts = Counter(text[i:i + n] for i in range(len(text) - n + 1))
    total = sum(counts.values())
    # Shannon entropy over the n-gram distribution
    entropy = -sum((c / total) * math.log2(c / total) for c in counts.values())
    # Normalize by the maximum achievable entropy for this many distinct n-grams
    denom = math.log2(len(counts)) if len(counts) > 1 else 1
    return entropy / denom
# Character diversity of the string.
def unique_char_ratio(url: str) -> float:
    """Return distinct-character count divided by length (0.0 if empty)."""
    return len(set(url)) / len(url) if url else 0.0
# Does the URL's hostname end in a two-letter country-code TLD?
def has_country_domain(url):
    """Return 1 if the hostname's last label is in ``ccTLDs``, else 0.

    Bug fix: the bare ``except:`` also swallowed SystemExit and
    KeyboardInterrupt; narrowed to ``except Exception``.
    """
    try:
        # Prepend a scheme so urlparse populates .hostname for bare domains.
        parsed = urlparse(url if "://" in url else "http://" + url)
        hostname = parsed.hostname
        if hostname:
            domain_parts = hostname.lower().split('.')
            if len(domain_parts) >= 2:
                last_part = domain_parts[-1]
                return 1 if last_part in ccTLDs else 0
    except Exception:
        pass
    return 0
# Does the URL end in a TLD frequently used by malicious sites?
def has_suspicious_tlds(url):
    """Return 1 if the netloc's last label is in ``suspicious_tlds``, else 0.

    Fix: dropped the unused ``as e`` binding on the exception handler.
    """
    try:
        # Prepend a scheme if missing so netloc is parsed.
        parsed = urlparse(url if "://" in url else "http://" + url)
        domain = parsed.netloc.lower()
        domain_parts = domain.split('.')
        if len(domain_parts) >= 2:
            tld = domain_parts[-1]
            return 1 if tld in suspicious_tlds else 0
    except Exception:
        pass
    return 0
# Flag: does the URL use a widely-used, mainstream TLD?
def common_tld(url):
    """Return 1 when the extracted suffix is a common TLD, else 0."""
    common_tlds = ['com', 'org', 'net', 'edu', 'gov', 'mil', 'io', 'co', 'info', 'biz']
    return 1 if tldextract.extract(url).suffix in common_tlds else 0
# Flag: does the URL use a TLD frequently abused in malicious URLs?
def haz_tld(url):
    """Return 1 when the extracted suffix is a commonly-abused TLD, else 0."""
    haz_tlds = ['xyz', 'top', 'club', 'online', 'site', 'icu', 'vip', 'work', 'rest', 'fit']
    return 1 if tldextract.extract(url).suffix in haz_tlds else 0
# Is the registered domain a known URL-shortening service?
def has_shortener(url):
    """Return 1 if the registered domain is a known shortener, else 0.

    Bug fix: narrowed the bare ``except:`` (which also swallowed
    SystemExit/KeyboardInterrupt) to ``except Exception``.
    """
    try:
        url_shorteners = ['bit.ly', 'tinyurl.com', 'goo.gl', 't.co', 'ow.ly', 'is.gd', 'buff.ly', 'adf.ly', 'tiny.cc']
        extracted = tldextract.extract(url)
        domain = f"{extracted.domain}.{extracted.suffix}"
        return 1 if domain in url_shorteners else 0
    except Exception:
        return 0
## Feature normalization
# Standardize numeric (int/float) feature columns of the DataFrame,
# excluding binary-flag and categorical columns.
def normalize_features(df):
    """Standardize the numeric feature columns of *df* in place; return *df*.

    NOTE(review): the StandardScaler is fit on *df* itself. For the
    single-row frames produced at inference time every scaled value
    becomes 0 — a scaler fitted on the training data should probably be
    loaded and reused instead. Confirm against the training pipeline.
    """
    # Columns excluded from scaling (binary flags / categorical codes).
    exclude_cols = ['label', 'use_of_ip', 'file_extension', \
                    'url_length_cat', 'has_suspicious_keyword', 'repeated_num', \
                    'numer', 'upper', 'has_query', 'has_numeric_subdomain']
    # Find the numeric columns to normalize
    num_cols = df.select_dtypes(include=['int64', 'float64']).columns
    num_cols = [col for col in num_cols if col not in exclude_cols]
    # Standardize the selected features with StandardScaler
    scaler = StandardScaler()
    df[num_cols] = scaler.fit_transform(df[num_cols])
    return df
## Apply the preprocessing
# Process one input URL: takes a string, engineers all features and
# returns a one-row DataFrame ready for the model.
def preprocess_single_url(url):
    """Build the scaled feature DataFrame for a single URL.

    NOTE(review): both StandardScaler (inside ``normalize_features``) and
    the MinMaxScaler below are fit on this one-row frame, so the affected
    columns always come out as 0 at inference time — confirm whether
    pre-fitted scalers from training should be loaded instead.
    """
    # Seed DataFrame with the raw URL
    df = pd.DataFrame({'URL': [str(url)]})
    # Feature engineering
    df['subdomain_count'] = df['URL'].str.split('.').apply(lambda x: len(x) - 2)  # number of subdomains
    df['letters_count'] = df['URL'].apply(count_letters)  # letter count
    df['digits_count'] = df['URL'].apply(count_digits)  # digit count
    df['special_chars_count'] = df['URL'].apply(count_special_chars)  # punctuation count
    df['use_of_ip'] = df['URL'].apply(lambda i: having_ip_address(i))  # raw IP address used?
    df['path_depth'] = df['URL'].str.count('/')  # URL depth via '/' count
    df['max_numeric_sequence'] = df['URL'].apply(lambda x: max([len(seq) for seq in re.findall(r'\d+', x)] or [0]))  # longest digit run
    df['file_extension'] = df['URL'].apply(lambda i: file_ext(i))  # file extension present?
    df['special_char_count'] = df['URL'].apply(lambda x: sum(1 for c in x if c in '-_/'))  # count of '-', '_', '/'
    # NOTE(review): the length bucket is computed from letters_count, not the
    # full URL length — confirm this matches how the model was trained.
    df['url_length_cat'] = df['letters_count'].apply(categorize_url_length)
    df['suspicious_keyword_count'] = df['URL'].apply(count_suspicious_keywords)  # suspicious keyword hits
    df['has_suspicious_keyword'] = (df['suspicious_keyword_count'] > 0).astype(int)  # any suspicious keyword?
    df['repeated'] = df['URL'].apply(repeated)  # repeated-digit run count
    df['repeated_num'] = df['URL'].apply(repeated_num)  # repeated digits present?
    df['num_underbar'] = df['URL'].apply(lambda url : url.count("_"))  # underscore count
    df['numer'] = df['URL'].apply(lambda url : int(bool(len(re.findall(r'(\d)(?!\1)(\d)(?!\2)(\d)', url)))))  # 3 pairwise-distinct consecutive digits?
    df["upper"] = df['URL'].apply(lambda url : int(any(c.isupper() for c in url)))  # any uppercase letter?
    df['has_query'] = df['URL'].apply(has_query_f)
    df['query_length'] = df['URL'].apply(query_length_f)
    df['query_param_count'] = df['URL'].apply(query_params_f)
    df['entropy'] = df['URL'].apply(entropy_f)
    df['has_numeric_subdomain'] = df['URL'].apply(subdomain_f)
    # Standardize the engineered numeric features
    df = normalize_features(df)
    df['has_port_number'] = df['URL'].apply(has_port_number)
    df['consonant_ratio'] = df['URL'].apply(consonant_ratio)
    df['repeated_char_count'] = df['URL'].apply(repeated_char_count)
    df['ngram_entropy_norm'] = df['URL'].apply(ngram_entropy_norm)
    df['unique_char_ratio'] = df['URL'].apply(unique_char_ratio)
    df['has_country_domain'] = df['URL'].apply(has_country_domain)
    # Min-max scale selected columns into '*_scaled', dropping the originals
    scaler = MinMaxScaler()
    for col in nor_col:
        new_col_name = f'{col}_scaled'
        df[new_col_name] = scaler.fit_transform(df[[col]])
        df.drop(col, axis = 1, inplace = True)
    df['has_suspicious_tlds'] = df['URL'].apply(has_suspicious_tlds)
    df['has_common_tlds'] = df['URL'].apply(common_tld)
    df['has_hazardous_tlds'] = df['URL'].apply(haz_tld)
    df['has_shorteners'] = df['URL'].apply(has_shortener)
    # Drop intermediate helper columns not fed to the model
    df.drop(columns = ['letters_count', 'special_char_count'], axis = 1, inplace = True)
    return df

0
backend/app/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,8 @@
{
"folders": [
{
"path": ".."
}
],
"settings": {}
}

52
backend/app/exe.py Normal file
View File

@@ -0,0 +1,52 @@
from app.junPreP import extract_features
import numpy as np
import pickle
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import load_model
import tensorflow as tf
import os
# Model and scaler paths (absolute, resolved relative to this file so the
# FastAPI process can start from any working directory).
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_PATH = os.path.join(BASE_DIR, "models", "Recall_0.77.keras")
SCALER_PATH = os.path.join(BASE_DIR, "models", "scaler.pkl")
# Load model and scaler once, at import time.
model = load_model(MODEL_PATH)
with open(SCALER_PATH, 'rb') as f:
    scaler = pickle.load(f)
# Optimize inference with @tf.function (reduce_retracing limits re-tracing
# when input signatures vary slightly).
@tf.function(reduce_retracing=True)
def predict_with_model(model, input_data):
    """Run the Keras model on *input_data* inside a compiled tf.function."""
    return model(input_data)
# Decision threshold (tunable).
BEST_THRESHOLD = 0.4034
# 📦 Prediction helper (imported and called by the FastAPI handlers)
def predict_url_maliciousness(url: str) -> dict:
    """Classify *url* with the neural-network model.

    Returns a JSON-serializable dict with the URL, its malicious
    probability, the boolean decision and the threshold used.
    """
    # Feature extraction
    features = extract_features(url)
    input_df = pd.DataFrame([list(features.values())], columns=features.keys())
    # Scaling (scaler was loaded from models/scaler.pkl at import time)
    input_scaled = scaler.transform(input_df)
    # Prediction
    prediction = predict_with_model(model, input_scaled)
    malicious_prob = float(prediction[0][0])
    # Threshold-based decision
    is_malicious = bool(malicious_prob > BEST_THRESHOLD)
    # Ensure all values are Python native types (not numpy types)
    return {
        "url": str(url),
        "malicious_probability": float(malicious_prob),
        "is_malicious": bool(is_malicious),
        "threshold": float(BEST_THRESHOLD)
    }

204
backend/app/junPreP.py Normal file
View File

@@ -0,0 +1,204 @@
import re
from urllib.parse import urlparse, parse_qs
import tldextract
import zlib
import re
from urllib.parse import urlparse
from collections import Counter
import math
def check_similar_brand(url):
    """Return True if the URL's domain looks like a typosquatted brand.

    Only fires when the exact brand string is absent from the domain but a
    common look-alike variant (o->0, i->1, truncated brand, doubled
    letters, ...) is present.
    """
    # Frequently targeted brands/domains
    common_brands = {
        'google', 'facebook', 'amazon', 'microsoft', 'apple',
        'netflix', 'paypal', 'twitter', 'instagram', 'linkedin',
        'youtube', 'yahoo', 'gmail', 'whatsapp', 'tiktok',
        'geocities', 'angelfire', 'newadvent', 'wikipedia',
    }
    # 2. Look for near-miss brand spellings
    try:
        # Parse the URL ('//' prepended so netloc is populated for bare domains)
        parsed = urlparse(url if '//' in url else '//' + url)
        domain = parsed.netloc.lower() if parsed.netloc else url.lower()
        for brand in common_brands:
            if brand not in domain:
                similar = False
                # Common look-alike substitution patterns.
                # NOTE(review): the ``brand + '-'`` and ``brand + '_'``
                # patterns can never match here, because they contain the
                # brand itself and this branch only runs when the brand is
                # absent from the domain.
                patterns = [
                    brand.replace('o', '0'),
                    brand.replace('i', '1'),
                    brand.replace('l', '1'),
                    brand.replace('e', '3'),
                    brand.replace('a', '4'),
                    brand.replace('s', '5'),
                    brand + '-',
                    brand + '_',
                    brand[:-1],  # brand with its last character dropped
                    ''.join(c + c for c in brand),  # every letter doubled
                ]
                for pattern in patterns:
                    if pattern in domain:
                        similar = True
                        break
                if similar:
                    return True  # a look-alike brand was found
    except Exception as e:
        return False  # treat parse failures as "no match"
    return False  # no look-alike brand found
# URL compression ratio (compressed size / original size).
def compression_ratio(url: str) -> float:
    """Return zlib-compressed byte length of *url* divided by its raw
    UTF-8 byte length; 0.0 for an empty string."""
    if not url:
        return 0.0
    raw = url.encode('utf-8')
    return len(zlib.compress(raw)) / len(raw)
def extract_features(url):
    """Extract the numeric feature dict for one URL (input to the NN model).

    The dict keys are the model's feature names — do not rename them
    without retraining. Key order matters: the caller builds the model's
    input DataFrame directly from this dict.
    """
    parsed_url = urlparse(url)
    # Keywords frequently seen in phishing/malicious URLs
    suspicious_keywords = [
        'login', 'verify', 'account', 'update', 'secure', 'banking',
        'paypal', 'confirm', 'signin', 'auth', 'redirect', 'free',
        'bonus', 'admin', 'support', 'server', 'password', 'click',
        'urgent', 'immediate', 'alert', 'security', 'prompt'
    ]
    additional_keywords = [
        'verify', 'wallet', 'cryptocurrency', 'bitcoin', 'ethereum',
        'validation', 'authenticate', 'reset', 'recover', 'access',
        'limited', 'offer', 'prize', 'win', 'winner', 'payment',
        'bank', 'credit', 'debit', 'card', 'expire', 'suspension',
        'unusual', 'activity', 'verify', 'document', 'invoice'
    ]
    all_keywords = list(set(suspicious_keywords + additional_keywords))
    # Whole-word, case-insensitive keyword matches
    contains_keyword = 0
    keyword_count = 0
    for keyword in all_keywords:
        if re.search(r'\b' + keyword + r'\b', url, re.IGNORECASE):
            contains_keyword = 1
            keyword_count += 1
    url_length = len(url)
    # Domain decomposition via tldextract
    extracted = tldextract.extract(url)
    tld = extracted.suffix
    domain = extracted.domain
    subdomain = extracted.subdomain
    tld_length = len(tld) if tld else 0
    # TLD category flags
    common_tlds = ['com', 'org', 'net', 'edu', 'gov', 'mil', 'io', 'co', 'info', 'biz']
    is_common_tld = 1 if tld in common_tlds else 0
    country_tlds = ['us', 'uk', 'ca', 'au', 'de', 'fr', 'jp', 'cn', 'ru', 'br', 'in', 'it', 'es']
    is_country_tld = 1 if tld in country_tlds else 0
    suspicious_tlds = ['xyz', 'top', 'club', 'online', 'site', 'icu', 'vip', 'work', 'rest', 'fit']
    is_suspicious_tld = 1 if tld in suspicious_tlds else 0
    # URL-shortener detection on the registered domain
    url_shorteners = ['bit.ly', 'tinyurl.com', 'goo.gl', 't.co', 'ow.ly', 'is.gd', 'buff.ly', 'adf.ly', 'tiny.cc']
    full_domain = f"{domain}.{tld}" if tld else domain
    is_shortened = 1 if full_domain in url_shorteners else 0
    # Structural features
    domain_length = len(domain) if domain else 0
    has_subdomain = 1 if subdomain else 0
    subdomain_length = len(subdomain) if subdomain else 0
    subdomain_count = len(subdomain.split('.')) if subdomain else 0
    path = parsed_url.path
    path_length = len(path)
    path_depth = path.count('/') if path else 0
    query = parsed_url.query
    has_query = 1 if query else 0
    query_length = len(query) if query else 0
    query_params = parse_qs(query)
    query_param_count = len(query_params) if query_params else 0
    has_fragment = 1 if parsed_url.fragment else 0
    fragment_length = len(parsed_url.fragment) if parsed_url.fragment else 0
    # Character type ratios
    letter_count = sum(c.isalpha() for c in url)
    digit_count = sum(c.isdigit() for c in url)
    special_char_count = len(re.findall(r'[^a-zA-Z0-9]', url))
    letter_ratio = letter_count / url_length if url_length > 0 else 0
    digit_ratio = digit_count / url_length if url_length > 0 else 0
    special_char_ratio = special_char_count / url_length if url_length > 0 else 0
    # Character distribution and entropy
    if url:
        char_counts = Counter(url)
        total_chars = len(url)
        char_frequencies = {char: count/total_chars for char, count in char_counts.items()}
        entropy = -sum(freq * math.log2(freq) for freq in char_frequencies.values())
    else:
        entropy = 0
    # Ordinal URL-length bucket (0 = short ... 3 = long)
    if url_length <= 13:
        url_length_cat = 0
    elif url_length <= 18:
        url_length_cat = 1
    elif url_length <= 25:
        url_length_cat = 2
    else:
        url_length_cat = 3
    return {
        # "url_length": url_length,
        "url_length_cat": url_length_cat,
        "num_dots": url.count("."),
        "num_digits": sum(c.isdigit() for c in url),
        "num_special_chars": len(re.findall(r"[^a-zA-Z0-9]", url)),
        "url_keyword": contains_keyword,
        # "url_keyword_count": keyword_count,
        "num_underbar": url.count("_"),
        "extract_consecutive_numbers": int(bool(re.findall(r'(\d)\1+', url))),
        "number": int(bool(len(re.findall(r'(\d)(?!\1)(\d)(?!\2)(\d)', url)))),
        "upper": int(any(c.isupper() for c in url)),
        "is_common_tld": is_common_tld,
        # NOTE(review): key contains a space — looks like a typo, but it is
        # the feature name the scaler/model were fitted with, so keep as-is.
        "is country_tld": is_country_tld,
        "is_suspicious_tld": is_suspicious_tld,
        "domain_length": domain_length,
        "has_subdomain": has_subdomain,
        "subdomain_length": subdomain_length,
        "subdomain_count": subdomain_count,
        # "path_length": path_length,
        "path_depth": path_depth,
        "has_query": has_query,
        "query_length": query_length,
        "query_param_count": query_param_count,
        # "has_fragment": has_fragment,
        # "fragment_length": fragment_length,
        "url_shorteners": is_shortened,
        # Newly added features
        "compression_ratio": compression_ratio(url),
        "check_similar_brand" : check_similar_brand(url),
        # Advanced text analysis
        "entropy": entropy,
        #"letter_ratio": letter_ratio,
        "digit_ratio": digit_ratio,
        "special_char_ratio": special_char_ratio
    }

40
backend/app/main.py Normal file
View File

@@ -0,0 +1,40 @@
from fastapi import FastAPI
from pydantic import BaseModel
from app.model_load import use_model # predictor.py에서 함수 import
from app.exe import predict_url_maliciousness
from app.utils import convert_numpy_to_python_types
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# Attach CORS middleware to the FastAPI instance
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # or restrict to the front-end origin, e.g. ["http://localhost:3000"]
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request-body schema for POST /predict
class UrlRequest(BaseModel):
    url: str  # URL to analyze
@app.get("/")
def root():
    """Health check: confirms the API server is running."""
    return {"message": "URL 악성 판별기 FastAPI 서버 정상 작동 중!"}
@app.post("/predict")
def predict(request: UrlRequest):
    """Run both ML models on the submitted URL and return their results."""
    url = request.url
    # model1: XGBoost ensemble mean probability; model2: Keras classifier dict.
    result_model1 = convert_numpy_to_python_types(use_model(url))
    result_model2 = convert_numpy_to_python_types(predict_url_maliciousness(url))
    response_data = {
        "url": url,
        "model1": result_model1,
        "model2": result_model2
    }
    # NOTE(review): this outer conversion is redundant (both results were
    # converted above) but harmless — kept as-is.
    return convert_numpy_to_python_types(response_data)

34
backend/app/model_load.py Normal file
View File

@@ -0,0 +1,34 @@
from app.PreP import preprocess_single_url
import joblib
import numpy as np
import os
# Directory of this file (used to build absolute model paths)
current_dir = os.path.dirname(os.path.abspath(__file__))
# Load the 4-model XGBoost ensemble once, at import time, via absolute paths
models_load = [
    joblib.load(os.path.join(current_dir, 'models', f'sampled_xgboost_model_{i+1}.joblib'))
    for i in range(4)
]
# Best classification threshold
best_threshold = 0.7211
# best_threshold = 0.7  # threshold when prioritizing recall
def use_model(url : str) -> float:
    """Score *url* with the XGBoost ensemble.

    Returns the mean malicious-class probability across the 4 loaded
    models, rounded to 4 decimal places.
    """
    # Build the one-row feature DataFrame for this URL
    featured_df = preprocess_single_url(url)
    # Select model input columns.
    # Fix: removed the dead ``features_cols = []`` assignment and replaced
    # the generator expression with a list — pandas column indexing expects
    # a list-like of labels, and a list is unambiguous.
    except_cols = ['URL']  # columns not fed to the model
    features_cols = [col for col in featured_df.columns if col not in except_cols]
    input_data = featured_df[features_cols]
    # Average the positive-class probability across the ensemble
    model_pred = round(float(np.mean([model.predict_proba(input_data)[:, 1] for model in models_load])), 4)
    return model_pred

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
backend/app/models/scaler.pkl Executable file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

72
backend/app/predictor.py Normal file
View File

@@ -0,0 +1,72 @@
import joblib
import os
from app.PreP import preprocess_single_url
import numpy as np
# model directory
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_DIR = os.path.join(BASE_DIR, "models")
'''
# 임의의 url을 입력하고 함수 적용하여 전처리
url_test = 'https://www.msn.com/ko-kr/news/other/%EC%84%9C%EB%B2%84%EC%97%90-%EC%82%AC%EC%A7%84-%EC%95%88-%EC%98%AC%EB%A6%AC%EA%B3%A0%EB%8F%84-%EC%A7%80%EB%B8%8C%EB%A6%AC%ED%92%8D%EC%9C%BC%EB%A1%9C-%EC%82%AC%EC%83%9D%ED%99%9C-%EC%B9%A8%ED%95%B4-%EB%A7%89%EB%8A%94-ai-%EA%B0%9C%EB%B0%9C/ar-AA1CyK6x?ocid=msedgntp&pc=U531&cvid=fdbbff03231b4babb8bf42b0036d8141&ei=9' # 테스트할 URL 입력
url_pre = preprocess_single_url(url_test)
'''
# Feature columns used for training (order must match the trained models)
features_cols = ['subdomain_count', 'letters_count', \
                 'digits_count', 'special_chars_count', 'use_of_ip', 'path_depth', \
                 'max_numeric_sequence', 'file_extension', 'special_char_count', \
                 'url_length_cat', 'suspicious_keyword_count', 'has_suspicious_keyword']
'''
# 적용할 데이터
input_data = url_pre[features_cols]
'''
## Load the models
# Single model:
# model_load = joblib.load('xgboost_model_fold1.pkl')
# Load all 4 fold models
models_load = [joblib.load(os.path.join(MODEL_DIR, f'xgboost_model_fold{i+1}.pkl')) for i in range(4)]
## Decision threshold
# (fix: BEST_THRESHOLD was assigned twice with the same value; kept one)
BEST_THRESHOLD = 0.6563
def predict_url(url: str) -> dict:
    """Classify *url* with the XGBoost fold ensemble.

    Returns a JSON-serializable result dict, or ``{"error": ...}`` when
    preprocessing or prediction fails.
    """
    try:
        preprocessed = preprocess_single_url(url)
        input_data = preprocessed[features_cols]
        # ✅ Inspect the preprocessed input
        print("Preprocessed input:", input_data)
        # Mean malicious-class probability across the folds
        probs = [float(model.predict_proba(input_data)[0, 1]) for model in models_load]
        mean_pred = float(np.mean(probs))
        # Log the raw model output
        print("Model prediction result:", mean_pred)
        # Threshold-based decision
        is_malicious = bool(mean_pred > BEST_THRESHOLD)
        # Cast to native Python types (e.g. the probability may be np.float32)
        return {
            "url": url,
            "malicious_probability": mean_pred,  # numpy -> float
            "is_malicious": bool(is_malicious),  # numpy -> bool
            "threshold": float(BEST_THRESHOLD)  # numpy -> float
        }
    except Exception as e:
        return {"error": str(e)}

18
backend/app/utils.py Normal file
View File

@@ -0,0 +1,18 @@
import numpy as np
def convert_numpy_to_python_types(obj):
    """
    Recursively convert numpy types to native Python types.
    """
    if isinstance(obj, np.ndarray):
        # Arrays become (possibly nested) lists, converted element-wise.
        return convert_numpy_to_python_types(obj.tolist())
    if isinstance(obj, np.number):
        # Numpy scalars -> float or int depending on their kind.
        return float(obj) if isinstance(obj, np.floating) else int(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, dict):
        return {key: convert_numpy_to_python_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_numpy_to_python_types(item) for item in obj]
    return obj

32
backend/requirements.txt Normal file
View File

@@ -0,0 +1,32 @@
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
exceptiongroup==1.2.2
fastapi==0.115.12
h11==0.14.0
idna==3.10
joblib==1.4.2
numpy<2.0
pandas==2.2.3
pydantic==2.11.3
pydantic_core==2.33.1
python-dateutil==2.9.0.post0
pytz==2025.2
requests==2.32.3
scikit-learn==1.6.1
scipy==1.15.2
six==1.17.0
sniffio==1.3.1
starlette==0.46.1
threadpoolctl==3.6.0
typing-inspection==0.4.0
typing_extensions==4.13.1
tzdata==2025.2
urllib3==2.3.0
uvicorn==0.34.0
tldextract
xgboost
tensorflow

27
backend/start_server.sh Normal file
View File

@@ -0,0 +1,27 @@
#!/bin/bash
# Launch script for the FastAPI backend: ensures the uv-managed virtualenv
# exists, installs dependencies, then starts uvicorn with auto-reload.
# Exit on error
set -e
# Navigate to the backend directory
cd "$(dirname "$0")"
# Check if virtual environment exists, create if it doesn't
if [ ! -d ".venv" ]; then
    echo "Creating virtual environment..."
    uv venv
fi
# Activate virtual environment
source .venv/bin/activate
# Install dependencies if needed
echo "Checking dependencies..."
uv pip install -r requirements.txt
# Start the FastAPI server
echo "Starting server..."
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
# Note: The --host 0.0.0.0 parameter makes the server accessible from other devices on the network
# Remove it if you only want local access