The task of the “Travel Time Prediction” exercise is to predict the travel time of taxi rides when they start, i.e., the program should emit a prediction for each taxi ride start event that it processes. The predictions are computed by a regression model that is incrementally updated with each taxi ride end event, which carries the information about how long a ride took. In the following, we describe the task in more detail and introduce the provided utility classes.
The idea of the prediction exercise is to train a model for each destination grid cell based on the direction and the air-line distance from the departure location to the destination location. The GeoUtils class provides two methods to compute these values, including GeoUtils.getDirectionAngle(). The actual travel time can be computed from the start and end times of a taxi ride end event. We provide a simple prediction model, TravelTimePredictionModel, for the prediction task.
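As a rough illustration of the two model inputs and of the label, the following sketch computes a direction angle and an air-line distance from a ride's departure and destination coordinates, and the travel time from its start and end timestamps. GeoSketch and its methods are hypothetical stand-ins, not the GeoUtils implementation; the equirectangular distance approximation, the kilometre unit, the angle convention (0° = east, counter-clockwise) and epoch-millisecond timestamps are all assumptions.

```java
/** Hypothetical stand-in for the GeoUtils helpers; NOT the provided implementation. */
final class GeoSketch {
    private static final double EARTH_RADIUS_KM = 6371.0;

    /** Approximate air-line distance in km using an equirectangular projection (assumption). */
    static double airLineDistanceKm(double startLon, double startLat, double endLon, double endLat) {
        double meanLatRad = Math.toRadians((startLat + endLat) / 2.0);
        double dx = Math.toRadians(endLon - startLon) * Math.cos(meanLatRad); // east-west component
        double dy = Math.toRadians(endLat - startLat);                        // north-south component
        return EARTH_RADIUS_KM * Math.sqrt(dx * dx + dy * dy);
    }

    /** Direction from start to end in whole degrees: 0 = east, 90 = north, range [0, 360). */
    static int directionAngle(double startLon, double startLat, double endLon, double endLat) {
        double meanLatRad = Math.toRadians((startLat + endLat) / 2.0);
        double dx = (endLon - startLon) * Math.cos(meanLatRad); // shrink longitude by latitude
        double dy = endLat - startLat;
        double deg = Math.toDegrees(Math.atan2(dy, dx));         // in (-180, 180]
        return (int) Math.round((deg + 360.0) % 360.0) % 360;    // normalize to [0, 360)
    }

    /** Travel time in whole minutes from start/end timestamps in epoch milliseconds (assumption). */
    static long travelTimeMinutes(long startMillis, long endMillis) {
        return (endMillis - startMillis) / 60_000L;
    }
}
```

For example, GeoSketch.directionAngle(0, 0, 0, 1) yields 90 (due north) under this convention. At city scale the equirectangular approximation is close to the great-circle distance, which keeps the sketch short.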
TravelTimePredictionModel.predictTravelTime() returns a travel time prediction for a given distance and direction, or -1 if no prediction is possible yet.
TravelTimePredictionModel.refineModel() improves the model for a given direction, distance, and actual travel time.
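The contract of the two model methods can be made concrete with a minimal stand-in. SimpleTravelTimeModel below is hypothetical and is not the provided TravelTimePredictionModel: it assumes 45-degree direction sectors, fits one least-squares line of travel time (assumed to be in minutes) over distance per sector, and returns -1 until a sector has seen at least two different distances.

```java
import java.io.Serializable;

/** Hypothetical stand-in for TravelTimePredictionModel: one linear regression per direction sector. */
final class SimpleTravelTimeModel implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final int SECTORS = 8; // 45-degree direction buckets (assumption)

    // Running sums for an incremental least-squares fit of time = a + b * distance, per sector.
    private final long[] n = new long[SECTORS];
    private final double[] sumX = new double[SECTORS];
    private final double[] sumY = new double[SECTORS];
    private final double[] sumXX = new double[SECTORS];
    private final double[] sumXY = new double[SECTORS];

    private static int sector(int directionDeg) {
        int d = ((directionDeg % 360) + 360) % 360; // normalize to [0, 360)
        return d / (360 / SECTORS);
    }

    /** Predicted travel time in minutes, or -1 if the sector has too little data. */
    int predictTravelTime(int direction, double distance) {
        int s = sector(direction);
        double denom = n[s] * sumXX[s] - sumX[s] * sumX[s];
        if (n[s] < 2 || denom == 0.0) {
            return -1; // fewer than two points, or all distances identical
        }
        double slope = (n[s] * sumXY[s] - sumX[s] * sumY[s]) / denom;
        double intercept = (sumY[s] - slope * sumX[s]) / n[s];
        return (int) Math.max(0, Math.round(intercept + slope * distance));
    }

    /** Adds one observed (distance, travel time) pair to the sector's regression. */
    void refineModel(int direction, double distance, double travelTime) {
        int s = sector(direction);
        n[s]++;
        sumX[s] += distance;
        sumY[s] += travelTime;
        sumXX[s] += distance * distance;
        sumXY[s] += distance * travelTime;
    }
}
```

Keeping only running sums in plain arrays makes each update O(1) and keeps the object small, which matters once it is checkpointed as state.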
Since the prediction model is valuable operator state, it must not be lost in case of a failure. Therefore, you should register the model as operator state so that Flink takes care of checkpointing the model and restoring it after a failure.
Stateful and fault-tolerant streaming applications require a few settings on the execution environment.
Configure Flink to perform a consistent checkpoint of a program’s operator state every 1000 ms:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000); // checkpoint interval in milliseconds
Configure Flink to try to restart the job up to 60 times, with a delay of 10 seconds between attempts. If the job cannot be restarted within 60 attempts, it fails:
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        60,                              // number of restart attempts
        Time.of(10, TimeUnit.SECONDS))); // delay between attempts
Note that by default, Flink’s checkpoints are persisted on the JobManager’s heap. This is usually fine for development and testing, as long as your application does not have large amounts of state. This exercise, however, is likely to keep too much state for that to suffice, so you should configure Flink to use the filesystem state backend instead.
This exercise is based on a stream of taxi ride events. Since the TaxiRideSource that we have used so far cannot checkpoint its internal state, we use the CheckpointedTaxiRideSource for this exercise instead. The CheckpointedTaxiRideSource is used similarly to the TaxiRideSource, except that it does not accept a …
The result of the exercise should be a DataStream<Tuple2<Long, Integer>> where the first field (Long) is the rideId of a taxi ride and the second field (Integer) is the predicted travel time in minutes. If no prediction is possible, the prediction should be -1.
The resulting stream should be printed to standard out.
The program reads the taxi ride events with a CheckpointedTaxiRideSource source function and requires a filter transformation to remove all rides that do not both start and end in New York City. Since we want to build a model for each destination grid cell, we need to compute the grid cell id of the destination location for each event and organize the stream by that cell id. Subsequently, we need a FlatMapFunction that emits predictions for ride start events and updates the model for ride end events. Finally, the predictions are printed to standard out. Do not forget to enable state checkpoints by configuring the checkpointing interval with env.enableCheckpointing().
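To show how these steps fit together, here is a plain-Java simulation of the dataflow: filter, map to the destination cell, key by cell, then predict or refine. It is not Flink code: a HashMap stands in for Flink's keyed state, and RideEvent, ToyModel, the bounding box, the 0.01-degree cells, and the degree-based distance are all hypothetical simplifications. The toy model also ignores direction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the exercise dataflow; NOT Flink code, all names hypothetical. */
final class PipelineSketch {

    /** Simplified ride event: a start event if isStart, otherwise an end event. */
    static final class RideEvent {
        final long rideId;
        final boolean isStart;
        final double startLon, startLat, endLon, endLat;
        final long startMillis, endMillis;

        RideEvent(long rideId, boolean isStart, double startLon, double startLat,
                  double endLon, double endLat, long startMillis, long endMillis) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }
    }

    /** Toy per-cell model: average minutes per unit of distance; -1 until trained. */
    static final class ToyModel {
        private double totalDist;
        private double totalMinutes;

        int predict(double dist) {
            return totalDist == 0.0 ? -1 : (int) Math.round(dist * totalMinutes / totalDist);
        }

        void refine(double dist, double minutes) {
            totalDist += dist;
            totalMinutes += minutes;
        }
    }

    // Assumed NYC bounding box and cell size in degrees (illustrative values only).
    static final double LON_WEST = -74.05, LON_EAST = -73.7, LAT_SOUTH = 40.5, LAT_NORTH = 41.0;
    static final double CELL = 0.01;

    static boolean inNyc(double lon, double lat) {
        return lon >= LON_WEST && lon < LON_EAST && lat >= LAT_SOUTH && lat < LAT_NORTH;
    }

    static int gridCell(double lon, double lat) {
        int cols = (int) Math.ceil((LON_EAST - LON_WEST) / CELL);
        int col = (int) ((lon - LON_WEST) / CELL);
        int row = (int) ((LAT_NORTH - lat) / CELL);
        return row * cols + col; // row-major id from the north-west corner
    }

    /** Returns "(rideId,minutes)" strings for start events, in arrival order. */
    static List<String> run(List<RideEvent> events) {
        Map<Integer, ToyModel> stateByCell = new HashMap<>(); // stands in for keyed state
        List<String> out = new ArrayList<>();
        for (RideEvent e : events) {
            if (!inNyc(e.startLon, e.startLat) || !inNyc(e.endLon, e.endLat)) {
                continue; // filter: keep only rides that start and end in NYC
            }
            int cell = gridCell(e.endLon, e.endLat); // map to destination cell, then "keyBy"
            ToyModel model = stateByCell.computeIfAbsent(cell, k -> new ToyModel());
            double dist = Math.hypot(e.endLon - e.startLon, e.endLat - e.startLat); // crude, in degrees
            if (e.isStart) {
                out.add("(" + e.rideId + "," + model.predict(dist) + ")"); // emit a prediction
            } else {
                model.refine(dist, (e.endMillis - e.startMillis) / 60_000.0); // learn from the end event
            }
        }
        return out;
    }
}
```

In Flink, keyBy routes all events with the same cell id to the same parallel instance, whose keyed state then holds exactly one model per cell; the HashMap plays that role here.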
A MapFunction that maps each event's destination coordinates (e.g., TaxiRide.endLat) to a grid cell id, followed by a keyBy operation on that id, can be used to organize the stream by grid cell id.
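The grid cell computation in the MapFunction could look like the following hypothetical sketch, which divides an assumed bounding box around New York City into 0.01-degree cells numbered row by row from the north-west corner. GridCellSketch, its constants, and the -1 result for points outside the box are illustrative assumptions, not the GeoUtils API; the returned id is what the subsequent keyBy would use as the key.

```java
/** Hypothetical destination-cell mapping for the MapFunction step; NOT GeoUtils. */
final class GridCellSketch {
    // Assumed bounding box around New York City and an assumed cell size, in degrees.
    static final double LON_WEST = -74.05;
    static final double LON_EAST = -73.7;
    static final double LAT_NORTH = 41.0;
    static final double LAT_SOUTH = 40.5;
    static final double CELL_SIZE = 0.01;
    // Cells per row; ids are assigned row by row starting at the north-west corner.
    static final int COLUMNS = (int) Math.ceil((LON_EAST - LON_WEST) / CELL_SIZE);

    /** Returns the row-major cell id of (lon, lat), or -1 if the point lies outside the box. */
    static int cellId(double lon, double lat) {
        if (lon < LON_WEST || lon >= LON_EAST || lat <= LAT_SOUTH || lat > LAT_NORTH) {
            return -1;
        }
        int col = Math.min((int) ((lon - LON_WEST) / CELL_SIZE), COLUMNS - 1); // guard rounding at east edge
        int row = (int) ((LAT_NORTH - lat) / CELL_SIZE);                      // rows grow southwards
        return row * COLUMNS + col;
    }
}
```

Because the id is a plain int, it is cheap to hash and compare, which suits its use as a keyBy key.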
Use a RichFlatMapFunction that holds the prediction model as key-value ValueState. The key-value state is defined in the function’s open() method by creating a ValueStateDescriptor and calling the getRuntimeContext().getState() method, which returns a ValueState object that is kept as a member variable of the function. In the flatMap() method, the state for the current key is obtained by calling ValueState.value() and updated by calling ValueState.update().
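The value()/update() pattern from the hint can be tried out without a Flink cluster using the following plain-Java imitation. ValueStateLike, InMemoryKeyedState, and MeanModel are hypothetical: they mimic the shape of keyed ValueState (one value per current key, null before the first update) but are not Flink classes, and the running-mean model is only a placeholder for TravelTimePredictionModel.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java imitation of the keyed ValueState pattern; NOT Flink's implementation. */
final class KeyedValueStateSketch {

    /** Mirrors the two calls named in the hint. */
    interface ValueStateLike<T> {
        T value();               // null if nothing has been stored for the current key yet
        void update(T newValue); // replaces the value for the current key
    }

    /** A single state object whose contents depend on the current key, like keyed state. */
    static final class InMemoryKeyedState<K, T> implements ValueStateLike<T> {
        private final Map<K, T> values = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { // in Flink, the runtime sets the key before each call
            currentKey = key;
        }

        @Override
        public T value() {
            return values.get(currentKey);
        }

        @Override
        public void update(T newValue) {
            values.put(currentKey, newValue);
        }
    }

    /** Placeholder model: running mean of travel time in minutes; -1 until trained. */
    static final class MeanModel {
        long count;
        double sum;

        int predict() {
            return count == 0 ? -1 : (int) Math.round(sum / count);
        }

        void refine(double minutes) {
            count++;
            sum += minutes;
        }
    }

    private final ValueStateLike<MeanModel> modelState; // kept as a member, as obtained in open()

    KeyedValueStateSketch(ValueStateLike<MeanModel> modelState) {
        this.modelState = modelState;
    }

    /** flatMap-like logic: a prediction for start events, null after refining on end events. */
    Integer process(boolean isStart, double actualMinutes) {
        MeanModel model = modelState.value(); // state for the current key
        if (model == null) {
            model = new MeanModel();          // first event seen for this key
        }
        if (isStart) {
            return model.predict();
        }
        model.refine(actualMinutes);
        modelState.update(model);             // write the refined model back
        return null;
    }
}
```

Writing the refined model back with update() matters in real Flink jobs: with some state backends, such as RocksDB, value() returns a deserialized copy, so mutating it without calling update() would lose the change.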
Reference solutions are available at GitHub: