AWS Cost and Usage Report difference from AWS Billing Dashboard

54 views Asked by At

I have an activated AWS service called AWS Cost and Usage Report(CUR), which generates on an Hourly basis.

The format of the CUR is in parquet format.

I have a Python code that takes the parquet and reads it.

The issue is that there is a difference in total cost calculated fetched from the parquet file and showing in AWS Billing Dashboard.

The total cost difference is not small but very huge, from example EC2 in Billing Dashboard shows a cost of around $12,000, but the cost calculated from CUR is roughly $4500.

Here is my Python code:

import sys
import time
from fastparquet import ParquetFile
import json
import datetime
from collections import defaultdict

tags_key = 'resource_tags_'
def read_parquet_file(parquet_file_path):
    start_time_rf = time.time()
    dataset = ParquetFile(parquet_file_path)

    # Convert the Parquet table to a Pandas DataFrame 
    df = dataset.to_pandas()

    resource_tags_columns = [col for col in df.columns if col.startswith(tags_key)]
    # print(resource_tags_columns)
    
    relevant_columns = ['identity_line_item_id', 'product_sku', 'line_item_line_item_type', 
                        'line_item_resource_id', 'line_item_product_code', 'product_region', 
                        'bill_billing_period_start_date', 'bill_billing_period_end_date',
                        'line_item_usage_start_date', 'line_item_usage_end_date',
                        'product_usagetype'] + resource_tags_columns + ['line_item_unblended_cost']
    df = df[relevant_columns]

    # Perform the aggregation directly without grouping if possible
    grouped_data = df.groupby(relevant_columns[:-1])['line_item_unblended_cost'].sum()
    # Sort the grouped data by line_item_product_code
    grouped_data = grouped_data.sort_index(level='line_item_product_code')
     # Record the end time
    end_time_rf = time.time()

    # Calculate the elapsed time
    elapsed_time = end_time_rf - start_time_rf
    # print(f'Time taken to read the Parquet file: {elapsed_time} seconds')
    return grouped_data, resource_tags_columns

def parse_data(grouped_data, resource_tags_columns):
    start_time_rf = time.time()
    # Print the total cost for each 'identity_line_item_id'
    final_sum = 0
    tax = 0
    usage = 0

    # grouping data based in username
    final_grouped_data = defaultdict(list)
    def convert_cost(cost):
        try:
            # Attempt direct string formatting for potential efficiency
            return f"{cost:.{len(str(cost).split('.')[-1])}f}"
        except ValueError:
            # Handle cases where direct formatting fails (e.g., scientific notation)
            exponent_str = str(cost)
            base, power = exponent_str.split('e')
            decimal = len(base.split('.')[1]) if len(base.split('.')) > 1 else 0
            power = abs(int(power))
            return f"{cost:.{power + decimal}f}"

    for (identity_line_item_id,product_sku,line_item_line_item_type,line_item_resource_id,line_item_product_code, 
        product_region ,bill_billing_period_start_date,
        bill_billing_period_end_date, line_item_usage_start_date, line_item_usage_end_date,
        product_usagetype, *resource_tags),total_cost in grouped_data.items():
        final_sum += total_cost
        if (line_item_line_item_type == 'Tax' ):
            tax += total_cost
        elif (line_item_line_item_type == 'Usage'):
            usage += total_cost

        entry = {
            'identityLineItemId': identity_line_item_id,
            'line_item_resource_id': line_item_resource_id,
            'totalCost': convert_cost(total_cost),
            'product_sku': product_sku,
            'region': product_region,
            'line_item_line_item_type': line_item_line_item_type,
            'line_item_product_code':line_item_product_code,
            'product_usagetype': product_usagetype,
            'start_date': datetime.datetime.strftime(bill_billing_period_start_date, '%Y-%m-%d %H:%M:%S'),
            'end_date':  datetime.datetime.strftime(bill_billing_period_end_date,'%Y-%m-%d %H:%M:%S'),
            'usage_start_date':datetime.datetime.strftime(line_item_usage_start_date, '%Y-%m-%d %H:%M:%S'),
            'usage_end_date':datetime.datetime.strftime(line_item_usage_end_date, '%Y-%m-%d %H:%M:%S'),
        }

        final_grouped_data[line_item_product_code].append(entry)

        for col, tag_key in enumerate(resource_tags_columns):
            if tag_value := resource_tags[col]:
                entry[tag_key] = tag_value
        

    def groupByRegion(data):
        group_by_region = defaultdict(list)  # Use defaultdict for efficient handling of new keys
        for entry in data:
            if region := entry.get('region'):
                group_by_region[region].append(entry)

        return group_by_region

    def groupByIdentityLineItemId(data):
        obj = {}
        for entry in data:
            # Extract values
            identity_id = entry["identityLineItemId"]
            total_cost = float(entry["totalCost"])

            # Add to existing entry or create a new one
            if identity_id in obj:
                obj[identity_id]["total_cost"] = convert_cost(float(obj[identity_id]["total_cost"]) + total_cost)
            else:
                obj[identity_id] = {
                    "total_cost": convert_cost(total_cost),
                    "identityLineItemId": entry['identityLineItemId'],
                    "line_item_resource_id": entry['line_item_resource_id'],
                    "region": entry['region'],
                    "line_item_product_code": entry['line_item_product_code'],
                    "product_usagetype": entry['product_usagetype']
                }
                for key, value in entry.items():
                    if key.startswith(tags_key):
                        obj[identity_id][key] = value
        return obj
    
    def groupByProductUsageType(data, type):
        group_by_type = defaultdict(list)  # Use defaultdict for efficient handling of new keys
        for entry in data:
            if type == 'AmazonEC2':
                if 'EBS' in entry['product_usagetype']:
                    group_by_type['EBS'].append(entry)
                elif 'AmazonEC2Stopped' in entry['product_usagetype']:
                    group_by_type['AmazonEC2Stopped'].append(entry)
                else:
                    group_by_type['AmazonEC2Running'].append(entry)
                # group_by_type['EBS' if 'EBS' in entry['product_usagetype'] else 'AmazonEC2Running'].append(entry)
            else:
                group_by_type[entry['product_usagetype']].append(entry)
        return getCostByproductUsageType(group_by_type)

    def getCostByRegion(data, type):
        group_by_region = defaultdict(list)  # Use defaultdict for efficient handling
        for region, entries in data.items():
            region_wise_sum = sum(float(entry['totalCost']) for entry in entries)  # Sum total cost using generator expression
            details = groupByProductUsageType(entries, type)  # Group by product usage type
            usage_sum = 0
            for entry in entries:
                if entry['line_item_line_item_type'] == 'Usage':
                    usage_sum += float(entry['totalCost'])
            
            group_by_region[region].append({'total_cost': convert_cost(region_wise_sum),
                'usage_cost': convert_cost(usage_sum), 'details': details})
        return group_by_region

    def getCostByproductUsageType(data):
        group_by_type = defaultdict(list)
        for key, value in data.items():
            regionWiseSum = sum(float(entry['totalCost']) for entry in value)  # Efficient summation

            # Group and convert cost within the loop for clarity
            group_by_type[key].append({
                f'{key}_total_cost': convert_cost(regionWiseSum),
                f'{key}_details': groupByIdentityLineItemId(value)
            })
        return group_by_type


        
    def get_cost_each_line_item_product(grouped_data):
        cost_details = {
            'details': {},
            'cost_details': {
                'total_cost': f'{final_sum:.2f}',
                'tax_cost': f'{tax:.2f}',
                'usage_cost': f'{usage:.2f}'
            }
        }

        for key, data in grouped_data.items():
            total_cost = sum(float(entry['totalCost']) for entry in data)
            tax_service = sum(float(entry['totalCost']) for entry in data if entry['line_item_line_item_type'] == 'Tax')
            usage_service = sum(float(entry['totalCost']) for entry in data if entry['line_item_line_item_type'] == 'Usage')
            region_wise_details = groupByRegion(data) 
            cost_details['details'][key] = {
                'total_cost': convert_cost(total_cost),
                'usage_cost': convert_cost(usage_service),
                'tax_cost': convert_cost(tax_service),
                'region_wise_details': getCostByRegion(region_wise_details, key)
            }

        formatted_json = json.dumps(cost_details, indent=2, ensure_ascii=False)

        # with open('final_code_cpy.json', 'w') as json_file:
        #     json.dump(cost_details, json_file)

        return formatted_json

    data = get_cost_each_line_item_product(final_grouped_data)
    print(data)
    # Record the end time
    end_time_rf = time.time()
    elapsed_time = end_time_rf - start_time_rf
    print(f'Time taken to scan the Parquet file: {elapsed_time} seconds')


if __name__ == '__main__':
   if len(sys.argv) < 2:
       sys.exit(1)

   parquet_file_path = sys.argv[1]
   grouped_data, resource_tags_columns = read_parquet_file(parquet_file_path)
   parse_data(grouped_data, resource_tags_columns)

Note: parquet file that I am using is latest parquet file.

0

There are 0 answers