pub(crate) struct SnapshotStorageRebuilder {
file_receiver: Receiver<PathBuf>,
num_threads: usize,
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
storage_paths: DashMap<Slot, Mutex<Vec<PathBuf>>>,
storage: AccountStorageMap,
next_append_vec_id: Arc<AtomicAppendVecId>,
processed_slot_count: AtomicUsize,
num_collisions: AtomicUsize,
snapshot_from: SnapshotFrom,
}
Expand description
Stores state for rebuilding snapshot storages
Fields§
§file_receiver: Receiver<PathBuf>
Receiver for unpacked snapshot storage files
num_threads: usize
Number of threads to rebuild with
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>
Snapshot storage lengths - from the snapshot file
storage_paths: DashMap<Slot, Mutex<Vec<PathBuf>>>
Container for storing snapshot file paths
storage: AccountStorageMap
Container for storing rebuilt snapshot storages
next_append_vec_id: Arc<AtomicAppendVecId>
Tracks the next append_vec_id
processed_slot_count: AtomicUsize
Tracker for number of processed slots
num_collisions: AtomicUsize
Tracks the number of collisions in AppendVecId
snapshot_from: SnapshotFrom
Rebuild from the snapshot files or archives
Implementations§
source§impl SnapshotStorageRebuilder
impl SnapshotStorageRebuilder
sourcepub(crate) fn rebuild_storage(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
snapshot_from: SnapshotFrom
) -> Result<RebuiltSnapshotStorage, SnapshotError>
pub(crate) fn rebuild_storage( file_receiver: Receiver<PathBuf>, num_threads: usize, next_append_vec_id: Arc<AtomicAppendVecId>, snapshot_from: SnapshotFrom ) -> Result<RebuiltSnapshotStorage, SnapshotError>
Synchronously spawns threads to rebuild snapshot storages
sourcefn new(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
snapshot_from: SnapshotFrom
) -> Self
fn new( file_receiver: Receiver<PathBuf>, num_threads: usize, next_append_vec_id: Arc<AtomicAppendVecId>, snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>, snapshot_from: SnapshotFrom ) -> Self
Creates the SnapshotStorageRebuilder, which stores state during rebuilding, and pre-allocates data for storage paths.
sourcefn get_version_and_snapshot_files(
file_receiver: &Receiver<PathBuf>
) -> (PathBuf, PathBuf, Vec<PathBuf>)
fn get_version_and_snapshot_files( file_receiver: &Receiver<PathBuf> ) -> (PathBuf, PathBuf, Vec<PathBuf>)
Waits for the snapshot file. Due to parallel unpacking, we may receive some append_vec files before the snapshot file. This function pushes append_vec files into a buffer until the snapshot file is received.
sourcefn process_snapshot_file(
snapshot_version: SnapshotVersion,
snapshot_file_path: PathBuf
) -> Result<HashMap<Slot, HashMap<usize, usize>>, Error>
fn process_snapshot_file( snapshot_version: SnapshotVersion, snapshot_file_path: PathBuf ) -> Result<HashMap<Slot, HashMap<usize, usize>>, Error>
Process the snapshot file to get the size of each snapshot storage file
sourcefn spawn_rebuilder_threads(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
append_vec_files: Vec<PathBuf>,
snapshot_from: SnapshotFrom
) -> Result<AccountStorageMap, Error>
fn spawn_rebuilder_threads( file_receiver: Receiver<PathBuf>, num_threads: usize, next_append_vec_id: Arc<AtomicAppendVecId>, snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>, append_vec_files: Vec<PathBuf>, snapshot_from: SnapshotFrom ) -> Result<AccountStorageMap, Error>
Spawn threads for processing buffered append_vec_files, and then received files
sourcefn process_buffered_files(
&self,
append_vec_files: Vec<PathBuf>
) -> Result<(), Error>
fn process_buffered_files( &self, append_vec_files: Vec<PathBuf> ) -> Result<(), Error>
Processes buffered append_vec_files
sourcefn spawn_receiver_thread(
thread_pool: &ThreadPool,
exit_sender: Sender<Result<(), Error>>,
rebuilder: Arc<SnapshotStorageRebuilder>
)
fn spawn_receiver_thread( thread_pool: &ThreadPool, exit_sender: Sender<Result<(), Error>>, rebuilder: Arc<SnapshotStorageRebuilder> )
Spawn a single thread to process received append_vec_files
sourcefn process_append_vec_file(&self, path: PathBuf) -> Result<(), Error>
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), Error>
Process an append_vec_file
sourcefn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize
fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize
Insert storage path into slot and return the number of storage files for the slot
sourcefn process_complete_slot(&self, slot: Slot) -> Result<(), Error>
fn process_complete_slot(&self, slot: Slot) -> Result<(), Error>
Process a slot that has received all storage entries
sourcefn get_unique_append_vec_id(
next_append_vec_id: &Arc<AtomicAppendVecId>,
parent_folder: &Path,
slot: Slot
) -> AppendVecId
fn get_unique_append_vec_id( next_append_vec_id: &Arc<AtomicAppendVecId>, parent_folder: &Path, slot: Slot ) -> AppendVecId
Increments next_append_vec_id until there is no file in parent_folder with this id and slot, then returns the id.
sourcefn wait_for_completion(
&self,
exit_receiver: Receiver<Result<(), Error>>
) -> Result<(), Error>
fn wait_for_completion( &self, exit_receiver: Receiver<Result<(), Error>> ) -> Result<(), Error>
Wait for the completion of the rebuilding threads
sourcefn build_thread_pool(&self) -> ThreadPool
fn build_thread_pool(&self) -> ThreadPool
Builds thread pool to rebuild with