1mod docs {}
46
47use std::{
48 cmp,
49 collections::{BTreeMap, BTreeSet},
50 io,
51 str::FromStr,
52 sync::RwLock,
53};
54
55use anyhow::Context;
56#[cfg(doc)]
57use lexe_api::types::payments::VecDbPaymentV2;
58use lexe_api::{
59 def::AppNodeRunApi,
60 error::NodeApiError,
61 models::command::{GetUpdatedPayments, UpdatePaymentNote},
62 types::payments::{
63 BasicPaymentV2, PaymentCreatedIndex, PaymentUpdatedIndex,
64 VecBasicPaymentV2,
65 },
66};
67use node_client::client::NodeClient;
68use tracing::warn;
69
70use crate::unstable::ffs::Ffs;
71
72pub struct PaymentsDb<F> {
74 ffs: F,
75 state: RwLock<PaymentsDbState>,
76}
77
78#[derive(Debug, PartialEq)]
80struct PaymentsDbState {
81 payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
84
85 pending: BTreeSet<PaymentCreatedIndex>,
91
92 latest_updated_index: Option<PaymentUpdatedIndex>,
102}
103
104#[derive(Debug)]
106pub struct PaymentSyncSummary {
107 pub num_new: usize,
109 pub num_updated: usize,
111}
112
113impl PaymentSyncSummary {
114 pub fn any_changes(&self) -> bool {
117 self.num_new > 0 || self.num_updated > 0
118 }
119}
120
121#[allow(private_bounds)]
128pub(crate) async fn sync_payments<F: Ffs>(
129 db: &PaymentsDb<F>,
130 node_client: &impl AppNodeRunSyncApi,
131 batch_size: u16,
132) -> anyhow::Result<PaymentSyncSummary> {
133 assert!(batch_size > 0);
134
135 let mut start_index = db.state.read().unwrap().latest_updated_index;
136
137 let mut summary = PaymentSyncSummary {
138 num_new: 0,
139 num_updated: 0,
140 };
141
142 loop {
143 let req = GetUpdatedPayments {
146 start_index,
149 limit: Some(batch_size),
150 };
151
152 let updated_payments = node_client
153 .get_updated_payments(req)
154 .await
155 .context("Failed to fetch updated payments")?
156 .payments;
157 let updated_payments_len = updated_payments.len();
158
159 let (new, updated, latest_updated_index) = db
161 .upsert_payments(updated_payments)
162 .context("Failed to upsert payments")?;
163 summary.num_new += new;
164 summary.num_updated += updated;
165
166 start_index = latest_updated_index;
168
169 if updated_payments_len < usize::from(batch_size) {
172 break;
173 }
174 }
175
176 Ok(summary)
177}
178
179trait AppNodeRunSyncApi {
184 async fn get_updated_payments(
187 &self,
188 req: GetUpdatedPayments,
189 ) -> Result<VecBasicPaymentV2, NodeApiError>;
190}
191
192impl AppNodeRunSyncApi for NodeClient {
193 async fn get_updated_payments(
194 &self,
195 req: GetUpdatedPayments,
196 ) -> Result<VecBasicPaymentV2, NodeApiError> {
197 AppNodeRunApi::get_updated_payments(self, req).await
198 }
199}
200
201impl<F: Ffs> PaymentsDb<F> {
202 pub(crate) fn read(ffs: F) -> anyhow::Result<Self> {
206 let state = PaymentsDbState::read(&ffs)
207 .map(RwLock::new)
208 .context("Failed to read on-disk PaymentsDb state")?;
209
210 Ok(Self { ffs, state })
211 }
212
213 pub(crate) fn empty(ffs: F) -> Self {
215 let state = RwLock::new(PaymentsDbState::empty());
216 Self { ffs, state }
217 }
218
219 pub fn delete(&self) -> io::Result<()> {
223 *self.state.write().unwrap() = PaymentsDbState::empty();
224 self.ffs.delete_all()
225 }
226
227 pub fn num_payments(&self) -> usize {
229 self.state.read().unwrap().num_payments()
230 }
231
232 pub fn num_pending(&self) -> usize {
234 self.state.read().unwrap().num_pending()
235 }
236
237 pub fn num_finalized(&self) -> usize {
239 self.state.read().unwrap().num_finalized()
240 }
241
242 pub fn num_pending_not_junk(&self) -> usize {
244 self.state.read().unwrap().num_pending_not_junk()
245 }
246
247 pub fn num_finalized_not_junk(&self) -> usize {
249 self.state.read().unwrap().num_finalized_not_junk()
250 }
251
252 pub fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
254 self.state.read().unwrap().latest_updated_index()
255 }
256
257 pub fn get_payment_by_created_index(
259 &self,
260 created_index: &PaymentCreatedIndex,
261 ) -> Option<BasicPaymentV2> {
262 self.state
263 .read()
264 .unwrap()
265 .get_payment_by_created_index(created_index)
266 .cloned()
267 }
268
269 pub fn get_payment_by_scroll_idx(
271 &self,
272 scroll_idx: usize,
273 ) -> Option<BasicPaymentV2> {
274 self.state
275 .read()
276 .unwrap()
277 .get_payment_by_scroll_idx(scroll_idx)
278 .cloned()
279 }
280
281 pub fn get_pending_payment_by_scroll_idx(
283 &self,
284 scroll_idx: usize,
285 ) -> Option<BasicPaymentV2> {
286 self.state
287 .read()
288 .unwrap()
289 .get_pending_payment_by_scroll_idx(scroll_idx)
290 .cloned()
291 }
292
293 pub fn get_pending_not_junk_payment_by_scroll_idx(
295 &self,
296 scroll_idx: usize,
297 ) -> Option<BasicPaymentV2> {
298 self.state
299 .read()
300 .unwrap()
301 .get_pending_not_junk_payment_by_scroll_idx(scroll_idx)
302 .cloned()
303 }
304
305 pub fn get_finalized_payment_by_scroll_idx(
307 &self,
308 scroll_idx: usize,
309 ) -> Option<BasicPaymentV2> {
310 self.state
311 .read()
312 .unwrap()
313 .get_finalized_payment_by_scroll_idx(scroll_idx)
314 .cloned()
315 }
316
317 pub fn get_finalized_not_junk_payment_by_scroll_idx(
319 &self,
320 scroll_idx: usize,
321 ) -> Option<BasicPaymentV2> {
322 self.state
323 .read()
324 .unwrap()
325 .get_finalized_not_junk_payment_by_scroll_idx(scroll_idx)
326 .cloned()
327 }
328
329 fn upsert_payments(
336 &self,
337 payments: impl IntoIterator<Item = BasicPaymentV2>,
338 ) -> io::Result<(usize, usize, Option<PaymentUpdatedIndex>)> {
339 let mut state = self.state.write().unwrap();
340
341 let mut num_new = 0;
342 let mut num_updated = 0;
343 for payment in payments {
344 let (new, updated) =
345 Self::upsert_payment(&self.ffs, &mut state, payment)?;
346 num_new += new;
347 num_updated += updated;
348 }
349
350 Ok((num_new, num_updated, state.latest_updated_index))
351 }
352
353 fn upsert_payment(
360 ffs: &F,
361 state: &mut PaymentsDbState,
362 payment: BasicPaymentV2,
363 ) -> io::Result<(usize, usize)> {
364 let created_index = payment.created_index();
365
366 let maybe_existing = state.payments.get(&created_index);
367 let already_existed = maybe_existing.is_some();
368
369 if let Some(existing) = maybe_existing
371 && payment == *existing
372 {
373 return Ok((0, 0));
374 }
375
376 Self::write_payment(ffs, &payment)?;
379
380 if payment.is_pending() {
384 state.pending.insert(created_index);
385 } else {
386 state.pending.remove(&created_index);
387 }
388 state.latest_updated_index =
390 cmp::max(state.latest_updated_index, Some(payment.updated_index()));
391
392 state.payments.insert(created_index, payment);
394
395 if already_existed {
396 Ok((0, 1))
397 } else {
398 Ok((1, 0))
399 }
400 }
401
402 fn write_payment(ffs: &F, payment: &BasicPaymentV2) -> io::Result<()> {
406 let filename = payment.created_index().to_string();
407 let data =
408 serde_json::to_vec(&payment).expect("Failed to serialize payment");
409 ffs.write(&filename, &data)
410 }
411
412 pub(crate) fn update_payment_note(
416 &self,
417 req: UpdatePaymentNote,
418 ) -> anyhow::Result<()> {
419 let mut state = self.state.write().unwrap();
420
421 let payment = state
422 .get_mut_payment_by_created_index(&req.index)
423 .context("Updating non-existent payment")?;
424
425 payment.note = req.note;
426
427 Self::write_payment(&self.ffs, payment)
428 .context("Failed to write payment to local db")?;
429
430 Ok(())
431 }
432
433 #[cfg(test)]
438 fn debug_assert_invariants(&self) {
439 if cfg!(not(debug_assertions)) {
440 return;
441 }
442
443 let state = self.state.read().unwrap();
444
445 state.debug_assert_invariants();
447
448 let on_disk_state = PaymentsDbState::read(&self.ffs)
450 .expect("Failed to re-read on-disk state");
451 assert_eq!(on_disk_state, *state);
452 }
453}
454
455impl PaymentsDbState {
456 fn empty() -> Self {
458 Self {
459 payments: BTreeMap::new(),
460 pending: BTreeSet::new(),
461 latest_updated_index: None,
462 }
463 }
464
465 fn read(ffs: &impl Ffs) -> anyhow::Result<Self> {
467 let mut buf = Vec::<u8>::new();
468 let mut payments = Vec::<BasicPaymentV2>::new();
469
470 ffs.read_dir_visitor(|filename| {
471 let created_index = match PaymentCreatedIndex::from_str(filename) {
473 Ok(idx) => idx,
474 Err(e) => {
475 warn!(
476 %filename,
477 "Error: unrecognized filename in payments dir: {e:#}"
478 );
479 return Ok(());
480 }
481 };
482
483 buf.clear();
485 ffs.read_into(filename, &mut buf)?;
486
487 let payment = serde_json::from_slice::<BasicPaymentV2>(&buf)
489 .with_context(|| filename.to_owned())
490 .context("Failed to deserialize payment file")
491 .map_err(io_error_invalid_data)?;
492
493 let payment_created_index = payment.created_index();
495 if created_index != payment_created_index {
496 return Err(io_error_invalid_data(format!(
497 "Payment DB corruption: filename index '{filename}'
498 different from index in contents '{payment_created_index}'"
499 )));
500 }
501
502 payments.push(payment);
504
505 Ok(())
506 })
507 .context("Failed to read payments db, possibly corrupted?")?;
508
509 Ok(Self::from_vec(payments))
510 }
511
512 fn from_vec(payments: Vec<BasicPaymentV2>) -> Self {
513 let payments = payments
514 .into_iter()
515 .map(|p| (p.created_index(), p))
516 .collect();
517
518 let pending = build_index::pending(&payments);
519 let latest_updated_index = build_index::latest_updated_index(&payments);
520
521 Self {
522 payments,
523 pending,
524 latest_updated_index,
525 }
526 }
527
528 fn num_payments(&self) -> usize {
529 self.payments.len()
530 }
531
532 fn num_pending(&self) -> usize {
533 self.pending.len()
534 }
535
536 fn num_finalized(&self) -> usize {
537 self.payments.len() - self.pending.len()
538 }
539
540 fn num_pending_not_junk(&self) -> usize {
541 self.pending
542 .iter()
543 .filter_map(|created_idx| self.payments.get(created_idx))
544 .filter(|p| p.is_pending_not_junk())
545 .count()
546 }
547
548 fn num_finalized_not_junk(&self) -> usize {
550 self.payments
551 .values()
552 .filter(|p| p.is_finalized_not_junk())
553 .count()
554 }
555
556 fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
557 self.latest_updated_index
558 }
559
560 fn get_payment_by_created_index(
561 &self,
562 created_index: &PaymentCreatedIndex,
563 ) -> Option<&BasicPaymentV2> {
564 self.payments.get(created_index)
565 }
566
567 fn get_mut_payment_by_created_index(
569 &mut self,
570 created_index: &PaymentCreatedIndex,
571 ) -> Option<&mut BasicPaymentV2> {
572 self.payments.get_mut(created_index)
573 }
574
575 fn get_payment_by_scroll_idx(
576 &self,
577 scroll_idx: usize,
578 ) -> Option<&BasicPaymentV2> {
579 self.payments.values().nth_back(scroll_idx)
580 }
581
582 fn get_pending_payment_by_scroll_idx(
583 &self,
584 scroll_idx: usize,
585 ) -> Option<&BasicPaymentV2> {
586 self.pending
587 .iter()
588 .nth_back(scroll_idx)
589 .and_then(|created_idx| self.payments.get(created_idx))
590 }
591
592 fn get_pending_not_junk_payment_by_scroll_idx(
593 &self,
594 scroll_idx: usize,
595 ) -> Option<&BasicPaymentV2> {
596 self.pending
597 .iter()
598 .rev()
599 .filter_map(|created_idx| self.payments.get(created_idx))
600 .filter(|p| p.is_pending_not_junk())
601 .nth_back(scroll_idx)
602 }
603
604 fn get_finalized_payment_by_scroll_idx(
605 &self,
606 scroll_idx: usize,
607 ) -> Option<&BasicPaymentV2> {
608 self.payments
609 .values()
610 .filter(|p| p.is_finalized())
611 .nth_back(scroll_idx)
612 }
613
614 fn get_finalized_not_junk_payment_by_scroll_idx(
615 &self,
616 scroll_idx: usize,
617 ) -> Option<&BasicPaymentV2> {
618 self.payments
619 .values()
620 .filter(|p| p.is_finalized_not_junk())
621 .nth_back(scroll_idx)
622 }
623
624 #[cfg(test)]
625 fn is_empty(&self) -> bool {
626 self.payments.is_empty()
627 }
628
629 #[cfg(test)]
631 fn debug_assert_invariants(&self) {
632 if cfg!(not(debug_assertions)) {
633 return;
634 }
635
636 for (idx, payment) in &self.payments {
640 assert_eq!(*idx, payment.created_index());
641 }
642
643 let rebuilt_pending_index = build_index::pending(&self.payments);
647 assert_eq!(rebuilt_pending_index, self.pending);
648
649 self.payments
652 .values()
653 .filter(|p| p.is_pending())
654 .all(|p| self.pending.contains(&p.created_index()));
655
656 let recomputed_latest_updated_index =
659 build_index::latest_updated_index(&self.payments);
660 assert_eq!(recomputed_latest_updated_index, self.latest_updated_index);
661 }
662}
663
664mod build_index {
665 use super::*;
666
667 pub(super) fn pending(
669 payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
670 ) -> BTreeSet<PaymentCreatedIndex> {
671 payments
672 .iter()
673 .filter(|(_, p)| p.is_pending())
674 .map(|(idx, _)| *idx)
675 .collect()
676 }
677
678 pub(super) fn latest_updated_index(
680 payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
681 ) -> Option<PaymentUpdatedIndex> {
682 payments.values().map(BasicPaymentV2::updated_index).max()
683 }
684}
685
686fn io_error_invalid_data(
688 error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
689) -> io::Error {
690 io::Error::new(io::ErrorKind::InvalidData, error)
691}
692
693#[cfg(test)]
694mod test_utils {
695 use proptest::{
696 prelude::{Strategy, any},
697 sample::SizeRange,
698 };
699
700 use super::*;
701
702 pub(super) struct MockNode {
703 pub payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
704 }
705
706 impl MockNode {
707 pub(super) fn new(
709 payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
710 ) -> Self {
711 Self { payments }
712 }
713
714 pub(super) fn from_payments(
716 payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
717 ) -> Self {
718 let payments = payments
719 .into_values()
720 .map(|p| (p.updated_index(), p))
721 .collect();
722 Self { payments }
723 }
724 }
725
726 impl AppNodeRunSyncApi for MockNode {
727 async fn get_updated_payments(
730 &self,
731 req: GetUpdatedPayments,
732 ) -> Result<VecBasicPaymentV2, NodeApiError> {
733 let limit = req.limit.unwrap_or(u16::MAX);
734
735 let payments = match req.start_index {
736 Some(start_index) => self
737 .payments
738 .iter()
739 .filter(|(idx, _)| &start_index < *idx)
740 .take(limit as usize)
741 .map(|(_idx, payment)| payment.clone())
742 .collect(),
743 None => self
744 .payments
745 .iter()
746 .take(limit as usize)
747 .map(|(_idx, payment)| payment.clone())
748 .collect(),
749 };
750
751 Ok(VecBasicPaymentV2 { payments })
752 }
753 }
754
755 pub(super) fn any_payments(
756 approx_size: impl Into<SizeRange>,
757 ) -> impl Strategy<Value = BTreeMap<PaymentCreatedIndex, BasicPaymentV2>>
758 {
759 proptest::collection::vec(any::<BasicPaymentV2>(), approx_size)
760 .prop_map(|payments| {
761 payments
762 .into_iter()
763 .map(|payment| (payment.created_index(), payment))
764 .collect::<BTreeMap<_, _>>()
765 })
766 }
767}
768
769#[cfg(test)]
770mod test {
771 use std::{collections::HashSet, time::Duration};
772
773 use common::rng::{FastRng, Rng, RngExt};
774 use lexe_api::types::payments::PaymentStatus;
775 use proptest::{
776 collection::vec, prelude::any, proptest, sample::Index,
777 test_runner::Config,
778 };
779 use tempfile::tempdir;
780
781 use super::{test_utils::MockNode, *};
782 use crate::unstable::ffs::{DiskFs, test_utils::InMemoryFfs};
783
784 #[test]
785 fn read_from_empty() {
786 let mock_ffs = InMemoryFfs::new();
787 let mock_ffs_db = PaymentsDb::read(mock_ffs).unwrap();
788 assert!(mock_ffs_db.state.read().unwrap().is_empty());
789 mock_ffs_db.debug_assert_invariants();
790
791 let tempdir = tempfile::tempdir().unwrap();
792 let temp_fs =
793 DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
794 let temp_fs_db = PaymentsDb::read(temp_fs).unwrap();
795 assert!(temp_fs_db.state.read().unwrap().is_empty());
796 temp_fs_db.debug_assert_invariants();
797
798 assert_eq!(
799 *mock_ffs_db.state.read().unwrap(),
800 *temp_fs_db.state.read().unwrap()
801 );
802 }
803
804 #[test]
805 fn test_upsert() {
806 fn visit_batches<T>(
807 iter: &mut impl Iterator<Item = T>,
808 batch_sizes: Vec<usize>,
809 mut f: impl FnMut(Vec<T>),
810 ) {
811 let batch_sizes = batch_sizes.into_iter();
812
813 for batch_size in batch_sizes {
814 let batch = take_n(iter, batch_size);
815 let batch_len = batch.len();
816
817 if batch_len == 0 {
818 return;
819 }
820
821 f(batch);
822
823 if batch_len < batch_size {
824 return;
825 }
826 }
827
828 let batch = iter.collect::<Vec<_>>();
829
830 if !batch.is_empty() {
831 f(batch);
832 }
833 }
834
835 fn take_n<T>(iter: &mut impl Iterator<Item = T>, n: usize) -> Vec<T> {
836 let mut out = Vec::with_capacity(n);
837
838 while out.len() < n {
839 match iter.next() {
840 Some(value) => out.push(value),
841 None => break,
842 }
843 }
844
845 out
846 }
847
848 proptest!(
849 Config::with_cases(10),
850 |(
851 rng: FastRng,
852 payments in test_utils::any_payments(0..20),
853 batch_sizes in vec(1_usize..20, 0..5),
854 )| {
855 let tempdir = tempdir().unwrap();
856 let temp_fs = DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
857 let temp_fs_db = PaymentsDb::empty(temp_fs);
858
859 let mock_ffs = InMemoryFfs::from_rng(rng);
860 let mock_ffs_db = PaymentsDb::empty(mock_ffs);
861
862 let mut payments_iter = payments.clone().into_values();
863 visit_batches(&mut payments_iter, batch_sizes, |new_payment_batch| {
864 let _ = mock_ffs_db.upsert_payments(
865 new_payment_batch.clone()
866 ).unwrap();
867 let _ = temp_fs_db.upsert_payments(new_payment_batch).unwrap();
868
869 mock_ffs_db.debug_assert_invariants();
870 temp_fs_db.debug_assert_invariants();
871 });
872
873 assert_eq!(
874 *mock_ffs_db.state.read().unwrap(),
875 *temp_fs_db.state.read().unwrap()
876 );
877 }
878 );
879 }
880
881 #[tokio::test]
882 async fn test_sync_empty() {
883 let mock_node_client = MockNode::new(BTreeMap::new());
884 let mock_ffs = InMemoryFfs::new();
885 let db = PaymentsDb::empty(mock_ffs);
886
887 sync_payments(&db, &mock_node_client, 5).await.unwrap();
888
889 assert!(db.state.read().unwrap().is_empty());
890 db.debug_assert_invariants();
891 }
892
893 #[test]
894 fn test_sync() {
895 fn assert_db_payments_eq(
897 db_payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
898 node_payments: &BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
899 ) {
900 assert_eq!(db_payments.len(), node_payments.len());
901 db_payments.iter().for_each(|(_created_idx, payment)| {
902 let node_payment =
903 node_payments.get(&payment.updated_index()).unwrap();
904 assert_eq!(payment, node_payment);
905 });
906 node_payments.iter().for_each(|(_updated_idx, payment)| {
907 let db_payment =
908 db_payments.get(&payment.created_index()).unwrap();
909 assert_eq!(payment, db_payment);
910 });
911 }
912
913 let rt = tokio::runtime::Builder::new_current_thread()
914 .build()
915 .unwrap();
916
917 proptest!(
918 Config::with_cases(4),
919 |(
920 mut rng: FastRng,
921 payments in test_utils::any_payments(1..20),
922 req_batch_size in 1_u16..5,
923 finalize_indexes in
924 proptest::collection::vec(any::<Index>(), 1..5),
925 )| {
926 let mut mock_node = MockNode::from_payments(payments);
927
928 let mut rng2 = FastRng::from_u64(rng.gen_u64());
929 let mock_ffs = InMemoryFfs::from_rng(rng);
930
931 let db = PaymentsDb::empty(mock_ffs);
933 rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
934 .unwrap();
935 assert_db_payments_eq(
936 &db.state.read().unwrap().payments,
937 &mock_node.payments,
938 );
939 db.debug_assert_invariants();
940
941 let mock_ffs = db.ffs;
943 let db = PaymentsDb::read(mock_ffs).unwrap();
944 rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
945 .unwrap();
946 assert_db_payments_eq(
947 &db.state.read().unwrap().payments,
948 &mock_node.payments,
949 );
950 db.debug_assert_invariants();
951
952 let finalize_some_payments = || {
954 let pending_payments = mock_node
955 .payments
956 .values()
957 .filter(|p| p.is_pending())
958 .cloned()
959 .collect::<Vec<_>>();
960
961 if pending_payments.is_empty() {
962 return;
963 }
964
965 let mut current_time = db
970 .state
971 .read()
972 .unwrap()
973 .latest_updated_index
974 .expect("DB should have payments")
975 .updated_at;
976
977 let finalize_idxs = finalize_indexes
980 .into_iter()
981 .map(|index| index.index(pending_payments.len()))
982 .collect::<HashSet<_>>();
984
985 for finalize_idx in finalize_idxs {
986 let final_updated_idx =
988 pending_payments[finalize_idx].updated_index();
989
990 let mut payment = mock_node
992 .payments
993 .remove(&final_updated_idx)
994 .unwrap();
995
996 let new_status = if rng2.gen_boolean() {
998 PaymentStatus::Completed
999 } else {
1000 PaymentStatus::Failed
1001 };
1002
1003 let bump_u64 = rng2.gen_range(1..=10);
1005 let bump_dur = Duration::from_millis(bump_u64);
1006 current_time = current_time.saturating_add(bump_dur);
1007
1008 payment.status = new_status;
1010 payment.updated_at = current_time;
1011
1012 let new_updated_index = payment.updated_index();
1014 mock_node
1015 .payments
1016 .insert(new_updated_index, payment);
1017 }
1018 };
1019
1020 finalize_some_payments();
1021
1022 rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
1024 .unwrap();
1025
1026 assert_db_payments_eq(
1027 &db.state.read().unwrap().payments,
1028 &mock_node.payments,
1029 );
1030 db.debug_assert_invariants();
1031 }
1032 );
1033 }
1034}