hydro_lang/properties/
mod.rs1use std::marker::PhantomData;
4
5use stageleft::properties::Property;
6
7use crate::live_collections::stream::{ExactlyOnce, Ordering, Retries, TotalOrder};
8
9#[sealed::sealed]
11pub trait CommutativeProof {
12 fn register_proof(&self, expr: &syn::Expr);
16}
17
18#[sealed::sealed]
20pub trait IdempotentProof {
21 fn register_proof(&self, expr: &syn::Expr);
25}
26
27pub struct ManualProof();
29#[sealed::sealed]
30impl CommutativeProof for ManualProof {
31 fn register_proof(&self, _expr: &syn::Expr) {}
32}
33#[sealed::sealed]
34impl IdempotentProof for ManualProof {
35 fn register_proof(&self, _expr: &syn::Expr) {}
36}
37
/// Type-level marker meaning "this property has not been proved".
///
/// The enum has no variants, so it can never be instantiated; it is only
/// used as a type parameter.
pub enum NotProved {}
40
/// Type-level marker meaning "this property has been proved".
///
/// The enum has no variants, so it can never be instantiated; it is only
/// used as a type parameter.
pub enum Proved {}
43
44pub struct AggFuncAlgebra<Commutative = NotProved, Idempotent = NotProved>(
62 Option<Box<dyn CommutativeProof>>,
63 Option<Box<dyn IdempotentProof>>,
64 PhantomData<(Commutative, Idempotent)>,
65);
66
67impl<C, I> AggFuncAlgebra<C, I> {
68 pub fn commutative(self, proof: impl CommutativeProof + 'static) -> AggFuncAlgebra<Proved, I> {
70 AggFuncAlgebra(Some(Box::new(proof)), self.1, PhantomData)
71 }
72
73 pub fn idempotent(self, proof: impl IdempotentProof + 'static) -> AggFuncAlgebra<C, Proved> {
75 AggFuncAlgebra(self.0, Some(Box::new(proof)), PhantomData)
76 }
77
78 pub(crate) fn register_proof(self, expr: &syn::Expr) {
80 if let Some(comm_proof) = self.0 {
81 comm_proof.register_proof(expr);
82 }
83
84 if let Some(idem_proof) = self.1 {
85 idem_proof.register_proof(expr);
86 }
87 }
88}
89
90impl<C, I> Property for AggFuncAlgebra<C, I> {
91 type Root = AggFuncAlgebra;
92
93 fn make_root(_target: &mut Option<Self>) -> Self::Root {
94 AggFuncAlgebra(None, None, PhantomData)
95 }
96}
97
98#[diagnostic::on_unimplemented(
100 message = "Because the input stream has ordering `{O}`, the closure must demonstrate commutativity with a `commutative = ...` annotation.",
101 label = "required for this call",
102 note = "To intentionally process the stream with a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This bypasses the safety guarantees so avoid unless necessary."
103)]
104#[sealed::sealed]
105pub trait ValidCommutativityFor<O: Ordering> {}
106#[sealed::sealed]
107impl ValidCommutativityFor<TotalOrder> for NotProved {}
108#[sealed::sealed]
109impl<O: Ordering> ValidCommutativityFor<O> for Proved {}
110
111#[diagnostic::on_unimplemented(
113 message = "Because the input stream has retries `{R}`, the closure must demonstrate idempotence with an `idempotent = ...` annotation.",
114 label = "required for this call",
115 note = "To intentionally process the stream with non-deterministic (randomly duplicated) retries, use `.assume_retries`. This bypasses the safety guarantees so avoid unless necessary."
116)]
117#[sealed::sealed]
118pub trait ValidIdempotenceFor<R: Retries> {}
119#[sealed::sealed]
120impl ValidIdempotenceFor<ExactlyOnce> for NotProved {}
121#[sealed::sealed]
122impl<R: Retries> ValidIdempotenceFor<R> for Proved {}