I have a use case where a large number of logs will be consumed by Apache Flink CEP. I need to detect brute force attacks and port scanning attacks. The challenge is that an ordinary CEP rule compares a value against a constant, like event = "login". Here the criteria are different. For a brute force attack the criteria are as follows.
The username is constant and event = "login failure" (the threshold is 5 occurrences within 5 minutes). That is, logs with the login failure event are received for the same username 5 times within 5 minutes.
For port scanning the criteria are as follows.
The IP address is constant and the destination port varies (the threshold is 10 occurrences within 1 minute). That is, logs with the same IP address are received for 10 different ports within 1 minute.
With Flink, when you want to process the events for something like one username or one IP address in isolation, the way to do this is to partition the stream by a key, using keyBy(). The training materials in the Flink docs have a section on Keyed Streams that explains this part of the DataStream API in more detail. keyBy() is roughly the same concept as a GROUP BY in SQL, if that helps.
With CEP, if you first key the stream, then the pattern will be matched separately for each distinct value of the key, which is what you want.
However, rather than CEP, I would instead recommend Flink SQL, perhaps in combination with MATCH_RECOGNIZE, for this use case. MATCH_RECOGNIZE is a higher-level API, built on top of CEP, and it's easier to work with. In combination with SQL, the result is quite powerful.
You'll find some Flink SQL training materials and examples (including examples that use MATCH_RECOGNIZE) in Ververica's GitHub account.
Update
To be clear, I wouldn't use MATCH_RECOGNIZE for these specific rules; neither it nor CEP is needed for this use case. I mentioned it in case you have other rules where it would be helpful. (My reason for not recommending CEP in this case is that implementing the distinct constraint might be messy.)
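For example, for the port scanning case you can do something like a windowed aggregation in SQL: group the logs by IP address over one-minute windows and keep the groups that contain at least 10 distinct destination ports.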
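To make the rule itself concrete, here is a minimal sketch in plain Java, with no Flink dependency, of what such a windowed query computes: group by IP, bucket events into one-minute tumbling windows, and keep the IPs that hit at least 10 distinct ports in some window. The class, field, and method names are hypothetical, and the choice of tumbling (rather than sliding) windows is an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PortScanSketch {
    // Hypothetical log record: source IP, destination port, event time in epoch millis.
    static final class LogEvent {
        final String ip;
        final int destPort;
        final long timestampMs;

        LogEvent(String ip, int destPort, long timestampMs) {
            this.ip = ip;
            this.destPort = destPort;
            this.timestampMs = timestampMs;
        }
    }

    static final long WINDOW_MS = 60_000L;     // one-minute tumbling window
    static final int MIN_DISTINCT_PORTS = 10;  // threshold from the question

    // Returns the IPs that touched at least MIN_DISTINCT_PORTS distinct ports
    // inside any single one-minute tumbling window.
    static Set<String> suspiciousIps(List<LogEvent> events) {
        // ip -> (window start -> distinct ports seen in that window)
        Map<String, Map<Long, Set<Integer>>> portsByIpAndWindow = new HashMap<>();
        for (LogEvent e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_MS);
            portsByIpAndWindow
                .computeIfAbsent(e.ip, k -> new HashMap<>())
                .computeIfAbsent(windowStart, k -> new HashSet<>())
                .add(e.destPort);
        }
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Map<Long, Set<Integer>>> entry : portsByIpAndWindow.entrySet()) {
            for (Set<Integer> ports : entry.getValue().values()) {
                if (ports.size() >= MIN_DISTINCT_PORTS) {
                    result.add(entry.getKey());
                    break;
                }
            }
        }
        return result;
    }
}
```

Note that a scan straddling a window boundary can be missed with tumbling windows; overlapping windows would catch it at the cost of more state.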
The login case is similar, but easier.
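It is easier because there is no distinctness constraint, only a count per username. As a rough sketch in plain Java (again without Flink, and with hypothetical names), the check is a per-username count of "login failure" events over the last five minutes. This version evaluates the window on every event, which is an assumption about how "within 5 minutes" should be read.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class BruteForceSketch {
    static final long WINDOW_MS = 5 * 60_000L; // five minutes
    static final int MIN_FAILURES = 5;         // threshold from the question

    // username -> timestamps of that user's recent "login failure" events
    private final Map<String, Deque<Long>> recentFailures = new HashMap<>();

    // Feed events in timestamp order. Returns true when this event is a login
    // failure and the user now has at least MIN_FAILURES failures in the window.
    boolean onEvent(String username, String event, long timestampMs) {
        if (!"login failure".equals(event)) {
            return false;
        }
        Deque<Long> times = recentFailures.computeIfAbsent(username, k -> new ArrayDeque<>());
        times.addLast(timestampMs);
        // Drop failures that have fallen out of the five-minute window.
        // The event just added always stays, so the deque is never empty here.
        while (timestampMs - times.peekFirst() >= WINDOW_MS) {
            times.removeFirst();
        }
        return times.size() >= MIN_FAILURES;
    }
}
```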
Note that when working with streaming SQL, you should give some thought to state retention.
Further update
This query is likely to return a given IP address many times, but it's not desirable to generate multiple alerts.
This could be handled by inserting matching IP addresses into an Alert table, and only generating alerts for IPs that aren't already there.
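Whatever mechanism is used, the core of the idea is per-IP state that remembers an alert was already raised. A minimal plain-Java sketch of that bookkeeping follows; it is not tied to a Flink table or operator, and AlertSuppressor, shouldAlert, and the cooldown parameter are hypothetical names. Passing Long.MAX_VALUE as the cooldown means an IP never alerts twice.

```java
import java.util.HashMap;
import java.util.Map;

class AlertSuppressor {
    private final long cooldownMs;                         // how long duplicates stay suppressed
    private final Map<String, Long> lastAlertAt = new HashMap<>();

    AlertSuppressor(long cooldownMs) {
        this.cooldownMs = cooldownMs;
    }

    // Returns true if an alert should be emitted for this IP at this time,
    // and records the alert so that later duplicates are suppressed.
    boolean shouldAlert(String ip, long nowMs) {
        Long last = lastAlertAt.get(ip);
        if (last != null && nowMs - last < cooldownMs) {
            return false; // an alert for this IP is still in effect
        }
        lastAlertAt.put(ip, nowMs);
        return true;
    }
}
```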
Or the output of the SQL query could be processed by a de-duplicator implemented using the DataStream API, similar to the example in the Flink docs. If you only want to suppress duplicate alerts for some period of time, use a KeyedProcessFunction instead of a RichFlatMapFunction, and use a Timer to clear the state when it's time to re-enable alerts for a given IP.
Yet another update (concerning CEP and distinctness)
Implementing this with CEP should be possible. You'll want to key the stream by the IP address, and have a pattern that has to match within one minute.
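The pattern can be roughly like this: a looping stage named "distinctPorts" guarded by a first iterative condition, followed by a final stage guarded by a second iterative condition, with the whole pattern constrained to match within one minute.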
The first iterative condition returns true if the event being added to the pattern has a distinct port from all of the previously matching events. This is somewhat similar to the example in the docs.
The second iterative condition returns true if size("distinctPorts") >= 9 and this event also has yet another distinct port.
See this Flink Forward talk (youtube video) for a somewhat similar example at the end of the talk.
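The logic of the two conditions can be sketched as plain Java predicates, shown here without any Flink types so the rule is easy to check in isolation. In an actual CEP job these checks would sit inside IterativeCondition.filter(value, ctx), with the previously matched events obtained from ctx.getEventsForPattern("distinctPorts"). The class and method names below are hypothetical, and the sketch assumes the ports already matched are distinct, which the first condition guarantees.

```java
import java.util.Collection;

class DistinctPortConditions {
    // First condition (looping "distinctPorts" stage): accept the event only
    // if its port differs from every port matched so far.
    static boolean isNewPort(Collection<Integer> matchedPorts, int port) {
        return !matchedPorts.contains(port);
    }

    // Second condition (final stage): at least 9 distinct ports have already
    // matched, and this event contributes yet another distinct port.
    static boolean completesScan(Collection<Integer> matchedPorts, int port) {
        return matchedPorts.size() >= 9 && isNewPort(matchedPorts, port);
    }
}
```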
If you try this and get stuck, please ask a new question, showing us what you've tried and where you're stuck.