#!/usr/bin/python
# coding:utf-8
import jenkins
import numpy
import threading
import time
import buildinfo
import datetime
from tools import Tools
import collections
import operator
def LoginJenkins(master_url,username=None,password=None):
return jenkins.Jenkins(master_url,username,password)
# 装饰器
def timer(func):
def inner(*args,**kwargs):
t1 = time.time()
ret = func(*args,**kwargs)
t2 = time.time()
print 'n{0} cost time {1} sn'.format(func.__name__,t2-t1)
return ret
return inner
# 多线程类
class GetBuildThread(threading.Thread):
def __init__(self,func,args=()):
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try :
return self.result
except Exception as e:
return None
# 多线程函数
def get_buildInfo(jenkinsInstance,job_name,buildnumber):
return buildnumber,buildinfo.BuildInfo(jenkinsInstance,job_name,buildnumber)
class JobInfo(object):
def __init__(self,InstanceJenkins,job_name,depth=0):
self.job_name = job_name
self.jenkins = InstanceJenkins
# print job_name
self.detail_Dict = InstanceJenkins.get_job_info(job_name,depth)
# print self.detail_Dict
@classmethod
def init_URL(cls,url,username=None,password=None):
url_master,job_name,_ = Tools.get_params_use_url(url)
# print Tools.get_params_use_url(url)
InstanceJenkins = jenkins.Jenkins(url_master,username,password)
return cls(InstanceJenkins,job_name)
# 获取工程嘚描述
def get_description(self):
return self.detail_Dict.get('description',"")
#获取工程的url
def get_job_url(self):
return self.detail_Dict.get('url','')
# 检查工程时候被禁用
def get_is_able(self):
return self.detail_Dict.get('buildable',False)
# 获取工程的所有构建号
def get_builds_number_list(self):
l = []
if 'builds' in self.detail_Dict.keys():
for build_info_dict in self.detail_Dict.get('builds',[]):
if 'number' in build_info_dict.keys():
l.append(build_info_dict.get('number',0))
return l
# 或健康分数,不太准确,随时间会变化
def get_health_score(self):
l = []
if 'healthReport' in self.detail_Dict.keys():
for health_dict in self.detail_Dict.get('healthReport',[]):
if 'score' in health_dict.keys():
l.append(health_dict.get('score',0))
return l
def formal1(self,name,sub_name):
if name in self.detail_Dict.keys():
if sub_name in self.detail_Dict.get(name,{}) :
return self.detail_Dict.get(name,{}).get(sub_name,0)
# 第一次构建号
def get_firstBuild_number(self):
return self.formal1( 'firstBuild', 'number')
# 最后一次构建号
def get_lastBuild_number(self):
return self.formal1( 'lastBuild', 'number')
# 最后一次,完成的构建,如果10还在构建,则9就是最后一次完成的构建
def get_lastCompletedBuild_number(self):
return self.formal1( 'lastCompletedBuild', 'number')
# 最后一次失败的构建号
def get_lastFailedBuild_number(self):
return self.formal1('lastFailedBuild', 'number')
# 最后一次成功的构建号
def get_lastSuccessfulBuild_number(self):
return self.formal1('lastSuccessfulBuild', 'number')
# 下一次构建号
def get_nextBuildNumber(self):
return self.detail_Dict.get('nextBuildNumber',0)
# 是否有排队的构建
def get_inQueue(self):
return self.detail_Dict.get('inQueue',False)
# 或排队的项目,id不太准,queueItem
# 获取整个工程的默认参数
# {'git_url': 'http://github.com', 'tag': '09142336'},没有默认参数,值为''
def get_all_params(self):
params_dict = {}
if 'property' in self.detail_Dict.keys():
for items_dict in self.detail_Dict.get('property',[]):
if '_class' in items_dict.keys() and items_dict.get('_class','').find('Parameters') != -1 :
params_list = items_dict.get('parameterDefinitions',[])
for params_dict_sub in params_list:
params_dict[params_dict_sub.get('name','')] = params_dict_sub.
get('defaultParameterValue','').get('value','')
return params_dict
# 获取整个工程的某一个参数
def get_one_params(self,param):
if 'property' in self.detail_Dict.keys():
for items_dict in self.detail_Dict.get('property',[]):
if '_class' in items_dict.keys() and items_dict.get('_class','').find('Parameters') != -1 :
params_list = items_dict.get('parameterDefinitions',[])
for params_dict_sub in params_list:
if param == params_dict_sub.get('name',''):
return params_dict_sub.get('defaultParameterValue','').get('value','')
class Analyze_Job_Builds(JobInfo):
def __init__(self,InstanceJenkins,job_name,depth=0):
JobInfo.__init__(self,InstanceJenkins,job_name,depth)
self.job_list = JobInfo.get_builds_number_list(self)
tmp_dict = {}
threads = []
for number in self.job_list:
t = GetBuildThread(get_buildInfo,args=(self.jenkins,self.job_name,number))
t.start()
threads.append(t)
for th in threads:
th.join()
tmp_number,tmp_build_object = th.result
tmp_dict[tmp_number] = tmp_build_object
# 将字典倒序,注意使用的解释器是2.7 在3.6以下,所以是无序字典。OrderedDict将无序变为有序。
self.buildsInstance = collections.OrderedDict(sorted(tmp_dict.items(), key=operator.itemgetter(0), reverse=True))
# print self.buildsInstance.keys()
# 获取所有成功的构建号。
def get_success_build_numbers(self):
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if not build.build_result == 'SUCCESS':
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
def get_startend_time_build_numbers(self,start_time_str,end_time_str):
start_timestamp = Tools.timestr_to_num(start_time_str)
end_timestamp = Tools.timestr_to_num(end_time_str)
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if not (build.build_start_time > start_timestamp and build.build_start_time< end_timestamp):
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
def get_startend_Success_build_numbers(self,start_time_str,end_time_str):
start_timestamp = Tools.timestr_to_num(start_time_str)
end_timestamp = Tools.timestr_to_num(end_time_str)
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if not (build.build_start_time > start_timestamp and build.build_start_time< end_timestamp
and build.build_result == 'SUCCESS'):
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
def get_yesterday_build_numbers(self):
start_timestamp = Tools.get_yesterday_zero_num()
end_timestamp = Tools.get_today_zero_num()
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if not (build.build_start_time > start_timestamp and build.build_start_time< end_timestamp):
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
def get_yesterday_success_build_numbers(self):
start_timestamp = Tools.get_yesterday_zero_num()
end_timestamp = Tools.get_today_zero_num()
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if not (build.build_start_time > start_timestamp and build.build_start_time< end_timestamp
and build.build_result == 'SUCCESS'):
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
# 取出最近的多少个build
def get_limit_numbers(self,limit):
for xuhao,number in enumerate(list(self.buildsInstance.keys()),1):
if xuhao > limit:
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
# 取出最近的多少个build,排除 ABORTED 或正在运行的
def get_limit_no_running_numbers(self,limit):
for number_tmp in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number_tmp)
# 将正在运行的build删除。
if build.is_building or build.build_result == 'ABORTED':
self.buildsInstance.pop(number_tmp)
self.job_list.remove(number_tmp)
for xuhao,number in enumerate(list(self.buildsInstance.keys()),1):
if xuhao > limit:
self.buildsInstance.pop(number)
self.job_list.remove(number)
return self.buildsInstance
# 耗时中位数,默认单位 分钟
def get_build_time_median(self):
use_time_list = []
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
use_time_list.append(int(build.build_use_time/60))
return numpy.median(use_time_list)
# 根据运行时间,获取build对象列表,默认时间分钟
def get_high_time_numbers(self,max):
for number in list(self.buildsInstance.keys()):
build = self.buildsInstance.get(number)
if int(build.build_use_time/60) < max :
self.buildsInstance.pop(number)
self.job_list.remove(number)
def get_limit_success_rate(self,limit):
all_list = len(self.get_limit_numbers(limit))
print all_list
success_list = len(self.get_success_build_numbers())
print success_list
if success_list == 0 :
return '0%'
return 'percent: {:.2%}'.format(float(success_list)/float(all_list))
# 判断当前的build是否全部失败。
def juge_all_fail(self):
'''
判断所有构建是否失败,
:param buildsInstance:{1:buildobject1,2:buildobject2}
:return:True or False
'''
for number, build in self.buildsInstance.items():
if build.build_result == 'SUCCESS':
return False
return True
# 判断当前job类型是 tag类型还是 非tag类型。
def is_job_type(self):
ch_list = []
tag_list = []
for number,build in self.buildsInstance.items():
try:
changeid = build.get_build_one_parmas('GERRIT_CHANGE_ID')
# print changeid
# 更改一下
# if len(changeid) == 41:
if len(changeid) == 41:
ch_list.append(number)
else:
tag_list.append(number)
except:
return 'all_tag'
# print 'ch_list',ch_list
# print 'tag_list',tag_list
if not ch_list:
return 'all_tag'
return 'not_all_tag'
#把非tag类型的job ,把tag的build全部删除
def del_tag_build(self):
# print self.is_job_type()
if self.is_job_type() == 'not_all_tag':
for number, build in self.buildsInstance.items():
try:
changeid = build.get_build_one_parmas('GERRIT_CHANGE_ID')
if len(changeid) != 41:
self.buildsInstance.pop(number)
self.job_list.remove(number)
except:
print '%s 工程没有GERRIT_CHANGE_ID参数 '.format(self.get_job_url())
if __name__ == '__main__':
jenkins_instance = LoginJenkins('http://10.0.0.70:8080/jenkins/','admin','XXXX')
one_job = Analyze_Job_Builds(jenkins_instance,'all_changed_success')
# print one_job.get_all_params()
# print one_job.buildsInstance.keys()
# for number in one_job.buildsInstance.keys():
# print number
# one_job.get_success_build_number()
# one_job.get_startend_Success_build_numbers('2021-10-04 03:42:00','2021-10-07 12:50:00')
# print one_job.buildsInstance.keys()
#
# one_job.get_limit_numbers(2)
# print one_job.buildsInstance.keys()
# print one_job.get_build_time_median()
print one_job.get_limit_success_rate(5)
# dic = {1:"1",2:'2',3:'3'}
# # dic2 = sorted(dic.items(), key=lambda k: k[0],reverse=True)
# # print dict(dic2)
#
# print sorted(dic.items(),key=lambda x:x[0],reverse=True)
#
# print { k:v for k,v in sorted(dic.items(),key=lambda x:x[0],reverse=True)}
# import collections
#
#
# dic = {1:"1",2:'2',3:'3'}
# sorted_x = sorted(dic.items(),key=operator.itemgetter(0),reverse=True)
# sorted_dict = collections.OrderedDict(sorted_x)
# y = { k:v for k,v in sorted(dic.items(),key=operator.itemgetter(0),reverse=True)}
# print sorted_dict
# print dict(sorted_dict)