lexe/
payments_db.rs

1//! Local payments db and payment sync
2
3/// NOTE: Be careful about `pub`! This module is part of the stable API.
4///
5/// ### [`PaymentsDb`]
6///
7/// The app's [`PaymentsDb`] maintains a local copy of all [`BasicPaymentV2`]s,
8/// synced from the user node. The user nodes are the source-of-truth for
9/// payment state; consequently, this payment db is effectively a projection of
10/// the user node's payment state.
11///
12/// Currently the [`BasicPaymentV2`]s in the [`PaymentsDb`] are just dumped into
13/// a subdirectory of the app's data directory as unencrypted json blobs. On
14/// startup, we just load all on-disk [`BasicPaymentV2`]s into memory.
15///
16/// In the future, this could be a SQLite DB or something.
17///
18/// ### Payment Syncing
19///
20/// Syncing payments is done by tailing payments from the user node by
21/// [`PaymentUpdatedIndex`]. We simply keep track of the latest updated at index
22/// in our DB, and fetch batches of updated payments, merging them into our DB,
23/// until no payments are returned.
24///
25/// ## Design goals
26///
27/// We want efficient random access for pending payments, which are always
28/// displayed, and efficient fetching for the last N finalized payments, which
29/// are the first payments displayed in the finalized payments list.
30///
31/// Cursors don't work well with flutter's lazy lists, since they want random
32/// access from scroll index -> content.
33///
34/// Performance when syncing payments is secondary, since that's done
35/// asynchronously and off the main UI thread, which is highly latency
36/// sensitive.
37///
38/// In the future, we could be even more clever and serialize+store the pending
39/// index on-disk. If we also added a finalized index, we could avoid the need
40/// to load all the payments on startup and could instead lazy load them.
41///
42/// [`BasicPaymentV2`]: lexe_api::types::payments::BasicPaymentV2
43/// [`PaymentsDb`]: crate::payments_db::PaymentsDb
44/// [`PaymentUpdatedIndex`]: lexe_api::types::payments::PaymentUpdatedIndex
45mod docs {}
46
47use std::{
48    cmp,
49    collections::{BTreeMap, BTreeSet},
50    io,
51    str::FromStr,
52    sync::RwLock,
53};
54
55use anyhow::Context;
56#[cfg(doc)]
57use lexe_api::types::payments::VecDbPaymentV2;
58use lexe_api::{
59    def::AppNodeRunApi,
60    error::NodeApiError,
61    models::command::{GetUpdatedPayments, UpdatePaymentNote},
62    types::payments::{
63        BasicPaymentV2, PaymentCreatedIndex, PaymentUpdatedIndex,
64        VecBasicPaymentV2,
65    },
66};
67use node_client::client::NodeClient;
68use tracing::warn;
69
70use crate::unstable::ffs::Ffs;
71
72/// The app's local [`BasicPaymentV2`] database, synced from the user node.
73pub struct PaymentsDb<F> {
74    ffs: F,
75    state: RwLock<PaymentsDbState>,
76}
77
78/// Pure in-memory state of the [`PaymentsDb`].
79#[derive(Debug, PartialEq)]
80struct PaymentsDbState {
81    /// All locally synced payments,
82    /// from oldest to newest (reverse of the UI scroll order).
83    payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
84
85    /// An index of currently pending payments, sorted by `created_at` index.
86    //
87    // For now, we only have a `pending` index, because it is sparse - there's
88    // no point in indexing `finalized` as most payments are finalized. If we
89    // later want filters for "offers only" or similar, we can add more.
90    pending: BTreeSet<PaymentCreatedIndex>,
91
92    /// The latest `updated_at` index of any payment in the db.
93    ///
94    /// Invariant:
95    ///
96    /// ```ignore
97    /// latest_updated_index == payments.iter()
98    ///     .map(|p| p.updated_index())
99    ///     .max()
100    /// ```
101    latest_updated_index: Option<PaymentUpdatedIndex>,
102}
103
/// Outcome of one payment sync: how many local payments were touched.
#[derive(Debug)]
pub struct PaymentSyncSummary {
    /// How many payments were inserted into the local DB for the first time.
    pub num_new: usize,
    /// How many already-known payments were overwritten with newer data.
    pub num_updated: usize,
}

impl PaymentSyncSummary {
    /// Whether this sync modified the DB at all,
    /// i.e. whether some part of the UI may need to be refreshed.
    pub fn any_changes(&self) -> bool {
        !matches!((self.num_new, self.num_updated), (0, 0))
    }
}
120
121/// Sync the app's local payment state from the user node.
122///
123/// We tail updated payments from the user node, merging results into our DB
124/// until there are no more updates left to sync.
125//
126// Unstable: Takes Ffs as a param which is still evolving
127#[allow(private_bounds)]
128pub(crate) async fn sync_payments<F: Ffs>(
129    db: &PaymentsDb<F>,
130    node_client: &impl AppNodeRunSyncApi,
131    batch_size: u16,
132) -> anyhow::Result<PaymentSyncSummary> {
133    assert!(batch_size > 0);
134
135    let mut start_index = db.state.read().unwrap().latest_updated_index;
136
137    let mut summary = PaymentSyncSummary {
138        num_new: 0,
139        num_updated: 0,
140    };
141
142    loop {
143        // In every loop iteration, we fetch one batch of updated payments.
144
145        let req = GetUpdatedPayments {
146            // Remember, this start index is _exclusive_.
147            // The payment w/ this index will _NOT_ be included in the response.
148            start_index,
149            limit: Some(batch_size),
150        };
151
152        let updated_payments = node_client
153            .get_updated_payments(req)
154            .await
155            .context("Failed to fetch updated payments")?
156            .payments;
157        let updated_payments_len = updated_payments.len();
158
159        // Update the db: persist, then update in-memory state.
160        let (new, updated, latest_updated_index) = db
161            .upsert_payments(updated_payments)
162            .context("Failed to upsert payments")?;
163        summary.num_new += new;
164        summary.num_updated += updated;
165
166        // Update the `start_index` we'll use for the next batch.
167        start_index = latest_updated_index;
168
169        // If the node returned fewer payments than our requested batch size,
170        // then we are done (there are no more new payments after this batch).
171        if updated_payments_len < usize::from(batch_size) {
172            break;
173        }
174    }
175
176    Ok(summary)
177}
178
179/// The specific `AppNodeRunApi` method that we need to sync payments.
180///
181/// This lets us mock out the method in the tests below,
182/// without also mocking out the entire `AppNodeRunApi` trait.
183trait AppNodeRunSyncApi {
184    /// GET /node/v1/payments/updated [`GetUpdatedPayments`]
185    ///                            -> [`VecDbPaymentV2`]
186    async fn get_updated_payments(
187        &self,
188        req: GetUpdatedPayments,
189    ) -> Result<VecBasicPaymentV2, NodeApiError>;
190}
191
192impl AppNodeRunSyncApi for NodeClient {
193    async fn get_updated_payments(
194        &self,
195        req: GetUpdatedPayments,
196    ) -> Result<VecBasicPaymentV2, NodeApiError> {
197        AppNodeRunApi::get_updated_payments(self, req).await
198    }
199}
200
201impl<F: Ffs> PaymentsDb<F> {
202    // --- Constructors (unstable b/c Ffs is still in flux) --- //
203
204    /// Read all the payments on-disk into a new `PaymentsDb`.
205    pub(crate) fn read(ffs: F) -> anyhow::Result<Self> {
206        let state = PaymentsDbState::read(&ffs)
207            .map(RwLock::new)
208            .context("Failed to read on-disk PaymentsDb state")?;
209
210        Ok(Self { ffs, state })
211    }
212
213    /// Create a new empty `PaymentsDb`. Does not touch disk/storage.
214    pub(crate) fn empty(ffs: F) -> Self {
215        let state = RwLock::new(PaymentsDbState::empty());
216        Self { ffs, state }
217    }
218
219    // --- Public API  --- //
220
221    /// Clear the in-memory state and delete the on-disk payment db.
222    pub fn delete(&self) -> io::Result<()> {
223        *self.state.write().unwrap() = PaymentsDbState::empty();
224        self.ffs.delete_all()
225    }
226
227    /// Total number of payments in the database.
228    pub fn num_payments(&self) -> usize {
229        self.state.read().unwrap().num_payments()
230    }
231
232    /// Number of pending (unfinalized) payments.
233    pub fn num_pending(&self) -> usize {
234        self.state.read().unwrap().num_pending()
235    }
236
237    /// Number of finalized payments.
238    pub fn num_finalized(&self) -> usize {
239        self.state.read().unwrap().num_finalized()
240    }
241
242    /// Number of pending payments that are not junk.
243    pub fn num_pending_not_junk(&self) -> usize {
244        self.state.read().unwrap().num_pending_not_junk()
245    }
246
247    /// Number of finalized payments that are not junk.
248    pub fn num_finalized_not_junk(&self) -> usize {
249        self.state.read().unwrap().num_finalized_not_junk()
250    }
251
252    /// The latest `updated_at` index of any payment in the db.
253    pub fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
254        self.state.read().unwrap().latest_updated_index()
255    }
256
257    /// Get a payment by its `PaymentCreatedIndex`.
258    pub fn get_payment_by_created_index(
259        &self,
260        created_index: &PaymentCreatedIndex,
261    ) -> Option<BasicPaymentV2> {
262        self.state
263            .read()
264            .unwrap()
265            .get_payment_by_created_index(created_index)
266            .cloned()
267    }
268
269    /// Get a payment by scroll index in UI order (newest to oldest).
270    pub fn get_payment_by_scroll_idx(
271        &self,
272        scroll_idx: usize,
273    ) -> Option<BasicPaymentV2> {
274        self.state
275            .read()
276            .unwrap()
277            .get_payment_by_scroll_idx(scroll_idx)
278            .cloned()
279    }
280
281    /// Get a pending payment by scroll index in UI order (newest to oldest).
282    pub fn get_pending_payment_by_scroll_idx(
283        &self,
284        scroll_idx: usize,
285    ) -> Option<BasicPaymentV2> {
286        self.state
287            .read()
288            .unwrap()
289            .get_pending_payment_by_scroll_idx(scroll_idx)
290            .cloned()
291    }
292
293    /// Get a "pending and not junk" payment by scroll index in UI order.
294    pub fn get_pending_not_junk_payment_by_scroll_idx(
295        &self,
296        scroll_idx: usize,
297    ) -> Option<BasicPaymentV2> {
298        self.state
299            .read()
300            .unwrap()
301            .get_pending_not_junk_payment_by_scroll_idx(scroll_idx)
302            .cloned()
303    }
304
305    /// Get a finalized payment by scroll index in UI order (newest to oldest).
306    pub fn get_finalized_payment_by_scroll_idx(
307        &self,
308        scroll_idx: usize,
309    ) -> Option<BasicPaymentV2> {
310        self.state
311            .read()
312            .unwrap()
313            .get_finalized_payment_by_scroll_idx(scroll_idx)
314            .cloned()
315    }
316
317    /// Get a "finalized and not junk" payment by scroll index in UI order.
318    pub fn get_finalized_not_junk_payment_by_scroll_idx(
319        &self,
320        scroll_idx: usize,
321    ) -> Option<BasicPaymentV2> {
322        self.state
323            .read()
324            .unwrap()
325            .get_finalized_not_junk_payment_by_scroll_idx(scroll_idx)
326            .cloned()
327    }
328
329    // --- Private helpers --- //
330
331    /// Upsert a batch of payments synced from the user node.
332    ///
333    /// Returns `(num_new, num_updated, latest_updated_index)`.
334    /// May return `(0, 0, _)` if nothing was inserted or updated.
335    fn upsert_payments(
336        &self,
337        payments: impl IntoIterator<Item = BasicPaymentV2>,
338    ) -> io::Result<(usize, usize, Option<PaymentUpdatedIndex>)> {
339        let mut state = self.state.write().unwrap();
340
341        let mut num_new = 0;
342        let mut num_updated = 0;
343        for payment in payments {
344            let (new, updated) =
345                Self::upsert_payment(&self.ffs, &mut state, payment)?;
346            num_new += new;
347            num_updated += updated;
348        }
349
350        Ok((num_new, num_updated, state.latest_updated_index))
351    }
352
353    /// Upserts a payment into the db.
354    ///
355    /// Persists the payment and updates in-memory state, including indices.
356    ///
357    /// Returns the number of new and updated payments, respectively.
358    /// May return `(0, 0)` if nothing was inserted or updated.
359    fn upsert_payment(
360        ffs: &F,
361        state: &mut PaymentsDbState,
362        payment: BasicPaymentV2,
363    ) -> io::Result<(usize, usize)> {
364        let created_index = payment.created_index();
365
366        let maybe_existing = state.payments.get(&created_index);
367        let already_existed = maybe_existing.is_some();
368
369        // Skip if payment exists and there is no change to it.
370        if let Some(existing) = maybe_existing
371            && payment == *existing
372        {
373            return Ok((0, 0));
374        }
375
376        // Persist the updated payment to storage. Since this is fallible, we
377        // do this first, otherwise we may corrupt our in-memory state.
378        Self::write_payment(ffs, &payment)?;
379
380        // --- 'Commit' by updating our in-memory state --- //
381
382        // Update indices first to avoid a clone
383        if payment.is_pending() {
384            state.pending.insert(created_index);
385        } else {
386            state.pending.remove(&created_index);
387        }
388        // It is always true that `None < Some(_)`
389        state.latest_updated_index =
390            cmp::max(state.latest_updated_index, Some(payment.updated_index()));
391
392        // Update main payments map
393        state.payments.insert(created_index, payment);
394
395        if already_existed {
396            Ok((0, 1))
397        } else {
398            Ok((1, 0))
399        }
400    }
401
402    /// Write a payment to on-disk storage as JSON bytes. The caller is
403    /// responsible for updating the in-memory [`PaymentsDb`] state and indices.
404    // Making this an associated fn avoids some borrow checker issues.
405    fn write_payment(ffs: &F, payment: &BasicPaymentV2) -> io::Result<()> {
406        let filename = payment.created_index().to_string();
407        let data =
408            serde_json::to_vec(&payment).expect("Failed to serialize payment");
409        ffs.write(&filename, &data)
410    }
411
412    /// Update the note on an existing payment in this [`PaymentsDb`].
413    /// This does NOT actually update the note on the user node, hence why this
414    /// is not a public API.
415    pub(crate) fn update_payment_note(
416        &self,
417        req: UpdatePaymentNote,
418    ) -> anyhow::Result<()> {
419        let mut state = self.state.write().unwrap();
420
421        let payment = state
422            .get_mut_payment_by_created_index(&req.index)
423            .context("Updating non-existent payment")?;
424
425        payment.note = req.note;
426
427        Self::write_payment(&self.ffs, payment)
428            .context("Failed to write payment to local db")?;
429
430        Ok(())
431    }
432
433    /// Check the integrity of the whole PaymentsDb.
434    ///
435    /// (1.) The in-memory state should not be corrupted.
436    /// (2.) The current on-disk state should match the in-memory state.
437    #[cfg(test)]
438    fn debug_assert_invariants(&self) {
439        if cfg!(not(debug_assertions)) {
440            return;
441        }
442
443        let state = self.state.read().unwrap();
444
445        // (1.)
446        state.debug_assert_invariants();
447
448        // (2.)
449        let on_disk_state = PaymentsDbState::read(&self.ffs)
450            .expect("Failed to re-read on-disk state");
451        assert_eq!(on_disk_state, *state);
452    }
453}
454
455impl PaymentsDbState {
456    /// Create a new empty [`PaymentsDbState`]. Does not touch disk/storage.
457    fn empty() -> Self {
458        Self {
459            payments: BTreeMap::new(),
460            pending: BTreeSet::new(),
461            latest_updated_index: None,
462        }
463    }
464
465    /// Read the DB state from disk.
466    fn read(ffs: &impl Ffs) -> anyhow::Result<Self> {
467        let mut buf = Vec::<u8>::new();
468        let mut payments = Vec::<BasicPaymentV2>::new();
469
470        ffs.read_dir_visitor(|filename| {
471            // Parse created_at index from filename; skip unrecognized files.
472            let created_index = match PaymentCreatedIndex::from_str(filename) {
473                Ok(idx) => idx,
474                Err(e) => {
475                    warn!(
476                        %filename,
477                        "Error: unrecognized filename in payments dir: {e:#}"
478                    );
479                    return Ok(());
480                }
481            };
482
483            // Read payment into buffer
484            buf.clear();
485            ffs.read_into(filename, &mut buf)?;
486
487            // Deserialize payment
488            let payment = serde_json::from_slice::<BasicPaymentV2>(&buf)
489                .with_context(|| filename.to_owned())
490                .context("Failed to deserialize payment file")
491                .map_err(io_error_invalid_data)?;
492
493            // Sanity check: Index in filename should match index in payment.
494            let payment_created_index = payment.created_index();
495            if created_index != payment_created_index {
496                return Err(io_error_invalid_data(format!(
497                    "Payment DB corruption: filename index '{filename}'
498                     different from index in contents '{payment_created_index}'"
499                )));
500            }
501
502            // Collect the payment
503            payments.push(payment);
504
505            Ok(())
506        })
507        .context("Failed to read payments db, possibly corrupted?")?;
508
509        Ok(Self::from_vec(payments))
510    }
511
512    fn from_vec(payments: Vec<BasicPaymentV2>) -> Self {
513        let payments = payments
514            .into_iter()
515            .map(|p| (p.created_index(), p))
516            .collect();
517
518        let pending = build_index::pending(&payments);
519        let latest_updated_index = build_index::latest_updated_index(&payments);
520
521        Self {
522            payments,
523            pending,
524            latest_updated_index,
525        }
526    }
527
528    fn num_payments(&self) -> usize {
529        self.payments.len()
530    }
531
532    fn num_pending(&self) -> usize {
533        self.pending.len()
534    }
535
536    fn num_finalized(&self) -> usize {
537        self.payments.len() - self.pending.len()
538    }
539
540    fn num_pending_not_junk(&self) -> usize {
541        self.pending
542            .iter()
543            .filter_map(|created_idx| self.payments.get(created_idx))
544            .filter(|p| p.is_pending_not_junk())
545            .count()
546    }
547
548    // TODO(max): If needed, we can add an index for this.
549    fn num_finalized_not_junk(&self) -> usize {
550        self.payments
551            .values()
552            .filter(|p| p.is_finalized_not_junk())
553            .count()
554    }
555
556    fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
557        self.latest_updated_index
558    }
559
560    fn get_payment_by_created_index(
561        &self,
562        created_index: &PaymentCreatedIndex,
563    ) -> Option<&BasicPaymentV2> {
564        self.payments.get(created_index)
565    }
566
567    /// Get a mutable payment by its `PaymentCreatedIndex`.
568    fn get_mut_payment_by_created_index(
569        &mut self,
570        created_index: &PaymentCreatedIndex,
571    ) -> Option<&mut BasicPaymentV2> {
572        self.payments.get_mut(created_index)
573    }
574
575    fn get_payment_by_scroll_idx(
576        &self,
577        scroll_idx: usize,
578    ) -> Option<&BasicPaymentV2> {
579        self.payments.values().nth_back(scroll_idx)
580    }
581
582    fn get_pending_payment_by_scroll_idx(
583        &self,
584        scroll_idx: usize,
585    ) -> Option<&BasicPaymentV2> {
586        self.pending
587            .iter()
588            .nth_back(scroll_idx)
589            .and_then(|created_idx| self.payments.get(created_idx))
590    }
591
592    fn get_pending_not_junk_payment_by_scroll_idx(
593        &self,
594        scroll_idx: usize,
595    ) -> Option<&BasicPaymentV2> {
596        self.pending
597            .iter()
598            .rev()
599            .filter_map(|created_idx| self.payments.get(created_idx))
600            .filter(|p| p.is_pending_not_junk())
601            .nth_back(scroll_idx)
602    }
603
604    fn get_finalized_payment_by_scroll_idx(
605        &self,
606        scroll_idx: usize,
607    ) -> Option<&BasicPaymentV2> {
608        self.payments
609            .values()
610            .filter(|p| p.is_finalized())
611            .nth_back(scroll_idx)
612    }
613
614    fn get_finalized_not_junk_payment_by_scroll_idx(
615        &self,
616        scroll_idx: usize,
617    ) -> Option<&BasicPaymentV2> {
618        self.payments
619            .values()
620            .filter(|p| p.is_finalized_not_junk())
621            .nth_back(scroll_idx)
622    }
623
624    #[cfg(test)]
625    fn is_empty(&self) -> bool {
626        self.payments.is_empty()
627    }
628
629    /// Check the integrity of the in-memory state.
630    #[cfg(test)]
631    fn debug_assert_invariants(&self) {
632        if cfg!(not(debug_assertions)) {
633            return;
634        }
635
636        // --- `payments` invariants: --- //
637
638        // Each payment is stored under its own created_at index
639        for (idx, payment) in &self.payments {
640            assert_eq!(*idx, payment.created_index());
641        }
642
643        // --- `pending` index invariants: --- //
644
645        // Rebuilding the index recreates the same index exactly
646        let rebuilt_pending_index = build_index::pending(&self.payments);
647        assert_eq!(rebuilt_pending_index, self.pending);
648
649        // All pending payments are in the index
650        // (in case there is a bug in `index::build_pending`)
651        self.payments
652            .values()
653            .filter(|p| p.is_pending())
654            .all(|p| self.pending.contains(&p.created_index()));
655
656        // --- `latest_updated_index` invariant: --- //
657
658        let recomputed_latest_updated_index =
659            build_index::latest_updated_index(&self.payments);
660        assert_eq!(recomputed_latest_updated_index, self.latest_updated_index);
661    }
662}
663
664mod build_index {
665    use super::*;
666
667    /// Build the `pending` index from the given payments.
668    pub(super) fn pending(
669        payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
670    ) -> BTreeSet<PaymentCreatedIndex> {
671        payments
672            .iter()
673            .filter(|(_, p)| p.is_pending())
674            .map(|(idx, _)| *idx)
675            .collect()
676    }
677
678    /// Find the latest [`PaymentUpdatedIndex`] from the given payments.
679    pub(super) fn latest_updated_index(
680        payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
681    ) -> Option<PaymentUpdatedIndex> {
682        payments.values().map(BasicPaymentV2::updated_index).max()
683    }
684}
685
/// Wrap the given error in an [`io::Error`] whose kind is
/// [`io::ErrorKind::InvalidData`].
fn io_error_invalid_data(
    error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, error)
}
692
#[cfg(test)]
mod test_utils {
    use proptest::{
        prelude::{Strategy, any},
        sample::SizeRange,
    };

    use super::*;

    /// In-memory stand-in for the user node, keyed by `updated_at` index.
    pub(super) struct MockNode {
        pub payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
    }

    impl MockNode {
        /// Construct from a map of updated_at index -> payment.
        pub(super) fn new(
            payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
        ) -> Self {
            Self { payments }
        }

        /// Construct from a map of created_at index -> payment,
        /// re-keying every payment by its updated_at index.
        pub(super) fn from_payments(
            payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
        ) -> Self {
            let mut by_updated_index = BTreeMap::new();
            for payment in payments.into_values() {
                by_updated_index.insert(payment.updated_index(), payment);
            }
            Self::new(by_updated_index)
        }
    }

    impl AppNodeRunSyncApi for MockNode {
        /// GET /node/v1/payments/updated [`GetUpdatedPayments`]
        ///                            -> [`VecDbPaymentV2`]
        async fn get_updated_payments(
            &self,
            req: GetUpdatedPayments,
        ) -> Result<VecBasicPaymentV2, NodeApiError> {
            let limit = usize::from(req.limit.unwrap_or(u16::MAX));

            // The start index is exclusive. Without one, nothing is skipped.
            let payments = self
                .payments
                .iter()
                .filter(|(updated_idx, _)| match req.start_index {
                    Some(start_index) => start_index < **updated_idx,
                    None => true,
                })
                .take(limit)
                .map(|(_, payment)| payment.clone())
                .collect();

            Ok(VecBasicPaymentV2 { payments })
        }
    }

    /// Strategy producing a map of created_at index -> arbitrary payment.
    ///
    /// `approx_size` bounds the number of generated payments; the resulting
    /// map may be smaller if generated payments share a created_at index.
    pub(super) fn any_payments(
        approx_size: impl Into<SizeRange>,
    ) -> impl Strategy<Value = BTreeMap<PaymentCreatedIndex, BasicPaymentV2>>
    {
        let key_by_created_index = |payments: Vec<BasicPaymentV2>| {
            let mut by_created_index = BTreeMap::new();
            for payment in payments {
                by_created_index.insert(payment.created_index(), payment);
            }
            by_created_index
        };

        proptest::collection::vec(any::<BasicPaymentV2>(), approx_size)
            .prop_map(key_by_created_index)
    }
}
768
769#[cfg(test)]
770mod test {
771    use std::{collections::HashSet, time::Duration};
772
773    use common::rng::{FastRng, Rng, RngExt};
774    use lexe_api::types::payments::PaymentStatus;
775    use proptest::{
776        collection::vec, prelude::any, proptest, sample::Index,
777        test_runner::Config,
778    };
779    use tempfile::tempdir;
780
781    use super::{test_utils::MockNode, *};
782    use crate::unstable::ffs::{DiskFs, test_utils::InMemoryFfs};
783
784    #[test]
785    fn read_from_empty() {
786        let mock_ffs = InMemoryFfs::new();
787        let mock_ffs_db = PaymentsDb::read(mock_ffs).unwrap();
788        assert!(mock_ffs_db.state.read().unwrap().is_empty());
789        mock_ffs_db.debug_assert_invariants();
790
791        let tempdir = tempfile::tempdir().unwrap();
792        let temp_fs =
793            DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
794        let temp_fs_db = PaymentsDb::read(temp_fs).unwrap();
795        assert!(temp_fs_db.state.read().unwrap().is_empty());
796        temp_fs_db.debug_assert_invariants();
797
798        assert_eq!(
799            *mock_ffs_db.state.read().unwrap(),
800            *temp_fs_db.state.read().unwrap()
801        );
802    }
803
804    #[test]
805    fn test_upsert() {
806        fn visit_batches<T>(
807            iter: &mut impl Iterator<Item = T>,
808            batch_sizes: Vec<usize>,
809            mut f: impl FnMut(Vec<T>),
810        ) {
811            let batch_sizes = batch_sizes.into_iter();
812
813            for batch_size in batch_sizes {
814                let batch = take_n(iter, batch_size);
815                let batch_len = batch.len();
816
817                if batch_len == 0 {
818                    return;
819                }
820
821                f(batch);
822
823                if batch_len < batch_size {
824                    return;
825                }
826            }
827
828            let batch = iter.collect::<Vec<_>>();
829
830            if !batch.is_empty() {
831                f(batch);
832            }
833        }
834
835        fn take_n<T>(iter: &mut impl Iterator<Item = T>, n: usize) -> Vec<T> {
836            let mut out = Vec::with_capacity(n);
837
838            while out.len() < n {
839                match iter.next() {
840                    Some(value) => out.push(value),
841                    None => break,
842                }
843            }
844
845            out
846        }
847
848        proptest!(
849            Config::with_cases(10),
850            |(
851                rng: FastRng,
852                payments in test_utils::any_payments(0..20),
853                batch_sizes in vec(1_usize..20, 0..5),
854            )| {
855                let tempdir = tempdir().unwrap();
856                let temp_fs = DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
857                let temp_fs_db = PaymentsDb::empty(temp_fs);
858
859                let mock_ffs = InMemoryFfs::from_rng(rng);
860                let mock_ffs_db = PaymentsDb::empty(mock_ffs);
861
862                let mut payments_iter = payments.clone().into_values();
863                visit_batches(&mut payments_iter, batch_sizes, |new_payment_batch| {
864                    let _ = mock_ffs_db.upsert_payments(
865                        new_payment_batch.clone()
866                    ).unwrap();
867                    let _ = temp_fs_db.upsert_payments(new_payment_batch).unwrap();
868
869                    mock_ffs_db.debug_assert_invariants();
870                    temp_fs_db.debug_assert_invariants();
871                });
872
873                assert_eq!(
874                    *mock_ffs_db.state.read().unwrap(),
875                    *temp_fs_db.state.read().unwrap()
876                );
877            }
878        );
879    }
880
881    #[tokio::test]
882    async fn test_sync_empty() {
883        let mock_node_client = MockNode::new(BTreeMap::new());
884        let mock_ffs = InMemoryFfs::new();
885        let db = PaymentsDb::empty(mock_ffs);
886
887        sync_payments(&db, &mock_node_client, 5).await.unwrap();
888
889        assert!(db.state.read().unwrap().is_empty());
890        db.debug_assert_invariants();
891    }
892
893    #[test]
894    fn test_sync() {
895        /// Assert that the payments in the db equal those in the mock node.
896        fn assert_db_payments_eq(
897            db_payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
898            node_payments: &BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
899        ) {
900            assert_eq!(db_payments.len(), node_payments.len());
901            db_payments.iter().for_each(|(_created_idx, payment)| {
902                let node_payment =
903                    node_payments.get(&payment.updated_index()).unwrap();
904                assert_eq!(payment, node_payment);
905            });
906            node_payments.iter().for_each(|(_updated_idx, payment)| {
907                let db_payment =
908                    db_payments.get(&payment.created_index()).unwrap();
909                assert_eq!(payment, db_payment);
910            });
911        }
912
913        let rt = tokio::runtime::Builder::new_current_thread()
914            .build()
915            .unwrap();
916
917        proptest!(
918            Config::with_cases(4),
919            |(
920                mut rng: FastRng,
921                payments in test_utils::any_payments(1..20),
922                req_batch_size in 1_u16..5,
923                finalize_indexes in
924                    proptest::collection::vec(any::<Index>(), 1..5),
925            )| {
926                let mut mock_node = MockNode::from_payments(payments);
927
928                let mut rng2 = FastRng::from_u64(rng.gen_u64());
929                let mock_ffs = InMemoryFfs::from_rng(rng);
930
931                // Sync empty DB from node
932                let db = PaymentsDb::empty(mock_ffs);
933                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
934                    .unwrap();
935                assert_db_payments_eq(
936                    &db.state.read().unwrap().payments,
937                    &mock_node.payments,
938                );
939                db.debug_assert_invariants();
940
941                // Reread db from ffs and resync - should still match node
942                let mock_ffs = db.ffs;
943                let db = PaymentsDb::read(mock_ffs).unwrap();
944                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
945                    .unwrap();
946                assert_db_payments_eq(
947                    &db.state.read().unwrap().payments,
948                    &mock_node.payments,
949                );
950                db.debug_assert_invariants();
951
952                // Finalize some payments
953                let finalize_some_payments = || {
954                    let pending_payments = mock_node
955                        .payments
956                        .values()
957                        .filter(|p| p.is_pending())
958                        .cloned()
959                        .collect::<Vec<_>>();
960
961                    if pending_payments.is_empty() {
962                        return;
963                    }
964
965                    // Simulates the current time
966                    //
967                    // We bump it before every update to ensure finalized
968                    // payments have a later updated_at than in our DB
969                    let mut current_time = db
970                        .state
971                        .read()
972                        .unwrap()
973                        .latest_updated_index
974                        .expect("DB should have payments")
975                        .updated_at;
976
977                    // The array indices of the payments inside
978                    // `pending_payments` to finalize
979                    let finalize_idxs = finalize_indexes
980                        .into_iter()
981                        .map(|index| index.index(pending_payments.len()))
982                        // Collect into HashSet to dedup without sorting
983                        .collect::<HashSet<_>>();
984
985                    for finalize_idx in finalize_idxs {
986                        // The updated_at index of the payment to finalize
987                        let final_updated_idx =
988                            pending_payments[finalize_idx].updated_index();
989
990                        // Remove the payment to finalize from map
991                        let mut payment = mock_node
992                            .payments
993                            .remove(&final_updated_idx)
994                            .unwrap();
995
996                        // The finalized status to set for this payment
997                        let new_status = if rng2.gen_boolean() {
998                            PaymentStatus::Completed
999                        } else {
1000                            PaymentStatus::Failed
1001                        };
1002
1003                        // Bump the current time so new updated_at is fresh
1004                        let bump_u64 = rng2.gen_range(1..=10);
1005                        let bump_dur = Duration::from_millis(bump_u64);
1006                        current_time = current_time.saturating_add(bump_dur);
1007
1008                        // Update payment
1009                        payment.status = new_status;
1010                        payment.updated_at = current_time;
1011
1012                        // Re-insert with new updated_at as the key
1013                        let new_updated_index = payment.updated_index();
1014                        mock_node
1015                            .payments
1016                            .insert(new_updated_index, payment);
1017                        }
1018                };
1019
1020                finalize_some_payments();
1021
1022                // resync -- should pick up the finalized payments
1023                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
1024                    .unwrap();
1025
1026                assert_db_payments_eq(
1027                    &db.state.read().unwrap().payments,
1028                    &mock_node.payments,
1029                );
1030                db.debug_assert_invariants();
1031            }
1032        );
1033    }
1034}