Can you parse and then query on the parsed value in CloudWatch Insights?


My scenario is this: I can search for an error message in CloudWatch Logs Insights and get all the results I want. From those results, I want to extract the @requestId (only for results that match the error), and then, using that @requestId, return all the logs for the request.

I have tried parsing the message where the GUID for the requestId appears, like this:

parse "Z * Task timed out" as msgId 

Then filtering on

filter strcontains(@message, msgId) 

But this returns zero results. Likewise, I've also tried:

filter ispresent(msgId)

But this just returns any results where the parse command produced a non-null value.

Outside of adding a dedup command and building my own list to then run a separate search on (sketched below), I can't seem to find a way to achieve this.
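For reference, that workaround query would look something like this (it only gives me the deduplicated list of IDs, which I'd then have to feed into a separate search):

parse @message "Z * Task timed out" as msgId
| filter ispresent(msgId)
| dedup msgId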

Can you do what I'm trying to do here? If not, what would you recommend as an alternative?


1 Answer

Answered by chromebookdev (accepted):

While this isn't possible within CloudWatch Logs Insights itself, it can be achieved by using either the SDK or the AWS CLI from your language of choice.

After getting the list of results from the query above, I could iterate through each response and get the log stream, request ID, and timestamp (and, using Python, save them in a DataFrame).
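(If you want that "get the list" step scripted as well, a minimal boto3 sketch might look like the following. The query string and time range are illustrative, not my exact setup.)

import time
import boto3

logs = boto3.client('logs')

# illustrative Insights query: deduplicated request IDs parsed from the timeout message
QUERY = '''
parse @message "Z * Task timed out" as msgId
| filter ispresent(msgId)
| display @timestamp, @logStream, msgId
| dedup msgId
'''

def run_insights_query(log_group, start_time, end_time):
    # start the query, then poll until it finishes
    query_id = logs.start_query(
        logGroupName=log_group,
        startTime=start_time,  # epoch seconds
        endTime=end_time,      # epoch seconds
        queryString=QUERY,
    )['queryId']
    while True:
        response = logs.get_query_results(queryId=query_id)
        if response['status'] in ('Complete', 'Failed', 'Cancelled'):
            return response['results']
        time.sleep(1)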

I wrote a method like this:

import re
import json
import subprocess
import pandas as pd

# vars
profile_name = '<your aws cli profile name>'
log_group_name = '<your log group name eg: /aws/lambda/your-lambda>'

def get_log_data_events(log_stream_name, start_time):
    # shell out to the AWS CLI and pull the events for one log stream
    log_output = subprocess.run(
        [
            'aws',
            'logs',
            'get-log-events',
            '--log-group-name', log_group_name,
            '--log-stream-name', log_stream_name,
            '--start-time', start_time,  # note: the CLI documents this as epoch milliseconds
            '--start-from-head',
            '--profile', profile_name
        ], capture_output=True, text=True)
    log_output_json = json.loads(log_output.stdout)

    return log_output_json['events']

Returning ['events'] gives you just the list of event objects, without the extra response metadata (like the pagination tokens) you may not need.
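For reference, each entry in that list looks roughly like this (values are made up):

# one entry in ['events']
{
    'timestamp': 1698000000000,    # event time, in epoch milliseconds
    'message': 'START RequestId: ...',
    'ingestionTime': 1698000000500
}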

Then, using df.apply like this:

df['log_data'] = df.apply(lambda row: get_log_data_events(row['@logStream'], str(row['@timestamp'].timestamp()).split('.')[0]), axis=1)

(Note: the split is there to remove the fractional part from the default timestamp and convert it to a clean epoch value. A prerequisite is having @timestamp in your df as a Timestamp datatype.)
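If @timestamp comes back from the query results as a plain string, one way to satisfy that prerequisite first (a one-line sketch, assuming the column name used above):

# parse the raw strings into pandas Timestamps
df['@timestamp'] = pd.to_datetime(df['@timestamp'])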

After that, I applied some search queries to find the specific data I needed:

# vars
search_in_message = 'Processing Message: {'

def get_payload_event(log_data):
    for event in log_data:
        if search_in_message in event['message']:
            # pull the JSON payload out of the message body
            match = re.search(r'{.*}', event['message'])
            if match is None:
                continue
            json_str = match.group()
            print(json_str)
            return json.loads(json_str)

This let me isolate the exact message I was looking for, using another df.apply:

df['message'] = df['log_data'].apply(get_payload_event)

Then ultimately, I was able to grab those payloads and resubmit them to an SQS queue, but that's outside the scope of my original question.
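(For completeness, that resubmit step with boto3 might look something like this; the queue URL and the payload variable are placeholders, not my actual setup.)

import boto3

sqs = boto3.client('sqs')

# queue URL is a placeholder; payload is one value from df['message']
sqs.send_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/your-queue',
    MessageBody=json.dumps(payload)
)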