主要功能: 根据交通出行数据,通过刷卡记录,获取用户的所有出行od,以天为单位构建矩阵,对矩阵进行聚类
主要采用了kmeans进行聚类,轮盘法(kmeans++思想)进行簇初始化,采用SSE(拐点)进行聚类效果评价
kmeans++ 与kmeans参考该篇博客:
https://www.cnblogs.com/wang2825/articles/8696830.html
SSE选择最佳K:
import breeze.linalg.{Axis, DenseMatrix, sum}
import breeze.numerics._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* Created by GangTian on 2020/4/30 in SIBAT
*/
object Kmeans {
val time_coefficient = 0.5 //时间系数(向量距离计算)
val geo_coefficient = 0.5 // 地理位置系数(向量距离计算)
val k_factor = 0.5 //最大聚类系数,最大簇数 = k_factor * daysOfTrip(出行总天数)
val iterator = 10 //k-means迭代次数
val mse = 0.01 //簇中心变化幅度
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sql = spark.sqlContext
val sc = spark.sparkContext
import spark.implicits._
// 1.数据集准备-----造了一份数据,特征未经过标准化处理,实数据处理过
val list = new ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)]()
list.append(("001", "2020-05-01", "A", 1, 1, "B", 1.1, 1.1, 7.5, 8.0))
list.append(("001", "2020-05-01", "B", 1.1, 1.1, "A", 1, 1, 18.5, 19.0))
list.append(("001", "2020-05-02", "C", 7, 7, "D", 8, 8, 9.5, 10.5))
list.append(("001", "2020-05-02", "D", 8, 8, "C", 7, 7, 20.5, 21.5))
list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1))
list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1))
list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1))
list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1))
list.append(("001", "2020-05-03", "B", 1.1, 1.1, "A", 1, 1, 18.4, 19.2))
list.append(("001", "2020-05-04", "C", 7, 7, "D", 8, 8, 9.6, 10.4))
list.append(("001", "2020-05-04", "D", 8, 8, "C", 7, 7, 20.7, 21.6))
list.append(("001", "2020-05-05", "A", 1, 1, "B", 1.1, 1.1, 7.7, 8.3))
list.append(("001", "2020-05-05", "B", 1.1, 1.1, "A", 1, 1, 18.3, 19.0))
list.append(("001", "2020-05-06", "C", 7, 7, "D", 8, 8, 9.2, 10.2))
list.append(("001", "2020-05-06", "C", 7, 7, "D", 8, 8, 9.2, 10.2))
list.append(("001", "2020-05-06", "D", 8, 8, "C", 7, 7, 20.2, 21.3))
list.append(("001", "2020-05-07", "E", 5, 5, "F", 15, 15, 6.1, 8.1))
list.append(("001", "2020-05-07", "F", 15, 15, "G", 20, 20, 16.1, 18.1))
list.append(("001", "2020-05-07", "G", 20, 20, "E", 5, 5, 20.1, 22.1))
list.append(("001", "2020-05-08", "A", 1, 1, "D", 8, 8, 0.5, 10.5))
list.append(("001", "2020-05-08", "D", 8, 8, "F", 15, 15, 10.5, 14.5))
list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5))
list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5))
list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5))
list.append(("001", "2020-05-09", "E", 5, 5, "F", 15, 15, 6.3, 8.4))
list.append(("001", "2020-05-09", "F", 15, 15, "G", 20, 20, 16.4, 18.2))
list.append(("001", "2020-05-09", "G", 20, 20, "E", 5, 5, 20.2, 21.9))
sc.parallelize(list).toDF("card_id", "date", "departLocation", "departLon", "departLat", "arrivalLocation", "arrivalLon", "arrivalLat", "departTime", "arrivalTime")
.sort("date").show(100)
val kmax = 8 //最大天数
val res = new ListBuffer[((Int,Double, ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]))]()
for(i<-1 to(kmax)){
res.append(run(list,i))
}
val best_k = calPointToLineDis(res.map(tp =>(tp._1,tp._2)))
println("best_k: " + best_k)
// 数据还原回去
val dateArr = new ArrayBuffer[(String,Int)]()
var label = 1
val b2 = res.filter(_._1 == best_k).head._3.groupBy(_._2)
.foreach(tp =>{
val k = tp._2
for(r <- k){
val date_pre = r._1(0,0).toLong.toString
val date = date_pre.substring(0,4) + "-" + date_pre.substring(4,6) + "-"+ date_pre.substring(6)
dateArr.append((date,label))
}
label += 1
})
dateArr.foreach(println(_))
}
/**
* calculate the distance of one point to the line
* @return
*/
def calPointToLineDis(points:ListBuffer[(Int,Double)]):Double={
val points_new = points.sortBy(_._1)
val startPoint = points_new.head
val endPoint = points_new.last
//求取直线系数,k和b
val k = (startPoint._2 - endPoint._2) / (startPoint._1 - endPoint._1)
val b = startPoint._2 - k * startPoint._1
// 求取拐点,拐点离直线距离最远
val disArr = new ArrayBuffer[(Int,Double)]()
for(point <- points_new){
val dis = math.abs(k * point._1 + b - point._2)
disArr.append((point._1,dis))
}
val best_k = disArr.sortBy(_._2).last._1
best_k
}
/**
* the entry of kmeans program, the method return the SSE and matrix and cluster within the k clusters,the matrix and cluster are in the shape of tuple
* @param list
* @param k
* @return
*/
def run(list: ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)],k:Int):(Int,Double, ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]) = {
//1.日期数据转化成matrix
val matrix_list = list2Matrix(list)
//2.基于kmeans++ 思想初始化聚类中心
val initial_clusters = initialize_cluster(matrix_list, k)
//3.声明可变的cluster
var variable_clusters = new ListBuffer[DenseMatrix[Double]]()
for (initial_cluster <- initial_clusters) variable_clusters.append(initial_cluster)
//3.计算每个样本xi到K个聚类中心的距离,并将其划分到距离最小的聚类中心对应的类中(matrix,cluster)
var clusterResult = new ListBuffer[(DenseMatrix[Double],DenseMatrix[Double])]()
var flag = true
for (i <- 0 until (iterator) if flag) {
val res = new ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])] //res存放matrix 和其对应的center
for (matrix <- matrix_list) {
val center = calDisOfMatrix2(matrix, variable_clusters)
res.append((matrix, center))
}
clusterResult = res
//根据聚类后的结果,更新center矩阵
val newCenter = reflash_center(res)
// 判断center是否发生变化,若center不变,则不再迭代
var flag2 = true
for(ct <- newCenter if flag2){
if(!variable_clusters.contains(ct)){
flag2 = false
variable_clusters = newCenter
}
}
if(flag2) flag = false
}
//4. 计算当前 k对应的SSE指标
val sse = estimate(clusterResult)
(k,sse,clusterResult)
}
/**
* calculate the SSE(sum of the squared error)
* @param clusterResult
* @return
*/
def estimate(clusterResult: ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]):Double = {
var sse = 0.0
for(tp<- clusterResult){
sse += calDisOfMatrix(tp._1,Array(tp._2))
}
sse
}
/** reflash the center
*
* @param res res store the matrix and it corresponding center in tuple form
*/
def reflash_center(res: ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]): ListBuffer[DenseMatrix[Double]] = {
val c1 = res.head._2
val rows = c1.rows // center对应的行数
val cols = c1.cols // center对应的列数
val result = new ListBuffer[DenseMatrix[Double]]()
val groups = res.groupBy(_._2)
for (group <- groups) {
val value = group._2.map(_._1)
val len = value.length * 1.0
var center: DenseMatrix[Double] = DenseMatrix.zeros[Double](rows, cols)
for (va <- value) {
center += va
}
val center2: DenseMatrix[Double] = center / len
result.append(center2)
}
result
}
/**
* based on the kmeans++ algorithm to select the k clusters
*
* @param list input data with shape of denseMatrix
* @param k represent the initialize numbers of clusters
* @return
*/
def initialize_cluster(list: ListBuffer[DenseMatrix[Double]], k: Int): Array[DenseMatrix[Double]] = {
var remain_k = k
val seed = (math.random * list.length).toInt
val clusters = new ArrayBuffer[DenseMatrix[Double]]()
//c1簇
clusters.append(list(seed))
remain_k -= 1
//其它簇采用轮盘法选取
for (i <- 0 until (remain_k)) {
// 1. 计算剩余样本与当前所有簇中心的最短距离
val disArr = new ArrayBuffer[(DenseMatrix[Double], Double)]()
for (matrix <- list if !clusters.contains(matrix)) {
val dis = calDisOfMatrix(matrix, clusters.toArray)
disArr.append((matrix, dis))
}
//2.计算概率px
val total = disArr.map(_._2).sum
val px = disArr.map(x => x._2 / total)
//3.根据概率计算区间
val sum_px = new ArrayBuffer[Double]()
var sum_all = 0.0
for (p <- px) {
sum_all += p
sum_px.append(sum_all)
}
//4.随机选择新的簇
val rand = math.random
var flag = true
if (rand < sum_px(0)) {
clusters.append(disArr.head._1)
flag = false
} else if (rand > sum_px(sum_px.length - 2)) {
clusters.append(disArr.last._1)
} else {
for (i <- 0 until (sum_px.length - 1) if flag) {
if (rand > sum_px(i) && rand < sum_px(i + 1)) {
clusters.append(disArr(i + 1)._1)
flag = false
}
}
}
// 5.选除新的簇后,remain_k自减1
remain_k -= 1
}
clusters.toArray
}
/**
* calculate the number of departure and destination pairs,and expandding the one day data to a matrix,
* so every day has the same matrix shape including rows and cols
*
* @param list
* @return
*/
def list2Matrix(list: ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)]): ListBuffer[DenseMatrix[Double]] = {
// 1.从历史记录中筛选所有od 对
val ods = list.map(tp => (tp._3 + tp._6)).distinct.sorted
// 2.统计每一组od对在所有天数中出现次数的最高频次
val groups = list.groupBy(_._2).toArray
val odNum = new ArrayBuffer[(String, Int)]()
for (od <- ods) {
var max = 0
for (group <- groups) {
val num = group._2.filter(tp => (tp._3 + tp._6) == od).length
if (num > max) max = num
}
odNum.append((od, max))
}
// 3.根据odNum将list记录填充至二维矩阵
val matrix_list = new ListBuffer[DenseMatrix[Double]]()
val rows = odNum.map(_._2).sum //矩阵行数
val cols = 7 //矩阵列数
//4.group代表每一天的记录
for (group <- groups) {
val date = group._1.replace("-", "").toDouble
val odArr = new ListBuffer[(Double, Double, Double, Double, Double, Double, Double)]()
val record = group._2.sortBy(_._9) //按出发时间排序
for (od <- odNum) {
val res = record.filter(tp => (tp._3 + tp._6) == od._1)
try {
res.foreach(tp => odArr.append((date, tp._4, tp._5, tp._7, tp._8, tp._9, tp._10)))
} catch {
case e: Exception => println()
}
// 扩充至od._2
val delta = od._2 - res.length
for (i <- 0 until (delta)) {
odArr.append((date, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
}
}
//5. 将扩充后的数组转为矩阵形式
val vec = odArr.map(tp => (tp._1 + "," + tp._2 + "," + tp._3 + "," + tp._4 + "," + tp._5 + "," + tp._6 + "," + tp._7))
.flatMap(str => str.split(","))
.map(_.toDouble)
val matrix = new DenseMatrix[Double](cols, rows, vec.toArray).t
matrix_list.append(matrix)
}
matrix_list
}
/**
* calculate the distance between the input matrix of the clusters,and return the least distance
*
* @param matrix
* @param clusters
*/
def calDisOfMatrix(matrix: DenseMatrix[Double], clusters: Array[DenseMatrix[Double]]): Double = {
var dis = Double.MaxValue //初始化最大value
for (cluster <- clusters) {
val m = matrix - cluster
val dis_1 = sum(sqrt(sum(pow(m(::, 1 to (2)), 2.0), Axis._1)))
val dis_2 = sum(sqrt(sum(pow(m(::, 3 to (4)), 2.0), Axis._1)))
val dis_3 = sum(sum(abs(m(::, 5 to (6))), Axis._1))
val distance = geo_coefficient * (dis_1 + dis_2) + time_coefficient * dis_3
if (distance < dis) dis = distance
}
math.pow(dis, 2.0)
}
/**
* calculate the least distance between the matrix and the clusters,and return the cluster center which the matrix belongs
*
* @param matrix
* @param clusters
*/
def calDisOfMatrix2(matrix: DenseMatrix[Double], clusters: ListBuffer[DenseMatrix[Double]]): DenseMatrix[Double] = {
var dis = Double.MaxValue //初始化最大value
var center: DenseMatrix[Double] = null
for (cluster <- clusters) {
val m = matrix - cluster
val dis_1 = sum(sqrt(sum(pow(m(::, 1 to (2)), 2.0), Axis._1)))
val dis_2 = sum(sqrt(sum(pow(m(::, 3 to (4)), 2.0), Axis._1)))
val dis_3 = sum(sum(abs(m(::, 5 to (6))), Axis._1))
val distance = geo_coefficient * (dis_1 + dis_2) + time_coefficient * dis_3
if (distance < dis) {
dis = distance
center = cluster
}
}
center
}
}
数据(真实,非上述)实战结果:
转载:https://blog.csdn.net/Java_Man_China/article/details/105956464
查看评论