Kafka Stream

时间:2017-10-14 19:14:30   收藏:0   阅读:629

    Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature(当前:1.0.0-rc0,参见:https://github.com/apache/kafka/releases),它提供了对存储于Kafka内的数据进行流式处理和分析的功能。其主要特点如下:

   简言之,Kafka Streams解决了流式处理中的如下困难问题

  为什么要有Kafka Stream


      当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?主要有如下原因:

KTable vs. KStream


    KTable和KStream是Kafka Stream中非常重要的两个概念,它们是Kafka实现各种语义的基础。因此这里有必要分析下二者的区别。

    以下图为例,假设有一个KStream和KTable,基于同一个Topic创建,并且该Topic中包含如下图所示5条数据。此时遍历KStream将得到与Topic内数据完全一样的所有5条数据,且顺序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将得到3条记录,每个Key对应最新的值,并且这三条数据之间的顺序与原来在Topic中的顺序保持一致。这一点与Kafka的日志compact相同。

       技术分享

   此时如果对该KStream和KTable分别基于key做Group,对Value进行Sum,得到的结果将会不同。对KStream的计算结果是<Jack,4>,<Lily,7>,<Mike,4>。而对Ktable的计算结果是<Mike,4>,<Jack,3>,<Lily,5>

State store :      

   流式处理中,部分操作是无状态的,例如过滤操作(Kafka Stream DSL中用filer方法实现)。而部分操作是有状态的,需要记录中间状态,如Window操作和聚合计算。

   State store被用来存储中间状态。它可以是一个持久化的Key-Value存储,也可以是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储。Topic中存储的数据记录本身是Key-Value形式的,同时Kafka的log compaction机制可对历史数据做compact操作,保留每个Key对应的最后一个Value,从而在保证Key不丢失的前提下,减少总数据量,从而提高查询效率。

   构造KTable时,需要指定其state store name。默认情况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,实际就是遍历它对应的state store,或者说遍历Topic的所有key,并取每个Key最新值的过程。为了使得该过程更加高效,默认情况下会对该Topic进行compact操作。

   另外,除了KTable,所有状态计算,都需要指定state store name,从而记录中间状态

时间:

   在流式数据处理中,时间是数据的一个非常重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增加了timestamp属性。目前Kafka Stream支持三种时间

   注:Kafka Stream允许通过实现org.apache.kafka.streams.processor.TimestampExtractor接口自定义记录时间。

窗口:

   流式数据是在时间上无界的数据而聚合操作只能作用在特定的数据集,也即有界的数据集上因此需要通过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种非常常用的设定计算边界的方式。不同的流式处理系统支持的窗口类似,但不尽相同。Kafka Stream支持的窗口如下:

Join:

    kafka Stream由于包含KStream和Ktable两种数据集,因此提供如下Join计算

   对于Join操作,如果要得到正确的计算结果,需要保证参与Join的KTable或KStream中Key相同的数据被分配到同一个Task。具体方法是

聚合与乱序处理:

    聚合操作可应用于KStream和KTable。当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集。需要说明的是,聚合操作的结果肯定是KTable。因为KTable是可更新的,可以在晚到的数据到来时(也即发生数据乱序时)更新结果KTable。

    这里举例说明。假设对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操作。并且KStream先后出现时间为1秒, 3秒, 5秒的数据,此时5秒的窗口已达上限,Kafka Stream关闭该窗口,触发Count操作并将结果3输出到KTable中(假设该结果表示为<1-5,3>)。若1秒后,又收到了时间为2秒的记录,由于1-5秒的窗口已关闭,若直接抛弃该数据,则可认为之前的结果<1-5,3>不准确。而如果直接将完整的结果<1-5,4>输出到KStream中,则KStream中将会包含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。因此Kafka Stream选择将聚合结果存于KTable中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可得到完整的正确的结果。这种方式保证了数据准确性,同时也提高了容错性。

    但需要说明的是,Kafka Stream并不会对所有晚到的数据都重新计算并更新结果集,而是让用户设置一个retention period,将每个窗口的结果集在内存中保留一定时间,该窗口内的数据晚到时,直接合并计算,并更新结果KTable。超过retention period后,该窗口结果将从内存中删除,并且晚到的数据即使落入窗口,也会被直接丢弃。

容错:

    Kafka Stream从如下几个方面进行容错:

Kafka Stream整体架构


    kafka stream的架构如下:

     技术分享

    前(Kafka 0.11.0.0)Kafka Stream的数据源只能如上图所示是Kafka。但是处理结果并不一定要如上图所示输出到Kafka。上图中的Consumer和Producer并不需要开发者在应用中显示实例化,而是由Kafka Stream根据参数隐式实例化和管理,从而降低了使用门槛。开发者只需要专注于开发核心业务逻辑,也即上图中Task内的部分。

Processor Topology:基于Kafka Stream的流式应用的业务逻辑全部通过一个被称为Processor Topology的地方执行。它与Storm的Topology和Spark的DAG类似,都定义了数据在各个处理单元(在Kafka Stream中被称作Processor)间的流动方式,或者说定义了数据的处理逻辑。 

Kafka Stream并行模型:Kafka Stream的并行模型中,最小粒度为Task,而每个Task包含一个特定子Topology的所有Processor。因此每个Task所执行的代码完全一样,唯一的不同在于所处理的数据集互补。如下图展示了在一个进程(Instance)中以2个Topic(Partition数均为4)为数据源的Kafka Stream应用的并行模型。从图中可以看到,由于Kafka Stream应用的默认线程数为1,所以4个Task全部在一个线程中运行。

       技术分享

为了充分利用多线程的优势,可以设置Kafka Stream的线程数。下图展示了线程数为2时的并行模型。

       技术分享

 Kafka Stream可被嵌入任意Java应用(理论上基于JVM的应用都可以)中,下图展示了在同一台机器的不同进程中同时启动同一Kafka Stream应用时的并行模型。注意,这里要保证两个进程的StreamsConfig.APPLICATION_ID_CONFIG完全一样。因为Kafka Stream将APPLICATION_ID_CONFI作为隐式启动的Consumer的Group ID。只有保证APPLICATION_ID_CONFI相同,才能保证这两个进程的Consumer属于同一个Group,从而可以通过Consumer Rebalance机制拿到互补的数据集。

      技术分享

既然实现了多进程部署,可以以同样的方式实现多机器部署。该部署方式也要求所有进程的APPLICATION_ID_CONFIG完全一样。从图上也可以看到,每个实例中的线程数并不要求一样。但是无论如何部署,Task总数总会保证一致。

      技术分享

应用示例 


   示例完整代码地址: https://github.com/habren/KafkaExample  ,Schemal结构说明:

      现在希望计算每小时购买产地与自己所在地相同的用户总数。

  1. 首先由于希望使用订单时间,而它包含在orderStream的Value中,需要通过提供一个实现TimestampExtractor接口的类从orderStream对应的Topic中抽取出订单时
    • public class OrderTimestampExtractor implements TimestampExtractor {
      
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
          if(record instanceof Order) {
            return ((Order)record).getTS();
          } else {
            return 0;
          }
        }
      }

       

  2. 通过将orderStream与userTable进行Join,来获取订单用户所在地。由于二者对应的Topic的Partition数相同,且Key都为用户名,再假设Producer往这两个Topic写数据时所用的Partitioner实现相同,则此时上文所述Join条件满足,可直接进行Join(代码注释: 从下面代码中,可以看到,Join时需要指定如何从参与Join双方的记录生成结果记录的Value。Key不需要指定,因为结果记录的Key与Join Key相同,故无须指定。Join结果存于名为orderUserStream的KStream中)
    • orderUserStream = orderStream
          .leftJoin(userTable, 
               // 该lamda表达式定义了如何从orderStream与userTable生成结果集的Value
              (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
               // 结果集Key序列化方式
              Serdes.String(),
               // 结果集Value序列化方式
               SerdesFactory.serdFrom(Order.class))
          .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

       

  3. 接下来需要将orderUserStream与itemTable进行Join,从而获取商品产地。此时orderUserStream的Key仍为用户名,而itemTable对应的Topic的Key为产品名,并且二者的Partition数不一样,因此无法直接Join。此时需要通过through方法,对其中一方或双方进行重新分区,使得二者满足Join条件。这一过程相当于Spark的Shuffle过程和Storm的FieldGrouping(代码注释:从下面代码可见,through时需要指定Key的序列化器,Value的序列化器,以及分区方式和结果集所在的Topic。这里要注意,该Topic(orderuser-repartition-by-item)的Partition数必须与itemTable对应Topic的Partition数相同,并且through使用的分区方法必须与iteamTable对应Topic的分区方式一样。经过这种through操作,orderUserStream与itemTable满足了Join条件,可直接进行Join)
    • orderUserStrea
          .through(
              // Key的序列化方式
              Serdes.String(),
              // Value的序列化方式 
              SerdesFactory.serdFrom(OrderUser.class), 
              // 重新按照商品名进行分区,具体取商品名的哈希值,然后对分区数取模
              (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
              "orderuser-repartition-by-item")
          .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

小结:


  1. Kafka Stream的并行模型完全基于Kafka的分区机制和Rebalance机制,实现了在线动态调整并行度
  2. 同一Task包含了一个子Topology的所有Processor,使得所有处理逻辑都在同一线程内完成,避免了不必的网络通信开销,从而提高了效率。
  3. through方法提供了类似Spark的Shuffle机制,为使用不同分区策略的数据提供了Join的可能
  4. log compact提高了基于Kafka的state store的加载效率
  5. state store为状态计算提供了可能
  6. 基于offset的计算进度管理以及基于state store的中间状态管理为发生Consumer rebalance或Failover时从断点处继续处理提供了可能,并为系统容错性提供了保障
  7. KTable的引入,使得聚合计算拥用了处理乱序问题的能力

参考资料:


原文:http://www.cnblogs.com/tgzhu/p/7660838.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!