A Demo of Consuming Kafka Data with Spark Streaming


1. First, simulate a producer that continuously pushes data into Kafka.

package com.program.KafkaToSparkStreaming

import java.util.Properties


import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.util.Random

/**
  * Created by jihn88 on 2020/8/1.
  */
object Producer extends App{
  //Topic to write to (hardcoded here)
  val topic = "test0307"

  //Kafka broker list (hardcoded here)
  val brokers = "node1:9092,node2:9092,node3:9092"

  //Random generator used to jitter the send pace
  val random = new Random()

  //Producer configuration
  val prop = new Properties()

  //Configure the broker list
  prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)

  //Key/value serializer types
  prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  //Create the Kafka producer
  val kafkaProducer = new KafkaProducer[String,String](prop)

  val t = System.currentTimeMillis()

  //Mock name->address data
  val nameAddrs = Map("bob"->"shanghai#200000",
    "amy"->"beijing#100000",
    "alice"->"shanghai#200000",
    "tom"->"beijing#100000",
    "lulu"->"hangzhou#310000",
    "nick"->"shanghai#200000")

  //Mock name->phone data
  val namePhones = Map(
    "bob"->"15700079421",
    "amy"->"18700079458",
    "alice"->"17730079427",
    "lulu"->"18800074423",
    "nick"->"14400033426"
  )

  //Emit address records in the format "name\taddr\t0"
  for(nameAddr <- nameAddrs){
    val data = new ProducerRecord[String,String](topic, nameAddr._1,
      s"${nameAddr._1}\t${nameAddr._2}\t0")
    //Write the record to Kafka
    kafkaProducer.send(data)
    if(random.nextInt(100) < 50) Thread.sleep(random.nextInt(10))
  }

  //Emit phone records in the format "name\tphone\t1"
  for(namePhone <- namePhones){
    val data = new ProducerRecord[String,String](topic, namePhone._1,
      s"${namePhone._1}\t${namePhone._2}\t1")
    kafkaProducer.send(data)
    if(random.nextInt(100) < 50) Thread.sleep(random.nextInt(10))
  }

  //Rough send rate (records per second)
  System.out.println("Sent per second: " + (nameAddrs.size + namePhones.size) * 1000 /
    (System.currentTimeMillis() - t))

  kafkaProducer.close()
}
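The send() calls above are fire-and-forget. To confirm delivery while experimenting, the Kafka producer's send() also accepts an optional callback; a minimal sketch (not part of the original demo):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

kafkaProducer.send(data, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()  //delivery failed
    else println(s"sent to ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
  }
})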

2. Connect Spark Streaming to Kafka and consume the data.
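The consumer uses the spark-streaming-kafka-0-10 direct API, so the build needs that integration artifact. A sketch for sbt (the version numbers are assumptions; match them to your Spark and Scala versions):

// build.sbt -- versions are assumptions, align with your environment
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0"
)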

package com.program.KafkaToSparkStreaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by jihn88 on 2020/8/1.
  */
object KafkaSparkStreaming extends App{
  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("kafkaSparkStreaming")
    //Cap the per-partition ingest rate; note the key is case-sensitive ("maxRatePerPartition")
    .set("spark.streaming.kafka.maxRatePerPartition","3")
    .set("spark.local.dir","./tmp")

  //Create the streaming context with a 10-second batch interval
  val ssc = new StreamingContext(conf,Seconds(10))

  //Set the log level to keep the console output readable
  ssc.sparkContext.setLogLevel("ERROR")

  //Kafka parameters for the direct (receiver-less) connection
  val kafkaParams = Map[String,Object](
    //Broker addresses
    "bootstrap.servers"->"node1:9092,node2:9092,node3:9092",
    //Key/value deserializer types
    "key.deserializer"->classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "MyGroupId",
    //Disable auto-commit so offsets can be committed manually (default is true)
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  //Create the direct Kafka DStream, subscribed to topic "test0307"
  val kafkaDirectStream = KafkaUtils.createDirectStream[String,String](ssc,
    PreferConsistent,Subscribe[String,String](List("test0307"),kafkaParams))


  //From each Kafka record's value, filter out the name-address records (type flag 0)
  val nameAddrStream = kafkaDirectStream.map(_.value()).filter(record=>{
    val tokens: Array[String] = record.split("\t")
    tokens(2).toInt==0
  }).map(record=>{
    val tokens = record.split("\t")
    (tokens(0),tokens(1))
  })

  //Likewise, filter out the name-phone records (type flag 1)
  val namePhoneStream = kafkaDirectStream.map(_.value()).filter(
    record=>{
      val tokens = record.split("\t")
      tokens(2).toInt == 1
    }
  ).map(record=>{
    val tokens = record.split("\t")
    (tokens(0),tokens(1))
  })


  //Join address and phone by user name, producing one formatted record per user
  val nameAddrPhoneStream = nameAddrStream.join(namePhoneStream).map(
    record=>{
      s"Name:${record._1}, Address:${record._2._1}, Phone:${record._2._2}"
    }
    }
  )
  //Print the first few joined records of each batch
  nameAddrPhoneStream.print()
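
  //Note: join only pairs records that arrive within the same batch. If the two
  //record types can land in different batches, a windowed join (an option, not
  //used in this demo) widens the match horizon, e.g.:
  //  nameAddrStream.window(Seconds(30)).join(namePhoneStream.window(Seconds(30)))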

  //Start the computation
  ssc.start()

  ssc.awaitTermination()
}
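
One caveat: enable.auto.commit is set to false, but the demo never commits offsets itself, so on restart the consumer group's position falls back to auto.offset.reset. A minimal sketch of manual commits with the kafka010 API, following the pattern from the Spark Streaming + Kafka integration guide (not part of the original demo; the cast only works on the stream returned by createDirectStream):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaDirectStream.foreachRDD { rdd =>
  //Capture this batch's offset ranges before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  //...process the batch here...
  //Commit the offsets back to Kafka once processing has succeeded
  kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}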

 

The final output is as follows.

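Reconstructed from the code (the original screenshot did not survive; record order and timestamps will vary), each batch printed by nameAddrPhoneStream.print() looks roughly like:

-------------------------------------------
Time: 1596558000000 ms
-------------------------------------------
Name:bob, Address:shanghai#200000, Phone:15700079421
Name:amy, Address:beijing#100000, Phone:18700079458
Name:alice, Address:shanghai#200000, Phone:17730079427
Name:lulu, Address:hangzhou#310000, Phone:18800074423
Name:nick, Address:shanghai#200000, Phone:14400033426

Note that "tom" appears in the address map but not in the phone map, so the inner join drops him.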

Original post: https://www.cnblogs.com/itachilearner/p/13436909.html
