The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride.

For each distinct rideId, there are exactly three events:

  1. a TaxiRide START event
  2. a TaxiRide END event
  3. a TaxiFare event (whose timestamp happens to match the start time)

The result should be a DataStream<Tuple2<TaxiRide, TaxiFare>>, with one record for each distinct rideId. Each tuple should pair the TaxiRide START event for some rideId with its matching TaxiFare.

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource. See Using the Taxi Data Streams for information on how to download the data and how to work with these stream generators.

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. The exercise is set up to ignore the END events, so you should join the START event of each ride with its corresponding fare event.

The resulting stream is printed to standard out.

Getting Started

Rather than following these links to GitHub, you might prefer to open these classes in your IDE:



Exercise Classes

Implementation Hints

You can use a RichCoFlatMapFunction to implement this join operation. Note that you have no control over the order of arrival of the ride and fare records for each rideId, so you’ll need to be prepared to store either record until its match arrives, at which point you can emit a Tuple2<TaxiRide, TaxiFare> joining the two records together.
Use Flink’s managed, keyed state to buffer the record that is being held until the matching event arrives, and be sure to clear the state once it is no longer needed.
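
The buffering logic described above can be sketched in plain Java, without any Flink dependencies. This is only an illustration of the idea, not the reference solution: the Ride, Fare, and RideFarePair classes are hypothetical, simplified stand-ins for TaxiRide, TaxiFare, and Tuple2<TaxiRide, TaxiFare>, and the two HashMaps stand in for Flink's keyed state. In an actual Flink job, the two handler methods would roughly correspond to flatMap1 and flatMap2 of a RichCoFlatMapFunction applied to streams keyed by rideId, and each map would become a piece of keyed state such as a ValueState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a TaxiRide START event (END events are assumed
// to have been filtered out upstream, as the exercise does).
class Ride {
    final long rideId;
    Ride(long rideId) { this.rideId = rideId; }
}

// Hypothetical stand-in for a TaxiFare event.
class Fare {
    final long rideId;
    final double totalFare;
    Fare(long rideId, double totalFare) {
        this.rideId = rideId;
        this.totalFare = totalFare;
    }
}

// Hypothetical stand-in for Tuple2<TaxiRide, TaxiFare>.
class RideFarePair {
    final Ride ride;
    final Fare fare;
    RideFarePair(Ride ride, Fare fare) {
        this.ride = ride;
        this.fare = fare;
    }
}

class RideFareJoinSketch {
    // Stand-ins for keyed state: at most one buffered record per rideId.
    private final Map<Long, Ride> rideState = new HashMap<>();
    private final Map<Long, Fare> fareState = new HashMap<>();
    private final List<RideFarePair> output = new ArrayList<>();

    // Called for each ride; plays the role of flatMap1.
    void onRide(Ride ride) {
        // remove() both reads and clears the buffered fare, if there is one.
        Fare fare = fareState.remove(ride.rideId);
        if (fare != null) {
            output.add(new RideFarePair(ride, fare));
        } else {
            rideState.put(ride.rideId, ride); // wait for the matching fare
        }
    }

    // Called for each fare; plays the role of flatMap2.
    void onFare(Fare fare) {
        Ride ride = rideState.remove(fare.rideId);
        if (ride != null) {
            output.add(new RideFarePair(ride, fare));
        } else {
            fareState.put(fare.rideId, fare); // wait for the matching ride
        }
    }

    List<RideFarePair> output() { return output; }

    // Number of records still waiting for their match.
    int bufferedCount() { return rideState.size() + fareState.size(); }

    public static void main(String[] args) {
        RideFareJoinSketch join = new RideFareJoinSketch();
        join.onRide(new Ride(1));       // buffered: no fare for ride 1 yet
        join.onFare(new Fare(2, 12.5)); // buffered: no ride for fare 2 yet
        join.onFare(new Fare(1, 7.0));  // matches ride 1, pair emitted
        join.onRide(new Ride(2));       // matches fare 2, pair emitted
        for (RideFarePair p : join.output()) {
            System.out.println(p.ride.rideId + " -> " + p.fare.totalFare);
        }
        System.out.println("still buffered: " + join.bufferedCount());
    }
}
```

Whichever record arrives first is stored, and the arrival of its partner both emits the joined pair and removes the stored record, so nothing is left behind for a rideId once its pair has been produced.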


For the purposes of this exercise, it’s okay to assume that the START and fare events are perfectly paired. In a real-world application, however, you should worry about the fact that whenever an event is missing, the other event for the same rideId will be held in state forever. In a later lab we’ll look at how you might handle that situation.


Reference Solutions

Reference solutions are available on GitHub: