Hand-Written PageRank Algorithm

  • Sample data

Edges: edges.csv

  陈二,张三
  李四,刘一
  王五,刘一
  王五,张三
  赵六,刘一
  孙七,张三

Vertices: vertices.csv

  刘一,true,false
  陈二,false,false
  张三,false,false
  李四,false,false
  王五,false,false
  赵六,false,false
  孙七,false,false
  • Code
import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph, GraphXUtils, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

object PageRank {

  val DAMPING = 0.85
  val TOL = 1e-06

  val MAX_ITER: Int = 100

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    //register GraphX's internal classes with Kryo before the session is created,
    //otherwise the registration has no effect on the running session
    GraphXUtils.registerKryoClasses(conf)
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName("PageRank sample")
      .master("local[*]")
      .getOrCreate()

    //1. Build RDDs from the source files
    val srcVerticesRDD = getVerticesRDD(spark)
    val srcEdgesRDD = getEdgesRDD(spark)

    //2. Assign Long IDs to the vertices and rebuild the edges on those IDs
    val verticesRDD = verticesZipWithIndex(srcVerticesRDD)
    verticesRDD.persist(StorageLevel.MEMORY_AND_DISK)
    val edgesRDD = edgesLongJoinWithRDD(verticesRDD, srcEdgesRDD)
    edgesRDD.persist(StorageLevel.MEMORY_AND_DISK)

    //3. Build the GraphX graph instance
    val lastVerticesRDD = verticesRDD.map(x => (x._2._3, ""))
    val graph = Graph(lastVerticesRDD, edgesRDD)

    //4. Split the graph into connected components
    val ccRDD = graph.connectedComponents()
      .vertices //(vertex ID, component ID)

    //5. Compute PageRank within each connected component
    //collect every vertex's one-hop neighbours (edges are treated as undirected)
    val oneDegreeRDD = edgesRDD.flatMap(x => {
      Set((x.srcId, Set(x.dstId)), (x.dstId, Set(x.srcId)))
    }).reduceByKey(_ ++ _)
    edgesRDD.unpersist()
    val rankRDD = ccRDD.join(oneDegreeRDD)
      .map(x => (x._2._1, List((x._1, x._2._2)))) //(component ID, List((vertex ID, Set(neighbour IDs))))
      .reduceByKey(_ ::: _)
      .map(_._2)
      .map(pageRank(_, MAX_ITER, DAMPING, TOL))
      .flatMap(_.seq)

    //6. Map the Long IDs back to the vertex names and print the ranks
    val simpleVerticesRDD = verticesRDD.map(x => (x._2._3, x._1))
    rankRDD.join(simpleVerticesRDD).map(_._2.swap).collect().foreach(println)

    spark.stop()
  }


  /**
   * Computes PageRank for one sub-graph (connected component).
   *
   * @param subGraph   list of (vertex ID, set of neighbour IDs) in the component
   * @param iterations maximum number of iterations
   * @param damping    damping factor
   * @param tol        convergence threshold (L1 distance between successive iterations)
   * @return List[(vertex ID, rank value)]
   */
  def pageRank(subGraph: List[(VertexId, Set[VertexId])], iterations: Int, damping: Double, tol: Double): List[(VertexId, Double)] = {
    //number of vertices in this component
    val vertexSize = subGraph.size
    //transition matrix
    val M = DenseMatrix.zeros[Double](vertexSize, vertexSize)
    //fill the matrix: M(i, j) = 1 / |neighbours(j)| when vertex j links to vertex i
    for (i <- 0 until vertexSize) {
      val dvArray = mutable.ArrayBuffer[Double]()
      for (j <- 0 until vertexSize) {
        if (i == j) {
          dvArray += 0
        } else {
          val item = subGraph.apply(j)
          var weight: Double = 0
          //check whether vertex subGraph(j) has an edge to vertex subGraph(i)
          if (item._2.contains(subGraph.apply(i)._1)) {
            val target = item._2
            weight = 1.0 / target.size
          }
          dvArray += weight
        }
      }
      M(i, ::) := DenseVector[Double](dvArray.toArray).t
    }

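    //initialise the rank vector with random values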
    val v = new Array[Double](vertexSize)
    for (i <- 0 until vertexSize) {
      v(i) = Math.random()
    }

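    //L1-normalise the vector so the initial ranks sum to 1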
    var v_cur = new Array[Double](vertexSize)
    var norm: Double = 0
    for (i <- v) {
      norm = norm + i
    }
    for (i <- 0 until vertexSize) {
      v_cur(i) = v(i) / norm
    }

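    //apply damping: M_hat(i, j) = damping * M(i, j) + (1 - damping) / N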
    val M_hat = DenseMatrix.zeros[Double](vertexSize, vertexSize)
    for (i <- 0 until vertexSize) {
      for (j <- 0 until vertexSize) {
        M_hat(i, j) = M(i, j) * damping + (1 - damping) / vertexSize
      }
    }

    var diff_tol: Double = tol + 1
    var iter: Int = 0
    var v_new: Array[Double] = null

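    //power iteration: v_new = M_hat * v_cur, repeated until the L1 change falls below tol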
    while (diff_tol > tol && iter < iterations) {
      v_new = new Array[Double](vertexSize)
      for (i <- 0 until vertexSize) {
        for (j <- 0 until vertexSize) {
          v_new(i) = M_hat(i, j) * v_cur(j) + v_new(i)
        }
      }

      var diff_tol_tmp: Double = 0
      for (i <- 0 until vertexSize) {
        diff_tol_tmp = diff_tol_tmp + Math.abs(v_new(i) - v_cur(i))
      }
      diff_tol = diff_tol_tmp

      v_cur = v_new

      iter = iter + 1
    }

    //pair each vertex ID with its final rank value
    val result = new ListBuffer[(VertexId, Double)]()
    for (i <- 0 until vertexSize) {
      result += ((subGraph.apply(i)._1, v_new(i)))
    }

    result.toList
  }

  def getVerticesRDD(spark: SparkSession): RDD[(String, (Boolean, Boolean))] = {
    val in = this.getClass.getClassLoader.getResourceAsStream("vertices.csv")
    val list = Source.fromInputStream(in).getLines().map(x => {
      val array = x.split(",")
      (array(0), (array(1).toBoolean, array(2).toBoolean)) //(vertex name, boolean flag 1, boolean flag 2)
    }).toList
    spark.sparkContext.parallelize(list)
  }

  def getEdgesRDD(spark: SparkSession): RDD[(String, String)] = {
    val in = this.getClass.getClassLoader.getResourceAsStream("edges.csv")
    val list = Source.fromInputStream(in).getLines().map(x => {
      val array = x.split(",")
      (array(0), array(1)) //(source vertex, target vertex)
    }).toList

    spark.sparkContext.parallelize(list)
  }

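  /**
   * Assigns each vertex name a unique Long ID via zipWithUniqueId and keeps
   * the two boolean columns alongside it: (name, (flag1, flag2, id)).
   */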
  def verticesZipWithIndex(rdd: RDD[(String, (Boolean, Boolean))]): RDD[(String, (Boolean, Boolean, Long))] = {
    rdd.zipWithUniqueId().map(x => {
      (x._1._1, (x._1._2._1, x._1._2._2, x._2))
    })

  }

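  /**
   * Rewrites the (source name, target name) pairs as GraphX edges over the Long IDs
   * by joining against the vertex-to-ID mapping twice, once for each endpoint.
   */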
  def edgesLongJoinWithRDD(longVerticesRDD: RDD[(String, (Boolean, Boolean, Long))], edgeRDD: RDD[(String, String)]): RDD[Edge[String]] = {
    edgeRDD.join(longVerticesRDD).map(_._2).join(longVerticesRDD).map(x => Edge(x._2._1._3, x._2._2._3, ""))
  }
}
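
The nested loops in pageRank are the standard power-iteration update written out element by element: M_hat = damping * M + (1 - damping) / N, then v_new = M_hat * v_cur until the L1 distance between successive vectors drops below tol. For reference, the same step can be expressed directly with Breeze matrix operations. The sketch below is a minimal, hypothetical alternative (the helper name powerIteration is invented here and is not part of the code above):

import breeze.linalg.{DenseMatrix, DenseVector, norm, sum}

//Minimal sketch of the same power iteration using Breeze matrix operations.
//Pass in the damped matrix M_hat and any non-negative start vector.
def powerIteration(mHat: DenseMatrix[Double], v0: DenseVector[Double],
                   maxIter: Int, tol: Double): DenseVector[Double] = {
  var v = v0 / sum(v0)            //L1-normalise the start vector
  var diff = tol + 1
  var iter = 0
  while (diff > tol && iter < maxIter) {
    val vNext = mHat * v          //v_new = M_hat * v_cur
    diff = norm(vNext - v, 1.0)   //L1 distance between successive iterates
    v = vNext
    iter += 1
  }
  v
}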
  • Results
(刘一,0.2413130877467077)
(陈二,0.08980057573420146)
(张三,0.24131308774671542)
(李四,0.0898005757342042)
(王五,0.1581725580906551)
(赵六,0.0898005757342042)
(孙七,0.08980057573420146)