先展示两种实现代码(原始版和优化版)。后续有时间再写详细教程。
原始版
def generateCandidates(targetEachlen, frequentTuples=[]):
candidates = []
lenOfArr = len(frequentTuples)
for i in range(lenOfArr):
for j in range(i+1, lenOfArr):
tupleA = frequentTuples[i]
tupleB = frequentTuples[j]
if tupleA[:targetEachlen-2] == tupleB[:targetEachlen-2]:
tupleC = tupleA + tuple([tupleB[-1]])
candidates.append( tupleC )
else:
break
return candidates
def genearateFrequentSets(dataSetList, elementTupleList, minSupport=0.5):
supportDict = {}
frequentTuples,tempSupportDict= filterSetCandidates(transactions=dataSetList, tupleCandidates=elementTupleList,
minSupport=minSupport)
# print(len(elementTupleList), len(frequentTuples))
k = 2
supportDict.update(tempSupportDict)
frequentTuplesList = [frequentTuples]
while (k-2) 1:
tupleCandidates = generateCandidates(k, frequentTuplesList[k-2])
frequentTuples, tempSupportDict = filterSetCandidates(transactions=dataSetList, tupleCandidates=tupleCandidates,
minSupport=minSupport)
supportDict.update(tempSupportDict)
if len(frequentTuples) > 0:
frequentTuplesList.append(frequentTuples)
# print(k, len(tupleCandidates), len(frequentTuples))
k += 1
return frequentTuplesList,supportDict
def filterSetCandidates(transactions, tupleCandidates, minSupport):
freqItemSets = []
supportDict = {}
lenOfTransactions = float(len(transactions))
for itemSetTuple in tupleCandidates:
existCount = 0
for transaction in transactions:
itemSet = set(itemSetTuple)
if itemSet.issubset(transaction):
existCount += 1
support = float(existCount)/lenOfTransactions
if support >= minSupport:
supportDict[itemSetTuple] = support
freqItemSets.append(itemSetTuple)
return freqItemSets,supportDict
def generateAllRules(frequentTuplesList, supportDict, minConfidence):
ruleList = []
for i in range(1, len(frequentTuplesList)):
for frequentTuple in frequentTuplesList[i]:
itemsOfTuple = [tuple({item}) for item in frequentTuple]
if i > 1:
ruleList.extend(generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence))
else:
ruleList.extend(calcConfidenceOnSubSetList(frequentTuple, itemsOfTuple, supportDict, minConfidence))
return ruleList
def generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence):
confidenceList = []
k = 1
tupleCandidates = itemsOfTuple
lenOfTuple = len(itemsOfTuple)
while len(tupleCandidates) >= 2:
confidenceList.extend(calcConfidenceOnSubSetList(frequentTuple, tupleCandidates, supportDict, minConfidence))
k += 1
if k>int(lenOfTuple/2):
break
tupleCandidates = generateCandidates(k, tupleCandidates)
return confidenceList
def calcConfidenceOnSubSetList(wholeTuple, subTuples, supportDict={}, minConfidence=1):
confidenceList = []
for leftSubTuple in subTuples:
tempSet = set(leftSubTuple)
rightSubTuple = tuple([v for v in wholeTuple if v not in tempSet])
# rightSubTuple = tuple([v for v in wholeTuple if v not in leftSubTuple])
conf1 = supportDict[wholeTuple] / supportDict[leftSubTuple]
rule1 = [leftSubTuple, rightSubTuple, conf1]
if(rule1[2] >= minConfidence):
confidenceList.append(rule1)
if len(wholeTuple)/2 == len(leftSubTuple):
continue
conf2 = supportDict[wholeTuple] / supportDict[rightSubTuple]
rule2 = [rightSubTuple, leftSubTuple, conf2]
if(rule2[2] >= minConfidence):
confidenceList.append(rule2)
return confidenceList
def apriori(data=[[]], minSupport=1, minConfidence=1):
dataSetList = list(map(set, data))
tempSet = sorted(set([ele for arr in data for ele in arr]))
elementTupleList = [tuple([ele]) for ele in tempSet]
frequentTuplesList, supportDict = genearateFrequentSets(dataSetList=dataSetList,
elementTupleList=elementTupleList, minSupport=minSupport)
ruleList = []
ruleList = generateAllRules(frequentTuplesList=frequentTuplesList, supportDict=supportDict, minConfidence=minConfidence)
return ruleList
if __name__ == '__main__':
begin = time.perf_counter()
dataFrame = pd.read_csv(filepath_or_buffer=r'mydata.csv')
transactions = [x.strip('{}').split(',') for x in dataFrame['items']]
ruleList = apriori(data=transactions, minSupport=0.001, minConfidence=0.4)
end = time.perf_counter()
print(end-begin)
print(len(ruleList))
优化版
def generateCandidates(targetEachlen, frequentTuples=[]):
candidates = []
lenOfArr = len(frequentTuples)
for i in range(lenOfArr):
for j in range(i+1, lenOfArr):
tupleA = frequentTuples[i]
tupleB = frequentTuples[j]
if tupleA[:targetEachlen-2] == tupleB[:targetEachlen-2]:
tupleC = tupleA + tuple([tupleB[-1]])
candidates.append( tupleC )
else:
break
return candidates
def genearateAllFrequent(data=[[]], minSupport=1):
lenOfTransactions = float(len(data))
dataSetList = list(map(set, data))
tempSet = sorted(set([ele for arr in data for ele in arr]))
tempDistributions = []
nextDistributions = []
frequentTuples = []
supportDict = {}
for ele in tempSet:
temp = set()
for i in range(len(dataSetList)):
if ele in dataSetList[i]:
temp.add(i)
support = float(len(temp))/lenOfTransactions
tempDistributions.append(temp)
if support >= minSupport:
nextDistributions.append(temp)
tupleEle = tuple({ele})
frequentTuples.append(tupleEle)
supportDict[tupleEle] = support
# print(1, len(tempSet), len(frequentTuples))
k = 2
frequentTuplesList = [frequentTuples]
while (k-2) 1:
frequentTuples, tempSupportDict, nextDistributions = generatefrequentTuples(frequentTuples=frequentTuples,
targetEachlen=k,
lastDistributions=nextDistributions,
lenOfTransactions=lenOfTransactions,
minSupport=minSupport)
supportDict.update(tempSupportDict)
if len(frequentTuples) > 0:
frequentTuplesList.append(frequentTuples)
k += 1
return frequentTuplesList,supportDict
def generatefrequentTuples(frequentTuples, targetEachlen, lastDistributions=[], lenOfTransactions=1.0, minSupport=1.0):
nextFrequentTuples = []
nextDistributions = []
lenOfArr = len(frequentTuples)
supportDict = {}
candidateCount = 0
for i in range(lenOfArr):
for j in range(i+1, lenOfArr):
tupleA = frequentTuples[i]
tupleB = frequentTuples[j]
if tupleA[:targetEachlen-2] == tupleB[:targetEachlen-2]:
# candidateCount += 1
setC = lastDistributions[i] & lastDistributions[j]
support = float(len(setC))/lenOfTransactions
if support >= minSupport:
tupleC = tupleA + tuple([tupleB[-1]])
nextFrequentTuples.append( tupleC )
nextDistributions.append(setC)
supportDict[tupleC] = support
else:
break
# print(targetEachlen, candidateCount, len(nextFrequentTuples))
return nextFrequentTuples, supportDict, nextDistributions
def generateAllRules(frequentTuplesList, supportDict, minConfidence):
ruleList = []
for i in range(1, len(frequentTuplesList)):
for frequentTuple in frequentTuplesList[i]:
itemsOfTuple = [tuple({item}) for item in frequentTuple]
if i > 1:
ruleList.extend(generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence))
else:
ruleList.extend(calcConfidenceOnSubSetList(frequentTuple, itemsOfTuple, supportDict, minConfidence))
return ruleList
def generateRulesOfFrequentSet(frequentTuple, itemsOfTuple, supportDict, minConfidence):
confidenceList = []
k = 1
tupleCandidates = itemsOfTuple
lenOfTuple = len(itemsOfTuple)
while len(tupleCandidates) >= 2:
confidenceList.extend(calcConfidenceOnSubSetList(frequentTuple, tupleCandidates, supportDict, minConfidence))
k += 1
if k>int(lenOfTuple/2):
break
tupleCandidates = generateCandidates(k, tupleCandidates)
return confidenceList
def calcConfidenceOnSubSetList(wholeTuple, subTuples, supportDict={}, minConfidence=1):
confidenceList = []
for leftSubTuple in subTuples:
tempSet = set(leftSubTuple)
rightSubTuple = tuple([v for v in wholeTuple if v not in tempSet])
# rightSubTuple = tuple([v for v in wholeTuple if v not in leftSubTuple])
conf1 = supportDict[wholeTuple] / supportDict[leftSubTuple]
rule1 = [leftSubTuple, rightSubTuple, conf1]
if(rule1[2] >= minConfidence):
confidenceList.append(rule1)
if len(wholeTuple)/2 == len(leftSubTuple):
continue
conf2 = supportDict[wholeTuple] / supportDict[rightSubTuple]
rule2 = [rightSubTuple, leftSubTuple, conf2]
if(rule2[2] >= minConfidence):
confidenceList.append(rule2)
return confidenceList
def apriori(data=[[]], minSupport=1, minConfidence=1):
frequentTuplesList, supportDict = genearateAllFrequent(data=data, minSupport=minSupport)
ruleList = []
# print(sys.getsizeof(frequentTuplesList), sys.getsizeof(supportDict))
ruleList = generateAllRules(frequentTuplesList=frequentTuplesList, supportDict=supportDict, minConfidence=minConfidence)
return ruleList
if __name__ == '__main__':
begin = datetime.datetime.now()
dataFrame = pd.read_csv(filepath_or_buffer=r'mydata.csv')
data = [x.strip('{}').split(',') for x in dataFrame['items']]
ruleList = apriori(data=data, minSupport=0.001, minConfidence=0.4)
# end = time.perf_counter()
end = datetime.datetime.now()
print((end-begin).total_seconds())
print(len(ruleList))



