基础接口脚本

基于Python3写的基础接口脚本

主要的功能是读写mysql数据库,读取ELK和Wazuh,发送邮件,后续会增加更多功能,至于import的库,可以自行使用pip下载

from requests import urllib3
from requests.auth import HTTPBasicAuth
import json, requests, mysql.connector, re, os,smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMETextfrom base64
import b64encode
def executeMysqlData(serverID, oper= None,dataSQL= None):
    if serverID == 1:
        mydb = mysql.onnector.connect(
            host="x.x.x.x",
            user="x.x.x.x",
            password="x.x.x.x",
            database="x.x.x.x",
            port=3306)
    elif serverID == 2 :
        mydb = mysql.connector.connect(
            host="y.y.y.y",
            user=" yyyy",
            password=" yyyy",
            database=" yyyy",
            port=3306)
    mycursor = mydb. cursor()
    if oper.upper() == 'WRITE' and dataSQL.split()[θ].upper() != 'SELECT':
        mycursor.execute(dataSQL)
        mydb.commit()
        mycursor.close()
        mydb.close()
        if mycursor.rowcount > θ:
            return True
        else:
            return False
    elif oper.upper() == 'READ' and dataSQL.split()[0].upper() != 'INSERT' and dataSQL.split()[θ].upper() != 'UPDATE':
        mycursor. execute(dataSQL)
        resultList = []
        for i in mycursor:
            resultList. append(i)
        mycursor.close()
        mydb.close()
        return resultList
    else:
        return ' unexpected request! check and try again!'
        #定义 mcafee查询 ip和机器名信息,允许传递 ip地址或者机器名或者FQDN名,返回为 json数据
        def apiFromMcafee(actionId=1, input= None):
            urllib3.disable.warnings()
            username = 'xxx'
            password = 'yyy'
            if actionId == 1:
                url= f'https://xxxx:8443/remote/system.find?searchText={input}&:output=json'
                curl_command=f'curl -u{'"'+username+'"'}:{'"'+password+'"'} -s -k "{url}"'
                data = os.popen(curl_command).read()
                patternIP = re.compile(r'10.((25[θ-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){2}')
                record={}
                if bool(re. match(patternIP, input)) == True:
                    if len(data) > 20:
                        if data[θ:2] == 'OK':
                            jsonStr = json. loads(data[5:])
                            for i in range(0, len(jsonStr)):
                                ip = jsonStr[i]['EPOComputerProperties. IPAddress']
                                subnetSearch = "'" + patternIP. search(ip)[0] + '0/24' + "'"
                                querySubnetInfo = f"SELECT name from IPStatus WHERE subnet = {subnetSearch}" 
                                if input == ip:
                                    #这些只是为了看着好看,所以未知的统一改成 Unknown
                                    if jsonStr[i]['EPOComputerProperties. UserName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.Username'] == 'None':
                                        user = ' Unknown'
                                    else:
                                        user = jsonStr[i]['EPOComputerProperties.UserName']
                                    if jsonStr[i]['EPOComputerProperties. IPHostName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.IPHostName'] == 'None':
                                        hostname = ' Unknown'
                                    else:
                                        hostname = jsonStr[i]['EPOComputerProperties.IPHostName']
                                    queryDepartment = executeMysqlData(1,'read',querySubnetInfo)
                                    if len(queryDepartment)== 0:
                                        departmentName = 'Unknown'
                                    else:
                                        departmentName = queryDepartment[0][0]
                                    record = {
                                        'ip':ip,
                                        'username':user,
                                        'department':departmentName,
                                        'computerName':hostname
                                        }
                                    break
                                else:
                                    record = False
                        else:
                            record = False
                else:
                    record = False
            elif actionId == 2:
                url = f'https://xxxx:8443/remote/core.executeQuery?queryId={input}&:output=json'
                curl_command = f'curl -u {'"' + username + '"'}:{'"' + password + '"'} -s -k"{url}"'
                response = os.popen(curl_command).read()
            return response
        #定义发送邮件接口的方法, subject为标题, data为内容, sendTo为收件人, type为邮件类型, 默认为 html, 如乘是纯文本, 则写 plain
        def sendMail(subject, data,sendTo='[email protected]',type='html'):
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = '[email protected]'
            msg['To'] = sendTo
            text_part = MIMEText(data, type)
            msg.attach(text_part)
            #此处填写邮件服务器接口
            with smtplib.SMTP('xxxx',25) as smtp:
                try:
                    smtp.send_message(msg)
                    return True
                except:
                    return False
        #定义读取ELK的方法, serverId为区分不同的ELK集群,indexData为要搜索的索引目录, bodyData为要搜索的条件, 格式为json,返回json格式的字符串
        def readELK(serverId=2,indexDatα=None,bodyDatα=None,esVersion=8):
            #xxxx
            if serverId == 1:
                apiKey = 'aaaa'
                url = '  https://ssss:9200'
            #wazuh的ELK
            elif serverId == 3:
                username = 'wazuh'
                password = 'wazuh'
                url = 'https://aaaa:9200'
            if esVersion == 8:
                headers = {" Authorization": "ApiKey" + apiKey}
                resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", headers= headers, json=bodyData,verify=False)).text)
            elif esVersion == 7:
                resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", auth=HTTPBasicAuth(username,password),json=bodyData,verify=False)).text)
            print ("读取ELK数据完毕")
            return resultJson
        #定义读写jira的api的方法,第一个为url,也就是条件,第二个为jsonPost,如果要创建case则使用jsonPost,如果只是get的话第二项则不用写
        def apiFromJira(url,jsonPost= None):
            usernameForJira = passwordForJira = 'xxxx'
            urllib3. disable _ warnings()
            if jsonPost is None:
                response = requests. get(url, verify= False, auth=(usernameForJira, passwordForJira))
            else:
                if url[-8:] == 'assignee':
                    response = requests.put(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
                else:
                    response = requests.post(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
            return response. text
        def getClientFromWazuh(option=1, params='limit=10000', action='get',urlParameters=''):
            urllib3. disable _ warnings()
            #默认用户名和密码
            username = password = 'xxxx'
            if option == 1:
                host = 'xxxx'
            elif option == 2:
                host= 'wazuh.bstops.com'
                username = 'xxxx'
                password = 'xxxx'
            host = '10.190.18.8'
            protocol = 'https'
            port = 55000
            login_endpoint = 'security/user/authenticate'
            login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
            basic_auth = f"{username}:{password}".encode()
            login_headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Basic{b64encode(basic_auth).decode()}'
                }
            response = requests.post(login_url, headers = login_headers, verify= False)
            if response.status_code == 405:
                response = requests.get(login_url, headers = login_headers, verify= False)
            token = json.loads(response.content.decode())['data']['token']
            requests_headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer{token}'
                }
            if action == 'get':
                if urlParameters == '' or 'agents/' in urlParameters:
                    response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}?{params}&pretty=true",headers = requests_headers, verify=False)
                else:
                    response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}", headers = requests_headers,verify = False,params = params)
                return response.text
            elif action == 'put':
                response = requests.put(f"{protocol}://{host}:{port}/{urlParameters}?agents_list=2310429&pretty=true&wait_for_complete=true",headers = requests_headers,verify = False , params = params)
                return response.text
            elif action == 'post':
                response = requests.post(f"{protocol}://{host}:{port}/{urlParameters}", headers= requests_headers, verify = False , params = params)
                return response.text