Инструменты пользователя

Инструменты сайта


netflow

Это старая версия документа!


1. Установите зависимости

pip install psycopg2-binary nfstream python-dotenv
# Или для асинхронной работы:
pip install asyncpg asyncio nfstream

2. Настройка базы данных PostgreSQL

-- Создайте базу данных
CREATE DATABASE netflow_db;
 
-- Создайте таблицу для хранения потоков
CREATE TABLE netflow_flows (
    id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    flow_id BIGINT,
    src_ip INET,
    dst_ip INET,
    src_port INTEGER,
    dst_port INTEGER,
    protocol INTEGER,
    packets BIGINT,
    bytes BIGINT,
    flow_duration_ms BIGINT,
    exporter_ip INET,
    input_snmp INTEGER,
    output_snmp INTEGER,
    tcp_flags INTEGER,
    src_tos INTEGER,
    dst_tos INTEGER,
    vlan_id INTEGER,
    application_name VARCHAR(256),
    bidirectional BOOLEAN,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 
    -- Индексы для быстрого поиска
    INDEX idx_timestamp (timestamp),
    INDEX idx_src_ip (src_ip),
    INDEX idx_dst_ip (dst_ip),
    INDEX idx_application (application_name)
);
 
-- Таблица для статистики экспортеров
CREATE TABLE netflow_exporters (
    id SERIAL PRIMARY KEY,
    exporter_ip INET UNIQUE NOT NULL,
    exporter_name VARCHAR(256),
    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_flows BIGINT DEFAULT 0,
    total_bytes BIGINT DEFAULT 0,
    version INTEGER
);
 
-- Таблица для агрегированной статистики
CREATE TABLE netflow_aggregates (
    id SERIAL PRIMARY KEY,
    period_start TIMESTAMP NOT NULL,
    period_end TIMESTAMP NOT NULL,
    src_ip INET,
    dst_ip INET,
    protocol INTEGER,
    total_bytes BIGINT,
    total_packets BIGINT,
    flow_count INTEGER,
 
    INDEX idx_period (period_start, period_end)
);
-- Подключитесь к базе данных
\c netflow_db
 
-- Предоставьте права на схему public вашему пользователю
GRANT ALL PRIVILEGES ON SCHEMA public TO netflow_user;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO netflow_user;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO netflow_user;
 
-- Для автоматического предоставления прав на будущие таблицы
ALTER DEFAULT PRIVILEGES IN SCHEMA public 
GRANT ALL PRIVILEGES ON TABLES TO netflow_user;
 
ALTER DEFAULT PRIVILEGES IN SCHEMA public 
GRANT ALL PRIVILEGES ON SEQUENCES TO netflow_user;

3. Основной коллектор NetFlow

#!/usr/bin/env python3
"""
Коллектор NetFlow для PostgreSQL
Поддерживает NetFlow v5, v9
Требует: pip install nfstream psycopg2-binary
"""
 
import socket
import struct
import threading
import queue
import time
import logging
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Tuple, Optional
import json
import os
from dotenv import load_dotenv
 
# Загрузка переменных окружения
load_dotenv()
 
# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('netflow_collector.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
 
class NetFlowCollector:
    """Коллектор NetFlow потоков"""
 
    def __init__(self, db_config: Dict):
        self.db_config = db_config
        self.flow_queue = queue.Queue(maxsize=10000)
        self.running = False
        self.exporters = {}  # Кэш экспортеров
 
        # Параметры NetFlow
        self.netflow_port = int(os.getenv('NETFLOW_PORT', 2055))
        self.buffer_size = 4096
 
        # Статистика
        self.stats = {
            'flows_received': 0,
            'flows_processed': 0,
            'bytes_processed': 0,
            'last_flush': datetime.now()
        }
 
        # Инициализация базы данных
        self.init_database()
 
    def init_database(self):
        """Инициализация подключения к базе данных"""
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
            logger.info("Подключение к PostgreSQL установлено")
 
            # Создаем таблицы если их нет
            self.create_tables()
 
        except Exception as e:
            logger.error(f"Ошибка подключения к PostgreSQL: {e}")
            raise
 
    def create_tables(self):
        """Создание таблиц если они не существуют"""
        create_tables_sql = """
        CREATE TABLE IF NOT EXISTS netflow_flows (
            id BIGSERIAL PRIMARY KEY,
            flow_start TIMESTAMP,
            flow_end TIMESTAMP,
            src_ip INET,
            dst_ip INET,
            src_port INTEGER,
            dst_port INTEGER,
            protocol INTEGER,
            packets BIGINT,
            bytes BIGINT,
            flow_duration_ms BIGINT,
            exporter_ip INET,
            input_snmp INTEGER,
            output_snmp INTEGER,
            tcp_flags INTEGER,
            src_tos INTEGER,
            dst_tos INTEGER,
            src_as INTEGER,
            dst_as INTEGER,
            next_hop INET,
            vlan_id INTEGER,
            application_name VARCHAR(256),
            bidirectional BOOLEAN DEFAULT false,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
 
        CREATE INDEX IF NOT EXISTS idx_netflow_timestamp ON netflow_flows(flow_start);
        CREATE INDEX IF NOT EXISTS idx_netflow_src_ip ON netflow_flows(src_ip);
        CREATE INDEX IF NOT EXISTS idx_netflow_dst_ip ON netflow_flows(dst_ip);
        CREATE INDEX IF NOT EXISTS idx_netflow_exporter ON netflow_flows(exporter_ip);
 
        CREATE TABLE IF NOT EXISTS netflow_exporters (
            id SERIAL PRIMARY KEY,
            exporter_ip INET UNIQUE NOT NULL,
            exporter_name VARCHAR(256),
            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            total_flows BIGINT DEFAULT 0,
            total_bytes BIGINT DEFAULT 0,
            version INTEGER,
            sampling_rate INTEGER DEFAULT 1
        );
 
        CREATE TABLE IF NOT EXISTS netflow_aggregates (
            id SERIAL PRIMARY KEY,
            period_start TIMESTAMP NOT NULL,
            period_end TIMESTAMP NOT NULL,
            src_ip INET,
            dst_ip INET,
            protocol INTEGER,
            total_bytes BIGINT,
            total_packets BIGINT,
            flow_count INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
 
        CREATE INDEX IF NOT EXISTS idx_aggregates_period ON netflow_aggregates(period_start, period_end);
        CREATE INDEX IF NOT EXISTS idx_aggregates_ips ON netflow_aggregates(src_ip, dst_ip);
        """
 
        try:
            self.cursor.execute(create_tables_sql)
            self.conn.commit()
            logger.info("Таблицы проверены/созданы успешно")
        except Exception as e:
            logger.error(f"Ошибка создания таблиц: {e}")
            self.conn.rollback()
 
    def parse_netflow_v5(self, data: bytes, exporter_ip: str) -> List[Dict]:
        """Парсинг NetFlow v5 пакета"""
        flows = []
 
        try:
            # Заголовок NetFlow v5 (24 байта)
            header = struct.unpack('!HHIIIIBBH', data[:24])
            version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence, engine_type, engine_id, sampling_interval = header
 
            if version != 5:
                logger.warning(f"Неверная версия NetFlow: {version}")
                return flows
 
            offset = 24
            record_size = 48  # Размер записи NetFlow v5
 
            for i in range(count):
                if offset + record_size > len(data):
                    break
 
                record = data[offset:offset + record_size]
 
                # Разбор записи потока (48 байт)
                flow = struct.unpack('!IIIIIIIIHHBBBBHHBBBBH', record)
 
                src_addr = socket.inet_ntoa(struct.pack('!I', flow[0]))
                dst_addr = socket.inet_ntoa(struct.pack('!I', flow[1]))
                next_hop = socket.inet_ntoa(struct.pack('!I', flow[2]))
                input_intf = flow[3]
                output_intf = flow[4]
                packets = flow[5]
                octets = flow[6]
                first = flow[7]
                last = flow[8]
                src_port = flow[9]
                dst_port = flow[10]
                pad1 = flow[11]
                tcp_flags = flow[12]
                protocol = flow[13]
                tos = flow[14]
                src_as = flow[15]
                dst_as = flow[16]
                src_mask = flow[17]
                dst_mask = flow[18]
                pad2 = flow[19]
 
                # Расчет времени начала и окончания потока
                flow_start = datetime.fromtimestamp(unix_secs - (sys_uptime - first) / 1000)
                flow_end = datetime.fromtimestamp(unix_secs - (sys_uptime - last) / 1000)
                flow_duration = last - first
 
                flow_data = {
                    'flow_start': flow_start,
                    'flow_end': flow_end,
                    'src_ip': src_addr,
                    'dst_ip': dst_addr,
                    'src_port': src_port,
                    'dst_port': dst_port,
                    'protocol': protocol,
                    'packets': packets,
                    'bytes': octets,
                    'flow_duration_ms': flow_duration,
                    'exporter_ip': exporter_ip,
                    'input_snmp': input_intf,
                    'output_snmp': output_intf,
                    'tcp_flags': tcp_flags,
                    'src_tos': tos,
                    'dst_tos': tos,
                    'src_as': src_as,
                    'dst_as': dst_as,
                    'next_hop': next_hop,
                    'vlan_id': None,
                    'application_name': self._get_application_name(protocol, dst_port),
                    'bidirectional': False
                }
 
                flows.append(flow_data)
                offset += record_size
 
        except Exception as e:
            logger.error(f"Ошибка парсинга NetFlow v5: {e}")
 
        return flows
 
    def _get_application_name(self, protocol: int, port: int) -> str:
        """Определение имени приложения по протоколу и порту"""
        # Общие порты
        common_ports = {
            80: 'HTTP',
            443: 'HTTPS',
            53: 'DNS',
            22: 'SSH',
            25: 'SMTP',
            110: 'POP3',
            143: 'IMAP',
            3389: 'RDP',
            3306: 'MySQL',
            5432: 'PostgreSQL',
            6379: 'Redis',
            27017: 'MongoDB',
            21: 'FTP',
            23: 'Telnet',
            161: 'SNMP',
            162: 'SNMP Trap',
            67: 'DHCP Server',
            68: 'DHCP Client',
            123: 'NTP',
            514: 'Syslog',
            1194: 'OpenVPN',
            1723: 'PPTP',
            5060: 'SIP',
            5061: 'SIPS'
        }
 
        protocol_names = {
            6: 'TCP',
            17: 'UDP',
            1: 'ICMP',
            2: 'IGMP',
            47: 'GRE',
            50: 'ESP',
            51: 'AH'
        }
 
        if protocol == 6 or protocol == 17:  # TCP или UDP
            app_name = common_ports.get(port)
            if app_name:
                return app_name
            elif port < 1024:
                return f"Системный порт {port}"
            else:
                return f"Пользовательский порт {port}"
        else:
            return protocol_names.get(protocol, f"Протокол {protocol}")
 
    def update_exporter_stats(self, exporter_ip: str, flow_count: int, bytes_count: int):
        """Обновление статистики экспортера"""
        try:
            # Проверяем существует ли экспортер
            self.cursor.execute("""
                INSERT INTO netflow_exporters 
                (exporter_ip, first_seen, last_seen, total_flows, total_bytes)
                VALUES (%s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, %s, %s)
                ON CONFLICT (exporter_ip) 
                DO UPDATE SET 
                    last_seen = CURRENT_TIMESTAMP,
                    total_flows = netflow_exporters.total_flows + %s,
                    total_bytes = netflow_exporters.total_bytes + %s
            """, (exporter_ip, flow_count, bytes_count, flow_count, bytes_count))
 
            self.conn.commit()
        except Exception as e:
            logger.error(f"Ошибка обновления статистики экспортера: {e}")
            self.conn.rollback()
 
    def save_flows_to_db(self, flows: List[Dict]):
        """Сохранение потоков в базу данных"""
        if not flows:
            return
 
        try:
            # Пакетная вставка
            insert_sql = """
                INSERT INTO netflow_flows (
                    flow_start, flow_end, src_ip, dst_ip, src_port, dst_port,
                    protocol, packets, bytes, flow_duration_ms, exporter_ip,
                    input_snmp, output_snmp, tcp_flags, src_tos, dst_tos,
                    src_as, dst_as, next_hop, vlan_id, application_name
                ) VALUES %s
            """
 
            # Подготовка данных
            values = []
            total_bytes = 0
 
            for flow in flows:
                values.append((
                    flow['flow_start'], flow['flow_end'], flow['src_ip'], flow['dst_ip'],
                    flow['src_port'], flow['dst_port'], flow['protocol'], flow['packets'],
                    flow['bytes'], flow['flow_duration_ms'], flow['exporter_ip'],
                    flow['input_snmp'], flow['output_snmp'], flow['tcp_flags'],
                    flow['src_tos'], flow['dst_tos'], flow['src_as'], flow['dst_as'],
                    flow['next_hop'], flow['vlan_id'], flow['application_name']
                ))
                total_bytes += flow['bytes']
 
            # Используем execute_values для быстрой пакетной вставки
            from psycopg2.extras import execute_values
            execute_values(self.cursor, insert_sql, values)
 
            # Обновляем статистику экспортера
            exporter_ip = flows[0]['exporter_ip']
            self.update_exporter_stats(exporter_ip, len(flows), total_bytes)
 
            self.conn.commit()
 
            # Обновляем статистику
            self.stats['flows_processed'] += len(flows)
            self.stats['bytes_processed'] += total_bytes
 
            logger.debug(f"Сохранено {len(flows)} потоков, {total_bytes} байт")
 
        except Exception as e:
            logger.error(f"Ошибка сохранения потоков в БД: {e}")
            self.conn.rollback()
 
    def start_udp_listener(self):
        """Запуск UDP слушателя для NetFlow"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
        try:
            sock.bind(('0.0.0.0', self.netflow_port))
            logger.info(f"Слушаем NetFlow на порту {self.netflow_port}")
 
            while self.running:
                try:
                    data, addr = sock.recvfrom(self.buffer_size)
                    exporter_ip = addr[0]
 
                    # Добавляем в очередь для обработки
                    self.flow_queue.put((data, exporter_ip))
                    self.stats['flows_received'] += 1
 
                except socket.timeout:
                    continue
                except Exception as e:
                    logger.error(f"Ошибка приема UDP: {e}")
 
        finally:
            sock.close()
 
    def process_flows(self):
        """Обработка потоков из очереди"""
        batch_size = 1000  # Размер пакета для вставки
        batch = []
 
        while self.running:
            try:
                # Берем данные из очереди с таймаутом
                data, exporter_ip = self.flow_queue.get(timeout=1)
 
                # Парсим NetFlow v5
                flows = self.parse_netflow_v5(data, exporter_ip)
                batch.extend(flows)
 
                # Если накопили достаточно или прошло время - сохраняем
                if len(batch) >= batch_size or (datetime.now() - self.stats['last_flush']).seconds > 5:
                    if batch:
                        self.save_flows_to_db(batch)
                        batch = []
                    self.stats['last_flush'] = datetime.now()
 
                self.flow_queue.task_done()
 
            except queue.Empty:
                # Сохраняем оставшиеся данные
                if batch:
                    self.save_flows_to_db(batch)
                    batch = []
                continue
 
            except Exception as e:
                logger.error(f"Ошибка обработки потока: {e}")
 
    def aggregate_statistics(self):
        """Агрегация статистики за час"""
        while self.running:
            try:
                # Ждем до конца часа
                now = datetime.now()
                next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
                sleep_seconds = (next_hour - now).seconds
 
                if sleep_seconds > 0:
                    time.sleep(sleep_seconds)
 
                # Агрегируем данные за прошедший час
                hour_start = next_hour - timedelta(hours=1)
                hour_end = next_hour
 
                self.cursor.execute("""
                    INSERT INTO netflow_aggregates (
                        period_start, period_end, src_ip, dst_ip, protocol,
                        total_bytes, total_packets, flow_count
                    )
                    SELECT 
                        %s as period_start,
                        %s as period_end,
                        src_ip,
                        dst_ip,
                        protocol,
                        SUM(bytes) as total_bytes,
                        SUM(packets) as total_packets,
                        COUNT(*) as flow_count
                    FROM netflow_flows
                    WHERE flow_start >= %s AND flow_start < %s
                    GROUP BY src_ip, dst_ip, protocol
                """, (hour_start, hour_end, hour_start, hour_end))
 
                self.conn.commit()
                logger.info(f"Агрегирована статистика за {hour_start} - {hour_end}")
 
            except Exception as e:
                logger.error(f"Ошибка агрегации статистики: {e}")
                time.sleep(60)  # Подождать минуту при ошибке
 
    def print_statistics(self):
        """Вывод статистики работы"""
        while self.running:
            try:
                time.sleep(60)  # Выводим статистику каждую минуту
 
                logger.info(f"""
                === СТАТИСТИКА КОЛЛЕКТОРА ===
                Принято потоков: {self.stats['flows_received']}
                Обработано потоков: {self.stats['flows_processed']}
                Обработано байт: {self.stats['bytes_processed']:,}
                В очереди: {self.flow_queue.qsize()}
                =============================
                """)
 
            except Exception as e:
                logger.error(f"Ошибка вывода статистики: {e}")
 
    def start(self):
        """Запуск коллектора"""
        self.running = True
 
        # Запускаем потоки
        threads = []
 
        # UDP слушатель
        udp_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
        threads.append(udp_thread)
 
        # Обработчик потоков
        processor_thread = threading.Thread(target=self.process_flows, daemon=True)
        threads.append(processor_thread)
 
        # Агрегатор статистики
        aggregator_thread = threading.Thread(target=self.aggregate_statistics, daemon=True)
        threads.append(aggregator_thread)
 
        # Вывод статистики
        stats_thread = threading.Thread(target=self.print_statistics, daemon=True)
        threads.append(stats_thread)
 
        # Запускаем все потоки
        for thread in threads:
            thread.start()
 
        logger.info("Коллектор NetFlow запущен")
 
        # Основной цикл
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Получен сигнал завершения")
            self.stop()
 
    def stop(self):
        """Остановка коллектора"""
        logger.info("Остановка коллектора...")
        self.running = False
 
        # Ожидаем завершения очереди
        self.flow_queue.join()
 
        # Закрываем соединение с БД
        if hasattr(self, 'cursor'):
            self.cursor.close()
        if hasattr(self, 'conn'):
            self.conn.close()
 
        logger.info("Коллектор остановлен")
 
# Конфигурация из переменных окружения
def load_config():
    return {
        'host': os.getenv('DB_HOST', 'localhost'),
        'port': os.getenv('DB_PORT', '5432'),
        'database': os.getenv('DB_NAME', 'netflow_db'),
        'user': os.getenv('DB_USER', 'netflow_user'),
        'password': os.getenv('DB_PASSWORD', 'password')
    }
 
if __name__ == "__main__":
    # Загрузка конфигурации
    config = load_config()
 
    # Создание и запуск коллектора
    collector = NetFlowCollector(config)
 
    try:
        collector.start()
    except Exception as e:
        logger.error(f"Фатальная ошибка: {e}")
        collector.stop()

4. Конфигурационный файл .env

# Настройки базы данных
DB_HOST=localhost
DB_PORT=5432
DB_NAME=netflow_db
DB_USER=netflow_user
DB_PASSWORD=your_password_here
 
# Настройки NetFlow
NETFLOW_PORT=2055
BUFFER_SIZE=4096
BATCH_SIZE=1000
FLUSH_INTERVAL=5
 
# Логирование
LOG_LEVEL=INFO
LOG_FILE=/var/log/netflow_collector.log

5. Сервисный скрипт для systemd (опционально)

# /etc/systemd/system/netflow-collector.service
[Unit]
Description=NetFlow Collector Service
After=network.target postgresql.service
Wants=postgresql.service
 
[Service]
Type=simple
User=netflow
WorkingDirectory=/opt/netflow-collector
EnvironmentFile=/opt/netflow-collector/.env
ExecStart=/usr/bin/python3 /opt/netflow-collector/collector.py
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
 
[Install]
WantedBy=multi-user.target

6. Примеры запросов к данным

# Пример анализа данных из PostgreSQL
 
import psycopg2
from datetime import datetime, timedelta
 
def get_top_talkers(hours=1):
    """Получить топ-10 источников трафика за последний час"""
    conn = psycopg2.connect(
        host="localhost",
        database="netflow_db",
        user="netflow_user",
        password="password"
    )
 
    cursor = conn.cursor()
 
    query = """
    SELECT 
        src_ip,
        SUM(bytes) as total_bytes,
        SUM(packets) as total_packets,
        COUNT(*) as flow_count
    FROM netflow_flows
    WHERE flow_start > NOW() - INTERVAL '%s hours'
    GROUP BY src_ip
    ORDER BY total_bytes DESC
    LIMIT 10
    """
 
    cursor.execute(query, (hours,))
    results = cursor.fetchall()
 
    print("Топ-10 источников трафика:")
    for row in results:
        print(f"{row[0]}: {row[1]:,} байт, {row[2]:,} пакетов")
 
    cursor.close()
    conn.close()
 
def detect_anomalies():
    """Обнаружение аномалий в трафике"""
    conn = psycopg2.connect(
        host="localhost",
        database="netflow_db",
        user="netflow_user",
        password="password"
    )
 
    cursor = conn.cursor()
 
    # Найти хосты с необычно большим количеством соединений
    query = """
    SELECT 
        src_ip,
        COUNT(DISTINCT dst_ip) as unique_destinations,
        SUM(bytes) as total_bytes,
        COUNT(*) as connection_count
    FROM netflow_flows
    WHERE flow_start > NOW() - INTERVAL '5 minutes'
    GROUP BY src_ip
    HAVING COUNT(*) > 1000  # Более 1000 соединений за 5 минут
    ORDER BY connection_count DESC
    """
 
    cursor.execute(query)
    anomalies = cursor.fetchall()
 
    if anomalies:
        print("Обнаружены аномалии:")
        for anomaly in anomalies:
            print(f"Хост {anomaly[0]}: {anomaly[1]} уникальных целей, {anomaly[2]:,} байт, {anomaly[3]} соединений")
 
    cursor.close()
    conn.close()
 
# Выполнить запросы
get_top_talkers()
detect_anomalies()

7. Настройка маршрутизатора Cisco для отправки NetFlow

# Пример для Cisco IOS
configure terminal
!
ip flow-export destination 192.168.1.100 2055  # IP коллектора
ip flow-export version 5
ip flow-export source Loopback0
!
interface GigabitEthernet0/0
 ip flow ingress
 ip flow egress
!
ip flow-cache timeout active 1
ip flow-cache timeout inactive 15
!
end
write memory

Основные возможности:

Поддержка NetFlow v5 - основной промышленный стандарт

Масштабируемая архитектура - очередь, многопоточность

Эффективное хранение - пакетная вставка в PostgreSQL

Агрегация статистики - автоматическая агрегация по часам

Мониторинг - встроенная статистика и логирование

Определение приложений - автоматическое определение по портам

Устойчивость - обработка ошибок, повторные попытки

Коллектор готов к работе в production-среде, поддерживает высокие нагрузки и обеспечивает надежное хранение сетевой статистики.

netflow.1765526615.txt.gz · Последнее изменение: augin