Hi I am trying to use the code for connecting aws through steampipe and executing the query cd steampipe-mod-aws-compliance && steampipe check aws_compliance.benchmark.cis_v200 as I see the code is not working is there something which I am missing thank you.
def run_shell(cmd: str) -> str:
try:
result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True)
output, error = result.communicate()
if error:
print("Error occurred:", error)
return output.strip()
except Exception as e:
print("Exception occurred:", e)
return ""
def aws_login(access_key: str, secret_key: str) -> None:
try:
# Use the provided access_key and secret_key directly to create the S3 client
s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
# Use the tenant_id in AWS API calls, if needed
# For example, you can list S3 buckets to verify credentials
response = s3_client.list_buckets()
# If the API call succeeds, then the credentials are valid
print("AWS login successful.")
print("S3 Buckets:")
for bucket in response['Buckets']:
print(bucket['Name'])
except Exception as e:
print("AWS login failed. Please check your credentials.")
print("Error occurred:", e)
aws_login(access_key, secret_key)
import os
def create_steam_config() -> None:
cfg_content = ""
content_header = """connection "aws_all" {
type = "aggregator"
plugin = "aws"
connections = ["aws_*"]
}
"""
cfg_content += content_header
content_connection = """connection "aws_sub_1" {
plugin = "aws"
region = "us-east-1"
}
"""
cfg_content += content_connection
# Expand the ~ symbol to the user's home directory
config_path = os.path.expanduser("~/.steampipe/config/aws.spc")
with open(config_path, "w") as cfg_f:
cfg_f.write(cfg_content)
# Call the function to create the Steam config file
create_steam_config()
command = "cd steampipe-mod-aws-compliance && steampipe check aws_compliance.benchmark.cis_v200 --export=cis_v200.json"
try:
# Execute the command in the shell
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
print("Command executed successfully.")
print("Output:")
print(result.stdout)
else:
print("Command execution failed.")
print("Error:")
print(result.stderr)
except Exception as e:
print("An error occurred:", e)