Python使用requests实现类似curl命令的脚本
用Python实现类似于curl命令的脚本:Python的requests模块封装了类似于curl的功能,可以发送HTTP请求并获得返回结果。
curl命令的shell脚本:
python脚本:
curl命令的shell脚本:
#!/bin/bash
# Query the log API for the window "this time yesterday .. now".
#
# Authentication is HTTP Basic: the password is the base64-encoded HMAC-SHA1
# of the Date header value, keyed with the API key.

# Abort on any failure (e.g. openssl missing) instead of sending a request
# with an empty or bogus password.
set -euo pipefail

username="xxxx"
apiKey="xxxxx"

# Query window bounds, in local time.
now=$(date '+%Y-%m-%d %H:%M:%S')
last_day=$(date -d 'yesterday' '+%Y-%m-%d %H:%M:%S')   # NOTE: `-d` is GNU date syntax

# The locale is forced so day/month names are always English; -u gives UTC,
# matching the literal "GMT" suffix.
http_date=$(env LANG="en_US.UTF-8" date -u "+%a, %d %b %Y %H:%M:%S GMT")

# printf '%s' emits the date with no trailing newline and no escape
# processing; the key is quoted so whitespace/glob characters survive intact.
password=$(printf '%s' "$http_date" \
    | openssl dgst -sha1 -hmac "$apiKey" -binary \
    | openssl enc -base64)

# Only the two timestamps are interpolated into the JSON body.
payload=$(printf '{ "customCode":"xxx", "domain":"xxxx", "startdate":"%s", "enddate":"%s" }' \
    "$last_day" "$now")

curl -i --url "https://xxxxxxx" \
    -X "POST" \
    -u "$username:$password" \
    -H "Date: $http_date" \
    -H 'Accept: application/json' \
    -H 'Content-Type: application/json' \
    -d "$payload"
python脚本:
#!/usr/bin/env python
# coding=utf-8
"""Query a signed log API page by page (a Python counterpart of the curl script).

Each request carries a ``Date`` header and HTTP Basic credentials whose
password is the base64-encoded HMAC-SHA1 of that ``Date`` value, keyed with
the API key.  The code runs on both Python 2 and Python 3.
"""
import base64
import hmac
import inspect
import json
import logging
import logging.handlers
import os
import time
import traceback
from hashlib import sha1

HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
QUERY_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
QUERY_WINDOW_SECONDS = 120
DEFAULT_TIMEOUT_SECONDS = 30


def _to_bytes(value):
    """Return *value* as bytes; hmac and base64 require bytes on Python 3."""
    if isinstance(value, bytes):
        return value
    return value.encode("utf-8")


class WafApi(object):
    """Minimal client that POSTs signed JSON queries to a single URL."""

    def __init__(self, url, username, apiKey, timeout=DEFAULT_TIMEOUT_SECONDS):
        """Store the endpoint and credentials.

        Args:
            url: Endpoint every query is POSTed to.
            username: User name for the Basic credentials.
            apiKey: Secret key used to sign the ``Date`` header.
            timeout: Seconds to wait for the server before giving up.
        """
        super(WafApi, self).__init__()
        self._url = url
        self._username = username
        self._apiKey = apiKey
        self._timeout = timeout

    def query_log(self, param):
        """POST *param* as a JSON body and return the ``requests`` response."""
        # Imported here so that headers can be built (and checked) without
        # the HTTP client being installed.
        import requests

        return requests.post(
            self._url,
            data=json.dumps(param),
            headers=self.build_headers(),
            # Without a timeout a stalled server would block forever.
            timeout=self._timeout,
        )

    def build_headers(self, now=None):
        """Build the signed request headers.

        Args:
            now: Seconds since the epoch to sign with; ``None`` means the
                current time.

        Returns:
            A dict with ``Accept``, ``Content-Type``, ``Date`` and
            ``Authorization`` entries.
        """
        # The format string ends in a literal "GMT", so the timestamp must
        # be rendered in UTC rather than local time.
        http_date = time.strftime(HTTP_DATE_FORMAT, time.gmtime(now))
        # Sign with this instance's key, not a module-level global.
        digest = hmac.new(
            _to_bytes(self._apiKey), _to_bytes(http_date), sha1
        ).digest()
        password = base64.b64encode(digest).decode("ascii")
        credentials = "{}:{}".format(self._username, password)
        token = base64.b64encode(_to_bytes(credentials)).decode("ascii")
        return {
            'Accept': 'application/json',
            'Content-Type': 'application/json; charset=UTF-8',
            "Date": http_date,
            "Authorization": 'Basic {}'.format(token),
        }


def _setup_logger():
    """Return the 'test_api' logger, writing to test_logs/ next to this file."""
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('test_api')

    this_file = inspect.getfile(inspect.currentframe())
    log_dir = os.path.join(
        os.path.abspath(os.path.dirname(this_file)), 'test_logs'
    )
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    # Roll the file over at midnight and keep 30 old files.
    handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_dir, 'test_api.log'),
        when="midnight",
        interval=1,
        backupCount=30,
    )
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


def _print_records(records):
    """Print each record as one JSON document per line."""
    for record in records:
        print(json.dumps(record))


def main():
    """Fetch the last two minutes of logs and print every record."""
    url = 'https://xxxxxxxxxxxx'
    username = 'xxxxxxx'
    apikey = 'xxxxxxx'

    logger = _setup_logger()
    logger.info('begin')
    try:
        waf_api = WafApi(url, username, apikey)

        now = time.time()
        startdate = time.strftime(
            QUERY_TIME_FORMAT, time.localtime(now - QUERY_WINDOW_SECONDS)
        )
        enddate = time.strftime(QUERY_TIME_FORMAT, time.localtime(now))
        logger.info("startdate: %s, enddate: %s", startdate, enddate)

        param = {
            "customCode": "xxxx",
            "domain": "xxxxxx",
            "startdate": startdate,
            "enddate": enddate,
            "pageSize": 1000,
            "pageNum": 1,
        }

        response = waf_api.query_log(param)
        if response.status_code != 200:
            logger.error("first query error: %s", response.text)
            return

        data = response.json()["data"]
        total_pages = data["totalPageCount"]
        logger.info(
            "currentPage: %s, pageSize: %s, totalPageCount: %s, totalCount: %s",
            data["currentPage"], data["pageSize"], total_pages,
            data["totalCount"],
        )
        _print_records(data['list'])

        # Page 1 is already printed; fetch the remaining pages in order.
        for page_num in range(2, total_pages + 1):
            param['pageNum'] = page_num
            page_response = waf_api.query_log(param)
            if page_response.status_code != 200:
                # A failed page is reported and skipped; later pages are
                # still fetched.
                logger.error(
                    "page %s error: %s", page_num, page_response.text
                )
                continue
            logger.info("page %s success.", page_num)
            _print_records(page_response.json()["data"]['list'])
    except Exception:
        # Top-level boundary: record the traceback instead of crashing.
        logger.error("error: %s", traceback.format_exc())
    finally:
        logger.info('end')


if __name__ == '__main__':
    main()