By: Walker Rowe, April 05, 2017 (08:31 AM)

Using ElasticSearch for CyberSecurity

ELK (ElasticSearch, LogStash, and Kibana) is the most popular log processing tool and certainly the easiest to set up. They scale linearly and can automatically parse multiple file formats, thus saving you the time of writing some complex regular expression. There are 120 parsers, which can be found on Github here.

ELK also creates stunning visualizations and dashboards that you could use in any SOC (security operations center) or just let your staff query your servers for operations, security, and debugging. The idea is you dump web servers, Windows, and all other logs for your organization there. Not only does that make it easy to query for errors and security events, it helps put you in SOX and HIPAA compliance.

Below are the results of parsing ssh.log.

Kibana security Data Visualization

Below is as architecture diagram. It is easiest to explain by explaining the components. We do that and show the configuration that we used to load the file ssh.log.

ELK - Elasticsearch Logstash Kibana

Filebeat—you install this agent on source systems, like Windows or Linux, to ship logs to either LogStash or ElasticSearch.

cat /etc/filebeat/filebeat.yml

– input_type: log
– /tmp/log/*

hosts: [“localhost:9200”]

LogStash—you run one instance of this for each file format you want to process. Like we said above it is programmed to handle almost any file format that you would encounter. We did not use LogStash in this example.

ElasticSearch—LogStash is designed to send parsed log data to the ES JSON database. ElasticSearch runs in a cluster. So it can scale immensely, like Hadoop. In this example there was no need to change any ES configuration as there would be if we want to run it on a cluster.

We can show what indices and how many documents for each we have loaded in ES like this:

curl 'localhost:9200/_cat/indices?v'

health status index uuid pri rep docs.count docs.deleted store.size
yellow open .kibana u9blQpePQcCO782-V9tFFQ 1 1 2 0 9.6kb 9.6kb
yellow open filebeat-2017.03.26 rHXsxb9oTNKfq1hClgUMnQ 5 1 7143 0 1.9mb 1.9mb

Kibana—this web page is designed to work with ElasticSearch. It supports the Lucene Query language which is a powerful text search engine. No changes are needed to kibana.yml either to get started.

Nginx—You should front end Kibana with Nginx as a reverse proxy server. That way you do not have to open any firewall ports. Below shows how we set that up and how we put a password on Nginx. You should put a password on it if it will have a public IP address.

sudo htpasswd -c /etc/nginx/.htpasswd nginx

cat /etc/nginx/conf.d/elastic.conf

server {
listen 80;
location / {
proxy_pass http://localhost:5601;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;

auth_basic “Private Property”;
auth_basic_user_file /etc/nginx/.htpasswd;

Security and Threat Detection with ElasticSearch

There are several cybersecurity plugins for ELK, including machine learning models that we are developing at Cursive Security using Apache Spark machine learning algorithms. For example, we use the K-means clustering algorithm to group users and IP addresses into normal behavior clusters. That way we can flag outliers.

Using ElasticSearch Spark Hadoop Connector

The ElasticSearch Spark Connector is called ElasticSearch Hadoop and not ES Spark, since Hadoop is a component used by Spark. To use it you download the Jar file then start the Spark shell like this (see note below on Jar version):

spark-shell --master local --jars /usr/local/spark/elasticsearch-hadoop-5.2.2/dist/elasticsearch-spark-20_2.11-5.2.2.jar

The version numbers for the Jar are of the format elasticsearch-spark-{$sparkMajorMinorVersionNoDots}_{$scalaVersion}-{$esHadoopVersion}.jar.

So for:

Spark version 2.1.0
Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
ES-Hadoop 5.2.2
elasticsearch 5.2.2

We use elasticsearch-spark-20_2.11-5.2.2.jar.

Then list data that you have indexed in ES like this:

curl 'localhost:9200/_cat/indices?v'

Then this Scala program will create an RDD from that data in ES and print it out:

import org.elasticsearch.spark._

val RDD = sc.esRDD(“filebeat-2017.03.26”)



res0: (String, scala.collection.Map[String,AnyRef]) = (AVsMV5_C73mlp2_ZI1eB,Map(@timestamp -> Sun Mar 26 22:38:40 CEST 2017, beat -> Map(hostname ->, name ->, version -> 5.2.2), input_type -> log, message -> 1331901030.210000 CBHpSz2Zi3rdKbAvwd 35820 22 failure INBOUND SSH-2.0-OpenSSH_5.0 SSH-1.99-Cisco-1.25 - - - - -, offset -> 284, source -> /tmp/log/ssh.log, type -> log))

Be Informed. Stay One Step Ahead.

Sign up for our newsletter and stay up to date with the latest industry news, trends, and technologies