栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++实现毫秒级精度定时器

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++实现毫秒级精度定时器

定时器的实现一般需要借助系统提供的超时相关接口,比如select、 条件变量、或者sleep, usleep等,sleep,usleep提供的睡眠功能太有限,无法中途唤醒,这就导致他们其实不适合做定时器中的定时方法。而select和条件变量都可以设置定时时长,而且在中途可以唤醒,精度也很高,因此,可以采用select或条件变量作为定时方法,本文讨论使用条件变量如何实现定时任务。

条件变量可以设置超时时间,如果条件变量在没有接收到来自其他线程的条件通知时,将一直阻塞,直到超时。通过这个特性可以按照定时器需要等待的时间让任务线程挂起,直到超时,说明有定时任务可以处理。这里需要有一个专门的线程用来等待超时事件,在这个线程里面每次循环开始,检查超时任务需要等待的时间,将线程通过条件变量的wait_for方法阻塞,当超时时,将超时时间到来的事件进行处理,或者中途如果有新的事件也可以通过notify_one方法唤醒正在阻塞的条件变量,提前结束阻塞,此时可能并没有超时事件需要执行,但是可以将新的定时任务添加到超时等待队列中。

在超时等待任务线程中会处理两种事件,一个是超时任务,一个是添加新的定时任务。

while(1){
    // 计算下一次等待时间
    // wait_for(ms) 开始等待
    // 1. 处理超时任务
    // 2. 添加超时任务
};

采用时间轮方式管理定时任务。时间轮类似于一个时钟,时钟的指针每个刻度表示一个时间间隔。本次设计的定时器精度为1ms,那么就确定时间轮每个刻度为1ms。定时任务可以比喻为日程,分布在未来的某个时间点上。时间轮指针每毫秒转动一个刻度,类似于时钟的指针转动。比如在某个时间T1添加了一个5ms的定时任务task1,那么将此任务放置到从T1刻度开始第5个刻度的位置,当时间轮指针转动到第5个刻度位置时,说明定时任务task1超时时间到,此时就可以执行task1,这就是时间轮定时器的原理。从数据结构上看,时间轮类似于一个环形的哈希表,每个哈希桶里面是一个链表,链表里面的定时任务按照超时时间从小到大排列,当时间轮指针转动到某个时间点时,从链表头开始检查时间是否超时,如果超时则执行任务,直到遇到未超时任务,结束检查。

上图表示时间轮的数据结构和不同超时时间的任务被存放的位置,通过这种数据结构,可以使得超时任务的查找效率达到O(1)。当需要插入一个定时任务到时间轮数据结构时,将当前时间+超时时间得到未来时间,对未来时间进行哈希,将任务定位到某个哈希桶里面,再按照从小到大的顺序插入到链表中,插入的时间复杂度依赖于哈希冲突情况,当某一个哈希桶中存在很多任务时,插入效率将会是O(n),这里可以进一步通过红黑树进行优化,提高插入效率。

时间轮定时器可以每个时间刻度检查一次,这种方式,定时器需要经常被唤醒,如果时间刻度为1ms,那么定时器每1ms都会检查一次超时事件,这种方式使得定时器的检查频率很高,存在某些时刻可能并没有超时事件需要处理,但仍然被唤醒的情况。可以采用每次计算下一次超时事件的到来时间,时间轮不需要每ms都去检查,而是根据下一次定时任务的超时时间,这样能够减少无用的唤醒和时间检查,减少无畏的cpu消耗。

时间的获取采用c++11提供的std::chrono::stead_clock::now,stead_clock获取到的是一个单调递增的时间,不会受系统时间的影响。

定时任务是用户传入的,定时任务的运行在定时器线程中,如果用户任务存在耗时操作,那么将影响定时器的精度,因此,定时器中通过线程池的方式将用户的定时任务放到线程池中执行,保证了定时器的精度。

通过测试,该实现可以达到误差1ms

hello world dt:14, timeout:15, count:40001
hello world dt:50, timeout:50, count:40002
hello world dt:37, timeout:37, count:40003
hello world dt:62, timeout:63, count:40004
hello world dt:14, timeout:15, count:40005
hello world dt:2, timeout:2, count:40006
hello world dt:41, timeout:42, count:40007
hello world dt:87, timeout:88, count:40008
hello world dt:50, timeout:51, count:40009
hello world dt:33, timeout:34, count:40010
hello world dt:54, timeout:55, count:40011
hello world dt:115, timeout:116, count:40012
hello world dt:1, timeout:2, count:40013
hello world dt:81, timeout:82, count:40014
hello world dt:110, timeout:111, count:40015

代码如下:

#ifndef __TIMER_H__
#define __TIMER_H__
#include 
#include 
#include 
#include 
#include 
#include 
#include "ThreadPool.h"

struct TimedTask;
namespace timer{
class Timer
{
public:
    typedef std::function Task;
    static const int WHELL_LEN = 1024;
    static const int INDEX_MASK = WHELL_LEN - 1;
public:
    Timer();
    ~Timer();
public:
    void AddTask(const Task& task, int msec);

private:
    void ThreadRoutine();
    int GetNextWaitMs();
    void Tick(long long tickCount);
    void AddNewTask(const std::vector& taskList);
 
private:
    bool _stop;
    long long _tickCount;
    std::mutex _mutex;
    std::thread _thread;
    std::condition_variable _cond;
    threadpool::ThreadPool _threadPool;
    std::vector _inputTaskList;
    std::list _timedTask[WHELL_LEN];
    std::chrono::time_point _startTime;
};

}   // namespace timer

#endif // __TIMER_H__
#include 
#include "Timer.h"

using namespace timer;

struct TimedTask
{
    TimedTask(const Timer::Task& task, int timeout);
    Timer::Task task;
    int timeout;
    long long deadline;
};

TimedTask::TimedTask(const Timer::Task& task, int timeout)
    : task(task)
    , timeout(timeout)
    , deadline(0)
{

}

Timer::Timer()
    : _stop(false) 
    , _tickCount(0)
{
    _startTime = std::chrono::steady_clock::now();
    _thread = std::thread(&Timer::ThreadRoutine, this);
}

Timer::~Timer()
{
    {
        std::lock_guard lock(_mutex);
        _stop = true; 
    }
    _cond.notify_one();
    _thread.join();
}

void Timer::AddTask(const Task& task, int msec)
{
    if(msec <= 0){
        _threadPool.AddTask(task);
        return;
    }else{
        std::lock_guard lock(_mutex);
        _inputTaskList.push_back(new TimedTask(task, msec));
    }
    _cond.notify_one();
}

void Timer::Tick(long long tickCount)
{
    const int index = tickCount & INDEX_MASK;
    auto& taskList = _timedTask[index];
    for(auto itr = taskList.begin(); itr != taskList.end();){
        auto task = *itr;
        if(tickCount >= task->deadline){
            _threadPool.AddTask(task->task);
            itr = taskList.erase(itr);
            delete task;
            continue;
        }
        break;
    }
}

void Timer::ThreadRoutine()
{
    while(!_stop){
        long long lastMillSec = _tickCount;
        int waitMillSec = GetNextWaitMs();
        assert(waitMillSec);
        std::vector taskList;
        std::cv_status status = std::cv_status::no_timeout;
        {
            std::unique_lock lock(_mutex);
            while(_inputTaskList.size() == 0 && status == std::cv_status::no_timeout && !_stop){
                if(waitMillSec > 0){
                    status = _cond.wait_for(lock, std::chrono::milliseconds(waitMillSec));
                }else{
                    _cond.wait(lock);
                }
            }
            if(_stop){
                return;
            }
            if(_inputTaskList.size() > 0){
                taskList.swap(_inputTaskList);
            }
        }
        _tickCount = std::chrono::duration_cast(std::chrono::steady_clock::now() - _startTime).count(); 
        long long dt = _tickCount - lastMillSec;
        if(dt >= waitMillSec && waitMillSec != -1){
            for(int i = 0; i <= dt - waitMillSec; ++i){
                Tick(lastMillSec + waitMillSec + i);
            }
        }

        if(taskList.size() > 0){
            AddNewTask(taskList);
        }
    }
}

int Timer::GetNextWaitMs()
{
    int minMs = -1;
    bool findNext = false;
    const int startIndex = _tickCount & INDEX_MASK;
    for(int i = 1; i <= WHELL_LEN; ++i){
        const int index = (startIndex + i) & INDEX_MASK;
        auto& tasks = _timedTask[index];
        if(tasks.size() > 0){
            long long dt = tasks.front()->deadline - _tickCount;
            if(findNext){
                if(dt < minMs){
                    minMs = dt;
                }
            }else{
                assert(dt > 0);
                minMs = dt;
                findNext = true;
            }
            if(minMs < WHELL_LEN){
                break;
            }
        }
    }
    assert(minMs != 0);

    return minMs;
}

void Timer::AddNewTask(const std::vector& taskList)
{
    for(auto& task : taskList){
        assert(task->timeout > 0);
        task->deadline = _tickCount + task->timeout;
        const int index = task->deadline & INDEX_MASK;
        auto& bucket = _timedTask[index];
        if(bucket.empty() || task->deadline > bucket.back()->deadline){
            bucket.push_back(task);
        }else{
            for(auto itr = bucket.begin(); itr != bucket.end(); ++itr){
                auto& t = *itr;
                if(task->deadline <= t->deadline){
                    bucket.insert(itr, task);
                    break;
                }
            }
        }
    }
}

ThreadPool的实现:链接

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/857333.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号