Normalize two different jsons according to shared values

I have two data sources that return JSON in different formats, and I want to create a normalized object that merges the two according to the distinct values they share.

For example, the first JSON:

[
  {
    "zone_group": "us-east-1b",
    "kernel_version": "5.10.130-118.517.amzn2.x86_64",
    "chassis_type": "1",
    "chassis_type_desc": "Other",
    "connection_ip": "172.xx.xx.xx",
    "default_gateway_ip": "172.xx.xx.xx",
    "connection_mac_address": "12-5e-2e-db-xx-xx"
...
  }
]

The second JSON:

[
  {
    "sourceInfo": {
      "list": [
        {
          "Ec2AssetSourceSimple": {
            "instanceType": "t2.micro",
            "groupName": "AutoScaling-Group-1",
            "macAddress": "12-5e-2e-db-xx-xx",
            "monitoringEnabled": "false",
            "spotInstance": "false",
            "zone": "VPC",
            "instanceState": "RUNNING",
            "type": "EC_2",
            "availabilityZone": "us-east-1b",
            "privateIpAddress": "172.xx.xx.xx",
            "firstDiscovered": "2022-08-18T22:23:04Z"
...
  }
]

I want to normalize the two JSON documents into a unified representation based on their values. In this example, the IP address "172.xx.xx.xx" should appear only once in the normalized object (under the name used in the first JSON, though the name doesn't really matter).

How do I go about doing this?

There are 2 answers

2
Jason Baker On

If I understand correctly, you can flatten the second JSON and merge the two on the shared IP address:

Code:

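import pandas as pd

# json1 and json2 are the two parsed JSON lists from the question.
df1 = pd.DataFrame(data=json1)
# Flatten each record's "sourceInfo" -> "list" into one row per entry.
df2 = pd.json_normalize(data=[x.get("sourceInfo") for x in json2], record_path="list")
final_df = pd.merge(left=df1,
                    # Drop the "Ec2AssetSourceSimple." prefix from the flattened column names.
                    right=df2.rename(columns=lambda x: x.split(".")[-1]),
                    # Join on the host's own IP; a gateway IP is typically shared by many hosts.
                    left_on="connection_ip",
                    right_on="privateIpAddress")
print(final_df)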

Output:

   zone_group                 kernel_version chassis_type chassis_type_desc connection_ip default_gateway_ip connection_mac_address instanceType            groupName         macAddress monitoringEnabled spotInstance zone instanceState  type availabilityZone privateIpAddress       firstDiscovered
0  us-east-1b  5.10.130-118.517.amzn2.x86_64            1             Other  172.xx.xx.xx       172.xx.xx.xx      12-5e-2e-db-xx-xx     t2.micro  AutoScaling-Group-1  12-5e-2e-db-xx-xx             false        false  VPC       RUNNING  EC_2       us-east-1b     172.xx.xx.xx  2022-08-18T22:23:04Z
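
The merge above is an inner join (the default for pd.merge), so records that appear in only one source are dropped. If unmatched records should be kept, a sketch along these lines may help. The sample rows, the extra IP addresses, and the unified "ip" column are illustrative assumptions, not part of the original data.

```python
import pandas as pd

# Hypothetical sample rows: one IP is in both sources, the others in one only.
df1 = pd.DataFrame([
    {"connection_ip": "10.0.0.1", "kernel_version": "5.10"},
    {"connection_ip": "10.0.0.2", "kernel_version": "5.15"},
])
df2 = pd.DataFrame([
    {"privateIpAddress": "10.0.0.1", "instanceType": "t2.micro"},
    {"privateIpAddress": "10.0.0.3", "instanceType": "t3.small"},
])

# how="outer" keeps rows from both sides; indicator=True adds a "_merge"
# column recording whether a row came from the left, the right, or both.
merged = pd.merge(df1, df2, how="outer",
                  left_on="connection_ip", right_on="privateIpAddress",
                  indicator=True)

# Collapse the two key columns into a single "ip" column.
merged["ip"] = merged["connection_ip"].fillna(merged["privateIpAddress"])
merged = merged.drop(columns=["connection_ip", "privateIpAddress"])
print(merged)
```

Rows found in only one source carry NaN in the other source's columns, and the "_merge" column makes them easy to filter out or inspect.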
0
frederic laurencin On

I would normalize the data using Pydantic and then generate a new, aggregated data set:

from pydantic import BaseModel, Field, AliasChoices, BeforeValidator
from typing import Annotated, List
import json


CoercedInt = Annotated[int, BeforeValidator(int)]


class VmInfoFromSource1(BaseModel):
    # validation_alias maps each source key to the normalized field name.
    region: str = Field(validation_alias=AliasChoices("zone_group"))
    kernel_version: str
    chassis_type: CoercedInt
    chassis_type_desc: str
    ip_address: str = Field(validation_alias=AliasChoices("connection_ip"))
    default_gateway_ip: str  # same name in the source, so no alias is needed
    mac_address: str = Field(validation_alias=AliasChoices("connection_mac_address"))


class VmInfoFromSource2(BaseModel):
    instance_type: str = Field(validation_alias=AliasChoices("instanceType"))
    group_name: str = Field(validation_alias=AliasChoices("groupName"))
    mac_address: str = Field(validation_alias=AliasChoices("macAddress"))
    monitoring_enabled: str = Field(validation_alias=AliasChoices("monitoringEnabled"))
    spot_instance: str = Field(validation_alias=AliasChoices("spotInstance"))
    zone: str
    instance_state: str = Field(validation_alias=AliasChoices("instanceState"))
    # "type" (e.g. "EC_2") gets its own field so it does not clash with instance_type.
    asset_type: str = Field(validation_alias=AliasChoices("type"))
    region: str = Field(validation_alias=AliasChoices("availabilityZone"))
    ip_address: str = Field(validation_alias=AliasChoices("privateIpAddress"))
    first_discovered: str = Field(validation_alias=AliasChoices("firstDiscovered"))


def convert_from_source2(source_2_data: List[dict]) -> List[dict]:
    return [
        data["Ec2AssetSourceSimple"]
        for element in source_2_data
        for data in element["sourceInfo"]["list"]
    ]


def associate_data_by_key(
    source_1_data: List[dict], source_2_data: List[dict], key: str = "mac_address"
):
    vm_data_1 = [VmInfoFromSource1(**vm_info) for vm_info in source_1_data]
    vm_data_2 = [
        VmInfoFromSource2(**vm_info) for vm_info in convert_from_source2(source_2_data)
    ]
    indexed_vm_data = {
        getattr(vm_data, key): vm_data.model_dump() for vm_data in vm_data_1
    }
    indexed_vm_data_2 = {
        getattr(vm_data, key): vm_data.model_dump() for vm_data in vm_data_2
    }
    for index, vm_data in indexed_vm_data_2.items():
        if index in indexed_vm_data:
            indexed_vm_data[index].update(vm_data)
        else:
            indexed_vm_data[index] = vm_data
    return indexed_vm_data


if __name__ == "__main__":
    source_1 = [
        {
            "zone_group": "us-east-1b",
            "kernel_version": "5.10.130-118.517.amzn2.x86_64",
            "chassis_type": "1",
            "chassis_type_desc": "Other",
            "connection_ip": "172.xx.xx.xx",
            "default_gateway_ip": "172.xx.xx.xx",
            "connection_mac_address": "12-5e-2e-db-xx-xx",
        }
    ]
    source_2 = [
        {
            "sourceInfo": {
                "list": [
                    {
                        "Ec2AssetSourceSimple": {
                            "instanceType": "t2.micro",
                            "groupName": "AutoScaling-Group-1",
                            "macAddress": "12-5e-2e-db-xx-xx",
                            "monitoringEnabled": "false",
                            "spotInstance": "false",
                            "zone": "VPC",
                            "instanceState": "RUNNING",
                            "type": "EC_2",
                            "availabilityZone": "us-east-1b",
                            "privateIpAddress": "172.xx.xx.xx",
                            "firstDiscovered": "2022-08-18T22:23:04Z",
                        }
                    }
                ]
            }
        }
    ]
    print(
        json.dumps(
            associate_data_by_key(source_1_data=source_1, source_2_data=source_2),
            indent=4,
        )
    )
    print(
        json.dumps(
            list(associate_data_by_key(source_1, source_2).values()),
            indent=4,
        )
    )

Pydantic Models:

  • VmInfoFromSource1 and VmInfoFromSource2 are Pydantic models that define the structure of the data from each of the two sources. They use Field together with AliasChoices to map the key names used by each source onto normalized field names.
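
As a small illustration of what AliasChoices is for: passed as validation_alias, it lets one field accept any of several input names, so a single model could read the fields the two sources have in common. The SharedVmInfo model below is hypothetical and assumes Pydantic v2.

```python
from pydantic import AliasChoices, BaseModel, Field


class SharedVmInfo(BaseModel):
    # Hypothetical model: each field accepts the name used by either source.
    mac_address: str = Field(
        validation_alias=AliasChoices("connection_mac_address", "macAddress")
    )
    ip_address: str = Field(
        validation_alias=AliasChoices("connection_ip", "privateIpAddress")
    )


from_source_1 = SharedVmInfo.model_validate(
    {"connection_mac_address": "12-5e-2e-db-xx-xx", "connection_ip": "172.xx.xx.xx"}
)
from_source_2 = SharedVmInfo.model_validate(
    {"macAddress": "12-5e-2e-db-xx-xx", "privateIpAddress": "172.xx.xx.xx"}
)

# Both inputs are dumped under the same normalized field names.
print(from_source_1.model_dump())
print(from_source_2.model_dump())
```

Keys not declared on the model are ignored by default, so the full records from either source can be passed in unchanged.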

Function: convert_from_source2:

def convert_from_source2(source_2_data: List[dict]) -> List[dict]:
    return [
        data["Ec2AssetSourceSimple"]
        for element in source_2_data
        for data in element["sourceInfo"]["list"]
    ]
  • This function takes the list of dictionaries from the second source (source_2_data) and extracts the "Ec2AssetSourceSimple" dictionary from every entry of each record's nested sourceInfo["list"], returning them as one flat list.
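
Note that the comprehension raises KeyError if any entry lacks one of the expected keys. If the real feed can contain other asset types or incomplete records (an assumption, since the sample in the question is truncated), a more tolerant variant might look like this. The name extract_ec2_assets and the "SomeOtherSource" key are hypothetical.

```python
from typing import Any, Dict, List


def extract_ec2_assets(source_2_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten source 2, skipping entries without an Ec2AssetSourceSimple block."""
    assets = []
    for element in source_2_data:
        # .get() with fallbacks avoids KeyError on missing or null levels.
        entries = (element.get("sourceInfo") or {}).get("list") or []
        for entry in entries:
            asset = entry.get("Ec2AssetSourceSimple")
            if asset is not None:
                assets.append(asset)
    return assets


sample = [
    {"sourceInfo": {"list": [{"Ec2AssetSourceSimple": {"macAddress": "12-5e-2e-db-xx-xx"}}]}},
    {"sourceInfo": {"list": [{"SomeOtherSource": {}}]}},  # hypothetical other asset type
    {},  # record with no sourceInfo at all
]
print(extract_ec2_assets(sample))
```

Whether to skip such records silently or to log them depends on how much the upstream feed can be trusted.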

Function: associate_data_by_key:

def associate_data_by_key(
    source_1_data: List[dict], source_2_data: List[dict], key: str = "mac_address"
):
  • This function associates data from two different sources based on a specified key (default is "mac_address").

    • It creates instances of VmInfoFromSource1 and VmInfoFromSource2 for each set of data.
    • It creates dictionaries (indexed_vm_data and indexed_vm_data_2) where the specified key maps to the serialized form of the corresponding Pydantic model.
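    • It merges the two dictionaries by key: when a key exists in both, the fields from source 2 are added to the source 1 entry (overwriting any field with the same name); otherwise the source 2 entry is added as-is. The result is a dictionary where each key value maps to a dictionary containing the merged data.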
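
Joining on a shared value only works if both sources write that value identically. If one source used, say, uppercase or colon-separated MAC addresses (an assumption; the sample data happens to match), the keys could be canonicalized before indexing. Below is a minimal standard-library sketch of the same index-then-update merge; normalize_mac, merge_by_mac, and the sample records are hypothetical.

```python
from typing import Dict, List


def normalize_mac(mac: str) -> str:
    """Canonical form: lowercase hex pairs separated by '-'."""
    return mac.strip().lower().replace(":", "-")


def merge_by_mac(records_1: List[dict], records_2: List[dict]) -> Dict[str, dict]:
    """Index both lists by normalized MAC; later records update earlier ones."""
    merged: Dict[str, dict] = {}
    for record in [*records_1, *records_2]:
        key = normalize_mac(record["mac_address"])
        # Store the canonical key form so both sources agree on it.
        merged.setdefault(key, {}).update({**record, "mac_address": key})
    return merged


result = merge_by_mac(
    [{"mac_address": "12-5E-2E-DB-00-01", "kernel_version": "5.10"}],
    [{"mac_address": "12:5e:2e:db:00:01", "instance_state": "RUNNING"}],
)
print(result)
```

The same idea applies to any join key: trim, case-fold, and unify separators before using the value as a dictionary key.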