feat!: allow passing down existing dataset for write #3119
Conversation
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3119      +/-   ##
==========================================
+ Coverage   77.91%   77.94%    +0.03%
==========================================
  Files         240      242        +2
  Lines       81465    81738      +273
  Branches    81465    81738      +273
==========================================
+ Hits        63470    63709      +239
- Misses      14816    14857       +41
+ Partials     3179     3172        -7

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Force-pushed from 5bca1f8 to ea24413.
Left some questions, thanks!
/// Whether to use move-stable row ids. This makes the `_rowid` column stable
/// after compaction, but not updates.
even during re-compaction?
Interesting, not updates. Will an update create new rows with a new `_rowid`?
Even during re-compaction, yeah. We don't keep them during updates because that requires invalidating the secondary indices.
You can read more in the design docs:
- Move-stable row ids
- Primary keys - when we make row ids stable after updates too
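For reference, a minimal Rust sketch of opting into move-stable row ids at write time. This is not code from this PR; the `enable_move_stable_row_ids` field name on `WriteParams` and the exact crate paths are assumptions from memory.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::{Dataset, WriteParams};

async fn write_with_stable_row_ids(uri: &str) -> lance::Result<Dataset> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column]).expect("valid batch");
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);

    let params = WriteParams {
        enable_move_stable_row_ids: true, // assumed field name
        ..Default::default()
    };
    // With this enabled, `_rowid` values survive compaction; an update still
    // assigns fresh ids, which is why secondary indices get invalidated.
    Dataset::write(reader, uri, Some(params)).await
}
```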
Thanks Will! Will take a look at the design docs!
///
/// This can be used to stage changes or to handle "secondary" datasets
/// whose lineage is tracked elsewhere.
pub fn with_detached(mut self, detached: bool) -> Self {
Interesting, sounds like git.
Is anybody using this feature, or why was this feature asked for?
It's kind of like git, though there's only one branch, and everything else is detached.
Weston added this feature to support "balanced storage" for large blob columns. Basically we used this to create a separate hidden dataset that stores the blob data, and by doing this we could compact that data at a different rate than other columns.
Interesting, maybe with 2.1 we don't need to have this kind of detached dataset for blobs?
Actually it was designed with 2.1 in mind. The ideal rows per file differs so much between small and wide columns that they need to essentially be in different datasets to offer good enough OLAP and random access performance. We wrap it up quite seamlessly though, so for the most part it feels like just another column.
Yeah, for example, imagine a column that contains 1MB images. 1M rows would mean 1TB for just that one column. 1TB files are difficult to work with in cloud storage (operations take a long time, and sometimes they fail), and running compaction (e.g. on two 500K-row files) is very slow.
With balanced storage we have two datasets. The primary dataset has all the smaller columns. The blob dataset has all the larger columns. Each dataset targets a reasonable file size (e.g. 64GB). The primary dataset has fewer fragments, but each fragment has at least 1M rows (important for small data types). The secondary dataset has more fragments, but the data types are so large that we don't care about the # of rows per file as much.
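To make the file-sizing tradeoff concrete, here is the arithmetic from that example as a tiny standalone Rust program (the numbers come from the comment above; no Lance API involved):

```rust
// Pure arithmetic: why the blob column wants its own dataset.
fn main() {
    let rows: u64 = 1_000_000;        // 1M rows in the table
    let blob_size: u64 = 1 << 20;     // each image ~1 MiB
    let target_file: u64 = 64 << 30;  // ~64 GiB target per data file

    let blob_total = rows * blob_size;                 // ~1 TiB for that single column
    let rows_per_blob_file = target_file / blob_size;  // 65,536 rows per blob-dataset file
    let blob_files = blob_total.div_ceil(target_file); // 16 files of at most ~64 GiB

    println!("blob column total: {} GiB", blob_total >> 30);
    println!("rows per blob file: {rows_per_blob_file}");
    println!("blob data files:    {blob_files}");
    // The primary dataset keeps >= 1M rows per fragment since its columns are
    // small, so the two datasets naturally want very different fragment counts.
}
```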
/// Pass an object store registry to use.
///
/// If an object store is passed, this registry will be ignored.
pub fn with_object_store_registry(
nit: is there a macro we can use to generate this builder boilerplate?
I could maybe write a macro, but doesn't seem that worth it to me.
There is https://github.com/colin-kiegel/rust-derive-builder but I agree, I'm on the fence about more macros as I think I've developed a FUD association between macros and build times.
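For context, a minimal sketch of what derive_builder generates; the `CommitOptions` struct here is hypothetical, not this PR's API.

```rust
use derive_builder::Builder;

// derive_builder generates `CommitOptionsBuilder` with one setter per field
// and a `build()` that returns Result<CommitOptions, CommitOptionsBuilderError>.
#[derive(Builder, Debug)]
#[builder(setter(into), default)]
pub struct CommitOptions {
    detached: bool,
    enable_v2_manifest_paths: bool,
}

fn main() {
    let opts = CommitOptionsBuilder::default()
        .detached(true)
        .build()
        .expect("all fields have defaults");
    println!("{opts:?}");
}
```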
Force-pushed from da2fadd to a30eba9.
I'm a big fan of the change. I think this looks a lot cleaner and easier to follow than the old stuff. Plus, I think it'll lead really nicely into my task for this week (and eventually for a bulk insert task): #3138
mode: str, default 'append'
    The mode to use when writing the data. Options are:

    **create** - create a new dataset (raises if uri already exists).
    **overwrite** - create a new snapshot version.
    **append** - create a new version that is the concat of the input and the
    latest version (raises if uri does not exist).
**kwargs : dict, optional
    Additional keyword arguments to pass to :func:`write_dataset`.
Why call out `mode` specifically and leave `kwargs` for the rest? Because the default changed?
Yeah mainly for the change in default.
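On the Rust side, the same three modes map onto `WriteMode` in `WriteParams`. A hedged sketch (the module paths are assumptions, and the reader construction is left to a generic parameter):

```rust
use arrow_array::RecordBatchReader;
use lance::dataset::{Dataset, WriteMode, WriteParams};

async fn append_to(
    reader: impl RecordBatchReader + Send + 'static,
    uri: &str,
) -> lance::Result<Dataset> {
    let params = WriteParams {
        // Create: error if `uri` already exists.
        // Overwrite: write a new snapshot version.
        // Append: new version that is the concat of the input and the latest version.
        mode: WriteMode::Append,
        ..Default::default()
    };
    Dataset::write(reader, uri, Some(params)).await
}
```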
elif isinstance(base_uri, LanceDataset):
    base_uri = base_uri._ds
Minor nit: maybe rename `base_uri` to `base_uri_or_dataset`? Took me a second to realize this wasn't:

if isinstance(base_uri, Path):
    base_uri = str(base_uri)
elif isinstance(base_uri, LanceDataset):
    base_uri = base_uri._uri
I wanted to, but there were unit tests where we passed `base_uri` as a named argument, and thus I realized this could be an annoying breaking change for some users.
let mut builder = CommitBuilder::new(base_uri)
    .with_object_store_registry(object_store_registry)
    .enable_v2_manifest_paths(enable_v2_manifest_paths)
    .with_detached(detached);
It seems a little strange to have a builder pattern here but not expose it to the API (e.g. have `do_commit` return `CommitBuilder`). Is the goal simply to maintain the API for now for backwards compatibility?
The builders are public. I just didn't want to disrupt the API surface too much. I figure we'll deprecate the old methods later in favor of the builders.
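For example, committing through the builder directly could end up looking roughly like this. Only `new`, `with_object_store_registry`, `enable_v2_manifest_paths`, and `with_detached` appear in this diff, so the `execute(transaction)` finisher and the module paths below are assumptions:

```rust
use lance::dataset::transaction::Transaction;
use lance::dataset::{CommitBuilder, Dataset};

// Hedged sketch of committing a prepared transaction through the new builder.
async fn commit_txn(uri: &str, transaction: Transaction) -> lance::Result<Dataset> {
    CommitBuilder::new(uri)
        .enable_v2_manifest_paths(true)
        .with_detached(false)
        .execute(transaction) // assumed finisher name
        .await
}
```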
@@ -554,6 +569,37 @@ impl WriterGenerator {
    }
}

// Given input options resolve what the commit handler should be.
async fn resolve_commit_handler(
Why did this method show up in here?
It was in `src/dataset.rs` and is used both in `CommitBuilder` and `InsertBuilder`. I thought about moving it to `lance-io`, but the logic ended up feeling specific to how those builder APIs wanted to interpret handler parameters.
            self.use_move_stable_row_ids.unwrap_or(false)
        };

        // Validate storage format matches existing dataset
        if let Some(ds) = dest.dataset() {
            if let Some(storage_format) = self.storage_format {
                let passed_storage_format = DataStorageFormat::new(storage_format);
                if ds.manifest.data_storage_format != passed_storage_format
                    && !matches!(transaction.operation, Operation::Overwrite { .. })
                {
                    return Err(Error::InvalidInput {
                        source: format!(
                            "Storage format mismatch. Existing dataset uses {:?}, but new data uses {:?}",
                            ds.manifest.data_storage_format,
                            passed_storage_format
                        ).into(),
                        location: location!(),
                    });
                }
            }
        }

        let manifest_config = ManifestWriteConfig {
            use_move_stable_row_ids,
            storage_format: self.storage_format.map(DataStorageFormat::new),
            ..Default::default()
        };

        let manifest = if let Some(dataset) = dest.dataset() {
            if self.detached {
                if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
                    return Err(Error::NotSupported {
                        source: "detached commits cannot be used with v1 manifest paths".into(),
                        location: location!(),
                    });
                }
                commit_detached_transaction(
                    dataset,
                    object_store.as_ref(),
                    commit_handler.as_ref(),
                    &transaction,
                    &manifest_config,
                    &self.commit_config,
                )
                .await?
            } else {
                commit_transaction(
                    dataset,
                    object_store.as_ref(),
                    commit_handler.as_ref(),
                    &transaction,
                    &manifest_config,
                    &self.commit_config,
                    manifest_naming_scheme,
                )
                .await?
            }
        } else if self.detached {
            // I think we may eventually want this, and we can probably handle it, but leaving a TODO for now
            return Err(Error::NotSupported {
                source: "detached commits cannot currently be used to create new datasets".into(),
                location: location!(),
            });
        } else {
            commit_new_dataset(
                object_store.as_ref(),
                commit_handler.as_ref(),
                &base_path,
                &transaction,
                &manifest_config,
                manifest_naming_scheme,
            )
            .await?
        };

        let tags = Tags::new(
            object_store.clone(),
            commit_handler.clone(),
            base_path.clone(),
        );

        match &self.dest {
            WriteDestination::Dataset(dataset) => Ok(Dataset {
                manifest: Arc::new(manifest),
                session: self.session.unwrap_or(dataset.session.clone()),
                ..dataset.as_ref().clone()
            }),
            WriteDestination::Uri(uri) => Ok(Dataset {
                object_store,
                base: base_path,
                uri: uri.to_string(),
                manifest: Arc::new(manifest),
                session: self.session.unwrap_or_default(),
                commit_handler,
                tags,
                manifest_naming_scheme,
            }),
        }
    }
}
We have enough unit tests around this method that I'll assume this was ported over correctly 😆
Yeah I did a lot of work to get it passing with the existing tests. I feel pretty confident in it.
        Ok(())
    }

    async fn resolve_context(&self) -> Result<WriteContext<'a>> {
I like this division of labor. Well done.
One other thought I had. Could the insert builder have a way of generating a collection of fragments instead of a transaction? E.g. how would I use it if I wanted to run the insert builder twice and then commit both sets of files as a single transaction later?
😄 Later today I'll have a PR that adds a new API to …
Co-authored-by: Weston Pace <weston.pace@gmail.com>
BREAKING CHANGE: the return value in Rust of `write_fragments()` has changed to `Result<Transaction>`.
- `write_fragments` and `Dataset::commit()` #3058
- `InsertBuilder` and `CommitBuilder`
- `CommitBuilder`
- `LanceDataset.insert()` method to modify existing datasets
- `Table.add` resets `index_cache_size` lancedb#1655