lexe/unstable/
payments_db.rs

1//! Local payments db and payment sync
2
3/// NOTE: Be careful about `pub`! This module is part of the stable API.
4///
5/// ### [`PaymentsDb`]
6///
7/// The app's [`PaymentsDb`] maintains a local copy of all [`BasicPaymentV2`]s,
8/// synced from the user node. The user nodes are the source-of-truth for
9/// payment state; consequently, this payment db is effectively a projection of
10/// the user node's payment state.
11///
12/// Currently the [`BasicPaymentV2`]s in the [`PaymentsDb`] are just dumped into
13/// a subdirectory of the app's data directory as unencrypted json blobs. On
14/// startup, we just load all on-disk [`BasicPaymentV2`]s into memory.
15///
16/// In the future, this could be a SQLite DB or something.
17///
18/// ### Payment Syncing
19///
/// Syncing payments is done by tailing payments from the user node by
/// [`PaymentUpdatedIndex`]. We simply keep track of the latest `updated_at`
/// index in our DB, and fetch batches of updated payments, merging them into
/// our DB, until the node returns a batch smaller than the requested size.
24///
25/// ## Design goals
26///
27/// We want efficient random access for pending payments, which are always
28/// displayed, and efficient fetching for the last N finalized payments, which
29/// are the first payments displayed in the finalized payments list.
30///
31/// Cursors don't work well with flutter's lazy lists, since they want random
32/// access from scroll index -> content.
33///
34/// Performance when syncing payments is secondary, since that's done
35/// asynchronously and off the main UI thread, which is highly latency
36/// sensitive.
37///
38/// In the future, we could be even more clever and serialize+store the pending
39/// index on-disk. If we also added a finalized index, we could avoid the need
40/// to load all the payments on startup and could instead lazy load them.
41///
42/// [`BasicPaymentV2`]: lexe_api::types::payments::BasicPaymentV2
43/// [`PaymentsDb`]: crate::unstable::payments_db::PaymentsDb
44/// [`PaymentUpdatedIndex`]: lexe_api::types::payments::PaymentUpdatedIndex
45mod docs {}
46
47use std::{
48    cmp,
49    collections::{BTreeMap, BTreeSet},
50    io,
51    ops::Bound,
52    str::FromStr,
53    sync::RwLock,
54};
55
56use anyhow::Context;
57#[cfg(doc)]
58use lexe_api::types::payments::VecDbPaymentV2;
59use lexe_api::{
60    def::AppNodeRunApi,
61    error::NodeApiError,
62    models::command,
63    types::payments::{
64        BasicPaymentV2, PaymentCreatedIndex, PaymentStatus,
65        PaymentUpdatedIndex, VecBasicPaymentV2,
66    },
67};
68use lexe_common::time::TimestampMs;
69use lexe_node_client::client::NodeClient;
70use serde::{Deserialize, Serialize};
71use tracing::warn;
72
73use crate::{
74    types::{
75        command::PaymentSyncSummary,
76        payment::{Order, PaymentFilter},
77    },
78    unstable::ffs::Ffs,
79};
80
81/// The app's local [`BasicPaymentV2`] database, synced from the user node.
82pub struct PaymentsDb<F> {
83    ffs: F,
84    state: RwLock<PaymentsDbState>,
85}
86
87/// Pure in-memory state of the [`PaymentsDb`].
88#[derive(Debug, PartialEq)]
89struct PaymentsDbState {
90    /// All locally synced payments,
91    /// from oldest to newest (reverse of the UI scroll order).
92    payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
93
94    /// An index of currently pending payments, sorted by `created_at` index.
95    //
96    // For now, we only have a `pending` index, because it is sparse - there's
97    // no point in indexing `finalized` as most payments are finalized. If we
98    // later want filters for "offers only" or similar, we can add more.
99    pending: BTreeSet<PaymentCreatedIndex>,
100
101    /// The latest `updated_at` index of any payment in the db.
102    ///
103    /// Invariant:
104    ///
105    /// ```ignore
106    /// latest_updated_index == payments.iter()
107    ///     .map(|p| p.updated_index())
108    ///     .max()
109    /// ```
110    latest_updated_index: Option<PaymentUpdatedIndex>,
111
112    /// The time we last successfully synced, or `None` if we've never synced.
113    ///
114    /// Tracks when we last *checked* for updates, so unlike
115    /// [`latest_updated_index`] it stays current even when no new payments
116    /// arrive.
117    ///
118    /// NOTE: This is measured by the device's local clock, not the node's
119    /// clock, so it's subject to clock skew.
120    ///
121    /// [`latest_updated_index`]: Self::latest_updated_index
122    last_synced_at: Option<TimestampMs>,
123}
124
125/// Payments DB metadata, persisted at [`METADATA_FILENAME`] in the same folder
126/// as the per-payment files.
127#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
128struct PaymentsDbMetadata {
129    /// See [`PaymentsDbState::last_synced_at`].
130    last_synced_at: Option<TimestampMs>,
131}
132
/// Name of the file holding the serialized [`PaymentsDbMetadata`]. Since it
/// does not parse as a [`PaymentCreatedIndex`], it cannot be mistaken for a
/// payment file.
const METADATA_FILENAME: &str = "metadata.json";
136
137/// Sync the app's local payment state from the user node.
138///
139/// We tail updated payments from the user node, merging results into our DB
140/// until there are no more updates left to sync.
141//
142// Unstable: Takes Ffs as a param which is still evolving
143#[allow(private_bounds)]
144pub(crate) async fn sync_payments<F: Ffs>(
145    db: &PaymentsDb<F>,
146    node_client: &impl AppNodeRunSyncApi,
147    batch_size: u16,
148) -> anyhow::Result<PaymentSyncSummary> {
149    assert!(batch_size > 0);
150
151    let mut start_index = db.state.read().unwrap().latest_updated_index;
152
153    let mut summary = PaymentSyncSummary {
154        num_new: 0,
155        num_updated: 0,
156    };
157
158    loop {
159        // In every loop iteration, we fetch one batch of updated payments.
160
161        let req = command::GetUpdatedPayments {
162            // Remember, this start index is _exclusive_.
163            // The payment w/ this index will _NOT_ be included in the response.
164            start_index,
165            limit: Some(batch_size),
166        };
167
168        let updated_payments = node_client
169            .get_updated_payments(req)
170            .await
171            .context("Failed to fetch updated payments")?
172            .payments;
173        let updated_payments_len = updated_payments.len();
174
175        // Update the db: persist, then update in-memory state.
176        let (new, updated, latest_updated_index) = db
177            .upsert_payments(updated_payments)
178            .context("Failed to upsert payments")?;
179        summary.num_new += new;
180        summary.num_updated += updated;
181
182        // Update the `start_index` we'll use for the next batch.
183        start_index = latest_updated_index;
184
185        // If the node returned fewer payments than our requested batch size,
186        // then we are done (there are no more new payments after this batch).
187        if updated_payments_len < usize::from(batch_size) {
188            break;
189        }
190    }
191
192    // Record the local-clock sync time, even if nothing changed, so we can
193    // later tell users how fresh their local cache is.
194    db.record_synced_at(TimestampMs::now())
195        .context("Failed to persist sync timestamp")?;
196
197    Ok(summary)
198}
199
200/// The specific `AppNodeRunApi` method that we need to sync payments.
201///
202/// This lets us mock out the method in the tests below,
203/// without also mocking out the entire `AppNodeRunApi` trait.
204trait AppNodeRunSyncApi {
205    /// GET /node/v1/payments/updated [`command::GetUpdatedPayments`]
206    ///                            -> [`VecDbPaymentV2`]
207    async fn get_updated_payments(
208        &self,
209        req: command::GetUpdatedPayments,
210    ) -> Result<VecBasicPaymentV2, NodeApiError>;
211}
212
213impl AppNodeRunSyncApi for NodeClient {
214    async fn get_updated_payments(
215        &self,
216        req: command::GetUpdatedPayments,
217    ) -> Result<VecBasicPaymentV2, NodeApiError> {
218        AppNodeRunApi::get_updated_payments(self, req).await
219    }
220}
221
222impl<F: Ffs> PaymentsDb<F> {
223    // --- Constructors (unstable b/c Ffs is still in flux) --- //
224
225    /// Read all the payments on-disk into a new `PaymentsDb`.
226    pub(crate) fn read(ffs: F) -> anyhow::Result<Self> {
227        let state = PaymentsDbState::read(&ffs)
228            .map(RwLock::new)
229            .context("Failed to read on-disk PaymentsDb state")?;
230
231        Ok(Self { ffs, state })
232    }
233
234    /// Create a new empty `PaymentsDb`. Does not touch disk/storage.
235    pub(crate) fn empty(ffs: F) -> Self {
236        let state = RwLock::new(PaymentsDbState::empty());
237        Self { ffs, state }
238    }
239
240    // --- Public API  --- //
241
242    /// Clear the in-memory state and delete the on-disk payment db.
243    pub fn clear(&self) -> io::Result<()> {
244        *self.state.write().unwrap() = PaymentsDbState::empty();
245        self.ffs.delete_all()
246    }
247
248    /// Total number of payments in the database.
249    pub fn num_payments(&self) -> usize {
250        self.state.read().unwrap().num_payments()
251    }
252
253    /// Number of pending (unfinalized) payments.
254    pub fn num_pending(&self) -> usize {
255        self.state.read().unwrap().num_pending()
256    }
257
258    /// Number of finalized payments.
259    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
260    pub fn num_finalized(&self) -> usize {
261        self.state.read().unwrap().num_finalized()
262    }
263
264    /// Number of pending payments that are not junk.
265    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
266    pub fn num_pending_not_junk(&self) -> usize {
267        self.state.read().unwrap().num_pending_not_junk()
268    }
269
270    /// Number of finalized payments that are not junk.
271    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
272    pub fn num_finalized_not_junk(&self) -> usize {
273        self.state.read().unwrap().num_finalized_not_junk()
274    }
275
276    /// The latest `updated_at` index of any payment in the db.
277    pub fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
278        self.state.read().unwrap().latest_updated_index()
279    }
280
281    /// The time we last successfully synced with the user node, or `None` if
282    /// we've never synced.
283    ///
284    /// Tracks when we last *checked* for updates, so it can be much more recent
285    /// than the newest payment's `updated_at` if a sync turned up no new
286    /// payments.
287    ///
288    /// NOTE: This is measured by the device's local clock, not the node's
289    /// clock, so it's subject to clock skew.
290    pub fn last_synced_at(&self) -> Option<TimestampMs> {
291        self.state.read().unwrap().last_synced_at
292    }
293
294    /// Get a payment by its `PaymentCreatedIndex`.
295    pub fn get_payment_by_created_index(
296        &self,
297        created_index: &PaymentCreatedIndex,
298    ) -> Option<BasicPaymentV2> {
299        self.state
300            .read()
301            .unwrap()
302            .get_payment_by_created_index(created_index)
303            .cloned()
304    }
305
306    /// Get a payment by scroll index in UI order (newest to oldest).
307    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
308    pub fn get_payment_by_scroll_idx(
309        &self,
310        scroll_idx: usize,
311    ) -> Option<BasicPaymentV2> {
312        self.state
313            .read()
314            .unwrap()
315            .get_payment_by_scroll_idx(scroll_idx)
316            .cloned()
317    }
318
319    /// Get a pending payment by scroll index in UI order (newest to oldest).
320    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
321    pub fn get_pending_payment_by_scroll_idx(
322        &self,
323        scroll_idx: usize,
324    ) -> Option<BasicPaymentV2> {
325        self.state
326            .read()
327            .unwrap()
328            .get_pending_payment_by_scroll_idx(scroll_idx)
329            .cloned()
330    }
331
332    /// Get a "pending and not junk" payment by scroll index in UI order.
333    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
334    pub fn get_pending_not_junk_payment_by_scroll_idx(
335        &self,
336        scroll_idx: usize,
337    ) -> Option<BasicPaymentV2> {
338        self.state
339            .read()
340            .unwrap()
341            .get_pending_not_junk_payment_by_scroll_idx(scroll_idx)
342            .cloned()
343    }
344
345    /// Get a finalized payment by scroll index in UI order (newest to oldest).
346    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
347    pub fn get_finalized_payment_by_scroll_idx(
348        &self,
349        scroll_idx: usize,
350    ) -> Option<BasicPaymentV2> {
351        self.state
352            .read()
353            .unwrap()
354            .get_finalized_payment_by_scroll_idx(scroll_idx)
355            .cloned()
356    }
357
358    /// Get a "finalized and not junk" payment by scroll index in UI order.
359    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
360    pub fn get_finalized_not_junk_payment_by_scroll_idx(
361        &self,
362        scroll_idx: usize,
363    ) -> Option<BasicPaymentV2> {
364        self.state
365            .read()
366            .unwrap()
367            .get_finalized_not_junk_payment_by_scroll_idx(scroll_idx)
368            .cloned()
369    }
370
371    /// List payments from local storage with cursor-based pagination.
372    ///
373    /// Returns `(payments, next_index)` where `next_index` is the cursor
374    /// for fetching the next page (`None` when there are no more results).
375    pub fn list_payments(
376        &self,
377        filter: &PaymentFilter,
378        order: Order,
379        limit: usize,
380        after: Option<&PaymentCreatedIndex>,
381    ) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
382        self.state
383            .read()
384            .unwrap()
385            .list_payments(filter, order, limit, after)
386    }
387
388    // --- Private helpers --- //
389
390    /// Upsert a batch of payments synced from the user node.
391    ///
392    /// Returns `(num_new, num_updated, latest_updated_index)`.
393    /// May return `(0, 0, _)` if nothing was inserted or updated.
394    fn upsert_payments(
395        &self,
396        payments: impl IntoIterator<Item = BasicPaymentV2>,
397    ) -> io::Result<(usize, usize, Option<PaymentUpdatedIndex>)> {
398        let mut state = self.state.write().unwrap();
399
400        let mut num_new = 0;
401        let mut num_updated = 0;
402        for payment in payments {
403            let (new, updated) =
404                Self::upsert_payment(&self.ffs, &mut state, payment)?;
405            num_new += new;
406            num_updated += updated;
407        }
408
409        Ok((num_new, num_updated, state.latest_updated_index))
410    }
411
412    /// Upserts a payment into the db.
413    ///
414    /// Persists the payment and updates in-memory state, including indices.
415    ///
416    /// Returns the number of new and updated payments, respectively.
417    /// May return `(0, 0)` if nothing was inserted or updated.
418    fn upsert_payment(
419        ffs: &F,
420        state: &mut PaymentsDbState,
421        payment: BasicPaymentV2,
422    ) -> io::Result<(usize, usize)> {
423        let created_index = payment.created_index();
424
425        let maybe_existing = state.payments.get(&created_index);
426        let already_existed = maybe_existing.is_some();
427
428        // Skip if payment exists and there is no change to it.
429        if let Some(existing) = maybe_existing
430            && payment == *existing
431        {
432            return Ok((0, 0));
433        }
434
435        // Persist the updated payment to storage. Since this is fallible, we
436        // do this first, otherwise we may corrupt our in-memory state.
437        Self::write_payment(ffs, &payment)?;
438
439        // --- 'Commit' by updating our in-memory state --- //
440
441        // Update indices first to avoid a clone
442        if payment.is_pending() {
443            state.pending.insert(created_index);
444        } else {
445            state.pending.remove(&created_index);
446        }
447        // It is always true that `None < Some(_)`
448        state.latest_updated_index =
449            cmp::max(state.latest_updated_index, Some(payment.updated_index()));
450
451        // Update main payments map
452        state.payments.insert(created_index, payment);
453
454        if already_existed {
455            Ok((0, 1))
456        } else {
457            Ok((1, 0))
458        }
459    }
460
461    /// Write a payment to on-disk storage as JSON bytes. The caller is
462    /// responsible for updating the in-memory [`PaymentsDb`] state and indices.
463    // Making this an associated fn avoids some borrow checker issues.
464    fn write_payment(ffs: &F, payment: &BasicPaymentV2) -> io::Result<()> {
465        let filename = payment.created_index().to_string();
466        let data =
467            serde_json::to_vec(&payment).expect("Failed to serialize payment");
468        ffs.write(&filename, &data)
469    }
470
471    /// Record that a successful sync just completed at the given local time.
472    ///
473    /// Persists the metadata first, then updates in-memory state.
474    fn record_synced_at(&self, now: TimestampMs) -> io::Result<()> {
475        let metadata = PaymentsDbMetadata {
476            last_synced_at: Some(now),
477        };
478        let data = serde_json::to_vec(&metadata)
479            .expect("Failed to serialize metadata");
480        self.ffs.write(METADATA_FILENAME, &data)?;
481
482        self.state.write().unwrap().last_synced_at = Some(now);
483        Ok(())
484    }
485
486    /// Update the personal note on an existing payment in this [`PaymentsDb`].
487    /// This does NOT actually update the note on the user node, hence why this
488    /// is not a public API.
489    pub(crate) fn update_personal_note(
490        &self,
491        req: command::UpdatePersonalNote,
492    ) -> anyhow::Result<()> {
493        let mut state = self.state.write().unwrap();
494
495        let payment = state
496            .get_mut_payment_by_created_index(&req.index)
497            .context("Updating non-existent payment")?;
498
499        payment.personal_note = req.personal_note.map(|n| n.into_inner());
500
501        Self::write_payment(&self.ffs, payment)
502            .context("Failed to write payment to local db")?;
503
504        Ok(())
505    }
506
507    /// Check the integrity of the whole PaymentsDb.
508    ///
509    /// (1.) The in-memory state should not be corrupted.
510    /// (2.) The current on-disk state should match the in-memory state.
511    #[cfg(test)]
512    fn debug_assert_invariants(&self) {
513        if cfg!(not(debug_assertions)) {
514            return;
515        }
516
517        let state = self.state.read().unwrap();
518
519        // (1.)
520        state.debug_assert_invariants();
521
522        // (2.)
523        let on_disk_state = PaymentsDbState::read(&self.ffs)
524            .expect("Failed to re-read on-disk state");
525        assert_eq!(on_disk_state, *state);
526    }
527}
528
529impl PaymentsDbState {
530    /// Create a new empty [`PaymentsDbState`]. Does not touch disk/storage.
531    fn empty() -> Self {
532        Self {
533            payments: BTreeMap::new(),
534            pending: BTreeSet::new(),
535            latest_updated_index: None,
536            last_synced_at: None,
537        }
538    }
539
540    /// Read the DB state from disk.
541    fn read(ffs: &impl Ffs) -> anyhow::Result<Self> {
542        let mut buf = Vec::<u8>::new();
543        let mut payments = Vec::<BasicPaymentV2>::new();
544        let mut last_synced_at = None;
545
546        ffs.read_dir_visitor(|filename| {
547            // The metadata file lives alongside the payment files.
548            if filename == METADATA_FILENAME {
549                buf.clear();
550                ffs.read_into(filename, &mut buf)?;
551                let metadata =
552                    serde_json::from_slice::<PaymentsDbMetadata>(&buf)
553                        .context(METADATA_FILENAME)
554                        .context("Failed to deserialize payments db metadata")
555                        .map_err(io_error_invalid_data)?;
556                last_synced_at = metadata.last_synced_at;
557                return Ok(());
558            }
559
560            // Parse created_at index from filename; skip unrecognized files.
561            let created_index = match PaymentCreatedIndex::from_str(filename) {
562                Ok(idx) => idx,
563                Err(e) => {
564                    warn!(
565                        %filename,
566                        "Error: unrecognized filename in payments dir: {e:#}"
567                    );
568                    return Ok(());
569                }
570            };
571
572            // Read payment into buffer
573            buf.clear();
574            ffs.read_into(filename, &mut buf)?;
575
576            // Deserialize payment
577            let payment = serde_json::from_slice::<BasicPaymentV2>(&buf)
578                .with_context(|| filename.to_owned())
579                .context("Failed to deserialize payment file")
580                .map_err(io_error_invalid_data)?;
581
582            // Sanity check: Index in filename should match index in payment.
583            let payment_created_index = payment.created_index();
584            if created_index != payment_created_index {
585                return Err(io_error_invalid_data(format!(
586                    "Payment DB corruption: filename index '{filename}'
587                     different from index in contents '{payment_created_index}'"
588                )));
589            }
590
591            // Collect the payment
592            payments.push(payment);
593
594            Ok(())
595        })
596        .context("Failed to read payments db, possibly corrupted?")?;
597
598        Ok(Self::from_vec(payments, last_synced_at))
599    }
600
601    fn from_vec(
602        payments: Vec<BasicPaymentV2>,
603        last_synced_at: Option<TimestampMs>,
604    ) -> Self {
605        let payments = payments
606            .into_iter()
607            .map(|p| (p.created_index(), p))
608            .collect();
609
610        let pending = build_index::pending(&payments);
611        let latest_updated_index = build_index::latest_updated_index(&payments);
612
613        Self {
614            payments,
615            pending,
616            latest_updated_index,
617            last_synced_at,
618        }
619    }
620
621    fn num_payments(&self) -> usize {
622        self.payments.len()
623    }
624
625    fn num_pending(&self) -> usize {
626        self.pending.len()
627    }
628
629    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
630    fn num_finalized(&self) -> usize {
631        self.payments.len() - self.pending.len()
632    }
633
634    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
635    fn num_pending_not_junk(&self) -> usize {
636        self.pending
637            .iter()
638            .filter_map(|created_idx| self.payments.get(created_idx))
639            .filter(|p| p.is_pending_not_junk())
640            .count()
641    }
642
643    // TODO(max): If needed, we can add an index for this.
644    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
645    fn num_finalized_not_junk(&self) -> usize {
646        self.payments
647            .values()
648            .filter(|p| p.is_finalized_not_junk())
649            .count()
650    }
651
652    fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
653        self.latest_updated_index
654    }
655
656    fn get_payment_by_created_index(
657        &self,
658        created_index: &PaymentCreatedIndex,
659    ) -> Option<&BasicPaymentV2> {
660        self.payments.get(created_index)
661    }
662
663    /// Get a mutable payment by its `PaymentCreatedIndex`.
664    fn get_mut_payment_by_created_index(
665        &mut self,
666        created_index: &PaymentCreatedIndex,
667    ) -> Option<&mut BasicPaymentV2> {
668        self.payments.get_mut(created_index)
669    }
670
671    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
672    fn get_payment_by_scroll_idx(
673        &self,
674        scroll_idx: usize,
675    ) -> Option<&BasicPaymentV2> {
676        self.payments.values().nth_back(scroll_idx)
677    }
678
679    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
680    fn get_pending_payment_by_scroll_idx(
681        &self,
682        scroll_idx: usize,
683    ) -> Option<&BasicPaymentV2> {
684        self.pending
685            .iter()
686            .nth_back(scroll_idx)
687            .and_then(|created_idx| self.payments.get(created_idx))
688    }
689
690    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
691    fn get_pending_not_junk_payment_by_scroll_idx(
692        &self,
693        scroll_idx: usize,
694    ) -> Option<&BasicPaymentV2> {
695        self.pending
696            .iter()
697            .rev()
698            .filter_map(|created_idx| self.payments.get(created_idx))
699            .filter(|p| p.is_pending_not_junk())
700            .nth_back(scroll_idx)
701    }
702
703    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
704    fn get_finalized_payment_by_scroll_idx(
705        &self,
706        scroll_idx: usize,
707    ) -> Option<&BasicPaymentV2> {
708        self.payments
709            .values()
710            .filter(|p| p.is_finalized())
711            .nth_back(scroll_idx)
712    }
713
714    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
715    fn get_finalized_not_junk_payment_by_scroll_idx(
716        &self,
717        scroll_idx: usize,
718    ) -> Option<&BasicPaymentV2> {
719        self.payments
720            .values()
721            .filter(|p| p.is_finalized_not_junk())
722            .nth_back(scroll_idx)
723    }
724
725    /// List payments with cursor-based pagination.
726    ///
727    /// Returns `(payments, next_index)`.
728    fn list_payments(
729        &self,
730        filter: &PaymentFilter,
731        order: Order,
732        limit: usize,
733        after: Option<&PaymentCreatedIndex>,
734    ) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
735        if limit == 0 {
736            return (Vec::new(), None);
737        }
738
739        let matches_filter = |p: &&BasicPaymentV2| match filter {
740            PaymentFilter::All => true,
741            PaymentFilter::Pending => p.status == PaymentStatus::Pending,
742            PaymentFilter::Completed => p.status == PaymentStatus::Completed,
743            PaymentFilter::Failed => p.status == PaymentStatus::Failed,
744            PaymentFilter::Finalized => p.status != PaymentStatus::Pending,
745        };
746
747        // Fetch `limit + 1` to detect whether there's a next page.
748        let take = limit.saturating_add(1);
749
750        let mut payments: Vec<BasicPaymentV2> = match (order, after) {
751            (Order::Asc, Some(c)) => self
752                .payments
753                .range((Bound::Excluded(c), Bound::Unbounded))
754                .map(|(_, p)| p)
755                .filter(matches_filter)
756                .take(take)
757                .cloned()
758                .collect(),
759            (Order::Asc, None) => self
760                .payments
761                .values()
762                .filter(matches_filter)
763                .take(take)
764                .cloned()
765                .collect(),
766            (Order::Desc, Some(c)) => self
767                .payments
768                .range(..c)
769                .rev()
770                .map(|(_, p)| p)
771                .filter(matches_filter)
772                .take(take)
773                .cloned()
774                .collect(),
775            (Order::Desc, None) => self
776                .payments
777                .values()
778                .rev()
779                .filter(matches_filter)
780                .take(take)
781                .cloned()
782                .collect(),
783        };
784
785        // If we got more than `limit`, there's a next page.
786        let has_more = payments.len() > limit;
787        if has_more {
788            payments.truncate(limit);
789        }
790        let next_index = has_more.then(|| {
791            payments
792                .last()
793                .expect("has_more implies non-empty")
794                .created_index()
795        });
796
797        (payments, next_index)
798    }
799
800    #[cfg(test)]
801    fn is_empty(&self) -> bool {
802        self.payments.is_empty()
803    }
804
805    /// Check the integrity of the in-memory state.
806    #[cfg(test)]
807    fn debug_assert_invariants(&self) {
808        if cfg!(not(debug_assertions)) {
809            return;
810        }
811
812        // --- `payments` invariants: --- //
813
814        // Each payment is stored under its own created_at index
815        for (idx, payment) in &self.payments {
816            assert_eq!(*idx, payment.created_index());
817        }
818
819        // --- `pending` index invariants: --- //
820
821        // Rebuilding the index recreates the same index exactly
822        let rebuilt_pending_index = build_index::pending(&self.payments);
823        assert_eq!(rebuilt_pending_index, self.pending);
824
825        // All pending payments are in the index
826        // (in case there is a bug in `index::build_pending`)
827        self.payments
828            .values()
829            .filter(|p| p.is_pending())
830            .all(|p| self.pending.contains(&p.created_index()));
831
832        // --- `latest_updated_index` invariant: --- //
833
834        let recomputed_latest_updated_index =
835            build_index::latest_updated_index(&self.payments);
836        assert_eq!(recomputed_latest_updated_index, self.latest_updated_index);
837    }
838}
839
840mod build_index {
841    use super::*;
842
843    /// Build the `pending` index from the given payments.
844    pub(super) fn pending(
845        payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
846    ) -> BTreeSet<PaymentCreatedIndex> {
847        payments
848            .iter()
849            .filter(|(_, p)| p.is_pending())
850            .map(|(idx, _)| *idx)
851            .collect()
852    }
853
854    /// Find the latest [`PaymentUpdatedIndex`] from the given payments.
855    pub(super) fn latest_updated_index(
856        payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
857    ) -> Option<PaymentUpdatedIndex> {
858        payments.values().map(BasicPaymentV2::updated_index).max()
859    }
860}
861
/// Wrap any boxable error (or message) in an [`io::Error`] whose kind is
/// `InvalidData`.
fn io_error_invalid_data(
    error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, error)
}
868
#[cfg(test)]
mod test_utils {
    use proptest::{
        prelude::{Strategy, any},
        sample::SizeRange,
    };

    use super::*;

    /// A test double for the user node: serves payments out of an in-memory
    /// map ordered by `updated_at` index.
    pub(super) struct MockNode {
        pub payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
    }

    impl MockNode {
        /// Wrap a map that is already keyed by updated_at index.
        pub(super) fn new(
            payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
        ) -> Self {
            Self { payments }
        }

        /// Re-key a created_at index -> payment map by updated_at index.
        pub(super) fn from_payments(
            payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
        ) -> Self {
            let mut by_updated_index = BTreeMap::new();
            for payment in payments.into_values() {
                by_updated_index.insert(payment.updated_index(), payment);
            }
            Self {
                payments: by_updated_index,
            }
        }
    }

    impl AppNodeRunSyncApi for MockNode {
        /// GET /node/v1/payments/updated [`command::GetUpdatedPayments`]
        ///                            -> [`VecDbPaymentV2`]
        async fn get_updated_payments(
            &self,
            req: command::GetUpdatedPayments,
        ) -> Result<VecBasicPaymentV2, NodeApiError> {
            // No limit in the request means "as many as a u16 allows".
            let limit = usize::from(req.limit.unwrap_or(u16::MAX));

            // The start index is exclusive; with none given, every payment
            // qualifies.
            let is_after_start =
                |idx: &PaymentUpdatedIndex| match &req.start_index {
                    Some(start_index) => start_index < idx,
                    None => true,
                };

            let payments = self
                .payments
                .iter()
                .filter(|(idx, _)| is_after_start(*idx))
                .take(limit)
                .map(|(_idx, payment)| payment.clone())
                .collect();

            Ok(VecBasicPaymentV2 { payments })
        }
    }

    /// Strategy producing a map of arbitrary payments keyed by created_at
    /// index. Generated payments that share a created index collapse into
    /// one entry, so the map can be smaller than `approx_size`.
    pub(super) fn any_payments(
        approx_size: impl Into<SizeRange>,
    ) -> impl Strategy<Value = BTreeMap<PaymentCreatedIndex, BasicPaymentV2>>
    {
        let payment_vecs =
            proptest::collection::vec(any::<BasicPaymentV2>(), approx_size);

        payment_vecs.prop_map(|payments| {
            let mut by_created_index = BTreeMap::new();
            for payment in payments {
                by_created_index.insert(payment.created_index(), payment);
            }
            by_created_index
        })
    }
}
944
945#[cfg(test)]
946mod test {
947    use std::{collections::HashSet, time::Duration};
948
949    use lexe_api::types::payments::PaymentStatus;
950    use lexe_crypto::rng::{FastRng, RngExt};
951    use proptest::{
952        collection::vec, prelude::any, proptest, sample::Index,
953        test_runner::Config,
954    };
955    use tempfile::tempdir;
956
957    use super::{test_utils::MockNode, *};
958    use crate::unstable::ffs::{DiskFs, test_utils::InMemoryFfs};
959
960    #[test]
961    fn read_from_empty() {
962        let mock_ffs = InMemoryFfs::new();
963        let mock_ffs_db = PaymentsDb::read(mock_ffs).unwrap();
964        assert!(mock_ffs_db.state.read().unwrap().is_empty());
965        mock_ffs_db.debug_assert_invariants();
966
967        let tempdir = tempfile::tempdir().unwrap();
968        let temp_fs =
969            DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
970        let temp_fs_db = PaymentsDb::read(temp_fs).unwrap();
971        assert!(temp_fs_db.state.read().unwrap().is_empty());
972        temp_fs_db.debug_assert_invariants();
973
974        assert_eq!(
975            *mock_ffs_db.state.read().unwrap(),
976            *temp_fs_db.state.read().unwrap()
977        );
978    }
979
980    #[test]
981    fn test_upsert() {
982        fn visit_batches<T>(
983            iter: &mut impl Iterator<Item = T>,
984            batch_sizes: Vec<usize>,
985            mut f: impl FnMut(Vec<T>),
986        ) {
987            let batch_sizes = batch_sizes.into_iter();
988
989            for batch_size in batch_sizes {
990                let batch = take_n(iter, batch_size);
991                let batch_len = batch.len();
992
993                if batch_len == 0 {
994                    return;
995                }
996
997                f(batch);
998
999                if batch_len < batch_size {
1000                    return;
1001                }
1002            }
1003
1004            let batch = iter.collect::<Vec<_>>();
1005
1006            if !batch.is_empty() {
1007                f(batch);
1008            }
1009        }
1010
1011        fn take_n<T>(iter: &mut impl Iterator<Item = T>, n: usize) -> Vec<T> {
1012            let mut out = Vec::with_capacity(n);
1013
1014            while out.len() < n {
1015                match iter.next() {
1016                    Some(value) => out.push(value),
1017                    None => break,
1018                }
1019            }
1020
1021            out
1022        }
1023
1024        proptest!(
1025            Config::with_cases(10),
1026            |(
1027                rng: FastRng,
1028                payments in test_utils::any_payments(0..20),
1029                batch_sizes in vec(1_usize..20, 0..5),
1030            )| {
1031                let tempdir = tempdir().unwrap();
1032                let temp_fs = DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
1033                let temp_fs_db = PaymentsDb::empty(temp_fs);
1034
1035                let mock_ffs = InMemoryFfs::from_rng(rng);
1036                let mock_ffs_db = PaymentsDb::empty(mock_ffs);
1037
1038                let mut payments_iter = payments.clone().into_values();
1039                visit_batches(&mut payments_iter, batch_sizes, |new_payment_batch| {
1040                    let _ = mock_ffs_db.upsert_payments(
1041                        new_payment_batch.clone()
1042                    ).unwrap();
1043                    let _ = temp_fs_db.upsert_payments(new_payment_batch).unwrap();
1044
1045                    mock_ffs_db.debug_assert_invariants();
1046                    temp_fs_db.debug_assert_invariants();
1047                });
1048
1049                assert_eq!(
1050                    *mock_ffs_db.state.read().unwrap(),
1051                    *temp_fs_db.state.read().unwrap()
1052                );
1053            }
1054        );
1055    }
1056
1057    #[tokio::test]
1058    async fn test_sync_empty() {
1059        let mock_node_client = MockNode::new(BTreeMap::new());
1060        let mock_ffs = InMemoryFfs::new();
1061        let db = PaymentsDb::empty(mock_ffs);
1062
1063        sync_payments(&db, &mock_node_client, 5).await.unwrap();
1064
1065        assert!(db.state.read().unwrap().is_empty());
1066        db.debug_assert_invariants();
1067    }
1068
1069    /// `last_synced_at` is set on every sync (even with no payments),
1070    /// survives a reload from disk, and is reset by `clear`.
1071    #[tokio::test]
1072    async fn test_last_synced_at() {
1073        let mock_node = MockNode::new(BTreeMap::new());
1074        let db = PaymentsDb::empty(InMemoryFfs::new());
1075
1076        // Never synced yet.
1077        assert_eq!(db.last_synced_at(), None);
1078
1079        // A sync records the local-clock sync time, even with no payments.
1080        let before = TimestampMs::now();
1081        sync_payments(&db, &mock_node, 5).await.unwrap();
1082        let after = TimestampMs::now();
1083        let synced_at = db.last_synced_at().expect("sync should set time");
1084        assert!((before..=after).contains(&synced_at));
1085        db.debug_assert_invariants();
1086
1087        // The sync time survives a reload from disk.
1088        let db = PaymentsDb::read(db.ffs).unwrap();
1089        assert_eq!(db.last_synced_at(), Some(synced_at));
1090
1091        // Clearing resets it.
1092        db.clear().unwrap();
1093        assert_eq!(db.last_synced_at(), None);
1094
1095        // Re-sync, then deleting `metadata.json` also resets it on reload.
1096        sync_payments(&db, &mock_node, 5).await.unwrap();
1097        assert!(db.last_synced_at().is_some());
1098        db.ffs.delete(METADATA_FILENAME).unwrap();
1099        let db = PaymentsDb::read(db.ffs).unwrap();
1100        assert_eq!(db.last_synced_at(), None);
1101    }
1102
1103    /// Backwards compat: pre-existing dbs with no `metadata.json` still read
1104    /// fine, with `last_synced_at` defaulting to `None`.
1105    #[test]
1106    fn test_read_without_metadata() {
1107        proptest!(Config::with_cases(8), |(payments in test_utils::any_payments(0..10))| {
1108            let ffs = InMemoryFfs::new();
1109
1110            // Write only payment files; no metadata.json (simulates old db).
1111            for payment in payments.values() {
1112                PaymentsDb::<InMemoryFfs>::write_payment(&ffs, payment).unwrap();
1113            }
1114
1115            let db = PaymentsDb::read(ffs).unwrap();
1116            assert_eq!(db.last_synced_at(), None);
1117            assert_eq!(db.num_payments(), payments.len());
1118            db.debug_assert_invariants();
1119        });
1120    }
1121
1122    #[test]
1123    fn test_sync() {
1124        /// Assert that the payments in the db equal those in the mock node.
1125        fn assert_db_payments_eq(
1126            db_payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
1127            node_payments: &BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
1128        ) {
1129            assert_eq!(db_payments.len(), node_payments.len());
1130            db_payments.iter().for_each(|(_created_idx, payment)| {
1131                let node_payment =
1132                    node_payments.get(&payment.updated_index()).unwrap();
1133                assert_eq!(payment, node_payment);
1134            });
1135            node_payments.iter().for_each(|(_updated_idx, payment)| {
1136                let db_payment =
1137                    db_payments.get(&payment.created_index()).unwrap();
1138                assert_eq!(payment, db_payment);
1139            });
1140        }
1141
1142        let rt = tokio::runtime::Builder::new_current_thread()
1143            .build()
1144            .unwrap();
1145
1146        proptest!(
1147            Config::with_cases(4),
1148            |(
1149                mut rng: FastRng,
1150                payments in test_utils::any_payments(1..20),
1151                req_batch_size in 1_u16..5,
1152                finalize_indexes in
1153                    proptest::collection::vec(any::<Index>(), 1..5),
1154            )| {
1155                let mut mock_node = MockNode::from_payments(payments);
1156
1157                let mut rng2 = FastRng::from_u64(rng.gen_u64());
1158                let mock_ffs = InMemoryFfs::from_rng(rng);
1159
1160                // Sync empty DB from node
1161                let db = PaymentsDb::empty(mock_ffs);
1162                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
1163                    .unwrap();
1164                assert_db_payments_eq(
1165                    &db.state.read().unwrap().payments,
1166                    &mock_node.payments,
1167                );
1168                db.debug_assert_invariants();
1169
1170                // Reread db from ffs and resync - should still match node
1171                let mock_ffs = db.ffs;
1172                let db = PaymentsDb::read(mock_ffs).unwrap();
1173                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
1174                    .unwrap();
1175                assert_db_payments_eq(
1176                    &db.state.read().unwrap().payments,
1177                    &mock_node.payments,
1178                );
1179                db.debug_assert_invariants();
1180
1181                // Finalize some payments
1182                let finalize_some_payments = || {
1183                    let pending_payments = mock_node
1184                        .payments
1185                        .values()
1186                        .filter(|p| p.is_pending())
1187                        .cloned()
1188                        .collect::<Vec<_>>();
1189
1190                    if pending_payments.is_empty() {
1191                        return;
1192                    }
1193
1194                    // Simulates the current time
1195                    //
1196                    // We bump it before every update to ensure finalized
1197                    // payments have a later updated_at than in our DB
1198                    let mut current_time = db
1199                        .state
1200                        .read()
1201                        .unwrap()
1202                        .latest_updated_index
1203                        .expect("DB should have payments")
1204                        .updated_at;
1205
1206                    // The array indices of the payments inside
1207                    // `pending_payments` to finalize
1208                    let finalize_idxs = finalize_indexes
1209                        .into_iter()
1210                        .map(|index| index.index(pending_payments.len()))
1211                        // Collect into HashSet to dedup without sorting
1212                        .collect::<HashSet<_>>();
1213
1214                    for finalize_idx in finalize_idxs {
1215                        // The updated_at index of the payment to finalize
1216                        let final_updated_idx =
1217                            pending_payments[finalize_idx].updated_index();
1218
1219                        // Remove the payment to finalize from map
1220                        let mut payment = mock_node
1221                            .payments
1222                            .remove(&final_updated_idx)
1223                            .unwrap();
1224
1225                        // The finalized status to set for this payment
1226                        let new_status = if rng2.gen_boolean() {
1227                            PaymentStatus::Completed
1228                        } else {
1229                            PaymentStatus::Failed
1230                        };
1231
1232                        // Bump the current time so new updated_at is fresh
1233                        let bump_u64 = u64::from(rng2.gen_range_u32(1..11));
1234                        let bump_dur = Duration::from_millis(bump_u64);
1235                        current_time = current_time.saturating_add(bump_dur);
1236
1237                        // Update payment
1238                        payment.status = new_status;
1239                        payment.updated_at = current_time;
1240
1241                        // Re-insert with new updated_at as the key
1242                        let new_updated_index = payment.updated_index();
1243                        mock_node
1244                            .payments
1245                            .insert(new_updated_index, payment);
1246                        }
1247                };
1248
1249                finalize_some_payments();
1250
1251                // resync -- should pick up the finalized payments
1252                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
1253                    .unwrap();
1254
1255                assert_db_payments_eq(
1256                    &db.state.read().unwrap().payments,
1257                    &mock_node.payments,
1258                );
1259                db.debug_assert_invariants();
1260            }
1261        );
1262    }
1263}