The goal of this exercise is to join together the TaxiRide START and the TaxiFare records for each ride in a more robust way than we did in an earlier exercise.

The problem with using a RichCoFlatMap for this application is that, in a real-world system, we have to expect that some records will be lost or corrupted. Each lost record leaves its counterpart, a TaxiRide or a TaxiFare, sitting in state and waiting for a partner that will never arrive. Over time this collection of unmatched records grows without bound, and eventually our enrichment job will run out of memory.

You can solve this by using the timers available in a CoProcessFunction to eventually expire and clear any unmatched state that is being kept.
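
To make the idea concrete, here is a minimal plain-Java model of a join whose unmatched state expires. It is a sketch under stated assumptions, not Flink code and not the reference solution: the class `ExpiringJoinModel`, its methods, the `TTL_MILLIS` retention period, and the string payloads are all hypothetical. In the actual exercise the pending records would live in keyed state, the expiry would be an event-time timer registered in the CoProcessFunction, and `advanceTo` stands in for timers firing as the watermark advances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a join with expiring state; hypothetical, not Flink code. */
public class ExpiringJoinModel {
    // Assumed retention period: how long an unmatched record may wait for its partner.
    static final long TTL_MILLIS = 60_000L;

    /** An unmatched record together with the time at which it should be discarded. */
    private static final class Pending {
        final String payload;
        final long expiresAt;

        Pending(String payload, long expiresAt) {
            this.payload = payload;
            this.expiresAt = expiresAt;
        }
    }

    // Stand-ins for per-key state, keyed by rideId.
    private final Map<Long, Pending> pendingRides = new HashMap<>();
    private final Map<Long, Pending> pendingFares = new HashMap<>();

    public final List<String> joined = new ArrayList<>();
    // Stand-in for the side outputs that collect discarded records.
    public final List<String> unmatched = new ArrayList<>();

    /** A ride arrives: emit the pair if its fare is waiting, otherwise store the ride. */
    public void onRide(long rideId, String ride, long eventTime) {
        Pending fare = pendingFares.remove(rideId);
        if (fare != null) {
            joined.add(ride + "|" + fare.payload);
        } else {
            pendingRides.put(rideId, new Pending(ride, eventTime + TTL_MILLIS));
        }
    }

    /** A fare arrives: emit the pair if its ride is waiting, otherwise store the fare. */
    public void onFare(long rideId, String fare, long eventTime) {
        Pending ride = pendingRides.remove(rideId);
        if (ride != null) {
            joined.add(ride.payload + "|" + fare);
        } else {
            pendingFares.put(rideId, new Pending(fare, eventTime + TTL_MILLIS));
        }
    }

    /** Models timers firing: discard every pending record whose expiry time has been reached. */
    public void advanceTo(long watermark) {
        expire(pendingRides, watermark);
        expire(pendingFares, watermark);
    }

    private void expire(Map<Long, Pending> state, long watermark) {
        Iterator<Pending> it = state.values().iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.expiresAt <= watermark) {
                unmatched.add(p.payload);
                it.remove(); // clearing the state is the whole point of the exercise
            }
        }
    }

    public int pendingCount() {
        return pendingRides.size() + pendingFares.size();
    }

    public static void main(String[] args) {
        ExpiringJoinModel join = new ExpiringJoinModel();
        join.onRide(1L, "ride-1", 0L);
        join.onFare(1L, "fare-1", 1_000L);       // matches ride-1 immediately
        join.onFare(1000L, "fare-1000", 2_000L); // its ride never arrives
        join.advanceTo(62_000L);                 // 2_000 + TTL_MILLIS has been reached
        System.out.println(join.joined);         // [ride-1|fare-1]
        System.out.println(join.unmatched);      // [fare-1000]
        System.out.println(join.pendingCount()); // 0
    }
}
```

The point of the model is that a matched pair clears its state at once, while an unmatched record is held only until its expiry time passes, so the amount of state stays bounded even when partners never show up.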

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource.

Simulating Missing Data

We have arranged for a predictable fraction of the input records to be missing, so you can verify that you are correctly clearing the corresponding state.

The exercise code does this in a FilterFunction on the TaxiRides. It drops all END events, and also every START event whose rideId is a multiple of 1000.


Java:

DataStream<TaxiRide> rides = env
  .addSource(new TaxiRideSource(ridesFile, servingSpeedFactor))
  .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0)));


Scala:

val rides = env
  .addSource(new TaxiRideSource(ridesFile, servingSpeedFactor))
  .filter { ride => ride.isStart && (ride.rideId % 1000 != 0) }
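
To check which rides lose their START event, the filter predicate can be exercised on its own, without Flink. The following is a small sketch: the class `MissingDataFilterDemo` and its helper methods are hypothetical, and it assumes for illustration that rideIds are consecutive integers starting at 1.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone check of the exercise's filter predicate; hypothetical helper, not Flink code. */
public class MissingDataFilterDemo {
    /** Same condition as the filter above: keep START events whose rideId is not a multiple of 1000. */
    public static boolean keep(boolean isStart, long rideId) {
        return isStart && (rideId % 1000 != 0);
    }

    /** Lists the rideIds in [1, maxRideId] whose START event the filter drops. */
    public static List<Long> droppedStarts(long maxRideId) {
        List<Long> dropped = new ArrayList<>();
        for (long rideId = 1; rideId <= maxRideId; rideId++) {
            if (!keep(true, rideId)) {
                dropped.add(rideId);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Assumed range of rideIds, chosen only for illustration.
        System.out.println(droppedStarts(5000L)); // [1000, 2000, 3000, 4000, 5000]
    }
}
```

The fares for these rideIds are the ones that can never be matched, so they are the records you should expect to see expire.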

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. You should join the event for the START of each ride with its corresponding fare data.

To see clearly what is happening, create side outputs where you collect each unmatched TaxiRide and TaxiFare that is discarded in the onTimer method of the CoProcessFunction.

Once the join is basically working, don’t bother printing the joined records. Instead, print to standard output everything going to the side outputs, and verify that the results make sense. If you use the filter proposed above, then you should see something like the following. These are TaxiFare records that were stored in state for a time, but were eventually discarded because the matching TaxiRide events never arrived.

1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
3> 4000,2013003768,2013003765,2013-01-01 00:13:00,CSH,0.0,0.0,26.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38
4> 5000,2013004578,2013004575,2013-01-01 00:15:03,CSH,0.0,0.0,11.0

Getting Started



Exercise Classes


Reference Solution

Reference solutions are available at GitHub: