/ 城寒 / Spark - Basic Usage

Spark - Basic Usage

2018-01-28 posted in [Spark]

DirectStream

基本操作和SparkSQL


stream.map(s => JSON.parseObject(s.value())) 
// map改变所有条目本身,对类型没有特殊的要求,只要可以序列化即可

.filter(json => boolfunction(json, 60))
// 返回Boolean的函数即可,留下满足条件的
 
.map(json => {
    val time = json.get("timestamp").toString.toInt;
    val name = json.get("name").toString;
    Array(time, name)
	""" scala tips:
		Array可以放入不同类型的数据,但是获取时需要指定类型,直接操作会被认为是string,比如
		scala> val a = Array(1, 2.2, "3") 
		a: Array[Any] = Array(1, 2.2, 3)
		scala> a(1) + 1
		<console>: error: type mismatch;
 		found   : Int(1)
 		required: String
       		a(1) + 1
		g(3).asInstanceOf[Tuple2[Int, Int]] // 复杂的类型可以强制转换
	"""
})
 
// map 是对一条数据进行操作, transform 是把其中一个RDD变成另一个RDD,在这个过程中的RDD就脱离了Streaming当成普通的RDD操作
// 不是函数的transform 也指和 action 相对的一类rdd转换操作
.transform( rdd => {
	// 新版本中,使用SparkSession替代SqlContext,和原来使用方法类似
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
	// 类型转换
    import spark.implicits._
    val rddRow = rdd.map(r => Row(r(0),r(1),r(2),r(3)))
    val schema = StructType(
        Seq(
            StructField("name", StringType, true),
            StructField("prop1", StringType, true),
            StructField("prop2", IntegerType, true),
            StructField("prop3", DoubleType, true),
        ))
	// rdd转换为 dataframe,支持sql操作
    val testDF = spark.createDataFrame(rddRow, schema).registerTempTable("testDF")
    val testAVG = spark.sql("select name, max(prop1) as prop11, avg(prop2) as prop22, prop3 from testDF group by name")
    // 转为rdd
	val testrdd = testAVG.rdd
	// flatMap相当于拍扁的map,map是针对一条数据用的,但是如果想把一条数据变成多条数据就需要用flatmap
	// flatMap会将返回的迭代器拼接起来,最直观的区别是,map后条数不变,flatmap一般是条数变多
    val testFlat = testrdd.flatMap(path => {
            // Do something
        })
    testFlat
})
 

transform 和 updateStateByKey


// transform 中,支持collect后,重新生成rdd进行转换
val result = stream.transform( rdd => {
    // 可以直接把键值对类型的rdd转为map
    val all_path = rdd.collect().toMap
    ...
    // 需要使用生成该RDD的sparkcontext来生成新的rdd
    // 不能使用全局sc来创建,否则会出现循环创建错误
    val ip_rank = ip_map.keySet.toList
    rdd.sparkContext.parallelize(ip_rank)
})
 
// updateStateByKey 更新,用于维护一个全局的状态表
// 参数value是输入的值,因为一个key可能有多项,所以是Seq,针对不同情况可以取sum或者size等
// 参数state是原来的值,利用option的特点,结合getOrElse来提取或者创建默认值
// 返回一个

def updateFunction(value: Seq[Double], state: Option[Double]):Option[Double] = {
    val newValue: Int = value.sum.toInt
    val stateValue: Double = state.getOrElse(0)
    Some(stateValue + newValue)
}

val stateDStream = result.updateStateByKey[Double](updateFunction _)

stateDStream.foreachRDD( rdd => {
    // Do something for state rdd
})

Windows

// 实现对每个key按顺序记录一段时间内的所有数据
// 如果没有数据,则会空着
val result_windows = result.reduceByKeyAndWindow(
    {(x, y) => x ++ y}, // 新批次到来如何添加:按照key,把新List拼接在尾部
    {(x, y) => x.tail}, // 过时的批次如何删除:按照key,把最早的元素删除
    Seconds(BATCH_INTERVAL * WINDOW_DURATION), // 窗口总长度 = 批次间隔 * 窗口数
    Seconds(BATCH_INTERVAL)				   // 窗口步长 = 每次窗口移动的时间跨度
)