栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他 > Spark

Spark Graphx 实现图中极大团挖掘, 伪并行化算法

Spark 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力



 ####背景:####

spark graphx并未提供极大团挖掘算法

当下的极大团算法都是串行化的算法,基于Bron–Kerbosch算法

####思路:####

spark graphx提供了连通图的算法,连通图和极大团都是无向图中的概念,极大团为连通图的子集

利用spark graphx 找出连通图,在从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化)

对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍然会耗时很久,这里利用剪枝的思想减少样本数据量,但是对于大图,优化空间有限

期待真正的并行化的极大团算法

####配置文件:####

  1. graph_data_path=hdfs://localhost/graph_data out_path=hdfs://localhost/clique 
  2. ck_path=hdfs://localhost/checkpoint numIter=50      剪枝次数 
  3. count=3         极大团顶点数大小 algorithm=2     极大团算法,1:个人实现  2:jgrapht 
  4. percent=90      剪枝后的顶点数,占前一次的百分比,如果剪完后,还剩下90%的数据,那么剪枝效率已然不高 spark.master=local 
  5. spark.app.name=graph spark.serializer=org.apache.spark.serializer.KryoSerializer 
  6. spark.yarn.executor.memoryOverhead=20480 spark.yarn.driver.memoryOverhead=20480 
  7. spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC 
  8. spark.driver.maxResultSize=10g spark.default.parallelism=60 

jgrapht

####样本数据:####

{"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"}

####样本图:####



 

####输出:####

0,1,2 0,2,3 3,4,5 4,5,6

####代码实现:####

  1. import java.util import java.util.Properties 
  1. import org.apache.spark.broadcast.Broadcast import org.apache.spark.graphx.{Edge, Graph} 
  2. import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} 
  3. import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} 
  4. import org.jgrapht.alg.BronKerboschCliqueFinder import org.jgrapht.graph.{DefaultEdge, SimpleGraph} 
  5.  import scala.collection.JavaConverters._ 
  6. import scala.collection.mutable  
  7. object ApplicationTitan {     def main(args: Array[String]) { 
  8.         val prop = new Properties()         prop.load(getClass.getResourceAsStream("/config.properties")) 
  9.              val graph_data_path = prop.getProperty("graph_data_path") 
  10.         val out_path = prop.getProperty("out_path")         val ck_path = prop.getProperty("ck_path") 
  11.         val count = Integer.parseInt(prop.getProperty("count"))         val numIter = Integer.parseInt(prop.getProperty("numIter")) 
  12.         val algorithm = Integer.parseInt(prop.getProperty("algorithm"))         val percent = Integer.parseInt(prop.getProperty("percent")) 
  13.         val conf = new SparkConf()         try { 
  14.           Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path) //            Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path) 
  15.         } catch {           case ex: Exception => 
  16.             ex.printStackTrace(System.out)         } 
  17.              prop.stringPropertyNames().asScala.foreach(s => { 
  18.           if (s.startsWith("spark")) {             conf.set(s, prop.getProperty(s)) 
  19.           }         }) 
  20.         conf.registerKryoClasses(Array(getClass))         val sc = new SparkContext(conf) 
  21.         sc.setLogLevel("ERROR")         sc.setCheckpointDir(ck_path) 
  22.         val sqlc = new SQLContext(sc)         try { 
  23.           val e_df = sqlc.read //                        .json(graph_data_path) 
  24.         .parquet(graph_data_path)  
  25.           var e_rdd = e_df             .mapPartitions(it => { 
  26.               it.map({                 case Row(dst: String, src: String) => 
  27.                   val src_long = src.toLong                   val dst_long = dst.toLong 
  28.                   if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)               }) 
  29.             }).distinct()           e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 
  30.                var bc: Broadcast[Set[Long]] = null 
  31.           var iter = 0           var bc_size = 0 
  32.          //剪枝           while (iter <= numIter) { 
  33.             val temp = e_rdd               .flatMap(x => List((x._1, 1), (x._2, 1))) 
  34.               .reduceByKey((x, y) => x + y)               .filter(x => x._2 >= count - 1) 
  35.               .mapPartitions(it => it.map(x => x._1))             val bc_value = temp.collect().toSet 
  36.             bc = sc.broadcast(bc_value)             e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2)) 
  37.             e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)             iter += 1 
  38.             if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {               println("total iter : "+ iter) 
  39.               iter = Int.MaxValue             } 
  40.             bc_size = bc_value.size           } 
  41.                // 构造图 
  42.           val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))           val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER) 
  43.                //连通图 
  44.           val cc = graph.connectedComponents().vertices           cc.persist(StorageLevel.MEMORY_AND_DISK_SER) 
  45.                cc.join(e_rdd) 
  46.             .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))             .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2) 
  47.             .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))             .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4) 
  48.             .filter(x => x._2.size >= count - 1)             .flatMap(x => { 
  49.               if (algorithm == 1)                 find(x, count) 
  50.               else                 find2(x, count) 
  51.             })             .mapPartitions(it => { 
  52.               it.map({                 case set => 
  53.                   var temp = ""                   set.asScala.foreach(x => temp += x + ",") 
  54.                   temp.substring(0, temp.length - 1)                 case _ => 
  55.               })             }) 
  56.     //                .coalesce(1)     .saveAsTextFile(out_path) 
  57. }  
  58.     catch {   case ex: Exception => 
  59.     ex.printStackTrace(System.out)     } 
  60.     sc.stop() } 
  61. //自己实现的极大团算法  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = { 
  62.     println(x._1 + "|s|" + x._2.size)     println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis()) 
  63.     val neighbors = new util.HashMap[String, util.Set[String]]     val finder = new CliqueFinder(neighbors, count) 
  64.     x._2.foreach(r => {       val v1 = r._1.toString 
  65.       val v2 = r._2.toString       if (neighbors.containsKey(v1)) { 
  66.         neighbors.get(v1).add(v2)       } else { 
  67.         val temp = new util.HashSet[String]()         temp.add(v2) 
  68.         neighbors.put(v1, temp)       } 
  69.       if (neighbors.containsKey(v2)) {         neighbors.get(v2).add(v1) 
  70.       } else {         val temp = new util.HashSet[String]() 
  71.         temp.add(v1)         neighbors.put(v2, temp) 
  72.       }     }) 
  73.     println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())     finder.findMaxCliques().asScala 
  74. } //jgrapht 中的极大团算法 
  75.  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {     println(x._1 + "|s|" + x._2.size) 
  76.     println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())     val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge]) 
  77.     x._2.foreach(r => {       val v1 = r._1.toString 
  78.       val v2 = r._2.toString       to_clique.addVertex(v1) 
  79.       to_clique.addVertex(v2)       to_clique.addEdge(v1, v2) 
  80.     })     val finder = new BronKerboschCliqueFinder(to_clique) 
  81.     val list = finder.getAllMaximalCliques.asScala     var result = Set[util.Set[String]]() 
  82.     list.foreach(x => {       if (x.size() >= count) 
  83.         result = result + x     }) 
  84.     println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())     result 
  85. } } 

//自己实现的极大团算法

  1. import java.util.*;  
  2.  
  3. public class CliqueFinder {     private Map> neighbors; 
  4.     private Set nodes;     private Set> maxCliques = new HashSet<>(); 
  5.     private Integer minSize;  
  6.     public CliqueFinder(Map> neighbors, Integer minSize) {         this.neighbors = neighbors; 
  7.         this.nodes = neighbors.keySet();         this.minSize = minSize; 
  8.     }  
  9.     private void bk3(Set clique, List candidates, List excluded) {         if (candidates.isEmpty() && excluded.isEmpty()) { 
  10.             if (!clique.isEmpty() && clique.size() >= minSize) {                 maxCliques.add(clique); 
  11.             }             return; 
  12.         }  
  13.         for (String s : degeneracy_order(candidates)) {             List new_candidates = new ArrayList<>(candidates); 
  14.             new_candidates.retainAll(neighbors.get(s));  
  15.             List new_excluded = new ArrayList<>(excluded);             new_excluded.retainAll(neighbors.get(s)); 
  16.             Set nextClique = new HashSet<>(clique);             nextClique.add(s); 
  17.             bk2(nextClique, new_candidates, new_excluded);             candidates.remove(s); 
  18.             excluded.add(s);         } 
  19.     }  
  20.     private void bk2(Set clique, List candidates, List excluded) {         if (candidates.isEmpty() && excluded.isEmpty()) { 
  21.             if (!clique.isEmpty() && clique.size() >= minSize) {                 maxCliques.add(clique); 
  22.             }             return; 
  23.         }         String pivot = pick_random(candidates); 
  24.         if (pivot == null) {             pivot = pick_random(excluded); 
  25.         }         List tempc = new ArrayList<>(candidates); 
  26.         tempc.removeAll(neighbors.get(pivot));  
  27.         for (String s : tempc) {             List new_candidates = new ArrayList<>(candidates); 
  28.             new_candidates.retainAll(neighbors.get(s));  
  29.             List new_excluded = new ArrayList<>(excluded);             new_excluded.retainAll(neighbors.get(s)); 
  30.             Set nextClique = new HashSet<>(clique);             nextClique.add(s); 
  31.             bk2(nextClique, new_candidates, new_excluded);             candidates.remove(s); 
  32.             excluded.add(s);         } 
  33.     }  
  34.     private List degeneracy_order(List innerNodes) {         List result = new ArrayList<>(); 
  35.         Map deg = new HashMap<>();         for (String node : innerNodes) { 
  36.             deg.put(node, neighbors.get(node).size());         } 
  37.         while (!deg.isEmpty()) {             Integer min = Collections.min(deg.values()); 
  38.             String minKey = null;             for (String key : deg.keySet()) { 
  39.                 if (deg.get(key).equals(min)) {                     minKey = key; 
  40.                     break;                 } 
  41.             }             result.add(minKey); 
  42.             deg.remove(minKey);             for (String k : neighbors.get(minKey)) { 
  43.                 if (deg.containsKey(k)) {                     deg.put(k, deg.get(k) - 1); 
  44.                 }             } 
  45.          } 
  46.         return result;     } 
  47.   
  48.     private String pick_random(List random) {         if (random != null && !random.isEmpty()) { 
  49.             return random.get(0);         } else { 
  50.             return null;         } 
  51.     }  
  52.     public Set> findMaxCliques() {         this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>()); 
  53.         return maxCliques;     } 
  54.      public static void main(String[] args) { 
  55.         Map> neighbors = new HashMap<>();         neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3"))); 
  56.         neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));         neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6"))); 
  57.         neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));         neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6"))); 
  58.         neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));         neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5"))); 
  59.         neighbors.put("7", new HashSet<>(Arrays.asList("6")));         CliqueFinder finder = new CliqueFinder(neighbors, 3); 
  60.         finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());         System.out.println(finder.maxCliques); 
  61.     } } 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/796637.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号