- Snapshot – a generic term referring to a global, consistent image of the state of a Flink job. A snapshot includes a pointer into each of the data sources (e.g., an offset into a file or Kafka partition), as well as a copy of the state from each of the job’s stateful operators that resulted from having processed all of the events up to those positions in the sources.
- Checkpoint – a snapshot taken automatically by Flink for the purpose of being able to recover from faults. Checkpoints can be incremental, and are optimized for being restored quickly.
- Externalized Checkpoint – normally checkpoints are not intended to be manipulated by users. Flink retains only the n-most-recent checkpoints (n being configurable) while a job is running, and deletes them when a job is cancelled. But you can configure them to be retained instead, in which case you can manually resume from them.
- Savepoint – a snapshot triggered manually by a user (or an API call) for some operational purpose, such as a stateful redeploy/upgrade/rescaling operation. Savepoints are always complete, and are optimized for operational flexibility.
How does State Snapshotting Work?
When a task manager is instructed by the checkpoint coordinator (part of the job manager) to begin a checkpoint, it has all of the sources record their offsets and insert numbered checkpoint barriers into their streams. These barriers flow through the job graph, indicating the part of the stream before and after each checkpoint.
Checkpoint n will contain the state of each operator that resulted from having consumed every event before checkpoint barrier n, and none of the events after it.
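As a rough illustration of that rule, the following sketch models a single operator that keeps a running sum and records it whenever a barrier arrives. The names `Barrier` and `run_counter` are hypothetical and exist only for this example. This is a toy model of the idea, not Flink's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    """Hypothetical marker separating the events of one checkpoint from the next."""
    checkpoint_id: int


def run_counter(stream):
    """Sum integer events and record the running sum at every barrier."""
    total = 0
    snapshots = {}
    for item in stream:
        if isinstance(item, Barrier):
            # The snapshot reflects every event before this barrier and none after it.
            snapshots[item.checkpoint_id] = total
        else:
            total += item
    return total, snapshots


stream = [1, 2, Barrier(1), 3, 4, Barrier(2), 5]
final_total, snapshots = run_counter(stream)
print(snapshots)
```

In this toy run, checkpoint 1 holds the sum of the first two events and checkpoint 2 the sum of the first four; the trailing event affects only the live state.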
As each operator in the job graph receives one of these barriers, it records its state. Operators with two input streams (such as a CoProcessFunction) perform barrier alignment so that the snapshot will reflect the state resulting from consuming events from both input streams up to (but not past) both barriers.
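Barrier alignment can be sketched with a toy two-input operator. This is a simplified model under stated assumptions: the function name `run_aligned`, the `BARRIER` sentinel, and the round-robin reading order are all made up for the example, and the sketch simply stops reading from an input once its barrier has arrived rather than modelling how Flink handles the data queued behind it.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical sentinel standing in for a checkpoint barrier


def run_aligned(input_a, input_b):
    """Sum events from two inputs and snapshot once both barriers have arrived."""
    queues = [deque(input_a), deque(input_b)]
    blocked = [False, False]  # True once an input has delivered its barrier
    total = 0
    snapshot = None
    turn = 0
    while queues[0] or queues[1]:
        # Prefer the input whose turn it is, skipping empty or blocked inputs.
        candidates = [i for i in (turn, 1 - turn) if queues[i] and not blocked[i]]
        if not candidates:
            raise ValueError("an input ended before its barrier arrived")
        i = candidates[0]
        turn = 1 - i
        item = queues[i].popleft()
        if item == BARRIER:
            blocked[i] = True
            if all(blocked):
                # Both barriers seen: the state covers exactly the pre-barrier events.
                snapshot = total
                blocked = [False, False]
        else:
            total += item
    return total, snapshot


final_total, snapshot = run_aligned([1, BARRIER, 100], [2, 3, 4, BARRIER, 200])
print(snapshot, final_total)
```

The first input delivers its barrier early, so the operator holds back that input's `100` until the second input's barrier arrives. The snapshot therefore contains only the pre-barrier events from both inputs.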
Flink’s state backends use a copy-on-write mechanism to allow stream processing to continue unimpeded while older versions of the state are being asynchronously snapshotted. Only when the snapshots have been durably persisted will these older versions of the state be garbage collected.
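The general copy-on-write idea can be sketched in a few lines. The class `CopyOnWriteState` below is hypothetical and deliberately coarse: it copies the whole dictionary on the first write after a snapshot. It shows the principle only and makes no claim about how any particular Flink state backend is implemented.

```python
class CopyOnWriteState:
    """Toy keyed state whose snapshots are taken without copying up front."""

    def __init__(self):
        self._data = {}
        self._shared = False  # True while a snapshot still references _data

    def snapshot(self):
        """Hand out the current version; later writes will not modify it."""
        self._shared = True
        return self._data

    def put(self, key, value):
        if self._shared:
            # A snapshot holds the old version, so copy before the first write.
            self._data = dict(self._data)
            self._shared = False
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


state = CopyOnWriteState()
state.put("a", 1)
snap = state.snapshot()   # could now be persisted in the background
state.put("a", 2)         # processing continues against a fresh copy
state.put("b", 3)
print(snap, state.get("a"), state.get("b"))
```

Once nothing references `snap` any longer, Python's garbage collector reclaims the old version. This loosely mirrors the point above that older versions are discarded only after the snapshot has been persisted.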
Exactly Once Guarantees
When things go wrong in a stream processing application, it’s possible to have either lost or duplicated results. With Flink, depending on the choices you make for your application and the cluster you run it on, any of these outcomes is possible:
- Flink makes no effort to recover from failures (at most once)
- Nothing is lost, but you may experience duplicated results (at least once)
- Nothing is lost or duplicated (exactly once)
Given that Flink recovers from faults by rewinding and replaying the source data streams, when we describe the ideal situation as “exactly once” we don’t mean that every event has been processed exactly once. Instead, we mean that every event has affected the state being managed by Flink exactly once.
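The distinction can be made concrete with a small simulation. The sketch below is a toy model, not Flink code: `run_job`, `checkpoint_every`, and `fail_at` are hypothetical names, the "state" is a running sum, and a checkpoint is just a pair of source offset and sum. After a simulated failure the job rewinds to the last checkpoint and replays.

```python
def run_job(events, checkpoint_every, fail_at=None):
    """Sum events, checkpointing periodically; optionally fail once at an offset."""
    total = 0
    offset = 0
    checkpoint = (0, 0)  # (source offset, operator state)
    processed = 0        # how many times any event was handled, replays included
    failed = False
    while offset < len(events):
        if fail_at is not None and not failed and offset == fail_at:
            failed = True
            # Recover: rewind the source and restore the state from the checkpoint.
            offset, total = checkpoint
            continue
        total += events[offset]
        offset += 1
        processed += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, total)
    return total, processed


events = [1, 2, 3, 4, 5, 6]
clean = run_job(events, checkpoint_every=2)
recovered = run_job(events, checkpoint_every=2, fail_at=5)
print(clean, recovered)
```

In the failing run one event is handled twice, yet the final sum matches the clean run, because the state it had affected was rolled back before the replay.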
To achieve exactly once end-to-end, so that every event from the sources affects the sinks exactly once, the following must be true:
- your sources must be replayable, and
- your sinks must be transactional (or idempotent)
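To see why the sink matters, consider what a replay does to two different kinds of sink. The sketch below is illustrative only: `replay_with_failure` and both sink functions are hypothetical, and it covers only the idempotent option (a keyed upsert), not a transactional sink.

```python
def replay_with_failure(records, sink_write, fail_after, resume_from):
    """Write records, 'fail' after fail_after writes, then replay from resume_from."""
    for record in records[:fail_after]:
        sink_write(record)
    # After recovery the replayable source is rewound, so some records are re-sent.
    for record in records[resume_from:]:
        sink_write(record)


records = [("a", 1), ("b", 2), ("c", 3)]

append_log = []   # non-idempotent sink: every write adds a row
key_value = {}    # idempotent sink: writing the same key twice changes nothing


def write_append(record):
    append_log.append(record)


def write_upsert(record):
    key, value = record
    key_value[key] = value


replay_with_failure(records, write_append, fail_after=2, resume_from=1)
replay_with_failure(records, write_upsert, fail_after=2, resume_from=1)
print(append_log)
print(key_value)
```

The append-only sink ends up with a duplicated record for key `"b"`, while the upsert sink holds each key exactly once despite receiving the same replayed writes.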
The Flink documentation describes which of its source and sink connectors satisfy these requirements.
If you don’t need exactly once semantics, you can gain some performance by disabling barrier alignment. This is done by configuring Flink to use
From the documentation:
- Data Streaming Fault Tolerance
- Fault Tolerance Guarantees of Data Sources and Sinks
- Enabling and Configuring Checkpointing
- Tuning Checkpoints and Large State
- Monitoring Checkpointing
- Restart Strategies
From Flink Forward: