- The TaxiRide Cleansing program shall write its result stream to a Kafka topic and
- the Popular Places program shall read its input stream from that Kafka topic.
The Kafka installation instructions explain how to set up and start Kafka. The following instructions help with the necessary modifications:
Adding the Kafka Connector dependency
Flink features connectors to several external systems. In order to keep the dependencies on the core slim, these connectors are organized in separate modules and have to be included as needed. The connector for Kafka 0.11 can be used by adding the following dependency to your
pom.xml file (this dependency has already been added to the training exercises project):
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
      <version>1.7.1</version>
    </dependency>
Writing to Kafka
The result of the TaxiRide Cleansing program is a DataStream<TaxiRide>. The program needs to be modified to write this DataStream to a Kafka topic instead of printing it to standard out.
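Flink’s Kafka connector provides the FlinkKafkaProducer011 class to write a DataStream to a Kafka 0.11 topic. It is added to the stream as a sink and is constructed from the broker address, the name of the topic, and a serialization schema for the records.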
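A serialization schema turns each record into the bytes that are appended to the topic, and a matching deserialization schema turns them back into records on the reading side. The following plain-Java sketch illustrates that round trip without any Flink or Kafka dependency. RideRecord and RideCodec are hypothetical, simplified stand-ins and not classes of the training project; the comment at the end shows how a real schema would presumably be passed to the sink, where the variable name filteredRides, the broker address localhost:9092, and the use of the project's TaxiRideSchema utility are assumptions.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for a TaxiRide record. */
final class RideRecord {
    final long rideId;
    final boolean isStart;
    final long eventTimeMillis;

    RideRecord(long rideId, boolean isStart, long eventTimeMillis) {
        this.rideId = rideId;
        this.isStart = isStart;
        this.eventTimeMillis = eventTimeMillis;
    }
}

/** Sketch of what a (de)serialization schema does: record -> bytes -> record. */
final class RideCodec {
    /** Encodes a record as a UTF-8 CSV line, the form that would be stored in the topic. */
    static byte[] serialize(RideRecord r) {
        String line = r.rideId + "," + (r.isStart ? "START" : "END") + "," + r.eventTimeMillis;
        return line.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes bytes produced by serialize() back into a record. */
    static RideRecord deserialize(byte[] bytes) {
        String[] f = new String(bytes, StandardCharsets.UTF_8).split(",");
        if (f.length != 3) {
            throw new IllegalArgumentException("expected 3 fields, got " + f.length);
        }
        return new RideRecord(Long.parseLong(f[0]), "START".equals(f[1]), Long.parseLong(f[2]));
    }
}

// In the Flink job (assumption, requires the Kafka connector dependency):
//   filteredRides.addSink(new FlinkKafkaProducer011<>(
//       "localhost:9092",       // Kafka broker host:port (assumed local default)
//       "cleansedRides",        // topic to write to
//       new TaxiRideSchema())); // serialization schema for TaxiRide records
```

Because the same encoding is used for writing and reading, the Popular Places program can later decode exactly what the TaxiRide Cleansing program wrote.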
When you start a program that writes to a Kafka sink, the resulting records are appended to the configured Kafka topic. You can check whether the Kafka topic is receiving data by starting Kafka’s Console Consumer, which prints the records of a topic to the console, as follows:
    ./bin/kafka-console-consumer.sh \
      --zookeeper localhost:2181 \
      --topic cleansedRides \
      --from-beginning
Note: Kafka topics are designed as durable logs. Restarting a program that writes to a Kafka topic means that all records are appended, i.e., the topic is not overwritten! Check the Kafka instructions to learn how a topic can be removed.
Reading from Kafka
Once the Kafka topic has been filled with cleansed TaxiRide records, the next step is to adapt the Popular Places program so that it reads its input from that topic. To do so, replace the TaxiRideSource with a Kafka consumer data source. The consumer has to be configured (broker address, consumer group, and start offset) before it is added to the job as a source.
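A Kafka consumer is configured through a java.util.Properties object. As a minimal sketch, the helper below builds the properties such a consumer typically needs. The class name KafkaConsumerConfig is hypothetical, and the broker address and group id used in the usage comment are assumptions for a local setup. The comment also shows how the result would presumably be handed to FlinkKafkaConsumer011, assuming the training project's TaxiRideSchema is used for deserialization.

```java
import java.util.Properties;

/** Sketch of the consumer configuration; the class name is hypothetical. */
final class KafkaConsumerConfig {
    /**
     * @param brokers comma-separated host:port list of Kafka brokers
     * @param groupId consumer group under which offsets are tracked
     */
    static Properties build(String brokers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        // Start from the beginning of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}

// In the Flink job (assumption, requires the Kafka connector dependency):
//   Properties props = KafkaConsumerConfig.build("localhost:9092", "popularPlaces");
//   FlinkKafkaConsumer011<TaxiRide> consumer =
//       new FlinkKafkaConsumer011<>("cleansedRides", new TaxiRideSchema(), props);
//   DataStream<TaxiRide> rides = env.addSource(consumer);
```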
Note: A stream read from Kafka does not automatically have timestamps and watermarks assigned. You have to assign them yourself for the event-time windows to work; otherwise the program won’t emit any results. Please refer to the Implementation Hints or to the reference implementation below if you need help.
The resulting stream should be printed to standard out.
When you run your program, it will start reading the Kafka topic from the beginning (provided that you set the auto.offset.reset property to earliest) and read through to the end of the topic, where it waits for new records. You can also run the writing and the reading program concurrently to send data from the TaxiRide Cleansing program through Kafka to the Popular Places program.
Implementation Hints
The FlinkKafkaConsumer011 class has a method assignTimestampsAndWatermarks() to provide a custom timestamp and watermark assigner. Flink provides the abstract BoundedOutOfOrdernessTimestampExtractor class to implement timestamp extractors with bounded out-of-orderness (watermarks follow timestamps after a fixed time interval). You should extend this class to implement a custom timestamp and watermark assigner. The out-of-orderness of the TaxiRide events that were provided by the TaxiRideSource and written to the Kafka topic depends on the maxEventDelay parameter of the TaxiRideSource of the TaxiRide Cleansing program. The extracted timestamp should be the TaxiRide.startTime field or the TaxiRide.endTime field converted to a long value in milliseconds since the epoch, which is how Flink represents event timestamps.
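To make "watermarks follow timestamps after a fixed time interval" concrete, the following plain-Java sketch mimics the idea behind bounded out-of-orderness watermarking. It is an illustration only, not Flink's class: BoundedOutOfOrdernessSketch and its method names are hypothetical, and timestamps are assumed to be epoch milliseconds.

```java
/** Plain-Java illustration of bounded out-of-orderness watermarking (not Flink's class). */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrdernessMillis must be >= 0");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Chosen so that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records the timestamp of an event and returns it unchanged. */
    long onEvent(long eventTimeMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimeMillis);
        return eventTimeMillis;
    }

    /** The watermark trails the largest timestamp seen so far by the fixed bound. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}

// In the Flink job, the corresponding step would be to extend
// BoundedOutOfOrdernessTimestampExtractor<TaxiRide>, pass a Time value derived
// from maxEventDelay to its constructor, and implement extractTimestamp(TaxiRide).
```

With a bound of 60 seconds, an event at 100 000 ms moves the watermark to 40 000 ms; a later event at 70 000 ms is out of order but still ahead of the watermark, and the watermark never moves backwards.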
Reference solutions are available at GitHub: