If you haven’t already done so, you’ll need to first setup your Flink development environment. See How to do the Exercises for an overall introduction to these exercises.

With this exercise you will connect two streams using a KeyedBroadcastProcessFunction:

  1. a stream of TaxiRide events, and
  2. a query stream on which you can type queries/rules.

The stream of TaxiRides is keyed by the taxiId, and for each taxi, the most recent ride event is retained in keyed state. This means for every taxi, we have a record of its current situation: either it’s somewhere in the middle of a ride that started at some place and time, or the last thing we heard about was a ride that ended.

The queries are java expressions that when evaluated, produce a boolean value. These query expressions have a ride and the current watermark in scope. Whenever a new query expression arrives it is evaluated against all of the stored (keyed) rides, and whenever the query expression evaluates to True for some ride, that ride is emitted by the KeyedBroadcastProcessFunction. Similarly, as new ride events arrive on their stream, they are also evaluated by the query expression, and emitted if the expression returns True.

Example queries:



ride.isStart && (watermark - ride.getEventTime()) > 100 * 60000

!ride.isStart && ride.getEuclideanDistance(-74, 41) < 10.0

Explanations: the third query above matches ongoing rides that started more than 100 minutes ago, and the last query matches rides that end within 10km of the specified location. “true” and “false” are convenient queries if you just want to see that things are basically working.

To keep things simple, the implementation should only have one query at a time – a new query replaces the previous query.

The TaxiQueryExercise class we provide creates both streams, and uses Janino to compile the java expressions. What remains to be done is to complete the implementions of the processElement and processBroadcastElement methods of the KeyedBroadcastProcessFunction.

Also, since the queries can reference the current watermark of the KeyedBroadcastProcessFunction, you need to take care that this will work. See the Implementation Hints below for more about this.

Getting Started

On MacOS and Linux you can start the broadcast query stream via

nc -lk 9999

and on Windows you can install ncat from https://nmap.org/ncat/ and then use

ncat -lk 9999



Exercise Class

Implementation Hints

Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator will be the minimum of the Watermarks of the two connected streams. Our query stream has a default Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the KeyedBroadcastProcessFunction, unless we do something about it.
It’s enough to attach an AssignerWithPeriodicWatermarks to the query stream that always returns Watermark.MAX_WATERMARK as the current Watermark.


Reference Solutions

Reference solutions are available at GitHub: