code-dumps/aws/AwsEnvReview.py

464 lines
19 KiB
Python
Executable File

#!/usr/bin/python3
"""
Review an AWS environment against the 6 pillars of the AWS Well-Architected Review (WAR), namely:
1. Operational Excellence
2. Security
3. Reliability
4. Performance Efficiency
5. Cost Optimization
6. Sustainability
"""
import boto3
import botocore
import jmespath
import re
from pprint import pprint
from datetime import date
def printTitle(title):
    """Print ``title`` in upper case as a banner framed by '=' rules.

    A blank gap is printed first; the rules are as long as ``title``.
    """
    rule = "=" * len(title)
    print("\n")
    for line in (rule, title.upper(), rule):
        print(line)
def printSubTitle(title):
    """Print ``title`` with a newline prepended and appended to it."""
    print("".join(["\n", title, "\n"]))
def getAllRegions(myclient):
    """Return the ``RegionName`` of every entry under ``Regions`` in the
    response of ``myclient.describe_regions(AllRegions=False)``.
    """
    regionsResponse = myclient.describe_regions(AllRegions=False)
    return jmespath.search("Regions[*].RegionName", regionsResponse)
def getAgeFromDate(inputDate):
    """Return the number of calendar days between ``inputDate`` and today.

    Args:
        inputDate: a ``datetime.datetime``, naive or timezone-aware.

    Returns:
        int: today's date minus ``inputDate``'s date, in days (negative when
        ``inputDate`` lies in the future).
    """
    from datetime import datetime  # local import keeps this function self-contained

    # Take "today" in the same timezone as the input. Comparing the local
    # calendar date with the date of a timezone-aware timestamp can be off by
    # one day whenever the two zones are on different dates. For a naive input
    # tzinfo is None and this is the local date, as before.
    today = datetime.now(tz=inputDate.tzinfo).date()
    return (today - inputDate.date()).days
def printResult(content: list, header: str):
    """Print the findings in ``content`` as numbered, comma-separated rows.

    ``header`` is prefixed with an "Index" column and underlined with dashes;
    each row is printed after its 1-based index. When ``content`` is empty a
    single "no issue" line is printed instead.
    """
    fullHeader = "Index, " + header
    if len(content) == 0:
        print("👍 No issue found.")
        return
    print(fullHeader)
    print("-" * len(fullHeader))
    for index, row in enumerate(content, start=1):
        print(index, *row, sep=", ")
# Account id of the calling credentials; it is repeated in every report row.
sts = boto3.client("sts")
callerIdentity = sts.get_caller_identity()
aid = callerIdentity.get("Account")
# The region list is fetched once through a us-east-1 EC2 client and reused by every check.
client = boto3.client("ec2", region_name="us-east-1")
regions = getAllRegions(client)
print(f"AWS Environment Review - {date.today()}\n\n")
# EC2 review, check 1: list stopped instances in every region that have been idle for more than 14 days.
printTitle("Ec2 service review")
printSubTitle("[Cost Optimization] Instances stopped for over 14 days - Consider backing up and terminate instances "
              "or use AutoScalingGroup to spin up and down instances as needed.")
outTable = []
stoppedThresholdDays = 14
for r in regions:
    client = boto3.client('ec2', region_name=r)
    # Paginate so large fleets are not truncated.
    for page in client.get_paginator('describe_instances').paginate():
        # A reservation can hold several instances; flatten them all instead of
        # looking only at the first instance of each reservation.
        for inst in jmespath.search("Reservations[].Instances[]", page) or []:
            if (inst.get("State") or {}).get("Name") != "stopped":
                continue
            # NOTE(review): UsageOperationUpdateTime is kept as the proxy for the stop date — confirm it is the intended field.
            updatedAt = inst.get("UsageOperationUpdateTime")
            if updatedAt is None:
                # Age cannot be computed; still surface the stopped instance rather than crash or hide it.
                outTable.append([r, aid, inst.get("InstanceId"), "unknown"])
                continue
            daysStopped = getAgeFromDate(updatedAt)
            # Apply the threshold announced in the sub-title.
            if daysStopped > stoppedThresholdDays:
                outTable.append([r, aid, inst.get("InstanceId"), daysStopped])
printResult(outTable, "Region, AccountID, InstanceId, DaysStopped")
# EC2 review: list instances whose metadata options still accept IMDSv1 requests (HttpTokens == "optional").
printSubTitle("[Security] Insecure IMDSv1 allowed - Consider requiring IMDSv2. For more information, "
              "see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html")
outTable = []
for r in regions:
    client = boto3.client('ec2', region_name=r)
    for page in client.get_paginator('describe_instances').paginate():
        # Flatten every instance of every reservation; the first instance alone is not enough.
        for inst in jmespath.search("Reservations[].Instances[]", page) or []:
            # MetadataOptions may be absent; treat that as "nothing to report" instead of crashing.
            httpTokens = (inst.get("MetadataOptions") or {}).get("HttpTokens")
            if httpTokens == "optional":
                outTable.append([r, aid, inst.get("InstanceId"), httpTokens])
printResult(outTable, "Region, AccountID, InstanceId, IMDSv2")
# EC2 review: list instances whose type belongs to one of the listed previous-generation families.
printSubTitle("[Sustainability] Use of previous generation instance type - "
              "Consider using current generation instances")
outTable = []
# Family prefix followed by the dot that separates family and size, compiled once for all regions.
previousGenPattern = re.compile(r"^(t1|t2|m3|m1|m2|m4|c1|c2|c3|c4|r3|r4|i2)\.")
for r in regions:
    client = boto3.client('ec2', region_name=r)
    for page in client.get_paginator('describe_instances').paginate():
        # Flatten every instance of every reservation; the first instance alone is not enough.
        for inst in jmespath.search("Reservations[].Instances[]", page) or []:
            instanceType = inst.get("InstanceType") or ""
            if previousGenPattern.search(instanceType) is not None:
                outTable.append([r, aid, inst.get("InstanceId"), instanceType])
printResult(outTable, "Region, AccountID, InstanceId, InstanceType")
# EBS review: list volumes whose status is "available", i.e. returned by the status filter below.
printSubTitle("[Cost Optimization] Unattached EBS volumes - Consider backing up the volumes and delete them")
outTable = []
availableFilter = [{'Name': 'status', 'Values': ['available']}]
for r in regions:
    client = boto3.client('ec2', region_name=r)
    response = client.describe_volumes(Filters=availableFilter)
    outTable.extend(
        [r, aid, vol.get("VolumeId"), vol.get("Size"), vol.get("VolumeType")]
        for vol in response.get("Volumes")
    )
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
# EBS review: list snapshots owned by this account that are older than 365 days.
printSubTitle("[Cost Optimization] EBS snapshots more than 365 days old - "
              "Consider removing snapshots if no longer needed")
outTable = []
for r in regions:
    client = boto3.client('ec2', region_name=r)
    response = client.describe_snapshots(OwnerIds=[aid])
    for snap in response.get("Snapshots"):
        snapshotAge = getAgeFromDate(snap.get("StartTime"))
        if snapshotAge <= 365:
            continue
        description = snap.get("Description")
        # Snapshots carrying the AWS Backup description are left out of the report.
        if description == "This snapshot is created by the AWS Backup service.":
            continue
        # The description is cut to 70 characters to keep rows readable.
        outTable.append([r, aid, snap.get("SnapshotId"), description[:70], snapshotAge])
printResult(outTable, "Region, AccountID, SnapshotId, Description, SnapshotAge")
# EBS review: list in-use volumes that are not encrypted.
# The advice text is built from adjacent string literals, so every fragment must end with a space.
printSubTitle("[Security] Unencrypted EBS volumes - Consider replacing volume with encrypted ones. "
              "One can do so by stopping the Ec2 instance, creating snapshot for the unencrypted volume, "
              "copy the snapshot to a new encrypted snapshot, create a volume from the encrypted snapshot, "
              "detach the original volume and attach the encrypted volume. Remember to clean up the volumes "
              "and snapshots afterwards.")
outTable = []
for r in regions:
    client = boto3.client('ec2', region_name=r)
    # Both filters must match: unencrypted AND currently attached.
    response = client.describe_volumes(
        Filters=[
            {
                'Name': 'encrypted',
                'Values': ['false']
            },
            {
                'Name': 'status',
                'Values': ['in-use']
            }
        ]
    )
    for i in response.get("Volumes", []):
        outTable.append([r, aid, i.get("VolumeId"), i.get("Size"), i.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
# EC2 review: list Elastic IPs that carry no AssociationId.
printSubTitle("[Cost Optimization] Unused Elastic IP - Consider deleting unused EIP")
outTable = []
for r in regions:
    client = boto3.client('ec2', region_name=r)
    response = client.describe_addresses()
    outTable.extend(
        [r, aid, address.get("PublicIp")]
        for address in response.get("Addresses")
        if address.get("AssociationId") is None
    )
printResult(outTable, "Region, AccountID, PublicIp")
# Security group review: list ingress rules open to 0.0.0.0/0, except rules limited to plain web ports.
printTitle("Security group review")
printSubTitle("[Security] Security group rules allowing ingress from 0.0.0.0/0 - Consider setting more restrictive rules "
              "allowing access from specific sources.")
outTable = []
# Ports that are tolerated when exposed to the whole internet.
publicWebPorts = (80, 443)
for r in regions:
    client = boto3.client('ec2', region_name=r)
    response = client.describe_security_group_rules()
    for sgr in response.get("SecurityGroupRules", []):
        if sgr.get("IsEgress") or sgr.get("CidrIpv4") != "0.0.0.0/0":
            continue
        fromPort, toPort = sgr.get("FromPort"), sgr.get("ToPort")
        # Only a rule restricted to exactly port 80 or exactly port 443 is exempt. A range that
        # merely starts or ends on one of them (e.g. 80-65535) still exposes other ports.
        if fromPort == toPort and fromPort in publicWebPorts:
            continue
        outTable.append([r, aid, sgr.get("GroupId"), sgr.get("SecurityGroupRuleId"), fromPort, toPort])
printResult(outTable, "Region, AccountID, SecurityGroup, Rule, FromPort, ToPort")
# RDS review, check 1: list DB instances and DB clusters whose storage is not encrypted.
printTitle("Rds service review")
printSubTitle("[Security] Unencrypted RDS instances - Consider encrypting RDS instances. For more detail, see "
              "https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/encrypt-an-existing-amazon-rds-for-postgresql-db-instance.html")
outTable = []
for r in regions:
    client = boto3.client('rds', region_name=r)
    # StorageEncrypted is a boolean in the API response, so it is tested for truthiness;
    # comparing it with the string "False" can never match.
    for page in client.get_paginator('describe_db_instances').paginate():
        for db in page.get("DBInstances", []):
            if not db.get("StorageEncrypted"):
                outTable.append([r, aid, db.get("DBInstanceIdentifier"), db.get("Engine")])
    for page in client.get_paginator('describe_db_clusters').paginate():
        for cluster in page.get("DBClusters", []):
            if not cluster.get("StorageEncrypted"):
                outTable.append([r, aid, cluster.get("DBClusterIdentifier"), cluster.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
# RDS review: list DB instances and DB clusters whose MultiAZ flag is not set.
printSubTitle("[Reliability] RDS instance running in single availability zone - "
              "Consider enabling multi-az for production use.")
outTable = []
for r in regions:
    client = boto3.client('rds', region_name=r)
    response = client.describe_db_instances()
    outTable.extend(
        [r, aid, db.get("DBInstanceIdentifier"), db.get("Engine")]
        for db in response.get("DBInstances")
        if not db.get("MultiAZ")
    )
    response = client.describe_db_clusters()
    outTable.extend(
        [r, aid, dbCluster.get("DBClusterIdentifier"), dbCluster.get("Engine")]
        for dbCluster in response.get("DBClusters")
        if not dbCluster.get("MultiAZ")
    )
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
# Lambda review: list functions whose runtime identifier matches one of the outdated runtimes below.
printTitle("Lambda service review")
printSubTitle("[Security] Outdated Lambda runtime - Consider changing to currently supported Lambda runtime versions, "
              "listed on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html")
outTable = []
# Compiled once; the dot in "python3." is escaped so it only matches a literal dot.
outdatedRuntimePattern = re.compile(r"python2|python3\.[678]|java8|nodejs[468]|nodejs1[024]|dotnet6")
for r in regions:
    client = boto3.client('lambda', region_name=r)
    # list_functions returns results page by page; walk every page so no function is skipped.
    for page in client.get_paginator('list_functions').paginate():
        for fn in page.get("Functions", []):
            runtime = fn.get("Runtime")
            # Functions without a Runtime value are ignored.
            if runtime is not None and outdatedRuntimePattern.search(runtime) is not None:
                outTable.append([r, aid, fn.get("FunctionName"), runtime])
printResult(outTable, "Region, AccountID, FunctionName, Runtime")
# IAM review, check 1: list access keys created more than 180 days ago, per user.
printTitle("Iam service review")
printSubTitle("[Security] Iam user access key not rotated for 180 days - Consider rotating access key")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
listUsers = client.list_users()
users = jmespath.search("Users[*].UserName", listUsers)
for userName in users:
    keyListing = client.list_access_keys(UserName=userName)
    for accessKey in keyListing.get("AccessKeyMetadata"):
        keyAge = getAgeFromDate(accessKey.get("CreateDate"))
        if keyAge > 180:
            outTable.append([aid, userName, accessKey.get("AccessKeyId"), keyAge])
printResult(outTable, "AccountID, UserName, AccessKeyId, AccessKeyAge")
# IAM review: list the groups, users and roles the AWS managed AdministratorAccess policy is attached to.
printSubTitle("[Security] Iam AdministratorAccess policy attached - Consider granting minimum privileges "
              "to users/groups/roles. AWS managed policies for job functions are recommended. See "
              "https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
entityResp = client.list_entities_for_policy(
    PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess'
)
# (row label, JMESPath expression) pairs, in the order the rows are reported.
entityQueries = (
    ("Group", "PolicyGroups[*].GroupName"),
    ("User", "PolicyUsers[*].UserName"),
    ("Role", "PolicyRoles[*].RoleName"),
)
for entityType, query in entityQueries:
    for entityName in jmespath.search(query, entityResp):
        outTable.append([aid, entityType, entityName])
printResult(outTable, "AccountID, Type, Name")
# CloudWatch review, check 1: list log groups that have no retention period, with their stored size in MiB.
printTitle("Cloudwatch service review")
printSubTitle("[Cost Optimization] Cloudwatch LogGroups without retention period - Consider setting retention")
outTable = []
for r in regions:
    client = boto3.client('logs', region_name=r)
    # describe_log_groups is paged; walk every page so no log group is skipped.
    for page in client.get_paginator('describe_log_groups').paginate():
        for logGroup in page.get("logGroups", []):
            if logGroup.get("retentionInDays") is not None:
                continue
            # A missing storedBytes value is treated as 0 instead of raising TypeError on the division.
            sizeMiB = int(round((logGroup.get("storedBytes") or 0) / 1024 / 1024, 0))
            outTable.append([r, aid, logGroup.get("logGroupName"), sizeMiB])
printResult(outTable, "Region, AccountID, LogGroup, SizeMiB")
# CloudWatch review: list log groups that have no KMS key associated.
printSubTitle("[Security] Cloudwatch LogGroups unencrypted - Consider encrypting LogGroups")
outTable = []
for r in regions:
    client = boto3.client('logs', region_name=r)
    # describe_log_groups is paged; walk every page so no log group is skipped.
    for page in client.get_paginator('describe_log_groups').paginate():
        for logGroup in page.get("logGroups", []):
            if logGroup.get("kmsKeyId") is None:
                outTable.append([r, aid, logGroup.get("logGroupName")])
printResult(outTable, "Region, AccountID, LogGroup")
# Backup review: flag regions that host EC2/RDS instances but have no AWS Backup plan.
printTitle("Backup service review")
printSubTitle("[Reliability] Ec2/Rds instances found but AWSBackup plan missing - "
              "Consider setting up AWSBackup plans to backup AWS resources.")
outTable = []
for r in regions:
    client = boto3.client('backup', region_name=r)
    response = client.list_backup_plans()
    # Only the existence of a plan matters, so the first page is enough.
    if response.get("BackupPlansList"):
        continue
    ec2client = boto3.client("ec2", region_name=r)
    # Count instances, not reservations: one reservation can contain several instances.
    ec2Count = sum(
        len(reservation.get("Instances", []))
        for page in ec2client.get_paginator("describe_instances").paginate()
        for reservation in page.get("Reservations", [])
    )
    rdsclient = boto3.client("rds", region_name=r)
    rdsCount = sum(
        len(page.get("DBInstances", []))
        for page in rdsclient.get_paginator("describe_db_instances").paginate()
    )
    instanceCount = ec2Count + rdsCount
    if instanceCount >= 1:
        outTable.append([r, aid, "AWSBackup plan missing", instanceCount])
printResult(outTable, "Region, AccountID, BackupPlan, Ec2RdsInstances")
# S3 review: list buckets that have no bucket policy.
printTitle("S3 service review")
printSubTitle("[Security] S3 bucket policy missing - Consider creating bucket policy and restrict access to bucket")
outTable = []
client = boto3.client('s3', region_name="us-east-1")
response = client.list_buckets()
for bucketName in jmespath.search("Buckets[*].Name", response) or []:
    try:
        client.get_bucket_policy(Bucket=bucketName)
    except botocore.exceptions.ClientError as err:
        errorCode = err.response.get("Error", {}).get("Code")
        if errorCode == "NoSuchBucketPolicy":
            outTable.append([aid, bucketName])
        else:
            # Any other failure (e.g. AccessDenied) says nothing about the policy,
            # so it is reported as a warning instead of a finding.
            print(f"Skipping bucket {bucketName}: unable to read bucket policy ({errorCode})")
printResult(outTable, "AccountID, BucketName")
# ElastiCache review: list cache clusters whose node type does not match the "<digit>g" pattern.
printTitle("ElastiCache review")
printSubTitle("[Sustainability] ElastiCache instances on x64 platform - Consider Graviton instances "
              "such as t4g/r7g to optimize your infrastructure investment.")
outTable = []
for r in regions:
    client = boto3.client('elasticache', region_name=r)
    response = client.describe_cache_clusters()
    for cacheCluster in response.get("CacheClusters"):
        nodeType = cacheCluster.get("CacheNodeType")
        # A digit followed by "g" (as in t4g, r7g) is taken as the Graviton marker; those are skipped.
        if re.search("[0-9]g.", nodeType) is not None:
            continue
        outTable.append([r, aid, cacheCluster.get("CacheClusterId"), nodeType])
printResult(outTable, "Region, AccountID, CacheClusterId, CacheNodeType")
# Load balancer review: list target groups for which describe_target_health returns no targets.
printTitle("LoadBalancer service review")
printSubTitle("[Cost Optimization] LB Target group without targets - Consider removing empty target groups")
outTable = []
for r in regions:
    client = boto3.client('elbv2', region_name=r)
    response = client.describe_target_groups()
    for targetGroup in response.get("TargetGroups"):
        tgResp = client.describe_target_health(TargetGroupArn=targetGroup.get("TargetGroupArn"))
        registeredTargets = jmespath.search("TargetHealthDescriptions[*].Target", tgResp)
        if len(registeredTargets) == 0:
            outTable.append([r, aid, targetGroup.get("TargetGroupName")])
printResult(outTable, "Region, AccountID, TargetGroup")
# KMS review: list enabled customer managed keys that do not have automatic rotation enabled.
printTitle("KMS service review")
printSubTitle("[Security] Customer Managed Keys do not have auto rotation enabled - "
              "Consider enabling auto key rotation. When a key is rotated, previous ones "
              "are still kept within AWS to allow data retrival.")
outTable = []
for r in regions:
    client = boto3.client('kms', region_name=r)
    for page in client.get_paginator('list_keys').paginate():
        for key in page.get("Keys", []):
            keyId = key.get("KeyId")
            try:
                keyMetadata = client.describe_key(KeyId=keyId).get("KeyMetadata") or {}
                # Enabled is a boolean in the API response, so it is tested for truthiness;
                # comparing it with the string "True" can never match.
                if not keyMetadata.get("Enabled") or keyMetadata.get("KeyManager") != "CUSTOMER":
                    continue
                krResp = client.get_key_rotation_status(KeyId=keyId)
            except botocore.exceptions.ClientError:
                # Best effort: a key that cannot be described, or whose rotation status
                # cannot be read, is skipped rather than aborting the review.
                continue
            # Report the key only when rotation is NOT enabled.
            if not krResp.get("KeyRotationEnabled"):
                outTable.append([r, aid, keyId])
printResult(outTable, "Region, AccountID, KeyId")
# API Gateway review: list PRIVATE REST APIs that have no resource policy.
printTitle("ApiGateway service review")
printSubTitle("[Security] ApiGateway resource policy missing - Consider restricting access to private API with a "
              "policy. Private Api should be accessed through Vpc endpoint and a policy ensures the Api cannot "
              "be accessed otherwise. For more detail, see "
              "https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-resource-policies-examples.html")
outTable = []
for r in regions:
    client = boto3.client('apigateway', region_name=r)
    # get_rest_apis is paged; walk every page so no API is skipped.
    for page in client.get_paginator('get_rest_apis').paginate():
        for api in page.get("items", []):
            endpointTypes = (api.get("endpointConfiguration") or {}).get("types") or []
            # "policy" may be absent or empty when no resource policy is set; both count as
            # missing. Calling len() on the absent value would raise TypeError.
            if "PRIVATE" in endpointTypes and not api.get("policy"):
                outTable.append([r, aid, api.get("name")])
printResult(outTable, "Region, AccountID, PrivateApiName")
# CloudTrail review, check 1: list trails that have no KMS key configured.
printTitle("Cloudtrail service review")
printSubTitle("[Security] Cloudtrail not encrypted - Consider enabling encryption for cloudtrail")
outTable = []
for r in regions:
    client = boto3.client('cloudtrail', region_name=r)
    # Shadow trails (replicas of a trail created in another region) are excluded so a
    # multi-region trail is reported once instead of once per region.
    response = client.describe_trails(includeShadowTrails=False)
    for trail in response.get("trailList", []):
        if trail.get("KmsKeyId") is None:
            outTable.append([r, aid, trail.get("Name")])
printResult(outTable, "Region, AccountID, Trail")
# CloudTrail review: report once, for the whole account, when no multi-region trail exists.
printSubTitle("[Security] Multi-Region cloudtrail not enabled - Consider enabling Multi-Region for at least 1 cloudtrail")
outTable = []
multiRegionTrailCount = 0
for r in regions:
    client = boto3.client('cloudtrail', region_name=r)
    response = client.describe_trails()
    multiRegionTrailCount += sum(
        1 for trail in response.get("trailList", []) if trail.get("IsMultiRegionTrail")
    )
    if multiRegionTrailCount > 0:
        # One multi-region trail is enough; no need to query the remaining regions.
        break
# The finding is account-wide, so it is evaluated after the scan and emitted as a single row
# instead of being tied to whichever region the loop happened to be visiting.
if multiRegionTrailCount <= 0:
    outTable.append(["all", aid, "Missing multi region trail"])
printResult(outTable, "Region, AccountID, Status")
# VPC review: list VPN connections that expose fewer than 2 tunnel outside IP addresses.
printTitle("Vpc service review")
printSubTitle("[Reliability] Insufficient VPN tunnels - Consider having 2 tunnels for each site VPN connection. "
              "AWS performs VPN tunnel endpoint maintenance rather frequently. Having 2 tunnel reduces the risk "
              "of service interruption.")
outTable = []
for r in regions:
    client = boto3.client('ec2', region_name=r)
    response = client.describe_vpn_connections()
    for vpnConnection in response.get("VpnConnections"):
        # Tunnels are counted by their OutsideIpAddress entries.
        tunnelCount = len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", vpnConnection))
        if tunnelCount < 2:
            outTable.append([r, aid, vpnConnection.get("VpnConnectionId"), tunnelCount])
printResult(outTable, "Region, AccountID, VpnConnection, TunnelCount")
# EKS review, check 1: list node groups whose amiType starts with "AL2_".
printTitle("Eks service review")
printSubTitle("[Sustainability] Eks node running on AmazonLinux2 (AL2) - Consider using AmazonLinux2023. "
              "AL2's end of life date is 2025-06-30. AmazonLinux2023 runs on newer kernel and libraries, "
              "which offers better performance and security.")
outTable = []
for r in regions:
    client = boto3.client('eks', region_name=r)
    response = client.list_clusters()
    for cluster in response.get("clusters"):
        ngsResp = client.list_nodegroups(clusterName=cluster)
        for ng in ngsResp.get("nodegroups"):
            ngResp = client.describe_nodegroup(clusterName=cluster, nodegroupName=ng)
            amiType = ngResp.get("nodegroup").get("amiType")
            if not re.search("^AL2_", amiType):
                continue
            outTable.append([r, aid, cluster, ng, amiType])
printResult(outTable, "Region, AccountID, Cluster, NodeGroup, AmiType")
# EKS review: list clusters whose control plane version is older than the minimum below.
printSubTitle("[Sustainability] Eks control plane version outdated - Consider using upgrading Eks cluster. "
              "Reference https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html for a list "
              "of current versions. Reference https://docs.aws.amazon.com/eks/latest/userguide/update-cluster.html "
              "for upgrade instructions.")
outTable = []
minimumEksVersion = (1, 28)
for r in regions:
    client = boto3.client('eks', region_name=r)
    for page in client.get_paginator('list_clusters').paginate():
        for cluster in page.get("clusters", []):
            clusterResp = client.describe_cluster(name=cluster)
            version = clusterResp.get("cluster").get("version")
            # Compare component-wise as integers: as floats "1.9" would sort above "1.28"
            # and "1.30" would collapse to 1.3.
            versionParts = tuple(int(part) for part in version.split("."))
            if versionParts < minimumEksVersion:
                outTable.append([r, aid, cluster, version])
printResult(outTable, "Region, AccountID, Cluster, Version")
# TODO
"""
- config enabled for all regions
- list users/groups/roles with administrator access
"""