How to Create a Real-Time Network Traffic Dashboard with Python and Streamlit

Faheem

Have you ever wanted to visualize your network traffic in real time? In this tutorial, you'll learn how to create an interactive network traffic analysis dashboard with Python and Streamlit. Streamlit is an open source Python framework that you can use to develop web applications for data analysis and data processing.

By the end of this tutorial, you'll know how to capture raw network packets from your computer's NIC (Network Interface Card), process the data, and create visualizations that update in real time.

Table of Contents

Why is network traffic analysis important?

Network traffic analysis is a critical requirement in enterprises, where networks form the backbone of almost every application and service. At its core, network packet analysis involves monitoring the network, capturing all traffic (inbound and outbound), and interpreting these packets as they travel through the network. You can use this technique to identify security patterns, detect anomalies, and ensure network security and performance.

The proof-of-concept project we'll be working on in this tutorial is particularly useful because it helps you visualize and analyze network activity in real time. It will also help you understand how to troubleshoot, optimize performance, and perform security analysis in enterprise systems.

Prerequisites

How to set up your project

To get started, create the project structure and install the necessary tools with pip using the following commands:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

We'll use Streamlit for the dashboard visualizations, pandas for data processing, Scapy for network packet capturing and processing, and finally Plotly to create charts from the data we collect.

How to create the core functionality

We'll put all the code in a single file, dashboard.py. First, let's start by importing everything we'll be using:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Now let's configure logging by setting up a basic logging configuration. This will be used to track events and run our application in debug mode. We've set the logging level to INFO, i.e., events at level INFO or above will be shown. If you're not familiar with logging in Python, I'd recommend checking out the official Python logging documentation, which goes in-depth.


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Next, we'll create our packet processor. We'll implement the functionality for processing captured packets in this class.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Add TCP-specific fields when a TCP layer is present
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Add UDP-specific fields when a UDP layer is present
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Keep only the most recent 10,000 packets to bound memory use
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

This class forms our core functionality and contains several utility methods that are used to process packets.

Network packets fall into two main types at the transport layer (TCP and UDP), plus the ICMP protocol at the network layer. If you're unfamiliar with TCP/IP concepts, I recommend checking out the articles on freeCodeCamp News.

Our constructor defines the buckets for the TCP/IP protocol types that we'll classify packets into. We also note the capture start time, the data captured so far, and the number of packets captured.

We also use a thread lock to ensure that only one packet is processed at a time. This scheme can be further extended to enable parallel packet processing.

The get_protocol_name helper function gets us the correct protocol name based on the protocol number. To give some background on this, the Internet Assigned Numbers Authority (IANA) assigns standard numbers to identify different protocols in network packets. When we see these numbers in a parsed network packet, we know what protocol is being used in the currently intercepted packet. For the scope of this project, we'll only map TCP, UDP, and ICMP (ping). If we encounter any other kind of packet, we'll classify it as OTHER(<protocol number>).
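To see the fallback behavior in isolation, here is a quick standalone sketch of the same lookup. The protocol numbers are standard IANA assignments; protocol 2 (IGMP) is used here only as an example of something outside our map:

```python
# Standalone sketch of the IANA protocol-number lookup used by PacketProcessor.
protocol_map = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}

def get_protocol_name(protocol_num: int) -> str:
    """Return a protocol name, falling back to OTHER(<num>) for unmapped numbers."""
    return protocol_map.get(protocol_num, f'OTHER({protocol_num})')

print(get_protocol_name(6))   # TCP
print(get_protocol_name(2))   # OTHER(2) -- IGMP, which we don't map
```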

The process_packet function handles our main functionality: processing individual packets. If a packet has an IP layer, it notes the source and destination IP addresses, the protocol type, the packet size, and the time elapsed since capture started.

For packets with specific transport layer protocols (such as TCP and UDP), we also capture the source and destination ports, along with the TCP flags for TCP packets. These extracted details are stored in the in-memory packet_data list, and we keep incrementing packet_count as packets are processed.

The get_dataframe function helps us convert the packet_data list into a pandas DataFrame, which is then used for our visualizations.
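The bounded buffer and DataFrame conversion can be sketched on their own, without scapy. This is a simplified stand-in (the field names mirror the class above, and the small cap of 3 replaces the real 10,000-packet limit purely for demonstration):

```python
import pandas as pd
from datetime import datetime

# Simplified sketch of PacketProcessor's buffer: append packet dicts,
# drop the oldest entry past a cap, convert the survivors to a DataFrame.
MAX_PACKETS = 3  # tiny cap for demonstration; the real class uses 10,000

packet_data = []
for i in range(5):
    packet_data.append({
        'timestamp': datetime.now(),
        'source': f'10.0.0.{i}',
        'size': 60 + i,
    })
    if len(packet_data) > MAX_PACKETS:
        packet_data.pop(0)  # drop the oldest packet, as in process_packet

df = pd.DataFrame(packet_data)
print(len(df))                 # 3 -- only the newest MAX_PACKETS rows survive
print(df['source'].tolist())   # ['10.0.0.2', '10.0.0.3', '10.0.0.4']
```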

How to create the Streamlit visualizations

Now it's time to create our interactive Streamlit dashboard. We'll define a function called create_visualizations in the dashboard.py script (outside of our PacketProcessor class).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distribution pie chart
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Packets-per-second timeline
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('s')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top 10 source IP addresses
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

This function takes a DataFrame as input and helps us create three charts:

  1. Protocol Distribution Chart: shows the proportion of different protocols (for example TCP, UDP, and ICMP) in the captured traffic.

  2. Packet Timeline Chart: shows the number of packets processed per second over a period of time.

  3. Top Source IP Addresses Chart: highlights the top 10 IP addresses that sent the most packets in the captured traffic.

The protocol distribution chart is simply a pie chart of the counts of the three protocol types (among others). We use the Streamlit and Plotly Python tools for plotting these charts. Since we also noted the timestamp of each packet since capture began, we can use this data to plot the trend of captured packets over time.

For the second chart, we perform a groupby operation on the data to get the number of packets captured per second ('s' stands for seconds), and then we plot the graph.

Finally, for the third chart, we count the distinct source IPs observed and plot a bar chart showing the top 10 IPs.
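The per-second aggregation behind the second chart is worth seeing on its own. A minimal sketch with synthetic timestamps: flooring each timestamp to the second and counting rows per bucket is exactly what feeds the "Packets per Second" line:

```python
import pandas as pd

# Two packets arrive within the first second, one in the next second.
df = pd.DataFrame({
    'timestamp': pd.to_datetime([
        '2024-01-01 12:00:00.100',
        '2024-01-01 12:00:00.700',
        '2024-01-01 12:00:01.250',
    ])
})

# Floor each timestamp to whole seconds, then count packets per bucket.
per_second = df.groupby(df['timestamp'].dt.floor('s')).size()
print(per_second.tolist())  # [2, 1]
```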

How to capture network packets

Now, let's create the functionality that captures network packet data.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

This is a simple function that instantiates the PacketProcessor class and then uses the sniff function from the scapy module to start capturing packets.

We use threading here so that packets are captured independently of the main program flow. This ensures that the packet capture operation doesn't block other operations, such as updating the dashboard in real time. We also return the created PacketProcessor instance so that it can be used in our main program.
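The same start-a-daemon-thread pattern can be exercised without root privileges by swapping in a stand-in producer for scapy's sniff(). Note that fake_capture and on_packet are hypothetical names used only for this illustration:

```python
import threading

captured = []
lock = threading.Lock()

def on_packet(pkt):
    # Mirrors process_packet: append to a shared list under a lock.
    with lock:
        captured.append(pkt)

def fake_capture(callback, count=5):
    # Stand-in for sniff(prn=callback, store=False): invoke the
    # callback once per "packet".
    for i in range(count):
        callback({'packet_id': i})

def start_capture():
    thread = threading.Thread(
        target=fake_capture,
        args=(on_packet,),
        daemon=True,  # dies with the main program, like the real capture thread
    )
    thread.start()
    return thread

t = start_capture()
t.join(timeout=2)   # the dashboard never joins; we wait here only to inspect results
print(len(captured))  # 5
```

Marking the thread as a daemon matters: without it, the never-ending capture loop would keep the process alive after Streamlit tries to shut down.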

Putting everything together

Now let's stitch all these pieces together with a main function that will act as the driver for our program.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialize the packet processor once and keep it in session state
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Dashboard layout
    col1, col2 = st.columns(2)

    # Get the current data
    df = st.session_state.processor.get_dataframe()

    # Display metrics
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Display the charts
    create_visualizations(df)

    # Show the most recently captured packets
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Manual refresh button
    if st.button('Refresh Data'):
        st.rerun()

    # Auto-refresh every two seconds
    time.sleep(2)
    st.rerun()


if __name__ == "__main__":
    main()

This function sets up the Streamlit dashboard and integrates all our components. First we set the title of our Streamlit dashboard page, and then we start our PacketProcessor. We use Streamlit's session state to ensure that only one packet capture instance is created and that its state is maintained across reruns.

From there, we retrieve the DataFrame from the session state on every rerun and display the metrics and visualizations. We also show recently captured packets with information such as timestamp, source and destination IP, protocol, and packet size. Finally, we let the user manually refresh the data from the dashboard, while also refreshing it automatically every two seconds.

Finally, let's run the program with the following command:

sudo streamlit run dashboard.py

Note that you'll have to run the program with sudo because packet capture requires administrative privileges. If you're on Windows, open your terminal as administrator and then run the command without the sudo prefix.
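As an aside not covered in the tutorial itself: on Linux you can avoid running the whole dashboard as root by granting raw-socket capabilities to your Python interpreter with setcap. Treat this as a sketch, and note that it affects every script run with that interpreter, so a dedicated virtualenv or interpreter copy is safer:

```shell
# Grant packet-capture capabilities to the real python3 binary
# (setcap must target the binary itself, not a symlink).
sudo setcap cap_net_raw,cap_net_admin+eip "$(readlink -f "$(which python3)")"

# Capture now works without sudo
streamlit run dashboard.py
```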

Give the program a moment to start capturing packets. If everything goes well, you should see something like this:

The Network Traffic Analysis dashboard shows a pie chart with the distribution of protocols: TCP (48.7%), UDP (47.5%), and ICMP (3.8%). Below is a line graph showing packets per second over time with several prominent peaks. The total packets are 6743, and the capture duration is 118.63 seconds.

A dark themed dashboard showing a bar chart of the top source IP addresses and a table of recent packets with details such as timestamp, source, destination, protocol and size.

These screenshots show all the concepts we've just implemented in our Streamlit dashboard program.

Future enhancements

With that said, here are some future enhancement ideas you can use to extend the functionality of the dashboard:

  1. Add machine learning capabilities for anomaly detection.

  2. Implement geographic IP mapping.

  3. Create custom alerts based on traffic analysis patterns.

  4. Add packet payload analysis options.

Conclusion

Congratulations! You've now successfully created a real-time network traffic analysis dashboard with Python and Streamlit. The program provides valuable insight into network behavior and can be extended for a variety of use cases, from security monitoring to network optimization.

With that, I hope you've learned some fundamentals of Python programming along with network traffic analysis. Thanks for reading!
