栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

通用C++线程池

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

通用C++线程池

一、背景意义       

        项目中可能需要处理一些比较繁琐的事,而这些事正好可以使用线程来解决,如tv上经常会去dump  raw data,若直接在解码线程中dump raw data会导致每一帧的处理时间变长,如果大幅超出33ms,就会出现丢帧情况,而对于h264格式数据来说,丢帧会导致马赛克产生。下面列出线程池的一种实现(部分功能可以再完善下,比如添加job的优先级,然后根据该优先级顺序来处理job......),此文件导入到项目中就可以使用。

二、具体实现

ThreadCore.h


#ifndef ANDROID_THREADCORE_H
#define ANDROID_THREADCORE_H

#include 
#include 
#include 
#include 
#include "utils/Thread.h"
#include "utils/Mutex.h"
#include "utils/Timers.h"
#include "base.h"

namespace android {

#define THRAED_MAX 2
#define JOB_ENTRY_MAX_SIZE 8

// Opaque handle to a job
typedef uint64_t JobHandle;

// Function pointer type for a job function
typedef void (*JobFunc)(void* pArg);

// Function pointer type for a job callback function
typedef void (*JobCb)(void* pArg);

// Maximum number of job list
static size_t MaxJobCount = 20;

// Relative priority of a job submitted
enum struct JobPriority
{
    Critical = 0,   // Critical priority job
    High     = 1,   // High priority
    Normal   = 2,   // Normal priority
    Invalid  = 3    // Invalid priority
};

// Describes the functions to be executed and their priorities
struct JobEntry {
    JobFunc pFuncAddr;
    JobPriority priority;
    JobHandle handle;
    JobCb pCb;
    bool used = false;
};

// Describe the job and their data
struct JobData {
    JobHandle handle;
    void* pData;
};

// Describe the job func to be performed immediately and their data
struct RuntimeJob {
    JobFunc pFuncAddr;
    JobCb pCb;
    void* pData;
};

// JobManager provider
class JobManager : public virtual Refbase {
  public:
    JobManager(){};
    ~JobManager(){};
    bool registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr, JobPriority priority, JobHandle* handle);
    void releaseRuntimeJob(RuntimeJob* job);
    JobEntry* getJobByHandle(JobHandle jobHandle);
    JobEntry* getAllJob(){return mJobSlot;};
    JobHandle packJobHandle(uint32_t idx);
    uint32_t getSlotByHandle(JobHandle handle);
    bool isJobRegistered(JobHandle handle);
    size_t getJobCount();
    bool checkJobsonHold();
    bool enqueueJob(JobData jobData);
    RuntimeJob* dequeueJob();
    void flushJob();

  private:
    mutable std::mutex mJobLock;
    std::list mJobHandleList;
    JobEntry mJobSlot[JOB_ENTRY_MAX_SIZE];
};

// ThreadManager provider
class ThreadManager : public virtual Refbase {
  public:
    ThreadManager();
    ~ThreadManager();

    void initialize(std::string name);
    bool registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr, JobPriority priority, JobHandle* handle);
    bool isRunning(){ return mRunning;};
    bool postJob(JobData jobData);
    size_t getJobCount();
    status_t startThreads();
    void stopThreads();
    void flush();
    void doWork(pid_t tid, std::string threadName);


    class ThreadFunc : public Thread {
      public:
        ThreadFunc(wp parent, char name[]):mParent(parent),mName(name){};
        ~ThreadFunc();
        virtual bool threadLoop() override;
        std::string getThreadName(){return mName;};
        void stop();
      private:
		std::string mName;
        const wp mParent;
    };

  private:
    sp mThreads[THRAED_MAX];
    mutable std::mutex mWorkMutex;
    std::condition_variable mWorkCond;
    JobManager mJobManager;
    std::string mName;
    bool mPending = false;
    bool mRunning = true;
};

}

#endif

ThreadCore.cpp


#include "ThreadCore.h"

#define LOG_NDEBUG 0
//#define LOG_NNDEBUG 0

#define LOG_TAG "ThreadCore"
//#define ALOGV(...) ALOGI(__VA_ARGS__)

namespace android {

bool JobManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,
    JobPriority priority, JobHandle* handle) {
    uint32_t idx;
    for (idx = 0;idx < JOB_ENTRY_MAX_SIZE;idx++) {
        if (!mJobSlot[idx].used) {
            *handle = packJobHandle(idx);
            mJobSlot[idx].pFuncAddr = jobFuncAddr;
            mJobSlot[idx].priority = priority;
            mJobSlot[idx].handle = *handle;
            mJobSlot[idx].pCb= jobCbAddr;
            mJobSlot[idx].used = true;
            break;
        }
    }

    if (idx >= JOB_ENTRY_MAX_SIZE) {
        ALOGE("%s: register job failed due to the number of job exceeds %d",
            __FUNCTION__, JOB_ENTRY_MAX_SIZE);
        return false;
    }

    return true;
}

JobHandle JobManager::packJobHandle(uint32_t idx) {
    JobHandle handle = 1 << 32;
    handle |= idx + 1;
    return handle;
}

JobEntry* JobManager::getJobByHandle(JobHandle handle) {
    uint32_t idx = (handle & 0xF) - 1;
    if (idx <= JOB_ENTRY_MAX_SIZE) {
        return &mJobSlot[idx];
    }
    else {
        return NULL;
    }
}

// This method needs to be implemented
bool JobManager::isJobRegistered(JobHandle handle) {
    return false;
}

bool JobManager::checkJobsonHold() {
    std::lock_guard lock(mJobLock);
    return !mJobHandleList.empty();
}

bool JobManager::enqueueJob(JobData jobData) {
    std::lock_guard lock(mJobLock);
    if (mJobHandleList.size() < MaxJobCount) {
        mJobHandleList.push_back(jobData);
        return true;
    }
    return false;
}

RuntimeJob* JobManager::dequeueJob() {;
    std::unique_lock lock(mJobLock);
    JobData jobData;
    if (!mJobHandleList.empty()) {
        jobData = mJobHandleList.front();
        mJobHandleList.pop_front();

        JobEntry* jobEntry = getJobByHandle(jobData.handle);
        if (NULL != jobEntry) {
            RuntimeJob* currentJob = (RuntimeJob*)malloc(sizeof(RuntimeJob));
            currentJob->pFuncAddr = jobEntry->pFuncAddr;
            currentJob->pCb = jobEntry->pCb;
            currentJob->pData = jobData.pData;
            return currentJob;
        }
    }
    lock.unlock();
    return NULL;
}

// This function may have asynchronous security problems
// For the sake of program efficiency, it can be ignored
size_t JobManager::getJobCount() {
    return mJobHandleList.size();
}

void JobManager::flushJob() {
    ALOGV("+[%s]", __FUNCTION__);
    std::lock_guard lock(mJobLock);
    for (auto iter = mJobHandleList.begin();iter != mJobHandleList.end();) {
        JobEntry* jobEntry = getJobByHandle(iter->handle);
        if (NULL != jobEntry->pCb) {
            jobEntry->pCb(iter->pData);
        }
        free(iter->pData);
        iter = mJobHandleList.erase(iter);
    }
    ALOGV("-[%s]", __FUNCTION__);
}

void JobManager::releaseRuntimeJob(RuntimeJob* job) {
    if (NULL != job) {
        if (NULL != job->pData) {
            free(job->pData);
			job->pData = NULL;
        }
        free(job);
    }
}

ThreadManager::ThreadManager() {
    ALOGV("+[%s]", __FUNCTION__);
    ALOGV("-[%s]", __FUNCTION__);
}

ThreadManager::~ThreadManager() {
    ALOGV("+[%s]", __FUNCTION__);
    ALOGV("-[%s]", __FUNCTION__);
}

void ThreadManager::initialize(std::string name) {
    mName = name;
    for (uint32_t i = 0;i < THRAED_MAX;i++) {
        char threadName[64];
        snprintf(threadName, sizeof(threadName), "%s_%d", mName.c_str(), i);
        mThreads[i] = new ThreadFunc(this, threadName);
    }
}

bool ThreadManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,
    JobPriority priority, JobHandle* handle) {
    return mJobManager.registerNewJob(jobFuncAddr, jobCbAddr, priority, handle);
}

bool ThreadManager::postJob(JobData jobData) {
    ALOGV("+[%s]", __FUNCTION__);
    if (mJobManager.enqueueJob(jobData)) {
        std::unique_lock lock(mWorkMutex);
        mPending = true;
        lock.unlock();
        mWorkCond.notify_one();
        return true;
    }
    else {
        ALOGE("%s: enqueue job failed!", __FUNCTION__);
    }
    ALOGV("-[%s]", __FUNCTION__);
    return false;
}

size_t ThreadManager::getJobCount() {
   return mJobManager.getJobCount();
}

status_t ThreadManager::startThreads() {
    status_t res = OK;
    for (uint32_t i = 0;i < THRAED_MAX;i++) {
        res = mThreads[i]->run(mName.c_str());
        if (res != OK) {
            ALOGE("%s:run thread %s failed, errno %d",
                __FUNCTION__, mThreads[i]->getThreadName().c_str(), res);
            return res;
        }
        ALOGI("%s:run thread %s successed", __FUNCTION__,
            mThreads[i]->getThreadName().c_str());
    }
    return res;
}

void ThreadManager::stopThreads() {
    ALOGV("+[%s]", __FUNCTION__);
    flush();
    // the flag of parent is not actived 
    mRunning = false;
    // release all blocked threads
    mWorkCond.notify_all();
    for (uint32_t i = 0;i < THRAED_MAX;i++) {
        if (mThreads[i]) {
            mThreads[i]->stop();
        }
    }
    ALOGV("-[%s]", __FUNCTION__);
}

void ThreadManager::flush() {
    mJobManager.flushJob();
}

void ThreadManager::doWork(pid_t tid, std::string threadName) {
    std::unique_lock lock(mWorkMutex);
    while (!mPending && mRunning) {
        mWorkCond.wait(lock);
    }
    lock.unlock();

    if (mRunning) {
        RuntimeJob* currentJob = mJobManager.dequeueJob();
        if (NULL != currentJob) {

            if (NULL != currentJob->pFuncAddr) {
                currentJob->pFuncAddr(currentJob->pData);
            }

            if (NULL != currentJob->pCb) {
                currentJob->pCb(currentJob->pData);
            }
        }
        else {
            ALOGI("%s:thread id %d name %s currentJob is null!", __FUNCTION__, tid, threadName.c_str());
        }

        mJobManager.releaseRuntimeJob(currentJob);

        lock.lock();
        if (!mJobManager.checkJobsonHold()) {
            mPending = false;
        }
        lock.unlock();
    }
}

ThreadManager::ThreadFunc::~ThreadFunc() {
    ALOGV("[%s]", __FUNCTION__);
}

void ThreadManager::ThreadFunc::stop() {
//    status_t res = requestExitAndWait();
//    if (res != OK)
//    {
//        ALOGE("Unable to stop %s thread: %d", mName.c_str(), res);
//    }
    ALOGV("+[%s]", __FUNCTION__);
    requestExit();
    join();
    ALOGV("-[%s]", __FUNCTION__);
}

bool ThreadManager::ThreadFunc::threadLoop() {
    auto parent = mParent.promote();
    // thread dowork only when parent is alived 
    if (parent->isRunning()) {
        parent->doWork(getTid(), mName);
    }
    else {
        return false;
    }
    return true;
}

}// namespace android

调用关系如下图所示:

三、线程池介绍

1)主要实现有下面两个类:

class JobManager:负责管理注册到线程池的job,以及等待执行队列中的job;

class ThreadManager:负责管理线程的初始化,启动以及释放。

2)主要api介绍:

// 初始化线程池,主要工作是new出来THRAED_MAX个Thread;

void ThreadManager::initialize(std::string name)

// 注册一个job到jobManager,需要提供所执行函数的地址,callback函数地址,以及优先等级

// 函数会返回一个JobHandle,用于后续提交任务到JobManager

bool ThreadManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,JobPriority priority, JobHandle* handle):

//提交一个job到JobManager,并通知thread去执行

bool ThreadManager::postJob(JobData jobData)

//启动线程池

status_t ThreadManager::startThreads()

//停止线程池

void ThreadManager::stopThreads()

3)使用举例:

// 在需要使用线程池的模块初始化函数中初始化线程池,注册job以及启动线程池

sp mThreadManager;
mThreadManager = new ThreadManager();
mThreadManager->initialize("CameraDump");
mThreadManager->registerNewJob(printLog, NULL, JobPriority::Critical, &mHandle);
mThreadManager->startThreads();

// 下面是随便写的一个需要线程执行的函数

static void printLog(void* data) {
   (void*)data;
   ALOGD("%s HAODADA NIU BI", __FUNCTION__);
}

// 在模块处理部分postjob,jobData中的handle为注册job时返回handle,pData为线程执行函数时传入的参数

JobData jobData = {
        .handle = mHandle,
        .pData = (void*)dumpData,
};

if (mThreadManager) {
    if (!mThreadManager->postJob(jobData)) {
         ALOGE("%s: post job failed!", __FUNCTION__);
    }
}

// 在模块结束时需要停止线程池,并释放实例

mThreadManager->stopThreads();
mThreadManager.clear();

四、结语

上述过程如果觉得麻烦,还可以再简单点,

// Describes the functions to be executed and their priorities
struct JobEntry {
    JobFunc pFuncAddr;
    JobPriority priority;
    JobCb pCb;
};
struct JobEntrySlot {
    JobEntry pJobEntry;
    bool used = false;
};
std::list mJobSlot;

在直接registerNewJob到list中

bool JobManager::registerNewJob(JobEntry* jobEntry) {
    uint32_t idx;
	if(mJobSlot.size() < JOB_ENTRY_MAX_SIZE) {
		for (idx = 0; idx < mJobSlot.size(); idx++) {
			if (!mJobSlot[idx].used) {
				JobEntry mJobEntry;
				mJobEntry.pFuncAddr = jobentry->jobFuncAddr;
				mJobEntry.priority = jobentry->priority;
				mJobEntry.pCb= jobentry->jobCbAddr;
				mJobSlot[idx].pJobEntry = mJobEntry;
				mJobSlot[idx].used = true;
				break;
			}
		}
	} else {
        ALOGE("%s: register job failed due to the number of job exceeds %d",
            __FUNCTION__, JOB_ENTRY_MAX_SIZE);
        return false;
    }
    return true;
}

接下来在dowork的时候就可以直接从任务队列中取出相应的job,

RuntimeJob* JobManager::dequeueJob() {;
    std::unique_lock lock(mJobLock);
    if (!mJobSlot.empty()) {
        JobEntrySlot* jobEntrySlot = mJobSlot.front();
		mJobSlot.pop_front();
		JobEntry* jobEntry = jobEntrySlot->pJobEntry
        if (NULL != jobEntry) {
            RuntimeJob* currentJob = (RuntimeJob*)malloc(sizeof(RuntimeJob));
            currentJob->pFuncAddr = jobEntry->pFuncAddr;
            currentJob->pCb = jobEntry->pCb;
            currentJob->pData = jobData.pData;
            return currentJob;
        }
    }
    lock.unlock();
    return NULL;
}

整个流程也就会非常的简单:

①封装JobEntry;

②register->enque ;

③dowork->deque job

有兴趣的可以自己实现一下。后续根据项目需求再完善相应的功能,Bye~

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698830.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号