抓取 GCP IAM 信息并判断权限是否开大
抓取 GCP IAM 信息并判断权限是否开大，需要调用基础接口脚本里的数据库读写功能和 Google Cloud 库。
from google.cloud import resourcemanager_v3
from google.cloud import recommender_v1
from google.oauth2 import service_account
from BasicInterfaces import executeMysqlData
import json,time
# Service-account credentials loaded from a JSON key file; the path below is a
# placeholder. The same credentials object is shared by every client in this
# script.
credentials = service_account.Credentials.from_service_account_file(
    '<xxx.json>'
)

# Resource Manager client used to read project-level IAM policies.
resource_manager_client = resourcemanager_v3.ProjectsClient(
    credentials=credentials
)
def list_iam_recommendations(project_id, location="global", recommender_id="google.iam.policy.Recommender"):
    """Fetch IAM role recommendations for a project, keyed by member.

    Args:
        project_id: Project identifier placed in the recommender resource path.
        location: Recommender location segment of the resource path.
        recommender_id: Recommender to query.

    Returns:
        A dict mapping each member (the text after the last ``:`` of the
        overview's ``member`` field, with any ``?...`` suffix dropped) to
        ``{'remove': <role>, 'add': [<role>, ...]}``. Every role is reduced
        to the segment after its last ``/``.
    """
    parent = f"projects/{project_id}/locations/{location}/recommenders/{recommender_id}"
    recommender_client = recommender_v1.RecommenderClient(credentials=credentials)

    userDict = {}
    for recommendation in recommender_client.list_recommendations(parent=parent):
        overview = dict(recommendation.content.overview)
        # Drop the "type:" prefix and any "?uid=..." suffix so the key has the
        # same form as the account keys built by get_project_iam_policy.
        user = overview['member'].split(':')[-1].split('?')[0]
        # A recommendation may carry no 'addedRoles' entry (e.g. when it only
        # removes a role); treat that as "nothing to add" instead of raising
        # KeyError.
        added_roles = [
            role.split('/')[-1] for role in overview.get('addedRoles', ())
        ]
        # NOTE: a member with several recommendations keeps only the last one
        # iterated, because the assignment below overwrites earlier entries.
        userDict[user] = {
            'remove': overview['removedRole'].split('/')[-1],
            'add': added_roles,
        }
    return userDict
def get_project_iam_policy(project_id, data=None):
    """Collect the roles bound to each member in a project's IAM policy.

    Args:
        project_id: Project identifier; the policy is read from the resource
            ``projects/<project_id>``.
        data: Optional dict to add the results to. A new dict is created when
            it is omitted.

    Returns:
        ``data``, mapping each account to ``{"unknown": [<role>, ...]}``.
        The account is the member string after its last ``:`` with any
        ``?...`` suffix dropped; each role is the segment after its last
        ``/``.
    """
    # A mutable default argument would be shared between calls, so accounts
    # from one project would show up in the result for the next one.
    if data is None:
        data = {}

    policy = resource_manager_client.get_iam_policy(
        resource=f"projects/{project_id}"
    )
    for binding in policy.bindings:
        role = str(binding.role).split('/')[-1]
        for member in binding.members:
            account = member.split(':')[-1].split('?')[0]
            # Key on the normalised account (not the raw member string) so a
            # member that appears in several bindings accumulates all of its
            # roles instead of being reset each time.
            data.setdefault(account, {}).setdefault("unknown", []).append(role)
    return data


# Projects to audit. Each key is the name written to the `project` column of
# the result rows; each value is the project id passed to the Google Cloud
# APIs. Both entries below are placeholders.
project_ids = {
    "<gcp显示项目名>":" <gcp实际项目名>",
}
def _sql_literal(value):
    """Return *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are escaped so that a value containing them
    cannot end the literal early.
    NOTE(review): assumes MySQL's default backslash-escape handling - confirm
    the server's sql_mode.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _record_iam_row(project, account, action, column=None, value=None):
    """Insert one row into gcpIamCheck through executeMysqlData.

    ``column``/``value`` add one extra column (``rules`` or ``recommend``)
    between ``account`` and ``action``; when ``column`` is None only
    project, account and action are written.
    """
    columns = ["project", "account"]
    values = [project, account]
    if column is not None:
        columns.append(column)
        values.append(value)
    columns.append("action")
    values.append(action)
    sql = (
        f"INSERT INTO gcpIamCheck ({', '.join(columns)}) "
        f"VALUES ({','.join(_sql_literal(v) for v in values)})"
    )
    # NOTE(review): the mode string is passed exactly as the original script
    # had it (' write', with a leading space) - confirm against
    # executeMysqlData.
    executeMysqlData(1, ' write', sql)


# Audit every configured project: write one row per recommendation, and for
# accounts without a recommendation one row listing their current roles.
for project, project_id in project_ids.items():
    try:
        iamRecommenderDict = list_iam_recommendations(project_id)
        # Pass a fresh dict so accounts collected for one project never carry
        # over into the next one.
        iamDict = get_project_iam_policy(project_id, {})

        # Accounts that have no recommendation fall back to their policy entry.
        for account, entry in iamDict.items():
            iamRecommenderDict.setdefault(account, entry)

        for account, entry in iamRecommenderDict.items():
            for action, value in entry.items():
                if action == 'unknown':
                    _record_iam_row(project, account, action, 'rules', ",".join(value))
                elif action == 'remove':
                    _record_iam_row(project, account, action, 'recommend', value)
                elif action == 'add' and value:
                    # 'add' holds a list of roles; join it the same way the
                    # 'unknown' branch does before writing it.
                    _record_iam_row(project, account, action, 'recommend', ",".join(value))
    except Exception:
        # Best effort: any failure while processing a project is recorded as a
        # single marker row so the project still appears in the results.
        # KeyboardInterrupt/SystemExit are deliberately not caught.
        _record_iam_row(project, 'unknown', 'unknown')