[go: up one dir, main page]

Skip to content

Commit

Permalink
feat: move hashed state and trie writing to provider (paradigmxyz#9636)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Jul 24, 2024
1 parent 7ad93d5 commit 21335d6
Show file tree
Hide file tree
Showing 24 changed files with 1,382 additions and 1,273 deletions.
9 changes: 5 additions & 4 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StateWriter,
StaticFileProviderFactory, StorageReader,
writer::StorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
Expand Down Expand Up @@ -168,7 +168,8 @@ impl Command {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
provider_rw.insert_storage_for_hashing(storages)?;
Expand Down
9 changes: 6 additions & 3 deletions bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::full_block::FullBlockClient;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
writer::StorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
Expand Down Expand Up @@ -150,7 +150,10 @@ impl Command {
),
));
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
executor.finalize().write_to_storage(&provider_rw, None, OriginalValuesKnown::Yes)?;
let execution_outcome = executor.finalize();

let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

let checkpoint = Some(StageCheckpoint::new(
block_number.checked_sub(1).ok_or(eyre::eyre!("GenesisBlockHasNoParent"))?,
Expand Down
9 changes: 4 additions & 5 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,15 @@ impl<DB: Database> PersistenceService<DB> {
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
// TODO: do we provide a static file producer here?
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
// TODO: use single storage writer in task when sf / db tasks are combined
let storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
storage_writer.write_trie_updates(&trie_updates)?;
}

// update history indices
Expand Down Expand Up @@ -186,7 +185,7 @@ impl<DB: Database> PersistenceService<DB> {
let receipts_writer =
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;

let storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);
Expand Down
13 changes: 5 additions & 8 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use reth_node_core::version::SHORT_VERSION;
use reth_optimism_primitives::bedrock_import::is_dup_tx;
use reth_primitives::Receipts;
use reth_provider::{
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StateWriter,
StaticFileProviderFactory, StaticFileWriter, StatsReader,
writer::StorageWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader,
};
use reth_stages::StageId;
use reth_static_file_types::StaticFileSegment;
Expand Down Expand Up @@ -140,7 +140,7 @@ where
);

// We're reusing receipt writing code internal to
// `ExecutionOutcome::write_to_storage`, so we just use a default empty
// `StorageWriter::append_receipts_from_blocks`, so we just use a default empty
// `BundleState`.
let execution_outcome =
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
Expand All @@ -149,11 +149,8 @@ where
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;

// finally, write the receipts
execution_outcome.write_to_storage(
&provider,
Some(static_file_producer),
OriginalValuesKnown::Yes,
)?;
let mut storage_writer = StorageWriter::new(Some(&provider), Some(static_file_producer));
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
}

provider.commit()?;
Expand Down
9 changes: 4 additions & 5 deletions crates/stages/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Account, Address, SealedBlock, B256, U256};
use reth_provider::TrieWriter;
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::{StorageKind, TestStageDB},
Expand Down Expand Up @@ -139,12 +140,10 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {

let offset = transitions.len() as u64;

let provider_rw = db.factory.provider_rw().unwrap();
db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| {
updates.write_to_database(tx)?;
Ok(())
})
.unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.commit().unwrap();

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand Down
6 changes: 5 additions & 1 deletion crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use reth_primitives::{BlockNumber, Header, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::StorageWriter,
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateWriter, StatsReader, TransactionVariant,
};
Expand Down Expand Up @@ -358,8 +359,11 @@ where
}

let time = Instant::now();

// write output
state.write_to_storage(provider, static_file_producer, OriginalValuesKnown::Yes)?;
let mut writer = StorageWriter::new(Some(provider), static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;

let db_write_duration = time.elapsed();
debug!(
target: "sync::stages::execution",
Expand Down
16 changes: 10 additions & 6 deletions crates/stages/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use reth_db_api::{
};
use reth_primitives::{BlockNumber, GotExpected, SealedHeader, B256};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader,
StageCheckpointWriter, StatsReader,
writer::StorageWriter, DatabaseProviderRW, HeaderProvider, ProviderError,
StageCheckpointReader, StageCheckpointWriter, StatsReader,
};
use reth_stages_api::{
BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
Expand Down Expand Up @@ -218,7 +218,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.write_to_database(tx)?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;

let checkpoint = MerkleCheckpoint::new(
to_block,
Expand All @@ -238,7 +239,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.write_to_database(tx)?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;

entities_checkpoint.processed += hashed_entries_walked as u64;

Expand All @@ -253,7 +255,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
updates.write_to_database(provider.tx_ref())?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;

let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
provider.count_entries::<tables::HashedStorages>()?)
Expand Down Expand Up @@ -326,7 +329,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;

// Validation passed, apply unwind changes to the database.
updates.write_to_database(provider.tx_ref())?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;

// TODO(alexey): update entities checkpoint
} else {
Expand Down
16 changes: 9 additions & 7 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ use reth_primitives::{
Account, Address, Bytecode, Receipts, StaticFileSegment, StorageEntry, B256, U256,
};
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
errors::provider::ProviderResult,
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockNumReader, ChainSpecProvider, DatabaseProviderRW, ExecutionOutcome,
HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError, ProviderFactory,
StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
writer::StorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DatabaseProviderRW,
ExecutionOutcome, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory, RevertsInit, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
Expand Down Expand Up @@ -202,7 +203,8 @@ pub fn insert_state<'a, 'b, DB: Database>(
Vec::new(),
);

execution_outcome.write_to_storage(provider, None, OriginalValuesKnown::Yes)?;
let mut storage_writer = StorageWriter::new(Some(provider), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

trace!(target: "reth::cli", "Inserted state");

Expand Down Expand Up @@ -462,7 +464,7 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
.root_with_progress()?
{
StateRootProgress::Progress(state, _, updates) => {
let updated_len = updates.write_to_database(tx)?;
let updated_len = provider.write_trie_updates(&updates)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
Expand All @@ -482,7 +484,7 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
}
}
StateRootProgress::Complete(root, _, updates) => {
let updated_len = updates.write_to_database(tx)?;
let updated_len = provider.write_trie_updates(&updates)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
Expand Down
7 changes: 1 addition & 6 deletions crates/storage/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,4 @@ rand.workspace = true
[features]
optimism = ["reth-primitives/optimism", "reth-execution-types/optimism"]
serde = ["reth-execution-types/serde"]
test-utils = [
"alloy-rlp",
"reth-db/test-utils",
"reth-nippy-jar/test-utils",
"reth-chain-state/test-utils"
]
test-utils = ["alloy-rlp", "reth-db/test-utils", "reth-nippy-jar/test-utils", "reth-trie/test-utils", "reth-chain-state/test-utils", "reth-db/test-utils"]
Loading

0 comments on commit 21335d6

Please sign in to comment.