Dimensionality Reduction with PCA and SVD

Using Spark's built-in org.apache.spark.mllib.feature.PCA:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

object PCAUL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PCAUL")
    val sc = new SparkContext(conf)
    val trainSet = args(0)
    val data = MLUtils.loadLibSVMFile(sc, trainSet)
    // Compute the top 500 principal components.
    val pca = new PCA(500).fit(data.map(_.features))
    // Project vectors onto the linear space spanned by the top 500 principal
    // components, keeping the label.
    val projected = data.map(p => p.copy(features = pca.transform(p.features)))
    projected.take(1).foreach(println)
    sc.stop()
  }
}

This API requires that the input dimensionality not exceed 65535.
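As a quick illustration (a hypothetical guard reusing the data RDD from the listing above, not part of the original program), the feature dimension can be checked before fitting:

// Hypothetical guard: reject inputs wider than the 65535-feature limit
// before calling new PCA(k).fit(...).
val numFeatures = data.first().features.size
require(numFeatures <= 65535,
  s"PCA supports at most 65535 features, but the input has $numFeatures")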

PCA can also be implemented via SVD:

For the PCA derivation, see 理解PCA | Oath2yangmen's Blog.

For the relationship between PCA and SVD, see 理解SVD | Oath2yangmen's Blog.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Matrix, SingularValueDecomposition}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

object SVDUL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SVDUL")
    val sc = new SparkContext(conf)
    val trainSet = args(0)
    val k = args(1).toInt
    val output = args(2)
    // Repartition the whole dataset once, so that the label and feature RDDs
    // below keep the same partitioning and zip lines them up correctly.
    val data = MLUtils.loadLibSVMFile(sc, trainSet).repartition(100).cache()
    val features = data.map(_.features)
    val label = data.map(_.label)
    val mat: RowMatrix = new RowMatrix(features)
    val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(k, computeU = false)
    val V: Matrix = svd.V // The V factor is a local dense matrix.
    // Project the rows onto the top-k right singular vectors and reattach labels.
    val res = label.zip(mat.multiply(V).rows).map {
      case (l, f) => LabeledPoint(l, f)
    }
    MLUtils.saveAsLibSVMFile(res, output)
    sc.stop()
  }
}

PCA first computes the covariance matrix and then performs an SVD on it.

[Screenshot: the PCA computation in the Spark source code]
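As a minimal sketch of the idea behind that code (written against the public RowMatrix and Breeze APIs, not Spark's actual internals), the principal components come from a local SVD of the covariance matrix:

import breeze.linalg.{svd => brzSvd, DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Sketch: principal components via a local SVD of the covariance matrix.
// mat is the distributed RowMatrix of feature vectors, k the target dimension.
def principalComponentsSketch(mat: RowMatrix, k: Int): BDM[Double] = {
  // n x n local covariance matrix (n = number of features, must be small)
  val cov = mat.computeCovariance()
  val brzCov = new BDM[Double](cov.numRows, cov.numCols, cov.toArray)
  // SVD of the symmetric covariance matrix: its left singular vectors are
  // its eigenvectors, i.e. the principal components.
  val brzSvd.SVD(u, _, _) = brzSvd(brzCov)
  // The first k columns span the target subspace.
  u(::, 0 until k).copy
}

Projecting the data is then just multiplying each row vector by this n x k matrix, which is what pca.transform does in the first listing.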

Under SVDMode.LocalLAPACK, computeSVD follows the same path as PCA (it decomposes the covariance/Gramian matrix directly); in the other modes the covariance matrix is only ever multiplied by a dense vector, which is the matrix-vector product the iterative ARPACK solver needs. For details, look up ARPACK or read the Spark source code.

[Screenshot: the SVD computation in the Spark source code]
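Roughly (an illustrative sketch, not Spark's actual implementation), the ARPACK-based modes never materialize the Gramian A^T A; they only need its product with a dense vector v, which can be computed as A^T (A v) in one pass over the rows:

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch: compute (A^T A) v without forming A^T A, as the ARPACK-based
// SVD modes do; rows holds the rows a_i of A, v is a dense n-vector.
def multiplyGramianBySketch(rows: RDD[Vector], v: BDV[Double]): BDV[Double] = {
  val n = v.length
  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (acc, row) => {
      // accumulate a_i * (a_i . v)
      val a = new BDV[Double](row.toArray)
      acc += a * (a dot v)
    },
    combOp = (acc1, acc2) => acc1 += acc2
  )
}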


