In this exercise you will connect two streams using a KeyedBroadcastProcessFunction:
- a stream of TaxiRide events
- a query stream on which you can type queries/rules.
The stream of TaxiRides is keyed by the taxiId, and for each taxi, the most recent ride event is retained in keyed state. This means for every taxi, we have a record of its current situation: either it’s somewhere in the middle of a ride that started at some place and time, or the last thing we heard about was a ride that ended.
The queries are Java expressions that produce a boolean value when evaluated. These query expressions have a ride and the current watermark in scope. Whenever a new query expression arrives, it is evaluated against all of the stored (keyed) rides, and whenever it evaluates to true for some ride, that ride is emitted by the KeyedBroadcastProcessFunction. Similarly, as new ride events arrive on their stream, they are evaluated against the current query expression and emitted if it returns true.
Example queries:
true
false
ride.isStart && (watermark - ride.getEventTime()) > 100 * 60000
!ride.isStart && ride.getEuclideanDistance(-74, 41) < 10.0
Explanations: the third query above matches ongoing rides that started more than 100 minutes ago (100 * 60000 milliseconds), and the last query matches rides that ended within 10 km of the specified location. “true” and “false” are convenient queries if you just want to see that things are basically working.
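To make the arithmetic in the third query concrete, here is a minimal plain-Java sketch of the same condition written as a predicate. RideSketch and QueryExamples are hypothetical names invented for this illustration; the real TaxiRide class has more fields, and the exercise compiles query strings with Janino rather than using lambdas.

```java
import java.util.function.BiPredicate;

/** Hypothetical, stripped-down stand-in for the exercise's TaxiRide type. */
final class RideSketch {
    final long taxiId;
    final boolean isStart;          // true for a START event, false for an END event
    final long eventTimeMillis;     // event time in epoch milliseconds

    RideSketch(long taxiId, boolean isStart, long eventTimeMillis) {
        this.taxiId = taxiId;
        this.isStart = isStart;
        this.eventTimeMillis = eventTimeMillis;
    }

    long getEventTime() {
        return eventTimeMillis;
    }
}

final class QueryExamples {
    // 100 minutes expressed in milliseconds: 100 * 60000 = 6,000,000.
    static final long HUNDRED_MINUTES_MS = 100 * 60000;

    // Same logic as the third example query: the ride has started (and we
    // have not yet seen its END event) and the watermark is more than
    // 100 minutes past the ride's event time.
    static final BiPredicate<RideSketch, Long> LONG_ONGOING_RIDE =
            (ride, watermark) ->
                    ride.isStart && (watermark - ride.getEventTime()) > HUNDRED_MINUTES_MS;
}
```

Note that the comparison is strict: a ride that started exactly 100 minutes before the watermark does not match.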
To keep things simple, the implementation should only have one query at a time – a new query replaces the previous query.
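The behavior described so far can be modeled in plain Java without Flink. The sketch below is only an in-memory illustration of the intended semantics, not the exercise solution: RideEvent and QueryMatcherModel are hypothetical names, a HashMap stands in for keyed state, a single field stands in for the broadcast query, and the watermark is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical minimal ride event; the real TaxiRide has more fields. */
final class RideEvent {
    final long taxiId;
    final boolean isStart;
    final long eventTime;

    RideEvent(long taxiId, boolean isStart, long eventTime) {
        this.taxiId = taxiId;
        this.isStart = isStart;
        this.eventTime = eventTime;
    }
}

/** Plain-Java model of the matching logic; this is not Flink code. */
final class QueryMatcherModel {
    // Stands in for keyed state: the most recent ride event per taxiId.
    private final Map<Long, RideEvent> latestRideByTaxi = new HashMap<>();

    // Stands in for the broadcast query: only one is active at a time,
    // and it is null until the first query arrives.
    private BiPredicate<RideEvent, Long> currentQuery;

    /** A new query replaces the previous one and is run against every stored ride. */
    List<RideEvent> onQuery(BiPredicate<RideEvent, Long> query, long watermark) {
        currentQuery = query;
        List<RideEvent> matches = new ArrayList<>();
        for (RideEvent ride : latestRideByTaxi.values()) {
            if (query.test(ride, watermark)) {
                matches.add(ride);
            }
        }
        return matches;
    }

    /** A new ride overwrites the stored ride for its taxi and is run against the current query. */
    List<RideEvent> onRide(RideEvent ride, long watermark) {
        latestRideByTaxi.put(ride.taxiId, ride);
        List<RideEvent> matches = new ArrayList<>();
        if (currentQuery != null && currentQuery.test(ride, watermark)) {
            matches.add(ride);
        }
        return matches;
    }
}
```

In this model, onQuery roughly corresponds to what processBroadcastElement has to achieve and onRide to processElement; in Flink the state access and output go through the function's context and collector instead of return values.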
The TaxiQueryExercise class we provide creates both streams, and uses Janino to compile the Java expressions. What remains to be done is to complete the implementations of the processElement and processBroadcastElement methods of the KeyedBroadcastProcessFunction.
Also, since the queries can reference the current watermark of the KeyedBroadcastProcessFunction, you need to take care that this will work. See the Implementation Hints below for more about this.
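One likely reason the watermark needs care is that a Flink operator with two inputs takes the minimum of the watermarks it has received on each input as its current watermark. The following plain-Java model illustrates that rule under this assumption; TwoInputWatermarkModel is a hypothetical name and none of this is Flink API.

```java
/** Plain-Java model of how a two-input operator combines watermarks. */
final class TwoInputWatermarkModel {
    // Before any watermark arrives, each input is treated as being at
    // Long.MIN_VALUE.
    private long rideInputWatermark = Long.MIN_VALUE;
    private long queryInputWatermark = Long.MIN_VALUE;

    // Watermarks never move backwards, so keep the maximum seen per input.
    void advanceRideInput(long watermark) {
        rideInputWatermark = Math.max(rideInputWatermark, watermark);
    }

    void advanceQueryInput(long watermark) {
        queryInputWatermark = Math.max(queryInputWatermark, watermark);
    }

    // The operator's watermark is held back by the slower input.
    long currentWatermark() {
        return Math.min(rideInputWatermark, queryInputWatermark);
    }
}
```

In this model, if the query input never advances, currentWatermark() stays at Long.MIN_VALUE no matter how far the ride input progresses. Advancing the query input to Long.MAX_VALUE lets the ride input alone determine the result.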
On macOS and Linux you can start the broadcast query stream with
nc -lk 9999
and on Windows you can install ncat from https://nmap.org/ncat/ and then use
ncat -lk 9999
Reference solutions are available at GitHub: