小言_互联网的博客

算法小白的第一次尝试----出行模式分析(矩阵聚类,数据实战)

330人阅读  评论(0)

主要功能: 根据交通出行数据,通过刷卡记录,获取用户的所有出行od,以天为单位构建矩阵,对矩阵进行聚类

主要采用了kmeans进行聚类,轮盘法(kmeans++思想)进行簇初始化,采用SSE(拐点)进行聚类效果评价

kmeans++ 与kmeans参考该篇博客:

https://www.cnblogs.com/wang2825/articles/8696830.html

SSE选择最佳K:

import breeze.linalg.{Axis, DenseMatrix, sum}
import breeze.numerics._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/**
  * Created by GangTian on 2020/4/30 in SIBAT
  */
object Kmeans {
  val time_coefficient = 0.5 //时间系数(向量距离计算)
  val geo_coefficient = 0.5 // 地理位置系数(向量距离计算)
  val k_factor = 0.5 //最大聚类系数,最大簇数 = k_factor * daysOfTrip(出行总天数)
  val iterator = 10 //k-means迭代次数
  val mse = 0.01 //簇中心变化幅度

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sql = spark.sqlContext
    val sc = spark.sparkContext
    import spark.implicits._

    // 1.数据集准备-----造了一份数据,特征未经过标准化处理,实数据处理过
    val list = new ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)]()
    list.append(("001", "2020-05-01", "A", 1, 1, "B", 1.1, 1.1, 7.5, 8.0))
    list.append(("001", "2020-05-01", "B", 1.1, 1.1, "A", 1, 1, 18.5, 19.0))
    list.append(("001", "2020-05-02", "C", 7, 7, "D", 8, 8, 9.5, 10.5))
    list.append(("001", "2020-05-02", "D", 8, 8, "C", 7, 7, 20.5, 21.5))
    list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1))
    list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1)) 
    list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1)) 
    list.append(("001", "2020-05-03", "A", 1, 1, "B", 1.1, 1.1, 7.6, 8.1)) 
    list.append(("001", "2020-05-03", "B", 1.1, 1.1, "A", 1, 1, 18.4, 19.2))
    list.append(("001", "2020-05-04", "C", 7, 7, "D", 8, 8, 9.6, 10.4))
    list.append(("001", "2020-05-04", "D", 8, 8, "C", 7, 7, 20.7, 21.6))
    list.append(("001", "2020-05-05", "A", 1, 1, "B", 1.1, 1.1, 7.7, 8.3))
    list.append(("001", "2020-05-05", "B", 1.1, 1.1, "A", 1, 1, 18.3, 19.0))
    list.append(("001", "2020-05-06", "C", 7, 7, "D", 8, 8, 9.2, 10.2))
    list.append(("001", "2020-05-06", "C", 7, 7, "D", 8, 8, 9.2, 10.2)) 
    list.append(("001", "2020-05-06", "D", 8, 8, "C", 7, 7, 20.2, 21.3))
    list.append(("001", "2020-05-07", "E", 5, 5, "F", 15, 15, 6.1, 8.1))
    list.append(("001", "2020-05-07", "F", 15, 15, "G", 20, 20, 16.1, 18.1))
    list.append(("001", "2020-05-07", "G", 20, 20, "E", 5, 5, 20.1, 22.1))
    list.append(("001", "2020-05-08", "A", 1, 1, "D", 8, 8, 0.5, 10.5))
    list.append(("001", "2020-05-08", "D", 8, 8, "F", 15, 15, 10.5, 14.5))
    list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5))
    list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5)) 
    list.append(("001", "2020-05-08", "F", 15, 15, "A", 1, 1, 17.5, 21.5)) 
    list.append(("001", "2020-05-09", "E", 5, 5, "F", 15, 15, 6.3, 8.4))
    list.append(("001", "2020-05-09", "F", 15, 15, "G", 20, 20, 16.4, 18.2))
    list.append(("001", "2020-05-09", "G", 20, 20, "E", 5, 5, 20.2, 21.9))

    sc.parallelize(list).toDF("card_id", "date", "departLocation", "departLon", "departLat", "arrivalLocation", "arrivalLon", "arrivalLat", "departTime", "arrivalTime")
      .sort("date").show(100)
    
    val kmax = 8 //最大天数
    val res = new ListBuffer[((Int,Double, ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]))]()
    for(i<-1 to(kmax)){
      res.append(run(list,i))
    }
    val best_k = calPointToLineDis(res.map(tp =>(tp._1,tp._2)))
    println("best_k: " + best_k)

    // 数据还原回去
    val dateArr = new ArrayBuffer[(String,Int)]()
    var label = 1
    val b2 = res.filter(_._1 == best_k).head._3.groupBy(_._2)
      .foreach(tp =>{
        val  k = tp._2
        for(r <- k){
          val date_pre = r._1(0,0).toLong.toString
          val date = date_pre.substring(0,4) + "-" + date_pre.substring(4,6) + "-"+ date_pre.substring(6)
          dateArr.append((date,label))
        }
        label += 1
      })
    dateArr.foreach(println(_))
  }

  /**
    * calculate the distance of one point to the line
    * @return
    */
  def calPointToLineDis(points:ListBuffer[(Int,Double)]):Double={
    val points_new = points.sortBy(_._1)
    val startPoint = points_new.head
    val endPoint = points_new.last
    //求取直线系数,k和b
    val k = (startPoint._2 - endPoint._2) / (startPoint._1 - endPoint._1)
    val b = startPoint._2 - k * startPoint._1
    // 求取拐点,拐点离直线距离最远
    val disArr = new ArrayBuffer[(Int,Double)]()
    for(point <- points_new){
      val dis = math.abs(k * point._1 + b - point._2)
      disArr.append((point._1,dis))
    }
    val best_k = disArr.sortBy(_._2).last._1
    best_k
  }


  /**
    * the entry of kmeans program, the method return the SSE and matrix and cluster within the k clusters,the matrix and cluster are in the shape of tuple
    * @param list
    * @param k
    * @return
    */
  def run(list: ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)],k:Int):(Int,Double, ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]) = {
    //1.日期数据转化成matrix
    val matrix_list = list2Matrix(list)

    //2.基于kmeans++ 思想初始化聚类中心
    val initial_clusters = initialize_cluster(matrix_list, k)

    //3.声明可变的cluster
    var variable_clusters = new ListBuffer[DenseMatrix[Double]]()
    for (initial_cluster <- initial_clusters) variable_clusters.append(initial_cluster)

    //3.计算每个样本xi到K个聚类中心的距离,并将其划分到距离最小的聚类中心对应的类中(matrix,cluster)
    var clusterResult = new ListBuffer[(DenseMatrix[Double],DenseMatrix[Double])]()
    var flag = true
    for (i <- 0 until (iterator) if flag) {
      val res = new ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])] //res存放matrix 和其对应的center
      for (matrix <- matrix_list) {
        val center = calDisOfMatrix2(matrix, variable_clusters)
        res.append((matrix, center))
      }
      clusterResult = res

      //根据聚类后的结果,更新center矩阵
      val newCenter = reflash_center(res)

      // 判断center是否发生变化,若center不变,则不再迭代
      var flag2 = true
      for(ct <- newCenter if flag2){
        if(!variable_clusters.contains(ct)){
          flag2 = false
          variable_clusters = newCenter
        }
      }
      if(flag2) flag = false
    }

    //4. 计算当前 k对应的SSE指标
    val sse = estimate(clusterResult)
    (k,sse,clusterResult)
  }


  /**
    * calculate the SSE(sum of the squared error)
    * @param clusterResult
    * @return
    */
  def estimate(clusterResult: ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]):Double = {
    var sse = 0.0
    for(tp<- clusterResult){
      sse += calDisOfMatrix(tp._1,Array(tp._2))
    }
    sse
  }


  /** reflash the center
    *
    * @param res res store the matrix and it corresponding center in tuple form
    */
  def reflash_center(res: ListBuffer[(DenseMatrix[Double], DenseMatrix[Double])]): ListBuffer[DenseMatrix[Double]] = {
    val c1 = res.head._2
    val rows = c1.rows // center对应的行数
    val cols = c1.cols // center对应的列数
    val result = new ListBuffer[DenseMatrix[Double]]()
    val groups = res.groupBy(_._2)
    for (group <- groups) {
      val value = group._2.map(_._1)
      val len = value.length * 1.0
      var center: DenseMatrix[Double] = DenseMatrix.zeros[Double](rows, cols)
      for (va <- value) {
        center += va
      }
      val center2: DenseMatrix[Double] = center / len
      result.append(center2)
    }
    result
  }


  /**
    * based on the kmeans++ algorithm to select the k clusters
    *
    * @param list input data with shape of denseMatrix
    * @param k    represent the initialize numbers of clusters
    * @return
    */
  def initialize_cluster(list: ListBuffer[DenseMatrix[Double]], k: Int): Array[DenseMatrix[Double]] = {
    var remain_k = k
    val seed = (math.random * list.length).toInt
    val clusters = new ArrayBuffer[DenseMatrix[Double]]()
    //c1簇
    clusters.append(list(seed))
    remain_k -= 1

    //其它簇采用轮盘法选取
    for (i <- 0 until (remain_k)) {
      // 1. 计算剩余样本与当前所有簇中心的最短距离
      val disArr = new ArrayBuffer[(DenseMatrix[Double], Double)]()
      for (matrix <- list if !clusters.contains(matrix)) {
        val dis = calDisOfMatrix(matrix, clusters.toArray)
        disArr.append((matrix, dis))
      }

      //2.计算概率px
      val total = disArr.map(_._2).sum
      val px = disArr.map(x => x._2 / total)

      //3.根据概率计算区间
      val sum_px = new ArrayBuffer[Double]()
      var sum_all = 0.0
      for (p <- px) {
        sum_all += p
        sum_px.append(sum_all)
      }

      //4.随机选择新的簇
      val rand = math.random
      var flag = true
      if (rand < sum_px(0)) {
        clusters.append(disArr.head._1)
        flag = false
      } else if (rand > sum_px(sum_px.length - 2)) {
        clusters.append(disArr.last._1)
      } else {
        for (i <- 0 until (sum_px.length - 1) if flag) {
          if (rand > sum_px(i) && rand < sum_px(i + 1)) {
            clusters.append(disArr(i + 1)._1)
            flag = false
          }
        }
      }
      // 5.选除新的簇后,remain_k自减1
      remain_k -= 1
    }
    clusters.toArray
  }


  /**
    * calculate the number of departure and destination pairs,and expandding the one day data to a matrix,
    * so every day has the same matrix shape including rows and cols
    *
    * @param list
    * @return
    */
  def list2Matrix(list: ListBuffer[(String, String, String, Double, Double, String, Double, Double, Double, Double)]): ListBuffer[DenseMatrix[Double]] = {
    // 1.从历史记录中筛选所有od 对
    val ods = list.map(tp => (tp._3 + tp._6)).distinct.sorted

    // 2.统计每一组od对在所有天数中出现次数的最高频次
    val groups = list.groupBy(_._2).toArray
    val odNum = new ArrayBuffer[(String, Int)]()
    for (od <- ods) {
      var max = 0
      for (group <- groups) {
        val num = group._2.filter(tp => (tp._3 + tp._6) == od).length
        if (num > max) max = num
      }
      odNum.append((od, max))
    }

    // 3.根据odNum将list记录填充至二维矩阵
    val matrix_list = new ListBuffer[DenseMatrix[Double]]()
    val rows = odNum.map(_._2).sum //矩阵行数
    val cols = 7 //矩阵列数

    //4.group代表每一天的记录
    for (group <- groups) {
      val date = group._1.replace("-", "").toDouble
      val odArr = new ListBuffer[(Double, Double, Double, Double, Double, Double, Double)]()
      val record = group._2.sortBy(_._9) //按出发时间排序
      for (od <- odNum) {
        val res = record.filter(tp => (tp._3 + tp._6) == od._1)
        try {
          res.foreach(tp => odArr.append((date, tp._4, tp._5, tp._7, tp._8, tp._9, tp._10)))
        } catch {
          case e: Exception => println()
        }

        // 扩充至od._2
        val delta = od._2 - res.length
        for (i <- 0 until (delta)) {
          odArr.append((date, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
        }
      }

      //5. 将扩充后的数组转为矩阵形式
      val vec = odArr.map(tp => (tp._1 + "," + tp._2 + "," + tp._3 + "," + tp._4 + "," + tp._5 + "," + tp._6 + "," + tp._7))
        .flatMap(str => str.split(","))
        .map(_.toDouble)
      val matrix = new DenseMatrix[Double](cols, rows, vec.toArray).t
      matrix_list.append(matrix)
    }
    matrix_list
  }


  /**
    * calculate the distance between the input matrix of the clusters,and return the least distance
    *
    * @param matrix
    * @param clusters
    */
  def calDisOfMatrix(matrix: DenseMatrix[Double], clusters: Array[DenseMatrix[Double]]): Double = {
    var dis = Double.MaxValue //初始化最大value
    for (cluster <- clusters) {
      val m = matrix - cluster
      val dis_1 = sum(sqrt(sum(pow(m(::, 1 to (2)), 2.0), Axis._1)))
      val dis_2 = sum(sqrt(sum(pow(m(::, 3 to (4)), 2.0), Axis._1)))
      val dis_3 = sum(sum(abs(m(::, 5 to (6))), Axis._1))
      val distance = geo_coefficient * (dis_1 + dis_2) + time_coefficient * dis_3
      if (distance < dis) dis = distance
    }
    math.pow(dis, 2.0)
  }

  /**
    * calculate the least distance between the matrix and the clusters,and return the cluster center which the matrix belongs
    *
    * @param matrix
    * @param clusters
    */
  def calDisOfMatrix2(matrix: DenseMatrix[Double], clusters: ListBuffer[DenseMatrix[Double]]): DenseMatrix[Double] = {
    var dis = Double.MaxValue //初始化最大value
    var center: DenseMatrix[Double] = null
    for (cluster <- clusters) {
      val m = matrix - cluster
      val dis_1 = sum(sqrt(sum(pow(m(::, 1 to (2)), 2.0), Axis._1)))
      val dis_2 = sum(sqrt(sum(pow(m(::, 3 to (4)), 2.0), Axis._1)))
      val dis_3 = sum(sum(abs(m(::, 5 to (6))), Axis._1))
      val distance = geo_coefficient * (dis_1 + dis_2) + time_coefficient * dis_3
      if (distance < dis) {
        dis = distance
        center = cluster
      }
    }
    center
  }
}

数据(真实,非上述)实战结果:


转载:https://blog.csdn.net/Java_Man_China/article/details/105956464
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场