栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

数据挖掘学习笔记:Apriori算法Python两种实现

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

数据挖掘学习笔记:Apriori算法Python两种实现

简述

先展示两种实现代码(原始版和优化版):原始版对每个候选项集逐条扫描全部事务来统计支持度;优化版为每个频繁项集记录包含它的事务下标集合,通过集合求交直接得到新项集的支持度。后续有时间再写详细教程。

原始版

def generateCandidates(targetEachlen, frequentTuples=[]):
    """Join tuples that share a common prefix into tuples one item longer.

    Every pair ``(A, B)`` with ``A`` listed before ``B`` is compared on its
    first ``targetEachlen - 2`` items.  When the prefixes match, the
    candidate ``A + (B[-1],)`` is emitted.  On the first mismatch the scan
    for that ``A`` stops, so tuples with equal prefixes have to be adjacent
    in ``frequentTuples`` for all joins to be found.

    Args:
        targetEachlen: Length of the tuples being built; the compared prefix
            is ``targetEachlen - 2`` items long.
        frequentTuples: Sequence of tuples to join.  It is only read.

    Returns:
        A list with the joined tuples in the order they were generated.
    """
    prefixLen = targetEachlen - 2
    total = len(frequentTuples)
    joined = []

    for first, head in enumerate(frequentTuples):
        for second in range(first + 1, total):
            tail = frequentTuples[second]
            if head[:prefixLen] != tail[:prefixLen]:
                # Later tuples are not inspected once a prefix differs.
                break
            joined.append(head + (tail[-1],))

    return joined

def genearateFrequentSets(dataSetList, elementTupleList, minSupport=0.5):
    """Find the frequent itemsets level by level.

    The one-item candidates are filtered first.  After that, the frequent
    tuples of one level are joined into candidates for the next level with
    ``generateCandidates`` and filtered against the transactions with
    ``filterSetCandidates``, until a level has fewer than two frequent
    tuples.

    Args:
        dataSetList: The transactions, forwarded unchanged to
            ``filterSetCandidates``.
        elementTupleList: One-item tuples used as the first-level candidates.
        minSupport: Support threshold forwarded to ``filterSetCandidates``.

    Returns:
        A pair ``(frequentTuplesList, supportDict)``.  ``frequentTuplesList``
        starts with the frequent one-item tuples and gains one list per
        further level that produced at least one frequent tuple.
        ``supportDict`` merges the support dictionaries of all levels.
    """
    supportDict = {}
    frequentTuples, tempSupportDict = filterSetCandidates(
        transactions=dataSetList,
        tupleCandidates=elementTupleList,
        minSupport=minSupport,
    )
    supportDict.update(tempSupportDict)
    frequentTuplesList = [frequentTuples]

    # k is the length of the tuples being built; frequentTuplesList[k - 2]
    # holds the previous level.  The published condition was mangled to
    # "while (k-2) 1:" (the "< ... >" part was lost); it is restored here.
    # The index check ends the loop when the previous round added no level,
    # and the length check ends it when there is nothing left to join.
    k = 2
    while (k - 2) < len(frequentTuplesList) and len(frequentTuplesList[k - 2]) > 1:
        tupleCandidates = generateCandidates(k, frequentTuplesList[k - 2])
        frequentTuples, tempSupportDict = filterSetCandidates(
            transactions=dataSetList,
            tupleCandidates=tupleCandidates,
            minSupport=minSupport,
        )
        supportDict.update(tempSupportDict)
        if len(frequentTuples) > 0:
            frequentTuplesList.append(frequentTuples)
        k += 1

    return frequentTuplesList, supportDict


def filterSetCandidates(transactions, tupleCandidates, minSupport):
    """Keep the candidates whose support reaches ``minSupport``.

    The support of a candidate is the number of transactions that contain
    all of its items divided by the total number of transactions.

    Args:
        transactions: Sequence of transactions; each one is an iterable of
            items (it is passed to ``set.issubset``).
        tupleCandidates: Iterable of candidate tuples.
        minSupport: Minimum support a candidate needs to be kept.

    Returns:
        A pair ``(freqItemSets, supportDict)``: the kept candidates in input
        order, and a dict mapping each kept candidate to its support.  Both
        are empty when ``transactions`` is empty.
    """
    freqItemSets = []
    supportDict = {}

    lenOfTransactions = float(len(transactions))
    if lenOfTransactions == 0:
        # Support is undefined without transactions; report nothing as
        # frequent instead of dividing by zero.
        return freqItemSets, supportDict

    for itemSetTuple in tupleCandidates:
        # Built once per candidate rather than once per transaction.
        itemSet = set(itemSetTuple)
        existCount = sum(
            1 for transaction in transactions if itemSet.issubset(transaction)
        )
        support = existCount / lenOfTransactions
        if support >= minSupport:
            supportDict[itemSetTuple] = support
            freqItemSets.append(itemSetTuple)

    return freqItemSets, supportDict

def generateAllRules(frequentTuplesList, supportDict, minConfidence):
    """Collect the rules produced for every frequent tuple of two or more items.

    The first entry of ``frequentTuplesList`` is skipped.  Tuples in the
    second entry go straight to ``calcConfidenceOnSubSetList``; tuples in
    later entries go through ``generateRulesOfFrequentSet``.  Both receive
    the tuple split into one-item tuples.

    Args:
        frequentTuplesList: List of lists of frequent tuples, one per level.
        supportDict: Mapping from tuple to support, forwarded to the helpers.
        minConfidence: Confidence threshold, forwarded to the helpers.

    Returns:
        One flat list with everything the helpers returned, in order.
    """
    rules = []

    for level, tuplesAtLevel in enumerate(frequentTuplesList[1:], start=1):
        for itemset in tuplesAtLevel:
            singles = [(item,) for item in itemset]
            if level == 1:
                found = calcConfidenceOnSubSetList(itemset, singles, supportDict, minConfidence)
            else:
                found = generateRulesOfFrequentSet(itemset, singles, supportDict, minConfidence)
            rules.extend(found)

    return rules

def generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence):
    """Generate rules for one frequent tuple using growing sub-tuples.

    Starting from ``itemsOfTuple``, each round hands the current sub-tuples
    to ``calcConfidenceOnSubSetList`` and then joins them into sub-tuples
    one item longer with ``generateCandidates``.  The rounds end once the
    sub-tuple length would exceed half of ``len(itemsOfTuple)`` (rounded
    down) or fewer than two sub-tuples remain.

    Args:
        frequentTuple: The frequent tuple the rules are derived from.
        itemsOfTuple: List of sub-tuples to start from; the caller in this
            file passes the one-item tuples of ``frequentTuple``.
        supportDict: Mapping from tuple to support, forwarded unchanged.
        minConfidence: Confidence threshold, forwarded unchanged.

    Returns:
        A list with the rules returned by every round, in order.
    """
    largestSize = len(itemsOfTuple) // 2
    rules = []
    subTuples = itemsOfTuple
    size = 1

    while len(subTuples) > 1:
        rules.extend(
            calcConfidenceOnSubSetList(frequentTuple, subTuples, supportDict, minConfidence)
        )
        if size >= largestSize:
            break
        size += 1
        subTuples = generateCandidates(size, subTuples)

    return rules

def calcConfidenceOnSubSetList(wholeTuple, subTuples, supportDict={}, minConfidence=1):
    """Build the rules that split ``wholeTuple`` around each given sub-tuple.

    For every sub-tuple, the remaining items of ``wholeTuple`` (kept in
    their original order) form the other side.  The rule sub-tuple ->
    remainder is scored as ``support(whole) / support(sub-tuple)``.  Unless
    the sub-tuple is exactly half as long as ``wholeTuple``, the reverse
    rule is scored as well, using the remainder's support.

    Args:
        wholeTuple: The tuple being split; must be a key of ``supportDict``.
        subTuples: Iterable of sub-tuples of ``wholeTuple``.
        supportDict: Mapping from tuple to support.  It is only read.
        minConfidence: Minimum confidence a rule needs to be kept.

    Returns:
        A list of ``[left, right, confidence]`` lists for the kept rules.

    Raises:
        KeyError: If a required tuple is missing from ``supportDict``.
    """
    rules = []

    for antecedent in subTuples:
        members = set(antecedent)
        consequent = tuple(v for v in wholeTuple if v not in members)

        forward = supportDict[wholeTuple] / supportDict[antecedent]
        if forward >= minConfidence:
            rules.append([antecedent, consequent, forward])

        # For an even split the reverse rule is produced when the other half
        # is itself visited as a sub-tuple, so it is not added here.
        if len(wholeTuple) == 2 * len(antecedent):
            continue

        backward = supportDict[wholeTuple] / supportDict[consequent]
        if backward >= minConfidence:
            rules.append([consequent, antecedent, backward])

    return rules

def apriori(data=[[]], minSupport=1, minConfidence=1):
    """Run the original Apriori variant and return the generated rules.

    The transactions are converted to sets, every distinct item becomes a
    one-item candidate tuple (in sorted order), the frequent tuples are
    found with ``genearateFrequentSets`` and the rules are derived from
    them with ``generateAllRules``.

    Args:
        data: Sequence of transactions, each an iterable of sortable,
            hashable items.
        minSupport: Support threshold passed to ``genearateFrequentSets``.
        minConfidence: Confidence threshold passed to ``generateAllRules``.

    Returns:
        The rule list returned by ``generateAllRules``.
    """
    transactionSets = [set(row) for row in data]
    distinctItems = {item for row in data for item in row}
    singletons = [(item,) for item in sorted(distinctItems)]

    levels, supports = genearateFrequentSets(
        dataSetList=transactionSets,
        elementTupleList=singletons,
        minSupport=minSupport,
    )
    return generateAllRules(
        frequentTuplesList=levels,
        supportDict=supports,
        minConfidence=minConfidence,
    )

if __name__ == '__main__':
    # Timing demo: mine rules from mydata.csv and print the elapsed seconds
    # followed by the number of rules found.
    # The published snippet uses `time` and `pd` without importing them;
    # they are imported here so the script runs as-is.
    import time

    import pandas as pd

    begin = time.perf_counter()
    dataFrame = pd.read_csv(filepath_or_buffer=r'mydata.csv')
    # Each entry of the 'items' column is stripped of surrounding braces and
    # split on commas to obtain one transaction.
    transactions = [x.strip('{}').split(',') for x in dataFrame['items']]

    ruleList = apriori(data=transactions, minSupport=0.001, minConfidence=0.4)
    end = time.perf_counter()
    print(end - begin)
    print(len(ruleList))

优化版

def generateCandidates(targetEachlen, frequentTuples=[]):
    """Extend each tuple with the last item of the following tuples that share its prefix.

    For the tuple at each position, the tuples after it are visited in
    order for as long as their first ``targetEachlen - 2`` items equal its
    own.  Each visited tuple contributes the candidate formed by appending
    its last item.  The visit stops at the first tuple with a different
    prefix.

    Args:
        targetEachlen: Length of the tuples being built; ``targetEachlen - 2``
            leading items are compared.
        frequentTuples: Sequence of tuples to combine.  It is only read.

    Returns:
        The list of combined tuples in generation order.
    """
    cut = targetEachlen - 2
    count = len(frequentTuples)
    result = []

    for left in range(count):
        right = left + 1
        while right < count and frequentTuples[left][:cut] == frequentTuples[right][:cut]:
            result.append(frequentTuples[left] + (frequentTuples[right][-1],))
            right += 1

    return result


def genearateAllFrequent(data=[[]], minSupport=1):
    """Find the frequent itemsets using sets of transaction indices.

    For every item, the indices of the transactions containing it are
    collected.  An item is frequent when the size of that index set divided
    by ``len(data)`` reaches ``minSupport``.  Longer frequent tuples are
    then produced level by level by ``generatefrequentTuples``, which
    receives the frequent tuples of the previous level together with their
    index sets.

    Args:
        data: Sequence of transactions, each an iterable of sortable,
            hashable items.
        minSupport: Minimum support a tuple needs to be frequent.

    Returns:
        A pair ``(frequentTuplesList, supportDict)``.  ``frequentTuplesList``
        starts with the frequent one-item tuples (in sorted item order) and
        gains one list per further level that produced at least one
        frequent tuple.  ``supportDict`` maps every frequent tuple found to
        its support.
    """
    lenOfTransactions = float(len(data))

    # A single pass over the data records, for each item, the indices of the
    # transactions that contain it.
    indexSets = {}
    for index, transaction in enumerate(data):
        for ele in transaction:
            indexSets.setdefault(ele, set()).add(index)

    frequentTuples = []
    nextDistributions = []
    supportDict = {}
    for ele in sorted(indexSets):
        distribution = indexSets[ele]
        support = len(distribution) / lenOfTransactions
        if support >= minSupport:
            tupleEle = (ele,)
            frequentTuples.append(tupleEle)
            nextDistributions.append(distribution)
            supportDict[tupleEle] = support

    # k is the length of the tuples being built; frequentTuplesList[k - 2]
    # holds the previous level.  The published condition was mangled to
    # "while (k-2) 1:" (the "< ... >" part was lost); it is restored here.
    # The index check ends the loop when the previous round added no level,
    # and the length check ends it when there is nothing left to join.
    k = 2
    frequentTuplesList = [frequentTuples]
    while (k - 2) < len(frequentTuplesList) and len(frequentTuplesList[k - 2]) > 1:
        frequentTuples, tempSupportDict, nextDistributions = generatefrequentTuples(
            frequentTuples=frequentTuples,
            targetEachlen=k,
            lastDistributions=nextDistributions,
            lenOfTransactions=lenOfTransactions,
            minSupport=minSupport,
        )
        supportDict.update(tempSupportDict)
        if len(frequentTuples) > 0:
            frequentTuplesList.append(frequentTuples)
        k += 1

    return frequentTuplesList, supportDict

def generatefrequentTuples(frequentTuples, targetEachlen, lastDistributions=[], lenOfTransactions=1.0, minSupport=1.0):
    """Join prefix-sharing tuples and keep the results that are frequent.

    Tuples are paired the same way as in ``generateCandidates``: each tuple
    is compared with the ones after it on the first ``targetEachlen - 2``
    items, stopping at the first mismatch.  For a matching pair the two
    corresponding sets in ``lastDistributions`` are intersected, and the
    size of the intersection divided by ``lenOfTransactions`` is the
    support of the joined tuple.

    Args:
        frequentTuples: Sequence of tuples from the previous level.
        targetEachlen: Length of the tuples being built.
        lastDistributions: Sets aligned by position with ``frequentTuples``
            (``genearateAllFrequent`` passes sets of transaction indices).
            It is only read.
        lenOfTransactions: Divisor used to turn an intersection size into a
            support value.
        minSupport: Minimum support a joined tuple needs to be kept.

    Returns:
        A triple ``(tuples, supportDict, distributions)``: the kept tuples,
        a dict from each kept tuple to its support, and the intersection
        sets aligned by position with the kept tuples.
    """
    prefixLen = targetEachlen - 2
    total = len(frequentTuples)
    keptTuples = []
    keptSupport = {}
    keptDistributions = []

    for first, head in enumerate(frequentTuples):
        for second in range(first + 1, total):
            tail = frequentTuples[second]
            if head[:prefixLen] != tail[:prefixLen]:
                break

            shared = lastDistributions[first] & lastDistributions[second]
            ratio = float(len(shared)) / lenOfTransactions
            if ratio >= minSupport:
                combined = head + (tail[-1],)
                keptTuples.append(combined)
                keptDistributions.append(shared)
                keptSupport[combined] = ratio

    return keptTuples, keptSupport, keptDistributions

def generateAllRules(frequentTuplesList, supportDict, minConfidence):
    """Produce the rules for all frequent tuples beyond the first level.

    Entry 0 of ``frequentTuplesList`` is not used.  Each tuple of entry 1 is
    passed to ``calcConfidenceOnSubSetList`` and each tuple of a later entry
    to ``generateRulesOfFrequentSet``, in both cases together with the
    tuple's items wrapped as one-item tuples.

    Args:
        frequentTuplesList: List of lists of frequent tuples, one per level.
        supportDict: Mapping from tuple to support, passed on unchanged.
        minConfidence: Confidence threshold, passed on unchanged.

    Returns:
        A flat list of all rules the helpers returned, in order.
    """
    collected = []

    level = 1
    while level < len(frequentTuplesList):
        for whole in frequentTuplesList[level]:
            oneItemTuples = [(member,) for member in whole]
            if level > 1:
                collected += generateRulesOfFrequentSet(whole, oneItemTuples, supportDict, minConfidence)
            else:
                collected += calcConfidenceOnSubSetList(whole, oneItemTuples, supportDict, minConfidence)
        level += 1

    return collected

def generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence):
    """Derive rules from one frequent tuple, round by round.

    Each round scores the current sub-tuples with
    ``calcConfidenceOnSubSetList`` and then grows them by one item with
    ``generateCandidates``.  No further round is started when the next
    sub-tuple length would exceed ``len(itemsOfTuple) // 2`` or when fewer
    than two sub-tuples are left.

    Args:
        frequentTuple: The frequent tuple the rules are derived from.
        itemsOfTuple: Sub-tuples for the first round; the caller in this
            file passes the one-item tuples of ``frequentTuple``.
        supportDict: Mapping from tuple to support, passed on unchanged.
        minConfidence: Confidence threshold, passed on unchanged.

    Returns:
        A list with the rules of all rounds, in order.
    """
    half = len(itemsOfTuple) // 2
    found = []
    current = itemsOfTuple
    size = 1

    while True:
        if len(current) < 2:
            break
        found += calcConfidenceOnSubSetList(frequentTuple, current, supportDict, minConfidence)
        size += 1
        if size > half:
            break
        current = generateCandidates(size, current)

    return found

def calcConfidenceOnSubSetList(wholeTuple, subTuples, supportDict={}, minConfidence=1):
    """Score the rules obtained by splitting ``wholeTuple`` at each sub-tuple.

    Each sub-tuple is paired with the items of ``wholeTuple`` it does not
    contain (in their original order).  The rule from the sub-tuple to that
    remainder is always scored.  The opposite direction is scored too,
    except when the sub-tuple is exactly half the length of ``wholeTuple``.
    A rule's confidence is ``support(wholeTuple)`` divided by the support
    of its left side.

    Args:
        wholeTuple: The tuple being split; must be a key of ``supportDict``.
        subTuples: Iterable of sub-tuples of ``wholeTuple``.
        supportDict: Mapping from tuple to support.  It is only read.
        minConfidence: Minimum confidence a rule needs to be kept.

    Returns:
        A list of ``[left, right, confidence]`` lists for the kept rules.

    Raises:
        KeyError: If a required tuple is missing from ``supportDict``.
    """
    kept = []

    for lhs in subTuples:
        lhsItems = set(lhs)
        rhs = tuple([v for v in wholeTuple if v not in lhsItems])

        directions = [(lhs, rhs)]
        if len(wholeTuple) / 2 != len(lhs):
            directions.append((rhs, lhs))

        for source, target in directions:
            confidence = supportDict[wholeTuple] / supportDict[source]
            if confidence >= minConfidence:
                kept.append([source, target, confidence])

    return kept

def apriori(data=[[]], minSupport=1, minConfidence=1):
    """Run the optimized Apriori variant and return the generated rules.

    The frequent tuples and their supports come from
    ``genearateAllFrequent``; the rules are derived from them with
    ``generateAllRules``.

    Args:
        data: The transactions, passed to ``genearateAllFrequent``.
        minSupport: Support threshold passed to ``genearateAllFrequent``.
        minConfidence: Confidence threshold passed to ``generateAllRules``.

    Returns:
        The rule list returned by ``generateAllRules``.
    """
    levels, supports = genearateAllFrequent(data=data, minSupport=minSupport)
    return generateAllRules(
        frequentTuplesList=levels,
        supportDict=supports,
        minConfidence=minConfidence,
    )

if __name__ == '__main__':
    # Timing demo: mine rules from mydata.csv and print the elapsed seconds
    # followed by the number of rules found.
    # The published snippet uses `datetime` and `pd` without importing them;
    # they are imported here so the script runs as-is.
    import datetime

    import pandas as pd

    begin = datetime.datetime.now()
    dataFrame = pd.read_csv(filepath_or_buffer=r'mydata.csv')
    # Each entry of the 'items' column is stripped of surrounding braces and
    # split on commas to obtain one transaction.
    data = [x.strip('{}').split(',') for x in dataFrame['items']]
    ruleList = apriori(data=data, minSupport=0.001, minConfidence=0.4)
    end = datetime.datetime.now()
    print((end - begin).total_seconds())
    print(len(ruleList))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/875654.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号