
Spark - Kafka > Streaming > Hadoop/Redis

2018-01-27 posted in [Spark]

Streaming

// Set the master, app name, serializer, and the maximum rate (records per second) read from each Kafka partition
val conf = new SparkConf().setMaster("local[2]") // setMaster is better left to spark-submit; it can be dropped here
                          .setAppName("SparkStreamKaflaTest")
                          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                          .set("spark.streaming.kafka.maxRatePerPartition", "25000")
// Set the batch interval and the checkpoint (recovery) directory
val ssc = new StreamingContext(conf, Seconds(30))
ssc.checkpoint("/home/kafka-logs/checkpoint/")
 
// A note on checkpoint
/*
A checkpoint lets the program recover its state from a saved point; the directory is usually
placed on an HDFS-like filesystem. Be aware that recovery runs into all sorts of serialization
problems, and it is a poor fit for restarts that need configuration changes. There are plenty
of complaints about checkpointing online.
To actually use it you must follow the pattern Spark provides; the single ssc.checkpoint(...)
call above does nothing by itself. I have not implemented this; the official example follows.
*/

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  // Set up all of the DStream operations here; on recovery the program is restored to this state
  // A checkpoint interval can be configured to refresh the saved state periodically, but the extra I/O hurts performance
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
// i.e. regular setup that is independent of whether the context was newly created or recovered
context. ...

// Start the context
context.start()
context.awaitTermination()

Kafka DStream

Kafka retains data for a configured time or size, regardless of whether it has been consumed; whether a consumer group has read a record is tracked purely through its offset. Here the offsets are committed back to Kafka so that on startup the job resumes from where it left off (auto.offset.reset only kicks in when the group has no committed offset).

val topic = "kafkatopic"
val kafkaParam = Map(
        "bootstrap.servers"  -> "localhost:9092.....",
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "kafkagroup",
        "auto.offset.reset"  -> "latest",
        // earliest: start from the oldest record Kafka still retains
        // latest: without committed offsets, only receive records that arrive after the job starts
        "enable.auto.commit" -> (false: java.lang.Boolean)
        // usually disabled: auto-commit cannot know when a batch has actually finished processing, so records could be skipped
    )
// Create the Kafka direct stream
val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Array(topic), kafkaParam)
    )
 
// Commit the offsets once the filtering/processing is done (see the sketch below)
stream.foreachRDD(rdd => {
    // Get the offset range of every topic/partition in this batch
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsets.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
    // Commit the offsets back to Kafka manually
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
})
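
The block above only shows the commit; the "filtering" it refers to is not spelled out. A minimal sketch of the process-then-commit pattern, where the map to (key, value) tuples and the println action are hypothetical placeholders:

// Hedged sketch of process-then-commit; the transformation and the action are placeholders
stream.foreachRDD { rdd =>
  // Read the offset ranges first, while rdd is still the untransformed Kafka RDD
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Hypothetical processing: turn each ConsumerRecord into a (key, value) tuple and act on it
  val pairs = rdd.map(record => (record.key, record.value))
  pairs.foreachPartition(partition => partition.foreach(println))
  // Commit only after the work for this batch has been issued
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}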
 

Kafka: start reading from specified offsets

import org.apache.kafka.common.TopicPartition
 
val kafkaParam: Map[String, Object] = Map(
        "bootstrap.servers"  -> "localhost:9092.....",
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "kafkagroup",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )
val offsetList = List(("kafka_topic", 0, offsets))   // (topic, partition, offset); `offsets` is the stored offset to resume from
val fromOffsets = setFromOffsets(offsetList)         // build the TopicPartition -> offset map
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent,
    Subscribe[String,String](Iterable("kafka_topic"), kafkaParam, fromOffsets))
 
// RDD Process
 
def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicPartition, Long] = {
    var fromOffsets: Map[TopicPartition, Long] = Map()
    for (offset <- list) {
        val tp = new TopicPartition(offset._1, offset._2) // topic and partition number
        fromOffsets += (tp -> offset._3)                  // starting offset for that partition
    }
    fromOffsets
}
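
A hypothetical call, resuming partitions 0 and 1 of kafka_topic from made-up offsets 1000 and 2000:

// Hypothetical usage of setFromOffsets; the offsets are arbitrary example values
val fromOffsets = setFromOffsets(List(("kafka_topic", 0, 1000L), ("kafka_topic", 1, 2000L)))
// fromOffsets == Map(kafka_topic-0 -> 1000, kafka_topic-1 -> 2000)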
 

Redis

Writing to Redis in pipeline mode gives a significant speedup compared with issuing commands one by one.

rdd.foreachPartition ( partition => {
    if (redis_status) {
        // A connection cannot be serialized, so each partition opens its own Redis connection;
        // every record in the partition is sent through that single connection
        var jr: Jedis = null
        var pl: Pipeline = null
        try {
            jr = new Jedis(_redis("host").toString, _redis("port").toString.toInt,
                           _redis("timeout").toString.toInt, _redis("database").toString.toInt)
            pl = jr.pipelined()
        } catch {
            case e: Exception => println("[ERROR]: " + e.toString)
        }
        // Send the results
        partition.foreach ( s => {
            // Do something
            if (pl != null) { pl.lpush("spark_" + s._1, s._2.toString) }
        })
        if (pl != null) { pl.sync() }
        if (jr != null) { jr.disconnect() }
    } else {
        partition.foreach ( s => println(s._2) )
    }
})
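
One caveat with the code above: the pipeline buffers every queued command on the client until sync() is called. A hedged variant of the send loop, flushing every 10,000 commands (an arbitrary batch size) to keep the buffer bounded on large partitions:

// Drop-in replacement for the send loop above; assumes jr and pl were created as shown
var count = 0
partition.foreach ( s => {
    if (pl != null) {
        pl.lpush("spark_" + s._1, s._2.toString)
        count += 1
        if (count % 10000 == 0) pl.sync()   // arbitrary flush threshold
    }
})
if (pl != null) { pl.sync() }               // flush whatever is left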

    // Single connection without pipelining: extremely slow
    var jr: Jedis = null
    try {
        jr = new Jedis(redis_info("host").toString, redis_info("port").toString.toInt)
    } catch {
        case e: Exception => println(e.toString)
    }
    partition.foreach( s => {
        if (jr != null) {
            jr.lpush("spark_" + s._1, s._2)
        }
    })
    if (jr != null) { jr.disconnect() }

Hadoop

// Save output with one file per RDD partition
// This generates files like part-00000 under the target directory, one per partition
rdd.saveAsTextFile(path)


// Write one file per key
// Each record looks like (key1, 123), (key2, 234), ...
// HDFS_PATH = /app/timestamp; the hdfs://ip:port prefix normally comes from the cluster configuration
// This produces two files: /app/timestamp/key1 and /app/timestamp/key2
rdd.saveAsHadoopFile(HDFS_PATH, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])


class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the file contents; only the value is written
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the output file name
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}
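
A small usage sketch of the two overrides above; the sample data, the /app/20180127 path, and the SparkContext `sc` are assumptions for illustration:

// Hypothetical usage: `sc` is the SparkContext, the data and path are made up
val pairs = sc.parallelize(Seq(("key1", "123"), ("key2", "234"), ("key1", "456")))
pairs.saveAsHadoopFile("/app/20180127", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
// => /app/20180127/key1 holds "123" and "456"; /app/20180127/key2 holds "234"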

import

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.JSONException

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.io.NullWritable

import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, DoubleType, StructField, StructType}  

import scala.io.Source
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Set
import scala.collection.mutable.Map
import scala.collection.mutable.MutableList
import scala.reflect.ClassTag

import java.io._
import java.io.{ ObjectInputStream, ObjectOutputStream }
import java.util.Date
import java.text.SimpleDateFormat