Hand-written PageRank Algorithm
- Sample data
Edges: edges.csv (read from the classpath, e.g. src/main/resources)
陈二,张三
李四,刘一
王五,刘一
王五,张三
赵六,刘一
孙七,张三
Vertices: vertices.csv
刘一,true,false
陈二,false,false
张三,false,false
李四,false,false
王五,false,false
赵六,false,false
孙七,false,false
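The pageRank function in the code below runs damped power iteration on each connected component. For a component with $N$ vertices, it builds a column-stochastic matrix $M$ with $M_{ij} = 1/\deg(j)$ if vertex $j$ is adjacent to vertex $i$ and $0$ otherwise, and with damping factor $d = 0.85$ repeats

$$\hat{M}_{ij} = d\,M_{ij} + \frac{1-d}{N}, \qquad v^{(k+1)} = \hat{M}\, v^{(k)}$$

starting from a normalised random vector $v^{(0)}$, until $\lVert v^{(k+1)} - v^{(k)} \rVert_1 < \text{TOL}$ or the iteration cap is reached.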
- Code
import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph, GraphXUtils, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source
object PageRank {
val DAMPING = 0.85
val TOL = 1e-06
val MAX_ITER: Int = 100
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//Register GraphX classes with Kryo before the conf is used to build the session
GraphXUtils.registerKryoClasses(conf)
val spark = SparkSession
.builder()
.config(conf)
.appName("PageRank sample")
.master("local[*]")
.getOrCreate()
//1. Build source RDDs from the CSV files
val srcVerticesRDD = getVerticesRDD(spark)
val srcEdgesRDD = getEdgesRDD(spark)
//2. Map each vertex to a Long ID
val verticesRDD = verticesZipWithIndex(srcVerticesRDD)
verticesRDD.persist(StorageLevel.MEMORY_AND_DISK)
val edgesRDD = edgesLongJoinWithRDD(verticesRDD, srcEdgesRDD)
edgesRDD.persist(StorageLevel.MEMORY_AND_DISK)
//3. Build the GraphX graph
val lastVerticesRDD = verticesRDD.map(x => (x._2._3, ""))
val graph = Graph(lastVerticesRDD, edgesRDD)
//4. Split the graph into connected components
val ccRDD = graph.connectedComponents()
.vertices //(vertex ID, component ID)
//5. Compute PageRank per component
//Collect each vertex's direct (one-hop) neighbours
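//Note: both edge directions are emitted, so the graph is treated as undirected for the PageRank computation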
val oneDegreeRDD = edgesRDD.flatMap(x => {
Set((x.srcId, Set(x.dstId)), (x.dstId, Set(x.srcId)))
}).reduceByKey(_ ++ _)
edgesRDD.unpersist()
val rankRDD = ccRDD.join(oneDegreeRDD)
.map(x => (x._2._1, List((x._1, x._2._2)))) //(component ID, List((vertex ID, Set(neighbour IDs))))
.reduceByKey(_ ::: _)
.map(_._2)
.map(pageRank(_, MAX_ITER, DAMPING, TOL))
.flatMap(_.seq)
//6. Map the Long IDs back to the original vertex names and print the ranks
val simpleVerticesRDD = verticesRDD.map(x => (x._2._3, x._1))
rankRDD.join(simpleVerticesRDD).map(_._2.swap).collect().foreach(println)
spark.stop()
}
/**
* Compute PageRank for a single connected component.
*
* @param subGraph vertices of the component, each with its set of neighbour IDs
* @param iterations maximum number of iterations
* @param damping damping factor
* @param tol convergence threshold
* @return List[(vertex ID, rank value)]
*/
def pageRank(subGraph: List[(VertexId, Set[VertexId])], iterations: Int, damping: Double, tol: Double): List[(VertexId, Double)] = {
//number of vertices in this component
val vertexSize = subGraph.size
//transition matrix
val M = DenseMatrix.zeros[Double](vertexSize, vertexSize)
//fill the matrix: M(i, j) = 1/deg(j) if vertex j is adjacent to vertex i, otherwise 0
for (i <- 0 until vertexSize) {
val dvArray = mutable.ArrayBuffer[Double]()
for (j <- 0 until vertexSize) {
if (i == j) {
dvArray += 0
} else {
val item = subGraph.apply(j)
var weight: Double = 0
//check whether vertex subGraph(j)._1 is adjacent to vertex subGraph(i)._1
if (item._2.contains(subGraph.apply(i)._1)) {
val target = item._2
weight = 1.0 / target.size
}
dvArray += weight
}
}
M(i, ::) := DenseVector[Double](dvArray.toArray).t
}
//random initial rank vector, normalised below so that it sums to 1
val v = new Array[Double](vertexSize)
for (i <- 0 until vertexSize) {
v(i) = Math.random()
}
var v_cur = new Array[Double](vertexSize)
var norm: Double = 0
for (i <- v) {
norm = norm + i
}
for (i <- 0 until vertexSize) {
v_cur(i) = v(i) / norm
}
//damped transition matrix: M_hat(i, j) = damping * M(i, j) + (1 - damping) / N
val M_hat = DenseMatrix.zeros[Double](vertexSize, vertexSize)
for (i <- 0 until vertexSize) {
for (j <- 0 until vertexSize) {
M_hat(i, j) = M(i, j) * damping + (1 - damping) / vertexSize
}
}
var diff_tol: Double = tol + 1
var iter: Int = 0
var v_new: Array[Double] = null
//power iteration: stop when the L1 change falls below tol or the iteration cap is reached
while (diff_tol > tol && iter < iterations) {
v_new = new Array[Double](vertexSize)
for (i <- 0 until vertexSize) {
for (j <- 0 until vertexSize) {
v_new(i) = M_hat(i, j) * v_cur(j) + v_new(i)
}
}
var diff_tol_tmp: Double = 0
for (i <- 0 until vertexSize) {
diff_tol_tmp = diff_tol_tmp + Math.abs(v_new(i) - v_cur(i))
}
diff_tol = diff_tol_tmp
v_cur = v_new
iter = iter + 1
}
val result = new ListBuffer[(VertexId, Double)]()
for (i <- 0 until vertexSize) {
result += ((subGraph.apply(i)._1, v_new(i)))
}
result.toList
}
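//Standalone usage sketch (hypothetical 3-vertex component; the neighbour sets mirror the undirected adjacency built in main):
//  val ranks = pageRank(List((1L, Set(2L, 3L)), (2L, Set(1L)), (3L, Set(1L))), MAX_ITER, DAMPING, TOL)
//  ranks is a List[(VertexId, Double)] whose values sum to roughly 1 for this component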
def getVerticesRDD(spark: SparkSession): RDD[(String, (Boolean, Boolean))] = {
val in = this.getClass.getClassLoader.getResourceAsStream("vertices.csv")
val list = Source.fromInputStream(in).getLines().map(x => {
val array = x.split(",")
(array(0), (array(1).toBoolean, array(2).toBoolean)) //(vertex name, flag 1, flag 2)
}).toList
spark.sparkContext.parallelize(list)
}
def getEdgesRDD(spark: SparkSession): RDD[(String, String)] = {
val in = this.getClass.getClassLoader.getResourceAsStream("edges.csv")
val list = Source.fromInputStream(in).getLines().map(x => {
val array = x.split(",")
(array(0), array(1)) //(source vertex, target vertex)
}).toList
spark.sparkContext.parallelize(list)
}
def verticesZipWithIndex(rdd: RDD[(String, (Boolean, Boolean))]): RDD[(String, (Boolean, Boolean, Long))] = {
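//zipWithUniqueId assigns each vertex name a distinct Long, used as the GraphX vertex ID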
rdd.zipWithUniqueId().map(x => {
(x._1._1, (x._1._2._1, x._1._2._2, x._2))
})
}
def edgesLongJoinWithRDD(longVerticesRDD: RDD[(String, (Boolean, Boolean, Long))], edgeRDD: RDD[(String, String)]): RDD[Edge[String]] = {
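//Join twice: first replace the source name with its Long ID, then the destination name, yielding Edge(srcId, dstId)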
edgeRDD.join(longVerticesRDD).map(_._2).join(longVerticesRDD).map(x => Edge(x._2._1._3, x._2._2._3, ""))
}
}
- Output
(刘一,0.2413130877467077)
(陈二,0.08980057573420146)
(张三,0.24131308774671542)
(李四,0.0898005757342042)
(王五,0.1581725580906551)
(赵六,0.0898005757342042)
(孙七,0.08980057573420146)
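As a rough cross-check, the graph built in step 3 can also be fed to GraphX's built-in PageRank (added inside main before spark.stop()). The built-in implementation follows edge direction and uses a different normalisation, so its values will not match the per-component, undirected ranks above; it is only a sanity check. A minimal sketch, reusing the graph and verticesRDD values from main:

//Sanity-check sketch: GraphX's built-in PageRank on the same graph (directed semantics, different normalisation)
val builtInRanks = graph.pageRank(0.0001).vertices //(vertex ID, rank)
builtInRanks.join(verticesRDD.map(x => (x._2._3, x._1))) //attach the original vertex names
.map(_._2.swap)
.collect()
.foreach(println)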