The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride in a more robust way than we did in an earlier exercise.
The problem with using a RichCoFlatMap for this application is that in a real-world system we have to expect that some records will be lost or corrupted. This means that over time we will accumulate an ever-growing collection of unmatched TaxiFare records waiting to be matched with event data that will never arrive. Eventually our enrichment job will run out of memory.
You can solve this by using the timers available in a CoProcessFunction to eventually expire and clear any unmatched state that is being kept.
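The idea of expiring unmatched state with timers can be sketched without any Flink dependencies. The following is a plain-Java model, not the Flink API and not the reference solution: the class name `ExpiringJoinModel`, the `timeoutMillis` parameter, and the use of strings in place of TaxiRide and TaxiFare objects are all assumptions made for illustration. The maps stand in for keyed state, and `advanceWatermark` stands in for the timer callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (hypothetical, not Flink code) of a join with expiring state. */
final class ExpiringJoinModel {
    // Unmatched records per rideId; these maps stand in for keyed state.
    private final Map<Long, String> pendingRides = new HashMap<>();
    private final Map<Long, String> pendingFares = new HashMap<>();
    // Registered timers: expiry timestamp -> rideIds whose state should be cleared.
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>();
    private final long timeoutMillis; // assumed waiting period, for illustration only

    final List<String> joined = new ArrayList<>();  // matched ride/fare pairs
    final List<String> expired = new ArrayList<>(); // records discarded by a timer

    ExpiringJoinModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onRide(long rideId, long eventTime, String ride) {
        String fare = pendingFares.remove(rideId);
        if (fare != null) {
            joined.add(ride + "+" + fare);
        } else {
            pendingRides.put(rideId, ride);
            registerTimer(rideId, eventTime + timeoutMillis);
        }
    }

    void onFare(long rideId, long eventTime, String fare) {
        String ride = pendingRides.remove(rideId);
        if (ride != null) {
            joined.add(ride + "+" + fare);
        } else {
            pendingFares.put(rideId, fare);
            registerTimer(rideId, eventTime + timeoutMillis);
        }
    }

    private void registerTimer(long rideId, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(rideId);
    }

    /** Fires every timer whose timestamp is at or before the given watermark. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                // A timer for an already-matched rideId finds nothing to clear.
                String ride = pendingRides.remove(rideId);
                String fare = pendingFares.remove(rideId);
                if (ride != null) expired.add(ride);
                if (fare != null) expired.add(fare);
            }
        }
    }

    int pendingCount() {
        return pendingRides.size() + pendingFares.size();
    }
}
```

In this model a timer left behind by a matched pair is simply ignored when it fires. That is safe here only because each rideId is assumed to carry exactly one ride and one fare.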
For this exercise you will work with two data streams, one with TaxiRide events generated by a CheckpointedTaxiRideSource and the other with TaxiFare events generated by a CheckpointedTaxiFareSource:

    DataStream<TaxiRide> rides = env.addSource(
        new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor));

    DataStream<TaxiFare> fares = env.addSource(
        new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor));

We recommend using these checkpointed sources in case you want to run your solution on a cluster and experiment with making it truly fault tolerant.
Simulating Missing Data
You should arrange for some predictable fraction of the input records to be missing, so you can verify that you are correctly clearing the corresponding state.
The exercise code does this in a FilterFunction on the TaxiRides. It drops all END events, and every 1000th START event.
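A filter of this kind might look roughly like the sketch below. It is written as a plain `java.util.function.Predicate` rather than a Flink FilterFunction so that it stands alone. The `Ride` class is a hypothetical stand-in for TaxiRide with only the two fields the filter needs, and the sketch assumes that "every 1000th START event" means a START event whose rideId is divisible by 1000, which is consistent with the rideIds in the sample output further down.

```java
import java.util.function.Predicate;

/** Hypothetical sketch of the missing-data filter; not the exercise's own code. */
final class MissingDataFilter {

    /** Minimal stand-in for TaxiRide, holding only what the filter inspects. */
    static final class Ride {
        final long rideId;
        final boolean isStart;

        Ride(long rideId, boolean isStart) {
            this.rideId = rideId;
            this.isStart = isStart;
        }
    }

    /**
     * Keeps a ride only if it is a START event and its rideId is not a
     * multiple of 1000 (assumed meaning of "every 1000th START event").
     * All END events are dropped.
     */
    static final Predicate<Ride> KEEP =
            ride -> ride.isStart && ride.rideId % 1000 != 0;
}
```

Because the dropped rides are chosen deterministically, the fares that should end up unmatched are known in advance, which makes the expiry logic easy to check.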
The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.
In order to clearly see what is happening, create side outputs where you collect each unmatched TaxiFare that is discarded in the onTimer method of the CoProcessFunction.
Once the join is basically working, don’t bother printing the joined records. Instead, print to standard out everything going to the side outputs, and verify that the results make sense. If you use the filter proposed above, then you should see something like this. These are TaxiFare records that were stored in state for a time, but eventually discarded because the matching TaxiRide events hadn’t arrived.
    1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
    3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
    3> 4000,2013003768,2013003765,2013-01-01 00:13:00,CSH,0.0,0.0,26.5
    4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38
    4> 5000,2013004578,2013004575,2013-01-01 00:15:03,CSH,0.0,0.0,11.0
- Java: com.ververica.flinktraining.exercises.datastream_java.process.ExpiringStateExercise
- Scala: com.ververica.flinktraining.exercises.datastream_scala.process.ExpiringStateExercise
Reference solutions are available at GitHub: