netflow
Различия
Показаны различия между двумя версиями страницы.
| Следующая версия | Предыдущая версия | ||
| netflow [11.12.2025 23:18] – создано augin | netflow [12.12.2025 16:54] (текущий) – [настройка клиента] augin | ||
|---|---|---|---|
| Строка 67: | Строка 67: | ||
| INDEX idx_period (period_start, | INDEX idx_period (period_start, | ||
| ); | ); | ||
| + | </ | ||
| + | |||
| + | <code bash> | ||
| + | -- Подключитесь к базе данных | ||
| + | \c netflow_db | ||
| + | |||
| + | -- Предоставьте права на схему public вашему пользователю | ||
| + | GRANT ALL PRIVILEGES ON SCHEMA public TO netflow_user; | ||
| + | GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO netflow_user; | ||
| + | GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO netflow_user; | ||
| + | |||
| + | -- Для автоматического предоставления прав на будущие таблицы | ||
| + | ALTER DEFAULT PRIVILEGES IN SCHEMA public | ||
| + | GRANT ALL PRIVILEGES ON TABLES TO netflow_user; | ||
| + | |||
| + | ALTER DEFAULT PRIVILEGES IN SCHEMA public | ||
| + | GRANT ALL PRIVILEGES ON SEQUENCES TO netflow_user; | ||
| </ | </ | ||
| ====== 3. Основной коллектор NetFlow ====== | ====== 3. Основной коллектор NetFlow ====== | ||
| Строка 72: | Строка 89: | ||
| # | # | ||
| """ | """ | ||
| - | Коллектор NetFlow | + | Исправленный коллектор NetFlow |
| - | Поддерживает NetFlow v5, v9 | + | |
| - | Требует: | + | |
| """ | """ | ||
| Строка 86: | Строка 101: | ||
| import psycopg2 | import psycopg2 | ||
| from psycopg2.extras import RealDictCursor | from psycopg2.extras import RealDictCursor | ||
| - | from typing import Dict, List, Tuple, Optional | + | from typing import Dict, List, Tuple, Optional, Any |
| import json | import json | ||
| import os | import os | ||
| from dotenv import load_dotenv | from dotenv import load_dotenv | ||
| + | import binascii | ||
| - | # Загрузка переменных окружения | ||
| load_dotenv() | load_dotenv() | ||
| + | |||
| + | # Получаем настройки из .env | ||
| + | log_level_str = os.getenv(' | ||
| + | log_file = os.getenv(' | ||
| + | |||
| + | # Сопоставляем строку с уровнем логирования | ||
| + | log_levels = { | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | } | ||
| + | |||
| + | log_level = log_levels.get(log_level_str, | ||
| # Настройка логирования | # Настройка логирования | ||
| logging.basicConfig( | logging.basicConfig( | ||
| - | level=logging.INFO, | + | level=log_level, |
| format=' | format=' | ||
| handlers=[ | handlers=[ | ||
| - | logging.FileHandler(' | + | logging.FileHandler(log_file), |
| logging.StreamHandler() | logging.StreamHandler() | ||
| ] | ] | ||
| Строка 106: | Строка 136: | ||
| class NetFlowCollector: | class NetFlowCollector: | ||
| - | """ | + | """ |
| - | + | ||
| def __init__(self, | def __init__(self, | ||
| self.db_config = db_config | self.db_config = db_config | ||
| self.flow_queue = queue.Queue(maxsize=10000) | self.flow_queue = queue.Queue(maxsize=10000) | ||
| self.running = False | self.running = False | ||
| - | self.exporters = {} # Кэш экспортеров | + | self.exporters = {} |
| - | + | ||
| - | # Параметры | + | # Параметры |
| self.netflow_port = int(os.getenv(' | self.netflow_port = int(os.getenv(' | ||
| - | self.buffer_size = 4096 | + | self.buffer_size = 65535 |
| - | + | ||
| # Статистика | # Статистика | ||
| self.stats = { | self.stats = { | ||
| - | 'flows_received': 0, | + | 'packets_received': 0, |
| ' | ' | ||
| - | 'bytes_processed': 0, | + | 'parse_errors': 0, |
| - | ' | + | ' |
| + | ' | ||
| } | } | ||
| - | + | ||
| - | # Инициализация базы данных | + | |
| self.init_database() | self.init_database() | ||
| - | | + | |
| def init_database(self): | def init_database(self): | ||
| """ | """ | ||
| Строка 135: | Строка 165: | ||
| self.cursor = self.conn.cursor(cursor_factory=RealDictCursor) | self.cursor = self.conn.cursor(cursor_factory=RealDictCursor) | ||
| logger.info(" | logger.info(" | ||
| - | | + | |
| - | # Создаем таблицы | + | # Используем отдельную схему |
| + | schema = os.getenv(' | ||
| + | self.cursor.execute(f" | ||
| + | self.cursor.execute(f" | ||
| self.create_tables() | self.create_tables() | ||
| - | | + | |
| except Exception as e: | except Exception as e: | ||
| logger.error(f" | logger.error(f" | ||
| - | | + | |
| - | | + | |
| + | self.conn = psycopg2.connect(**self.db_config) | ||
| + | self.cursor = self.cursor = self.conn.cursor(cursor_factory=RealDictCursor) | ||
| + | self.create_tables() | ||
| + | self.conn.commit() | ||
| + | except Exception as e2: | ||
| + | logger.error(f" | ||
| + | raise | ||
| def create_tables(self): | def create_tables(self): | ||
| - | """ | + | """ |
| create_tables_sql = """ | create_tables_sql = """ | ||
| CREATE TABLE IF NOT EXISTS netflow_flows ( | CREATE TABLE IF NOT EXISTS netflow_flows ( | ||
| Строка 170: | Строка 213: | ||
| application_name VARCHAR(256), | application_name VARCHAR(256), | ||
| bidirectional BOOLEAN DEFAULT false, | bidirectional BOOLEAN DEFAULT false, | ||
| + | netflow_version INTEGER, | ||
| + | received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | ||
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | ||
| ); | ); | ||
| - | | + | |
| CREATE INDEX IF NOT EXISTS idx_netflow_timestamp ON netflow_flows(flow_start); | CREATE INDEX IF NOT EXISTS idx_netflow_timestamp ON netflow_flows(flow_start); | ||
| CREATE INDEX IF NOT EXISTS idx_netflow_src_ip ON netflow_flows(src_ip); | CREATE INDEX IF NOT EXISTS idx_netflow_src_ip ON netflow_flows(src_ip); | ||
| CREATE INDEX IF NOT EXISTS idx_netflow_dst_ip ON netflow_flows(dst_ip); | CREATE INDEX IF NOT EXISTS idx_netflow_dst_ip ON netflow_flows(dst_ip); | ||
| CREATE INDEX IF NOT EXISTS idx_netflow_exporter ON netflow_flows(exporter_ip); | CREATE INDEX IF NOT EXISTS idx_netflow_exporter ON netflow_flows(exporter_ip); | ||
| - | | + | |
| CREATE TABLE IF NOT EXISTS netflow_exporters ( | CREATE TABLE IF NOT EXISTS netflow_exporters ( | ||
| id SERIAL PRIMARY KEY, | id SERIAL PRIMARY KEY, | ||
| Строка 189: | Строка 234: | ||
| sampling_rate INTEGER DEFAULT 1 | sampling_rate INTEGER DEFAULT 1 | ||
| ); | ); | ||
| - | | + | |
| - | CREATE TABLE IF NOT EXISTS | + | CREATE TABLE IF NOT EXISTS |
| - | id SERIAL | + | id BIGSERIAL |
| - | | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| - | total_packets BIGINT, | + | |
| - | flow_count | + | |
| - | created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | + | |
| ); | ); | ||
| - | | ||
| - | CREATE INDEX IF NOT EXISTS idx_aggregates_period ON netflow_aggregates(period_start, | ||
| - | CREATE INDEX IF NOT EXISTS idx_aggregates_ips ON netflow_aggregates(src_ip, | ||
| """ | """ | ||
| - | | + | |
| try: | try: | ||
| self.cursor.execute(create_tables_sql) | self.cursor.execute(create_tables_sql) | ||
| Строка 214: | Строка 253: | ||
| logger.error(f" | logger.error(f" | ||
| self.conn.rollback() | self.conn.rollback() | ||
| - | | + | |
| - | def parse_netflow_v5(self, data: bytes, exporter_ip: | + | def log_debug(self, |
| - | """ | + | """ |
| + | try: | ||
| + | self.cursor.execute(""" | ||
| + | INSERT INTO netflow_debug_log | ||
| + | (exporter_ip, | ||
| + | VALUES (%s, %s, %s, %s, %s) | ||
| + | """, | ||
| + | psycopg2.Binary(data) if data else None, | ||
| + | len(data) if data else None)) | ||
| + | self.conn.commit() | ||
| + | except Exception as e: | ||
| + | logger.error(f" | ||
| + | |||
| + | def parse_netflow_v5_detailed(self, data: bytes, exporter_ip: | ||
| + | """ | ||
| flows = [] | flows = [] | ||
| - | | + | |
| + | # Логируем получение пакета | ||
| + | self.log_debug(exporter_ip, | ||
| + | f" | ||
| try: | try: | ||
| - | # Заголовок NetFlow v5 (24 байта) | + | # Проверяем минимальный размер |
| - | | + | |
| - | version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence, | + | logger.error(f"Пакет слишком короткий: {len(data)} байт") |
| - | + | ||
| - | if version != 5: | + | |
| - | logger.warning(f"Неверная версия NetFlow: {version}") | + | |
| return flows | return flows | ||
| - | | + | |
| + | # Парсим заголовок | ||
| + | try: | ||
| + | header = struct.unpack(' | ||
| + | version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence, | ||
| + | |||
| + | logger.debug(f" | ||
| + | |||
| + | if version != 5: | ||
| + | logger.warning(f" | ||
| + | return flows | ||
| + | |||
| + | except struct.error as e: | ||
| + | logger.error(f" | ||
| + | hex_data = binascii.hexlify(data[: | ||
| + | logger.error(f" | ||
| + | return flows | ||
| + | |||
| + | # Рассчитываем ожидаемый размер | ||
| + | expected_size = 24 + count * 48 | ||
| + | |||
| + | if len(data) != expected_size: | ||
| + | logger.warning(f" | ||
| + | logger.warning(f" | ||
| + | |||
| + | # Пробуем прочитать столько записей, | ||
| + | available_records = (len(data) - 24) // 48 | ||
| + | if available_records > 0: | ||
| + | count = available_records | ||
| + | logger.warning(f" | ||
| + | else: | ||
| + | logger.error(f" | ||
| + | return flows | ||
| offset = 24 | offset = 24 | ||
| - | record_size = 48 # Размер записи NetFlow v5 | + | |
| - | | + | |
| for i in range(count): | for i in range(count): | ||
| - | if offset + record_size | + | |
| - | break | + | # Проверяем, |
| - | + | | |
| - | | + | |
| - | + | | |
| - | # Разбор записи потока (48 байт) | + | |
| - | | + | |
| - | + | | |
| - | src_addr = socket.inet_ntoa(struct.pack(' | + | |
| - | dst_addr = socket.inet_ntoa(struct.pack(' | + | # Парсим |
| - | next_hop = socket.inet_ntoa(struct.pack(' | + | # Формат NetFlow v5 записи |
| - | input_intf = flow[3] | + | # src_addr(4), |
| - | output_intf = flow[4] | + | # input(2), output(2), packets(4), bytes(4), |
| - | packets = flow[5] | + | # first(4), last(4), src_port(2), |
| - | octets = flow[6] | + | # pad1(1), tcp_flags(1), |
| - | first = flow[7] | + | # src_as(2), dst_as(2), src_mask(1), |
| - | last = flow[8] | + | |
| - | src_port = flow[9] | + | # Распаковываем по частям |
| - | dst_port = flow[10] | + | src_addr_int |
| - | pad1 = flow[11] | + | |
| - | tcp_flags = flow[12] | + | |
| - | protocol = flow[13] | + | |
| - | tos = flow[14] | + | |
| - | src_as = flow[15] | + | dst_addr = socket.inet_ntoa(struct.pack(' |
| - | dst_as = flow[16] | + | next_hop = socket.inet_ntoa(struct.pack(' |
| - | src_mask = flow[17] | + | |
| - | dst_mask = flow[18] | + | |
| - | pad2 = flow[19] | + | output_intf = struct.unpack(' |
| - | + | packets = struct.unpack(' | |
| - | # Расчет времени начала и окончания потока | + | octets = struct.unpack(' |
| - | | + | first = struct.unpack(' |
| - | | + | last = struct.unpack(' |
| - | flow_duration = last - first | + | src_port = struct.unpack(' |
| - | + | dst_port = struct.unpack(' | |
| - | flow_data = { | + | |
| - | ' | + | # Байты 36-39: pad1(1), tcp_flags(1), |
| - | ' | + | |
| - | ' | + | tcp_flags = struct.unpack(' |
| - | ' | + | protocol = struct.unpack(' |
| - | ' | + | tos = struct.unpack(' |
| - | ' | + | |
| - | ' | + | |
| - | ' | + | dst_as = struct.unpack(' |
| - | ' | + | src_mask = struct.unpack(' |
| - | ' | + | dst_mask = struct.unpack(' |
| - | ' | + | |
| - | ' | + | # Байты 46-47: pad2 (2 байта) |
| - | ' | + | |
| - | ' | + | |
| - | ' | + | # Рассчитываем |
| - | ' | + | |
| - | ' | + | flow_start_ts |
| - | ' | + | |
| - | ' | + | |
| - | ' | + | flow_start = datetime.fromtimestamp(flow_start_ts) |
| - | ' | + | |
| - | ' | + | |
| - | } | + | |
| - | + | | |
| - | flows.append(flow_data) | + | flow_start = datetime.fromtimestamp(unix_secs) |
| - | | + | flow_end = datetime.fromtimestamp(unix_secs) |
| - | | + | flow_duration = 0 |
| + | |||
| + | | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | ' | ||
| + | | ||
| + | |||
| + | flows.append(flow_data) | ||
| + | |||
| + | # Логируем первую запись для отладки | ||
| + | if i == 0: | ||
| + | logger.debug(f" | ||
| + | | ||
| + | |||
| + | | ||
| + | logger.error(f" | ||
| + | hex_record | ||
| + | logger.error(f" | ||
| + | continue | ||
| + | | ||
| + | logger.error(f" | ||
| + | continue | ||
| + | |||
| + | offset += 48 | ||
| + | |||
| + | logger.debug(f" | ||
| except Exception as e: | except Exception as e: | ||
| - | logger.error(f" | + | logger.error(f" |
| - | | + | |
| + | logger.error(traceback.format_exc()) | ||
| + | self.log_debug(exporter_ip, | ||
| return flows | return flows | ||
| - | | + | |
| def _get_application_name(self, | def _get_application_name(self, | ||
| - | """ | + | """ |
| - | # Общие порты | + | |
| common_ports = { | common_ports = { | ||
| - | 80: ' | + | 80: ' |
| - | | + | 110: ' |
| - | | + | 5432: ' |
| - | | + | 21: ' |
| - | | + | 67: 'DHCP Server', |
| - | 110: ' | + | 514: ' |
| - | | + | 5060: ' |
| - | | + | 8443: ' |
| - | | + | 1433: ' |
| - | 5432: ' | + | 636: ' |
| - | | + | 143: ' |
| - | | + | |
| - | 21: ' | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| - | 67: 'DHCP Server', | + | |
| - | | + | |
| - | | + | |
| - | 514: ' | + | |
| - | | + | |
| - | | + | |
| - | 5060: ' | + | |
| - | | + | |
| } | } | ||
| - | | + | |
| protocol_names = { | protocol_names = { | ||
| - | | + | |
| - | 17: 'UDP', | + | 47: ' |
| - | 1: 'ICMP', | + | |
| - | 2: 'IGMP', | + | |
| - | 47: ' | + | |
| - | | + | |
| - | | + | |
| } | } | ||
| - | | + | |
| - | if protocol | + | if protocol |
| - | | + | |
| - | if app_name: | + | return |
| - | return | + | |
| elif port < 1024: | elif port < 1024: | ||
| - | return f" | + | return f" |
| else: | else: | ||
| - | return f" | + | return f" |
| + | elif protocol in protocol_names: | ||
| + | return protocol_names[protocol] | ||
| else: | else: | ||
| - | return | + | return f" |
| - | + | ||
| - | def update_exporter_stats(self, | + | |
| - | """ | + | |
| - | try: | + | |
| - | # Проверяем существует ли экспортер | + | |
| - | self.cursor.execute(""" | + | |
| - | INSERT INTO netflow_exporters | + | |
| - | (exporter_ip, | + | |
| - | VALUES (%s, CURRENT_TIMESTAMP, | + | |
| - | ON CONFLICT (exporter_ip) | + | |
| - | DO UPDATE SET | + | |
| - | last_seen = CURRENT_TIMESTAMP, | + | |
| - | total_flows = netflow_exporters.total_flows + %s, | + | |
| - | total_bytes = netflow_exporters.total_bytes + %s | + | |
| - | """, | + | |
| - | + | ||
| - | self.conn.commit() | + | |
| - | except Exception as e: | + | |
| - | logger.error(f" | + | |
| - | self.conn.rollback() | + | |
| - | + | ||
| def save_flows_to_db(self, | def save_flows_to_db(self, | ||
| """ | """ | ||
| if not flows: | if not flows: | ||
| return | return | ||
| - | | + | |
| try: | try: | ||
| - | # Пакетная вставка | ||
| insert_sql = """ | insert_sql = """ | ||
| INSERT INTO netflow_flows ( | INSERT INTO netflow_flows ( | ||
| Строка 382: | Строка 479: | ||
| protocol, packets, bytes, flow_duration_ms, | protocol, packets, bytes, flow_duration_ms, | ||
| input_snmp, output_snmp, | input_snmp, output_snmp, | ||
| - | src_as, dst_as, next_hop, vlan_id, application_name | + | src_as, dst_as, next_hop, vlan_id, application_name, |
| + | netflow_version | ||
| ) VALUES %s | ) VALUES %s | ||
| """ | """ | ||
| - | + | ||
| - | # Подготовка данных | + | |
| values = [] | values = [] | ||
| total_bytes = 0 | total_bytes = 0 | ||
| - | | + | |
| for flow in flows: | for flow in flows: | ||
| values.append(( | values.append(( | ||
| flow[' | flow[' | ||
| - | flow[' | + | flow[' |
| - | | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| + | | ||
| )) | )) | ||
| - | total_bytes += flow[' | + | total_bytes += flow.get(' |
| - | + | ||
| - | # Используем execute_values для | + | # Используем execute_values для пакетной вставки |
| from psycopg2.extras import execute_values | from psycopg2.extras import execute_values | ||
| execute_values(self.cursor, | execute_values(self.cursor, | ||
| - | | + | |
| # Обновляем статистику экспортера | # Обновляем статистику экспортера | ||
| - | exporter_ip | + | |
| - | self.update_exporter_stats(exporter_ip, | + | self.update_exporter_stats(exporter_ip, |
| - | + | ||
| self.conn.commit() | self.conn.commit() | ||
| - | + | ||
| - | # Обновляем статистику | + | |
| self.stats[' | self.stats[' | ||
| - | self.stats[' | + | |
| - | + | | |
| - | logger.debug(f" | + | logger.info(f" |
| - | + | ||
| except Exception as e: | except Exception as e: | ||
| logger.error(f" | logger.error(f" | ||
| self.conn.rollback() | self.conn.rollback() | ||
| - | | + | |
| + | # Попробуем сохранить по одному | ||
| + | self.save_flows_one_by_one(flows) | ||
| + | |||
| + | def save_flows_one_by_one(self, | ||
| + | """ | ||
| + | saved = 0 | ||
| + | for flow in flows: | ||
| + | try: | ||
| + | self.cursor.execute(""" | ||
| + | INSERT INTO netflow_flows ( | ||
| + | flow_start, flow_end, src_ip, dst_ip, src_port, dst_port, | ||
| + | protocol, packets, bytes, flow_duration_ms, | ||
| + | input_snmp, output_snmp, | ||
| + | src_as, dst_as, next_hop, vlan_id, application_name, | ||
| + | netflow_version | ||
| + | ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) | ||
| + | """, | ||
| + | flow[' | ||
| + | flow[' | ||
| + | flow[' | ||
| + | flow[' | ||
| + | flow[' | ||
| + | flow[' | ||
| + | flow[' | ||
| + | )) | ||
| + | saved += 1 | ||
| + | except Exception as e: | ||
| + | logger.error(f" | ||
| + | |||
| + | if saved > 0: | ||
| + | self.conn.commit() | ||
| + | logger.info(f" | ||
| + | |||
| + | def update_exporter_stats(self, | ||
| + | """ | ||
| + | try: | ||
| + | self.cursor.execute(""" | ||
| + | INSERT INTO netflow_exporters | ||
| + | (exporter_ip, | ||
| + | VALUES (%s, CURRENT_TIMESTAMP, | ||
| + | ON CONFLICT (exporter_ip) | ||
| + | DO UPDATE SET | ||
| + | last_seen = CURRENT_TIMESTAMP, | ||
| + | total_flows = netflow_exporters.total_flows + %s, | ||
| + | total_bytes = netflow_exporters.total_bytes + %s | ||
| + | """, | ||
| + | self.conn.commit() | ||
| + | except Exception as e: | ||
| + | logger.error(f" | ||
| def start_udp_listener(self): | def start_udp_listener(self): | ||
| - | """ | + | """ |
| sock = socket.socket(socket.AF_INET, | sock = socket.socket(socket.AF_INET, | ||
| sock.setsockopt(socket.SOL_SOCKET, | sock.setsockopt(socket.SOL_SOCKET, | ||
| - | | + | |
| try: | try: | ||
| sock.bind((' | sock.bind((' | ||
| + | sock.settimeout(1) | ||
| logger.info(f" | logger.info(f" | ||
| - | | + | |
| while self.running: | while self.running: | ||
| try: | try: | ||
| data, addr = sock.recvfrom(self.buffer_size) | data, addr = sock.recvfrom(self.buffer_size) | ||
| exporter_ip = addr[0] | exporter_ip = addr[0] | ||
| - | | + | |
| - | # Добавляем в очередь для обработки | + | # Логируем получение пакета |
| - | self.flow_queue.put((data, | + | self.stats[' |
| - | | + | |
| - | + | # Проверяем минимальный размер | |
| + | if len(data) < 24: | ||
| + | logger.warning(f" | ||
| + | continue | ||
| + | |||
| + | # Проверяем версию NetFlow | ||
| + | try: | ||
| + | version = struct.unpack(' | ||
| + | except: | ||
| + | logger.warning(f" | ||
| + | continue | ||
| + | |||
| + | | ||
| + | | ||
| + | | ||
| + | logger.info(f" | ||
| except socket.timeout: | except socket.timeout: | ||
| continue | continue | ||
| except Exception as e: | except Exception as e: | ||
| logger.error(f" | logger.error(f" | ||
| - | | + | |
| finally: | finally: | ||
| sock.close() | sock.close() | ||
| - | | + | logger.info(" |
| def process_flows(self): | def process_flows(self): | ||
| """ | """ | ||
| - | batch_size = 1000 # Размер пакета для вставки | + | batch_size = 100 |
| batch = [] | batch = [] | ||
| - | | + | |
| while self.running: | while self.running: | ||
| try: | try: | ||
| - | # Берем данные из очереди с таймаутом | ||
| data, exporter_ip = self.flow_queue.get(timeout=1) | data, exporter_ip = self.flow_queue.get(timeout=1) | ||
| - | | + | |
| - | # Парсим | + | # Парсим |
| - | flows = self.parse_netflow_v5(data, exporter_ip) | + | flows = self.parse_netflow_v5_detailed(data, exporter_ip) |
| - | batch.extend(flows) | + | |
| - | + | | |
| - | # Если накопили достаточно или прошло время - сохраняем | + | |
| + | |||
| + | # Периодическое сохранение | ||
| if len(batch) >= batch_size or (datetime.now() - self.stats[' | if len(batch) >= batch_size or (datetime.now() - self.stats[' | ||
| if batch: | if batch: | ||
| Строка 467: | Строка 634: | ||
| batch = [] | batch = [] | ||
| self.stats[' | self.stats[' | ||
| - | | + | |
| self.flow_queue.task_done() | self.flow_queue.task_done() | ||
| - | | + | |
| except queue.Empty: | except queue.Empty: | ||
| - | # Сохраняем оставшиеся данные | + | # Сохраняем остатки |
| if batch: | if batch: | ||
| self.save_flows_to_db(batch) | self.save_flows_to_db(batch) | ||
| batch = [] | batch = [] | ||
| continue | continue | ||
| - | | ||
| except Exception as e: | except Exception as e: | ||
| - | logger.error(f" | + | logger.error(f" |
| - | + | | |
| - | def aggregate_statistics(self): | + | logger.error(traceback.format_exc()) |
| - | """ | + | |
| - | while self.running: | + | |
| - | try: | + | |
| - | | + | |
| - | now = datetime.now() | + | |
| - | next_hour = (now + timedelta(hours=1)).replace(minute=0, | + | |
| - | sleep_seconds = (next_hour - now).seconds | + | |
| - | + | ||
| - | if sleep_seconds > 0: | + | |
| - | time.sleep(sleep_seconds) | + | |
| - | + | ||
| - | # Агрегируем данные за прошедший час | + | |
| - | hour_start = next_hour - timedelta(hours=1) | + | |
| - | hour_end = next_hour | + | |
| - | + | ||
| - | self.cursor.execute(""" | + | |
| - | INSERT INTO netflow_aggregates ( | + | |
| - | period_start, | + | |
| - | total_bytes, | + | |
| - | ) | + | |
| - | SELECT | + | |
| - | %s as period_start, | + | |
| - | %s as period_end, | + | |
| - | src_ip, | + | |
| - | dst_ip, | + | |
| - | protocol, | + | |
| - | SUM(bytes) as total_bytes, | + | |
| - | SUM(packets) as total_packets, | + | |
| - | COUNT(*) as flow_count | + | |
| - | FROM netflow_flows | + | |
| - | WHERE flow_start >= %s AND flow_start < %s | + | |
| - | GROUP BY src_ip, dst_ip, protocol | + | |
| - | """, | + | |
| - | + | ||
| - | self.conn.commit() | + | |
| - | logger.info(f" | + | |
| - | + | ||
| - | except Exception as e: | + | |
| - | logger.error(f" | + | |
| - | time.sleep(60) # Подождать минуту при ошибке | + | |
| - | + | ||
| def print_statistics(self): | def print_statistics(self): | ||
| - | """ | + | """ |
| while self.running: | while self.running: | ||
| try: | try: | ||
| - | time.sleep(60) # Выводим статистику | + | time.sleep(30) |
| - | | + | |
| + | | ||
| + | | ||
| logger.info(f""" | logger.info(f""" | ||
| - | === СТАТИСТИКА КОЛЛЕКТОРА === | + | === СТАТИСТИКА === |
| - | Принято потоков: {self.stats[' | + | Пакетов получено: {self.stats[' |
| - | | + | |
| - | Обработано байт: {self.stats[' | + | Ошибок парсинга: {self.stats[' |
| В очереди: | В очереди: | ||
| - | | + | ================== |
| """ | """ | ||
| - | | + | |
| except Exception as e: | except Exception as e: | ||
| logger.error(f" | logger.error(f" | ||
| - | | + | |
| - | def start(self): | + | def test_parse_function(self): |
| + | """ | ||
| + | logger.info(" | ||
| + | |||
| + | # Создаем тестовый сокет | ||
| + | sock = socket.socket(socket.AF_INET, | ||
| + | sock.settimeout(5) | ||
| + | |||
| + | try: | ||
| + | sock.bind((' | ||
| + | logger.info(f" | ||
| + | |||
| + | data, addr = sock.recvfrom(self.buffer_size) | ||
| + | logger.info(f" | ||
| + | |||
| + | # Тестируем парсер | ||
| + | flows = self.parse_netflow_v5_detailed(data, | ||
| + | |||
| + | logger.info(f" | ||
| + | |||
| + | if flows: | ||
| + | # Показываем первую запись | ||
| + | flow = flows[0] | ||
| + | logger.info(f" | ||
| + | logger.info(f" | ||
| + | logger.info(f" | ||
| + | logger.info(f" | ||
| + | logger.info(f" | ||
| + | logger.info(f" | ||
| + | |||
| + | # Пробуем сохранить | ||
| + | self.save_flows_to_db([flow]) | ||
| + | logger.info(" | ||
| + | |||
| + | except socket.timeout: | ||
| + | logger.info(" | ||
| + | except Exception as e: | ||
| + | logger.error(f" | ||
| + | import traceback | ||
| + | logger.error(traceback.format_exc()) | ||
| + | finally: | ||
| + | sock.close() | ||
| + | |||
| + | def start(self, test_mode=False): | ||
| """ | """ | ||
| + | if test_mode: | ||
| + | self.test_parse_function() | ||
| + | return | ||
| + | |||
| self.running = True | self.running = True | ||
| - | | + | |
| # Запускаем потоки | # Запускаем потоки | ||
| threads = [] | threads = [] | ||
| - | + | ||
| - | # UDP слушатель | + | |
| udp_thread = threading.Thread(target=self.start_udp_listener, | udp_thread = threading.Thread(target=self.start_udp_listener, | ||
| threads.append(udp_thread) | threads.append(udp_thread) | ||
| - | + | ||
| - | # Обработчик потоков | + | |
| processor_thread = threading.Thread(target=self.process_flows, | processor_thread = threading.Thread(target=self.process_flows, | ||
| threads.append(processor_thread) | threads.append(processor_thread) | ||
| - | + | ||
| - | # Агрегатор статистики | + | |
| - | aggregator_thread = threading.Thread(target=self.aggregate_statistics, | + | |
| - | threads.append(aggregator_thread) | + | |
| - | + | ||
| - | # Вывод статистики | + | |
| stats_thread = threading.Thread(target=self.print_statistics, | stats_thread = threading.Thread(target=self.print_statistics, | ||
| threads.append(stats_thread) | threads.append(stats_thread) | ||
| - | + | ||
| - | # Запускаем все потоки | + | |
| for thread in threads: | for thread in threads: | ||
| thread.start() | thread.start() | ||
| - | | + | |
| logger.info(" | logger.info(" | ||
| - | + | ||
| - | # Основной цикл | + | |
| try: | try: | ||
| while self.running: | while self.running: | ||
| Строка 575: | Строка 742: | ||
| except KeyboardInterrupt: | except KeyboardInterrupt: | ||
| logger.info(" | logger.info(" | ||
| + | finally: | ||
| self.stop() | self.stop() | ||
| - | | + | |
| def stop(self): | def stop(self): | ||
| """ | """ | ||
| logger.info(" | logger.info(" | ||
| self.running = False | self.running = False | ||
| - | | + | |
| # Ожидаем завершения очереди | # Ожидаем завершения очереди | ||
| self.flow_queue.join() | self.flow_queue.join() | ||
| - | | + | |
| - | # Закрываем соединение | + | # Закрываем соединение |
| if hasattr(self, | if hasattr(self, | ||
| self.cursor.close() | self.cursor.close() | ||
| if hasattr(self, | if hasattr(self, | ||
| self.conn.close() | self.conn.close() | ||
| - | | + | |
| logger.info(" | logger.info(" | ||
| - | # Конфигурация из переменных окружения | ||
| def load_config(): | def load_config(): | ||
| + | """ | ||
| return { | return { | ||
| ' | ' | ||
| Строка 604: | Строка 772: | ||
| if __name__ == " | if __name__ == " | ||
| - | | + | |
| + | |||
| + | parser = argparse.ArgumentParser(description=" | ||
| + | parser.add_argument(' | ||
| + | parser.add_argument(' | ||
| + | args = parser.parse_args() | ||
| config = load_config() | config = load_config() | ||
| - | | + | |
| - | | + | |
| + | os.environ[' | ||
| collector = NetFlowCollector(config) | collector = NetFlowCollector(config) | ||
| - | | + | |
| try: | try: | ||
| - | collector.start() | + | collector.start(test_mode=args.test) |
| except Exception as e: | except Exception as e: | ||
| logger.error(f" | logger.error(f" | ||
| + | import traceback | ||
| + | logger.error(traceback.format_exc()) | ||
| collector.stop() | collector.stop() | ||
| </ | </ | ||
| Строка 774: | Строка 952: | ||
| Коллектор готов к работе в production-среде, | Коллектор готов к работе в production-среде, | ||
+ | ====== Настройка клиента ====== | ||
| + | <code bash> | ||
| + | show interface | ||
| + | interface Bridge0 ip flow both | ||
| + | ip flow-export destination 192.168.2.16 2055 | ||
| + | </ | ||
| + | <code bash> | ||
| + | Я например хочу повесить наблюдение за интерфейсом PPPoE0 | ||
| + | Для этого мне надо ввести следующую команду: | ||
| + | interface {name} ip flow (ingress|egress|both) | ||
| + | Т.е. в моем случае это: | ||
| + | interface PPPoE0 ip flow both | ||
+ | both — так как я хочу слушать как исходящий, так и входящий трафик | ||
| + | Дальше нужно указать адрес сервера, | ||
| + | Это делается командой: | ||
| + | ip flow-export destination {address} {port} | ||
| + | Посмотреть информацию о состоянии можно командой: | ||
| + | more proc:/ | ||
| + | more proc:/ | ||
| + | Версия Netflow по умолчанию используется 5. | ||
| + | Если надо поменять например на 9 или 10, то это можно сделать следующей командой: | ||
| + | system set net.netflow.protocol 9 | ||
| + | Отключить netflow можно следующими командами: | ||
| + | ip no flow-export destination | ||
+ | interface PPPoE0 no ip flow | ||
| + | </ | ||
netflow.1765495082.txt.gz · Последнее изменение: — augin
