The task of the “Popular Places” exercise is to identify popular places from the taxi ride data stream. This is done by counting, every five minutes, the number of taxi rides that started or ended in each area within the last 15 minutes. Arrivals and departures should be counted separately. Only locations with more arrivals or departures than a provided popularity threshold should be forwarded to the result stream.
The GeoUtils class provides a static method GeoUtils.mapToGridCell(float lon, float lat), which maps a location (longitude, latitude) to a cell id that refers to an area of approximately 100x100 meters in size. The GeoUtils class also provides reverse methods to compute the longitude and latitude of the center of a grid cell.
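To make the idea of a grid cell concrete, here is a minimal sketch of how such a mapping and its reverse could work. It is not the actual GeoUtils implementation: the bounding box, the cell sizes, the grid width, and the method names cellCenterLon and cellCenterLat are all assumptions chosen for illustration, and no bounds checks are performed.

```java
// Simplified sketch of a grid-cell mapping. NOT the actual GeoUtils code:
// every constant and the reverse-method names below are illustrative assumptions.
final class GridCellSketch {
    // Assumed north-west corner of the grid (roughly the New York City area).
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    // Assumed cell size in degrees (on the order of 100 m at this latitude).
    static final double DELTA_LON = 0.0012;
    static final double DELTA_LAT = 0.0009;
    // Assumed number of cells per grid row.
    static final int CELLS_PER_ROW = 250;

    // Maps a location to a cell id: column index plus row index times row width.
    static int mapToGridCell(float lon, float lat) {
        int x = (int) Math.floor((lon - LON_WEST) / DELTA_LON);
        int y = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return x + y * CELLS_PER_ROW;
    }

    // Hypothetical reverse method: longitude of the center of a cell.
    static float cellCenterLon(int cellId) {
        int x = cellId % CELLS_PER_ROW;
        return (float) (LON_WEST + x * DELTA_LON + DELTA_LON / 2);
    }

    // Hypothetical reverse method: latitude of the center of a cell.
    static float cellCenterLat(int cellId) {
        int y = cellId / CELLS_PER_ROW;
        return (float) (LAT_NORTH - y * DELTA_LAT - DELTA_LAT / 2);
    }

    public static void main(String[] args) {
        int cell = mapToGridCell(-73.9857f, 40.7484f);
        System.out.println("cell id: " + cell);
        System.out.println("center: " + cellCenterLon(cell) + ", " + cellCenterLat(cell));
    }
}
```

Because the reverse methods return the cell center, mapping that center back through mapToGridCell yields the same cell id. This is what allows results to be reported as coordinates rather than opaque ids.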
Please note that the program should operate in event time.
The input data of this exercise is a stream of TaxiRide events generated by the Taxi Stream Source, filtered by the New York City area filter of the Taxi Ride Cleansing exercise (reuse the FilterFunction of the Taxi Ride Cleansing exercise). The TaxiRideSource annotates the generated DataStream<TaxiRide> with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.
The result of this exercise is a data stream of Tuple5<Float, Float, Long, Boolean, Integer> records. Each record contains the longitude and latitude of the location cell (two Float values), the timestamp of the count (Long), a flag indicating arrival or departure counts (Boolean), and the actual count (Integer).
The resulting stream should be printed to standard out.
- Java: com.dataartisans.flinktraining.exercises.datastream_java.windows.PopularPlacesExercise
- Scala: com.dataartisans.flinktraining.exercises.datastream_scala.windows.PopularPlacesExercise
Each TaxiRide event must be mapped to a cell id. This can be done by a MapFunction that calls the GeoUtils.mapToGridCell() method. Start events are mapped to their departure location; end events are mapped to their destination location.
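The mapping rule above can be sketched in plain Java, without any Flink dependency. The Ride class and the CellMapper interface are hypothetical stand-ins for TaxiRide and GeoUtils.mapToGridCell(), and their field names are assumptions. In the exercise itself the same logic would live inside a MapFunction.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

final class CellKeySketch {
    /** Hypothetical stand-in for a TaxiRide event; only the fields used here. */
    static final class Ride {
        final boolean isStart;
        final float startLon, startLat, endLon, endLat;

        Ride(boolean isStart, float startLon, float startLat, float endLon, float endLat) {
            this.isStart = isStart;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }
    }

    /** Hypothetical stand-in for GeoUtils.mapToGridCell(lon, lat). */
    interface CellMapper {
        int map(float lon, float lat);
    }

    /**
     * Returns (cellId, isStart): start events use the departure location,
     * end events use the destination location.
     */
    static Map.Entry<Integer, Boolean> toCellKey(Ride ride, CellMapper mapper) {
        int cell = ride.isStart
                ? mapper.map(ride.startLon, ride.startLat)
                : mapper.map(ride.endLon, ride.endLat);
        return new SimpleEntry<>(cell, ride.isStart);
    }

    public static void main(String[] args) {
        // Toy mapper for demonstration only: uses the integer part of the latitude.
        CellMapper toy = (lon, lat) -> (int) lat;
        Ride start = new Ride(true, -74.0f, 40.0f, -73.0f, 41.0f);
        System.out.println(toCellKey(start, toy));
    }
}
```

Keeping the start/end flag next to the cell id means arrivals and departures at the same cell stay in separate groups when the stream is later grouped and counted.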
Use DataStream.timeWindow(Time.minutes(15), Time.minutes(5)) to define a sliding time window of 15 minutes that triggers every five minutes. Use apply(WindowFunction) to count the number of elements in the window and add the end time of the window, which can be queried from the window object passed to the function.
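To see what a 15-minute window sliding by five minutes means for an individual event, the following plain-Java sketch computes which windows contain a given timestamp, counts events per window, and applies the popularity threshold. It is an illustration for a single key (one cell and one event type), not Flink's implementation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SlidingWindowSketch {
    static final long SIZE_MS = 15 * 60 * 1000L;  // window length: 15 minutes
    static final long SLIDE_MS = 5 * 60 * 1000L;  // a new window starts every 5 minutes

    /** End timestamps (exclusive) of all windows [start, start + size) containing the timestamp. */
    static List<Long> windowEnds(long timestampMs) {
        List<Long> ends = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        // Walk backwards over earlier windows that still cover the timestamp.
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            ends.add(start + SIZE_MS);
        }
        return ends;
    }

    /** Counts events per window end and keeps only windows with count > threshold. */
    static Map<Long, Integer> popularWindows(long[] timestampsMs, int threshold) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            for (long end : windowEnds(ts)) {
                counts.merge(end, 1, Integer::sum);
            }
        }
        counts.values().removeIf(c -> c <= threshold);
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        long[] events = {1 * minute, 2 * minute, 7 * minute};
        System.out.println(popularWindows(events, 2));
    }
}
```

Each event falls into size / slide = 3 overlapping windows, so a single ride contributes to three consecutive counts. The window end is used as the map key here because it is also the timestamp attached to each result record.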
Reference solutions are available at GitHub: