1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
use {
    rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode},
    std::path::Path,
};

pub struct BlockstoreOptions {
    // The access type of blockstore. Default: Primary
    pub access_type: AccessType,
    // Whether to open a blockstore under a recovery mode. Default: None.
    pub recovery_mode: Option<BlockstoreRecoveryMode>,
    // Whether to allow unlimited number of open files. Default: true.
    pub enforce_ulimit_nofile: bool,
    pub column_options: LedgerColumnOptions,
}

impl Default for BlockstoreOptions {
    /// The default options are the values used by [`Blockstore::open`].
    ///
    /// [`Blockstore::open`]: crate::blockstore::Blockstore::open
    fn default() -> Self {
        Self {
            access_type: AccessType::Primary,
            recovery_mode: None,
            enforce_ulimit_nofile: true,
            column_options: LedgerColumnOptions::default(),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AccessType {
    /// Primary (read/write) access; only one process can have Primary access.
    Primary,
    /// Primary (read/write) access with RocksDB automatic compaction disabled.
    PrimaryForMaintenance,
    /// Secondary (read) access; multiple processes can have Secondary access.
    /// Additionally, Secondary access can be obtained while another process
    /// already has Primary access.
    Secondary,
}

#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
    TolerateCorruptedTailRecords,
    AbsoluteConsistency,
    PointInTime,
    SkipAnyCorruptedRecord,
}

impl From<&str> for BlockstoreRecoveryMode {
    fn from(string: &str) -> Self {
        match string {
            "tolerate_corrupted_tail_records" => {
                BlockstoreRecoveryMode::TolerateCorruptedTailRecords
            }
            "absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
            "point_in_time" => BlockstoreRecoveryMode::PointInTime,
            "skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
            bad_mode => panic!("Invalid recovery mode: {bad_mode}"),
        }
    }
}

impl From<BlockstoreRecoveryMode> for DBRecoveryMode {
    fn from(brm: BlockstoreRecoveryMode) -> Self {
        match brm {
            BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
                DBRecoveryMode::TolerateCorruptedTailRecords
            }
            BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency,
            BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime,
            BlockstoreRecoveryMode::SkipAnyCorruptedRecord => {
                DBRecoveryMode::SkipAnyCorruptedRecord
            }
        }
    }
}

/// Options for LedgerColumn.
/// Each field might also be used as a tag that supports group-by operation when
/// reporting metrics.
#[derive(Debug, Clone)]
pub struct LedgerColumnOptions {
    // Determine how to store both data and coding shreds. Default: RocksLevel.
    pub shred_storage_type: ShredStorageType,

    // Determine the way to compress column families which are eligible for
    // compression.
    pub compression_type: BlockstoreCompressionType,

    // Control how often RocksDB read/write performance samples are collected.
    // If the value is greater than 0, then RocksDB read/write perf sample
    // will be collected once for every `rocks_perf_sample_interval` ops.
    pub rocks_perf_sample_interval: usize,
}

impl Default for LedgerColumnOptions {
    fn default() -> Self {
        Self {
            shred_storage_type: ShredStorageType::RocksLevel,
            compression_type: BlockstoreCompressionType::default(),
            rocks_perf_sample_interval: 0,
        }
    }
}

impl LedgerColumnOptions {
    pub fn get_storage_type_string(&self) -> &'static str {
        match self.shred_storage_type {
            ShredStorageType::RocksLevel => "rocks_level",
            ShredStorageType::RocksFifo(_) => "rocks_fifo",
        }
    }

    pub fn get_compression_type_string(&self) -> &'static str {
        match self.compression_type {
            BlockstoreCompressionType::None => "None",
            BlockstoreCompressionType::Snappy => "Snappy",
            BlockstoreCompressionType::Lz4 => "Lz4",
            BlockstoreCompressionType::Zlib => "Zlib",
        }
    }
}

#[derive(Debug, Clone)]
pub enum ShredStorageType {
    // Stores shreds under RocksDB's default compaction (level).
    RocksLevel,
    // (Experimental) Stores shreds under RocksDB's FIFO compaction which
    // allows ledger store to reclaim storage more efficiently with
    // lower I/O overhead.
    RocksFifo(BlockstoreRocksFifoOptions),
}

impl Default for ShredStorageType {
    fn default() -> Self {
        Self::RocksLevel
    }
}

pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
pub const BLOCKSTORE_DIRECTORY_ROCKS_FIFO: &str = "rocksdb_fifo";

impl ShredStorageType {
    /// Returns a ShredStorageType::RocksFifo, see BlockstoreRocksFifoOptions
    /// for more details on how `max_shred_storage_size` is interpreted.
    pub fn rocks_fifo(max_shred_storage_size: Option<u64>) -> ShredStorageType {
        ShredStorageType::RocksFifo(BlockstoreRocksFifoOptions::new(max_shred_storage_size))
    }

    /// The directory under `ledger_path` to the underlying blockstore.
    pub fn blockstore_directory(&self) -> &str {
        match self {
            ShredStorageType::RocksLevel => BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
            ShredStorageType::RocksFifo(_) => BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
        }
    }

    /// Returns the ShredStorageType that is used under the specified
    /// ledger_path.
    ///
    /// None will be returned if the ShredStorageType cannot be inferred.
    pub fn from_ledger_path(
        ledger_path: &Path,
        max_fifo_shred_storage_size: Option<u64>,
    ) -> Option<ShredStorageType> {
        let mut result: Option<ShredStorageType> = None;

        if Path::new(ledger_path)
            .join(BLOCKSTORE_DIRECTORY_ROCKS_LEVEL)
            .exists()
        {
            result = Some(ShredStorageType::RocksLevel);
        }

        if Path::new(ledger_path)
            .join(BLOCKSTORE_DIRECTORY_ROCKS_FIFO)
            .exists()
        {
            if result.is_none() {
                result = Some(ShredStorageType::RocksFifo(
                    BlockstoreRocksFifoOptions::new(max_fifo_shred_storage_size),
                ));
            } else {
                result = None;
            }
        }
        result
    }
}

#[derive(Debug, Clone)]
pub struct BlockstoreRocksFifoOptions {
    // The maximum storage size for storing data shreds in column family
    // [`cf::DataShred`].  Typically, data shreds contribute around 25% of the
    // ledger store storage size if the RPC service is enabled, or 50% if RPC
    // service is not enabled.
    //
    // Note that this number must be greater than FIFO_WRITE_BUFFER_SIZE
    // otherwise we won't be able to write any file.  If not, the blockstore
    // will panic.
    pub shred_data_cf_size: u64,
    // The maximum storage size for storing coding shreds in column family
    // [`cf::CodeShred`].  Typically, coding shreds contribute around 20% of the
    // ledger store storage size if the RPC service is enabled, or 40% if RPC
    // service is not enabled.
    //
    // Note that this number must be greater than FIFO_WRITE_BUFFER_SIZE
    // otherwise we won't be able to write any file.  If not, the blockstore
    // will panic.
    pub shred_code_cf_size: u64,
}

pub const MAX_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES: u64 = std::u64::MAX;

impl BlockstoreRocksFifoOptions {
    /// Returns a BlockstoreRocksFifoOptions where the specified
    /// `max_shred_storage_size` is equally split between shred_data_cf_size
    /// and shred_code_cf_size. A `None` value for `max_shred_storage_size`
    /// will (functionally) allow unbounded growth in these two columns. Once
    /// a column's total size exceeds the configured value, the oldest file(s)
    /// will be purged to get back within the limit.
    fn new(max_shred_storage_size: Option<u64>) -> Self {
        match max_shred_storage_size {
            Some(size) => Self {
                shred_data_cf_size: size / 2,
                shred_code_cf_size: size / 2,
            },
            None => Self {
                shred_data_cf_size: MAX_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES,
                shred_code_cf_size: MAX_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES,
            },
        }
    }

    pub fn new_for_tests() -> Self {
        Self {
            shred_data_cf_size: 150_000_000_000,
            shred_code_cf_size: 150_000_000_000,
        }
    }
}

#[derive(Debug, Clone)]
pub enum BlockstoreCompressionType {
    None,
    Snappy,
    Lz4,
    Zlib,
}

impl Default for BlockstoreCompressionType {
    fn default() -> Self {
        Self::None
    }
}

impl BlockstoreCompressionType {
    pub(crate) fn to_rocksdb_compression_type(&self) -> RocksCompressionType {
        match self {
            Self::None => RocksCompressionType::None,
            Self::Snappy => RocksCompressionType::Snappy,
            Self::Lz4 => RocksCompressionType::Lz4,
            Self::Zlib => RocksCompressionType::Zlib,
        }
    }
}

#[test]
fn test_rocksdb_directory() {
    assert_eq!(
        ShredStorageType::RocksLevel.blockstore_directory(),
        BLOCKSTORE_DIRECTORY_ROCKS_LEVEL
    );
    assert_eq!(
        ShredStorageType::RocksFifo(BlockstoreRocksFifoOptions {
            shred_code_cf_size: 0,
            shred_data_cf_size: 0
        })
        .blockstore_directory(),
        BLOCKSTORE_DIRECTORY_ROCKS_FIFO
    );
}