飞道的博客

python接口测试完善

578人阅读  评论(0)

搭建接口测试框架,实现目的
1、监控正式环境接口
2、每次版本发布,跑一遍接口测试
3、钉钉群通知测试结果,发送测试报告到阿里邮箱
4、测试报告使用BeautifulReport,做了二次开发
5、用的unittest框架
6、两套测试用例,一套测试环境的,测试业务逻辑的,一套正式环境的,测试接口状态的,因为业务逻辑测试需要操作数据库,做增删改查,所以测试环境测逻辑,线上环境监控接口状态
1、项目结构

Common文件夹:公用的一些东西,连接数据库、获取token、公共变量和环境域名、请求方法封装
DindDing:发送钉钉的
Email:发送邮件的
report:测试报告存放路径
Test_case:测试用例

2、这个是钉钉通知
每执行一个用例,就打印一下这个用例

这是测试报告


这个邮寄的dat文件不知道是个啥东西,每次都会存在

最终的结果就是:项目发布,跑一遍接口测试,钉钉通知测试结果是否有失败的,邮件查收HTML测试报告分析问题

3、贴一下我写的代码
设置域名和手机号码

"""
Host_Test:测试环境域名
Host_Online:正式环境域名
Mblie:测试环境获取token的手机号,随机获取11位手机号
"""
import random

Host_Test='https://xxxxxxxx'
Host_Online='https://xxxxxxx'
Mblie='1'+'{}{}'.format(random.choice([3,5,7,8]),random.randint(100000000,999999999))

数据库连接

import pymysql
def mysql(sql):
    try:
        conn={
            "host" : '10.10.10.1',
            "user" : 'root',
            "port": 3306,
            "password" : 'xxxxx',
            "database" : 'xxxx',
            "charset" : 'utf8mb4'
        }
        db=pymysql.connect(**conn)
    except Exception as e:
        return '数据库连接失败:'+'\n'+'{}'.format(e)
    cur = db.cursor()
    cur.execute(sql)
    db.commit()
    cur.close()
    db.close()
    return cur.fetchall()

获取token

from zhiya.Common import Request_Method
from zhiya.Common import Common
from zhiya.Common import Database_Connect
import random
#登录获取用户token并调用昵称接口
def get_token():
    login_api='xxx/vtr/verify'
    complete_api='xxx/v/complete'
    body={
        "mobile":Common.Mblie,
        "code":"000000"
    }
    # print(Common.Mblie)
    r=Request_Method.req_method('post', login_api, data=body)
    if r.status_code==200 and r.json()['code']==0:
        if r.json()['data']['info']['authorization']:
            # print(body['mobile'])
            sql_nick='SELECT nickname from mb_user WHERE mobile={}'.format(Common.Mblie)
            if not Database_Connect.mysql(sql_nick)[0][0]:
                authorization=r.json()['data']['info']['authorization']
                header={
                    "authorization": authorization
                }
                body_nick={
                    "nickname": "测试{}".format(random.randint(10000,99999))
                }
                Request_Method.req_method('post',complete_api,header=header,data=body_nick)
            return ''.join(r.json()['data']['info']['authorization'])
        return False
    return False

请求方法封装

在这里插入代码片
from zhiya.Common import Common
import urllib3
import requests
import json
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)#去掉警告信息
#固定请求头
def req_header():
    header = {
        "platform": "android",
        "appversion": "1.8.4",
        "systemversion": "10",
        "imei": "Func",
        "phonemodel": "Vivo NEX",
        "Content-Type": "application/json"
    }
    return header
#四种请求方法
def req_method(method,api,header=None,data=None):
    """
    :param method: 传入请求方法:get、post、put、delete,必传
    :param api: 传入请求的接口地址,不包含域名,必传
    :param header: 传入请求头,和固定头拼接,可传可不传
    :param data: 请求参数,可传可不传
    :return:
    """
    url = Common.Host_Test + api
    if header is None:
        headers = req_header()
    else:
        headers = dict(req_header(), **header)
    if method=='post':
        return requests.post(url, headers=headers, data=json.dumps(data))
    elif method=='get':
        if data is None:
            return requests.get(url, headers=headers)
        return requests.get(url, headers=headers, data=json.dumps(data))
    elif method=='put':
        if data is None:
            return requests.put(url, headers=headers)
        return requests.put(url, headers=headers, data=json.dumps(data))
    elif method=='delete':
        if data is None:
            return requests.delete(url, headers=headers)
        return requests.delete(url, headers=headers, data=json.dumps(data))
    else:
        raise '传递的请求方法有误:{}'.format(method)

发送钉钉模板

在这里插入代码片
from BeautifulReport.BeautifulReport import result
from zhiya.Common import Common
import requests
import json

class Ding_talk:
    def __init__(self):
        self.url="https://oapi.dingtalk.com/robot/send?access_token=fa62462f4979634c184601926646183cf71fa71bc710843ff4286fc6"
        self.header={
            "Content-Type": "application/json"
        }
    def send_ding(self):
        if result[2]>0:
            data_fail = {
                "msgtype": "text",
                "text": {
                    "content": "{}个接口用例执行完成,{}个接口有异常,详情请查阅邮箱附件!测试手机号:{}".format(result[0], result[2], Common.Mblie)

                },
                "at": {
                    "isAtAll": True
                }
            }
            requests.post(self.url,headers=self.header,data=json.dumps(data_fail))
        else:
            data_pass = {
                "msgtype": "text",
                "text": {
                    "content": "{}个接口用例执行完成,测试结果正常!测试手机号:{}".format(result[0], Common.Mblie)

                },
                "at": {
                    "isAtAll": True
                }
            }
            requests.post(self.url,headers=self.header,data=json.dumps(data_pass))
        print('已发送钉钉通知')

发送邮件

在这里插入代码片
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import time
# from email.mime.multipart import MIMEMultipart
#读取测试结果文件
path='D:\\App_AutoTest\zhiya\\report\\Test_api_report.html'
def read_file():
    f=open(path,'rb')
    fp=f.read()
    f.close()
    return fp
#发送邮件
def send_mail():
    smtpserver = 'smtp.mxhichina.com'  # 邮件服务器
    sender = 'fufff@xiaoaaah.com'  # 发件人邮箱
    pwd = 'xxxxxx'
    receiver = ['futti@xiauuuuech.com']  # 接收人邮箱
    subject = u'我的app接口测试报告'  # 邮件标题
    # 非附件邮件构造
    # msg = MIMEText(read_file(), 'plain', 'utf-8')
    # msg['From'] = sender
    # msg['To'] = ','.join(receiver)
    # msg['Subject'] = subject

    # 附件邮件构造
    msg=MIMEMultipart()
    msg.attach(MIMEText('邮件正文',_subtype='txt',_charset='utf-8'))
    msg['From']=sender
    msg['To']=','.join(receiver)
    msg['Subject']=subject
    att=MIMEText(read_file(),'base64', 'utf-8')
    att["Content-Type"] = 'application/octet-stream'
    att.add_header("Content-Disposition", "attachment", filename=path)
    msg.attach(att)

    # 发送邮件
    # smtp = smtplib.SMTP()
    # smtp.connect(smtpserver)  # 邮箱服务器
    smtp=smtplib.SMTP(smtpserver)
    smtp.login(sender, pwd)  # 登录邮箱
    smtp.sendmail(sender, receiver, msg.as_string())  # 发送者和接收者
    smtp.quit()
    print("测试报告邮件已发出!注意查收。"+str(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))))

执行测试用例

在这里插入代码片
from zhiya.DingDing import DingDing
from zhiya.Email import Send_Mail
from BeautifulReport import BeautifulReport
from zhiya.Common import Get_Token
from zhiya.Common import Database_Connect
from zhiya.Test_case import (Test_Address_Manage, Test_Ly_Letter, Test_Teacher_List, Test_Book_Shelf_List,
                             Test_User_Integral,Test_New_Media)
import unittest
import time
import sys
if __name__ == '__main__':
    sys.stderr.write('准备执行接口测试...    {}    测试手机号:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())), Get_Token.Common.Mblie) + '\n')
    print('\n')
    cases=[
        Test_Ly_Letter.get_liu_letter_case(),
        Test_Teacher_List.get_teacher_list_case(),
        Test_Address_Manage.get_addr_case(),
        Test_Book_Shelf_List.get_book_shelf(),
        Test_User_Integral.get_integrals_case(),
        Test_New_Media.get_new_media_case()
    ]
    suit = unittest.TestSuite()
    if Get_Token.get_token() is not False:
        if isinstance(Database_Connect.sql_conn(), Database_Connect.pymysql.connections.Connection):
            for case in cases:
                suit.addTests(case)
            run = BeautifulReport(suit)
            run.report(filename='Test_api_report', description='APP接口测试报告',
                       report_dir='D:\\App_AutoTest\\zhiya\\report')
            Send_Mail.send_mail()  # 发送测试报告邮件
            print('\n')
            DingDing.Ding_talk().send_ding()  # 发送钉钉机器人通知
        else:
            print(u'测试未执行\n\n', Database_Connect.sql_conn())
    else:
        print(u'登录接口获取token失败,自动化测试未运行')


这是我的测试用例


unittest按指定顺序执行用例,在每个测试文件把测试用例按顺序先组装一下,组装成unittest需要的格式,再返回一个数组,然后再Run下面调用每个返回的list_case

在这里插入代码片
def get_new_media_case():
    list_case =[
        Test_New_Media('test_media_collection'),
        Test_New_Media('test_media_collection'),
        Test_New_Media('test_media_like'),
        Test_New_Media('test_media_like'),
        Test_New_Media('test_media_comment'),
        Test_New_Media('test_media_recommends'),
        Test_New_Media('test_media_comment_reply'),
        Test_New_Media('test_activity'),
        Test_New_Media('test_card_get_integral'),
        Test_New_Media('test_tomorrow_like_integral'),
        Test_New_Media('test_tomorrow_comment_integral'),
        Test_New_Media('test_tomorrow_comment_reply'),
        Test_New_Media('test_tomorrow_collection_integral'),
        Test_New_Media('test_tomorrow_activity_integral'),
        Test_New_Media('test_concept'),
        Test_New_Media('test_concepts_list'),
        Test_New_Media('test_likes_list'),
        Test_New_Media('test_collections_list')
    ]
    return list_case


贴一点我的接口用例

from zhiya.Common import Get_Token
from zhiya.Common import Request_Method
from zhiya.Common import Database_Connect
from zhiya.Common import Common
from zhiya.Common import Set_Member
import unittest
import random
import requests
import time
import datetime
class Test_New_Media(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.header = {
            "authorization": Get_Token.get_token()
        }
        cls.media_collection='/xxx/user_media_collectytioon'#媒体收藏或取消收藏
        cls.media_comment='/xxx/user_media_comgjymenot'#媒体评论
        cls.media_comment_reply='/xxx/user_media_comurgmentg'#媒体评论回复
        cls.media_like='/xxx/user_medyuia_lioke'#点赞或取消点赞
        cls.media_recommends='/xxx/media_relhgucommends'#媒体点赞收藏评论列表
        cls.activity='/xxx/activity'#用户投票
        cls.user_integral='/xxx/user_integrtral'#用户积分
        cls.integral_concept='/xxx/user_inteuigral_conocoept'#兑换概念图
        cls.concepts_list='/xxx/user_conceputs'#概念图列表
        cls.likes_list='/vxxx/user_likes'#点赞列表
        cls.collections_list='/xxx/user_collectiyuions'#收藏列表
        cls.book='/xxx/boortsaook'#书籍详情
        cls.login='https://xxxxxxxxxx/vehhjgrifpy'#登录接口
        ############################################################
        ############################################################
        cls.type=29#通识百词
        cls.course_id=35#代表通识百词下的课程:接口测试勿动
        cls.media_id=random.choice([51,52,53])#课程下的三个视频,随机取一个
        cls.book_id = 'GljdgA2XBv6o'#用于概念图接口和兑换概念图接口
        cls.product_id = 'GljdgA2XBv6o'#用于概念图接口和兑换概念图接口
        #根据手机号查到的user_id
        cls.user_id=Database_Connect.mysql('SELECT user_id FROM mb_user WHERE mobile={}'.format(Common.Mblie))[0][0]
        sc=Request_Method.req_method('get',cls.user_integral,header=cls.header)
        cls.score=sc.json()['data']['integral_num']#初始化获取用户积分
        today = datetime.date.today()  # 今天日期
        tomorrow = today + datetime.timedelta(days=1)  # 明天日期,2020-07-18格式
        tomorrow_date = str(tomorrow).replace('-', '')  # 明天日期,20200718格式,用于校验库里数据
        cls.tomorrow_start_time = int(time.mktime(time.strptime(str(tomorrow), '%Y-%m-%d')))  # 明天开始的时间戳,用于明天获取点赞积分
        #查询库里第二天剩余获取积分次数
        cls.sql_times='SELECT times FROM mb_media_integral_record WHERE user_id={} AND date={}'.format(cls.user_id,tomorrow_date)
        cls.me=Set_Member.Set_Member()
     def test_tomorrow_like_integral(self):
        """媒体重构:第二天非会员点赞获取积分"""
        #1、设置用户为非会员
        #2、接口传入第二天的时间戳
        #3、校验库里剩余次数、校验接口返回数据,校验我的积分接口
        self.me.set_no_member()#设置为非会员
        body = {
            "course_id": self.course_id,
            "type": self.type,
            "media_id": 54,
            "is_like": 1,
            "time":self.tomorrow_start_time
        }
        res_bef = Request_Method.req_method('get', self.user_integral, header=self.header)
        score=res_bef.json()['data']['integral_num']#第二天点赞前的用户积分
        times=Database_Connect.mysql(self.sql_times)#点赞前库里的次数记录应该没有
        r = Request_Method.req_method('post', self.media_like, header=self.header, data=body)#点赞接口
        self.assertEqual(200,r.status_code)#验证点赞积分接口状态
        self.assertEqual(r.json()['data']['status'],1,'第二天点赞获接口获取积分失败')#验证第二天点赞加快返回数据正确
        if len(times)>0:
            self.assertEqual(times[0][0] - 1, Database_Connect.mysql(self.sql_times)[0][0])
        else:
            self.assertTrue(len(Database_Connect.mysql(self.sql_times))>0, '第二天点赞接口返回正确,库里次数记录表里没插入数据')
            self.assertEqual(Database_Connect.mysql(self.sql_times)[0][0], 19)
        res_aft=Request_Method.req_method('get', self.user_integral, header=self.header)
        # 第二天点赞后的用户积分
        self.assertEqual(score+2,res_aft.json()['data']['integral_num'],'第二天点赞后的积分不正确,我的积分接口数据有误')
     def test_teacher_article_details(self):
        """名师社区:文萃精编详情"""
        #1、通过接口获取所有名师的id
        #2、根据老师id判断哪个有文萃并获取第一个文萃详情
        #3、校验接口返回的文萃详情,校验文萃id和title
        r_lst = Request_Method.req_method('get', self.teacher_list, header=self.header)
        teacher_id = []#名师id集合
        for i in r_lst.json()['data']:
            teacher_id.append(i['teacher_id'])
        art_id=None#文萃精编id
        art_title=None#文萃精编title
        for i in teacher_id:#遍历老师id,判断哪个文萃精编,并取出第一个文萃id和title
            rd = random.randint(0, len(teacher_id) - 1)  # 随机数
            body = {
                "teacher_id": "{}".format(teacher_id[rd]),#随机老师id
                "page": 1
            }
            #根据名师id请求文萃精编列表并取出第一个文萃id和title
            r_art = Request_Method.req_method('get', self.teacher_article_list, header=self.header, data=body)
            if r_art.json()['data']:
                art_id = r_art.json()['data'][0]['list'][0]['article_id']  # 列表获取的文萃精编id
                art_title = r_art.json()['data'][0]['list'][0]['article_title']  # 列表获取的文萃精编title
                break
        #文萃精编详情接口获取id和title
        r=Request_Method.req_method('get', self.teacher_article_details + '/{}'.format(art_id), header=self.header, data=body)
        self.assertEqual(r.status_code,200)
        self.assertEqual(art_id,r.json()['data']['article_id'])#
        self.assertEqual(art_title,r.json()['data']['article_title'])

就写到这里啦!主要是活干完了,看了下离下班还有一个小时,就马马虎虎写点,想到哪写到哪,没有什么顺序,好啦,下班,话说有没有野王带飞啊!!!!!!


转载:https://blog.csdn.net/qq_25126659/article/details/107820702
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场