Commit 21081a11 authored by 汪前

newcommit

parent 1c712187
1 merge request: !4 networkmonitor
Showing with 341 additions and 0 deletions
File added
File added
File added
File added
File added
File added
from django.contrib import admin

# Register your models here.

from django.apps import AppConfig


class LogviewConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "logview"
File added
from django.db import models

# Create your models here.
from talktive.hive_client import HiveConnectionPoolManager


class HiveQuery:
    """Encapsulates queries against the Hive databases."""

    def __init__(self, db_name):
        """Select a Hive data source (e.g. hive2 or hive3)."""
        self.db_name = db_name
        self.pool_manager = HiveConnectionPoolManager.instance()

    def execute_query(self, sql, retry_attempts=50):
        """Execute a SQL query; on errors such as a read timeout, evict the
        connection, recreate it, and retry."""
        attempt = 0
        while attempt < retry_attempts:
            conn = None
            cursor = None
            try:
                conn = self.pool_manager.get_connection(self.db_name)
                cursor = conn.cursor()
                cursor.execute(sql)
                columns = [desc[0] for desc in cursor.description]  # column names
                rows = cursor.fetchall()
                return [dict(zip(columns, row)) for row in rows]  # [{column: value}, ...]
            except Exception as e:
                error_msg = str(e).lower()
                print(f"SQL execution failed (attempt {attempt + 1}/{retry_attempts}), error: {error_msg}")
                if conn is not None:  # only evict a connection we actually obtained
                    self.pool_manager.remove_connection(conn, self.db_name)
                    conn = None  # already evicted, so don't close it again below
                attempt += 1  # count the retry
            finally:
                if cursor:
                    try:
                        cursor.close()
                    except Exception:
                        pass  # the cursor may already be unusable on a dead connection
                if conn:
                    conn.close()
        return {"error": "Query timed out; all retries failed"}

    def get_app_info(self, page=1, page_size=10):
        """Query the `app_info` table (paginated)."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT app_id, app_name, app_category
            FROM app_info
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)

    def get_ip_info(self, page=1, page_size=10):
        """Query the `ip_info` table (paginated)."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT ip, region, country, city, lat, lon, isp
            FROM ip_info
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)

    def get_ip_details(self, ip):
        """Query `ip_info` for a single IP.

        Note: the value is interpolated directly into the SQL string, so
        callers must only pass trusted input (SQL-injection risk).
        """
        sql = f"""
            SELECT ip, region, country, city, lat, lon, isp
            FROM ip_info WHERE ip = '{ip}'
        """
        return self.execute_query(sql)

    def get_network_logs(self, page=1, page_size=10):
        """Query the `network_logs` table (paginated)."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT `timestamp`, source_ip, destination_ip, source_port, destination_port,
                   protocol, ip_version, cloud_ip, predicted_label, log_date
            FROM network_logs
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)

    def get_http_logs(self, ip, timestamp, page=1, page_size=10):
        """Query the `http_logs` table by host IP and log date (paginated)."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT `timestamp`, host_ip, source_ip, destination_ip, source_port, destination_port,
                   protocol, cloudip, fwd_header_length, packet_length, http_payload, predicted_label, log_date
            FROM http_logs WHERE host_ip = '{ip}' AND log_date = '{timestamp}'
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)

    def get_cloud_ip(self):
        """Query the `cloud_ip_info` table (not paginated)."""
        sql = "SELECT cloudip FROM cloud_ip_info"
        return self.execute_query(sql)

    def get_all_ip(self, page=1, page_size=10):
        """Query the `source_ip_info` table (paginated)."""
        offset = (page - 1) * page_size
        sql = f"SELECT source_ip, destination_ip FROM source_ip_info LIMIT {page_size} OFFSET {offset}"
        return self.execute_query(sql)

    def get_network_logs_by_cloudip(self, cloudip, date, page=1, page_size=10):
        """Query `network_logs` filtered by `cloud_ip` and `log_date`."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT `timestamp`, source_ip, destination_ip, source_port, destination_port,
                   protocol, ip_version, cloud_ip, predicted_label, log_date
            FROM network_logs
            WHERE cloud_ip = '{cloudip}' AND log_date = '{date}'
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)

    def get_network_logs_by_ip(self, ip, date, page=1, page_size=10):
        """Query `network_logs` filtered by `source_ip` and `log_date`."""
        offset = (page - 1) * page_size
        sql = f"""
            SELECT `timestamp`, source_ip, destination_ip, source_port, destination_port,
                   protocol, ip_version, cloud_ip, predicted_label, log_date
            FROM network_logs
            WHERE source_ip = '{ip}' AND log_date = '{date}'
            LIMIT {page_size} OFFSET {offset}
        """
        return self.execute_query(sql)
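A minimal usage sketch of HiveQuery, assuming a pool named "hive3" is configured in talktive.hive_client; the IP below is a documentation-range placeholder. Each helper returns the list of {column: value} dicts produced by execute_query:

    query = HiveQuery("hive3")
    rows = query.get_ip_info(page=2, page_size=10)  # rows 11-20 of ip_info
    for row in rows:
        print(row["ip"], row["country"])
    # Values are spliced into the SQL via f-strings, so pass trusted input only:
    details = query.get_ip_details("203.0.113.7")  # placeholder IP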
from django.test import TestCase

# Create your tests here.

from django.urls import path

from . import views

urlpatterns = [
    path("app_info/", views.app_info_view, name="app_info"),
    path("ip_info/", views.ip_info_view, name="ip_info"),
    path("stream_logs/", views.stream_log_cloudip, name="stream_log_cloudip"),  # no URL parameter; the date is passed via POST
    path("cloud_ip/", views.cloud_log, name="cloud_log"),
    path("stream_logs_ip/", views.stream_log_ip, name="stream_log_ip"),  # no URL parameter; the date is passed via POST
    path("http_logs_ip/", views.http_log_ip, name="http_log_ip"),
    path("ip_details/<str:ip>/", views.http_details, name="ip_details"),
]
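For orientation, a sketch of how these routes resolve with Django's reverse(), assuming this module is included at the project root without a prefix (the IP is a placeholder):

    from django.urls import reverse

    reverse("app_info")                          # -> "/app_info/"
    reverse("ip_details", args=["203.0.113.7"])  # -> "/ip_details/203.0.113.7/"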
import json
import urllib.request
from datetime import datetime

from django.http import JsonResponse
from django.shortcuts import render
from django.contrib.auth.decorators import login_required

from .models import HiveQuery


def get_latest_date():
    """Use today's date as the default query date."""
    return datetime.today().strftime('%Y-%m-%d')


def generate_pagination_context(page):
    """
    Build the pagination context:
    - pages are shown in groups of 10 (`1 - 10` by default)
    - `<<` jumps to the previous group
    - `>>` jumps to the next group
    """
    page_group_size = 10  # at most 10 pages per group
    current_group = (page - 1) // page_group_size  # which group the current page falls in
    start_page = current_group * page_group_size + 1
    end_page = start_page + page_group_size - 1
    pages = list(range(start_page, end_page + 1))
    return {
        "pages": pages,
        "prev_group": max(1, start_page - page_group_size),
        "next_group": start_page + page_group_size
    }
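A worked example of the grouping arithmetic above, for page=23:

    generate_pagination_context(23)
    # current_group = (23 - 1) // 10 = 2, so start_page = 21 and end_page = 30
    # -> {"pages": [21, 22, ..., 30], "prev_group": 11, "next_group": 31}
    # prev_group/next_group are the first pages of the adjacent groups;
    # next_group is not capped, since the total page count is unknown here.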
@login_required
def app_info_view(request):
    """Render the `app_info` page."""
    db_name = "hive3"
    page = int(request.GET.get("page", 1))
    page_size = int(request.GET.get("page_size", 10))
    hive_query = HiveQuery(db_name)
    data = hive_query.get_app_info(page, page_size)
    pagination = generate_pagination_context(page)
    return render(request, "capture/app_info.html",
                  {"data": data, "page": page, "page_size": page_size, "pagination": pagination})


@login_required
def ip_info_view(request):
    """Render the `ip_info` page."""
    db_name = "hive3"
    page = int(request.GET.get("page", 1))
    page_size = int(request.GET.get("page_size", 10))
    hive_query = HiveQuery(db_name)
    data = hive_query.get_ip_info(page, page_size)
    pagination = generate_pagination_context(page)
    return render(request, "capture/ip_info.html",
                  {"data": data, "page": page, "page_size": page_size, "pagination": pagination})


@login_required
def stream_log_cloudip(request):
    """Render `network_logs` filtered by cloud IP; the date comes via POST, else GET, else today."""
    db_name = "hive3"
    if request.POST.get("date"):
        now_date = request.POST.get("date")
        cloud_ip = request.POST.get("cloud_ip")
        page = int(request.POST.get("page", 1))
        page_size = int(request.POST.get("page_size", 10))
        hive_query = HiveQuery(db_name)
        data = hive_query.get_network_logs_by_cloudip(cloudip=cloud_ip, date=now_date, page=page, page_size=page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_log_cloudip.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "cloudip": cloud_ip, "date": now_date})
    else:
        cloud_ip = request.GET.get("cloud_ip")
        page = int(request.GET.get("page", 1))
        page_size = int(request.GET.get("page_size", 10))
        if request.GET.get("date"):
            now_date = request.GET.get("date")
        else:
            now_date = get_latest_date()
        hive_query = HiveQuery(db_name)
        data = hive_query.get_network_logs_by_cloudip(cloudip=cloud_ip, date=now_date, page=page, page_size=page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_log_cloudip.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "cloudip": cloud_ip, "date": now_date})


@login_required
def cloud_log(request):
    """Render the list of cloud IPs."""
    db_name = "hive2"
    hive_query = HiveQuery(db_name)
    data = hive_query.get_cloud_ip()
    print(data)
    return render(request, "capture/cloud_ip.html", {"data": data})


@login_required
def stream_log_ip(request):
    """Render `network_logs` filtered by source IP; the date comes via POST, else GET, else today."""
    db_name = "hive3"
    if request.POST.get("date"):
        now_date = request.POST.get("date")
        ip = request.POST.get("ip")
        page = int(request.POST.get("page", 1))
        page_size = int(request.POST.get("page_size", 10))
        hives = HiveQuery(db_name)
        data = hives.get_network_logs_by_ip(ip, now_date, page, page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_log_ip.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "ip": ip, "date": now_date})
    else:
        if request.GET.get("date"):
            now_date = request.GET.get("date")
        else:
            now_date = get_latest_date()
        ip = request.GET.get("ip")
        page = int(request.GET.get("page", 1))
        page_size = int(request.GET.get("page_size", 10))
        hives = HiveQuery(db_name)
        data = hives.get_network_logs_by_ip(ip, now_date, page, page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_log_ip.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "ip": ip, "date": now_date})


@login_required
def http_log_ip(request):
    """Render `http_logs` filtered by host IP; the date comes via POST, else GET, else today."""
    db_name = "hive2"
    if request.POST.get("date"):
        now_date = request.POST.get("date")
        ip = request.POST.get("ip")
        page = int(request.POST.get("page", 1))
        page_size = int(request.POST.get("page_size", 10))
        hives = HiveQuery(db_name)
        data = hives.get_http_logs(ip, now_date, page, page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_http_log.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "ip": ip, "date": now_date})
    else:
        if request.GET.get("date"):
            now_date = request.GET.get("date")
        else:
            now_date = get_latest_date()
        ip = request.GET.get("ip")
        page = int(request.GET.get("page", 1))
        page_size = int(request.GET.get("page_size", 10))
        hives = HiveQuery(db_name)
        data = hives.get_http_logs(ip, now_date, page, page_size)
        pagination = generate_pagination_context(page)
        return render(request, "capture/stream_http_log.html",
                      {"data": data, "page": page, "page_size": page_size,
                       "pagination": pagination, "ip": ip, "date": now_date})


def fetch_geolocation(ip):
    """Fetch geolocation info for an IP from IP-API (synchronously)."""
    url = f"http://ip-api.com/json/{ip}"
    try:
        with urllib.request.urlopen(url, timeout=30) as response:
            geo_data = json.loads(response.read().decode())
        if geo_data.get("status") == "success":
            return {
                "country": geo_data.get("country"),
                "region": geo_data.get("regionName"),
                "city": geo_data.get("city"),
                "lat": geo_data.get("lat"),
                "lon": geo_data.get("lon"),
                "isp": geo_data.get("isp"),
            }
    except Exception as e:
        print(f"Error fetching geolocation for {ip}: {e}")
    # Fallback for lookup failures and non-success responses.
    return {
        "country": "UNKNOWN",
        "region": "UNKNOWN",
        "city": "UNKNOWN",
        "lat": 0.0,
        "lon": 0.0,
        "isp": "UNKNOWN"
    }
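An illustrative round trip through fetch_geolocation (the IP and the returned values are examples only; the free ip-api.com endpoint is rate-limited):

    geo = fetch_geolocation("8.8.8.8")
    # e.g. {"country": "United States", "region": "Virginia", "city": "Ashburn",
    #       "lat": 39.03, "lon": -77.5, "isp": "Google LLC"}
    # On a timeout or a non-"success" status, the UNKNOWN/0.0 fallback is returned.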
@login_required
def http_details(request, ip):
    """Return `ip_info` details for an IP, filling in geolocation if the stored data is incomplete."""
    db_name = "hive3"
    hives = HiveQuery(db_name)
    details = hives.get_ip_details(ip)
    if not details:  # no matching rows
        return JsonResponse({"error": "No data found"}, status=404)
    ip_info = details[0]  # take the first record
    # If `country` is "UNKNOWN", call `fetch_geolocation()` to fill in the gaps.
    if ip_info.get("country") == "UNKNOWN":
        geo_data = fetch_geolocation(ip)
        ip_info.update(geo_data)  # overwrite the missing fields
    return JsonResponse(ip_info)  # full IP details