// NOTE(review): empty module that declares no items; presumably a placeholder
// for module-level documentation — confirm before removing.
mod docs {}
46
47use std::{
48 cmp,
49 collections::{BTreeMap, BTreeSet},
50 io,
51 ops::Bound,
52 str::FromStr,
53 sync::RwLock,
54};
55
56use anyhow::Context;
57#[cfg(doc)]
58use lexe_api::types::payments::VecDbPaymentV2;
59use lexe_api::{
60 def::AppNodeRunApi,
61 error::NodeApiError,
62 models::command,
63 types::payments::{
64 BasicPaymentV2, PaymentCreatedIndex, PaymentStatus,
65 PaymentUpdatedIndex, VecBasicPaymentV2,
66 },
67};
68use lexe_common::time::TimestampMs;
69use lexe_node_client::client::NodeClient;
70use serde::{Deserialize, Serialize};
71use tracing::warn;
72
73use crate::{
74 types::{
75 command::PaymentSyncSummary,
76 payment::{Order, PaymentFilter},
77 },
78 unstable::ffs::Ffs,
79};
80
/// A payments database that keeps an in-memory copy of its payments and
/// mirrors them to a file store of type `F`.
pub struct PaymentsDb<F> {
    /// Backing file store. Each payment is written as one JSON file named
    /// after its created index, next to a `metadata.json` file.
    ffs: F,
    /// In-memory state, guarded by a lock so it can be read and updated
    /// through `&self`.
    state: RwLock<PaymentsDbState>,
}
86
/// The in-memory contents of a [`PaymentsDb`].
#[derive(Debug, PartialEq)]
struct PaymentsDbState {
    /// Every known payment, keyed (and therefore ordered) by created index.
    payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,

    /// Created indices of the payments in `payments` that are pending.
    pending: BTreeSet<PaymentCreatedIndex>,

    /// The greatest updated index among `payments`, or `None` if there are no
    /// payments. Syncing resumes from this index.
    latest_updated_index: Option<PaymentUpdatedIndex>,

    /// When the last sync finished, or `None` if no sync has been recorded.
    /// Persisted in the metadata file rather than in any payment file.
    last_synced_at: Option<TimestampMs>,
}
124
/// The JSON-serialized contents of the metadata file stored alongside the
/// payment files.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct PaymentsDbMetadata {
    /// When the last sync finished, if one has been recorded.
    last_synced_at: Option<TimestampMs>,
}
132
/// Name of the file that stores [`PaymentsDbMetadata`]. When reading the db,
/// this filename is handled separately from the per-payment files.
const METADATA_FILENAME: &str = "metadata.json";
136
137#[allow(private_bounds)]
144pub(crate) async fn sync_payments<F: Ffs>(
145 db: &PaymentsDb<F>,
146 node_client: &impl AppNodeRunSyncApi,
147 batch_size: u16,
148) -> anyhow::Result<PaymentSyncSummary> {
149 assert!(batch_size > 0);
150
151 let mut start_index = db.state.read().unwrap().latest_updated_index;
152
153 let mut summary = PaymentSyncSummary {
154 num_new: 0,
155 num_updated: 0,
156 };
157
158 loop {
159 let req = command::GetUpdatedPayments {
162 start_index,
165 limit: Some(batch_size),
166 };
167
168 let updated_payments = node_client
169 .get_updated_payments(req)
170 .await
171 .context("Failed to fetch updated payments")?
172 .payments;
173 let updated_payments_len = updated_payments.len();
174
175 let (new, updated, latest_updated_index) = db
177 .upsert_payments(updated_payments)
178 .context("Failed to upsert payments")?;
179 summary.num_new += new;
180 summary.num_updated += updated;
181
182 start_index = latest_updated_index;
184
185 if updated_payments_len < usize::from(batch_size) {
188 break;
189 }
190 }
191
192 db.record_synced_at(TimestampMs::now())
195 .context("Failed to persist sync timestamp")?;
196
197 Ok(summary)
198}
199
/// The one node API call that [`sync_payments`] needs.
///
/// NOTE(review): presumably a separate private trait so that a stand-in node
/// can be used in tests — confirm.
trait AppNodeRunSyncApi {
    /// Fetches the batch of payments described by `req`.
    async fn get_updated_payments(
        &self,
        req: command::GetUpdatedPayments,
    ) -> Result<VecBasicPaymentV2, NodeApiError>;
}
212
213impl AppNodeRunSyncApi for NodeClient {
214 async fn get_updated_payments(
215 &self,
216 req: command::GetUpdatedPayments,
217 ) -> Result<VecBasicPaymentV2, NodeApiError> {
218 AppNodeRunApi::get_updated_payments(self, req).await
219 }
220}
221
222impl<F: Ffs> PaymentsDb<F> {
223 pub(crate) fn read(ffs: F) -> anyhow::Result<Self> {
227 let state = PaymentsDbState::read(&ffs)
228 .map(RwLock::new)
229 .context("Failed to read on-disk PaymentsDb state")?;
230
231 Ok(Self { ffs, state })
232 }
233
234 pub(crate) fn empty(ffs: F) -> Self {
236 let state = RwLock::new(PaymentsDbState::empty());
237 Self { ffs, state }
238 }
239
240 pub fn clear(&self) -> io::Result<()> {
244 *self.state.write().unwrap() = PaymentsDbState::empty();
245 self.ffs.delete_all()
246 }
247
248 pub fn num_payments(&self) -> usize {
250 self.state.read().unwrap().num_payments()
251 }
252
253 pub fn num_pending(&self) -> usize {
255 self.state.read().unwrap().num_pending()
256 }
257
258 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
260 pub fn num_finalized(&self) -> usize {
261 self.state.read().unwrap().num_finalized()
262 }
263
264 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
266 pub fn num_pending_not_junk(&self) -> usize {
267 self.state.read().unwrap().num_pending_not_junk()
268 }
269
270 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
272 pub fn num_finalized_not_junk(&self) -> usize {
273 self.state.read().unwrap().num_finalized_not_junk()
274 }
275
276 pub fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
278 self.state.read().unwrap().latest_updated_index()
279 }
280
281 pub fn last_synced_at(&self) -> Option<TimestampMs> {
291 self.state.read().unwrap().last_synced_at
292 }
293
294 pub fn get_payment_by_created_index(
296 &self,
297 created_index: &PaymentCreatedIndex,
298 ) -> Option<BasicPaymentV2> {
299 self.state
300 .read()
301 .unwrap()
302 .get_payment_by_created_index(created_index)
303 .cloned()
304 }
305
306 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
308 pub fn get_payment_by_scroll_idx(
309 &self,
310 scroll_idx: usize,
311 ) -> Option<BasicPaymentV2> {
312 self.state
313 .read()
314 .unwrap()
315 .get_payment_by_scroll_idx(scroll_idx)
316 .cloned()
317 }
318
319 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
321 pub fn get_pending_payment_by_scroll_idx(
322 &self,
323 scroll_idx: usize,
324 ) -> Option<BasicPaymentV2> {
325 self.state
326 .read()
327 .unwrap()
328 .get_pending_payment_by_scroll_idx(scroll_idx)
329 .cloned()
330 }
331
332 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
334 pub fn get_pending_not_junk_payment_by_scroll_idx(
335 &self,
336 scroll_idx: usize,
337 ) -> Option<BasicPaymentV2> {
338 self.state
339 .read()
340 .unwrap()
341 .get_pending_not_junk_payment_by_scroll_idx(scroll_idx)
342 .cloned()
343 }
344
345 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
347 pub fn get_finalized_payment_by_scroll_idx(
348 &self,
349 scroll_idx: usize,
350 ) -> Option<BasicPaymentV2> {
351 self.state
352 .read()
353 .unwrap()
354 .get_finalized_payment_by_scroll_idx(scroll_idx)
355 .cloned()
356 }
357
358 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
360 pub fn get_finalized_not_junk_payment_by_scroll_idx(
361 &self,
362 scroll_idx: usize,
363 ) -> Option<BasicPaymentV2> {
364 self.state
365 .read()
366 .unwrap()
367 .get_finalized_not_junk_payment_by_scroll_idx(scroll_idx)
368 .cloned()
369 }
370
371 pub fn list_payments(
376 &self,
377 filter: &PaymentFilter,
378 order: Order,
379 limit: usize,
380 after: Option<&PaymentCreatedIndex>,
381 ) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
382 self.state
383 .read()
384 .unwrap()
385 .list_payments(filter, order, limit, after)
386 }
387
388 fn upsert_payments(
395 &self,
396 payments: impl IntoIterator<Item = BasicPaymentV2>,
397 ) -> io::Result<(usize, usize, Option<PaymentUpdatedIndex>)> {
398 let mut state = self.state.write().unwrap();
399
400 let mut num_new = 0;
401 let mut num_updated = 0;
402 for payment in payments {
403 let (new, updated) =
404 Self::upsert_payment(&self.ffs, &mut state, payment)?;
405 num_new += new;
406 num_updated += updated;
407 }
408
409 Ok((num_new, num_updated, state.latest_updated_index))
410 }
411
412 fn upsert_payment(
419 ffs: &F,
420 state: &mut PaymentsDbState,
421 payment: BasicPaymentV2,
422 ) -> io::Result<(usize, usize)> {
423 let created_index = payment.created_index();
424
425 let maybe_existing = state.payments.get(&created_index);
426 let already_existed = maybe_existing.is_some();
427
428 if let Some(existing) = maybe_existing
430 && payment == *existing
431 {
432 return Ok((0, 0));
433 }
434
435 Self::write_payment(ffs, &payment)?;
438
439 if payment.is_pending() {
443 state.pending.insert(created_index);
444 } else {
445 state.pending.remove(&created_index);
446 }
447 state.latest_updated_index =
449 cmp::max(state.latest_updated_index, Some(payment.updated_index()));
450
451 state.payments.insert(created_index, payment);
453
454 if already_existed {
455 Ok((0, 1))
456 } else {
457 Ok((1, 0))
458 }
459 }
460
461 fn write_payment(ffs: &F, payment: &BasicPaymentV2) -> io::Result<()> {
465 let filename = payment.created_index().to_string();
466 let data =
467 serde_json::to_vec(&payment).expect("Failed to serialize payment");
468 ffs.write(&filename, &data)
469 }
470
471 fn record_synced_at(&self, now: TimestampMs) -> io::Result<()> {
475 let metadata = PaymentsDbMetadata {
476 last_synced_at: Some(now),
477 };
478 let data = serde_json::to_vec(&metadata)
479 .expect("Failed to serialize metadata");
480 self.ffs.write(METADATA_FILENAME, &data)?;
481
482 self.state.write().unwrap().last_synced_at = Some(now);
483 Ok(())
484 }
485
486 pub(crate) fn update_personal_note(
490 &self,
491 req: command::UpdatePersonalNote,
492 ) -> anyhow::Result<()> {
493 let mut state = self.state.write().unwrap();
494
495 let payment = state
496 .get_mut_payment_by_created_index(&req.index)
497 .context("Updating non-existent payment")?;
498
499 payment.personal_note = req.personal_note.map(|n| n.into_inner());
500
501 Self::write_payment(&self.ffs, payment)
502 .context("Failed to write payment to local db")?;
503
504 Ok(())
505 }
506
507 #[cfg(test)]
512 fn debug_assert_invariants(&self) {
513 if cfg!(not(debug_assertions)) {
514 return;
515 }
516
517 let state = self.state.read().unwrap();
518
519 state.debug_assert_invariants();
521
522 let on_disk_state = PaymentsDbState::read(&self.ffs)
524 .expect("Failed to re-read on-disk state");
525 assert_eq!(on_disk_state, *state);
526 }
527}
528
529impl PaymentsDbState {
530 fn empty() -> Self {
532 Self {
533 payments: BTreeMap::new(),
534 pending: BTreeSet::new(),
535 latest_updated_index: None,
536 last_synced_at: None,
537 }
538 }
539
540 fn read(ffs: &impl Ffs) -> anyhow::Result<Self> {
542 let mut buf = Vec::<u8>::new();
543 let mut payments = Vec::<BasicPaymentV2>::new();
544 let mut last_synced_at = None;
545
546 ffs.read_dir_visitor(|filename| {
547 if filename == METADATA_FILENAME {
549 buf.clear();
550 ffs.read_into(filename, &mut buf)?;
551 let metadata =
552 serde_json::from_slice::<PaymentsDbMetadata>(&buf)
553 .context(METADATA_FILENAME)
554 .context("Failed to deserialize payments db metadata")
555 .map_err(io_error_invalid_data)?;
556 last_synced_at = metadata.last_synced_at;
557 return Ok(());
558 }
559
560 let created_index = match PaymentCreatedIndex::from_str(filename) {
562 Ok(idx) => idx,
563 Err(e) => {
564 warn!(
565 %filename,
566 "Error: unrecognized filename in payments dir: {e:#}"
567 );
568 return Ok(());
569 }
570 };
571
572 buf.clear();
574 ffs.read_into(filename, &mut buf)?;
575
576 let payment = serde_json::from_slice::<BasicPaymentV2>(&buf)
578 .with_context(|| filename.to_owned())
579 .context("Failed to deserialize payment file")
580 .map_err(io_error_invalid_data)?;
581
582 let payment_created_index = payment.created_index();
584 if created_index != payment_created_index {
585 return Err(io_error_invalid_data(format!(
586 "Payment DB corruption: filename index '{filename}'
587 different from index in contents '{payment_created_index}'"
588 )));
589 }
590
591 payments.push(payment);
593
594 Ok(())
595 })
596 .context("Failed to read payments db, possibly corrupted?")?;
597
598 Ok(Self::from_vec(payments, last_synced_at))
599 }
600
601 fn from_vec(
602 payments: Vec<BasicPaymentV2>,
603 last_synced_at: Option<TimestampMs>,
604 ) -> Self {
605 let payments = payments
606 .into_iter()
607 .map(|p| (p.created_index(), p))
608 .collect();
609
610 let pending = build_index::pending(&payments);
611 let latest_updated_index = build_index::latest_updated_index(&payments);
612
613 Self {
614 payments,
615 pending,
616 latest_updated_index,
617 last_synced_at,
618 }
619 }
620
621 fn num_payments(&self) -> usize {
622 self.payments.len()
623 }
624
625 fn num_pending(&self) -> usize {
626 self.pending.len()
627 }
628
629 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
630 fn num_finalized(&self) -> usize {
631 self.payments.len() - self.pending.len()
632 }
633
634 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
635 fn num_pending_not_junk(&self) -> usize {
636 self.pending
637 .iter()
638 .filter_map(|created_idx| self.payments.get(created_idx))
639 .filter(|p| p.is_pending_not_junk())
640 .count()
641 }
642
643 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
645 fn num_finalized_not_junk(&self) -> usize {
646 self.payments
647 .values()
648 .filter(|p| p.is_finalized_not_junk())
649 .count()
650 }
651
652 fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
653 self.latest_updated_index
654 }
655
656 fn get_payment_by_created_index(
657 &self,
658 created_index: &PaymentCreatedIndex,
659 ) -> Option<&BasicPaymentV2> {
660 self.payments.get(created_index)
661 }
662
663 fn get_mut_payment_by_created_index(
665 &mut self,
666 created_index: &PaymentCreatedIndex,
667 ) -> Option<&mut BasicPaymentV2> {
668 self.payments.get_mut(created_index)
669 }
670
671 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
672 fn get_payment_by_scroll_idx(
673 &self,
674 scroll_idx: usize,
675 ) -> Option<&BasicPaymentV2> {
676 self.payments.values().nth_back(scroll_idx)
677 }
678
679 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
680 fn get_pending_payment_by_scroll_idx(
681 &self,
682 scroll_idx: usize,
683 ) -> Option<&BasicPaymentV2> {
684 self.pending
685 .iter()
686 .nth_back(scroll_idx)
687 .and_then(|created_idx| self.payments.get(created_idx))
688 }
689
690 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
691 fn get_pending_not_junk_payment_by_scroll_idx(
692 &self,
693 scroll_idx: usize,
694 ) -> Option<&BasicPaymentV2> {
695 self.pending
696 .iter()
697 .rev()
698 .filter_map(|created_idx| self.payments.get(created_idx))
699 .filter(|p| p.is_pending_not_junk())
700 .nth_back(scroll_idx)
701 }
702
703 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
704 fn get_finalized_payment_by_scroll_idx(
705 &self,
706 scroll_idx: usize,
707 ) -> Option<&BasicPaymentV2> {
708 self.payments
709 .values()
710 .filter(|p| p.is_finalized())
711 .nth_back(scroll_idx)
712 }
713
714 #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
715 fn get_finalized_not_junk_payment_by_scroll_idx(
716 &self,
717 scroll_idx: usize,
718 ) -> Option<&BasicPaymentV2> {
719 self.payments
720 .values()
721 .filter(|p| p.is_finalized_not_junk())
722 .nth_back(scroll_idx)
723 }
724
725 fn list_payments(
729 &self,
730 filter: &PaymentFilter,
731 order: Order,
732 limit: usize,
733 after: Option<&PaymentCreatedIndex>,
734 ) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
735 if limit == 0 {
736 return (Vec::new(), None);
737 }
738
739 let matches_filter = |p: &&BasicPaymentV2| match filter {
740 PaymentFilter::All => true,
741 PaymentFilter::Pending => p.status == PaymentStatus::Pending,
742 PaymentFilter::Completed => p.status == PaymentStatus::Completed,
743 PaymentFilter::Failed => p.status == PaymentStatus::Failed,
744 PaymentFilter::Finalized => p.status != PaymentStatus::Pending,
745 };
746
747 let take = limit.saturating_add(1);
749
750 let mut payments: Vec<BasicPaymentV2> = match (order, after) {
751 (Order::Asc, Some(c)) => self
752 .payments
753 .range((Bound::Excluded(c), Bound::Unbounded))
754 .map(|(_, p)| p)
755 .filter(matches_filter)
756 .take(take)
757 .cloned()
758 .collect(),
759 (Order::Asc, None) => self
760 .payments
761 .values()
762 .filter(matches_filter)
763 .take(take)
764 .cloned()
765 .collect(),
766 (Order::Desc, Some(c)) => self
767 .payments
768 .range(..c)
769 .rev()
770 .map(|(_, p)| p)
771 .filter(matches_filter)
772 .take(take)
773 .cloned()
774 .collect(),
775 (Order::Desc, None) => self
776 .payments
777 .values()
778 .rev()
779 .filter(matches_filter)
780 .take(take)
781 .cloned()
782 .collect(),
783 };
784
785 let has_more = payments.len() > limit;
787 if has_more {
788 payments.truncate(limit);
789 }
790 let next_index = has_more.then(|| {
791 payments
792 .last()
793 .expect("has_more implies non-empty")
794 .created_index()
795 });
796
797 (payments, next_index)
798 }
799
800 #[cfg(test)]
801 fn is_empty(&self) -> bool {
802 self.payments.is_empty()
803 }
804
805 #[cfg(test)]
807 fn debug_assert_invariants(&self) {
808 if cfg!(not(debug_assertions)) {
809 return;
810 }
811
812 for (idx, payment) in &self.payments {
816 assert_eq!(*idx, payment.created_index());
817 }
818
819 let rebuilt_pending_index = build_index::pending(&self.payments);
823 assert_eq!(rebuilt_pending_index, self.pending);
824
825 self.payments
828 .values()
829 .filter(|p| p.is_pending())
830 .all(|p| self.pending.contains(&p.created_index()));
831
832 let recomputed_latest_updated_index =
835 build_index::latest_updated_index(&self.payments);
836 assert_eq!(recomputed_latest_updated_index, self.latest_updated_index);
837 }
838}
839
840mod build_index {
841 use super::*;
842
843 pub(super) fn pending(
845 payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
846 ) -> BTreeSet<PaymentCreatedIndex> {
847 payments
848 .iter()
849 .filter(|(_, p)| p.is_pending())
850 .map(|(idx, _)| *idx)
851 .collect()
852 }
853
854 pub(super) fn latest_updated_index(
856 payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
857 ) -> Option<PaymentUpdatedIndex> {
858 payments.values().map(BasicPaymentV2::updated_index).max()
859 }
860}
861
/// Wraps `error` in an [`io::Error`] whose kind is
/// [`io::ErrorKind::InvalidData`].
fn io_error_invalid_data(
    error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    let source: Box<dyn std::error::Error + Send + Sync> = error.into();
    io::Error::new(kind, source)
}
868
#[cfg(test)]
mod test_utils {
    use proptest::{
        prelude::{Strategy, any},
        sample::SizeRange,
    };

    use super::*;

    /// In-memory stand-in for a node, holding payments keyed by their
    /// updated index.
    pub(super) struct MockNode {
        pub payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
    }

    impl MockNode {
        /// Wraps a map that is already keyed by updated index.
        pub(super) fn new(
            payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
        ) -> Self {
            MockNode { payments }
        }

        /// Re-keys a map of payments from created index to updated index.
        pub(super) fn from_payments(
            payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
        ) -> Self {
            let mut by_updated_index = BTreeMap::new();
            for payment in payments.into_values() {
                by_updated_index.insert(payment.updated_index(), payment);
            }
            MockNode {
                payments: by_updated_index,
            }
        }
    }

    impl AppNodeRunSyncApi for MockNode {
        /// Returns up to `req.limit` payments (all of them if no limit is
        /// given, capped at `u16::MAX`) whose updated index is strictly
        /// greater than `req.start_index`, in ascending updated-index order.
        async fn get_updated_payments(
            &self,
            req: command::GetUpdatedPayments,
        ) -> Result<VecBasicPaymentV2, NodeApiError> {
            let max_results = usize::from(req.limit.unwrap_or(u16::MAX));

            // Without a start index every payment qualifies.
            let is_after_start =
                |idx: &PaymentUpdatedIndex| match &req.start_index {
                    Some(start_index) => start_index < idx,
                    None => true,
                };

            let payments = self
                .payments
                .iter()
                .filter(|(idx, _payment)| is_after_start(*idx))
                .take(max_results)
                .map(|(_idx, payment)| payment.clone())
                .collect();

            Ok(VecBasicPaymentV2 { payments })
        }
    }

    /// Strategy producing a map of arbitrary payments keyed by created
    /// index. Payments that share a created index collapse into one entry,
    /// so the map may be smaller than `approx_size`.
    pub(super) fn any_payments(
        approx_size: impl Into<SizeRange>,
    ) -> impl Strategy<Value = BTreeMap<PaymentCreatedIndex, BasicPaymentV2>>
    {
        let payment_vecs =
            proptest::collection::vec(any::<BasicPaymentV2>(), approx_size);

        payment_vecs.prop_map(|generated| {
            let mut by_created_index = BTreeMap::new();
            for payment in generated {
                by_created_index.insert(payment.created_index(), payment);
            }
            by_created_index
        })
    }
}
944
#[cfg(test)]
mod test {
    use std::{collections::HashSet, time::Duration};

    use lexe_api::types::payments::PaymentStatus;
    use lexe_crypto::rng::{FastRng, RngExt};
    use proptest::{
        collection::vec, prelude::any, proptest, sample::Index,
        test_runner::Config,
    };
    use tempfile::tempdir;

    use super::{test_utils::MockNode, *};
    use crate::unstable::ffs::{DiskFs, test_utils::InMemoryFfs};

    /// Reading from an empty store gives an empty db on both backends, and
    /// the two resulting states are equal.
    #[test]
    fn read_from_empty() {
        let in_memory_db = PaymentsDb::read(InMemoryFfs::new()).unwrap();
        assert!(in_memory_db.state.read().unwrap().is_empty());
        in_memory_db.debug_assert_invariants();

        let dir = tempfile::tempdir().unwrap();
        let disk_fs = DiskFs::create_dir_all(dir.path().to_path_buf()).unwrap();
        let disk_db = PaymentsDb::read(disk_fs).unwrap();
        assert!(disk_db.state.read().unwrap().is_empty());
        disk_db.debug_assert_invariants();

        assert_eq!(
            *in_memory_db.state.read().unwrap(),
            *disk_db.state.read().unwrap()
        );
    }

    /// Upserting the same payments in arbitrary batches keeps both backends
    /// internally consistent and equal to each other.
    #[test]
    fn test_upsert() {
        /// Removes up to `n` items from the front of `iter`.
        fn take_n<T>(iter: &mut impl Iterator<Item = T>, n: usize) -> Vec<T> {
            iter.take(n).collect()
        }

        /// Feeds `iter` to `f` in batches of the given sizes, followed by
        /// one final batch holding whatever is left. Stops early once `iter`
        /// runs dry.
        fn visit_batches<T>(
            iter: &mut impl Iterator<Item = T>,
            batch_sizes: Vec<usize>,
            mut f: impl FnMut(Vec<T>),
        ) {
            for batch_size in batch_sizes {
                let batch = take_n(iter, batch_size);
                let ran_short = batch.len() < batch_size;

                if batch.is_empty() {
                    return;
                }

                f(batch);

                // A short batch means the iterator is exhausted.
                if ran_short {
                    return;
                }
            }

            let rest: Vec<T> = iter.collect();
            if !rest.is_empty() {
                f(rest);
            }
        }

        proptest!(
            Config::with_cases(10),
            |(
                rng: FastRng,
                payments in test_utils::any_payments(0..20),
                batch_sizes in vec(1_usize..20, 0..5),
            )| {
                let disk_dir = tempdir().unwrap();
                let disk_fs =
                    DiskFs::create_dir_all(disk_dir.path().to_path_buf())
                        .unwrap();
                let disk_db = PaymentsDb::empty(disk_fs);

                let in_memory_db =
                    PaymentsDb::empty(InMemoryFfs::from_rng(rng));

                let mut remaining = payments.clone().into_values();
                visit_batches(&mut remaining, batch_sizes, |batch| {
                    in_memory_db.upsert_payments(batch.clone()).unwrap();
                    disk_db.upsert_payments(batch).unwrap();

                    in_memory_db.debug_assert_invariants();
                    disk_db.debug_assert_invariants();
                });

                assert_eq!(
                    *in_memory_db.state.read().unwrap(),
                    *disk_db.state.read().unwrap()
                );
            }
        );
    }

    /// Syncing against a node with no payments leaves the db empty.
    #[tokio::test]
    async fn test_sync_empty() {
        let node = MockNode::new(BTreeMap::new());
        let db = PaymentsDb::empty(InMemoryFfs::new());

        sync_payments(&db, &node, 5).await.unwrap();

        assert!(db.state.read().unwrap().is_empty());
        db.debug_assert_invariants();
    }

    /// The sync timestamp is set by a sync, survives a re-read, is reset by
    /// `clear`, and reads back as `None` when the metadata file is missing.
    #[tokio::test]
    async fn test_last_synced_at() {
        let node = MockNode::new(BTreeMap::new());
        let db = PaymentsDb::empty(InMemoryFfs::new());

        // A fresh db has never synced.
        assert_eq!(db.last_synced_at(), None);

        // A sync records a timestamp taken during the sync.
        let lower_bound = TimestampMs::now();
        sync_payments(&db, &node, 5).await.unwrap();
        let upper_bound = TimestampMs::now();
        let synced_at = db.last_synced_at().expect("sync should set time");
        assert!((lower_bound..=upper_bound).contains(&synced_at));
        db.debug_assert_invariants();

        // The timestamp survives a re-read from the backing store.
        let db = PaymentsDb::read(db.ffs).unwrap();
        assert_eq!(db.last_synced_at(), Some(synced_at));

        // Clearing the db resets it.
        db.clear().unwrap();
        assert_eq!(db.last_synced_at(), None);

        // Without the metadata file, a re-read yields no timestamp.
        sync_payments(&db, &node, 5).await.unwrap();
        assert!(db.last_synced_at().is_some());
        db.ffs.delete(METADATA_FILENAME).unwrap();
        let db = PaymentsDb::read(db.ffs).unwrap();
        assert_eq!(db.last_synced_at(), None);
    }

    /// A store that holds payment files but no metadata file reads back with
    /// every payment and no sync timestamp.
    #[test]
    fn test_read_without_metadata() {
        proptest!(Config::with_cases(8), |(payments in test_utils::any_payments(0..10))| {
            let ffs = InMemoryFfs::new();

            // Write the payment files directly, bypassing the db.
            for payment in payments.values() {
                PaymentsDb::<InMemoryFfs>::write_payment(&ffs, payment)
                    .unwrap();
            }

            let db = PaymentsDb::read(ffs).unwrap();
            assert_eq!(db.last_synced_at(), None);
            assert_eq!(db.num_payments(), payments.len());
            db.debug_assert_invariants();
        });
    }

    /// Syncing mirrors the node's payments on first sync, after a re-read,
    /// and after some pending payments were finalized on the node.
    #[test]
    fn test_sync() {
        /// Asserts that the db and the node hold exactly the same payments.
        fn assert_db_payments_eq(
            db_payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
            node_payments: &BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
        ) {
            assert_eq!(db_payments.len(), node_payments.len());

            for db_payment in db_payments.values() {
                let node_payment = node_payments
                    .get(&db_payment.updated_index())
                    .unwrap();
                assert_eq!(db_payment, node_payment);
            }

            for node_payment in node_payments.values() {
                let db_payment = db_payments
                    .get(&node_payment.created_index())
                    .unwrap();
                assert_eq!(node_payment, db_payment);
            }
        }

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        proptest!(
            Config::with_cases(4),
            |(
                mut rng: FastRng,
                payments in test_utils::any_payments(1..20),
                req_batch_size in 1_u16..5,
                finalize_indexes in
                    proptest::collection::vec(any::<Index>(), 1..5),
            )| {
                let mut mock_node = MockNode::from_payments(payments);

                // A second rng, seeded from the first, drives the
                // finalization step further down.
                let mut rng2 = FastRng::from_u64(rng.gen_u64());
                let in_memory_ffs = InMemoryFfs::from_rng(rng);

                // Initial sync into an empty db.
                let db = PaymentsDb::empty(in_memory_ffs);
                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
                    .unwrap();
                assert_db_payments_eq(
                    &db.state.read().unwrap().payments,
                    &mock_node.payments,
                );
                db.debug_assert_invariants();

                // Re-read the db from its store and sync again.
                let in_memory_ffs = db.ffs;
                let db = PaymentsDb::read(in_memory_ffs).unwrap();
                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
                    .unwrap();
                assert_db_payments_eq(
                    &db.state.read().unwrap().payments,
                    &mock_node.payments,
                );
                db.debug_assert_invariants();

                // Finalize a few pending payments on the node, giving each
                // a fresh, later `updated_at`.
                let finalize_some_payments = || {
                    let pending = mock_node
                        .payments
                        .values()
                        .filter(|p| p.is_pending())
                        .cloned()
                        .collect::<Vec<_>>();

                    if pending.is_empty() {
                        return;
                    }

                    // Start from the db's latest update time so every new
                    // updated index is later than anything synced so far.
                    let mut now = db
                        .state
                        .read()
                        .unwrap()
                        .latest_updated_index
                        .expect("DB should have payments")
                        .updated_at;

                    // Collect into a set so each payment is picked once.
                    let chosen = finalize_indexes
                        .into_iter()
                        .map(|index| index.index(pending.len()))
                        .collect::<HashSet<_>>();

                    for position in chosen {
                        let old_updated_idx =
                            pending[position].updated_index();

                        // Take the payment out; it is re-inserted below
                        // under its new updated index.
                        let mut payment = mock_node
                            .payments
                            .remove(&old_updated_idx)
                            .unwrap();

                        let new_status = if rng2.gen_boolean() {
                            PaymentStatus::Completed
                        } else {
                            PaymentStatus::Failed
                        };

                        let bump_ms = u64::from(rng2.gen_range_u32(1..11));
                        now = now.saturating_add(Duration::from_millis(bump_ms));

                        payment.status = new_status;
                        payment.updated_at = now;

                        let new_updated_idx = payment.updated_index();
                        mock_node
                            .payments
                            .insert(new_updated_idx, payment);
                    }
                };

                finalize_some_payments();

                // A further sync picks up the finalized payments.
                rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
                    .unwrap();

                assert_db_payments_eq(
                    &db.state.read().unwrap().payments,
                    &mock_node.payments,
                );
                db.debug_assert_invariants();
            }
        );
    }
}