Setting up the ELK Stack

cybersecurity · March 03, 2024

The ELK stack is a set of powerful tools for log management and data analysis. Composed of Elasticsearch for search and indexing, Logstash for log collection, aggregation and processing, and Kibana for data visualization and analysis, the ELK stack offers a complete solution for managing system information.

Installation Commands

In the following, we will use Vagrant to create a virtual machine and install the necessary tools. This approach is used to abstract the infrastructure and work with IaC (Infrastructure as Code).

Vagrantfile Overview

ruby

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/jammy64"

  config.vm.network "private_network", ip: "192.168.56.10"
  config.vm.network "public_network", bridge: "wlan0", ip: "192.168.1.70"

  config.vm.synced_folder ".", "/vagrant", disabled: false
  config.vm.synced_folder "./pipelines", "/etc/logstash/conf.d", disabled: false

  config.vm.provider "virtualbox" do |vb|
    vb.gui = false
    vb.memory = "5192"
  end

  config.vm.provision "shell", inline: <<-SHELL

    # Update the system
    apt-get update

    # Add the Elastic GPG key and repository
    wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
    sudo apt-get install -y apt-transport-https
    echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
    sudo apt-get update

    # Install Elasticsearch and enable it
    sudo apt-get install -y elasticsearch
    sudo echo "-Xms1024m" > /etc/elasticsearch/jvm.options
    sudo echo "-Xmx1024m" >> /etc/elasticsearch/jvm.options
    sudo systemctl enable --now elasticsearch
    sudo systemctl restart elasticsearch

    # Install Kibana and enable it
    sudo apt-get install -y kibana
    sudo echo "-Xms512m" > /etc/kibana/jvm.options
    sudo echo "-Xmx512m" >> /etc/kibana/jvm.options
    # Bind Kibana to all interfaces before the service is (re)started
    sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
    sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml
    sudo systemctl enable --now kibana
    sudo systemctl restart kibana

    # Install Logstash and enable it
    sudo apt-get install -y logstash
    sudo echo "-Xms1g" > /etc/logstash/jvm.options
    sudo echo "-Xmx1g" >> /etc/logstash/jvm.options
    sudo systemctl enable --now logstash
    sudo systemctl restart logstash

  SHELL
end

This Vagrantfile provisions the machine and maps the configuration folders on our host (notably ./pipelines) to the corresponding directories inside the virtual machine.

We initialize the machine with Vagrant and then connect to it via SSH.
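Concretely, from the folder containing the Vagrantfile:

bash

vagrant up   # create and provision the VM
vagrant ssh  # open an SSH session into the guest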

[illustration]

Installing Snort and Enabling It at Startup

[illustration]
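The screenshots correspond roughly to installing the Snort package from the Ubuntu repositories; a hedged sketch of the commands, with the debconf prompts replayable at any time:

bash

# Install Snort; the package launches an interactive configuration dialog
sudo apt-get install -y snort
# Re-run the interactive configuration later if needed
sudo dpkg-reconfigure snort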

Now that Snort is installed, we configure it through the package's interactive configuration dialog:

  • We choose the boot method so that Snort starts automatically at every boot; this avoids having to start the service manually each time.

[illustration]

  • Then we must specify the network interfaces on which Snort should listen for packets.

    Here we choose the three interfaces available on the machine to listen to all relevant packets.

[illustration]

  • Now we need to give, in CIDR notation, the address range that Snort should cover in its analysis. Here we choose 192.168.0.0/16 (65,536 addresses, 65,534 of them usable hosts).

[illustration]

  • Now we will enable Snort's promiscuous mode. This allows it to monitor all traffic on the network segment to which it is connected, enabling it to analyze packets and detect any malicious activity or network anomalies.

[illustration]

  • To complete the Snort configuration, we disable the e-mails containing packet capture logs, since log management will be handled by Elasticsearch and Kibana.

[illustration]

Snort is now running on the machine! We can verify this by inspecting its status.
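For example, with the systemd unit installed by the package (a hedged one-liner):

bash

sudo systemctl status snort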

[illustration]

Installing Suricata

[illustration]

Enabling Suricata at Startup

[illustration]
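A hedged sketch of the equivalent commands (package name from the Ubuntu repositories):

bash

# Install Suricata and start it at every boot
sudo apt-get install -y suricata
sudo systemctl enable --now suricata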

  • By default, Suricata uses the af-packet capture method to read packets.

    Let's modify the configuration to point it at the network interfaces of our virtual machine so that Suricata can capture their packets.

    We will configure two interfaces:

    • The host machine's internal network: enp0s8
    • The main local network: enp0s9

    [illustration]

  • We verify that Suricata is also running correctly on the machine after restarting it.

[illustration]

At this stage, Snort is configured on all interfaces of our machine, and Suricata is configured on enp0s8 and enp0s9 to capture packets from the internal network to the host machine and from the external local network.
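A quick way to confirm that both IDS services survived the configuration changes (a hedged example):

bash

sudo systemctl restart suricata
sudo systemctl is-active snort suricata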

Configuration and Startup of Elasticsearch

We have set the minimum and maximum memory of Elasticsearch to 1GB.

[illustration]

This may cost some performance, since Elasticsearch is constrained by the limited memory of the VM.

  • Elasticsearch is running well on the machine.

[illustration]
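Besides systemctl, we can query the REST API directly; this assumes the self-signed CA that Elasticsearch 8 generates at /etc/elasticsearch/certs/http_ca.crt (the same certificate referenced later in the Logstash pipelines):

bash

# Prompts for the password of the elastic user
curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic https://localhost:9200/_cluster/health?pretty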

Kibana Configuration

  • Maximum memory configured to 512m

[illustration]

  • Restarting the service and checking status

[illustration]

NOTE: In the Vagrantfile, the provisioning script already replaced the default localhost binding with the 0.0.0.0 wildcard so that Kibana is reachable from outside the VM; the relevant lines were:

bash

sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml

[illustration]

  • Generating the enrollment token from Elasticsearch
bash

/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token --scope kibana

[illustration]

  • Now let's enter this token on the Kibana enrollment screen.

    Once the token is entered, Kibana asks us for a verification code, which can be obtained by running:

    bash

    /usr/share/kibana/bin/kibana-verification-code
    

[illustration]

[illustration]

  • The verification code works and Kibana begins its automatic configuration

[illustration]

  • Now we need to log in to the Kibana instance. For this, we reset the password of the elastic superuser, since we did not set one during installation.

[illustration]

To reset the password:

bash

/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic

[illustration]

  • Logging in to Kibana with the credentials

[illustration]

[illustration]

  • Once we are done with Kibana, we move on to configuring Logstash.

    Logstash uses a pipeline configuration to specify inputs, filters and outputs.

    In our case we have:

    • Inputs: logs from Suricata, Snort and a Windows machine
    • Filters: parse the different logs into structured data for our Elasticsearch instance
    • Outputs: our Elasticsearch instance, plus the standard console output used to test our groks (Logstash's pattern-matching syntax).

Snort Logstash Pipeline

  • Contents of snort-pipeline.conf in /etc/logstash/conf.d/
bash

input {
    file {
        path => "/var/log/snort/snort.alert.fast"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    grok {
        # Capture the leading timestamp into its own field so the date filter below can parse it
        match => { "message" => "(?<timestamp>%{MONTHNUM}/%{MONTHDAY}-%{TIME}) %{DATA} \[%{DATA}\] \[%{DATA:signature}\] \[%{DATA}\] \[Priority: %{INT:priority}\] \{%{WORD:protocol}\} %{IP:source_address}:%{NUMBER:source_port} -> %{IP:destination_address}:%{NUMBER:destination_port}" }
    }

    date {
        # Snort's fast-alert timestamps carry no year; the date filter assumes the current one
        match => [ "timestamp", "MM/dd-HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
    }
}

output {
    elasticsearch {
        index => "logstash-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "maybe_secure_password_here ?!"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

  1. input: Defines the data source to process, in this case, a Snort log file.
  2. filter: Specifies the data transformation steps before sending them to the output. In this case, it uses the Grok plugin to extract data fields and the date plugin to parse timestamps.
  3. output: Indicates where to send the processed data, here to Elasticsearch for indexing. It also sends a copy of the data to standard output (stdout) for debugging.
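Before starting Logstash for real, the pipeline syntax can be validated with the config-test flag (a hedged example using the same paths as above):

bash

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf --config.test_and_exit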

Suricata Logstash Pipeline

  • Contents of suricata-pipeline.conf in /etc/logstash/conf.d/
bash

input {
  file {
    path => ["/var/log/suricata/eve.json"]
    sincedb_path => "/var/lib/logstash/sincedb_suricata"
    codec => json
    type => "SuricataIDPS"
  }

}

filter {
  if [type] == "SuricataIDPS" {
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    ruby {
      # For fileinfo events, keep only the leading part of the libmagic description as the file type
      # (uses the event.get/event.set API required by recent Logstash versions)
      code => "if event.get('event_type') == 'fileinfo'; event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0]); end;"
    }
  }

  if [src_ip]  {
    geoip {
      source => "src_ip"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
    if ![geoip][ip] {
      if [dest_ip]  {
        geoip {
          source => "dest_ip"
          target => "geoip"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float" ]
        }
      }
    }
  }
}

output {
  elasticsearch {
    index => "logstash-%{+YYYY.MM.dd}"
    hosts => ["https://localhost:9200"]
    user => "elastic"
    password => "maybe_secure_password_here ?!"
    ssl => true
    cacert => "/etc/elasticsearch/certs/http_ca.crt"
    ssl_certificate_verification => true
    manage_template => false
  }
  stdout { codec => rubydebug }
}

  1. JSON Filter (codec => json): This filter is used to interpret JSON data from the Suricata log file (/var/log/suricata/eve.json). This allows Logstash to understand the JSON data structure and process it correctly.
  2. Type Condition (if [type] == "SuricataIDPS"): This condition checks if the log type is "SuricataIDPS". If so, the filters inside this condition will be applied to the corresponding logs.
  3. Date Filter (date): This filter parses and converts log timestamps in ISO8601 format to Elasticsearch-compatible timestamps. This enables efficient indexing and searching of time-based events.
  4. Ruby Filter (ruby): This filter uses Ruby code to perform custom data manipulation. In this case, it checks if the event type is "fileinfo" and if so, extracts the file type from the "magic" key and places it in the "type" key of the "fileinfo" field.
  5. GeoIP Filter (geoip): This filter adds geolocation information to IP addresses present in the logs. It uses the GeoIP database to determine the country, city and geographic coordinates associated with each IP address. The geographic coordinates are then added to the "geoip.coordinates" field. Before this, the filter checks if the source IP address exists (src_ip), and if it doesn't exist, it checks the destination IP address (dest_ip). If either IP address is found, geolocation is performed for that address.
  6. Mutate Filter (mutate): This filter allows modifying the structure of events by adding or modifying fields. In this case, it is used to convert geographic coordinates from strings to decimal numbers, which is necessary for proper indexing in Elasticsearch.
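To confirm that Suricata is actually producing the JSON events this pipeline expects, we can look at the most recent EVE entry (assuming jq is installed; plain tail works otherwise):

bash

sudo tail -n 1 /var/log/suricata/eve.json | jq .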

Tests to Verify Log Collection, Storage and Visualization

To perform the tests, we will remove the Elasticsearch plugin from the output and just keep the console display.
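For these test runs, the output section of each pipeline is reduced to the console only, roughly:

bash

output {
    stdout { codec => rubydebug }
}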

Tests with Snort

Let's start Logstash with the Snort configuration

bash

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf

[illustration]

We can see that the data is well structured and corresponds to what we expect thanks to our filter.

Tests with Suricata

Let's start Logstash with the Suricata configuration

bash

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/suricata-pipeline.conf

  • Example of packets captured by Suricata

[illustration]

We can see that the data is well structured and corresponds to what we expect thanks to our filter in the Suricata pipeline.
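To generate an event that the IDS is likely to flag, a common trick is to fetch the classic NIDS test page from inside the VM (hedged: it only raises an alert if the corresponding "GPL ATTACK_RESPONSE" rule is loaded):

bash

curl -s http://testmynids.org/uid/index.html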

Indexing Data in Elasticsearch

After verifying that Logstash collects and parses the logs consistently, we can now send them to our Elasticsearch instance.

  • Let's give each pipeline its own output index so the logs are easy to find when creating the Kibana dashboards
bash

## For Snort
output {
    elasticsearch {
        index => "snort-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "maybe_secure_password_here ?!"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

## For Suricata
output {
    elasticsearch {
        index => "suricata-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "maybe_secure_password_here ?!"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

Indexes Created by Logstash in Elasticsearch

At this stage, the Snort and Suricata logs are sent to Elasticsearch and indexed. We can verify the indexes created by Logstash in Elasticsearch.
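This can also be checked from the command line with the _cat API (same credentials and CA certificate as before):

bash

curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic "https://localhost:9200/_cat/indices/snort-*,suricata-*?v"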

[illustration]

Creating Dashboards and Visualizations to Display Log Data in a User-Friendly Way with Kibana

To create dashboards, go to the Analytics tab then Dashboards.

[illustration]

  • Then we enter an index pattern (in our case: snort-*); this associates every matching index with our dashboard.

[illustration]

  • Then we can configure the different visualizations as we wish to display the data.

Creating Visualizations for Snort

[illustration]

For the occasion, we will create a table containing some key information as well as graphs for the Snort logs.

[illustration]

Then we add two graphs showing the most used protocols and the most contacted destination addresses on the network.

[illustration]

Creating Visualizations for Suricata

Now let's use the data collected by Logstash to create visuals in Kibana.

[illustration]

Here we have a summary of the different communications on the network: the share of traffic per machine interface, the distribution of the TCP/UDP protocols in use, and an overview of user agents in a table associated with their IP addresses.

In summary, these different graphs allow us to see in real time what is happening on the network containing our machine. This information could serve as a basis for more in-depth analysis if necessary.

Retrieving Logs from a Windows Machine

In this section we will use the Winlogbeat tool from the Elastic suite.

This gives us a lightweight agent that monitors Windows logs in real time, extracts the events specified in its configuration (such as application, security or system logs), converts them to JSON and ships them to a specified destination (Elasticsearch in our case) for indexing and later analysis, thus contributing to centralized monitoring and management of Windows logs.

Installing Winlogbeat

[illustration]
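The installation boils down to downloading and unpacking the Winlogbeat archive on the Windows machine; a hedged PowerShell sketch for the 8.12.2 release used in this post:

bash

Invoke-WebRequest -Uri "https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-8.12.2-windows-x86_64.zip" -OutFile winlogbeat.zip
Expand-Archive winlogbeat.zip -DestinationPath .
cd .\winlogbeat-8.12.2-windows-x86_64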

Here is the Winlogbeat configuration to retrieve logs.

yaml

setup.template.name: "winlogbeat"
setup.template.pattern: "winlogbeat-*"

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: System
  - name: Security
  - name: Microsoft-Windows-Sysmon/Operational
  - name: Windows PowerShell
    event_id: 400, 403, 600, 800
  - name: Microsoft-Windows-PowerShell/Operational
    event_id: 4103, 4104, 4105, 4106
  - name: ForwardedEvents
    tags: [forwarded]

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  hosts: ["localhost:5601"]

output.elasticsearch:
  index: "winlogbeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  hosts: ["192.168.1.70:9200"]
  protocol: "https"
  username: "elastic"
  password: "maybe_secure_password_here ?!"
  ssl.certificate_authorities: ["C:\\Users\\vagrant\\Downloads\\winlogbeat-8.12.2-windows-x86_64\\ca.cert"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~

We have set the output to Elasticsearch. This way we can retrieve the data and create dashboards in Kibana. We leave the event configuration at its defaults for a first test.
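Before launching the shipper, both the configuration file and the connection to Elasticsearch can be checked with Winlogbeat's built-in test subcommands (a hedged sketch, run from the Winlogbeat folder in PowerShell):

bash

.\winlogbeat.exe test config -c .\winlogbeat.yml -e
.\winlogbeat.exe test output -c .\winlogbeat.yml -e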

[illustration]

Running Winlogbeat under Windows in command prompt:

bash

./winlogbeat.exe -c winlogbeat.yml -e

[illustration]

We successfully retrieve the logs in Elasticsearch.

Creating a Dashboard with Kibana

Now let's configure a data view as the source for the visualizations.

[illustration]
