Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Singleton<Type, Loc, Bound: Boundedness> {
40    pub(crate) location: Loc,
41    pub(crate) ir_node: RefCell<HydroNode>,
42
43    _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
47where
48    T: Clone,
49    L: Location<'a> + NoTick,
50{
51    fn from(value: Singleton<T, L, Bounded>) -> Self {
52        let tick = value.location().tick();
53        value.clone_into_tick(&tick).latest()
54    }
55}
56
57impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
58where
59    L: Location<'a>,
60{
61    type Location = Tick<L>;
62
63    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
64        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
65            location.clone(),
66            HydroNode::DeferTick {
67                input: Box::new(HydroNode::CycleSource {
68                    cycle_id,
69                    metadata: location.new_node_metadata(Self::collection_kind()),
70                }),
71                metadata: location
72                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
73            },
74        );
75
76        from_previous_tick.unwrap_or(initial)
77    }
78}
79
80impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
85        assert_eq!(
86            Location::id(&self.location),
87            expected_location,
88            "locations do not match"
89        );
90        self.location
91            .flow_state()
92            .borrow_mut()
93            .push_root(HydroRoot::CycleSink {
94                cycle_id,
95                input: Box::new(self.ir_node.into_inner()),
96                op_metadata: HydroIrOpMetadata::new(),
97            });
98    }
99}
100
101impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
102where
103    L: Location<'a>,
104{
105    type Location = Tick<L>;
106
107    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
108        Singleton::new(
109            location.clone(),
110            HydroNode::CycleSource {
111                cycle_id,
112                metadata: location.new_node_metadata(Self::collection_kind()),
113            },
114        )
115    }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123        assert_eq!(
124            Location::id(&self.location),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .push_root(HydroRoot::CycleSink {
132                cycle_id,
133                input: Box::new(self.ir_node.into_inner()),
134                op_metadata: HydroIrOpMetadata::new(),
135            });
136    }
137}
138
139impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
140where
141    L: Location<'a> + NoTick,
142{
143    type Location = L;
144
145    fn create_source(cycle_id: CycleId, location: L) -> Self {
146        Singleton::new(
147            location.clone(),
148            HydroNode::CycleSource {
149                cycle_id,
150                metadata: location.new_node_metadata(Self::collection_kind()),
151            },
152        )
153    }
154}
155
156impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
157where
158    L: Location<'a> + NoTick,
159{
160    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161        assert_eq!(
162            Location::id(&self.location),
163            expected_location,
164            "locations do not match"
165        );
166        self.location
167            .flow_state()
168            .borrow_mut()
169            .push_root(HydroRoot::CycleSink {
170                cycle_id,
171                input: Box::new(self.ir_node.into_inner()),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174    }
175}
176
177impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
178where
179    T: Clone,
180    L: Location<'a>,
181{
182    fn clone(&self) -> Self {
183        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
184            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
185            *self.ir_node.borrow_mut() = HydroNode::Tee {
186                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
187                metadata: self.location.new_node_metadata(Self::collection_kind()),
188            };
189        }
190
191        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
192            Singleton {
193                location: self.location.clone(),
194                ir_node: HydroNode::Tee {
195                    inner: TeeNode(inner.0.clone()),
196                    metadata: metadata.clone(),
197                }
198                .into(),
199                _phantom: PhantomData,
200            }
201        } else {
202            unreachable!()
203        }
204    }
205}
206
207#[cfg(stageleft_runtime)]
208fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
209    me: Singleton<T, Tick<L>, B>,
210    other: O,
211) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
212where
213    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
214{
215    check_matching_location(
216        &me.location,
217        &Singleton::<T, Tick<L>, B>::other_location(&other),
218    );
219
220    Singleton::<T, Tick<L>, B>::make(
221        me.location.clone(),
222        HydroNode::CrossSingleton {
223            left: Box::new(me.ir_node.into_inner()),
224            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
225            metadata: me.location.new_node_metadata(CollectionKind::Singleton {
226                bound: B::BOUND_KIND,
227                element_type: stageleft::quote_type::<
228                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
229                >()
230                .into(),
231            }),
232        },
233    )
234}
235
236impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
237where
238    L: Location<'a>,
239{
240    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
241        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
242        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
243        Singleton {
244            location,
245            ir_node: RefCell::new(ir_node),
246            _phantom: PhantomData,
247        }
248    }
249
250    pub(crate) fn collection_kind() -> CollectionKind {
251        CollectionKind::Singleton {
252            bound: B::BOUND_KIND,
253            element_type: stageleft::quote_type::<T>().into(),
254        }
255    }
256
257    /// Returns the [`Location`] where this singleton is being materialized.
258    pub fn location(&self) -> &L {
259        &self.location
260    }
261
262    /// Transforms the singleton value by applying a function `f` to it,
263    /// continuously as the input is updated.
264    ///
265    /// # Example
266    /// ```rust
267    /// # #[cfg(feature = "deploy")] {
268    /// # use hydro_lang::prelude::*;
269    /// # use futures::StreamExt;
270    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
271    /// let tick = process.tick();
272    /// let singleton = tick.singleton(q!(5));
273    /// singleton.map(q!(|v| v * 2)).all_ticks()
274    /// # }, |mut stream| async move {
275    /// // 10
276    /// # assert_eq!(stream.next().await.unwrap(), 10);
277    /// # }));
278    /// # }
279    /// ```
280    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
281    where
282        F: Fn(T) -> U + 'a,
283    {
284        let f = f.splice_fn1_ctx(&self.location).into();
285        Singleton::new(
286            self.location.clone(),
287            HydroNode::Map {
288                f,
289                input: Box::new(self.ir_node.into_inner()),
290                metadata: self
291                    .location
292                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
293            },
294        )
295    }
296
297    /// Transforms the singleton value by applying a function `f` to it and then flattening
298    /// the result into a stream, preserving the order of elements.
299    ///
300    /// The function `f` is applied to the singleton value to produce an iterator, and all items
301    /// from that iterator are emitted in the output stream in deterministic order.
302    ///
303    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
304    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
305    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
306    ///
307    /// # Example
308    /// ```rust
309    /// # #[cfg(feature = "deploy")] {
310    /// # use hydro_lang::prelude::*;
311    /// # use futures::StreamExt;
312    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313    /// let tick = process.tick();
314    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
315    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
316    /// # }, |mut stream| async move {
317    /// // 1, 2, 3
318    /// # for w in vec![1, 2, 3] {
319    /// #     assert_eq!(stream.next().await.unwrap(), w);
320    /// # }
321    /// # }));
322    /// # }
323    /// ```
324    pub fn flat_map_ordered<U, I, F>(
325        self,
326        f: impl IntoQuotedMut<'a, F, L>,
327    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
328    where
329        I: IntoIterator<Item = U>,
330        F: Fn(T) -> I + 'a,
331    {
332        let f = f.splice_fn1_ctx(&self.location).into();
333        Stream::new(
334            self.location.clone(),
335            HydroNode::FlatMap {
336                f,
337                input: Box::new(self.ir_node.into_inner()),
338                metadata: self.location.new_node_metadata(
339                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
340                ),
341            },
342        )
343    }
344
345    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
346    /// for the output type `I` to produce items in any order.
347    ///
348    /// The function `f` is applied to the singleton value to produce an iterator, and all items
349    /// from that iterator are emitted in the output stream in non-deterministic order.
350    ///
351    /// # Example
352    /// ```rust
353    /// # #[cfg(feature = "deploy")] {
354    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
355    /// # use futures::StreamExt;
356    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
357    /// let tick = process.tick();
358    /// let singleton = tick.singleton(q!(
359    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
360    /// ));
361    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
362    /// # }, |mut stream| async move {
363    /// // 1, 2, 3, but in no particular order
364    /// # let mut results = Vec::new();
365    /// # for _ in 0..3 {
366    /// #     results.push(stream.next().await.unwrap());
367    /// # }
368    /// # results.sort();
369    /// # assert_eq!(results, vec![1, 2, 3]);
370    /// # }));
371    /// # }
372    /// ```
373    pub fn flat_map_unordered<U, I, F>(
374        self,
375        f: impl IntoQuotedMut<'a, F, L>,
376    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
377    where
378        I: IntoIterator<Item = U>,
379        F: Fn(T) -> I + 'a,
380    {
381        let f = f.splice_fn1_ctx(&self.location).into();
382        Stream::new(
383            self.location.clone(),
384            HydroNode::FlatMap {
385                f,
386                input: Box::new(self.ir_node.into_inner()),
387                metadata: self
388                    .location
389                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
390            },
391        )
392    }
393
394    /// Flattens the singleton value into a stream, preserving the order of elements.
395    ///
396    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
397    /// are emitted in the output stream in deterministic order.
398    ///
399    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
400    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
401    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
402    ///
403    /// # Example
404    /// ```rust
405    /// # #[cfg(feature = "deploy")] {
406    /// # use hydro_lang::prelude::*;
407    /// # use futures::StreamExt;
408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
409    /// let tick = process.tick();
410    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
411    /// singleton.flatten_ordered().all_ticks()
412    /// # }, |mut stream| async move {
413    /// // 1, 2, 3
414    /// # for w in vec![1, 2, 3] {
415    /// #     assert_eq!(stream.next().await.unwrap(), w);
416    /// # }
417    /// # }));
418    /// # }
419    /// ```
420    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
421    where
422        T: IntoIterator<Item = U>,
423    {
424        self.flat_map_ordered(q!(|x| x))
425    }
426
427    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
428    /// for the element type `T` to produce items in any order.
429    ///
430    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
431    /// are emitted in the output stream in non-deterministic order.
432    ///
433    /// # Example
434    /// ```rust
435    /// # #[cfg(feature = "deploy")] {
436    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
437    /// # use futures::StreamExt;
438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
439    /// let tick = process.tick();
440    /// let singleton = tick.singleton(q!(
441    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
442    /// ));
443    /// singleton.flatten_unordered().all_ticks()
444    /// # }, |mut stream| async move {
445    /// // 1, 2, 3, but in no particular order
446    /// # let mut results = Vec::new();
447    /// # for _ in 0..3 {
448    /// #     results.push(stream.next().await.unwrap());
449    /// # }
450    /// # results.sort();
451    /// # assert_eq!(results, vec![1, 2, 3]);
452    /// # }));
453    /// # }
454    /// ```
455    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
456    where
457        T: IntoIterator<Item = U>,
458    {
459        self.flat_map_unordered(q!(|x| x))
460    }
461
462    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
463    ///
464    /// If the predicate returns `true`, the output optional contains the same value.
465    /// If the predicate returns `false`, the output optional is empty.
466    ///
467    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
468    /// not modify or take ownership of the value. If you need to modify the value while filtering
469    /// use [`Singleton::filter_map`] instead.
470    ///
471    /// # Example
472    /// ```rust
473    /// # #[cfg(feature = "deploy")] {
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477    /// let tick = process.tick();
478    /// let singleton = tick.singleton(q!(5));
479    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
480    /// # }, |mut stream| async move {
481    /// // 5
482    /// # assert_eq!(stream.next().await.unwrap(), 5);
483    /// # }));
484    /// # }
485    /// ```
486    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
487    where
488        F: Fn(&T) -> bool + 'a,
489    {
490        let f = f.splice_fn1_borrow_ctx(&self.location).into();
491        Optional::new(
492            self.location.clone(),
493            HydroNode::Filter {
494                f,
495                input: Box::new(self.ir_node.into_inner()),
496                metadata: self
497                    .location
498                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
499            },
500        )
501    }
502
503    /// An operator that both filters and maps. It yields the value only if the supplied
504    /// closure `f` returns `Some(value)`.
505    ///
506    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
507    /// If the closure returns `None`, the output optional is empty.
508    ///
509    /// # Example
510    /// ```rust
511    /// # #[cfg(feature = "deploy")] {
512    /// # use hydro_lang::prelude::*;
513    /// # use futures::StreamExt;
514    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
515    /// let tick = process.tick();
516    /// let singleton = tick.singleton(q!("42"));
517    /// singleton
518    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
519    ///     .all_ticks()
520    /// # }, |mut stream| async move {
521    /// // 42
522    /// # assert_eq!(stream.next().await.unwrap(), 42);
523    /// # }));
524    /// # }
525    /// ```
526    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
527    where
528        F: Fn(T) -> Option<U> + 'a,
529    {
530        let f = f.splice_fn1_ctx(&self.location).into();
531        Optional::new(
532            self.location.clone(),
533            HydroNode::FilterMap {
534                f,
535                input: Box::new(self.ir_node.into_inner()),
536                metadata: self
537                    .location
538                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
539            },
540        )
541    }
542
543    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
544    ///
545    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
546    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
547    /// non-null. This is useful for combining several pieces of state together.
548    ///
549    /// # Example
550    /// ```rust
551    /// # #[cfg(feature = "deploy")] {
552    /// # use hydro_lang::prelude::*;
553    /// # use futures::StreamExt;
554    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
555    /// let tick = process.tick();
556    /// let numbers = process
557    ///   .source_iter(q!(vec![123, 456]))
558    ///   .batch(&tick, nondet!(/** test */));
559    /// let count = numbers.clone().count(); // Singleton
560    /// let max = numbers.max(); // Optional
561    /// count.zip(max).all_ticks()
562    /// # }, |mut stream| async move {
563    /// // [(2, 456)]
564    /// # for w in vec![(2, 456)] {
565    /// #     assert_eq!(stream.next().await.unwrap(), w);
566    /// # }
567    /// # }));
568    /// # }
569    /// ```
570    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
571    where
572        Self: ZipResult<'a, O, Location = L>,
573    {
574        check_matching_location(&self.location, &Self::other_location(&other));
575
576        if L::is_top_level()
577            && let Some(tick) = self.location.try_tick()
578        {
579            let out = zip_inside_tick(
580                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
581                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
582                    Self::other_location(&other),
583                    Self::other_ir_node(other),
584                )
585                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
586            )
587            .latest();
588
589            Self::make(out.location, out.ir_node.into_inner())
590        } else {
591            Self::make(
592                self.location.clone(),
593                HydroNode::CrossSingleton {
594                    left: Box::new(self.ir_node.into_inner()),
595                    right: Box::new(Self::other_ir_node(other)),
596                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
597                        bound: B::BOUND_KIND,
598                        element_type: stageleft::quote_type::<
599                            <Self as ZipResult<'a, O>>::ElementType,
600                        >()
601                        .into(),
602                    }),
603                },
604            )
605        }
606    }
607
608    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
609    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
610    ///
611    /// Useful for conditionally processing, such as only emitting a singleton's value outside
612    /// a tick if some other condition is satisfied.
613    ///
614    /// # Example
615    /// ```rust
616    /// # #[cfg(feature = "deploy")] {
617    /// # use hydro_lang::prelude::*;
618    /// # use futures::StreamExt;
619    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
620    /// let tick = process.tick();
621    /// // ticks are lazy by default, forces the second tick to run
622    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
623    ///
624    /// let batch_first_tick = process
625    ///   .source_iter(q!(vec![1]))
626    ///   .batch(&tick, nondet!(/** test */));
627    /// let batch_second_tick = process
628    ///   .source_iter(q!(vec![1, 2, 3]))
629    ///   .batch(&tick, nondet!(/** test */))
630    ///   .defer_tick(); // appears on the second tick
631    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
632    /// batch_first_tick.chain(batch_second_tick).count()
633    ///   .filter_if_some(some_on_first_tick)
634    ///   .all_ticks()
635    /// # }, |mut stream| async move {
636    /// // [1]
637    /// # for w in vec![1] {
638    /// #     assert_eq!(stream.next().await.unwrap(), w);
639    /// # }
640    /// # }));
641    /// # }
642    /// ```
643    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
644        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
645            .map(q!(|(d, _signal)| d))
646    }
647
648    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
649    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
650    ///
651    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
652    /// the condition.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660    /// let tick = process.tick();
661    /// // ticks are lazy by default, forces the second tick to run
662    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
663    ///
664    /// let batch_first_tick = process
665    ///   .source_iter(q!(vec![1]))
666    ///   .batch(&tick, nondet!(/** test */));
667    /// let batch_second_tick = process
668    ///   .source_iter(q!(vec![1, 2, 3]))
669    ///   .batch(&tick, nondet!(/** test */))
670    ///   .defer_tick(); // appears on the second tick
671    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
672    /// batch_first_tick.chain(batch_second_tick).count()
673    ///   .filter_if_none(some_on_first_tick)
674    ///   .all_ticks()
675    /// # }, |mut stream| async move {
676    /// // [3]
677    /// # for w in vec![3] {
678    /// #     assert_eq!(stream.next().await.unwrap(), w);
679    /// # }
680    /// # }));
681    /// # }
682    /// ```
683    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
684        self.filter_if_some(
685            other
686                .map(q!(|_| ()))
687                .into_singleton()
688                .filter(q!(|o| o.is_none())),
689        )
690    }
691
692    /// An operator which allows you to "name" a `HydroNode`.
693    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
694    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
695        {
696            let mut node = self.ir_node.borrow_mut();
697            let metadata = node.metadata_mut();
698            metadata.tag = Some(name.to_owned());
699        }
700        self
701    }
702}
703
704impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
705where
706    L: Location<'a>,
707{
708    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
709    /// the inner `Option`.
710    ///
711    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
712    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
713    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
714    ///
715    /// # Example
716    /// ```rust
717    /// # #[cfg(feature = "deploy")] {
718    /// # use hydro_lang::prelude::*;
719    /// # use futures::StreamExt;
720    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
721    /// let tick = process.tick();
722    /// let singleton = tick.singleton(q!(Some(42)));
723    /// singleton.into_optional().all_ticks()
724    /// # }, |mut stream| async move {
725    /// // 42
726    /// # assert_eq!(stream.next().await.unwrap(), 42);
727    /// # }));
728    /// # }
729    /// ```
730    pub fn into_optional(self) -> Optional<T, L, B> {
731        self.filter_map(q!(|v| v))
732    }
733}
734
735impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
736where
737    L: Location<'a> + NoTick,
738{
739    /// Returns a singleton value corresponding to the latest snapshot of the singleton
740    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
741    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
742    /// all snapshots of this singleton into the atomic-associated tick will observe the
743    /// same value each tick.
744    ///
745    /// # Non-Determinism
746    /// Because this picks a snapshot of a singleton whose value is continuously changing,
747    /// the output singleton has a non-deterministic value since the snapshot can be at an
748    /// arbitrary point in time.
749    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
750        Singleton::new(
751            self.location.clone().tick,
752            HydroNode::Batch {
753                inner: Box::new(self.ir_node.into_inner()),
754                metadata: self
755                    .location
756                    .tick
757                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
758            },
759        )
760    }
761
762    /// Returns this singleton back into a top-level, asynchronous execution context where updates
763    /// to the value will be asynchronously propagated.
764    pub fn end_atomic(self) -> Singleton<T, L, B> {
765        Singleton::new(
766            self.location.tick.l.clone(),
767            HydroNode::EndAtomic {
768                inner: Box::new(self.ir_node.into_inner()),
769                metadata: self
770                    .location
771                    .tick
772                    .l
773                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
774            },
775        )
776    }
777}
778
779impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
780where
781    L: Location<'a>,
782{
783    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
784    /// will observe the same version of the value and will be executed synchronously before any
785    /// outputs are yielded (in [`Optional::end_atomic`]).
786    ///
787    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
788    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
789    /// a different version).
790    ///
791    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
792    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
793    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
794    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
795        let out_location = Atomic { tick: tick.clone() };
796        Singleton::new(
797            out_location.clone(),
798            HydroNode::BeginAtomic {
799                inner: Box::new(self.ir_node.into_inner()),
800                metadata: out_location
801                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
802            },
803        )
804    }
805
806    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
807    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
808    /// relevant data that contributed to the snapshot at tick `t`.
809    ///
810    /// # Non-Determinism
811    /// Because this picks a snapshot of a singleton whose value is continuously changing,
812    /// the output singleton has a non-deterministic value since the snapshot can be at an
813    /// arbitrary point in time.
814    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
815        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
816        Singleton::new(
817            tick.clone(),
818            HydroNode::Batch {
819                inner: Box::new(self.ir_node.into_inner()),
820                metadata: tick
821                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
822            },
823        )
824    }
825
826    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
827    /// with order corresponding to increasing prefixes of data contributing to the singleton.
828    ///
829    /// # Non-Determinism
830    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
831    /// to non-deterministic batching and arrival of inputs, the output stream is
832    /// non-deterministic.
833    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
834    where
835        L: NoTick,
836    {
837        sliced! {
838            let snapshot = use(self, nondet);
839            snapshot.into_stream()
840        }
841        .weaken_retries()
842    }
843
844    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
845    /// value taken at various points in time. Because the input singleton may be
846    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
847    /// represent the value of the singleton given some prefix of the streams leading up to
848    /// it.
849    ///
850    /// # Non-Determinism
851    /// The output stream is non-deterministic in which elements are sampled, since this
852    /// is controlled by a clock.
853    pub fn sample_every(
854        self,
855        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
856        nondet: NonDet,
857    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
858    where
859        L: NoTick + NoAtomic,
860    {
861        let samples = self.location.source_interval(interval, nondet);
862        sliced! {
863            let snapshot = use(self, nondet);
864            let sample_batch = use(samples, nondet);
865
866            snapshot.filter_if_some(sample_batch.first()).into_stream()
867        }
868        .weaken_retries()
869    }
870
871    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
872    /// implies that `B == Bounded`.
873    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
874    where
875        B: IsBounded,
876    {
877        Singleton::new(self.location, self.ir_node.into_inner())
878    }
879
880    /// Clones this bounded singleton into a tick, returning a singleton that has the
881    /// same value as the outer singleton. Because the outer singleton is bounded, this
882    /// is deterministic because there is only a single immutable version.
883    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
884    where
885        B: IsBounded,
886        T: Clone,
887    {
888        // TODO(shadaj): avoid printing simulator logs for this snapshot
889        self.snapshot(
890            tick,
891            nondet!(/** bounded top-level singleton so deterministic */),
892        )
893    }
894
895    /// Converts this singleton into a [`Stream`] containing a single element, the value.
896    ///
897    /// # Example
898    /// ```rust
899    /// # #[cfg(feature = "deploy")] {
900    /// # use hydro_lang::prelude::*;
901    /// # use futures::StreamExt;
902    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903    /// let tick = process.tick();
904    /// let batch_input = process
905    ///   .source_iter(q!(vec![123, 456]))
906    ///   .batch(&tick, nondet!(/** test */));
907    /// batch_input.clone().chain(
908    ///   batch_input.count().into_stream()
909    /// ).all_ticks()
910    /// # }, |mut stream| async move {
911    /// // [123, 456, 2]
912    /// # for w in vec![123, 456, 2] {
913    /// #     assert_eq!(stream.next().await.unwrap(), w);
914    /// # }
915    /// # }));
916    /// # }
917    /// ```
918    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
919    where
920        B: IsBounded,
921    {
922        Stream::new(
923            self.location.clone(),
924            HydroNode::Cast {
925                inner: Box::new(self.ir_node.into_inner()),
926                metadata: self.location.new_node_metadata(Stream::<
927                    T,
928                    Tick<L>,
929                    Bounded,
930                    TotalOrder,
931                    ExactlyOnce,
932                >::collection_kind()),
933            },
934        )
935    }
936}
937
938impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
939where
940    L: Location<'a>,
941{
942    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
943    /// which will stream the value computed in _each_ tick as a separate stream element.
944    ///
945    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
946    /// producing one element in the output for each tick. This is useful for batched computations,
947    /// where the results from each tick must be combined together.
948    ///
949    /// # Example
950    /// ```rust
951    /// # #[cfg(feature = "deploy")] {
952    /// # use hydro_lang::prelude::*;
953    /// # use futures::StreamExt;
954    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
955    /// let tick = process.tick();
956    /// # // ticks are lazy by default, forces the second tick to run
957    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
958    /// # let batch_first_tick = process
959    /// #   .source_iter(q!(vec![1]))
960    /// #   .batch(&tick, nondet!(/** test */));
961    /// # let batch_second_tick = process
962    /// #   .source_iter(q!(vec![1, 2, 3]))
963    /// #   .batch(&tick, nondet!(/** test */))
964    /// #   .defer_tick(); // appears on the second tick
965    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
966    /// input_batch // first tick: [1], second tick: [1, 2, 3]
967    ///     .count()
968    ///     .all_ticks()
969    /// # }, |mut stream| async move {
970    /// // [1, 3]
971    /// # for w in vec![1, 3] {
972    /// #     assert_eq!(stream.next().await.unwrap(), w);
973    /// # }
974    /// # }));
975    /// # }
976    /// ```
977    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
978        self.into_stream().all_ticks()
979    }
980
981    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
982    /// which will stream the value computed in _each_ tick as a separate stream element.
983    ///
984    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
985    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
986    /// singleton's [`Tick`] context.
987    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
988        self.into_stream().all_ticks_atomic()
989    }
990
991    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
992    /// be asynchronously updated with the latest value of the singleton inside the tick.
993    ///
994    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
995    /// tick that tracks the inner value. This is useful for getting the value as of the
996    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
997    ///
998    /// # Example
999    /// ```rust
1000    /// # #[cfg(feature = "deploy")] {
1001    /// # use hydro_lang::prelude::*;
1002    /// # use futures::StreamExt;
1003    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1004    /// let tick = process.tick();
1005    /// # // ticks are lazy by default, forces the second tick to run
1006    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1007    /// # let batch_first_tick = process
1008    /// #   .source_iter(q!(vec![1]))
1009    /// #   .batch(&tick, nondet!(/** test */));
1010    /// # let batch_second_tick = process
1011    /// #   .source_iter(q!(vec![1, 2, 3]))
1012    /// #   .batch(&tick, nondet!(/** test */))
1013    /// #   .defer_tick(); // appears on the second tick
1014    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1015    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1016    ///     .count()
1017    ///     .latest()
1018    /// # .sample_eager(nondet!(/** test */))
1019    /// # }, |mut stream| async move {
1020    /// // asynchronously changes from 1 ~> 3
1021    /// # for w in vec![1, 3] {
1022    /// #     assert_eq!(stream.next().await.unwrap(), w);
1023    /// # }
1024    /// # }));
1025    /// # }
1026    /// ```
1027    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1028        Singleton::new(
1029            self.location.outer().clone(),
1030            HydroNode::YieldConcat {
1031                inner: Box::new(self.ir_node.into_inner()),
1032                metadata: self
1033                    .location
1034                    .outer()
1035                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1036            },
1037        )
1038    }
1039
1040    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1041    /// be updated with the latest value of the singleton inside the tick.
1042    ///
1043    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1044    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1045    /// singleton's [`Tick`] context.
1046    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1047        let out_location = Atomic {
1048            tick: self.location.clone(),
1049        };
1050        Singleton::new(
1051            out_location.clone(),
1052            HydroNode::YieldConcat {
1053                inner: Box::new(self.ir_node.into_inner()),
1054                metadata: out_location
1055                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1056            },
1057        )
1058    }
1059}
1060
1061#[doc(hidden)]
1062/// Helper trait that determines the output collection type for [`Singleton::zip`].
1063///
1064/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1065/// [`Singleton`].
1066#[sealed::sealed]
1067pub trait ZipResult<'a, Other> {
1068    /// The output collection type.
1069    type Out;
1070    /// The type of the tupled output value.
1071    type ElementType;
1072    /// The type of the other collection's value.
1073    type OtherType;
1074    /// The location where the tupled result will be materialized.
1075    type Location: Location<'a>;
1076
1077    /// The location of the second input to the `zip`.
1078    fn other_location(other: &Other) -> Self::Location;
1079    /// The IR node of the second input to the `zip`.
1080    fn other_ir_node(other: Other) -> HydroNode;
1081
1082    /// Constructs the output live collection given an IR node containing the zip result.
1083    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1084}
1085
1086#[sealed::sealed]
1087impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1088where
1089    L: Location<'a>,
1090{
1091    type Out = Singleton<(T, U), L, B>;
1092    type ElementType = (T, U);
1093    type OtherType = U;
1094    type Location = L;
1095
1096    fn other_location(other: &Singleton<U, L, B>) -> L {
1097        other.location.clone()
1098    }
1099
1100    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1101        other.ir_node.into_inner()
1102    }
1103
1104    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1105        Singleton::new(
1106            location.clone(),
1107            HydroNode::Cast {
1108                inner: Box::new(ir_node),
1109                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1110            },
1111        )
1112    }
1113}
1114
1115#[sealed::sealed]
1116impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1117where
1118    L: Location<'a>,
1119{
1120    type Out = Optional<(T, U), L, B>;
1121    type ElementType = (T, U);
1122    type OtherType = U;
1123    type Location = L;
1124
1125    fn other_location(other: &Optional<U, L, B>) -> L {
1126        other.location.clone()
1127    }
1128
1129    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1130        other.ir_node.into_inner()
1131    }
1132
1133    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1134        Optional::new(location, ir_node)
1135    }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140    #[cfg(feature = "deploy")]
1141    use futures::{SinkExt, StreamExt};
1142    #[cfg(feature = "deploy")]
1143    use hydro_deploy::Deployment;
1144    #[cfg(any(feature = "deploy", feature = "sim"))]
1145    use stageleft::q;
1146
1147    #[cfg(any(feature = "deploy", feature = "sim"))]
1148    use crate::compile::builder::FlowBuilder;
1149    #[cfg(feature = "deploy")]
1150    use crate::live_collections::stream::ExactlyOnce;
1151    #[cfg(any(feature = "deploy", feature = "sim"))]
1152    use crate::location::Location;
1153    #[cfg(any(feature = "deploy", feature = "sim"))]
1154    use crate::nondet::nondet;
1155
1156    #[cfg(feature = "deploy")]
1157    #[tokio::test]
1158    async fn tick_cycle_cardinality() {
1159        let mut deployment = Deployment::new();
1160
1161        let mut flow = FlowBuilder::new();
1162        let node = flow.process::<()>();
1163        let external = flow.external::<()>();
1164
1165        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1166
1167        let node_tick = node.tick();
1168        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1169        let counts = singleton
1170            .clone()
1171            .into_stream()
1172            .count()
1173            .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1174            .all_ticks()
1175            .send_bincode_external(&external);
1176        complete_cycle.complete_next_tick(singleton);
1177
1178        let nodes = flow
1179            .with_process(&node, deployment.Localhost())
1180            .with_external(&external, deployment.Localhost())
1181            .deploy(&mut deployment);
1182
1183        deployment.deploy().await.unwrap();
1184
1185        let mut tick_trigger = nodes.connect(input_send).await;
1186        let mut external_out = nodes.connect(counts).await;
1187
1188        deployment.start().await.unwrap();
1189
1190        tick_trigger.send(()).await.unwrap();
1191
1192        assert_eq!(external_out.next().await.unwrap(), 1);
1193
1194        tick_trigger.send(()).await.unwrap();
1195
1196        assert_eq!(external_out.next().await.unwrap(), 1);
1197    }
1198
1199    #[cfg(feature = "sim")]
1200    #[test]
1201    #[should_panic]
1202    fn sim_fold_intermediate_states() {
1203        let mut flow = FlowBuilder::new();
1204        let node = flow.process::<()>();
1205
1206        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1207        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1208
1209        let tick = node.tick();
1210        let batch = folded.snapshot(&tick, nondet!(/** test */));
1211        let out_recv = batch.all_ticks().sim_output();
1212
1213        flow.sim().exhaustive(async || {
1214            assert_eq!(out_recv.next().await.unwrap(), 10);
1215        });
1216    }
1217
1218    #[cfg(feature = "sim")]
1219    #[test]
1220    fn sim_fold_intermediate_state_count() {
1221        let mut flow = FlowBuilder::new();
1222        let node = flow.process::<()>();
1223
1224        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1225        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1226
1227        let tick = node.tick();
1228        let batch = folded.snapshot(&tick, nondet!(/** test */));
1229        let out_recv = batch.all_ticks().sim_output();
1230
1231        let instance_count = flow.sim().exhaustive(async || {
1232            let out = out_recv.collect::<Vec<_>>().await;
1233            assert_eq!(out.last(), Some(&10));
1234        });
1235
1236        assert_eq!(
1237            instance_count,
1238            16 // 2^4 possible subsets of intermediates (including initial state)
1239        )
1240    }
1241
1242    #[cfg(feature = "sim")]
1243    #[test]
1244    fn sim_fold_no_repeat_initial() {
1245        // check that we don't repeat the initial state of the fold in autonomous decisions
1246
1247        let mut flow = FlowBuilder::new();
1248        let node = flow.process::<()>();
1249
1250        let (in_port, input) = node.sim_input();
1251        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1252
1253        let tick = node.tick();
1254        let batch = folded.snapshot(&tick, nondet!(/** test */));
1255        let out_recv = batch.all_ticks().sim_output();
1256
1257        flow.sim().exhaustive(async || {
1258            assert_eq!(out_recv.next().await.unwrap(), 0);
1259
1260            in_port.send(123);
1261
1262            assert_eq!(out_recv.next().await.unwrap(), 123);
1263        });
1264    }
1265
1266    #[cfg(feature = "sim")]
1267    #[test]
1268    #[should_panic]
1269    fn sim_fold_repeats_snapshots() {
1270        // when the tick is driven by a snapshot AND something else, the snapshot can
1271        // "stutter" and repeat the same state multiple times
1272
1273        let mut flow = FlowBuilder::new();
1274        let node = flow.process::<()>();
1275
1276        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1277        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1278
1279        let tick = node.tick();
1280        let batch = source
1281            .batch(&tick, nondet!(/** test */))
1282            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1283        let out_recv = batch.all_ticks().sim_output();
1284
1285        flow.sim().exhaustive(async || {
1286            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1287            {
1288                panic!("repeated snapshot");
1289            }
1290        });
1291    }
1292
1293    #[cfg(feature = "sim")]
1294    #[test]
1295    fn sim_fold_repeats_snapshots_count() {
1296        // check the number of instances
1297        let mut flow = FlowBuilder::new();
1298        let node = flow.process::<()>();
1299
1300        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1301        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1302
1303        let tick = node.tick();
1304        let batch = source
1305            .batch(&tick, nondet!(/** test */))
1306            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1307        let out_recv = batch.all_ticks().sim_output();
1308
1309        let count = flow.sim().exhaustive(async || {
1310            let _ = out_recv.collect::<Vec<_>>().await;
1311        });
1312
1313        assert_eq!(count, 52);
1314        // don't have a combinatorial explanation for this number yet, but checked via logs
1315    }
1316
1317    #[cfg(feature = "sim")]
1318    #[test]
1319    fn sim_top_level_singleton_exhaustive() {
1320        // ensures that top-level singletons have only one snapshot
1321        let mut flow = FlowBuilder::new();
1322        let node = flow.process::<()>();
1323
1324        let singleton = node.singleton(q!(1));
1325        let tick = node.tick();
1326        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1327        let out_recv = batch.all_ticks().sim_output();
1328
1329        let count = flow.sim().exhaustive(async || {
1330            let _ = out_recv.collect::<Vec<_>>().await;
1331        });
1332
1333        assert_eq!(count, 1);
1334    }
1335
1336    #[cfg(feature = "sim")]
1337    #[test]
1338    fn sim_top_level_singleton_join_count() {
1339        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1340        // exploration
1341
1342        let mut flow = FlowBuilder::new();
1343        let node = flow.process::<()>();
1344
1345        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1346        let tick = node.tick();
1347        let batch = source_iter
1348            .batch(&tick, nondet!(/** test */))
1349            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1350        let out_recv = batch.all_ticks().sim_output();
1351
1352        let instance_count = flow.sim().exhaustive(async || {
1353            let _ = out_recv.collect::<Vec<_>>().await;
1354        });
1355
1356        assert_eq!(
1357            instance_count,
1358            16 // 2^4 ways to split up (including a possibly empty first batch)
1359        )
1360    }
1361
1362    #[cfg(feature = "sim")]
1363    #[test]
1364    fn top_level_singleton_into_stream_no_replay() {
1365        let mut flow = FlowBuilder::new();
1366        let node = flow.process::<()>();
1367
1368        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1369        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1370
1371        let out_recv = folded.into_stream().sim_output();
1372
1373        flow.sim().exhaustive(async || {
1374            out_recv.assert_yields_only([10]).await;
1375        });
1376    }
1377}