Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

2.4.1 时间概念与Watermark

在Flink中,时间概念主要分为三种类型,即事件时间、处理时间以及接入时间,每种时间的定义和使用范围如表2-2所示。

表2-2 Flink时间概念对比

通过如下三种方式可以抽获和生成Timestamp和Watermark。

1.在SourceFunction中抽取Timestamp和生成Watermark

在SourceFunction中读取数据元素时,SourceContext接口中定义了抽取Timestamp和生成Watermark的方法,如collectWithTimestamp(T element, long timestamp)和emitWatermark(Watermark mark)方法。如果Flink作业基于事件时间的概念,就会使用StreamSourceContexts.ManualWatermarkContext处理Watermark信息。

如代码清单2-22所示,WatermarkContext.collectWithTimestamp方法直接从Source算子接入的数据中抽取事件时间的时间戳信息。

代码清单2-22 WatermarkContext.collectWithTimestamp方法


public void collectWithTimestamp(T element, long timestamp) {
   synchronized (checkpointLock) {
      streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
      if (nextCheck != null) {
         this.failOnNextCheck = false;
      } else {
         scheduleNextIdleDetectionTask();
      }
     //抽取Timestamp信息
      processAndCollectWithTimestamp(element, timestamp);
   }
}

生成Watermark主要是通过调用WatermarkContext.emitWatermark()方法进行的。生成的Watermark首先会更新当前Source算子中的CurrentWatermark,然后将Watermark传递给下游的算子继续处理。当下游算子接收到Watermark事件后,也会更新当前算子内部的CurrentWatermark。

如代码清单2-23所示,SourceFunction接口主要调用WatermarkContext.emitWatermark()方法生成并输出Watermark事件,在emitWatermark()方法中会调用processAndEmitWatermark()方法将生成的Watermark实时发送到下游算子中继续处理。

代码清单2-23 WatermarkContext.emitWatermark方法


public void emitWatermark(Watermark mark) {
   if (allowWatermark(mark)) {
      synchronized (checkpointLock) {
         streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
         if (nextCheck != null) {
            this.failOnNextCheck = false;
         } else {
            scheduleNextIdleDetectionTask();
         }
         //处理并发送Watermark至下游算子
         processAndEmitWatermark(mark);
      }
   }
}

2.通过DataStream中的独立算子抽取Timestamp和生成Watermark

除了能够在SourceFunction中直接分配Timestamp和生成Watermark,也可以在DataStream数据转换的过程中进行相应操作,此时转换操作对应的算子就能使用生成的Timestamp和Watermark信息了。

在DataStream API中提供了3种与抽取Timestamp和生成Watermark相关的Function接口,分别为TimestampExtractor、AssignerWithPeriodicWatermarks以及AssignerWithPunctuatedWatermarks。

如图2-13所示,在TimestampAssigner接口中定义抽取Timestamp的方法。然后分别在AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口中定义生成Watermark的方法。在早期的TimestampExtractor实现中同时包含了Timestamp抽取与生成Watermark的逻辑。

图2-13 TimestampAssigner UML关系图

AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks的区别如下所示。

·AssignerWithPeriodicWatermarks:事件时间驱动,会周期性地根据事件时间与当前算子中最大的Watermark进行对比,如果当前的EventTime大于Watermark,则触发Watermark更新逻辑,将最新的EventTime赋予CurrentWatermark,并将新生成的Watermark推送至下游算子。

·AssignerWithPunctuatedWatermarks:特殊事件驱动,主要根据数据元素中的特殊事件生成Watermark。例如数据中有产生Watermark的标记,接入数据元素时就会根据该标记调用相关方法生成Watermark。

需要注意的是,AssignerWithPeriodicWatermarks中生成Watermark的默认周期为0,用户可以根据具体情况对周期进行调整,但周期过大会增加数据处理的时延。

从图2-13中我们也可以看到,如果接入事件中的Timestamp是单调递增的,即不会出现乱序的情况,就可以直接使用AssignerWithPeriodicWatermarks接口的默认抽象实现类AscendingTimestampExtractor自动生成Watermark。另外,对于接入数据是有界乱序的情况,可以使用BoundedOutOfOrdernessTimestampExtractor实现类生成Watermark事件。但不论是AscendingTimestampExtractor还是BoundedOutOfOrdernessTimestampExtractor实现类,都需要用户实现extractTimestamp()方法获取EventTime信息。

如代码清单2-24所示,当用户通过实现AssignerWithPeriodicWatermarks抽象类,并调用DataStream.assignTimestampsAndWatermarks()方法时,实际上会根据传入的AssignerWithPeriodicWatermarks创建TimestampsAndPeriodicWatermarksOperator算子。最后调用DataStream.transform()方法将该Operator封装在Transformation中。因此这种获取EventTime和Watermark的方式是通过单独定义算子实现的。

代码清单2-24 DataStream.assignTimestampsAndWatermarks()方法定义


public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
   AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
   final int inputParallelism = getTransformation().getParallelism();
   final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestamp
      AndWatermarkAssigner);
   // 生成TimestampsAndPeriodicWatermarksOperator
   TimestampsAndPeriodicWatermarksOperator<T> operator =
      new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
   //将生成的Operator加入Transformation列表
   return transform("Timestamps/Watermarks", getTransformation().
      getOutputType(), operator)
         .setParallelism(inputParallelism);
}

AssignerWithPunctuatedWatermarks的实现和AssignerWithPeriodicWatermarks基本一致,这里我们就不再展开讨论了,读者可以参考相关源码实现。

3.通过Connector提供的接口抽取Timestamp和生成Watermark

对于某些内置的数据源连接器来讲,是通过实现SourceFunction接口接入外部数据的,此时用户无法直接获取SourceFunction的接口方法,会造成无法在SourceOperator中直接生成EventTime和Watermark的情况。在FlinkKafkaConsumer和FlinkKinesisConsumer这些内置的数据源连接器中,已经支持用户将AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks实现类传递到连接器的接口中,然后再通过连接器应用在对应的SourceFunction中,进而生成EventTime和Watermark。

如代码清单2-25所示,FlinkKafkaConsumer提供了FlinkKafkaConsumerBase.assignTimestampsAndWatermarks()方法,用于设定创建AssignerWithPeriodicWatermarks或AssignerWithPunctuatedWatermarks实现类。

代码清单2-25 FlinkKafkaConsumerBase.assignTimestampsAndWatermarks()方法定义


public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPer
   iodicWatermarks<T> assigner) {
   checkNotNull(assigner);
   if (this.punctuatedWatermarkAssigner != null) {
      throw new IllegalStateException("A punctuated watermark emitter has 
         already been set.");
   }
   try {
      ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.
         RECURSIVE, true);
      this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
      return this;
   } catch (Exception e) {
      throw new IllegalArgumentException("The given assigner is not 
         serializable", e);
   }
}

AssignerWithPeriodicWatermarks实现类最终会被AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark()方法调用。

如代码清单2-26所示,AssignerWithPeriodicWatermarks实现类会被封装在KafkaTopicPartitionStateWithPeriodicWatermarks对象中,调用getTimestampForRecord()方法时,就会调用AssignerWithPeriodicWatermarks.extractTimestamp()方法获取EventTime信息。

代码清单2-26 AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark()方法定义


private void emitRecordWithTimestampAndPeriodicWatermark(
   T record, KafkaTopicPartitionState<KPH> partitionState, long offset, 
      long kafkaEventTimestamp) {
   @SuppressWarnings("unchecked")
   final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState 
       =(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) 
         partitionState;
   final long timestamp;
   synchronized (withWatermarksState) {
      timestamp = withWatermarksState.getTimestampForRecord(record, 
         kafkaEventTimestamp);
   }
   synchronized (checkpointLock) {
      sourceContext.collectWithTimestamp(record, timestamp);
      partitionState.setOffset(offset);
   }

}

而对于Watermark的生成逻辑,则主要通过ProcessingTimeCallback接口实现,此时会向ProcessingTimeService注册Timer定时器,根据事件时间的变动情况来生成Watermark。

如代码清单2-27所示,在AbstractFetcher中定义了PeriodicWatermarkEmitter类。这里的PeriodicWatermarkEmitter实现了ProcessingTimeCallback接口,专门用于对Watermark的下游输出操作。PeriodicWatermarkEmitter.onProcessingTime()方法主要包含如下逻辑。

·遍历Kafka中所有分区的状态,找到所有分区中最小的Watermark并赋值给minAcrossAll变量,将isEffectiveMinAggregation置为True。

·判断isEffectiveMinAggregation和minAcrossAll大于lastWatermarkTimestamp是否同时满足,也就是最新的Watermark值minAcrossAll是否大于前面生成的Watermark,如果满足则调用emitter.emitWatermark()方法输出Watermark,这里的emitter实际上就是SourceContext。

调用timerService.registerTimer()方法继续注册定时器,定时器的间隔设定为Watermark的产生周期,当定时器条件满足后,会再次调用OnTimer()方法生成Watermark信息。

·Watermark的生成是调用KafkaTopicPartitionStateWithPeriodicWatermarks.getCurrentWatermarkTimestamp()方法实现的,实际上就是调用用户自定义的AssignerWithPeriodicWatermarks.getCurrentWatermark()方法。

代码清单2-27 PeriodicWatermarkEmitter.onProcessingTime()方法定义


public void onProcessingTime(long timestamp) throws Exception {
   long minAcrossAll = Long.MAX_VALUE;
   boolean isEffectiveMinAggregation = false;
   for (KafkaTopicPartitionState<?> state : allPartitions) {
      final long curr;
      synchronized (state) {
         curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) 
            state).getCurrentWatermarkTimestamp();
      }
      minAcrossAll = Math.min(minAcrossAll, curr);
      isEffectiveMinAggregation = true;
   }
   // 输出Watermark
   if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
      lastWatermarkTimestamp = minAcrossAll;
      emitter.emitWatermark(new Watermark(minAcrossAll));
   }
   // 调度下次Watermark的生成
   timerService.registerTimer(timerService.getCurrentProcessingTime() + 
      interval, this);
}

可以看出,对于Kafka这类内置连接器来讲,能够将EventTime和Watermark生成的接口释放给用户控制,同时可以避免在并行的SourceFunction中出现因多个分区而产生Watermark不一致的情况。对整个系统来讲,这种获取Watermark的方式更加可靠和准确。