The ELK stack is a set of powerful tools for log management and data analysis. Composed of Elasticsearch for search and indexing, Logstash for log collection, aggregation and processing, and Kibana for data visualization and analysis, the ELK stack offers a complete solution for managing system information.
In the following, we will use Vagrant to create a virtual machine and install the necessary tools. This approach abstracts the infrastructure away and lets us work with IaC (Infrastructure as Code).
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/jammy64"
config.vm.network "private_network", ip: "192.168.56.10"
config.vm.network "public_network", bridge: "wlan0", ip: "192.168.1.70"
config.vm.synced_folder ".", "/vagrant", disabled: false
config.vm.synced_folder "./pipelines", "/etc/logstash/conf.d", disabled: false
config.vm.provider "virtualbox" do |vb|
vb.gui = false
vb.memory = "5192"
end
config.vm.provision "shell", inline: <<-SHELL
# Update the system
apt-get update
# Add the Elastic GPG key and repository
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
sudo apt-get install -y apt-transport-https
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update
# Install Elasticsearch and enable it
sudo apt-get install -y elasticsearch
sudo echo "-Xms1024m" > /etc/elasticsearch/jvm.options
sudo echo "-Xmx1024m" >> /etc/elasticsearch/jvm.options
sudo systemctl enable --now elasticsearch
sudo systemctl restart elasticsearch
# Install Kibana, configure it for remote access and enable it
sudo apt-get install -y kibana
# Kibana runs on Node.js, not the JVM: cap its heap through node.options instead of jvm.options
echo "--max-old-space-size=512" | sudo tee -a /etc/kibana/node.options
# Listen on all interfaces so Kibana is reachable from outside the VM (done before the service starts)
sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml
sudo systemctl enable --now kibana
sudo systemctl restart kibana
# Install Logstash and enable it
sudo apt-get install -y logstash
# Adjust only the heap lines so the rest of the packaged jvm.options is kept
sudo sed -i 's/^-Xms.*/-Xms1g/' /etc/logstash/jvm.options
sudo sed -i 's/^-Xmx.*/-Xmx1g/' /etc/logstash/jvm.options
sudo systemctl enable --now logstash
sudo systemctl restart logstash
SHELL
end
This Vagrantfile provisions the machine and maps the virtual machine's configuration folders to the corresponding folders on our host system, so we can edit the Logstash pipelines directly from the host.
We initialize the machine with Vagrant and then connect to it via SSH.
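From the directory containing the Vagrantfile, this is done with the standard Vagrant commands:
vagrant up
vagrant ssh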


Now that Snort is installed, we proceed with its configuration in the interactive shell:
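Assuming Snort was installed from the Ubuntu package (sudo apt-get install -y snort), the interactive configuration dialog can be relaunched at any time with:
sudo dpkg-reconfigure snort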

Then we must specify the network interfaces on which Snort should listen for packets.
Here we choose the three interfaces available on the machine so that all relevant packets are captured.




Snort is now running on the machine! We can verify this by inspecting its status.
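For example, via the service registered by the Ubuntu package:
sudo systemctl status snort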



By default, Suricata uses the af-packet capture method to read packets from the network interfaces.
Let's modify the configuration to set the physical network interface of our virtual machine and allow Suricata to capture packets.
We will configure two interfaces:
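As a sketch, the relevant af-packet section of /etc/suricata/suricata.yaml would look roughly like this, assuming the two interfaces are enp0s8 and enp0s9 as described below:
af-packet:
  - interface: enp0s8
    cluster-id: 99
    cluster-type: cluster_flow
  - interface: enp0s9
    cluster-id: 98
    cluster-type: cluster_flow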

We verify that Suricata is also running correctly on the machine after restarting it.
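With the packaged systemd service, this amounts to:
sudo systemctl restart suricata
sudo systemctl status suricata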

At this stage, we have Snort configured on all interfaces of our machine and Suricata configured on interfaces enp0s9 and enp0s8 to capture packets from the internal network to the host machine and from the external local network.
We have set the minimum and maximum Elasticsearch heap to 1 GB.

This may cause some performance loss, since we are constrained by the memory available to the VM.
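To quickly check that Elasticsearch still responds with this heap, we can query the HTTPS endpoint using the CA certificate generated at installation and the elastic credentials:
curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic https://localhost:9200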



NOTE: In the Vagrantfile, the provisioning script replaces Kibana's default host with the 0.0.0.0 wildcard to allow external access to Kibana; the relevant lines were:
sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml
Kibana is now reachable from the host at http://192.168.1.70:5601/. On first access, it asks for an enrollment token, which we generate on the VM with:
/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token --scope kibana

Now let's paste this token into the Kibana enrollment prompt.
Once the token is submitted, Kibana asks for a verification code, which can be obtained by running:
/usr/share/kibana/bin/kibana-verification-code



We then log in with the built-in elastic user, whose password we reset because we had not specified one during installation.
To reset the password:
/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic



Once we are done with Kibana, we move on to configuring Logstash.
Logstash uses pipeline configurations to specify inputs, filters and outputs.
In our case we have:
- Inputs: logs from Suricata, Snort and a Windows machine
- Filters: parse the different logs into structured data for our Elasticsearch instance
- Outputs: our Elasticsearch instance, plus the standard console output to test groks (pattern-matching syntax)

The pipeline files live in /etc/logstash/conf.d/. Here is the Snort pipeline (snort-pipeline.conf):
input {
file {
path => "/var/log/snort/snort.alert.fast"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
# capture the leading Snort timestamp into its own field so the date filter below can parse it
match => { "message" => "(?<timestamp>%{MONTHNUM}/%{MONTHDAY}-%{TIME}) %{DATA} \[%{DATA}\] \[%{DATA:signature}\] \[%{DATA}\] \[Priority: %{INT:priority}\] \{%{WORD:protocol}\} %{IP:source_address}:%{NUMBER:source_port} -> %{IP:destination_address}:%{NUMBER:destination_port}" }
}
date {
# Snort fast alerts are timestamped as MM/dd-HH:mm:ss.ffffff without a year
match => [ "timestamp", "MM/dd-HH:mm:ss.SSSSSS" ]
target => "@timestamp"
}
}
output {
elasticsearch {
index => "logstash-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "maybe_secure_password_here ?!"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
And here is the Suricata pipeline (/etc/logstash/conf.d/suricata-pipeline.conf):
input {
file {
path => ["/var/log/suricata/eve.json"]
sincedb_path => "/var/lib/logstash/sincedb_suricata"
codec => json
type => "SuricataIDPS"
}
}
filter {
if [type] == "SuricataIDPS" {
date {
match => [ "timestamp", "ISO8601" ]
}
ruby {
# use the event API (get/set): direct hash-style access on the event was removed in recent Logstash versions
code => "if event.get('event_type') == 'fileinfo'; event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0]); end;"
}
}
if [src_ip] {
geoip {
source => "src_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
if ![geoip][ip] {
if [dest_ip] {
geoip {
source => "dest_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
}
}
}
output {
elasticsearch {
index => "logstash-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "maybe_secure_password_here ?!"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
- codec => json: this codec interprets the JSON data from the Suricata log file (/var/log/suricata/eve.json), so Logstash understands the JSON structure and processes it correctly.
- if [type] == "SuricataIDPS": this condition checks whether the log type is "SuricataIDPS". If so, the filters inside the condition are applied to the corresponding logs.
- date: this filter parses log timestamps in ISO8601 format and converts them into Elasticsearch-compatible timestamps, enabling efficient indexing and searching of time-based events.
- ruby: this filter uses Ruby code to perform custom data manipulation. Here it checks whether the event type is "fileinfo" and, if so, extracts the file type from the "magic" key and places it in the "type" key of the "fileinfo" field.
- geoip: this filter adds geolocation information for IP addresses present in the logs. It uses the GeoIP database to determine the country, city and geographic coordinates associated with each IP address; the coordinates are added to the "geoip.coordinates" field. The filter first checks whether the source IP address (src_ip) exists and, if not, falls back to the destination IP address (dest_ip). If either IP address is found, geolocation is performed for that address.
- mutate: this filter modifies the structure of events by adding or changing fields. Here it converts the geographic coordinates from strings to decimal numbers, which is necessary for proper indexing in Elasticsearch.

To perform the tests, we will remove the Elasticsearch plugin from the output and keep only the console display.
Let's start Logstash with the Snort configuration:
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf
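Before launching a pipeline for real, its syntax can also be validated with the --config.test_and_exit flag:
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf --config.test_and_exit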

We can see that the data is well structured and corresponds to what we expect thanks to our filter.
Let's start Logstash with the Suricata configuration:
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/suricata-pipeline.conf

We can see that the data is well structured and corresponds to what we expect thanks to our filter in the Suricata pipeline.
After successfully testing to verify the consistency and collection of logs by Logstash, we can now send them to our Elasticsearch instance.
## For Snort
output {
elasticsearch {
index => "snort-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "maybe_secure_password_here ?!"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
## For Suricata
output {
elasticsearch {
index => "suricata-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "maybe_secure_password_here ?!"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
At this stage, the Snort and Suricata logs are sent to Elasticsearch and indexed. We can verify the indexes created by Logstash in Elasticsearch.
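For example, through the _cat indices API, using the elastic credentials and the CA certificate from earlier:
curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic "https://localhost:9200/_cat/indices/snort-*,suricata-*?v"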

To create dashboards, go to the Analytics tab then Dashboards.



For this occasion, we will create a table containing some key information and graphs for the Snort logs.

Then we add two graphs showing the most used protocols and the most contacted destination addresses on the network.

Now let's use the data collected by Logstash to create visuals in Kibana.

Here we have a summary of the different communications on the network: the proportion of traffic per machine interface, the use of the different TCP/UDP protocols, and a table of user agents associated with their IP addresses.
In summary, these different graphs allow us to see in real time what is happening on the network containing our machine. This information could serve as a basis for more in-depth analysis if necessary.
In this section we will use the Winlogbeat tool from the Elastic suite.
This will provide us with a lightweight agent that monitors Windows logs in real time, extracts events specified in its configuration (such as application, security or system logs), transforms them into JSON and sends them to a specified destination (Elasticsearch in our case) for indexing and subsequent analysis, thus contributing to centralized monitoring and management of Windows logs.

Here is the Winlogbeat configuration to retrieve logs.
setup.template.name: "winlogbeat"
setup.template.pattern: "winlogbeat-*"

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: System
  - name: Security
  - name: Microsoft-Windows-Sysmon/Operational
  - name: Windows PowerShell
    event_id: 400, 403, 600, 800
  - name: Microsoft-Windows-PowerShell/Operational
    event_id: 4103, 4104, 4105, 4106
  - name: ForwardedEvents
    tags: [forwarded]

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  host: "localhost:5601"

output.elasticsearch:
  index: "winlogbeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  hosts: ["192.168.1.70:9200"]
  protocol: "https"
  username: "elastic"
  password: "maybe_secure_password_here ?!"
  ssl.certificate_authorities: ["C:\\Users\\vagrant\\Downloads\\winlogbeat-8.12.2-windows-x86_64\\ca.cert"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
We have set the output to Elasticsearch, so we can retrieve the data and build dashboards in Kibana. We leave the event configuration at its defaults for a first test run.
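Before starting the agent, the configuration and the connection to Elasticsearch can be checked with the standard Beats test subcommands, run from the Winlogbeat folder:
.\winlogbeat.exe test config -c .\winlogbeat.yml -e
.\winlogbeat.exe test output -c .\winlogbeat.yml -e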

Running Winlogbeat on Windows from a command prompt:
./winlogbeat.exe -c winlogbeat.yml -e
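To keep it running in the background, the extracted archive also ships a PowerShell script that registers Winlogbeat as a Windows service; from an elevated PowerShell prompt:
powershell -ExecutionPolicy Bypass -File .\install-service-winlogbeat.ps1
Start-Service winlogbeat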

We successfully retrieve the logs in Elasticsearch.
Now let's configure a data view for our visualizations.
