I use Drools and Flink 1.17.1 to filter events; the input is read from a file and the output is written to a file.
I call keyBy(customerId), and in the rule I count the events that have the same user_id. If the count is >= 5, the events are written to the output file.
Result: with parallelism = 1 the output contains 8 events; with parallelism = 2 it contains 7 events; with parallelism = 5 it contains 5 events.
Why is the result different?
My questions:
Why does the output change when the parallelism changes?
What is the relationship between parallelism and keyBy?
This is my rule code:
// Attach CEP metadata to the SyslogEventV2 fact type:
//   @role(event)  - treat instances as events rather than plain facts.
//   @expires(5m)  - sets a 5-minute expiration offset, after which the engine
//                   may retract an event from working memory.
declare SyslogEventV2
@role(event)
@expires( 5m )
end
// Rule body only: the `rule "..."` header and the closing `end` are not part of this snippet.
when
// Bind every event whose prefilter rule ids contain "Prefilter_Testcase_Windows_002"
// and capture its user_id for the correlation below.
($event : SyslogEventV2(matched_prefilter_stateful_rule_ids contains "Prefilter_Testcase_Windows_002", $user_id : user_id))
and
// Count the SyslogEventV2 events with the same user_id inside a 5-minute sliding
// time window and require at least 5 of them.
// NOTE(review): the count only sees events inserted into the same rule session
// that evaluates this rule - confirm how sessions are created per Flink subtask.
(accumulate ($allEvent : SyslogEventV2(user_id == $user_id) over window:time(5m); $count : count($allEvent); $count >= 5))
then
// Tag the matching event with this rule's id and notify the engine of the change.
$event.addMatchedWindowsRuleId("Testcase_Windows_002");
update($event);
// Emit the event on the "output" channel only if that channel is registered.
if (channels["output"] != null) {
channels["output"].send($event);
}
This is my Flink code:
// Apply EventBaseTransformation to every incoming event before keying.
DataStream<EventBase> filteredStream = statefulInputEventStream.map(new EventBaseTransformation()); // If customer_id == null then customer_id = UUID.random()
// Partition the stream by customer id: events with equal customer ids are routed
// to the same parallel subtask of the downstream operator.
// NOTE(review): the rule correlates events by user_id, but the stream is keyed by
// customer id. Events sharing a user_id but carrying different customer ids
// (including the random UUIDs assigned when customer_id is null) can be routed to
// different subtasks once parallelism > 1. Presumably each subtask of
// StatefulFilterFunction owns a separate rule session, so each session counts only
// part of the events - confirm, and consider keying by the field the rule groups on.
DataStream<EventBase> keyedStream = filteredStream.keyBy(f -> f.getCustomerId());
// Run the stateful filter operator on the keyed stream; its name and parallelism
// are read from the configuration.
DataStream<EventBase> statefulFiltered = keyedStream
.transform(config.get(STATEFUL_FILTER_NAME), stream.getType(), new StatefulFilterFunction(config, config.get(STATEFUL_FILTER_TABLENAME)))
.name(config.get(STATEFUL_FILTER_NAME))
.setParallelism(config.get(STATEFUL_FILTER_PARALLELISM));
// Write to Sink
return statefulFiltered;
This is my input:
{"id":1,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333}
{"id":2,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333}
{"id":3,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333}
{"id":7,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333,"customer_id":"ggdsd"}
{"id":8,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333,"customer_id":"ggdsd"}
{"id":9,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333,"customer_id":"ggdsd"}
{"id":10,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333,"customer_id":"ggdsd"}
{"id":11,"source_ip": "121.101.15.215", "dst_ip": "121.101.50.25", "groupEvent": "sslvpn_user_auth", "user_id": 333,"customer_id":"ggdsd"}
Question: if STATEFUL_FILTER_PARALLELISM = 1, the output contains 8 matched events:
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":9}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"2e9a78ad-8287-4626-9460-338f569a1ce4","id":2}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":10}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":11}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":8}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"2a424ebf-d756-4d9e-81fd-126997e9bc69","id":3}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":7}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"1da89ccc-49f9-481a-9ea1-37fbcf03debf","id":1}
If STATEFUL_FILTER_PARALLELISM = 2, the output contains 7 matched events:
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"cd3a0cff-c890-4bac-b7ff-988198c96fff","id":2}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":7}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":8}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":9}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":11}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"b71ee10a-ea9f-46d3-8424-cc8261b5e90e","id":1}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":10}
If STATEFUL_FILTER_PARALLELISM = 5, the output contains 5 matched events:
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":7}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":9}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":8}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":10}
{"event_id":null,"eventTime":0,"groupEvent":"sslvpn_user_auth","source_ip":"121.101.15.215","dst_ip":"121.101.50.25","machine_name":null,"time":null,"source_log":null,"matched_prefilter_stateful_rule_ids":"['Prefilter_Testcase_Windows_002','Prefilter_Testcase_Windows_003']","matched_single_rule_ids":"[]","matched_windows_rule_ids":"['Testcase_Windows_002']","key":"ggdsd","id":11}