一、背景意义
项目中可能需要处理一些比较繁琐的事,而这些事正好可以使用线程来解决,如tv上经常会去dump raw data,若直接在解码线程中dump raw data会导致每一帧的处理时间变长,如果大幅超出33ms,就会出现丢帧情况,而对于h264格式数据来说,丢帧会导致马赛克产生。下面列出线程池的一种实现(部分功能可以再完善下,比如添加job的优先级,然后根据该优先级顺序来处理job......),此文件导入到项目中就可以使用。
二、具体实现
ThreadCore.h
#ifndef ANDROID_THREADCORE_H #define ANDROID_THREADCORE_H #include#include
#include #include #include "utils/Thread.h" #include "utils/Mutex.h" #include "utils/Timers.h" #include "base.h" namespace android { #define THRAED_MAX 2 #define JOB_ENTRY_MAX_SIZE 8 // Opaque handle to a job typedef uint64_t JobHandle; // Function pointer type for a job function typedef void (*JobFunc)(void* pArg); // Function pointer type for a job callback function typedef void (*JobCb)(void* pArg); // Maximum number of job list static size_t MaxJobCount = 20; // Relative priority of a job submitted enum struct JobPriority { Critical = 0, // Critical priority job High = 1, // High priority Normal = 2, // Normal priority Invalid = 3 // Invalid priority }; // Describes the functions to be executed and their priorities struct JobEntry { JobFunc pFuncAddr; JobPriority priority; JobHandle handle; JobCb pCb; bool used = false; }; // Describe the job and their data struct JobData { JobHandle handle; void* pData; }; // Describe the job func to be performed immediately and their data struct RuntimeJob { JobFunc pFuncAddr; JobCb pCb; void* pData; }; // JobManager provider class JobManager : public virtual Refbase { public: JobManager(){}; ~JobManager(){}; bool registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr, JobPriority priority, JobHandle* handle); void releaseRuntimeJob(RuntimeJob* job); JobEntry* getJobByHandle(JobHandle jobHandle); JobEntry* getAllJob(){return mJobSlot;}; JobHandle packJobHandle(uint32_t idx); uint32_t getSlotByHandle(JobHandle handle); bool isJobRegistered(JobHandle handle); size_t getJobCount(); bool checkJobsonHold(); bool enqueueJob(JobData jobData); RuntimeJob* dequeueJob(); void flushJob(); private: mutable std::mutex mJobLock; std::list mJobHandleList; JobEntry mJobSlot[JOB_ENTRY_MAX_SIZE]; }; // ThreadManager provider class ThreadManager : public virtual Refbase { public: ThreadManager(); ~ThreadManager(); void initialize(std::string name); bool registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr, JobPriority priority, JobHandle* handle); bool isRunning(){ return mRunning;}; bool postJob(JobData jobData); size_t getJobCount(); status_t startThreads(); void stopThreads(); void flush(); void doWork(pid_t tid, std::string threadName); class ThreadFunc : public Thread { public: ThreadFunc(wp parent, char name[]):mParent(parent),mName(name){}; ~ThreadFunc(); virtual bool threadLoop() override; std::string getThreadName(){return mName;}; void stop(); private: std::string mName; const wp mParent; }; private: sp mThreads[THRAED_MAX]; mutable std::mutex mWorkMutex; std::condition_variable mWorkCond; JobManager mJobManager; std::string mName; bool mPending = false; bool mRunning = true; }; } #endif
ThreadCore.cpp
#include "ThreadCore.h"
#define LOG_NDEBUG 0
//#define LOG_NNDEBUG 0
#define LOG_TAG "ThreadCore"
//#define ALOGV(...) ALOGI(__VA_ARGS__)
namespace android {
bool JobManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,
JobPriority priority, JobHandle* handle) {
uint32_t idx;
for (idx = 0;idx < JOB_ENTRY_MAX_SIZE;idx++) {
if (!mJobSlot[idx].used) {
*handle = packJobHandle(idx);
mJobSlot[idx].pFuncAddr = jobFuncAddr;
mJobSlot[idx].priority = priority;
mJobSlot[idx].handle = *handle;
mJobSlot[idx].pCb= jobCbAddr;
mJobSlot[idx].used = true;
break;
}
}
if (idx >= JOB_ENTRY_MAX_SIZE) {
ALOGE("%s: register job failed due to the number of job exceeds %d",
__FUNCTION__, JOB_ENTRY_MAX_SIZE);
return false;
}
return true;
}
JobHandle JobManager::packJobHandle(uint32_t idx) {
JobHandle handle = 1 << 32;
handle |= idx + 1;
return handle;
}
JobEntry* JobManager::getJobByHandle(JobHandle handle) {
uint32_t idx = (handle & 0xF) - 1;
if (idx <= JOB_ENTRY_MAX_SIZE) {
return &mJobSlot[idx];
}
else {
return NULL;
}
}
// This method needs to be implemented
bool JobManager::isJobRegistered(JobHandle handle) {
return false;
}
bool JobManager::checkJobsonHold() {
std::lock_guard lock(mJobLock);
return !mJobHandleList.empty();
}
bool JobManager::enqueueJob(JobData jobData) {
std::lock_guard lock(mJobLock);
if (mJobHandleList.size() < MaxJobCount) {
mJobHandleList.push_back(jobData);
return true;
}
return false;
}
RuntimeJob* JobManager::dequeueJob() {;
std::unique_lock lock(mJobLock);
JobData jobData;
if (!mJobHandleList.empty()) {
jobData = mJobHandleList.front();
mJobHandleList.pop_front();
JobEntry* jobEntry = getJobByHandle(jobData.handle);
if (NULL != jobEntry) {
RuntimeJob* currentJob = (RuntimeJob*)malloc(sizeof(RuntimeJob));
currentJob->pFuncAddr = jobEntry->pFuncAddr;
currentJob->pCb = jobEntry->pCb;
currentJob->pData = jobData.pData;
return currentJob;
}
}
lock.unlock();
return NULL;
}
// This function may have asynchronous security problems
// For the sake of program efficiency, it can be ignored
size_t JobManager::getJobCount() {
return mJobHandleList.size();
}
void JobManager::flushJob() {
ALOGV("+[%s]", __FUNCTION__);
std::lock_guard lock(mJobLock);
for (auto iter = mJobHandleList.begin();iter != mJobHandleList.end();) {
JobEntry* jobEntry = getJobByHandle(iter->handle);
if (NULL != jobEntry->pCb) {
jobEntry->pCb(iter->pData);
}
free(iter->pData);
iter = mJobHandleList.erase(iter);
}
ALOGV("-[%s]", __FUNCTION__);
}
void JobManager::releaseRuntimeJob(RuntimeJob* job) {
if (NULL != job) {
if (NULL != job->pData) {
free(job->pData);
job->pData = NULL;
}
free(job);
}
}
ThreadManager::ThreadManager() {
ALOGV("+[%s]", __FUNCTION__);
ALOGV("-[%s]", __FUNCTION__);
}
ThreadManager::~ThreadManager() {
ALOGV("+[%s]", __FUNCTION__);
ALOGV("-[%s]", __FUNCTION__);
}
void ThreadManager::initialize(std::string name) {
mName = name;
for (uint32_t i = 0;i < THRAED_MAX;i++) {
char threadName[64];
snprintf(threadName, sizeof(threadName), "%s_%d", mName.c_str(), i);
mThreads[i] = new ThreadFunc(this, threadName);
}
}
bool ThreadManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,
JobPriority priority, JobHandle* handle) {
return mJobManager.registerNewJob(jobFuncAddr, jobCbAddr, priority, handle);
}
bool ThreadManager::postJob(JobData jobData) {
ALOGV("+[%s]", __FUNCTION__);
if (mJobManager.enqueueJob(jobData)) {
std::unique_lock lock(mWorkMutex);
mPending = true;
lock.unlock();
mWorkCond.notify_one();
return true;
}
else {
ALOGE("%s: enqueue job failed!", __FUNCTION__);
}
ALOGV("-[%s]", __FUNCTION__);
return false;
}
size_t ThreadManager::getJobCount() {
return mJobManager.getJobCount();
}
status_t ThreadManager::startThreads() {
status_t res = OK;
for (uint32_t i = 0;i < THRAED_MAX;i++) {
res = mThreads[i]->run(mName.c_str());
if (res != OK) {
ALOGE("%s:run thread %s failed, errno %d",
__FUNCTION__, mThreads[i]->getThreadName().c_str(), res);
return res;
}
ALOGI("%s:run thread %s successed", __FUNCTION__,
mThreads[i]->getThreadName().c_str());
}
return res;
}
void ThreadManager::stopThreads() {
ALOGV("+[%s]", __FUNCTION__);
flush();
// the flag of parent is not actived
mRunning = false;
// release all blocked threads
mWorkCond.notify_all();
for (uint32_t i = 0;i < THRAED_MAX;i++) {
if (mThreads[i]) {
mThreads[i]->stop();
}
}
ALOGV("-[%s]", __FUNCTION__);
}
void ThreadManager::flush() {
mJobManager.flushJob();
}
void ThreadManager::doWork(pid_t tid, std::string threadName) {
std::unique_lock lock(mWorkMutex);
while (!mPending && mRunning) {
mWorkCond.wait(lock);
}
lock.unlock();
if (mRunning) {
RuntimeJob* currentJob = mJobManager.dequeueJob();
if (NULL != currentJob) {
if (NULL != currentJob->pFuncAddr) {
currentJob->pFuncAddr(currentJob->pData);
}
if (NULL != currentJob->pCb) {
currentJob->pCb(currentJob->pData);
}
}
else {
ALOGI("%s:thread id %d name %s currentJob is null!", __FUNCTION__, tid, threadName.c_str());
}
mJobManager.releaseRuntimeJob(currentJob);
lock.lock();
if (!mJobManager.checkJobsonHold()) {
mPending = false;
}
lock.unlock();
}
}
ThreadManager::ThreadFunc::~ThreadFunc() {
ALOGV("[%s]", __FUNCTION__);
}
void ThreadManager::ThreadFunc::stop() {
// status_t res = requestExitAndWait();
// if (res != OK)
// {
// ALOGE("Unable to stop %s thread: %d", mName.c_str(), res);
// }
ALOGV("+[%s]", __FUNCTION__);
requestExit();
join();
ALOGV("-[%s]", __FUNCTION__);
}
bool ThreadManager::ThreadFunc::threadLoop() {
auto parent = mParent.promote();
// thread dowork only when parent is alived
if (parent->isRunning()) {
parent->doWork(getTid(), mName);
}
else {
return false;
}
return true;
}
}// namespace android
调用关系如下图所示:
三、线程池介绍
1)主要实现有下面两个类:
class JobManager:负责管理注册到线程池的job,以及等待执行队列中的job;
class ThreadManager:负责管理线程的初始化,启动以及释放。
2)主要api介绍:
// 初始化线程池,主要工作是new出来THRAED_MAX个Thread;
void ThreadManager::initialize(std::string name)
// 注册一个job到jobManager,需要提供所执行函数的地址,callback函数地址,以及优先等级
// 函数会返回一个JobHandle,用于后续提交任务到JobManager
bool ThreadManager::registerNewJob(JobFunc jobFuncAddr, JobCb jobCbAddr,JobPriority priority, JobHandle* handle):
//提交一个job到JobManager,并通知thread去执行
bool ThreadManager::postJob(JobData jobData)
//启动线程池
status_t ThreadManager::startThreads()
//停止线程池
void ThreadManager::stopThreads()
3)使用举例:
// 在需要使用线程池的模块初始化函数中初始化线程池,注册job以及启动线程池
spmThreadManager; mThreadManager = new ThreadManager(); mThreadManager->initialize("CameraDump"); mThreadManager->registerNewJob(printLog, NULL, JobPriority::Critical, &mHandle); mThreadManager->startThreads();
// 下面是随便写的一个需要线程执行的函数
static void printLog(void* data) {
(void*)data;
ALOGD("%s HAODADA NIU BI", __FUNCTION__);
}
// 在模块处理部分postjob,jobData中的handle为注册job时返回handle,pData为线程执行函数时传入的参数
JobData jobData = {
.handle = mHandle,
.pData = (void*)dumpData,
};
if (mThreadManager) {
if (!mThreadManager->postJob(jobData)) {
ALOGE("%s: post job failed!", __FUNCTION__);
}
}
// 在模块结束时需要停止线程池,并释放实例
mThreadManager->stopThreads(); mThreadManager.clear();
四、结语
上述过程如果觉得麻烦,还可以再简单点,
// Describes the functions to be executed and their priorities
struct JobEntry {
JobFunc pFuncAddr;
JobPriority priority;
JobCb pCb;
};
struct JobEntrySlot {
JobEntry pJobEntry;
bool used = false;
};
std::list mJobSlot;
在直接registerNewJob到list中
bool JobManager::registerNewJob(JobEntry* jobEntry) {
uint32_t idx;
if(mJobSlot.size() < JOB_ENTRY_MAX_SIZE) {
for (idx = 0; idx < mJobSlot.size(); idx++) {
if (!mJobSlot[idx].used) {
JobEntry mJobEntry;
mJobEntry.pFuncAddr = jobentry->jobFuncAddr;
mJobEntry.priority = jobentry->priority;
mJobEntry.pCb= jobentry->jobCbAddr;
mJobSlot[idx].pJobEntry = mJobEntry;
mJobSlot[idx].used = true;
break;
}
}
} else {
ALOGE("%s: register job failed due to the number of job exceeds %d",
__FUNCTION__, JOB_ENTRY_MAX_SIZE);
return false;
}
return true;
}
接下来在dowork的时候就可以直接从任务队列中取出相应的job,
RuntimeJob* JobManager::dequeueJob() {;
std::unique_lock lock(mJobLock);
if (!mJobSlot.empty()) {
JobEntrySlot* jobEntrySlot = mJobSlot.front();
mJobSlot.pop_front();
JobEntry* jobEntry = jobEntrySlot->pJobEntry
if (NULL != jobEntry) {
RuntimeJob* currentJob = (RuntimeJob*)malloc(sizeof(RuntimeJob));
currentJob->pFuncAddr = jobEntry->pFuncAddr;
currentJob->pCb = jobEntry->pCb;
currentJob->pData = jobData.pData;
return currentJob;
}
}
lock.unlock();
return NULL;
}
整个流程也就会非常的简单:
①封装JobEntry;
②register->enque ;
③dowork->deque job
有兴趣的可以自己实现一下。后续根据项目需求再完善相应的功能,Bye~



