Инструменты пользователя

Инструменты сайта


netflow

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слеваПредыдущая версия
Следующая версия
Предыдущая версия
netflow [12.12.2025 08:27] – [настройка клиента] auginnetflow [12.12.2025 16:54] (текущий) – [настройка клиента] augin
Строка 89: Строка 89:
 #!/usr/bin/env python3 #!/usr/bin/env python3
 """ """
-Коллектор NetFlow для PostgreSQL +Исправленный коллектор NetFlow с детальной отладкой
-Поддерживает NetFlow v5, v9 +
-Требует: pip install nfstream psycopg2-binary+
 """ """
  
Строка 103: Строка 101:
 import psycopg2 import psycopg2
 from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
-from typing import Dict, List, Tuple, Optional+from typing import Dict, List, Tuple, Optional, Any
 import json import json
 import os import os
 from dotenv import load_dotenv from dotenv import load_dotenv
 +import binascii
  
-# Загрузка переменных окружения 
 load_dotenv() load_dotenv()
 +
 +# Получаем настройки из .env
 +log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
 +log_file = os.getenv('LOG_FILE', 'netflow_collector.log')
 +
 +# Сопоставляем строку с уровнем логирования
 +log_levels = {
 +    'DEBUG': logging.DEBUG,
 +    'INFO': logging.INFO,
 +    'WARNING': logging.WARNING,
 +    'ERROR': logging.ERROR,
 +    'CRITICAL': logging.CRITICAL
 +}
 +
 +log_level = log_levels.get(log_level_str, logging.INFO)
  
 # Настройка логирования # Настройка логирования
 logging.basicConfig( logging.basicConfig(
-    level=logging.INFO,+    level=log_level,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
     handlers=[     handlers=[
-        logging.FileHandler('netflow_collector.log'),+        logging.FileHandler(log_file),
         logging.StreamHandler()         logging.StreamHandler()
     ]     ]
Строка 123: Строка 136:
  
 class NetFlowCollector: class NetFlowCollector:
-    """Коллектор NetFlow потоков""" +    """Коллектор NetFlow потоков с детальной отладкой""" 
-    +
     def __init__(self, db_config: Dict):     def __init__(self, db_config: Dict):
         self.db_config = db_config         self.db_config = db_config
         self.flow_queue = queue.Queue(maxsize=10000)         self.flow_queue = queue.Queue(maxsize=10000)
         self.running = False         self.running = False
-        self.exporters = {}  # Кэш экспортеров +        self.exporters = {} 
-         + 
-        # Параметры NetFlow+        # Параметры
         self.netflow_port = int(os.getenv('NETFLOW_PORT', 2055))         self.netflow_port = int(os.getenv('NETFLOW_PORT', 2055))
-        self.buffer_size = 4096 +        self.buffer_size = 65535 
-        +
         # Статистика         # Статистика
         self.stats = {         self.stats = {
-            'flows_received': 0,+            'packets_received': 0,
             'flows_processed': 0,             'flows_processed': 0,
-            'bytes_processed': 0, +            'parse_errors': 0, 
-            'last_flush': datetime.now()+            'last_flush': datetime.now()
 +            'versions': {}
         }         }
-         +
-        # Инициализация базы данных+
         self.init_database()         self.init_database()
-        +
     def init_database(self):     def init_database(self):
         """Инициализация подключения к базе данных"""         """Инициализация подключения к базе данных"""
Строка 152: Строка 165:
             self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)             self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
             logger.info("Подключение к PostgreSQL установлено")             logger.info("Подключение к PostgreSQL установлено")
-             + 
-            # Создаем таблицы если их нет+            # Используем отдельную схему 
 +            schema = os.getenv('DB_SCHEMA', 'netflow'
 +            self.cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}"
 +            self.cursor.execute(f"SET search_path TO {schema}"
             self.create_tables()             self.create_tables()
-            +            self.conn.commit() 
         except Exception as e:         except Exception as e:
             logger.error(f"Ошибка подключения к PostgreSQL: {e}")             logger.error(f"Ошибка подключения к PostgreSQL: {e}")
-            raise +            # Попробуем без схемы 
-            +            try: 
 +                self.conn = psycopg2.connect(**self.db_config) 
 +                self.cursor = self.cursor = self.conn.cursor(cursor_factory=RealDictCursor) 
 +                self.create_tables() 
 +                self.conn.commit() 
 +            except Exception as e2: 
 +                logger.error(f"Критическая ошибка: {e2}"
 +                raise 
     def create_tables(self):     def create_tables(self):
-        """Создание таблиц если они не существуют"""+        """Создание таблиц"""
         create_tables_sql = """         create_tables_sql = """
         CREATE TABLE IF NOT EXISTS netflow_flows (         CREATE TABLE IF NOT EXISTS netflow_flows (
Строка 187: Строка 213:
             application_name VARCHAR(256),             application_name VARCHAR(256),
             bidirectional BOOLEAN DEFAULT false,             bidirectional BOOLEAN DEFAULT false,
 +            netflow_version INTEGER,
 +            received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         );         );
-        +
         CREATE INDEX IF NOT EXISTS idx_netflow_timestamp ON netflow_flows(flow_start);         CREATE INDEX IF NOT EXISTS idx_netflow_timestamp ON netflow_flows(flow_start);
         CREATE INDEX IF NOT EXISTS idx_netflow_src_ip ON netflow_flows(src_ip);         CREATE INDEX IF NOT EXISTS idx_netflow_src_ip ON netflow_flows(src_ip);
         CREATE INDEX IF NOT EXISTS idx_netflow_dst_ip ON netflow_flows(dst_ip);         CREATE INDEX IF NOT EXISTS idx_netflow_dst_ip ON netflow_flows(dst_ip);
         CREATE INDEX IF NOT EXISTS idx_netflow_exporter ON netflow_flows(exporter_ip);         CREATE INDEX IF NOT EXISTS idx_netflow_exporter ON netflow_flows(exporter_ip);
-        +
         CREATE TABLE IF NOT EXISTS netflow_exporters (         CREATE TABLE IF NOT EXISTS netflow_exporters (
             id SERIAL PRIMARY KEY,             id SERIAL PRIMARY KEY,
Строка 206: Строка 234:
             sampling_rate INTEGER DEFAULT 1             sampling_rate INTEGER DEFAULT 1
         );         );
-         + 
-        CREATE TABLE IF NOT EXISTS netflow_aggregates +        CREATE TABLE IF NOT EXISTS netflow_debug_log 
-            id SERIAL PRIMARY KEY, +            id BIGSERIAL PRIMARY KEY, 
-            period_start TIMESTAMP NOT NULL+            exporter_ip INET
-            period_end TIMESTAMP NOT NULL+            event_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
-            src_ip INET+            event_type VARCHAR(50)
-            dst_ip INET+            message TEXT
-            protocol INTEGER+            packet_data BYTEA
-            total_bytes BIGINT, +            packet_length INTEGER
-            total_packets BIGINT, +
-            flow_count INTEGER+
-            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP+
         );         );
-         
-        CREATE INDEX IF NOT EXISTS idx_aggregates_period ON netflow_aggregates(period_start, period_end); 
-        CREATE INDEX IF NOT EXISTS idx_aggregates_ips ON netflow_aggregates(src_ip, dst_ip); 
         """         """
-        +
         try:         try:
             self.cursor.execute(create_tables_sql)             self.cursor.execute(create_tables_sql)
Строка 231: Строка 253:
             logger.error(f"Ошибка создания таблиц: {e}")             logger.error(f"Ошибка создания таблиц: {e}")
             self.conn.rollback()             self.conn.rollback()
-             + 
-    def parse_netflow_v5(self, data: bytes, exporter_ip: str) -> List[Dict]: +    def log_debug(self, exporter_ip: str, event_type: str, message: str, data: bytes = None): 
-        """Парсинг NetFlow v5 пакета"""+        """Логирование отладочной информации""" 
 +        try: 
 +            self.cursor.execute(""" 
 +                INSERT INTO netflow_debug_log 
 +                (exporter_ip, event_type, message, packet_data, packet_length) 
 +                VALUES (%s, %s, %s, %s, %s) 
 +            """, (exporter_ip, event_type, message, 
 +                  psycopg2.Binary(data) if data else None, 
 +                  len(data) if data else None)) 
 +            self.conn.commit() 
 +        except Exception as e: 
 +            logger.error(f"Ошибка записи в debug log: {e}"
 + 
 +    def parse_netflow_v5_detailed(self, data: bytes, exporter_ip: str) -> List[Dict]: 
 +        """Детальный парсинг NetFlow v5 с отладкой"""
         flows = []         flows = []
-        + 
 +        # Логируем получение пакета 
 +        self.log_debug(exporter_ip, "PACKET_RECEIVED", 
 +                      f"Получен пакет размером {len(data)} байт", data[:100]) 
         try:         try:
-            # Заголовок NetFlow v5 (24 байта) +            # Проверяем минимальный размер 
-            header = struct.unpack('!HHIIIIBBH', data[:24]) +            if len(data) < 24
-            version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence, engine_type, engine_id, sampling_interval = header +                logger.error(f"Пакет слишком короткий: {len(data)байт")
-             +
-            if version != 5+
-                logger.warning(f"Неверная версия NetFlow: {version}")+
                 return flows                 return flows
-            + 
 +            # Парсим заголовок 
 +            try: 
 +                header = struct.unpack('!HHIIIIBBH', data[:24]) 
 +                version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence, engine_type, engine_id, sampling_interval = header 
 + 
 +                logger.debug(f"NetFlow v5 заголовок: version={version}, count={count}, sys_uptime={sys_uptime}"
 + 
 +                if version != 5: 
 +                    logger.warning(f"Ожидалась версия 5, получена {version}"
 +                    return flows 
 + 
 +            except struct.error as e: 
 +                logger.error(f"Ошибка парсинга заголовока: {e}"
 +                hex_data = binascii.hexlify(data[:24]).decode('ascii'
 +                logger.error(f"Данные заголовка (hex): {hex_data}"
 +                return flows 
 + 
 +            # Рассчитываем ожидаемый размер 
 +            expected_size = 24 + count * 48 
 + 
 +            if len(data) != expected_size: 
 +                logger.warning(f"Размер пакета не совпадает: ожидалось {expected_size}, получено {len(data)}"
 +                logger.warning(f"Возможно, это не NetFlow v5 или пакет поврежден"
 + 
 +                # Пробуем прочитать столько записей, сколько возможно 
 +                available_records = (len(data) - 24) // 48 
 +                if available_records > 0: 
 +                    count = available_records 
 +                    logger.warning(f"Будет прочитано {count} записей из возможных"
 +                else: 
 +                    logger.error(f"Недостаточно данных для чтения записей"
 +                    return flows 
             offset = 24             offset = 24
-            record_size = 48  # Размер записи NetFlow v5 +
-            +
             for i in range(count):             for i in range(count):
-                if offset + record_size > len(data): +                try: 
-                    break +                    # Проверяем, что достаточно данных для записи 
-                     +                    if offset + 48 > len(data): 
-                record = data[offset:offset + record_size+                        logger.warning(f"Недостаточно данных для записи {i+1}"
-                 +                        break 
-                Разбор записи потока (48 байт) + 
-                flow = struct.unpack('!IIIIIIIIHHBBBBHHBBBBH', record+                    # Извлекаем запись 
-                 +                    record_data = data[offset:offset + 48
-                src_addr = socket.inet_ntoa(struct.pack('!I', flow[0])) + 
-                dst_addr = socket.inet_ntoa(struct.pack('!I', flow[1])) +                    Парсим запись 
-                next_hop = socket.inet_ntoa(struct.pack('!I', flow[2])) +                    # Формат NetFlow v5 записи (48 байт): 
-                input_intf = flow[3+                    # src_addr(4), dst_addr(4), next_hop(4), 
-                output_intf = flow[4+                    # input(2), output(2), packets(4), bytes(4), 
-                packets = flow[5+                    # first(4), last(4), src_port(2), dst_port(2), 
-                octets = flow[6+                    # pad1(1), tcp_flags(1), protocol(1), tos(1), 
-                first = flow[7+                    # src_as(2), dst_as(2), src_mask(1), dst_mask(1), pad2(2) 
-                last = flow[8+ 
-                src_port = flow[9+                    # Распаковываем по частям 
-                dst_port = flow[10+                    src_addr_int = struct.unpack('!I', record_data[0:4])[0] 
-                pad1 = flow[11+                    dst_addr_int = struct.unpack('!I', record_data[4:8])[0] 
-                tcp_flags = flow[12+                    next_hop_int = struct.unpack('!I', record_data[8:12])[0] 
-                protocol = flow[13+ 
-                tos = flow[14+                    src_addr = socket.inet_ntoa(struct.pack('!I', src_addr_int)) 
-                src_as = flow[15+                    dst_addr = socket.inet_ntoa(struct.pack('!I', dst_addr_int)) 
-                dst_as = flow[16+                    next_hop = socket.inet_ntoa(struct.pack('!I', next_hop_int)) 
-                src_mask = flow[17+ 
-                dst_mask = flow[18+                    input_intf = struct.unpack('!H', record_data[12:14])[0
-                pad2 = flow[19+                    output_intf = struct.unpack('!H', record_data[14:16])[0
-                 +                    packets = struct.unpack('!I', record_data[16:20])[0
-                # Расчет времени начала и окончания потока +                    octets = struct.unpack('!I', record_data[20:24])[0
-                flow_start datetime.fromtimestamp(unix_secs - (sys_uptime - first) / 1000) +                    first = struct.unpack('!I', record_data[24:28])[0
-                flow_end datetime.fromtimestamp(unix_secs - (sys_uptime - last) / 1000) +                    last = struct.unpack('!I', record_data[28:32])[0
-                flow_duration = last - first +                    src_port = struct.unpack('!H', record_data[32:34])[0
-                 +                    dst_port = struct.unpack('!H', record_data[34:36])[0
-                flow_data = { + 
-                    'flow_start': flow_start, +                    # Байты 36-39: pad1(1), tcp_flags(1), protocol(1), tos(1) 
-                    'flow_end': flow_end, +                    pad1 = struct.unpack('!B', record_data[36:37])[0
-                    'src_ip': src_addr, +                    tcp_flags = struct.unpack('!B', record_data[37:38])[0
-                    'dst_ip': dst_addr, +                    protocol = struct.unpack('!B', record_data[38:39])[0
-                    'src_port': src_port, +                    tos = struct.unpack('!B', record_data[39:40])[0
-                    'dst_port': dst_port, + 
-                    'protocol': protocol, +                    src_as = struct.unpack('!H', record_data[40:42])[0
-                    'packets': packets, +                    dst_as = struct.unpack('!H', record_data[42:44])[0
-                    'bytes': octets, +                    src_mask = struct.unpack('!B', record_data[44:45])[0
-                    'flow_duration_ms': flow_duration, +                    dst_mask = struct.unpack('!B', record_data[45:46])[0
-                    'exporter_ip': exporter_ip, + 
-                    'input_snmp': input_intf, +                    # Байты 46-47: pad2 (2 байта) 
-                    'output_snmp': output_intf, +                    pad2 = struct.unpack('!H', record_data[46:48])[0
-                    'tcp_flags': tcp_flags, + 
-                    'src_tos': tos, +                    # Рассчитываем временные метки 
-                    'dst_tos': tos, +                    try: 
-                    'src_as': src_as, +                        flow_start_ts = unix_secs - (sys_uptime - first) / 1000.0 
-                    'dst_as': dst_as, +                        flow_end_ts = unix_secs - (sys_uptime - last) / 1000.0 
-                    'next_hop': next_hop, + 
-                    'vlan_id': None, +                        flow_start = datetime.fromtimestamp(flow_start_ts
-                    'application_name': self._get_application_name(protocol, dst_port), +                        flow_end = datetime.fromtimestamp(flow_end_ts) 
-                    'bidirectional': False +                        flow_duration = last - first 
-                +                    except Exception as time_err: 
-                 +                        logger.warning(f"Ошибка расчета времени для записи {i+1}: {time_err}"
-                flows.append(flow_data) +                        flow_start = datetime.fromtimestamp(unix_secs) 
-                offset += record_size +                        flow_end = datetime.fromtimestamp(unix_secs) 
-                +                        flow_duration = 0 
 + 
 +                    flow_data = { 
 +                        'flow_start': flow_start, 
 +                        'flow_end': flow_end, 
 +                        'src_ip': src_addr, 
 +                        'dst_ip': dst_addr, 
 +                        'src_port': src_port, 
 +                        'dst_port': dst_port, 
 +                        'protocol': protocol, 
 +                        'packets': packets, 
 +                        'bytes': octets, 
 +                        'flow_duration_ms': flow_duration, 
 +                        'exporter_ip': exporter_ip, 
 +                        'input_snmp': input_intf, 
 +                        'output_snmp': output_intf, 
 +                        'tcp_flags': tcp_flags, 
 +                        'src_tos': tos, 
 +                        'dst_tos': tos, 
 +                        'src_as': src_as, 
 +                        'dst_as': dst_as, 
 +                        'next_hop': next_hop, 
 +                        'vlan_id': None, 
 +                        'application_name': self._get_application_name(protocol, dst_port), 
 +                        'bidirectional': False, 
 +                        'netflow_version':
 +                    
 + 
 +                    flows.append(flow_data) 
 + 
 +                    # Логируем первую запись для отладки 
 +                    if i == 0: 
 +                        logger.debug(f"Первая запись: {src_addr}:{src_port} -> {dst_addr}:{dst_port},
 +                                   f"протокол: {protocol}, пакеты: {packets}, байты: {octets}"
 + 
 +                except struct.error as e: 
 +                    logger.error(f"Ошибка структуры в записи {i+1}: {e}"
 +                    hex_record binascii.hexlify(record_data).decode('ascii') if 'record_data' in locals() else "N/A" 
 +                    logger.error(f"Данные записи (hex): {hex_record}"
 +                    continue 
 +                except Exception as e: 
 +                    logger.error(f"Неожиданная ошибка в записи {i+1}: {e}"
 +                    continue 
 + 
 +                offset += 48 
 + 
 +            logger.debug(f"Успешно обработано {len(flows)} записей из {count}"
         except Exception as e:         except Exception as e:
-            logger.error(f"Ошибка парсинга NetFlow v5: {e}"+            logger.error(f"Критическая ошибка парсинга: {e}"
-            +            import traceback 
 +            logger.error(traceback.format_exc()) 
 +            self.log_debug(exporter_ip, "PARSE_ERROR", str(e), data[:200]) 
         return flows         return flows
-    +
     def _get_application_name(self, protocol: int, port: int) -> str:     def _get_application_name(self, protocol: int, port: int) -> str:
-        """Определение имени приложения по протоколу и порту""" +        """Определение имени приложения"""
-        # Общие порты+
         common_ports = {         common_ports = {
-            80: 'HTTP', +            80: 'HTTP', 443: 'HTTPS', 53: 'DNS', 22: 'SSH', 25: 'SMTP', 
-            443: 'HTTPS', +            110: 'POP3', 143: 'IMAP', 3389: 'RDP', 3306: 'MySQL', 
-            53: 'DNS', +            5432: 'PostgreSQL', 6379: 'Redis', 27017: 'MongoDB', 
-            22: 'SSH', +            21: 'FTP', 23: 'Telnet', 161: 'SNMP', 162: 'SNMP Trap', 
-            25: 'SMTP', +            67: 'DHCP Server', 68: 'DHCP Client', 123: 'NTP', 
-            110: 'POP3', +            514: 'Syslog', 1194: 'OpenVPN', 1723: 'PPTP', 
-            143: 'IMAP', +            5060: 'SIP', 5061: 'SIPS', 8080: 'HTTP-Proxy', 
-            3389: 'RDP', +            8443: 'HTTPS-Alt', 993: 'IMAPS', 995: 'POP3S', 
-            3306: 'MySQL', +            1433: 'MSSQL', 1521: 'Oracle', 389: 'LDAP', 
-            5432: 'PostgreSQL', +            636: 'LDAPS', 109: 'POP2', 110: 'POP3', 
-            6379: 'Redis', +            143: 'IMAP', 993: 'IMAPS', 995: 'POP3S'
-            27017: 'MongoDB', +
-            21: 'FTP', +
-            23: 'Telnet', +
-            161: 'SNMP', +
-            162: 'SNMP Trap', +
-            67: 'DHCP Server', +
-            68: 'DHCP Client', +
-            123: 'NTP', +
-            514: 'Syslog', +
-            1194: 'OpenVPN', +
-            1723: 'PPTP', +
-            5060: 'SIP', +
-            5061: 'SIPS'+
         }         }
-        +
         protocol_names = {         protocol_names = {
-            6: 'TCP', +            1: 'ICMP', 2: 'IGMP', 6: 'TCP', 17: 'UDP', 
-            17: 'UDP', +            47: 'GRE', 50: 'ESP', 51: 'AH', 89: 'OSPF'
-            1: 'ICMP', +
-            2: 'IGMP', +
-            47: 'GRE', +
-            50: 'ESP', +
-            51: 'AH'+
         }         }
-         + 
-        if protocol == or protocol == 17:  # TCP или UDP +        if protocol in [617]:  # TCP или UDP 
-            app_name = common_ports.get(port) +            if port in common_ports: 
-            if app_name+                return common_ports[port]
-                return app_name+
             elif port < 1024:             elif port < 1024:
-                return f"Системный порт {port}"+                return f"Системный-{port}"
             else:             else:
-                return f"Пользовательский порт {port}"+                return f"Пользовательский-{port}" 
 +        elif protocol in protocol_names: 
 +            return protocol_names[protocol]
         else:         else:
-            return protocol_names.get(protocol, f"Протокол {protocol}"+            return f"Протокол-{protocol}" 
-     +
-    def update_exporter_stats(self, exporter_ip: str, flow_count: int, bytes_count: int): +
-        """Обновление статистики экспортера""" +
-        try: +
-            # Проверяем существует ли экспортер +
-            self.cursor.execute(""" +
-                INSERT INTO netflow_exporters  +
-                (exporter_ip, first_seen, last_seen, total_flows, total_bytes) +
-                VALUES (%s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, %s, %s) +
-                ON CONFLICT (exporter_ip)  +
-                DO UPDATE SET  +
-                    last_seen = CURRENT_TIMESTAMP, +
-                    total_flows = netflow_exporters.total_flows + %s, +
-                    total_bytes = netflow_exporters.total_bytes + %s +
-            """, (exporter_ip, flow_count, bytes_count, flow_count, bytes_count)) +
-             +
-            self.conn.commit() +
-        except Exception as e: +
-            logger.error(f"Ошибка обновления статистики экспортера: {e}"+
-            self.conn.rollback() +
-    +
     def save_flows_to_db(self, flows: List[Dict]):     def save_flows_to_db(self, flows: List[Dict]):
         """Сохранение потоков в базу данных"""         """Сохранение потоков в базу данных"""
         if not flows:         if not flows:
             return             return
-            +
         try:         try:
-            # Пакетная вставка 
             insert_sql = """             insert_sql = """
                 INSERT INTO netflow_flows (                 INSERT INTO netflow_flows (
Строка 399: Строка 479:
                     protocol, packets, bytes, flow_duration_ms, exporter_ip,                     protocol, packets, bytes, flow_duration_ms, exporter_ip,
                     input_snmp, output_snmp, tcp_flags, src_tos, dst_tos,                     input_snmp, output_snmp, tcp_flags, src_tos, dst_tos,
-                    src_as, dst_as, next_hop, vlan_id, application_name+                    src_as, dst_as, next_hop, vlan_id, application_name
 +                    netflow_version
                 ) VALUES %s                 ) VALUES %s
             """             """
-             +
-            # Подготовка данных+
             values = []             values = []
             total_bytes = 0             total_bytes = 0
-            +            exporter_ip = flows[0]['exporter_ip'] if flows else None 
             for flow in flows:             for flow in flows:
                 values.append((                 values.append((
                     flow['flow_start'], flow['flow_end'], flow['src_ip'], flow['dst_ip'],                     flow['flow_start'], flow['flow_end'], flow['src_ip'], flow['dst_ip'],
-                    flow['src_port'], flow['dst_port'], flow['protocol'], flow['packets'], +                    flow['src_port'or 0, flow['dst_port'or 0, flow['protocol'or 0, 
-                    flow['bytes'], flow['flow_duration_ms'], flow['exporter_ip'], +                    flow['packets'or 0, flow['bytes'or 0, flow['flow_duration_ms'or 0, 
-                    flow['input_snmp'], flow['output_snmp'], flow['tcp_flags'], +                    flow['exporter_ip'], flow['input_snmp'or 0, flow['output_snmp'or 0, 
-                    flow['src_tos'], flow['dst_tos'], flow['src_as'], flow['dst_as'], +                    flow['tcp_flags'or 0, flow['src_tos'or 0, flow['dst_tos'or 0, 
-                    flow['next_hop'], flow['vlan_id'], flow['application_name']+                    flow['src_as'or 0, flow['dst_as'or 0, flow['next_hop'], 
 +                    flow['vlan_id'], flow['application_name'], flow.get('netflow_version', 5)
                 ))                 ))
-                total_bytes += flow['bytes'] +                total_bytes += flow.get('bytes', 0) 
-             + 
-            # Используем execute_values для быстрой пакетной вставки+            # Используем execute_values для пакетной вставки
             from psycopg2.extras import execute_values             from psycopg2.extras import execute_values
             execute_values(self.cursor, insert_sql, values)             execute_values(self.cursor, insert_sql, values)
-            +
             # Обновляем статистику экспортера             # Обновляем статистику экспортера
-            exporter_ip = flows[0]['exporter_ip'] +            if exporter_ip: 
-            self.update_exporter_stats(exporter_ip, len(flows), total_bytes) +                self.update_exporter_stats(exporter_ip, len(flows), total_bytes) 
-            +
             self.conn.commit()             self.conn.commit()
-             +
-            # Обновляем статистику+
             self.stats['flows_processed'] += len(flows)             self.stats['flows_processed'] += len(flows)
-            self.stats['bytes_processed'] += total_bytes + 
-             +            if len(flows) > 0: 
-            logger.debug(f"Сохранено {len(flows)} потоков{total_bytesбайт") +                logger.info(f"Сохранено {len(flows)} потоков от {exporter_ip}") 
-            +
         except Exception as e:         except Exception as e:
             logger.error(f"Ошибка сохранения потоков в БД: {e}")             logger.error(f"Ошибка сохранения потоков в БД: {e}")
             self.conn.rollback()             self.conn.rollback()
-    + 
 +            # Попробуем сохранить по одному 
 +            self.save_flows_one_by_one(flows) 
 + 
 +    def save_flows_one_by_one(self, flows: List[Dict]): 
 +        """Сохранение потоков по одному (fallback)""" 
 +        saved = 0 
 +        for flow in flows: 
 +            try: 
 +                self.cursor.execute(""" 
 +                    INSERT INTO netflow_flows ( 
 +                        flow_start, flow_end, src_ip, dst_ip, src_port, dst_port, 
 +                        protocol, packets, bytes, flow_duration_ms, exporter_ip, 
 +                        input_snmp, output_snmp, tcp_flags, src_tos, dst_tos, 
 +                        src_as, dst_as, next_hop, vlan_id, application_name, 
 +                        netflow_version 
 +                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) 
 +                """,
 +                    flow['flow_start'], flow['flow_end'], flow['src_ip'], flow['dst_ip'], 
 +                    flow['src_port'] or 0, flow['dst_port'] or 0, flow['protocol'] or 0, 
 +                    flow['packets'] or 0, flow['bytes'] or 0, flow['flow_duration_ms'] or 0, 
 +                    flow['exporter_ip'], flow['input_snmp'] or 0, flow['output_snmp'] or 0, 
 +                    flow['tcp_flags'] or 0, flow['src_tos'] or 0, flow['dst_tos'] or 0, 
 +                    flow['src_as'] or 0, flow['dst_as'] or 0, flow['next_hop'], 
 +                    flow['vlan_id'], flow['application_name'], flow.get('netflow_version', 5) 
 +                )) 
 +                saved += 1 
 +            except Exception as e: 
 +                logger.error(f"Ошибка сохранения отдельного потока: {e}"
 + 
 +        if saved > 0: 
 +            self.conn.commit() 
 +            logger.info(f"Сохранено {saved}/{len(flows)} потоков по одному"
 + 
 +    def update_exporter_stats(self, exporter_ip: str, flow_count: int, bytes_count: int): 
 +        """Обновление статистики экспортера""" 
 +        try: 
 +            self.cursor.execute(""" 
 +                INSERT INTO netflow_exporters 
 +                (exporter_ip, first_seen, last_seen, total_flows, total_bytes) 
 +                VALUES (%s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, %s, %s) 
 +                ON CONFLICT (exporter_ip) 
 +                DO UPDATE SET 
 +                    last_seen = CURRENT_TIMESTAMP, 
 +                    total_flows = netflow_exporters.total_flows + %s, 
 +                    total_bytes = netflow_exporters.total_bytes + %s 
 +            """, (exporter_ip, flow_count, bytes_count, flow_count, bytes_count)) 
 +            self.conn.commit() 
 +        except Exception as e: 
 +            logger.error(f"Ошибка обновления статистики экспортера: {e}"
     def start_udp_listener(self):     def start_udp_listener(self):
-        """Запуск UDP слушателя для NetFlow"""+        """Запуск UDP слушателя"""
         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        +
         try:         try:
             sock.bind(('0.0.0.0', self.netflow_port))             sock.bind(('0.0.0.0', self.netflow_port))
 +            sock.settimeout(1)
             logger.info(f"Слушаем NetFlow на порту {self.netflow_port}")             logger.info(f"Слушаем NetFlow на порту {self.netflow_port}")
-            +
             while self.running:             while self.running:
                 try:                 try:
                     data, addr = sock.recvfrom(self.buffer_size)                     data, addr = sock.recvfrom(self.buffer_size)
                     exporter_ip = addr[0]                     exporter_ip = addr[0]
-                     + 
-                    # Добавляем в очередь для обработки +                    # Логируем получение пакета 
-                    self.flow_queue.put((data, exporter_ip)) +                    self.stats['packets_received'] += 1 
-                    self.stats['flows_received'] += 1 + 
-                    +                    # Проверяем минимальный размер 
 +                    if len(data) < 24: 
 +                        logger.warning(f"Слишком короткий пакет от {exporter_ip}: {len(data)} байт"
 +                        continue 
 + 
 +                    # Проверяем версию NetFlow 
 +                    try: 
 +                        version = struct.unpack('!H', data[:2])[0] 
 +                    except: 
 +                        logger.warning(f"Не могу определить версию NetFlow от {exporter_ip}"
 +                        continue 
 + 
 +                    if version == 5: 
 +                        self.flow_queue.put((data, exporter_ip)) 
 +                    else: 
 +                        logger.info(f"Получен NetFlow v{version} от {exporter_ip}, пока не обрабатываем") 
                 except socket.timeout:                 except socket.timeout:
                     continue                     continue
                 except Exception as e:                 except Exception as e:
                     logger.error(f"Ошибка приема UDP: {e}")                     logger.error(f"Ошибка приема UDP: {e}")
-                    +
         finally:         finally:
             sock.close()             sock.close()
-    +            logger.info("UDP сокет закрыт"
     def process_flows(self):     def process_flows(self):
         """Обработка потоков из очереди"""         """Обработка потоков из очереди"""
-        batch_size = 1000  # Размер пакета для вставки+        batch_size = 100
         batch = []         batch = []
-        +
         while self.running:         while self.running:
             try:             try:
-                # Берем данные из очереди с таймаутом 
                 data, exporter_ip = self.flow_queue.get(timeout=1)                 data, exporter_ip = self.flow_queue.get(timeout=1)
-                 + 
-                # Парсим NetFlow v5 +                # Парсим пакет 
-                flows = self.parse_netflow_v5(data, exporter_ip) +                flows = self.parse_netflow_v5_detailed(data, exporter_ip) 
-                batch.extend(flows) + 
-                 +                if flows: 
-                # Если накопили достаточно или прошло время - сохраняем+                    batch.extend(flows) 
 + 
 +                # Периодическое сохранение
                 if len(batch) >= batch_size or (datetime.now() - self.stats['last_flush']).seconds > 5:                 if len(batch) >= batch_size or (datetime.now() - self.stats['last_flush']).seconds > 5:
                     if batch:                     if batch:
Строка 484: Строка 634:
                         batch = []                         batch = []
                     self.stats['last_flush'] = datetime.now()                     self.stats['last_flush'] = datetime.now()
-                    +
                 self.flow_queue.task_done()                 self.flow_queue.task_done()
-                +
             except queue.Empty:             except queue.Empty:
-                # Сохраняем оставшиеся данные+                # Сохраняем остатки
                 if batch:                 if batch:
                     self.save_flows_to_db(batch)                     self.save_flows_to_db(batch)
                     batch = []                     batch = []
                 continue                 continue
-                 
             except Exception as e:             except Exception as e:
-                logger.error(f"Ошибка обработки потока: {e}") +                logger.error(f"Ошибка обработчика: {e}"
-     +                import traceback 
-    def aggregate_statistics(self): +                logger.error(traceback.format_exc()) 
-        """Агрегация статистики за час""" +
-        while self.running: +
-            try: +
-                # Ждем до конца часа +
-                now = datetime.now() +
-                next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) +
-                sleep_seconds = (next_hour - now).seconds +
-                 +
-                if sleep_seconds > 0: +
-                    time.sleep(sleep_seconds) +
-                 +
-                # Агрегируем данные за прошедший час +
-                hour_start = next_hour - timedelta(hours=1) +
-                hour_end = next_hour +
-                 +
-                self.cursor.execute(""" +
-                    INSERT INTO netflow_aggregates ( +
-                        period_start, period_end, src_ip, dst_ip, protocol, +
-                        total_bytes, total_packets, flow_count +
-                    ) +
-                    SELECT  +
-                        %s as period_start, +
-                        %s as period_end, +
-                        src_ip, +
-                        dst_ip, +
-                        protocol, +
-                        SUM(bytes) as total_bytes, +
-                        SUM(packets) as total_packets, +
-                        COUNT(*) as flow_count +
-                    FROM netflow_flows +
-                    WHERE flow_start >= %s AND flow_start < %s +
-                    GROUP BY src_ip, dst_ip, protocol +
-                """, (hour_start, hour_end, hour_start, hour_end)) +
-                 +
-                self.conn.commit() +
-                logger.info(f"Агрегирована статистика за {hour_start} - {hour_end}"+
-                 +
-            except Exception as e: +
-                logger.error(f"Ошибка агрегации статистики: {e}"+
-                time.sleep(60 # Подождать минуту при ошибке +
-    +
     def print_statistics(self):     def print_statistics(self):
-        """Вывод статистики работы"""+        """Вывод статистики"""
         while self.running:         while self.running:
             try:             try:
-                time.sleep(60 Выводим статистику каждую минуту +                time.sleep(30) 
-                + 
 +                Обновляем статистику версий 
 +                versions_str = ', '.join([f'v{k}:{v}' for k, v in sorted(self.stats['versions'].items())]) 
                 logger.info(f"""                 logger.info(f"""
-                === СТАТИСТИКА КОЛЛЕКТОРА === +                === СТАТИСТИКА === 
-                Принято потоков: {self.stats['flows_received']} +                Пакетов получено: {self.stats['packets_received']} 
-                Обработано потоков: {self.stats['flows_processed']} +                Потоков обработано: {self.stats['flows_processed']} 
-                Обработано байт: {self.stats['bytes_processed']:,}+                Ошибок парсинга: {self.stats['parse_errors']}
                 В очереди: {self.flow_queue.qsize()}                 В очереди: {self.flow_queue.qsize()}
-                =============================+                ==================
                 """)                 """)
-                +
             except Exception as e:             except Exception as e:
                 logger.error(f"Ошибка вывода статистики: {e}")                 logger.error(f"Ошибка вывода статистики: {e}")
-     + 
-    def start(self):+    def test_parse_function(self): 
 +        """Тестирование функции парсинга""" 
 +        logger.info("Тестирование парсера..."
 + 
 +        # Создаем тестовый сокет 
 +        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
 +        sock.settimeout(5) 
 + 
 +        try: 
 +            sock.bind(('0.0.0.0', self.netflow_port)) 
 +            logger.info(f"Ожидаем тестовый пакет на порту {self.netflow_port}..."
 + 
 +            data, addr = sock.recvfrom(self.buffer_size) 
 +            logger.info(f"Получен пакет от {addr[0]}, размер: {len(data)} байт"
 + 
 +            # Тестируем парсер 
 +            flows = self.parse_netflow_v5_detailed(data, addr[0]) 
 + 
 +            logger.info(f"Парсер обработал {len(flows)} потоков"
 + 
 +            if flows: 
 +                # Показываем первую запись 
 +                flow = flows[0] 
 +                logger.info(f"Первая запись:"
 +                logger.info(f"  Источник: {flow['src_ip']}:{flow['src_port']}"
 +                logger.info(f"  Назначение: {flow['dst_ip']}:{flow['dst_port']}"
 +                logger.info(f"  Протокол: {flow['protocol']}"
 +                logger.info(f"  Пакеты: {flow['packets']}, Байты: {flow['bytes']}"
 +                logger.info(f"  Начало: {flow['flow_start']}, Конец: {flow['flow_end']}"
 + 
 +                # Пробуем сохранить 
 +                self.save_flows_to_db([flow]) 
 +                logger.info("Запись сохранена в БД"
 + 
 +        except socket.timeout: 
 +            logger.info("Таймаут ожидания пакета"
 +        except Exception as e: 
 +            logger.error(f"Ошибка тестирования: {e}"
 +            import traceback 
 +            logger.error(traceback.format_exc()) 
 +        finally: 
 +            sock.close() 
 + 
 +    def start(self, test_mode=False):
         """Запуск коллектора"""         """Запуск коллектора"""
 +        if test_mode:
 +            self.test_parse_function()
 +            return
 +
         self.running = True         self.running = True
-        +
         # Запускаем потоки         # Запускаем потоки
         threads = []         threads = []
-         +
-        # UDP слушатель+
         udp_thread = threading.Thread(target=self.start_udp_listener, daemon=True)         udp_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
         threads.append(udp_thread)         threads.append(udp_thread)
-         +
-        # Обработчик потоков+
         processor_thread = threading.Thread(target=self.process_flows, daemon=True)         processor_thread = threading.Thread(target=self.process_flows, daemon=True)
         threads.append(processor_thread)         threads.append(processor_thread)
-         +
-        # Агрегатор статистики +
-        aggregator_thread = threading.Thread(target=self.aggregate_statistics, daemon=True) +
-        threads.append(aggregator_thread) +
-         +
-        # Вывод статистики+
         stats_thread = threading.Thread(target=self.print_statistics, daemon=True)         stats_thread = threading.Thread(target=self.print_statistics, daemon=True)
         threads.append(stats_thread)         threads.append(stats_thread)
-         +
-        # Запускаем все потоки+
         for thread in threads:         for thread in threads:
             thread.start()             thread.start()
-        +
         logger.info("Коллектор NetFlow запущен")         logger.info("Коллектор NetFlow запущен")
-         +
-        # Основной цикл+
         try:         try:
             while self.running:             while self.running:
Строка 592: Строка 742:
         except KeyboardInterrupt:         except KeyboardInterrupt:
             logger.info("Получен сигнал завершения")             logger.info("Получен сигнал завершения")
 +        finally:
             self.stop()             self.stop()
-    +
     def stop(self):     def stop(self):
         """Остановка коллектора"""         """Остановка коллектора"""
         logger.info("Остановка коллектора...")         logger.info("Остановка коллектора...")
         self.running = False         self.running = False
-        +
         # Ожидаем завершения очереди         # Ожидаем завершения очереди
         self.flow_queue.join()         self.flow_queue.join()
-         + 
-        # Закрываем соединение с БД+        # Закрываем соединение
         if hasattr(self, 'cursor'):         if hasattr(self, 'cursor'):
             self.cursor.close()             self.cursor.close()
         if hasattr(self, 'conn'):         if hasattr(self, 'conn'):
             self.conn.close()             self.conn.close()
-        +
         logger.info("Коллектор остановлен")         logger.info("Коллектор остановлен")
  
-# Конфигурация из переменных окружения 
 def load_config(): def load_config():
 +    """Загрузка конфигурации"""
     return {     return {
         'host': os.getenv('DB_HOST', 'localhost'),         'host': os.getenv('DB_HOST', 'localhost'),
Строка 621: Строка 772:
  
 if __name__ == "__main__": if __name__ == "__main__":
-    # Загрузка конфигурации+    import argparse 
 + 
 +    parser = argparse.ArgumentParser(description="Коллектор NetFlow"
 +    parser.add_argument('--test', '-t', action='store_true', help='Тестовый режим (парсинг одного пакета)'
 +    parser.add_argument('--port', '-p', type=int, default=None, help='Порт NetFlow'
 +    args = parser.parse_args() 
     config = load_config()     config = load_config()
-     + 
-    # Создание и запуск коллектора+    if args.port: 
 +        os.environ['NETFLOW_PORT'] = str(args.port) 
     collector = NetFlowCollector(config)     collector = NetFlowCollector(config)
-    +
     try:     try:
-        collector.start()+        collector.start(test_mode=args.test)
     except Exception as e:     except Exception as e:
         logger.error(f"Фатальная ошибка: {e}")         logger.error(f"Фатальная ошибка: {e}")
 +        import traceback
 +        logger.error(traceback.format_exc())
         collector.stop()         collector.stop()
 </code> </code>
Строка 793: Строка 954:
 ====== настройка клиента====== ====== настройка клиента======
 <code bash> <code bash>
-interface GigabitEthernet1 ip flow both +show interface 
-ip flow-export destination 192.168.2.14 2055+interface Bridge0 ip flow both 
 +ip flow-export destination 192.168.2.16 2055
 </code> </code>
 <code bash> <code bash>
netflow.1765528030.txt.gz · Последнее изменение: augin