栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

用Python实现一个毫秒级Timer,间隔执行各种任务

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

用Python实现一个毫秒级Timer,间隔执行各种任务

问题

如果想间隔一定时间发送一些数据,或者处理某一个任务,python里面比较简单的实现方式是sleep,但是简单的sleep会随着任务的持续执行,和最开始的时间偏离越来越大。主要原因有两个

  1. python中sleep执行的时间并不精确,难以做到毫秒级的精度
  2. 任务本身执行的时候会和sleep执行的时间累加

为了解决上面两个问题,设计了下面的代码

实现
from time import sleep, time


class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.is_running = False
        self.args = args
        self.kwargs = kwargs
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        start = int(time() * 1000)
        i = 0
        if not self.is_running:
            self.is_running = True

            while True:
                if not self.is_running:
                    break

                i += 1
                now = int(time() * 1000)
                sleep_ms = i * self.interval - (now - start)
                if sleep_ms < 0:
                    self.function(*self.args, **self.kwargs)
                else:
                    sleep(sleep_ms / 1000.0)
                    self.function(*self.args, **self.kwargs)

    def stop(self):
        self.is_running = False
使用
def hello(name):
    print(f"[{time()}]t Hello {name}!")
    sleep(0.01)


print("starting...")

rt = RepeatedTimer(100, hello, "world")
try:
    sleep(50)
finally:
    rt.stop()
参考

https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350034.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号