aws防火墙信息抓取

基于 Python 3 编写的 AWS 防火墙(安全组)信息抓取脚本。

拉取 AWS 防火墙信息时,需要用到基础接口脚本(BasicInterfaces)中的数据库读写函数 executeMysqlData,以及 boto3 库。

import boto3
from botocore.exceptions import ClientError

from BasicInterfaces import executeMysqlData
def get_all_regions():
    """Return the region names reported by the EC2 ``describe_regions`` call.

    The EC2 client is created from boto3's default configuration (no explicit
    session is passed in).

    Returns:
        A list with the ``RegionName`` of every entry under ``Regions`` in the
        response, or an empty list when the call raises ``ClientError`` (the
        error is printed, not re-raised).
    """
    try:
        reply = boto3.client('ec2').describe_regions()
        names = []
        for entry in reply['Regions']:
            names.append(entry['RegionName'])
    except ClientError as err:
        print(f" Error fetching regions: {err}")
        return []
    return names
def get_security_groups_in_region(session, region):
    """Return every security group visible to ``session`` in ``region``.

    Args:
        session: Object exposing ``client(service_name, region_name=...)``,
            e.g. a ``boto3.session.Session``.
        region: Region name forwarded as ``region_name`` to the EC2 client.

    Returns:
        A list with the ``SecurityGroups`` entries of all result pages, or an
        empty list when the API raises ``ClientError`` (the error is printed,
        not re-raised).
    """
    try:
        ec2_client = session.client('ec2', region_name=region)
        # Walk all result pages so a large account is not cut off after the
        # first page of results.
        paginator = ec2_client.get_paginator('describe_security_groups')
        security_groups = []
        for page in paginator.paginate():
            security_groups.extend(page['SecurityGroups'])
        return security_groups
    except ClientError as e:
        print(f" Error describing security groups in region {region}: {e}")
        return []
def get_security_groups_by_region(session):
    """Collect the security groups of every region for ``session``.

    The region list comes from ``get_all_regions()``; each region is then
    queried through ``get_security_groups_in_region(session, region)``.

    Args:
        session: Session object handed on to ``get_security_groups_in_region``.

    Returns:
        A dict mapping each region name to the list returned for that region.
        The dict is empty when no regions were found.
    """
    security_groups_by_region = {}
    for region in get_all_regions():
        print(f" Processing region: {region}... ")
        security_groups_by_region[region] = get_security_groups_in_region(session, region)
    # Return only after the loop: returning from inside the loop body would
    # stop after the first region and yield None for an empty region list.
    return security_groups_by_region
def collect_security_groups_by_account_and_region(target_accounts):
    """Collect security groups for every target account and region.

    Args:
        target_accounts: Iterable of role ARN strings. Each role is assumed
            via ``assume_role`` and then scanned with
            ``get_security_groups_by_region``.

    Returns:
        A dict mapping account ID -> result of ``get_security_groups_by_region``
        for that account. Accounts whose role could not be assumed are left
        out; the dict is empty when no account could be processed.
    """
    account_security_groups = {}
    for role_arn in target_accounts:
        # The account ID is taken as the fifth colon-separated field of the ARN
        # (arn:partition:service:region:account-id:resource).
        account_id = role_arn.split(":")[4]
        print(f" Processing account {account_id}... ")
        # Switch to the role in the target account.
        session = assume_role(role_arn)
        if not session:
            # A falsy session means the role could not be assumed; skip it.
            continue
        account_security_groups[account_id] = get_security_groups_by_region(session)
    # Return after all accounts were visited, not after the first success.
    return account_security_groups
def assume_role(role_arn, session_name="CrossAccountSession"):
    """Assume ``role_arn`` through STS and build a session from the result.

    Args:
        role_arn: ARN of the role to assume (sent as ``RoleArn``).
        session_name: Value sent as ``RoleSessionName``.

    Returns:
        A ``boto3.session.Session`` created from the ``AccessKeyId``,
        ``SecretAccessKey`` and ``SessionToken`` in the response's
        ``Credentials``, or ``None`` when the STS call raises ``ClientError``
        (the error is printed, not re-raised).
    """
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    except ClientError as e:
        print(f" Error assuming role {role_arn}: {e}")
        return None
    credentials = response['Credentials']
    return boto3.session.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
# Example list of target-account role ARNs; replace the placeholders before use.
# (The stray leading space in the second ARN was removed.)
TARGET_ACCOUNTS = [
    "arn:aws:iam::<awsID>:role/<跨账号访问的角色名>",
    "arn:aws:iam::<awsID>:role/<本地账号角色>",
]


def _sql_quote(value):
    """Return ``value`` as a single-quoted SQL literal with ``\\`` and ``'`` escaped."""
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{text}'"


def _port_range(rule):
    """Return ``"<FromPort>-<ToPort>"`` for ``rule``, or ``"all"`` without a port range."""
    from_port = rule.get('FromPort')
    to_port = rule.get('ToPort')
    if from_port is None or to_port is None:
        # Some protocols carry no port range; avoid a KeyError that would abort
        # the run half-way. NOTE(review): confirm 'all' suits the ports column.
        return 'all'
    return f"{from_port}-{to_port}"


# Collect the security groups of every account and region.
security_groups_by_account_and_region = collect_security_groups_by_account_and_region(TARGET_ACCOUNTS)

# Store every ingress rule that is open to 0.0.0.0/0, one row per matching range.
# NOTE(review): rules with IpProtocol '-1' (all protocols) are skipped, as in the
# original script — confirm this is intended.
# NOTE(review): the region is written to `project` and the account ID to
# `direction` of table gcpfirewall, as in the original — confirm the mapping.
for account_id, regions in (security_groups_by_account_and_region or {}).items():
    for region, security_groups in regions.items():
        for sg in security_groups:
            group_name = sg['GroupName']
            for rule in sg['IpPermissions']:
                if rule['IpProtocol'] == '-1':
                    continue
                for ip_range in rule['IpRanges']:
                    if '0.0.0.0/0' not in ip_range['CidrIp']:
                        continue
                    ports = _port_range(rule)
                    sql = (
                        "INSERT INTO gcpfirewall (cloud,project,rulename,action,direction,protocol,ports) "
                        f"VALUES('aws',{_sql_quote(region)},{_sql_quote(group_name)},'allow',"
                        f"{_sql_quote(account_id)},{_sql_quote(rule['IpProtocol'])},{_sql_quote(ports)})"
                    )
                    # Fixed the mis-cased call name (executeMysqLData) so it matches
                    # the imported helper. The mode was ' write' with a stray leading
                    # space — NOTE(review): confirm 'write' is the expected mode.
                    executeMysqlData(1, 'write', sql)