I believe there's a subtle bug in the update() method, in how it decides between loading from a checkpoint and applying the log from JSON files.
Here are the important bits:
Ok(last_check_point) => {
    if self.last_check_point.is_none()
        || self.last_check_point == Some(last_check_point)
    {
        self.last_check_point = Some(last_check_point);
        self.restore_checkpoint(last_check_point).await?;
        self.version = last_check_point.version + 1;
    }
}
What this does:
- load from the checkpoint if no checkpoint was loaded before, or if the new checkpoint is the same as the previously used one
- if the new checkpoint is different, ignore it and apply the JSON deltas instead.
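To make the decision explicit, here is a minimal sketch that lifts the condition out of update() into a pure function. CheckPoint below is a hypothetical, trimmed-down stand-in (the real struct also carries size and parts), and current_rule is an illustrative name, not part of delta-rs.

```rust
// Hypothetical stand-in for delta-rs's CheckPoint; only `version` matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckPoint {
    version: i64,
}

/// Same condition as in the update() snippet:
/// true = restore the checkpoint, false = fall through to the JSON deltas.
fn current_rule(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous == Some(latest)
}

fn main() {
    let cp10 = CheckPoint { version: 10 };
    let cp20 = CheckPoint { version: 20 };
    // Nothing restored yet: the checkpoint is restored.
    assert!(current_rule(None, cp10));
    // Same checkpoint as last time: it is restored again.
    assert!(current_rule(Some(cp10), cp10));
    // Newer checkpoint: it is ignored and JSON deltas are applied instead.
    assert!(!current_rule(Some(cp10), cp20));
    println!("current rule behaves as described");
}
```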
I believe what we want instead is to use the JSON deltas if the new checkpoint is the same as the previous one, and to use the checkpoint if it is newer than the one we previously loaded.
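Assuming checkpoint versions only ever increase, "different from the previous checkpoint" and "newer than the previous checkpoint" coincide, so one way to get this behavior with a one-character edit is to flip == to != in the condition. The sketch below shows that variant next to one that compares versions explicitly; CheckPoint is again a hypothetical stand-in with only a version field, and both function names are illustrative.

```rust
// Hypothetical stand-in for delta-rs's CheckPoint; only `version` matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckPoint {
    version: i64,
}

/// Proposed condition: the current one with `==` flipped to `!=`.
/// `previous.is_none()` is now redundant (None != Some(latest) already holds),
/// but it is kept so the edit stays a single character.
fn proposed_rule(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous != Some(latest)
}

/// Alternative that spells out "newer" by comparing versions.
fn proposed_rule_by_version(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.map_or(true, |p| latest.version > p.version)
}

fn main() {
    let cp10 = CheckPoint { version: 10 };
    let cp20 = CheckPoint { version: 20 };
    let rules: [fn(Option<CheckPoint>, CheckPoint) -> bool; 2] =
        [proposed_rule, proposed_rule_by_version];
    for rule in rules {
        assert!(rule(None, cp10)); // first checkpoint seen: restore it
        assert!(!rule(Some(cp10), cp10)); // same checkpoint: apply JSON only
        assert!(rule(Some(cp10), cp20)); // newer checkpoint: restore it
    }
    println!("both variants restore only new checkpoints");
}
```

The version comparison is more robust if the assumption about monotonic checkpoints ever fails, at the cost of being more than a one-character change.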
Note that the final result (an up-to-date Delta table) is still achieved, but in both cases the method does unnecessary work: it either restores a checkpoint we have already moved past, or replays JSON deltas that a newer checkpoint already covers. So it could be more efficient.
I wasn't able to write a self-contained test in Rust, as I'm not sure about the status of write support (and checkpointing in particular), but I validated my assumptions by running a Spark shell session that generates commits side by side with a Rust session (with some println! sparkles), using the following "data generator" in Scala:
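// spark-shell already has spark.implicits._ in scope; it is needed for toDF
import spark.implicits._

// Appends one single-row commit per iteration, so each call to save() is one Delta version
def createCommits(numCommits: Int, deltaUrl: String): Unit =
  (0 until numCommits).foreach { c =>
    Seq(c).toDF("version").write.format("delta").mode("append").save(deltaUrl)
  }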
- start: 5 commits
createCommits(5, "<some delta path>")
reading delta table: "<some delta path>"
[apply_logs_after_current_version] applied 0
[apply_logs_after_current_version] applied 1
[apply_logs_after_current_version] applied 2
[apply_logs_after_current_version] applied 3
[apply_logs_after_current_version] applied 4
[apply_logs_after_current_version] end of log : 5, rollback
initial table loaded, version 4
waiting on keypress for update, current version 4
Init: loading the table, all good.
- 4 more commits: no checkpoints yet
updating ...
[update] process started
[update] no previous checkpoint, trying version = 5 ff
[apply_logs_after_current_version] applied 5
[apply_logs_after_current_version] applied 6
[apply_logs_after_current_version] applied 7
[apply_logs_after_current_version] applied 8
[apply_logs_after_current_version] end of log : 9, rollback
waiting on keypress for update, current version 8
Expected behavior: start from our current version and apply the new commits in sequence.
- 4 more commits: new checkpoint at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = None
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] end of log : 13, rollback
waiting on keypress for update, current version 12
Expected behavior: load the checkpoint and apply the changes from there.
- 4 more commits: no new checkpoint, still at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16
Unexpected behavior: we have already read up to version 12, but we reload from the checkpoint at version 10, applying more JSON than necessary (and restoring a checkpoint that doesn't help either).
- no new commits, should do nothing
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16
Unexpected behavior: we reload starting from the previous checkpoint, even though no new commits were added to the Delta log.
- 4 more commits: new checkpoint at v20
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 20, size: 23, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] applied 17
[apply_logs_after_current_version] applied 18
[apply_logs_after_current_version] applied 19
[apply_logs_after_current_version] applied 20
[apply_logs_after_current_version] end of log : 21, rollback
waiting on keypress for update, current version 20
Unexpected behavior: despite having a checkpoint at 20, we apply JSON commits all the way from version 16 to 20.
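To compare both rules over the whole session above, the following sketch replays the checkpoint each update() call found (none, then v10 three times, then v20) and records which path each rule takes. CheckPoint, Action and replay are hypothetical names for illustration. Like the snippet, it only remembers a checkpoint after restoring it, and it models the decision only, not how many JSON commits get applied.

```rust
// Hypothetical stand-in for delta-rs's CheckPoint; only `version` matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckPoint {
    version: i64,
}

#[derive(Debug, PartialEq)]
enum Action {
    RestoreCheckpoint,
    ApplyJson,
}

fn current_rule(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous == Some(latest)
}

fn proposed_rule(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous != Some(latest)
}

/// Replays a sequence of update() calls; `calls[i]` is the checkpoint found
/// at call i (None = no checkpoint written yet).
fn replay(rule: fn(Option<CheckPoint>, CheckPoint) -> bool, calls: &[Option<CheckPoint>]) -> Vec<Action> {
    let mut previous = None;
    let mut actions = Vec::new();
    for found in calls {
        match found {
            Some(cp) if rule(previous, *cp) => {
                previous = Some(*cp); // remembered only when restored, as in the snippet
                actions.push(Action::RestoreCheckpoint);
            }
            // No checkpoint yet, or the rule chose the JSON deltas.
            _ => actions.push(Action::ApplyJson),
        }
    }
    actions
}

fn main() {
    let cp10 = Some(CheckPoint { version: 10 });
    let cp20 = Some(CheckPoint { version: 20 });
    // One entry per update() call after the initial load.
    let calls = [None, cp10, cp10, cp10, cp20];
    // prints: current:  [ApplyJson, RestoreCheckpoint, RestoreCheckpoint, RestoreCheckpoint, ApplyJson]
    println!("current:  {:?}", replay(current_rule, &calls));
    // prints: proposed: [ApplyJson, RestoreCheckpoint, ApplyJson, ApplyJson, RestoreCheckpoint]
    println!("proposed: {:?}", replay(proposed_rule, &calls));
}
```

The current rule reproduces the pattern in the logs (restoring v10 three times and skipping v20), while the != variant restores each checkpoint once and otherwise only applies JSON.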
I believe the expected behavior can be achieved with a single-character change
in the logical expression. I'd gladly provide a pull request.