Elasticsearch API and Python
I don’t like the Elastic stack. There are various reasons why I do not recommend using it:
- Data enrichment is very complex. Lookup functions require a separate index, which probably must be added by an admin. Splunk is much simpler here. Lookups are very typical for Security Operations, where you have a list from Threat Intelligence, or of resolved DNS entries.
- Elasticsearch has a difficult API design with a scroll-based API. Splunk has a jobs-based API, which is much better suited for larger data-processing tasks.
- Kibana (the charting web frontend) lacks very basic features.
- You cannot easily use one properly designed search language. Splunk has one well-defined search language, which is relatively easy to learn. Elastic uses various Domain-Specific Languages, which are over-engineered.
Due to this, the following Wiki entry is subjective and a little ironic.
Elasticsearch - can't search this!
Please check the Applies to paragraph of each section to determine whether it still applies, because Elastic often pushes breaking changes.
Using the scroll API
Applies to: Elasticsearch 7.0, elasticsearch (Python 3 package 7.0.0)
The Elasticsearch API is not intuitive. One of the many shortcomings is that we need to iterate over result sets larger than 10 000 rows using the Scroll API.
The following code is based upon a blog article at Techoverflow. I only needed to make minor modifications. The Elastic forums are full of deprecated and wrong answers that will not yield results.
import json
from collections import OrderedDict

def es_iterate_all_documents(es, index, pagesize=250, scroll_timeout="1m", query=None):
    """
    Helper to iterate over ALL documents of a single index.
    Yields each document's _source.
    """
    query = OrderedDict() if query is None else query  # avoid a mutable default argument
    query.update({'size': pagesize})
    body = json.dumps(query)
    is_first = True
    while True:
        # Scroll next
        if is_first:  # Initialize the scroll context
            result = es.search(index=index, scroll=scroll_timeout, body=body)
            is_first = False
        else:
            result = es.scroll(body={
                "scroll_id": scroll_id,
                "scroll": scroll_timeout
            })
        scroll_id = result["_scroll_id"]
        hits = result["hits"]["hits"]
        # Stop after no more docs
        if not hits:
            break
        # Yield each entry
        yield from (hit['_source'] for hit in hits)
How do I issue an Elasticsearch query to this function?
This function yields the documents as the typical nested JSON structures (Python dictionaries). By default, 250 rows are fetched per scroll page, until the result set is exhausted. The query, which has to be passed as an OrderedDict() parameter, can be defined in a calling function like this:
esq = OrderedDict({
    "query": {
        "match": {
            "example_field": {"query": "1.2.3.4"}
        }
    }
})
Using an OrderedDict() object here makes this a little more bearable.
How do I pass a client object to this function?
Applies to: Elasticsearch 7.0, elasticsearch (Python 3 package 7.0.0)
The es parameter is supposed to hold an Elasticsearch client:
from elasticsearch import Elasticsearch

# Connect to the Elasticsearch REST API (port 9200 by default)
es = Elasticsearch(hosts="4.3.2.1")
How do I pass the current index to this function?
Assuming that you have a daily rolling index model, you can use:
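A minimal sketch, reusing the es_iterate_all_documents() generator and the esq query from above; the filebeat-* index naming pattern is an assumption:

from datetime import date

# Hypothetical daily rolling index, e.g. filebeat-2019.06.01
index = "filebeat-{}".format(date.today().strftime("%Y.%m.%d"))

for doc in es_iterate_all_documents(es, index, query=esq):
    print(doc)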
Passing the result set into a Pandas DataFrame
Append to a list per iteration
Applies to: Elasticsearch 7.0, pandas (Python 3 package 0.24)
The simple (and not the most efficient) way is to append the rows per iteration to a list. Handling the JSON result set with pandas (pd) makes the tabular integration less painful, because raw Elasticsearch results aren’t easy to process.
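A minimal sketch of this approach, reusing es, index and esq from the sections above:

import pandas as pd

rows = []
for doc in es_iterate_all_documents(es, index, query=esq):
    rows.append(doc)  # append one document (dict) per iteration

df = pd.DataFrame(rows)  # one column per top-level JSON field

Note that pd.DataFrame() only flattens the top level; deeply nested fields remain dict objects inside the cells.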
Using Elasticsearch result sets with Pandas and NetworkX
Let’s assume that we have our data set in an ordered DataFrame:
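A minimal sketch, assuming the DataFrame from the previous section; the column names src_ip and dst_ip are assumptions:

# Hypothetical connection log columns
df = df[["src_ip", "dst_ip"]].drop_duplicates()
df = df.sort_values(by="src_ip")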
.drop_duplicates() is a Pandas function that compares the selected columns (here a tuple of two values per row) and eliminates the duplicate rows. There are other statistical functions for different purposes, which allow us to calculate aggregates, medians and DataFrame-wide statistics.
Now for a quick demo of the capabilities of applied Data Science using Pandas and Elasticsearch, consider the following basic example with NetworkX and JupyterLab:
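A hedged sketch, assuming the deduplicated DataFrame and the hypothetical column names from above:

import networkx as nx
import matplotlib.pyplot as plt

# Build a directed graph from the unique connection pairs
G = nx.from_pandas_edgelist(df, source="src_ip", target="dst_ip",
                            create_using=nx.DiGraph())
nx.draw(G, with_labels=True, node_size=500)
plt.show()  # renders inline in JupyterLab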
Using SQL as a query language with the Elasticsearch REST API
Elasticsearch today contains features that are exposed via the REST API, which allow using SQL and regular expressions. This can be used with Python and Pandas:
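A minimal sketch that posts a query to the _sql endpoint with the requests package; the host and the index pattern are assumptions:

import requests

# Hypothetical SQL query against a Beats index pattern
payload = {"query": 'SELECT src_ip, dst_ip FROM "filebeat-*" LIMIT 100'}
resp = requests.post("http://4.3.2.1:9200/_sql?format=json", json=payload)
result = resp.json()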
SQL support for Elasticsearch Data Streams is a fairly recent addition. This way you can use Beats data as well.
The result can be read into a DataFrame:
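A sketch based on the JSON response format of the _sql endpoint, which contains column metadata and row values:

import pandas as pd

cols = [col["name"] for col in result["columns"]]
df = pd.DataFrame(result["rows"], columns=cols)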
This way we don’t need to use the Elastic DSL with all these curly braces. Sadly, it doesn’t seem that Kibana has been switched to SQL yet.