109 lines
3.4 KiB
Python
109 lines
3.4 KiB
Python
##Import/export model
|
|
import joblib
|
|
##Algs
|
|
from sklearn.svm import SVC
|
|
from sklearn.tree import DecisionTreeClassifier
|
|
from sklearn.neural_network import MLPClassifier
|
|
##Metrics
|
|
from sklearn.metrics import accuracy_score,confusion_matrix
|
|
##Preprocessing
|
|
from sklearn.preprocessing import StandardScaler
|
|
##Data
|
|
import pandas as pd
|
|
from tabulate import tabulate
|
|
|
|
|
|
def save_model(model, name):
    """Serialize a fitted model to disk with joblib.

    Args:
        model: Object to persist (the callers in this file pass fitted
            scikit-learn estimators).
        name: Destination file name/path handed to ``joblib.dump``.
    """
    joblib.dump(value=model, filename=name)
|
|
|
|
def _load_labeled_split(path, head_names, positive_ip):
    """Read one headerless CSV split and return ``(features, labels)``.

    The CSV is expected to provide the first eight columns of ``head_names``;
    the ninth ("value") is the label column created here.
    """
    frame = pd.read_csv(path, delimiter=',', header=None)

    # Add the label column at position 8 *before* naming the columns, so the
    # frame has exactly as many columns as head_names.
    frame[8] = 0
    frame.columns = head_names

    # Vectorized labelling: 1 where the source IP matches, 0 elsewhere.
    # This replaces a row-by-row iterrows()/.at loop with one comparison.
    frame['value'] = (frame['src_ip'] == positive_ip).astype('int64')

    labels = frame.pop('value')
    # src_ip only serves to derive the label and response_icmp is not used as
    # a feature, so both are dropped from the feature matrix.
    features = frame.drop(columns=['src_ip', 'response_icmp'])
    return features, labels


def data_load(train_path='dataset_final.csv',
              test_path='dataset_test_final.csv',
              positive_ip='192.168.20.3'):
    """Load the train and test CSV files and split them into features/labels.

    Both files are read without a header row. A row is labelled 1 when its
    ``src_ip`` equals ``positive_ip`` and 0 otherwise. The returned feature
    frames contain the columns ``received_bytes``, ``send_bytes``,
    ``port_duration``, ``total_duration``, ``jitter`` and ``differents_port``.

    Args:
        train_path: Path of the training CSV (default: 'dataset_final.csv').
        test_path: Path of the test CSV (default: 'dataset_test_final.csv').
        positive_ip: Source IP whose rows receive label 1.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` where the ``x_*`` items
        are DataFrames and the ``y_*`` items are integer Series.

    Raises:
        ValueError: If a CSV does not yield the expected number of columns
            (raised by pandas when the column names are assigned).
    """
    head_names = ["src_ip", "received_bytes", "send_bytes", "port_duration",
                  "total_duration", "jitter", "response_icmp",
                  "differents_port", "value"]

    x_train, y_train = _load_labeled_split(train_path, head_names, positive_ip)
    x_test, y_test = _load_labeled_split(test_path, head_names, positive_ip)
    return x_train, y_train, x_test, y_test
|
|
|
|
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or NaN when the denominator is 0."""
    if denominator == 0:
        return float('nan')
    return numerator / denominator


def print_results(y_test, y_pred):
    """Print accuracy, the confusion matrix and derived metrics.

    Printed metrics:
        TPR  = TP / (TP + FN)
        FAR  = FP / (FP + TN)
        Prec = TP / (TP + FP)
    A metric whose denominator is zero is printed as ``nan``.

    Args:
        y_test: True binary labels (0/1).
        y_pred: Predicted binary labels (0/1).
    """
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Acc del modelo: {accuracy}")

    # labels=[0, 1] forces a 2x2 matrix even when only one class shows up in
    # y_test/y_pred; without it the 4-way unpacking below raises ValueError
    # on a 1x1 matrix.
    TN, FP, FN, TP = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()

    print("Confusion Matrix:")
    # Rows are the actual class (positive first), columns the predicted one.
    data_table = [[TP, FN],
                  [FP, TN]]
    table = tabulate(data_table, tablefmt="grid")
    print(table)

    print("Metrics:")
    TPR = _safe_ratio(TP, TP + FN)
    FAR = _safe_ratio(FP, FP + TN)
    Prec = _safe_ratio(TP, TP + FP)

    print("TPR: " + str(TPR))
    print("FAR: " + str(FAR))
    print("Prec: " + str(Prec))
|
|
|
|
def dt(x_train, y_train, x_test, y_test):
    """Train a decision tree, report its test results and save it as 'DT.pkl'.

    The tree uses fixed hyper-parameters (depth 8, at least 13 samples per
    leaf, at least 6 samples to split) and weights class 0 at 3.5 and
    class 1 at 0.5.

    Args:
        x_train: Training features.
        y_train: Training labels.
        x_test: Test features.
        y_test: Test labels used for the printed report.
    """
    tree = DecisionTreeClassifier(
        max_depth=8,
        min_samples_leaf=13,
        min_samples_split=6,
        class_weight={1: 0.5, 0: 3.5},
    )
    tree.fit(x_train, y_train)

    predictions = tree.predict(x_test)
    print_results(y_test, predictions)

    save_model(tree, 'DT.pkl')
|
|
|
|
def mlp(x_train, y_train, x_test, y_test):
    """Train an MLP classifier, report its test results and save 'MLP.pkl'.

    The network has two hidden layers (128 and 64 units), tanh activation,
    the adam solver, an initial learning rate of 0.0003 and at most 100
    training iterations.

    Args:
        x_train: Training features.
        y_train: Training labels.
        x_test: Test features.
        y_test: Test labels used for the printed report.
    """
    network = MLPClassifier(
        hidden_layer_sizes=(128, 64),
        max_iter=100,
        activation='tanh',
        solver='adam',
        learning_rate_init=0.0003,
    )
    network.fit(x_train, y_train)

    predictions = network.predict(x_test)
    print_results(y_test, predictions)

    save_model(network, 'MLP.pkl')
|
|
|
|
def svc(x_train, y_train, x_test, y_test):
    """Train a linear SVM on standardized features and save it as 'SVC.pkl'.

    Features are standardized with a scaler fitted on the training data only;
    the same fitted scaler is then applied to the test data so both splits
    share one feature scale.

    Args:
        x_train: Training features.
        y_train: Training labels.
        x_test: Test features.
        y_test: Test labels used for the printed report.
    """
    # Fit the scaler on the training set only. Fitting a second scaler on the
    # test set would standardize it with its own mean/variance, so the model
    # would be evaluated on features scaled differently from those it was
    # trained on.
    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    model = SVC(kernel='linear')
    model.fit(x_train, y_train)

    y_pred = model.predict(x_test)
    print_results(y_test, y_pred)

    # NOTE(review): only the classifier is persisted; the fitted scaler is
    # not saved, so whoever loads 'SVC.pkl' must standardize inputs the same
    # way before calling predict.
    save_model(model, 'SVC.pkl')
|
|
|
|
|
|
if __name__ == '__main__':
    # Map each accepted model name to the function that trains it.
    trainers = {'DT': dt, 'MLP': mlp, 'SVC': svc}

    modelo = input("Please, enter model (DT,MLP,SVC): ")
    train = trainers.get(modelo)

    if train is None:
        print("Modelo no valido")
    else:
        # The datasets are loaded only once a valid model has been chosen.
        x_train, y_train, x_test, y_test = data_load()
        train(x_train, y_train, x_test, y_test)
|
|
|