stream.map(s => JSON.parseObject(s.value()))
// map改变所有条目本身,对类型没有特殊的要求,只要可以序列化即可
.filter(json => boolfunction(json, 60))
// 返回Boolean的函数即可,留下满足条件的
.map(json => {
val time = json.get("timestamp").toString.toInt;
val name = json.get("name").toString;
Array(time, name)
""" scala tips:
Array可以放入不同类型的数据,但是获取时需要指定类型,直接操作会被认为是string,比如
scala> val a = Array(1, 2.2, "3")
a: Array[Any] = Array(1, 2.2, 3)
scala> a(1) + 1
<console>: error: type mismatch;
found : Int(1)
required: String
a(1) + 1
g(3).asInstanceOf[Tuple2[Int, Int]] // 复杂的类型可以强制转换
"""
})
// map 是对一条数据进行操作, transform 是把其中一个RDD变成另一个RDD,在这个过程中的RDD就脱离了Streaming当成普通的RDD操作
// 不是函数的transform 也指和 action 相对的一类rdd转换操作
.transform( rdd => {
// 新版本中,使用SparkSession替代SqlContext,和原来使用方法类似
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
// 类型转换
import spark.implicits._
val rddRow = rdd.map(r => Row(r(0),r(1),r(2),r(3)))
val schema = StructType(
Seq(
StructField("name", StringType, true),
StructField("prop1", StringType, true),
StructField("prop2", IntegerType, true),
StructField("prop3", DoubleType, true),
))
// rdd转换为 dataframe,支持sql操作
val testDF = spark.createDataFrame(rddRow, schema).registerTempTable("testDF")
val testAVG = spark.sql("select name, max(prop1) as prop11, avg(prop2) as prop22, prop3 from testDF group by name")
// 转为rdd
val testrdd = testAVG.rdd
// flatMap相当于拍扁的map,map是针对一条数据用的,但是如果想把一条数据变成多条数据就需要用flatmap
// flatMap会将返回的迭代器拼接起来,最直观的区别是,map后条数不变,flatmap一般是条数变多
val testFlat = testrdd.flatMap(path => {
// Do something
})
testFlat
})