1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Keep track of how many banks have been created and how many have been frozen or dropped.
//! This is useful to track foreground progress to understand expected access to accounts db.
use {
    crate::waitable_condvar::WaitableCondvar,
    solana_sdk::timing::AtomicInterval,
    std::sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    },
};

#[derive(Debug, Default)]
/// Keeps track of when all banks that were started as of a known point in time have been
/// frozen or otherwise destroyed.
///
/// When `bank_freeze_or_destruction_count` exceeds a prior value of `bank_creation_count`,
/// all banks that had begun loading accounts as of that prior value of `bank_creation_count`
/// are known to have completed.
///
/// Both counters are plain `AtomicU32`s, so every method on this type takes `&self`.
pub(crate) struct BankCreationFreezingProgress {
    /// Incremented each time a bank is created.
    /// From this point on, the bank could be finding accounts in the index and loading them
    /// from accounts db.
    bank_creation_count: AtomicU32,
    /// Incremented each time a bank is frozen or destroyed.
    /// At this point, the bank has completed all account loading.
    bank_freeze_or_destruction_count: AtomicU32,

    /// Enables waiting for `bank_freeze_or_destruction_count` to increment:
    /// `increment_bank_frozen_or_destroyed` calls `notify_all` on it after bumping the counter.
    bank_frozen_or_destroyed: Arc<WaitableCondvar>,

    /// Rate limiter for `report()`: the metric is only emitted when
    /// `should_update(60_000)` returns true.
    /// NOTE(review): the interval is presumably in milliseconds (i.e. once per minute) —
    /// confirm against `AtomicInterval`.
    last_report: AtomicInterval,
}

impl BankCreationFreezingProgress {
    /// Records that one more bank has been frozen or destroyed, then calls `notify_all`
    /// on `bank_frozen_or_destroyed`.
    pub(crate) fn increment_bank_frozen_or_destroyed(&self) {
        let finished = &self.bank_freeze_or_destruction_count;
        // The counter is bumped before the notification is sent.
        finished.fetch_add(1, Ordering::Release);
        self.bank_frozen_or_destroyed.notify_all();
    }

    /// Returns how many banks have been frozen or destroyed so far.
    pub(crate) fn get_bank_frozen_or_destroyed_count(&self) -> u32 {
        let finished = &self.bank_freeze_or_destruction_count;
        finished.load(Ordering::Acquire)
    }

    /// Records that one more bank has been created.
    pub(crate) fn increment_bank_creation_count(&self) {
        let created = &self.bank_creation_count;
        created.fetch_add(1, Ordering::Release);
    }

    /// Returns how many banks have been created so far.
    pub(crate) fn get_bank_creation_count(&self) -> u32 {
        let created = &self.bank_creation_count;
        created.load(Ordering::Acquire)
    }

    /// Emits the `bank_progress` datapoint whose `difference` field is
    /// (banks created) - (banks frozen or destroyed), computed with wrapping arithmetic.
    ///
    /// Does nothing unless `last_report.should_update(60_000)` says a report is due.
    pub(crate) fn report(&self) {
        if !self.last_report.should_update(60_000) {
            return;
        }
        datapoint_info!(
            "bank_progress",
            (
                "difference",
                // Creation count is read first, then the frozen/destroyed count.
                u32::wrapping_sub(
                    self.get_bank_creation_count(),
                    self.get_bank_frozen_or_destroyed_count()
                ),
                i64
            )
        );
    }
}

#[cfg(test)]
pub mod tests {
    use {super::*, solana_sdk::timing::timestamp, std::thread::Builder};

    /// Both counters start at zero and each `increment_*` call bumps only its own counter.
    #[test]
    fn test_count() {
        solana_logger::setup();
        let tracker = BankCreationFreezingProgress::default();
        // Snapshot of (created, frozen-or-destroyed), read in that order.
        let counts = || {
            (
                tracker.get_bank_creation_count(),
                tracker.get_bank_frozen_or_destroyed_count(),
            )
        };

        assert_eq!(counts(), (0, 0));
        tracker.increment_bank_creation_count();
        assert_eq!(counts(), (1, 0));
        tracker.increment_bank_frozen_or_destroyed();
        assert_eq!(counts(), (1, 1));
    }

    /// A thread blocked on the condvar is released once the frozen/destroyed counter
    /// is incremented.
    #[test]
    fn test_wait() {
        solana_logger::setup();
        let tracker = BankCreationFreezingProgress::default();
        let condvar = Arc::clone(&tracker.bank_frozen_or_destroyed);
        // Nothing has notified yet, so a zero-length wait is asserted to return true
        // (presumably meaning "timed out" — confirm against `WaitableCondvar`).
        assert!(condvar.wait_timeout(std::time::Duration::default()));

        let woken = Arc::new(AtomicU32::default());
        let woken_in_thread = Arc::clone(&woken);
        let handle = Builder::new()
            .name("test_wait".to_string())
            .spawn(move || {
                let timed_out = condvar.wait_timeout(std::time::Duration::from_secs(5));
                assert!(!timed_out);
                woken_in_thread.store(1, Ordering::Release);
            })
            .unwrap();

        let start = timestamp();
        let mut increments = 0;
        // Keep incrementing until the waiter thread has picked up a notification;
        // a single increment could land before the thread starts waiting.
        while woken.load(Ordering::Acquire) == 0 {
            tracker.increment_bank_frozen_or_destroyed();
            increments += 1;
            assert_eq!(tracker.get_bank_frozen_or_destroyed_count(), increments);
            let elapsed = timestamp().wrapping_sub(start);
            assert!(elapsed < 5_000, "elapsed: {elapsed}");
        }
        handle.join().expect("failed");
    }
}