Como Construir um Dashboard de Tráfego de Rede em Tempo Real com Python e Streamlit

Deivison Viana
15 Min Read

Já teve a vontade incontrolável de visualizar o tráfego da sua rede em tempo real, como se fosse um hacker de filme hollywoodiano? 🤔 Neste tutorial, vamos embarcar na aventura de construir um dashboard interativo de análise de tráfego de rede com Python e Streamlit. Sim, você leu certo, Streamlit! Essa belezinha open-source é um framework Python que permite desenvolver aplicações web para análise e processamento de dados, sem precisar vender a alma para o diabo. 😈

Ao final deste tutorial, você será capaz de capturar pacotes de rede crus da sua placa de rede (NIC), processá-los e criar visualizações incríveis que se atualizam em tempo real, tipo aqueles painéis futurísticos de filmes sci-fi. Prepare-se para impressionar os amigos! 😎

A análise de tráfego de rede é crucial em empresas onde as redes são a espinha dorsal de praticamente tudo. Basicamente, analisamos os pacotes de rede, monitoramos o tráfego (entradas e saídas) e interpretamos esses pacotes enquanto eles fluem pela rede. É como bisbilhotar a conversa dos dados, mas com um propósito nobre. 🕵️‍♀️

Com essa técnica, podemos identificar padrões de segurança, detectar anomalias e garantir que a rede funcione como um relógio suíço. ⌚ Este projeto que vamos construir é especialmente útil porque permite visualizar e analisar a atividade da rede em tempo real. Imagine só: você poderá solucionar problemas, otimizar o desempenho e analisar a segurança como um verdadeiro profissional! 👨‍💻

Pré-requisitos (ou o que você precisa ter para não se perder no caminho)

  • Python 3.8 ou superior instalado (a cobra precisa estar viva e pronta para o ataque!).
  • Conhecimento básico de redes de computadores (não precisa ser um guru, mas saber o básico ajuda).
  • Familiaridade com a linguagem Python e suas bibliotecas (pandas, matplotlib, etc.).
  • Noções básicas de visualização de dados (gráficos, dashboards, essas coisas bonitas).

Quer saber mais sobre STREAMLIT?

Compre meu Livro Disponível na amazon: https://www.amazon.com.br/dp/B0CW1BH7NL

Como Configurar o Projeto (preparando o terreno para a aventura)

Crie a estrutura do projeto e instale as ferramentas necessárias com o Pip:

Bash

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

Usaremos o Streamlit para as visualizações do dashboard (a cereja do bolo), Pandas para processar os dados (o mestre da organização), Scapy para capturar e processar os pacotes (o espião da rede) e, finalmente, Plotly para plotar gráficos com os dados coletados (o artista visual). 🎨

Como Construir as Funcionalidades Principais (a mágica acontece aqui)

Colocaremos todo o código em um único arquivo chamado dashboard.py. Primeiro, vamos importar os elementos que usaremos:

Python

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Agora, vamos configurar o logging para rastrear eventos e executar nossa aplicação em modo debug. Definimos o nível de logging como INFO, o que significa que eventos com nível INFO ou superior serão exibidos. Se você não estiver familiarizado com logging em Python, dê uma olhada na documentação oficial. 🤓

Python

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

Em seguida, vamos construir nosso processador de pacotes. Implementaremos a funcionalidade de processamento dos pacotes capturados nesta classe:

Python

class PacketProcessor:
    """Processa e analisa pacotes de rede"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Converte o número do protocolo para o nome"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Processa um único pacote e extrai informações relevantes"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Adiciona informações específicas do TCP
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Adiciona informações específicas do UDP
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Mantém apenas os últimos 10000 pacotes para evitar problemas de memória
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Erro ao processar pacote: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Converte os dados do pacote para um DataFrame do pandas"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Essa classe é o coração da nossa aplicação e possui funções utilitárias para processar os pacotes. Os pacotes de rede são categorizados em dois tipos no nível de transporte (TCP e UDP) e no protocolo ICMP no nível de rede. Se você não conhece os conceitos de TCP/IP, recomendo dar uma olhada neste artigo no freeCodeCamp News. 😉

Nosso construtor rastreia todos os pacotes categorizados nesses tipos de protocolo TCP/IP. Também anotamos o tempo de captura, os dados capturados e o número de pacotes. Usamos um bloqueio de thread para garantir que apenas um pacote seja processado por vez. Isso pode ser estendido para permitir o processamento paralelo de pacotes. 🤯

A função get_protocol_name nos ajuda a obter o tipo correto de protocolo com base em seus números. A IANA (Internet Assigned Numbers Authority) atribui números padronizados para identificar diferentes protocolos em um pacote de rede. Ao encontrarmos esses números no pacote, saberemos qual tipo de protocolo está sendo usado. Para este projeto, mapearemos apenas TCP, UDP e ICMP (Ping). Se encontrarmos outro tipo de pacote, o classificaremos como OTHER(<protocol_num>). 🤔

A função process_packet lida com a funcionalidade principal que processa os pacotes individuais. Se o pacote contiver uma camada IP, ele registrará os endereços IP de origem e destino, o tipo de protocolo, o tamanho do pacote e o tempo decorrido desde o início da captura. Para pacotes com protocolos de camada de transporte específicos (como TCP e UDP), capturaremos as portas de origem e destino, juntamente com as flags TCP para pacotes TCP. Esses detalhes serão armazenados na lista packet_data. Também manteremos o controle do packet_count à medida que os pacotes são processados. 🤓

A função get_dataframe nos ajuda a converter a lista packet_data em um DataFrame do Pandas, que será usado para nossa visualização. 📊

Como Criar as Visualizações no Streamlit (a hora do show)

Agora é hora de construir nosso Dashboard interativo no Streamlit. Definiremos uma função chamada create_visualization no script dashboard.py (fora da classe de processamento de pacotes):

Python

def create_visualizations(df: pd.DataFrame):
    """Cria todas as visualizações do dashboard"""
    if len(df) > 0:
        # Distribuição de protocolos
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Distribuição de Protocolos"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Linha do tempo dos pacotes
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Pacotes por Segundo"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Principais IPs de origem
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Principais Endereços IP de Origem"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Essa função recebe o DataFrame como entrada e plota três gráficos:

  • Gráfico de Distribuição de Protocolos: Mostra a proporção de diferentes protocolos (TCP, UDP, ICMP) no tráfego capturado.
  • Gráfico de Linha do Tempo dos Pacotes: Mostra o número de pacotes processados por segundo ao longo do tempo.
  • Gráfico dos Principais Endereços IP de Origem: Destaca os 10 principais endereços IP que enviaram mais pacotes.

O gráfico de distribuição de protocolos é um gráfico de pizza com as contagens de protocolo para os três tipos (junto com OTHER). Usamos as ferramentas Streamlit e Plotly para plotar esses gráficos. Como também anotamos o timestamp desde o início da captura, usaremos esses dados para plotar a tendência dos pacotes capturados ao longo do tempo. 📈

Para o segundo gráfico, faremos uma operação groupby nos dados e obteremos o número de pacotes capturados em cada segundo (S significa segundos) e, finalmente, plotaremos o gráfico. 📊

Para o terceiro gráfico, contaremos os IPs de origem distintos observados e plotaremos um gráfico das contagens de IP para mostrar os 10 principais IPs. 🗺️

Como Capturar os Pacotes de Rede (abrindo o portal para o mundo dos dados)

Agora, vamos construir a funcionalidade para capturar os dados dos pacotes de rede:

Python

def start_packet_capture():
    """Inicia a captura de pacotes em uma thread separada"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Esta função instancia a classe PacketProcessor e usa a função sniff do módulo scapy para iniciar a captura dos pacotes. Usamos threads para permitir a captura de pacotes independentemente do fluxo principal do programa. Isso garante que a captura não bloqueie outras operações, como a atualização do dashboard em tempo real. Também retornamos a instância PacketProcessor criada para que possa ser usada no programa principal. 🔄

Juntando Tudo (a grande finalização)

Agora vamos juntar todas as peças com a função main, que atuará como a função principal do nosso programa:

Python

def main():
    """Função principal para executar o dashboard"""
    st.set_page_config(page_title="Análise de Tráfego de Rede", layout="wide")
    st.title("Análise de Tráfego de Rede em Tempo Real")

    # Inicializa o processador de pacotes no estado da sessão
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Cria o layout do dashboard
    col1, col2 = st.columns(2)

    # Obtém os dados atuais
    df = st.session_state.processor.get_dataframe()

    # Exibe as métricas
    with col1:
        st.metric("Total de Pacotes", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Duração da Captura", f"{duration:.2f}s")

    # Exibe as visualizações
    create_visualizations(df)

    # Exibe os pacotes recentes
    st.subheader("Pacotes Recentes")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Adiciona botão de atualização
    if st.button('Atualizar Dados'):
        st.rerun()

    # Atualização automática
    time.sleep(2)
    st.rerun()

Essa função instancia o dashboard do Streamlit e integra todos os nossos componentes. Primeiro, definimos o título da página do dashboard e inicializamos o PacketProcessor. Usamos o estado da sessão no Streamlit para garantir que apenas uma instância de captura de pacotes seja criada e que seu estado seja mantido. 🖥️

Obtemos o DataFrame do estado da sessão sempre que os dados são processados e começamos a exibir as métricas e as visualizações. Também exibimos os pacotes capturados recentemente, juntamente com informações como timestamp, IPs de origem e destino, protocolo e tamanho do pacote. Adicionamos a capacidade de o usuário atualizar manualmente os dados do dashboard, enquanto também o atualizamos automaticamente a cada dois segundos. 🔄

Finalmente, execute o programa com o seguinte comando:

Bash

sudo streamlit run dashboard.py

Observação: Você precisará executar o programa com sudo porque a captura de pacotes requer privilégios administrativos. Se você estiver no Windows, abra o terminal como Administrador e execute o programa sem o prefixo sudo. ⚠️

Dê um tempo para o programa começar a capturar pacotes. Se tudo correr bem, você verá algo como isto:

Essas são todas as visualizações que implementamos no nosso dashboard do Streamlit. 🎉

Melhorias Futuras (para os mais aventureiros)

Aqui estão algumas ideias para aprimorar o dashboard:

  • Adicionar recursos de machine learning para detecção de anomalias (que tal um sistema que prevê ataques hackers?). 🤖
  • Implementar mapeamento geográfico de IPs (para visualizar de onde vêm os pacotes, tipo um mapa de calor global). 🌎
  • Criar alertas personalizados com base em padrões de tráfego (para ser avisado quando algo suspeito estiver acontecendo). 🚨
  • Adicionar opções de análise de payload de pacotes (para mergulhar ainda mais fundo nos dados). 🔬

Conclusão (ufa, chegamos ao fim!)

Parabéns! Você construiu com sucesso um dashboard de análise de tráfego de rede em tempo real com Python e Streamlit. Este programa fornecerá insights valiosos sobre o comportamento da rede e pode ser estendido para vários casos de uso, desde monitoramento de segurança até otimização de rede. 🚀

Espero que você tenha aprendido o básico sobre análise de tráfego de rede e um pouco de programação em Python. Obrigado por ler! 😊

Quer saber mais sobre STREAMLIT?

Compre meu Livro Disponível na Amazon: https://www.amazon.com.br/dp/B0CW1BH7NL



Share This Article
Follow:
Com mais de 15 anos de experiência na área de Tecnologia da Informação, tenho construído uma carreira distintiva como Arquiteto de Software e Gestor de T.I., posicionando-me na vanguarda da inovação tecnológica. Especialista em Big Data, Analytics e Business Intelligence aplicados aos negócios, minha trajetória é pautada na excelência e na transformação digital de organizações através do poder dos dados. Detentor de uma profunda expertise técnica e estratégica, sou certificado em Análise de Dados pela Google, Power BI, Python, além de ter especializações nas principais plataformas de cloud computing: AWS, IBM Cloud, Azure e Google Cloud. Estas qualificações me habilitam a desenvolver soluções de ponta que potencializam a análise de dados, melhoram a tomada de decisão e otimizam a performance empresarial.
Leave a Comment

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *