jemter的线程组可以设置调度器的持续时间,这样如压测1分钟,半小时,一个小时就非常方便
但我们想要设定locust持续运行时间,web-UI 页面是不支持的。解决办法有2个,主要讲第二个:
1、通过命令行的方式来启动,我们可以配置locust启动配置参数,参考 https://blog.csdn.net/weixin_45805339/article/details/121167157 https://blog.csdn.net/arron_12/article/details/130086390
核心配置如下,然后通过命令行 后台运行,时间到了停止。
locustfile = LocustScripts/taskCenter/api_xxx_query.py #功能脚本文件路径
logfile = Logs/infoLog.log # 日志文件路径
loglevel = debug # 日志等级
web-port = 8089 #web页面端口,设置默认时间可忽略该端口
host = https://xxxtest.com #待测服务器地址
headless = true # # 禁用web界面,并立即开始测试。使用-u和-t来控制用户数量和运行时间
user = 1 # 并发用户的峰值数
spawn-rate = 1 # 生成用户的速率(用户每秒)。主要与-headless或-autostart一起使用
run-time = 10s # locust运行时间 (300s、20m、3h、1h30m)2、那么我们想在web-UI上点,又不想整配置,还想设置持续时间,怎么办?(我喜欢)
整个思路:首先需要一个计时器,简单点,读秒的那种
# 秒表计时,到时间返回t1
def time_meter(sec):
    t1=0
    while True:
        startTime=time.time()
        print(startTime)
        print('开始')
        while True:
            t1=round(time.time()-startTime,0)
            print('计时:' + str(t1) + '秒')
            time.sleep(1)
            if t1==sec:
                print('我要整点啥')
                break
        break
    return t1
print((time_meter(20)))几行拙劣的代码搞定。
运行结果:
# 秒表计时,到时间返回t1
def time_meter(sec):
    t1=0
    while True:
        startTime=time.time()
        print(startTime)
        print('开始')
        while True:
            t1=round(time.time()-startTime,0)
            print('计时:' + str(t1) + '秒')
            time.sleep(1)
            if t1==sec:
                res = requests.get('http://localhost:8089/stop')
                break
        break
    return t1
print((time_meter(5)))然后整点啥呢?到时间停止运行locust,查看api文档,get http://localhost:8089/stop 就能停止
最后为了简单的操作,当然是让我们的老朋友tkinter来包装下
class locust_Program(Frame):
    def __init__(self,master=None):
        super().__init__(master)
        self.master=master
        self.pack()
        self.createWidget()
    def createWidget(self):
        #标签:locust压测时间
        self.lable_title=Label(self,text='locust压测时间',bg="light gray",fg='black',anchor='w',width=40,font=('宋体',10,'bold'))
        self.lable_title.grid(row=0,column=0,sticky="w",columnspan=4,padx=5,pady=5)
        self.lable_stamp = Label(self, text='设置压测时间/秒')
        self.lable_stamp.grid(row=3, column=0)
        self.lable_res = Label(self, text='运行结果')
        self.lable_res.grid(row=3, column=3)
        self.inputValue=StringVar()
        self.entry_res=Entry(self,textvariable=self.inputValue,width=10)
        self.entry_res.grid(row=4, column=0,padx=5,pady=5)
        self.btn01 = Button(self, text='开始', bg='green',fg='white',width=10,command=self.beginTime)
        self.btn01.grid(row=4, column=1)
        self.result_time=Text(self,width=15,height=2)
        # self.result_time.insert(2.3, f'请稍等{self.inputValue.get()}秒')
        self.result_time.grid(row=4, column=3,padx=5,pady=5)
    def beginTime(self):
        result=0
        try:
            sec=self.inputValue.get()
            result = self.time_meter(sec)
        except:
            messagebox.showerror(title='错误',message='请输入数字!')
        self.result_time.delete(1.0, END)
        self.result_time.insert(INSERT,'压测完成,已持续压测'+str(result)+'秒')
    # 计时器
    def time_meter(self,second):
        t1=0
        while True:
            startTime=time.time()
            print(startTime)
            print('开始')
            second=float(second)
            while True:
                t1=round(time.time()-startTime,0)
                print('计时:' + str(t1) + '秒')
                time.sleep(1)
                try:
                    if t1==second:
                        res = requests.get('http://localhost:8089/stop')
                        break
                except Exception as error:
                    messagebox.showerror(title='错误', message=error)
                    break
            break
        return t1
root=Tk()
root.title('locust压测小程序')
root.geometry('350x150+500+150')
app = locust_Program(master=root)
root.mainloop()运行小程序

开始使用,设置压测时间(手点的慢,可以多设置1秒,老夫十年功能手速点点点,无视那几毫秒),点击开始。设置locust并发数,启动用户数,Start swarming



20秒后停止压测,完成。











