[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add static file helpers on StorageWriter #9740

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256};
use reth_provider::{
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
Expand Down Expand Up @@ -135,39 +135,31 @@ impl<DB: Database> PersistenceService<DB> {
Ok(())
}

// TODO: perform read for td and start
/// Writes the transactions to static files, to act as a log.
///
/// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
/// after this, to update the checkpoints for headers and block bodies.
fn log_transactions(
&self,
block: Arc<SealedBlock>,
start_tx_number: u64,
td: U256,
) -> ProviderResult<u64> {
debug!(target: "tree::persistence", ?td, ?start_tx_number, "Logging transactions");
fn log_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<u64> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find log_ very confusing; can we stick to write?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, renamed

debug!(target: "tree::persistence", "Logging transactions");
let provider = self.provider.static_file_provider();
let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let mut transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;

header_writer.append_header(block.header(), td, &block.hash())?;
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from);

let mut tx_number = start_tx_number;
for tx in no_hash_transactions {
transactions_writer.append_transaction(tx_number, &tx)?;
tx_number += 1;
}

// increment block for transactions
transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?;
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let provider_rw = self.provider.provider_rw()?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(header_writer));
storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;

// finally commit
transactions_writer.commit()?;
header_writer.commit()?;
let transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(transactions_writer));
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect();
storage_writer.append_transactions_from_blocks(
block.header().number,
std::iter::once(&no_hash_transactions),
)?;

Ok(block.number)
}
Expand Down Expand Up @@ -288,10 +280,8 @@ where
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(res);
}
PersistenceAction::LogTransactions((block, start_tx_number, td, sender)) => {
let block_num = self
.log_transactions(block, start_tx_number, td)
.expect("todo: handle errors");
PersistenceAction::LogTransactions((block, sender)) => {
let block_num = self.log_transactions(block).expect("todo: handle errors");
self.update_transaction_meta(block_num).expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand All @@ -317,7 +307,7 @@ pub enum PersistenceAction {
///
/// This will first append the header and transactions to static files, then update the
/// checkpoints for headers and block bodies in the database.
LogTransactions((Arc<SealedBlock>, u64, U256, oneshot::Sender<()>)),
LogTransactions((Arc<SealedBlock>, oneshot::Sender<()>)),

/// Removes block data above the given block number from the database.
///
Expand Down
101 changes: 100 additions & 1 deletion crates/storage/provider/src/writer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Borrow;

use crate::{providers::StaticFileProviderRWRefMut, DatabaseProviderRW};
use itertools::Itertools;
use reth_db::{
Expand All @@ -7,7 +9,9 @@ use reth_db::{
Database,
};
use reth_errors::{ProviderError, ProviderResult};
use reth_primitives::{BlockNumber, StorageEntry};
use reth_primitives::{
BlockNumber, Header, StaticFileSegment, StorageEntry, TransactionSignedNoHash, B256, U256,
};
use reth_storage_api::ReceiptWriter;
use reth_storage_errors::writer::StorageWriterError;
use reth_trie::HashedPostStateSorted;
Expand Down Expand Up @@ -138,11 +142,106 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
Ok(())
}

/// Appends headers to static files, using the
/// [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) table to determine the
/// total difficulty of the parent block during header insertion.
///
/// The total difficulty stored with each header is cumulative: it starts from the parent
/// block's total difficulty (zero when `initial_block_number` is `0`) and grows by the
/// difficulty of every header appended, so each header in a multi-header batch is written
/// with its own running total rather than `parent_td + own_difficulty`.
///
/// NOTE: The static file writer used to construct this [`StorageWriter`] MUST be a writer for
/// the Headers segment.
///
/// # Parameters
/// - `initial_block_number`: The block number of the first header yielded by `headers`.
/// - `headers`: An iterator over `(header, hash)` pairs, expected to be in ascending,
///   contiguous block order starting at `initial_block_number`.
///
/// # Errors
/// - Propagates any error from `ensure_database_writer` / `ensure_static_file_writer`.
/// - Returns [`ProviderError::TotalDifficultyNotFound`] when `initial_block_number` is
///   non-zero and the parent block has no entry in the total difficulty table.
/// - Propagates any error raised while appending a header to the static file.
pub fn append_headers_from_blocks<H, I>(
    &mut self,
    initial_block_number: BlockNumber,
    headers: impl Iterator<Item = I>,
) -> ProviderResult<()>
where
    I: Borrow<(H, B256)>,
    H: Borrow<Header>,
{
    self.ensure_database_writer()?;
    self.ensure_static_file_writer()?;
    let mut td_cursor =
        self.database_writer().tx_ref().cursor_read::<tables::HeaderTerminalDifficulties>()?;

    // Running total difficulty, seeded with the parent's value. The genesis block has no
    // parent, so it starts from zero.
    let mut td = if initial_block_number == 0 {
        U256::ZERO
    } else {
        td_cursor
            .seek_exact(initial_block_number - 1)?
            .map(|(_, td)| td.0)
            .ok_or_else(|| ProviderError::TotalDifficultyNotFound(initial_block_number))?
    };

    for pair in headers {
        let (header, hash) = pair.borrow();
        let header = header.borrow();
        // Accumulate instead of re-adding to the parent's value, so that the second and
        // later headers include the difficulty of the headers appended before them.
        td += header.difficulty;
        self.static_file_writer().append_header(header, td, hash)?;
    }

    Ok(())
}

/// Appends transactions to static files, using the
/// [`BlockBodyIndices`](tables::BlockBodyIndices) table to determine the transaction number
/// when appending to static files.
///
/// NOTE: The static file writer used to construct this [`StorageWriter`] MUST be a writer for
/// the Transactions segment.
pub fn append_transactions_from_blocks<T>(
&mut self,
initial_block_number: BlockNumber,
transactions: impl Iterator<Item = T>,
) -> ProviderResult<()>
where
T: Borrow<Vec<TransactionSignedNoHash>>,
{
self.ensure_database_writer()?;
self.ensure_static_file_writer()?;

let mut bodies_cursor =
self.database_writer().tx_ref().cursor_read::<tables::BlockBodyIndices>()?;

let mut last_tx_idx = None;
for (idx, transactions) in transactions.enumerate() {
let block_number = initial_block_number + idx as u64;

let mut first_tx_index =
bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num());

// If there are no indices, that means there have been no transactions
//
// So instead of returning an error, use zero
if block_number == initial_block_number && first_tx_index.is_none() {
first_tx_index = Some(0);
}

// TODO: I guess this error will never be returned
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unclear what this todo means

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

let mut tx_index = first_tx_index
.or(last_tx_idx)
.ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?;

for tx in transactions.borrow() {
self.static_file_writer().append_transaction(tx_index, tx)?;
tx_index += 1;
}

self.static_file_writer()
.increment_block(StaticFileSegment::Transactions, block_number)?;

// update index
last_tx_idx = Some(tx_index);
}
Ok(())
}

/// Appends receipts block by block.
///
/// ATTENTION: If called from [`StorageWriter`] without a static file producer, it will always
/// write them to database. Otherwise, it will look into the pruning configuration to decide.
///
/// NOTE: The static file writer used to construct this [`StorageWriter`] MUST be a writer for
/// the Receipts segment.
///
/// # Parameters
/// - `initial_block_number`: The starting block number.
/// - `blocks`: An iterator over blocks, each block having a vector of optional receipts. If
Expand Down
Loading