python脚本调用fortigate API完整代码
python通过requests模块可以发起get和post请求,调用第三方API接口,下面是调用fortigate API的例子代码:
#!/usr/bin/env python # coding=utf-8 import json import requests from requests.exceptions import ConnectionError as ReqConnError import inspect, sys, os, gzip, re, traceback import logging.handlers class FGTBaseException(Exception): """Wrapper to catch the unexpected""" def __init__(self, msg=None, *args, **kwargs): if msg is None: msg = "An exception occurred within pyfmg" super(FGTBaseException, self).__init__(msg, *args, **kwargs) class FGTValidSessionException(FGTBaseException): """Raised when a call is made, but there is no valid login instance""" def __init__(self, method, url, *args, **kwargs): msg = "A call using the {method} method was requested to {url} on a FortiOS instance that had no " \ "valid session or was not connected. Paramaters were:\n{params}". \ format(method=method, url=url, params=kwargs) super(FGTValidSessionException, self).__init__(msg, *args, **kwargs) class FGTValueError(ValueError): """Catch value errors such as bad timeout values""" def __init__(self, *args, **kwargs): super(FGTValueError, self).__init__(*args, **kwargs) class FGTResponseNotFormedCorrect(KeyError): """Used only if a response does not have a standard format as based on FGT response guidelines""" def __init__(self, *args, **kwargs): super(FGTResponseNotFormedCorrect, self).__init__(*args, **kwargs) class FGTConnectionError(ReqConnError): """Wrap requests Connection error so requests is not a dependency outside this module""" def __init__(self, *args, **kwargs): super(FGTConnectionError, self).__init__(*args, **kwargs) class FortiGate(object): def __init__(self, host, user, passwd=None, debug=False, use_ssl=True, verify_ssl=False, timeout=300, disable_request_warnings=False, apikey=None): super(FortiGate, self).__init__() self._host = host self._user = user self._req_id = 0 self._url = None self._session = None self._sid = None self._timeout = timeout self._debug = debug self._use_ssl = use_ssl self._verify_ssl = verify_ssl self._apikeyused = True if passwd is None and apikey is not None else False self._passwd = passwd if passwd is not None else apikey if disable_request_warnings: pass #requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) @property def api_key_used(self): return self._apikeyused @api_key_used.setter def api_key_used(self, val): self._apikeyused = val @property def debug(self): return self._debug @debug.setter def debug(self, val): self._debug = val @property def req_id(self): return self._req_id @req_id.setter def req_id(self, val): self._req_id = val def _update_request_id(self, reqid=0): self.req_id = reqid if reqid != 0 else self.req_id + 1 @property def sid(self): return self._sid @sid.setter def sid(self, val): self._sid = val @property def verify_ssl(self): return self._verify_ssl @verify_ssl.setter def verify_ssl(self, val): self._verify_ssl = val @property def timeout(self): return self._timeout @timeout.setter def timeout(self, val): self._timeout = val @staticmethod def jprint(json_obj): try: return json.dumps(json_obj, indent=2, sort_keys=True) except TypeError as te: return json.dumps({"Type Information": te.message}) def dprint(self, msg, s=None): if self.debug: print(msg) if s is not None: print(self.jprint(s) + "\n") pass def _set_sid(self, response): if self.api_key_used: self.sid = "apikeyusednosidavailable" return for cookie in response.cookies: if cookie.name == "ccsrftoken": csrftoken = cookie.value[1:-1] # print(csrftoken) self._session.headers.update({"X-CSRFTOKEN": csrftoken}) if "APSCOOKIE_" in cookie.name: self.sid = cookie.value def _set_url(self, url, *args): if "logincheck" in url or "logout" in url: self._url = "{proto}://{host}/{url}".format(proto="https" if self._use_ssl else "http", host=self._host, url=url) else: if url[0] == "/": url = url[1:] self._url = "{proto}://{host}/api/v2/{url}".format(proto="https" if self._use_ssl else "http", host=self._host, url=url) if len(args) > 0: self._url = "{url}?".format(url=self._url) try: self._url = self._url + "&".join(args) except: pass def __handle_login_values(self, response): # response first character defines if login was successful # 0 Log in failure. Most likely an incorrect username/password combo. # 1 Successful log in* # 2 Admin is now locked out # 3 Two-factor Authentication is needed** try: if response.status_code == 200: if response.text == "" or response.text[0] == "0": return -1, {"status_code": response.status_code, "message": "Failed Login - Most likely incorrect username/password used"} elif response.text[0] == "1": self._set_sid(response) return 0, {"status_code": response.status_code, "message": "Login Successful"} elif response.text[0] == "2": return -1, {"status_code": response.status_code, "message": "Admin Locked Out"} elif response.text[0] == "3": return -1, {"status_code": response.status_code, "message": "Two-factor Required"} else: return -1, {"status_code": response.status_code, "message": "Unknown Error Occurred"} else: return -1, {"status_code": response.status_code, "message": "Login Failed Status Code {} Returned".format(response.status_code)} except IndexError as err: self.dprint("Index error in response: {err_type} {err}\n\n".format(err_type=type(err), err=err)) raise FGTResponseNotFormedCorrect(err) def __handle_response_login(self, response): login_response = self.__handle_login_values(response) self.dprint("RESPONSE:", login_response[1]) return login_response def __handle_response_logout(self, response): self._sid = None self._req_id = 0 if response.status_code == 200: self.dprint("RESPONSE:", {"status_code": response.status_code, "message": "Logout Successful"}) return 0, {"status_code": response.status_code, "message": "Logout Successful"} else: self.dprint("RESPONSE:", {"status_code": response.status_code, "message": "Logout Failed"}) return -1, {"status_code": response.status_code, "message": "Logout Failed"} def _handle_response(self, response): if "logincheck" in self._url: return self.__handle_response_login(response) elif "logout" in self._url: return self.__handle_response_logout(response) else: try: if "text" in response: if type(response["text"]["results"]) is list: result = response["text"]["results"][0] else: result = response["text"]["results"] self.dprint("RESPONSE:", result) if "http_status" in response: return response["http_status"], result else: return response["status"], result else: if "http_status" in response: self.dprint("RESPONSE:", response) return response["http_status"], response else: self.dprint("RESPONSE:", response) return response["status"], response except IndexError as err: self.dprint("Index error in response: {err_type} {err}\n\n".format(err_type=type(err), err=err)) raise FGTResponseNotFormedCorrect(err) except Exception as e: print("Response parser error: {err_type} {err}".format(err_type=type(e), err=e)) return -1, e def _post_request(self, method, url, params): class InterResponse(object): def __init__(self): self.status_code = 200 self.text = "1" if self.sid is None and "logincheck" not in url: raise FGTValidSessionException(method, params) self._update_request_id() if self.api_key_used: headers = {"content-type": "application/json", "Authorization": "Bearer {apikey}".format(apikey=self._passwd)} self._session.headers.update(headers) else: headers = {"content-type": "application/json"} self._session.headers.update(headers) try: if "logincheck" in self._url: if self.api_key_used: iresponse = InterResponse() return self._handle_response(iresponse) else: method_to_call = getattr(self._session, method) json_request = "username={uname}&secretkey={pword}&ajax=1".format(uname=self._user, pword=self._passwd) self.dprint("{method} REQUEST: {url} for user {uname} with password {passwd} ". format(method=method.upper(), url=self._url, uname=self._user, passwd=self._passwd)) response = method_to_call(self._url, headers=headers, data=json_request, verify=self.verify_ssl, timeout=self.timeout) elif "logout" in self._url: if self.api_key_used: iresponse = InterResponse() return self._handle_response(iresponse) else: self._session.headers = None method_to_call = getattr(self._session, method) self.dprint("{method} REQUEST: {url}".format(method=method.upper(), url=self._url, uname=self._user, passwd=self._passwd)) response = method_to_call(self._url, headers=headers, verify=self.verify_ssl, timeout=self.timeout) else: json_request = {} if params is not None: json_request = params method_to_call = getattr(self._session, method) self.dprint("{method} REQUEST: {url}".format(method=method.upper(), url=self._url), json_request) response = method_to_call(self._url, headers=headers, data=json.dumps(json_request), verify=self.verify_ssl, timeout=self.timeout).json() except ReqConnError as err: self.dprint("Connection error: {err_type} {err}\n\n".format(err_type=type(err), err=err)) raise FGTConnectionError(err) except ValueError as err: self.dprint("Value error: {err_type} {err}\n\n".format(err_type=type(err), err=err)) raise FGTValueError(err) except KeyError as err: self.dprint("Key error in response: {err_type} {err}\n\n".format(err_type=type(err), err=err)) raise FGTResponseNotFormedCorrect(err) except Exception as err: self.dprint("Response parser error: {err_type} {err}".format(err_type=type(err), err=err)) raise FGTBaseException(err) return self._handle_response(response) def login(self): self._session = requests.session() login_response = self.post("logincheck") if login_response[0] == 0: return login_response elif login_response[0] == -1 and login_response[1]["message"] == "Two-factor Required": # todo send a login again after getting the 2FA key #pass return login_response else: self._sid = None return login_response def logout(self): return self.post("logout") def __enter__(self): self.login() return self def __exit__(self): self.logout() def common_datagram_params(self, url, *args, **kwargs): self._set_url(url, *args) params = {} if kwargs: keylist = list(kwargs) for k in keylist: kwargs[k.replace("__", "-")] = kwargs.pop(k) params.update(kwargs) return params def get(self, url, *args, **kwargs): # print(self._session.headers) return self._post_request("get", url, self.common_datagram_params(url, *args, **kwargs)) def post(self, url, *args, **kwargs): return self._post_request("post", url, self.common_datagram_params(url, *args, **kwargs)) def put(self, url, *args, **kwargs): return self._post_request("put", url, self.common_datagram_params(url, *args, **kwargs)) def delete(self, url, *args, **kwargs): return self._post_request("delete", url, self.common_datagram_params(url, *args, **kwargs)) def __str__(self): if self.sid is not None: return "FortiOS instance connnected to {host}.".format(host=self._host) return "FortiOS object with no valid connection to a FortiOS appliance." def __repr__(self): if self.sid is not None: return "{classname}(host={host}, pwd omitted, debug={debug}, use_ssl={use_ssl}, " \ "verify_ssl={verify_ssl}, timeout={timeout})".format(classname=self.__class__.__name__, host=self._host, debug=self._debug, use_ssl=self._use_ssl, timeout=self._timeout, verify_ssl=self._verify_ssl) return "FortiOS object with no valid connection to a FortiOS appliance." class CollectIp(object): def __init__(self, rfile): super(CollectIp, self).__init__() self._rfile = rfile def read_gz_file(self): block_ips = [] # if os.path.exists(self._rfile): reg = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})') fin = gzip.open(self._rfile, 'rb') line = fin.readline('results.csv') while line: item = re.findall(reg, line) if item and len(item) > 0 : block_ips.append(item[0]) line = fin.readline('results.csv') fin.close() else: logger.info("the file [{file}] is not exist!".format(file=self._rfile)) return block_ips class InvokeFortigate(object): def __init__(self, host, gpname, ip_list, forsapi): super(InvokeFortigate, self).__init__() self._host = host self._gpname = gpname self._ip_list = ip_list self._forsapi = forsapi def add_address(self, name, ip): param={ "json" : { 'name' : name, 'subnet' : ip } } url="/cmdb/firewall/address?vdom=root" return self._forsapi.post(url, param) def update_group(self, addr_list): param={ "json" : { "member" : addr_list } } url="/cmdb/firewall/addrgrp/{gpname}?vdom=root".format(gpname=self._gpname) return self._forsapi.post(url, param) def block(self): if self._ip_list: addr_list = [] for ip in self._ip_list: name="block_{ip}".format(ip=ip.replace(".", "_")) ip="{ip}/32".format(ip=ip) # res = self.add_address(name, ip) if res[0] == 200: addr_list.append({'name': name}) else: logger.info('add [{ip}] to address failure, reason: {reason}'.format(ip=ip, reason=res[1])) # # if addr_list: gres = self.update_group(addr_list) if gres[0] == 200: addr_list.append({'name': name}) else: logger.info('update address group [{gpname}] failure, reason: {reason}'.format(gpname=self._gpname, reason=gres[1])) if __name__ == '__main__': # host='192.168.1.196' username='admin' password='123456' addrgrp_name='GRP_001' # logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger('block_ip') # this_file = inspect.getfile(inspect.currentframe()) dirpath = os.path.abspath(os.path.dirname(this_file)) log_path = os.path.join(dirpath,'block_logs') if not os.path.exists(log_path): os.makedirs(log_path) log_file = os.path.join(log_path,'block_ip.log') # handler = logging.handlers.TimedRotatingFileHandler(log_file, "midnight", 1, 30) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.info('begin') # try: #results_file = sys.argv[8] results_file='D:/results.csv.gz' # collectip = CollectIp(results_file) ips = collectip.read_gz_file(); logger.info(ips) # forsapi = FortiGate(host, username, password, use_ssl=False) # logger.info('login [{host}]'.format(host=host)) rs = forsapi.login() if(rs[0] == 0): iforti = InvokeFortigate(host, addrgrp_name, ips, forsapi) iforti.block() # logger.info('logout') forsapi.logout() else: logger.info('login failure') except Exception as err: logger.info("error: {error}".format(error=traceback.format_exc())) logger.info('end')