
feat!: allow passing down existing dataset for write #3119

Merged
merged 12 commits into lancedb:main from the perf/pass-down-dataset branch on Nov 19, 2024

Conversation

Contributor
@wjones127 wjones127 commented Nov 11, 2024

BREAKING CHANGE: the return value of write_fragments() in Rust has changed to Result<Transaction>.
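Concretely, the new two-step flow looks roughly like this. This is a sketch only: the import paths, the argument order of write_fragments, and CommitBuilder::execute are assumptions based on this PR's diff, not confirmed signatures.

use arrow_array::RecordBatchReader;
use lance::dataset::{Dataset, WriteParams};

// Step 1: write the data files; this now yields a Transaction rather than
// the raw fragment list. Step 2: commit that transaction separately.
async fn write_and_commit(
    reader: impl RecordBatchReader + Send + 'static,
    uri: &str,
) -> lance::Result<Dataset> {
    let transaction = write_fragments(uri, reader, WriteParams::default()).await?;
    CommitBuilder::new(uri).execute(transaction).await
}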

@codecov-commenter commented Nov 12, 2024

Codecov Report

Attention: Patch coverage is 73.83178% with 140 lines in your changes missing coverage. Please review.

Project coverage is 77.94%. Comparing base (c47543f) to head (60505db).
Report is 2 commits behind head on main.

Files with missing lines                  Patch %   Lines
rust/lance/src/dataset/write/insert.rs    73.22%    61 Missing and 7 partials ⚠️
rust/lance/src/dataset/write/commit.rs    77.61%    44 Missing and 1 partial ⚠️
rust/lance/src/dataset/write.rs           58.69%    18 Missing and 1 partial ⚠️
rust/lance/src/dataset.rs                 75.75%    4 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3119      +/-   ##
==========================================
+ Coverage   77.91%   77.94%   +0.03%     
==========================================
  Files         240      242       +2     
  Lines       81465    81738     +273     
  Branches    81465    81738     +273     
==========================================
+ Hits        63470    63709     +239     
- Misses      14816    14857      +41     
+ Partials     3179     3172       -7     
Flag Coverage Δ
unittests 77.94% <73.83%> (+0.03%) ⬆️


@wjones127 wjones127 changed the title perf: allow passing down existing dataset for write feat!: allow passing down existing dataset for write Nov 13, 2024
@github-actions github-actions bot added enhancement New feature or request java labels Nov 13, 2024
@wjones127 wjones127 marked this pull request as ready for review November 14, 2024 19:12
Collaborator
@LuQQiu LuQQiu left a comment

Left some questions, thanks!

}

/// Whether to use move-stable row ids. This makes the `_rowid` column stable
/// after compaction, but not after updates.
Collaborator

Even during re-compaction?
Interesting that updates are excluded; will an update create new rows with new _rowid values?

Contributor Author

Even during re-compaction, yeah. We don't keep them during updates because that would require invalidating the secondary indices.

You can read more in the design docs:
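For readers following along, here is a minimal sketch of the behavior being described, assuming the WriteParams flag is named enable_move_stable_row_ids (my reading of the doc comment above; the Dataset::write signature is also an assumption):

use arrow_array::RecordBatchReader;
use lance::dataset::{Dataset, WriteParams};

// Write a dataset with move-stable row ids enabled (flag name assumed).
async fn write_with_stable_row_ids(
    reader: impl RecordBatchReader + Send + 'static,
    uri: &str,
) -> lance::Result<Dataset> {
    let params = WriteParams {
        enable_move_stable_row_ids: true,
        ..Default::default()
    };
    // After compaction, surviving rows keep their `_rowid` values. After an
    // update, rewritten rows get fresh `_rowid`s, since keeping the old ones
    // would require invalidating secondary indices.
    Dataset::write(reader, uri, Some(params)).await
}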

Collaborator

Thanks Will! Will take a look at the design docs!

///
/// This can be used to stage changes or to handle "secondary" datasets
/// whose lineage is tracked elsewhere.
pub fn with_detached(mut self, detached: bool) -> Self {
Collaborator

Interesting, sounds like git.

Collaborator

Is anybody using this feature, or what prompted the request for it?

Contributor Author

It's kind of like git, though there is only one branch, and everything else is detached.

Weston added this feature to support "balanced storage" for large blob columns. Basically, we used it to create a separate hidden dataset that stores the blob data; by doing this we could compact that data at a different rate than the other columns.

Collaborator

Interesting. Maybe with 2.1 we won't need this kind of detached dataset for blobs?

Contributor Author

Actually, it was designed with 2.1 in mind. The ideal rows-per-file differs so much between small and wide columns that they essentially need to be in different datasets to offer good OLAP and random-access performance. We wrap it up quite seamlessly though, so for the most part it feels like just another column.

Contributor

Yeah. For example, imagine a column that contains 1MB images: 1M rows would mean 1TB for just that one column. 1TB files are difficult to work with in cloud storage (operations take a long time, and sometimes they fail), and running compaction (e.g. on two 500K files) is very slow.

With balanced storage we have two datasets. The primary dataset has all the smaller columns; the blob dataset has all the larger columns. Each dataset targets a reasonable file size (e.g. 64GB). The primary dataset has fewer fragments, but each fragment has at least 1M rows (important for small data types). The secondary dataset has more fragments, but the data types are so large that we don't care as much about the number of rows per file.
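To make the balanced-storage picture concrete, a rough sketch of a detached commit against the secondary blob dataset, using the with_detached method shown in this diff. The execute method, the Transaction type's import path, and the surrounding setup are all assumptions:

// `blob_dataset` is the already-opened secondary dataset holding the large
// columns; `transaction` describes the new blob data to commit.
async fn commit_blob_data(
    blob_dataset: std::sync::Arc<lance::dataset::Dataset>,
    transaction: Transaction, // import path elided
) -> lance::Result<lance::dataset::Dataset> {
    CommitBuilder::new(blob_dataset) // this PR allows passing a dataset here
        // Detached: the resulting version is excluded from the main lineage;
        // the primary dataset tracks it instead.
        .with_detached(true)
        .execute(transaction)
        .await
}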

/// Pass an object store registry to use.
///
/// If an object store is passed, this registry will be ignored.
pub fn with_object_store_registry(
Contributor

nit: is there a macro we can use to generate this builder boilerplate?

Contributor Author

I could maybe write a macro, but it doesn't seem worth it to me.

Contributor

There is https://github.com/colin-kiegel/rust-derive-builder, but I agree; I'm on the fence about more macros, as I've developed a FUD association between macros and build times.
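For reference, this is roughly what the derive_builder crate linked above provides. This is generic crate usage with a hypothetical CommitOptions struct, not code from this PR:

use derive_builder::Builder;

#[derive(Builder, Debug, Clone)]
struct CommitOptions {
    // #[builder(default)] lets callers omit the field entirely.
    #[builder(default)]
    detached: bool,
    #[builder(default)]
    enable_v2_manifest_paths: bool,
}

fn main() {
    // The derive emits a CommitOptionsBuilder with per-field setters.
    let opts = CommitOptionsBuilder::default()
        .detached(true)
        .build()
        .expect("all required fields set");
    println!("{opts:?}");
}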

Contributor
@westonpace westonpace left a comment

I'm a big fan of the change. I think this looks a lot cleaner and easier to follow than the old stuff. Plus, I think it'll lead really nicely into my task for this week (and eventually for a bulk insert task): #3138

Comment on lines +1124 to +1131
mode: str, default 'append'
The mode to use when writing the data. Options are:
**create** - create a new dataset (raises if uri already exists).
**overwrite** - create a new snapshot version
**append** - create a new version that is the concatenation of the input and
the latest version (raises if uri does not exist)
**kwargs : dict, optional
Additional keyword arguments to pass to :func:`write_dataset`.
Contributor

Why call out mode specifically and leave kwargs for the rest? Because the default changed?

Contributor Author

Yeah, mainly because of the change in default.
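For cross-reference, the Rust side expresses the same option through lance's existing WriteMode enum; a minimal sketch:

use lance::dataset::{WriteMode, WriteParams};

fn append_params() -> WriteParams {
    WriteParams {
        // Counterpart of mode='append' in the docstring above; Create and
        // Overwrite mirror the other two options.
        mode: WriteMode::Append,
        ..Default::default()
    }
}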

Comment on lines +2177 to +2178
elif isinstance(base_uri, LanceDataset):
base_uri = base_uri._ds
Contributor

Minor nit: maybe rename base_uri to base_uri_or_dataset? Took me a second to realize this wasn't:

if isinstance(base_uri, Path):
  base_uri = str(base_uri)
elif isinstance(base_uri, LanceDataset):
  base_uri = base_uri._uri

Contributor Author

I wanted to, but there were unit tests where we passed base_uri as a named argument, and I realized this could be an annoying breaking change for some users.

Comment on lines +630 to +633
let mut builder = CommitBuilder::new(base_uri)
.with_object_store_registry(object_store_registry)
.enable_v2_manifest_paths(enable_v2_manifest_paths)
.with_detached(detached);
Contributor

It seems a little strange to have a builder pattern here but not expose it in the API (e.g. have do_commit return CommitBuilder). Is the goal simply to maintain the API for now for backwards compatibility?

Contributor Author

The builders are public. I just didn't want to disrupt the API surface too much. I figure we'll deprecate the old methods later in favor of the builders.

@@ -554,6 +569,37 @@ impl WriterGenerator {
}
}

// Given the input options, resolve what the commit handler should be.
async fn resolve_commit_handler(
Contributor

Why did this method show up here?

Contributor Author

It was in src/dataset.rs and is used by both CommitBuilder and InsertBuilder. I thought about moving it to lance-io, but the logic ended up feeling specific to how those builder APIs want to interpret the commit handler parameters.


Comment on lines +164 to +339
self.use_move_stable_row_ids.unwrap_or(false)
};

// Validate storage format matches existing dataset
if let Some(ds) = dest.dataset() {
if let Some(storage_format) = self.storage_format {
let passed_storage_format = DataStorageFormat::new(storage_format);
if ds.manifest.data_storage_format != passed_storage_format
&& !matches!(transaction.operation, Operation::Overwrite { .. })
{
return Err(Error::InvalidInput {
source: format!(
"Storage format mismatch. Existing dataset uses {:?}, but new data uses {:?}",
ds.manifest.data_storage_format,
passed_storage_format
).into(),
location: location!(),
});
}
}
}

let manifest_config = ManifestWriteConfig {
use_move_stable_row_ids,
storage_format: self.storage_format.map(DataStorageFormat::new),
..Default::default()
};

let manifest = if let Some(dataset) = dest.dataset() {
if self.detached {
if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
return Err(Error::NotSupported {
source: "detached commits cannot be used with v1 manifest paths".into(),
location: location!(),
});
}
commit_detached_transaction(
dataset,
object_store.as_ref(),
commit_handler.as_ref(),
&transaction,
&manifest_config,
&self.commit_config,
)
.await?
} else {
commit_transaction(
dataset,
object_store.as_ref(),
commit_handler.as_ref(),
&transaction,
&manifest_config,
&self.commit_config,
manifest_naming_scheme,
)
.await?
}
} else if self.detached {
// I think we may eventually want this, and we can probably handle it, but leaving a TODO for now
return Err(Error::NotSupported {
source: "detached commits cannot currently be used to create new datasets".into(),
location: location!(),
});
} else {
commit_new_dataset(
object_store.as_ref(),
commit_handler.as_ref(),
&base_path,
&transaction,
&manifest_config,
manifest_naming_scheme,
)
.await?
};

let tags = Tags::new(
object_store.clone(),
commit_handler.clone(),
base_path.clone(),
);

match &self.dest {
WriteDestination::Dataset(dataset) => Ok(Dataset {
manifest: Arc::new(manifest),
session: self.session.unwrap_or(dataset.session.clone()),
..dataset.as_ref().clone()
}),
WriteDestination::Uri(uri) => Ok(Dataset {
object_store,
base: base_path,
uri: uri.to_string(),
manifest: Arc::new(manifest),
session: self.session.unwrap_or_default(),
commit_handler,
tags,
manifest_naming_scheme,
}),
}
}
}
Contributor

We have enough unit tests around this method that I'll assume this was ported over correctly 😆

Contributor Author

Yeah I did a lot of work to get it passing with the existing tests. I feel pretty confident in it.

Ok(())
}

async fn resolve_context(&self) -> Result<WriteContext<'a>> {
Contributor

I like this division of labor. Well done.

Contributor
@westonpace westonpace left a comment

One other thought I had. Could the insert builder have a way of generating a collection of fragments instead of a transaction? E.g. how would I use it if I wanted to run the insert builder twice and then commit both sets of files as a single transaction later?

Contributor Author
@wjones127

One other thought I had. Could the insert builder have a way of generating a collection of fragments instead of a transaction? E.g. how would I use it if I wanted to run the insert builder twice and then commit both sets of files as a single transaction later?

😄 Later today I'll have a PR that adds a new API to CommitBuilder: CommitBuilder::execute_batch(transactions: &[Transaction]) -> BatchCommitResult. This will merge all compatible transactions into a single transaction and commit that. It makes for a nice API for distributed writes, and at the same time serves as a poor man's multi-statement transaction.
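Going by the signature quoted above, usage would presumably look something like this (a sketch of a then-unmerged API; import paths for Transaction and BatchCommitResult elided):

// Each worker produces a Transaction via the insert path; the coordinator
// merges the compatible transactions and commits them in one shot.
async fn commit_distributed_writes(
    txns: Vec<Transaction>,
    uri: &str,
) -> lance::Result<BatchCommitResult> {
    CommitBuilder::new(uri).execute_batch(&txns).await
}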

Co-authored-by: Weston Pace <weston.pace@gmail.com>
@wjones127 wjones127 merged commit 566082f into lancedb:main Nov 19, 2024
26 checks passed
@wjones127 wjones127 deleted the perf/pass-down-dataset branch November 19, 2024 19:46