Spark Louvain Community Algorithm (Python + GraphX)

  • Sample data

Edges: edges.csv

  陈二,张三
  李四,刘一
  王五,刘一
  王五,张三
  赵六,刘一
  孙七,张三

Vertices: vertices.csv

  刘一,true,false
  陈二,false,false
  张三,false,false
  李四,false,false
  王五,false,false
  赵六,false,false
  孙七,false,false
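
In this sample, 刘一 is linked to 李四, 王五, and 赵六, while 张三 is linked to 陈二, 王五, and 孙七, so the graph forms two small clusters bridged by 王五. A quick local sanity check of that structure (a minimal sketch using plain Python and networkx, independent of the Spark pipeline):

import networkx as nx

edges = [("陈二", "张三"), ("李四", "刘一"), ("王五", "刘一"),
         ("王五", "张三"), ("赵六", "刘一"), ("孙七", "张三")]
G = nx.Graph()
G.add_edges_from(edges)
# Hubs 刘一 and 张三 each have degree 3; 王五 (degree 2) bridges the clusters.
print(dict(G.degree()))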
  • Scala code
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph, GraphXUtils, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.io.Source
import scala.sys.process.Process

/**
 *
 * @author Jianbo.Peng <pengjianbo@baiqishi.com>
 * @date 2021/1/11 11:08 AM
 */
object Louvain {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    // Register the GraphX Kryo serializers on the conf before the session is
    // built; registering after getOrCreate() has no effect.
    GraphXUtils.registerKryoClasses(conf)
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName("Louvain sample")
      .master("local[*]")
      .getOrCreate()

    // 1. Source data RDDs
    val srcVerticesRDD = getVerticesRDD(spark)
    val srcEdgesRDD = getEdgesRDD(spark)

    // 2. Convert vertices to Long IDs (GraphX requires Long vertex IDs)
    val verticesRDD = verticesZipWithIndex(srcVerticesRDD)
    verticesRDD.persist(StorageLevel.MEMORY_AND_DISK)
    val edgesRDD = edgesLongJoinWithRDD(verticesRDD, srcEdgesRDD)
    edgesRDD.persist(StorageLevel.MEMORY_AND_DISK)

    // 3. Build the GraphX graph (the vertex attribute is unused, so an empty string)
    val lastVerticesRDD = verticesRDD.map(x => (x._2._3, ""))
    val graph = Graph(lastVerticesRDD, edgesRDD)

    // 4. Split the graph into connected components (subnets)
    val ccRDD = graph.connectedComponents()
      .vertices // (vertexId, componentId)
    graph.unpersist()

    // 5. Run the Louvain algorithm on each subnet
    // Duplicate each edge in both directions so every vertex keys all its incident edges.
    val edgesAllRDD = edgesRDD.map(x => (x.srcId, x.dstId)).union(edgesRDD.map(x => (x.dstId, x.srcId)))
    val rdd: RDD[(VertexId, Set[VertexId])] = ccRDD.join(edgesAllRDD)
      .map(x => {
        (x._2._1, Set((x._1, x._2._2)))
      }).reduceByKey(_ ++ _)
      .flatMap(x => {
        // Each undirected edge now appears twice ((a,b) and (b,a)); keep only
        // one copy by normalizing to (min, max) before building the edge string.
        val edgeSet = mutable.Set[(VertexId, VertexId)]()
        val str = new StringBuilder()
        for (elem <- x._2) {
          val edge = if (elem._1 < elem._2) (elem._1, elem._2) else (elem._2, elem._1)
          if (!edgeSet.contains(edge)) {
            edgeSet += edge
            str.append("," + edge._1 + ":" + edge._2)
          }
        }

        // Running Louvain by shelling out to Python is not efficient, since
        // spawning a process each time costs time; consider reading the subnet
        // data directly in Python instead.
        val shell = "python louvain_networkx_simple.py " + str.substring(1)
        // StreamProcessLogger is a helper (not shown here) that runs the
        // process and exposes its stdout lines as a collection.
        val (process, stream) = StreamProcessLogger.run(Process(shell))

        val result = stream.filter(y => {
          y != null && y.contains("result=")
        }).map(y => {
          y.trim
            .replace("result=", "")
            .replace(" ", "")
            .replace("'", "")
            .replace("}", "")
            .replace("{", "")
        })
        val resultSet = mutable.Set[(VertexId, Set[VertexId])]()
        if (result.nonEmpty) {
          val dataMap = new mutable.HashMap[String, mutable.Set[VertexId]]()
          // Each entry is "vertexId:communityId"; group vertex ids by community.
          result.flatMap(line => {
            line.split(",").toSet
          }).foreach(entry => {
            val array = entry.split(":")
            val dataSet: mutable.Set[VertexId] = dataMap.getOrElse(array(1), mutable.Set[VertexId]())
            dataSet += array(0).toLong
            dataMap.put(array(1), dataSet)
          })

          // Use the smallest vertex id in each community as its community id.
          for (elem <- dataMap) {
            val id = elem._2.min
            resultSet += ((id, elem._2.toSet))
          }
        }
        resultSet.toSet
      })

    rdd.collect().foreach(println)
  }

  def getVerticesRDD(spark: SparkSession): RDD[(String, (Boolean, Boolean))] = {
    val in = this.getClass.getClassLoader.getResourceAsStream("vertices.csv")
    val list = Source.fromInputStream(in).getLines().map(x => {
      val array = x.split(",")
      (array(0), (array(1).toBoolean, array(2).toBoolean)) // (vertex, black flag, member flag)
    }).toList
    spark.sparkContext.parallelize(list)
  }

  def getEdgesRDD(spark: SparkSession): RDD[(String, String)] = {
    val in = this.getClass.getClassLoader.getResourceAsStream("edges.csv")
    val list = Source.fromInputStream(in).getLines().map(x => {
      val array = x.split(",")
      (array(0), array(1)) // (source vertex, target vertex)
    }).toList

    spark.sparkContext.parallelize(list)
  }

  /** Assign each vertex a unique Long id via zipWithUniqueId: (name, (b1, b2)) => (name, (b1, b2, id)). */
  def verticesZipWithIndex(rdd: RDD[(String, (Boolean, Boolean))]): RDD[(String, (Boolean, Boolean, Long))] = {
    rdd.zipWithUniqueId().map(x => {
      (x._1._1, (x._1._2._1, x._1._2._2, x._2))
    })
  }

  /** Join the edge list (srcName, dstName) with the vertex table twice to translate both endpoints to Long ids. */
  def edgesLongJoinWithRDD(longVerticesRDD: RDD[(String, (Boolean, Boolean, Long))], edgeRDD: RDD[(String, String)]): RDD[Edge[String]] = {
    edgeRDD.join(longVerticesRDD).map(_._2).join(longVerticesRDD).map(x => Edge(x._2._1._3, x._2._2._3, ""))
  }
}
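
As the comment in step 5 notes, forking a Python process per subnet is slow. A minimal sketch of the alternative it suggests, reading the edge file directly in Python and running Louvain once over the whole graph (assumes edges.csv is in the working directory and python-louvain is installed):

# coding=utf-8
import community as community_louvain
import networkx as nx

G = nx.Graph()
with open("edges.csv", encoding="utf-8") as f:
    for line in f:
        src, dst = line.strip().split(",")
        G.add_edge(src, dst)

# best_partition returns {vertex: community_id}; disconnected subnets are
# handled implicitly, since communities never span components.
print(community_louvain.best_partition(G))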
  • Python code (louvain_networkx_simple.py)
# coding=utf-8
import sys

import community as community_louvain
import networkx as nx

# pip3 install python-louvain
# pip3 install networkx
# Note: the import above comes from python-louvain; do not install the
# unrelated "community" package, which shadows the same module name.

# Edge list from argv, formatted as v1:v2,v1:v3,v2:v4 (i.e. three edges)
str_edge = sys.argv[1]
print(str_edge)
l_edge = str_edge.strip().split(",")
l_edge = [edge.split(":") for edge in l_edge]

G = nx.Graph()  # undirected graph
G.add_edges_from(l_edge)

# Run Louvain via python-louvain's best_partition on the NetworkX graph;
# the library encapsulates the whole algorithm.
partition = community_louvain.best_partition(G)

print("result=", partition)
  • Output
(刘一,0.4149655466297736)
(陈二,0.03557955867115836)
(张三,0.125576471090981)
(李四,0.11757313251729504)
(王五,0.1531526911884534)
(赵六,0.11757313251729504)
(孙七,0.03557955867115836)