I am using Logstash 6.5.4, and data is read and parsed through the pipeline Filebeat -> Logstash -> Elasticsearch.
In Kibana, however, I can see that for a few documents the field data is duplicated: each affected field has been turned into an array holding the same value three times (Kibana shows them as comma-separated values).
The message sent from Filebeat was:
2024-03-12 13:27:00.126,d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a,System,NotificationsProcessor,MQ_TO_JOBSERVICE,Tue Mar 12 13:27:00 IST 2024,TENANT,,null,SUCCESSFUL,94,126,'Task completed successfully',TESTING,null,'{"notificationsCount":31}'
The document that ends up in Elasticsearch looks like this (note that _version is 3 and every duplicated field is an array of exactly three identical values):
{
  "_version": 3,
  "_source": {
    "jobSpecificMetaData": [
      "{\"notificationsCount\":31}",
      "{\"notificationsCount\":31}",
      "{\"notificationsCount\":31}"
    ],
    "current_step_time_ms": 94,
    "request_id": [
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a",
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a",
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a"
    ],
    "tenant_mode": [
      "TESTING",
      "TESTING",
      "TESTING"
    ],
    "total_time_ms": 126,
    "created_timestamp": [
      "2024-03-12 13:27:00.126",
      "2024-03-12 13:27:00.126",
      "2024-03-12 13:27:00.126"
    ],
    "@timestamp": "2024-03-12T07:57:02.887Z",
    "execution_level": [
      "TENANT",
      "TENANT",
      "TENANT"
    ],
    "status": [
      "SUCCESSFUL",
      "SUCCESSFUL",
      "SUCCESSFUL"
    ]
  }
}
I have not been able to work out where this duplication is introduced. Any help would be appreciated, as it is breaking my dashboards and reporting.
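To narrow this down, my plan is to temporarily add a stdout output on one of the Logstash nodes (a minimal debug sketch; the rubydebug codec prints the complete event, so it should show whether the fields are already arrays before they reach the elasticsearch output). Since Logstash concatenates every file in the pipeline configuration directory into a single pipeline, I also want to confirm that only one copy of this filter block is being loaded.
output {
  stdout { codec => rubydebug }
}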
Filebeat:
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /usr/local/apache-tomcat/logs/access_log*.txt
  fields:
    release_version: 1619

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["host1", "host2"]
  loadbalance: true
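To rule out Filebeat publishing each line more than once, I could temporarily swap the Logstash output for the console output and run Filebeat in the foreground (only one output may be enabled at a time):
#----------------------------- Console output ---------------------------------
output.console:
  pretty: true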
Logstash:
input {
  beats {
    port => 5044
  }
}

filter {
  mutate {
    remove_field => ["offset","prospector","input","version","tags"]
    add_field => { "host" => "%{[beat][name]}" }
  }
  grok {
    match => { "message" => [
      "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)(?:\s+%{DATA:api_user_name})?\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\] \[%{GREEDYDATA:egress_time} %{GREEDYDATA:script_excl_egress_time} %{GREEDYDATA:channel_src_code} %{GREEDYDATA:courier_src_code}\] \[%{DATA:api_status} %{DATA:request_identifier} %{DATA:error_response_code} %{GREEDYDATA:error_response_message}\]",
      "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)(?:\s+%{DATA:api_user_name})?\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\] \[%{GREEDYDATA:egress_time} %{GREEDYDATA:script_excl_egress_time} %{GREEDYDATA:channel_src_code} %{GREEDYDATA:courier_src_code}\]",
      "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\]"
    ] }
  }
  if [useragent] != "-" and [useragent] != "" {
    useragent {
      add_tag => [ "UA" ]
      source => "useragent"
    }
  }
  if [tenant] == "-" {
    drop { }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  ruby {
    code => "
      # Derive the index suffix from the source file name,
      # e.g. access_log.2024-03-12.txt -> 2024-03-12
      event.set('index-name', event.get('source').split('_log.').last.split('.').first)

      # Rewrite SOAP URLs so the operation name becomes part of the path
      if event.get('OperationName') != '-' && event.get('url') != nil
        if event.get('url').include?('/services/soap/?')
          event.set('url', '/services/soap/' + event.get('OperationName') + event.get('url').split('/services/soap/').last)
        elsif event.get('url').include?('/services/soap/')
          event.set('url', '/services/soap/' + event.get('OperationName') + '/' + event.get('url').split('/services/soap/').last)
        else
          event.set('url', '/services/soap/' + event.get('OperationName') + event.get('url').split('/services/soap').last)
        end
      end

      # Split the URL into base URL and query string
      if event.get('url').include?('?')
        event.set('base_url', event.get('url').split('?').first)
        event.set('query_params', event.get('url').split('?').last)
      else
        event.set('base_url', event.get('url'))
      end

      # Classify the client type from the second label of the host name
      event.set('cloud', event.get('host').split('.', 2).last.split('.').first)
      if event.get('cloud').start_with?('cloud')
        event.set('client_type', 'seller')
      else
        event.set('client_type', 'enterprise')
      end

      # SOAP vs REST
      if event.get('base_url').include?('soap')
        event.set('api_group', 'soap')
      else
        event.set('api_group', 'rest')
      end

      # Normalise '-' placeholders to '0' so the integer conversion below works
      event.set('egress_time', '0') if event.get('egress_time') == '-'
      event.set('script_excl_egress_time', '0') if event.get('script_excl_egress_time') == '-'
      event.set('error_response_code', '0') if event.get('error_response_code') == '-'
    "
  }
  mutate {
    convert => {
      "req_process_ms" => "integer"
      "egress_time" => "integer"
      "script_excl_egress_time" => "integer"
    }
  }
  ruby {
    code => "
      # The third grok pattern captures no egress fields, so guard against nil before the subtraction
      if event.get('req_process_ms') && event.get('egress_time') && event.get('script_excl_egress_time')
        event.set('req_process_excl_script_time', event.get('req_process_ms') - event.get('egress_time') - event.get('script_excl_egress_time'))
      end
    "
  }
}
output {
  elasticsearch {
    hosts => ["elastic1","elastic2","elastic3"]
    index => "access-%{index-name}"
  }
}
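As a stopgap while I look for the root cause, I am also considering deriving a deterministic document ID from the raw line with the fingerprint filter, so a re-processed line would overwrite the earlier document instead of adding another one. This is only a sketch (the key is an arbitrary placeholder), and it would not fix fields that are already duplicated inside a single event:
filter {
  fingerprint {
    source => "message"
    method => "SHA256"
    key => "access-log-dedup"              # arbitrary placeholder HMAC key
    target => "[@metadata][fingerprint]"
  }
}
output {
  elasticsearch {
    hosts => ["elastic1","elastic2","elastic3"]
    index => "access-%{index-name}"
    document_id => "%{[@metadata][fingerprint]}"
  }
}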