20 September 2021

New Mirabelle release and rationale: metrics, events and log processing

A few days ago, I released version 0.5.0 of Mirabelle, a stream processing engine.
In this article, I will explain why I built Mirabelle, what problems it solves, and what its cool features are.

Stream processing

A while ago I wrote an article (available in French only, but Google Translate should be enough to understand it) about monitoring and pull vs. push, in which I discussed these topics and tools like Riemann and Vector.

My main point was that push-based monitoring systems are not dead and are sometimes simpler to operate than pull-based systems like Prometheus. Also, I like how push-based platforms can be used autonomously by teams, especially with tools like Kafka: everyone can subscribe to a topic containing logs or metrics and do whatever they want with the data.

Riemann was a revelation for me when I first used it (in 2016, I think?). The tool is powerful, super flexible, and full of good ideas. I contributed to it a lot at some point, but found a couple of things which were hard to solve without a rewrite.

After thinking a lot about it (I discussed it in the previously linked article) and multiple experiments over the last two years, I finally started my own project nine months ago: inspired by Riemann, but still different (I will explain how later).

Mirabelle

Mirabelle is inspired by Riemann, and some code was also reused (the TCP server, for example). Mirabelle works with the same event format as Riemann, which means that tools like Collectd, Telegraf, syslog-ng, language clients… built for Riemann also work with Mirabelle.

Events

For example, here is a Mirabelle event representing a request counter for an HTTP application (for a specific endpoint):

{:host "my-server"
 :service "http_request_counter"
 :application "my-web-app"
 :path "/foo"
 :method "get"
 :time 1619731016.145
 :tags ["web"]
 :metric 120}

As you can see, it’s a simple map, which can be extended (you can add all the fields you want). Some fields are important: :time for the event time, :service, which contains what is measured (similar to what Prometheus calls the metric name), and :metric for the metric value (which can be any kind of number).

This format is flexible and can represent a lot of things: metrics (like this example), logs (Riemann users were using it for log management as well), and events. As said previously, you can add extra keys, so you are not limited by the format.

The events time

One of Riemann’s issues (for me) is that it was full of side effects: some streams flushed windows based on scheduled actions (for example, "flush every 10 seconds").

In Mirabelle, all actions (without exception) use the event time as their clock. This has several nice properties:

  • The same inputs will always produce the same outputs. It makes tests easy to write (more on that later).

  • You can stop sending events to Mirabelle and resume later (say, one minute later) without your internal state being updated in the meantime. Mirabelle streams simply do nothing while they do not receive events.

  • You can use Mirabelle for real-time stream processing, but also to work on old data. For example, you can instantiate a stream, feed it old data (which should be ordered by time), and the result will be correct: the clock of the machine hosting Mirabelle does not matter, only the event times do.
    I’m sure a lot of fun use cases (continuous queries) can be implemented with Mirabelle. The same Mirabelle instance can be used for both modes (real time or not), since streams are completely independent.
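A minimal Python sketch can show why driving windows from event time makes results reproducible. This is not Mirabelle's code (Mirabelle is written in Clojure): the `EventTimeWindow` class and its flush rule (a window is emitted only when an event with a later time arrives) are illustrative assumptions, and events are assumed to arrive ordered by time.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class EventTimeWindow:
    """Hypothetical tumbling window whose only clock is the events' :time.

    There is no scheduler and no call to the machine clock: a window is
    emitted only when an event arrives whose time is past the window's end.
    """

    duration: float
    start: Optional[float] = None
    buffer: list = field(default_factory=list)

    def push(self, event: dict) -> list:
        """Add one event and return the windows it closed (possibly none)."""
        t = event["time"]
        closed = []
        if self.start is None:
            # Align the first window on a multiple of the duration.
            self.start = t - (t % self.duration)
        elif t >= self.start + self.duration:
            closed.append(self.buffer)
            self.buffer = []
            # Jump straight to the window containing t, skipping empty ones.
            self.start += ((t - self.start) // self.duration) * self.duration
        self.buffer.append(event)
        return closed


def run(events: list, duration: float) -> list:
    """Feed events to a fresh window and collect every closed window."""
    window = EventTimeWindow(duration)
    closed = []
    for event in events:
        closed.extend(window.push(event))
    return closed


# Replaying old data at any moment gives the same result as in real time.
sample_events = [
    {"time": 1, "metric": 1},
    {"time": 4, "metric": 2},
    {"time": 12, "metric": 3},
    {"time": 47, "metric": 4},
]
windows = run(sample_events, 10)
# windows holds the [0, 10) window (times 1 and 4) and the [10, 20) window (time 12)
```

The window containing the event at time 47 stays open until a later event arrives: without input, nothing happens, and running the same events twice always yields the same windows.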

To sum up, Mirabelle can be used for a lot of things:

  • Real-time stream processing: you can simply use it as a router between various systems (multiple inputs: logs, metrics… and multiple outputs: InfluxDB, Elasticsearch, cloud services… depending on the event types and services).
    You can also do real-time computation on events (rates, percentiles, correlations between events, cluster monitoring, time windows…) before forwarding the results to external systems.

  • Alerting (there is a built-in PagerDuty integration).

  • Continuous queries on old data.

EDN for streams

There are several other differences with Riemann, or features missing in Riemann but available in Mirabelle (like an HTTP API to manage streams, a clean way to handle I/O… I explain all of this in the documentation), but one change is super important: streams are represented as an EDN data structure.

In Riemann, the DSL is actual Clojure code (real functions which are evaluated). In Mirabelle, the DSL is "compiled" to EDN, and this EDN is what Mirabelle reads in the end.

Let’s take an example:

(streams
 (stream {:name :foo :default true}
   (where [:= :service "http_request_counter"]
     (info))))

This Mirabelle stream, named :foo (:default true indicates that events should be forwarded to this stream by default), filters events with :service equal to http_request_counter and logs them. You can now compile this stream (using mirabelle compile). The result is:

{:foo
 {:default true,
  :actions
  {:action :sdo,
   :children
   ({:action :where,
     :params [[:= :service "http_request_counter"]],
     :children ({:action :info})})}}}

You can easily map the DSL to the EDN representation. Mirabelle will read this EDN file and start logging events.
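To make the idea of an intermediate representation concrete, here is a toy Python interpreter for the compiled tree above. It only knows the three actions used in this example (`sdo`, `where` with a single `=` condition, and `info`); EDN keywords are written as plain strings, and the function names are hypothetical. Mirabelle's real compiler is written in Clojure and supports many more actions.

```python
import logging
from typing import Callable, List

logging.basicConfig(format="%(levelname)s %(message)s")
log = logging.getLogger("toy-mirabelle")
log.setLevel(logging.INFO)

Stream = Callable[[dict], None]


def matches(condition: list, event: dict) -> bool:
    """Evaluate one [op, field, value] condition; only "=" is supported."""
    op, key, value = condition
    if op != "=":
        raise ValueError(f"unsupported operator {op!r}")
    return event.get(key) == value


def compile_action(node: dict) -> Stream:
    """Turn one node of the action tree into a function taking an event."""
    children: List[Stream] = [compile_action(c) for c in node.get("children", [])]

    def forward(event: dict) -> None:
        for child in children:
            child(event)

    action = node["action"]
    if action == "sdo":  # send each event to every child
        return forward
    if action == "where":  # forward only matching events
        condition = node["params"][0]

        def where(event: dict) -> None:
            if matches(condition, event):
                forward(event)

        return where
    if action == "info":  # log the event, then keep forwarding it
        def info(event: dict) -> None:
            log.info("%s", event)
            forward(event)

        return info
    raise ValueError(f"unknown action {action!r}")


# The compiled :foo stream, with EDN keywords written as strings.
compiled = {
    "foo": {
        "default": True,
        "actions": {
            "action": "sdo",
            "children": [
                {
                    "action": "where",
                    "params": [["=", "service", "http_request_counter"]],
                    "children": [{"action": "info"}],
                }
            ],
        },
    }
}

foo = compile_action(compiled["foo"]["actions"])
foo({"service": "http_request_counter", "metric": 120})  # logged
foo({"service": "cpu", "metric": 0.5})  # filtered out by where
```

Since the tree is plain data, the same structure could be produced by a DSL, sent over an API, or generated by another program.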

You can of course define multiple streams (with or without :default; you can also target a specific stream by setting the :stream key on the event), in multiple files. Mirabelle handles all of that as expected.

Using an intermediate representation of streams (EDN) instead of just writing real Clojure functions (like in Riemann) offers tons of advantages:

  • Streams can be compared: Mirabelle supports hot reload and will only reload streams which were updated

  • Streams can be passed between systems: Mirabelle offers an API to dynamically create (or remove) streams (which is super cool).

  • Configuration can be statically analyzed and verified: for example, I have almost finished a command that will let you convert a configuration to a Graphviz representation of the streams (with all actions detailed).
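Because compiled streams are plain data, detecting what changed between two configurations boils down to a structural comparison. The Python sketch below (with a hypothetical `diff_streams` function, and Python dicts standing in for EDN maps) shows the kind of diff a hot reload can rely on; it does not claim to reproduce Mirabelle's reload code.

```python
def diff_streams(old: dict, new: dict) -> dict:
    """Classify stream names by comparing two {name: compiled definition} maps."""
    old_names, new_names = set(old), set(new)
    common = old_names & new_names
    return {
        "added": sorted(new_names - old_names),
        "removed": sorted(old_names - new_names),
        "updated": sorted(n for n in common if old[n] != new[n]),  # deep equality
        "unchanged": sorted(n for n in common if old[n] == new[n]),
    }


old = {
    "foo": {"default": True, "actions": {"action": "info"}},
    "bar": {"actions": {"action": "info"}},
}
new = {
    "foo": {"default": True, "actions": {"action": "info"}},
    "bar": {"actions": {"action": "where", "params": [["=", "service", "cpu"]]}},
    "baz": {"actions": {"action": "info"}},
}
changes = diff_streams(old, new)
# {'added': ['baz'], 'removed': [], 'updated': ['bar'], 'unchanged': ['foo']}
```

Only `bar` would need to be rebuilt, `foo` could keep its in-memory state, and `baz` would be created. Functions, by contrast, can only be compared by identity, which is why this is hard to do with Riemann's real Clojure functions.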

If you are a Riemann user, you will probably say at this point "but I like writing Clojure in Riemann, it’s extensible, I’m not limited by a DSL". You are right. In Mirabelle, you can only use actions known to Mirabelle; you cannot write your own code directly in the configuration file like in Riemann.
Fortunately, Mirabelle can be extended, as explained in the documentation, so in the end you can do whatever you want: the full power of the JVM is available.

I also made this choice because Clojure was hard for a lot of Riemann operators. Here, you have a DSL which compiles to EDN: it’s not "real" Clojure code, and there is a compilation phase to detect issues, so it should be much easier to use for most people.

Note that the latest Mirabelle release adds the ability to share configuration snippets between streams (with variables support, the ability to read values from environment variables…). All of this is explained in the documentation; I really encourage you to read that part because it offers a clean way to reuse parts of configurations.

Fault tolerance

I like simple systems. I think the world needs tools which are easy to run and operate, with decent performance.
I don’t want to be forced to deploy Kafka, Hadoop, Zookeeper, or something else. I think most users do not need all of that and want simple tools.

Mirabelle is not a distributed application. Is that an issue? Maybe for some people, but a lot of popular tools are in the same situation today.

You can still get high availability with Mirabelle by using external tools, which I think is the way to go (a simple tool that can be combined with others if needed):

  • Send events to tools like Kafka, and have Mirabelle consume them (I used this approach successfully with Riemann).

  • Use a TCP load balancer in front of Mirabelle instances.

In the future, I would really like to write my own proxy with consistent hashing which could be deployed in front of Mirabelle, a bit like the proxy in Veneur. It should not be too difficult to build and could really help HA setups.
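Such a proxy does not exist yet, but the core idea of consistent hashing fits in a few lines. In this Python sketch, the `HashRing` class, the use of MD5, the 100 virtual nodes per instance, and the choice of `host` plus `service` as routing key are all assumptions; the point is that events of the same series always reach the same Mirabelle instance (which stateful actions need), and removing an instance only moves the keys that instance owned.

```python
import bisect
import hashlib


class HashRing:
    """Consistent hash ring mapping routing keys to Mirabelle instances."""

    def __init__(self, nodes, replicas=100):
        self.replicas = replicas  # virtual nodes per instance, to spread load
        self._points = {}  # ring position -> instance name
        self._positions = []  # sorted ring positions
        for node in nodes:
            self.add(node)

    @staticmethod
    def _hash(key: str) -> int:
        # First 8 bytes of MD5 as an integer: stable across processes,
        # unlike Python's built-in hash() for strings.
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def add(self, node: str) -> None:
        for i in range(self.replicas):
            self._points[self._hash(f"{node}#{i}")] = node
        self._positions = sorted(self._points)

    def remove(self, node: str) -> None:
        self._points = {p: n for p, n in self._points.items() if n != node}
        self._positions = sorted(self._points)

    def node_for(self, key: str) -> str:
        """Return the first instance clockwise from the key's position."""
        if not self._positions:
            raise LookupError("no instance available")
        i = bisect.bisect(self._positions, self._hash(key)) % len(self._positions)
        return self._points[self._positions[i]]


def routing_key(event: dict) -> str:
    """Hypothetical key: all events of one series go to one instance."""
    return f"{event.get('host')}/{event.get('service')}"


ring = HashRing(["mirabelle-1", "mirabelle-2", "mirabelle-3"])
event = {"host": "my-server", "service": "http_request_counter", "metric": 120}
target = ring.node_for(routing_key(event))
```

This is only a sketch of the technique, not Veneur's implementation; a real proxy would also need health checks and a way to hand over state when instances change.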

I’m also thinking about stateful computation and how to preserve internal state across crashes. Today, Mirabelle can store events in a disk queue and replay them at startup to rebuild its state (see the documentation about it).
But I’m also thinking about dedicated actions, maybe backed by a store like RocksDB, with a "checkpoint" system to rebuild the state… To be honest, I think a lot of users don’t need that, but I may work on it soon.
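The replay approach fits well with event-time processing: replaying the same events rebuilds exactly the same state. Here is a minimal Python sketch of the idea, assuming a hypothetical append-only JSON-lines file and a toy stateful stream; Mirabelle's actual disk queue and its configuration are described in its documentation, not here. A production queue would also need batching, `fsync` and rotation.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator


class DiskQueue:
    """Hypothetical append-only log of received events (one JSON per line)."""

    def __init__(self, path) -> None:
        self.path = Path(path)

    def append(self, event: dict) -> None:
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def replay(self) -> Iterator[dict]:
        """Yield stored events in the order they were received."""
        if not self.path.exists():
            return
        with self.path.open(encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    yield json.loads(line)


class SumByService:
    """Toy stateful stream: running sum of :metric per :service."""

    def __init__(self) -> None:
        self.totals = {}

    def __call__(self, event: dict) -> None:
        service = event["service"]
        self.totals[service] = self.totals.get(service, 0) + event["metric"]


queue_path = Path(tempfile.mkdtemp()) / "events.jsonl"
queue = DiskQueue(queue_path)
stream = SumByService()
for event in [{"service": "a", "time": 1, "metric": 1},
              {"service": "a", "time": 2, "metric": 2}]:
    queue.append(event)  # persist first...
    stream(event)  # ...then process

# After a crash the in-memory state is gone: rebuild it from the queue.
restarted = SumByService()
for event in queue.replay():
    restarted(event)
```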

A complete example

Let’s go back to our event from the beginning of the article. It represents a counter which is always increasing. Let’s use Mirabelle to compute the rate of requests for our applications:

(streams

 (stream {:name :influxdb :default true}
   (push-io! :influxdb))

 (stream {:name :http_req_rate :default true}
   ;; filter events based on the :service value
   (where [:= :service "http_request_counter"]
     ;; split the stream for each combination of these fields
     ;; we indeed want to compute the rate for each unique combination
     ;; and not mix events from multiple hosts etc...
     (by [:host :application :path :method]
       ;; compute the rate
       (ddt
        ;; rename the event :service
        (with {:service "http_request_rate"}
          ;; expose these metrics in a channel
          (publish! :http_request_rate)
          ;; store events into a tap for tests, noop in normal mode
          (tap :http_request_rate)
          ;; push data into influxdb
          (push-io! :influxdb)))))))

This example defines two streams:

  • The first one, named :influxdb, forwards all events received by Mirabelle to InfluxDB. It’s always useful to have the raw data available. How to define I/O is documented here.

  • The second one, named :http_req_rate, filters events with :service equal to http_request_counter and correctly computes the request rate from these events. I will explain what publish! does a bit later.

We now want to test it. The tap action is here for that. It does nothing by default, but will register events in test mode. This is my test:

{:http_req_rate {:input [{:host "my-server"
                          :service "http_request_counter"
                          :application "my-web-app"
                          :path "/foo"
                          :method "get"
                          :time 1
                          :tags ["web"]
                          :metric 110}
                         {:host "my-server"
                          :service "http_request_counter"
                          :application "my-web-app"
                          :path "/bar"
                          :method "post"
                          :time 1
                          :tags ["web"]
                          :metric 30}
                         {:host "my-server"
                          :service "http_request_counter"
                          :application "my-web-app"
                          :path "/foo"
                          :method "get"
                          :time 11
                          :tags ["web"]
                          :metric 210}
                         {:host "my-server"
                          :service "http_request_counter"
                          :application "my-web-app"
                          :path "/bar"
                          :method "post"
                          :time 11
                          :tags ["web"]
                          :metric 80}]
                 :tap-results {:http_request_rate
                               [{:host "my-server"
                                 :service "http_request_rate"
                                 :application "my-web-app"
                                 :path "/foo"
                                 :method "get"
                                 :time 11
                                 :tags ["web"]
                                 :metric 10}
                                {:host "my-server"
                                 :service "http_request_rate"
                                 :application "my-web-app"
                                 :path "/bar"
                                 :method "post"
                                 :time 11
                                 :tags ["web"]
                                 :metric 5}]}}}

Tests are also defined as EDN. Each test contains a list of events in :input, and in :tap-results the expected content of taps. All tests run in isolation, side effects (I/O) are automatically discarded.
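The semantics described here (taps only record in test mode, I/O is discarded, each test starts from fresh state) can be sketched in a few lines of Python. The `Context`, `run_test` and `build_stream` names are hypothetical and Mirabelle's real test runner works differently internally; this only illustrates why such tests are isolated and free of side effects.

```python
from collections import defaultdict


class Context:
    """Hypothetical runtime switches shared by the actions of a stream."""

    def __init__(self, test_mode: bool) -> None:
        self.test_mode = test_mode
        self.taps = defaultdict(list)  # tap name -> recorded events
        self.sent = []  # stand-in for pushes to external systems

    def tap(self, name: str):
        def action(event: dict) -> None:
            if self.test_mode:  # no-op outside of tests
                self.taps[name].append(event)
        return action

    def push_io(self, name: str):
        def action(event: dict) -> None:
            if not self.test_mode:  # I/O is discarded in tests
                self.sent.append((name, event))
        return action


def run_test(build_stream, spec: dict) -> bool:
    """Build a fresh stream (isolation), feed "input", compare taps."""
    ctx = Context(test_mode=True)
    stream = build_stream(ctx)
    for event in spec["input"]:
        stream(event)
    return dict(ctx.taps) == spec["tap-results"]


def build_stream(ctx: Context):
    """Example stream: rename the service, then tap and push the event."""
    tap = ctx.tap("renamed")
    push = ctx.push_io("influxdb")

    def stream(event: dict) -> None:
        renamed = {**event, "service": "renamed"}
        tap(renamed)
        push(renamed)

    return stream


spec = {
    "input": [{"service": "a", "time": 1, "metric": 1}],
    "tap-results": {"renamed": [{"service": "renamed", "time": 1, "metric": 1}]},
}
```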

In this test, we inject two kinds of events, one for :path "/foo" and one for :path "/bar" (to check that our by action works as expected).

mirabelle test

09:30:40.400 [main] INFO mirabelle.core - launching tests
{"@timestamp":"2021-09-20T09:30:40.632+02:00","@version":"1","message":"launching test :percentiles","logger_name":"mirabelle.test","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2021-09-20T09:30:40.651+02:00","@version":"1","message":"Reloading streams","logger_name":"mirabelle.stream","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2021-09-20T09:30:40.660+02:00","@version":"1","message":"Adding new streams :http_req_rate, :influxdb","logger_name":"mirabelle.stream","thread_name":"main","level":"INFO","level_value":20000}
All tests successful

It passes! Our HTTP request rate is computed as expected (for example, for "/foo" we have :metric 110 at :time 1 and :metric 210 at :time 11, so we expect a rate of (210 - 110) / (11 - 1) = 10 over this period).
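The same computation can be reproduced outside Mirabelle to see why `by` matters. In this Python sketch, `where_service`, `by`, `ddt` and `with_fields` are hypothetical stand-ins named after the Mirabelle actions (`with` is a Python keyword); `ddt` is assumed to emit nothing for the first event of a series, then the difference quotient between consecutive events. `by` builds a fresh child per key, so each combination of host, application, path and method keeps its own previous value.

```python
from typing import Callable, Dict, List, Tuple

Stream = Callable[[dict], None]


def where_service(service: str, child: Stream) -> Stream:
    def stream(event: dict) -> None:
        if event.get("service") == service:
            child(event)
    return stream


def by(fields: List[str], make_child: Callable[[], Stream]) -> Stream:
    """Route each event to a child dedicated to its field values."""
    children: Dict[Tuple, Stream] = {}

    def stream(event: dict) -> None:
        key = tuple(event.get(f) for f in fields)
        if key not in children:
            children[key] = make_child()  # fresh state for a new combination
        children[key](event)

    return stream


def ddt(child: Stream) -> Stream:
    """Emit (metric delta) / (time delta) between consecutive events."""
    previous = None

    def stream(event: dict) -> None:
        nonlocal previous
        if previous is not None and event["time"] > previous["time"]:
            rate = (event["metric"] - previous["metric"]) / (event["time"] - previous["time"])
            child({**event, "metric": rate})
        previous = event

    return stream


def with_fields(fields: dict, child: Stream) -> Stream:
    return lambda event: child({**event, **fields})


results: List[dict] = []
pipeline = where_service(
    "http_request_counter",
    by(["host", "application", "path", "method"],
       lambda: ddt(with_fields({"service": "http_request_rate"}, results.append))),
)

base = {"host": "my-server", "service": "http_request_counter", "application": "my-web-app"}
events = [
    {**base, "path": p, "method": m, "time": t, "metric": v}
    for p, m, t, v in [("/foo", "get", 1, 110), ("/bar", "post", 1, 30),
                       ("/foo", "get", 11, 210), ("/bar", "post", 11, 80)]
]
for event in events:
    pipeline(event)
# results: /foo -> (210 - 110) / (11 - 1) = 10.0, /bar -> (80 - 30) / (11 - 1) = 5.0
```

Without `by`, a single `ddt` would compare the `/foo` event at time 11 with the `/bar` event at time 1 and report (210 - 30) / 10 = 18, which is meaningless.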

Riemann tests were sometimes difficult to write, and I think the test system is one of Mirabelle’s strengths. Being able to easily test transformations, alerting rules… is important.

Websocket

Let’s go back to the publish! action. Mirabelle (like Riemann) has a pub-sub system, with a couple of new features.

You can subscribe to events in Mirabelle (using websockets). It can be used to quickly get information about some kinds of events, or to build real-time dashboards (here is a Riemann plugin for Grafana which also works with Mirabelle).

Once Mirabelle has started, let’s subscribe to the :http_request_rate channel (the value passed to publish!). A client is available in the Mirabelle repository:

./websocket.py --channel http_request_rate

Now, let’s send some events to Mirabelle:

riemann-client send --host "foo" --service "http_request_counter" --ttl 60 --metric-d 140 --attribute application=example --attribute path="/foo" --attribute method="get"

riemann-client send --host "foo" --service "http_request_counter" --ttl 60 --metric-d 180 --attribute application=example --attribute path="/foo" --attribute method="get"

My websocket client then prints:

> Received at Mon Sep 20 09:44:47 2021
{
    "host": "foo",
    "service": "http_request_rate",
    "state": null,
    "description": null,
    "metric": 8.795075031084059,
    "tags": null,
    "time": 1632123887.646,
    "ttl": 60.0,
    "x-client": "riemann-c-client",
    "application": "example",
    "path": "/foo",
    "method": "get"
}

It works! You can even pass a query to the websocket client to only receive some events (queries use the same syntax as the where action):

./websocket.py --channel http_request_rate --query '[:and [:= :service "http_request_rate"] [:> :metric 10]]'

This subscription only keeps events with a :metric greater than 10 for :service "http_request_rate". You could, for example, publish all events to a channel (it should not impact performance); that way, you can always quickly extract specific events using the websocket client.
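To show how such a query can filter a subscription, here is a Python sketch of an evaluator for a subset of the query syntax (`and`, `or`, `not`, `=`, `>`, `<`, `>=`, `<=`) combined with a toy channel. EDN vectors and keywords are written as Python lists and strings, the `evaluate` and `Channel` names are hypothetical, and applying the filter when publishing is a choice made for the sketch, not a description of Mirabelle's internals.

```python
import operator
from typing import Callable, List, Optional, Tuple

COMPARATORS = {
    "=": operator.eq, ">": operator.gt, "<": operator.lt,
    ">=": operator.ge, "<=": operator.le,
}


def evaluate(query: list, event: dict) -> bool:
    """Evaluate a nested query such as ["and", [...], [...]] against an event."""
    op, *args = query
    if op == "and":
        return all(evaluate(q, event) for q in args)
    if op == "or":
        return any(evaluate(q, event) for q in args)
    if op == "not":
        return not evaluate(args[0], event)
    if op in COMPARATORS:
        field, value = args
        actual = event.get(field)
        if actual is None and op != "=":
            return False  # ordering comparisons never match a missing field
        return COMPARATORS[op](actual, value)
    raise ValueError(f"unsupported operator {op!r}")


class Channel:
    """Toy pub-sub channel where each subscriber may attach a query."""

    def __init__(self) -> None:
        self.subscribers: List[Tuple[Optional[list], Callable[[dict], None]]] = []

    def subscribe(self, query: Optional[list], callback: Callable[[dict], None]) -> None:
        self.subscribers.append((query, callback))

    def publish(self, event: dict) -> None:
        for query, callback in self.subscribers:
            if query is None or evaluate(query, event):
                callback(event)


channel = Channel()
received: List[dict] = []
channel.subscribe(
    ["and", ["=", "service", "http_request_rate"], [">", "metric", 10]],
    received.append,
)
channel.publish({"service": "http_request_rate", "metric": 8.8})
channel.publish({"service": "http_request_rate", "metric": 12.5})
# received == [{"service": "http_request_rate", "metric": 12.5}]
```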

I haven’t talked about it in this article, but Mirabelle also has a component named the Index, which is instantiated per stream. You can put events into this index, subscribe to it, and detect event expiration (this is where the :ttl field is used). See the documentation for more information.
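As a rough illustration of expiration driven by event time, here is a Python sketch of an index keeping the latest event per `(host, service)` pair. The key choice, the `Index` API, and the rule that an event expires once the most recent event time exceeds its `time + ttl` are assumptions for the example; the documentation describes what Mirabelle really does.

```python
from typing import Dict, List, Optional, Tuple


class Index:
    """Hypothetical index: latest event per (host, service), expired by event time."""

    def __init__(self) -> None:
        self.events: Dict[Tuple, dict] = {}
        self.current_time: Optional[float] = None  # only moved by events

    def insert(self, event: dict) -> None:
        self.events[(event.get("host"), event.get("service"))] = event
        if self.current_time is None or event["time"] > self.current_time:
            self.current_time = event["time"]

    def expire(self) -> List[dict]:
        """Remove and return the events whose time + ttl is in the past."""
        if self.current_time is None:
            return []
        expired_keys = [
            key for key, e in self.events.items()
            if e.get("ttl") is not None and e["time"] + e["ttl"] < self.current_time
        ]
        return [self.events.pop(key) for key in expired_keys]


index = Index()
index.insert({"host": "foo", "service": "http_request_rate", "time": 0, "ttl": 60})
index.insert({"host": "bar", "service": "http_request_rate", "time": 100, "ttl": 60})
expired = index.expire()  # "foo" expired: 0 + 60 < 100
```

As with windows, the machine clock is never consulted: an event only expires because newer events moved time forward.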

Conclusion

Mirabelle ships with 65 built-in actions (and I constantly add new ones) to work on events (filtering, aggregation, various kinds of windows, math operations…). It supports PagerDuty, Elasticsearch, and InfluxDB as I/O (I would like to add TimescaleDB, Graphite… but writing integrations is a lot of work).

I only showed you simple examples in this article, but Mirabelle can also be used to monitor distributed systems and today’s infrastructure. I plan to write an article where I monitor an infrastructure with it.

I also didn’t demo the HTTP API. As explained before, you can use it to manage streams dynamically. You can also push events to Mirabelle through HTTP, which can simplify integrations with some systems.

If you are interested in Mirabelle, don’t hesitate to test it and give me feedback. If you are a vendor and want to integrate your solution with it, you can also contact me.
I spent a lot of time writing the documentation, which should contain all you need to get started (but if you are stuck or have questions, you can open an issue on GitHub).

I hope you enjoyed the article.

Tags: devops projects english
