2.4.1 时间概念与Watermark
在Flink中,时间概念主要分为三种类型,即事件时间、处理时间以及接入时间,每种时间的定义和使用范围如表2-2所示。
表2-2 Flink时间概念对比
通过如下三种方式可以抽获和生成Timestamp和Watermark。
1.在SourceFunction中抽取Timestamp和生成Watermark
在SourceFunction中读取数据元素时,SourceContext接口中定义了抽取Timestamp和生成Watermark的方法,如collectWithTimestamp(T element, long timestamp)和emitWatermark(Watermark mark)方法。如果Flink作业基于事件时间的概念,就会使用StreamSourceContexts.ManualWatermarkContext处理Watermark信息。
如代码清单2-22所示,WatermarkContext.collectWithTimestamp方法直接从Source算子接入的数据中抽取事件时间的时间戳信息。
代码清单2-22 WatermarkContext.collectWithTimestamp方法
public void collectWithTimestamp(T element, long timestamp) { synchronized (checkpointLock) { streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); if (nextCheck != null) { this.failOnNextCheck = false; } else { scheduleNextIdleDetectionTask(); } //抽取Timestamp信息 processAndCollectWithTimestamp(element, timestamp); } }
生成Watermark主要是通过调用WatermarkContext.emitWatermark()方法进行的。生成的Watermark首先会更新当前Source算子中的CurrentWatermark,然后将Watermark传递给下游的算子继续处理。当下游算子接收到Watermark事件后,也会更新当前算子内部的CurrentWatermark。
如代码清单2-23所示,SourceFunction接口主要调用WatermarkContext.emitWatermark()方法生成并输出Watermark事件,在emitWatermark()方法中会调用processAndEmitWatermark()方法将生成的Watermark实时发送到下游算子中继续处理。
代码清单2-23 WatermarkContext.emitWatermark方法
public void emitWatermark(Watermark mark) { if (allowWatermark(mark)) { synchronized (checkpointLock) { streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); if (nextCheck != null) { this.failOnNextCheck = false; } else { scheduleNextIdleDetectionTask(); } //处理并发送Watermark至下游算子 processAndEmitWatermark(mark); } } }
2.通过DataStream中的独立算子抽取Timestamp和生成Watermark
除了能够在SourceFunction中直接分配Timestamp和生成Watermark,也可以在DataStream数据转换的过程中进行相应操作,此时转换操作对应的算子就能使用生成的Timestamp和Watermark信息了。
在DataStream API中提供了3种与抽取Timestamp和生成Watermark相关的Function接口,分别为TimestampExtractor、AssignerWithPeriodicWatermarks以及AssignerWithPunctuatedWatermarks。
如图2-13所示,在TimestampAssigner接口中定义抽取Timestamp的方法。然后分别在AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口中定义生成Watermark的方法。在早期的TimestampExtractor实现中同时包含了Timestamp抽取与生成Watermark的逻辑。
图2-13 TimestampAssigner UML关系图
AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks的区别如下所示。
·AssignerWithPeriodicWatermarks:事件时间驱动,会周期性地根据事件时间与当前算子中最大的Watermark进行对比,如果当前的EventTime大于Watermark,则触发Watermark更新逻辑,将最新的EventTime赋予CurrentWatermark,并将新生成的Watermark推送至下游算子。
·AssignerWithPunctuatedWatermarks:特殊事件驱动,主要根据数据元素中的特殊事件生成Watermark。例如数据中有产生Watermark的标记,接入数据元素时就会根据该标记调用相关方法生成Watermark。
需要注意的是,AssignerWithPeriodicWatermarks中生成Watermark的默认周期为0,用户可以根据具体情况对周期进行调整,但周期过大会增加数据处理的时延。
从图2-13中我们也可以看到,如果接入事件中的Timestamp是单调递增的,即不会出现乱序的情况,就可以直接使用AssignerWithPeriodicWatermarks接口的默认抽象实现类AscendingTimestampExtractor自动生成Watermark。另外,对于接入数据是有界乱序的情况,可以使用BoundedOutOfOrdernessTimestampExtractor实现类生成Watermark事件。但不论是AscendingTimestampExtractor还是BoundedOutOfOrdernessTimestampExtractor实现类,都需要用户实现extractTimestamp()方法获取EventTime信息。
如代码清单2-24所示,当用户通过实现AssignerWithPeriodicWatermarks抽象类,并调用DataStream.assignTimestampsAndWatermarks()方法时,实际上会根据传入的AssignerWithPeriodicWatermarks创建TimestampsAndPeriodicWatermarksOperator算子。最后调用DataStream.transform()方法将该Operator封装在Transformation中。因此这种获取EventTime和Watermark的方式是通过单独定义算子实现的。
代码清单2-24 DataStream.assignTimestampsAndWatermarks()方法定义
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestamp AndWatermarkAssigner); // 生成TimestampsAndPeriodicWatermarksOperator TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner); //将生成的Operator加入Transformation列表 return transform("Timestamps/Watermarks", getTransformation(). getOutputType(), operator) .setParallelism(inputParallelism); }
AssignerWithPunctuatedWatermarks的实现和AssignerWithPeriodicWatermarks基本一致,这里我们就不再展开讨论了,读者可以参考相关源码实现。
3.通过Connector提供的接口抽取Timestamp和生成Watermark
对于某些内置的数据源连接器来讲,是通过实现SourceFunction接口接入外部数据的,此时用户无法直接获取SourceFunction的接口方法,会造成无法在SourceOperator中直接生成EventTime和Watermark的情况。在FlinkKafkaConsumer和FlinkKinesisConsumer这些内置的数据源连接器中,已经支持用户将AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks实现类传递到连接器的接口中,然后再通过连接器应用在对应的SourceFunction中,进而生成EventTime和Watermark。
如代码清单2-25所示,FlinkKafkaConsumer提供了FlinkKafkaConsumerBase.assignTimestampsAndWatermarks()方法,用于设定创建AssignerWithPeriodicWatermarks或AssignerWithPunctuatedWatermarks实现类。
代码清单2-25 FlinkKafkaConsumerBase.assignTimestampsAndWatermarks()方法定义
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPer iodicWatermarks<T> assigner) { checkNotNull(assigner); if (this.punctuatedWatermarkAssigner != null) { throw new IllegalStateException("A punctuated watermark emitter has already been set."); } try { ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel. RECURSIVE, true); this.periodicWatermarkAssigner = new SerializedValue<>(assigner); return this; } catch (Exception e) { throw new IllegalArgumentException("The given assigner is not serializable", e); } }
AssignerWithPeriodicWatermarks实现类最终会被AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark()方法调用。
如代码清单2-26所示,AssignerWithPeriodicWatermarks实现类会被封装在KafkaTopicPartitionStateWithPeriodicWatermarks对象中,调用getTimestampForRecord()方法时,就会调用AssignerWithPeriodicWatermarks.extractTimestamp()方法获取EventTime信息。
代码清单2-26 AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark()方法定义
private void emitRecordWithTimestampAndPeriodicWatermark( T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) { @SuppressWarnings("unchecked") final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState; final long timestamp; synchronized (withWatermarksState) { timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); } synchronized (checkpointLock) { sourceContext.collectWithTimestamp(record, timestamp); partitionState.setOffset(offset); } }
而对于Watermark的生成逻辑,则主要通过ProcessingTimeCallback接口实现,此时会向ProcessingTimeService注册Timer定时器,根据事件时间的变动情况来生成Watermark。
如代码清单2-27所示,在AbstractFetcher中定义了PeriodicWatermarkEmitter类。这里的PeriodicWatermarkEmitter实现了ProcessingTimeCallback接口,专门用于对Watermark的下游输出操作。PeriodicWatermarkEmitter.onProcessingTime()方法主要包含如下逻辑。
·遍历Kafka中所有分区的状态,找到所有分区中最小的Watermark并赋值给minAcrossAll变量,将isEffectiveMinAggregation置为True。
·判断isEffectiveMinAggregation和minAcrossAll大于lastWatermarkTimestamp是否同时满足,也就是最新的Watermark值minAcrossAll是否大于前面生成的Watermark,如果满足则调用emitter.emitWatermark()方法输出Watermark,这里的emitter实际上就是SourceContext。
调用timerService.registerTimer()方法继续注册定时器,定时器的间隔设定为Watermark的产生周期,当定时器条件满足后,会再次调用OnTimer()方法生成Watermark信息。
·Watermark的生成是调用KafkaTopicPartitionStateWithPeriodicWatermarks.getCurrentWatermarkTimestamp()方法实现的,实际上就是调用用户自定义的AssignerWithPeriodicWatermarks.getCurrentWatermark()方法。
代码清单2-27 PeriodicWatermarkEmitter.onProcessingTime()方法定义
public void onProcessingTime(long timestamp) throws Exception { long minAcrossAll = Long.MAX_VALUE; boolean isEffectiveMinAggregation = false; for (KafkaTopicPartitionState<?> state : allPartitions) { final long curr; synchronized (state) { curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp(); } minAcrossAll = Math.min(minAcrossAll, curr); isEffectiveMinAggregation = true; } // 输出Watermark if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) { lastWatermarkTimestamp = minAcrossAll; emitter.emitWatermark(new Watermark(minAcrossAll)); } // 调度下次Watermark的生成 timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); }
可以看出,对于Kafka这类内置连接器来讲,能够将EventTime和Watermark生成的接口释放给用户控制,同时可以避免在并行的SourceFunction中出现因多个分区而产生Watermark不一致的情况。对整个系统来讲,这种获取Watermark的方式更加可靠和准确。