Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A *nullable* Rust value that can asynchronously change over time.
26///
27/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
28/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
29/// asynchronously change over time, including becoming present of uninhabited.
30///
31/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
32/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this optional (when it is not null)
36/// - `Loc`: the [`Location`] where the optional is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Optional<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41
42    _phantom: PhantomData<(Type, Loc, Bound)>,
43}
44
45impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
46where
47    T: Clone,
48    L: Location<'a> + NoTick,
49{
50    fn from(value: Optional<T, L, Bounded>) -> Self {
51        let tick = value.location().tick();
52        value.clone_into_tick(&tick).latest()
53    }
54}
55
56impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
57where
58    L: Location<'a>,
59{
60    fn defer_tick(self) -> Self {
61        Optional::defer_tick(self)
62    }
63}
64
65impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
66where
67    L: Location<'a>,
68{
69    type Location = Tick<L>;
70
71    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
72        Optional::new(
73            location.clone(),
74            HydroNode::CycleSource {
75                cycle_id,
76                metadata: location.new_node_metadata(Self::collection_kind()),
77            },
78        )
79    }
80}
81
82impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
83where
84    L: Location<'a>,
85{
86    type Location = Tick<L>;
87
88    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
89        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
90            location.clone(),
91            HydroNode::DeferTick {
92                input: Box::new(HydroNode::CycleSource {
93                    cycle_id,
94                    metadata: location.new_node_metadata(Self::collection_kind()),
95                }),
96                metadata: location
97                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
98            },
99        );
100
101        from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
102    }
103}
104
105impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
106where
107    L: Location<'a>,
108{
109    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
110        assert_eq!(
111            Location::id(&self.location),
112            expected_location,
113            "locations do not match"
114        );
115        self.location
116            .flow_state()
117            .borrow_mut()
118            .push_root(HydroRoot::CycleSink {
119                cycle_id,
120                input: Box::new(self.ir_node.into_inner()),
121                op_metadata: HydroIrOpMetadata::new(),
122            });
123    }
124}
125
126impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
127where
128    L: Location<'a>,
129{
130    type Location = Tick<L>;
131
132    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
133        Optional::new(
134            location.clone(),
135            HydroNode::CycleSource {
136                cycle_id,
137                metadata: location.new_node_metadata(Self::collection_kind()),
138            },
139        )
140    }
141}
142
143impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
144where
145    L: Location<'a>,
146{
147    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
148        assert_eq!(
149            Location::id(&self.location),
150            expected_location,
151            "locations do not match"
152        );
153        self.location
154            .flow_state()
155            .borrow_mut()
156            .push_root(HydroRoot::CycleSink {
157                cycle_id,
158                input: Box::new(self.ir_node.into_inner()),
159                op_metadata: HydroIrOpMetadata::new(),
160            });
161    }
162}
163
164impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
165where
166    L: Location<'a> + NoTick,
167{
168    type Location = L;
169
170    fn create_source(cycle_id: CycleId, location: L) -> Self {
171        Optional::new(
172            location.clone(),
173            HydroNode::CycleSource {
174                cycle_id,
175                metadata: location.new_node_metadata(Self::collection_kind()),
176            },
177        )
178    }
179}
180
181impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
182where
183    L: Location<'a> + NoTick,
184{
185    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
186        assert_eq!(
187            Location::id(&self.location),
188            expected_location,
189            "locations do not match"
190        );
191        self.location
192            .flow_state()
193            .borrow_mut()
194            .push_root(HydroRoot::CycleSink {
195                cycle_id,
196                input: Box::new(self.ir_node.into_inner()),
197                op_metadata: HydroIrOpMetadata::new(),
198            });
199    }
200}
201
202impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
203where
204    L: Location<'a>,
205{
206    fn from(singleton: Singleton<T, L, B>) -> Self {
207        Optional::new(
208            singleton.location.clone(),
209            HydroNode::Cast {
210                inner: Box::new(singleton.ir_node.into_inner()),
211                metadata: singleton
212                    .location
213                    .new_node_metadata(Self::collection_kind()),
214            },
215        )
216    }
217}
218
219#[cfg(stageleft_runtime)]
220fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
221    me: Optional<T, L, B>,
222    other: Optional<O, L, B>,
223) -> Optional<(T, O), L, B> {
224    check_matching_location(&me.location, &other.location);
225
226    Optional::new(
227        me.location.clone(),
228        HydroNode::CrossSingleton {
229            left: Box::new(me.ir_node.into_inner()),
230            right: Box::new(other.ir_node.into_inner()),
231            metadata: me
232                .location
233                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
234        },
235    )
236}
237
238#[cfg(stageleft_runtime)]
239fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
240    me: Optional<T, L, B>,
241    other: Optional<T, L, B>,
242) -> Optional<T, L, B> {
243    check_matching_location(&me.location, &other.location);
244
245    Optional::new(
246        me.location.clone(),
247        HydroNode::ChainFirst {
248            first: Box::new(me.ir_node.into_inner()),
249            second: Box::new(other.ir_node.into_inner()),
250            metadata: me
251                .location
252                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
253        },
254    )
255}
256
257impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
258where
259    T: Clone,
260    L: Location<'a>,
261{
262    fn clone(&self) -> Self {
263        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
264            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
265            *self.ir_node.borrow_mut() = HydroNode::Tee {
266                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
267                metadata: self.location.new_node_metadata(Self::collection_kind()),
268            };
269        }
270
271        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
272            Optional {
273                location: self.location.clone(),
274                ir_node: HydroNode::Tee {
275                    inner: TeeNode(inner.0.clone()),
276                    metadata: metadata.clone(),
277                }
278                .into(),
279                _phantom: PhantomData,
280            }
281        } else {
282            unreachable!()
283        }
284    }
285}
286
287impl<'a, T, L, B: Boundedness> Optional<T, L, B>
288where
289    L: Location<'a>,
290{
291    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
292        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
293        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
294        Optional {
295            location,
296            ir_node: RefCell::new(ir_node),
297            _phantom: PhantomData,
298        }
299    }
300
301    pub(crate) fn collection_kind() -> CollectionKind {
302        CollectionKind::Optional {
303            bound: B::BOUND_KIND,
304            element_type: stageleft::quote_type::<T>().into(),
305        }
306    }
307
308    /// Returns the [`Location`] where this optional is being materialized.
309    pub fn location(&self) -> &L {
310        &self.location
311    }
312
313    /// Transforms the optional value by applying a function `f` to it,
314    /// continuously as the input is updated.
315    ///
316    /// Whenever the optional is empty, the output optional is also empty.
317    ///
318    /// # Example
319    /// ```rust
320    /// # #[cfg(feature = "deploy")] {
321    /// # use hydro_lang::prelude::*;
322    /// # use futures::StreamExt;
323    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
324    /// let tick = process.tick();
325    /// let optional = tick.optional_first_tick(q!(1));
326    /// optional.map(q!(|v| v + 1)).all_ticks()
327    /// # }, |mut stream| async move {
328    /// // 2
329    /// # assert_eq!(stream.next().await.unwrap(), 2);
330    /// # }));
331    /// # }
332    /// ```
333    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
334    where
335        F: Fn(T) -> U + 'a,
336    {
337        let f = f.splice_fn1_ctx(&self.location).into();
338        Optional::new(
339            self.location.clone(),
340            HydroNode::Map {
341                f,
342                input: Box::new(self.ir_node.into_inner()),
343                metadata: self
344                    .location
345                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
346            },
347        )
348    }
349
350    /// Transforms the optional value by applying a function `f` to it and then flattening
351    /// the result into a stream, preserving the order of elements.
352    ///
353    /// If the optional is empty, the output stream is also empty. If the optional contains
354    /// a value, `f` is applied to produce an iterator, and all items from that iterator
355    /// are emitted in the output stream in deterministic order.
356    ///
357    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
358    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
359    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
360    ///
361    /// # Example
362    /// ```rust
363    /// # #[cfg(feature = "deploy")] {
364    /// # use hydro_lang::prelude::*;
365    /// # use futures::StreamExt;
366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367    /// let tick = process.tick();
368    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
369    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
370    /// # }, |mut stream| async move {
371    /// // 1, 2, 3
372    /// # for w in vec![1, 2, 3] {
373    /// #     assert_eq!(stream.next().await.unwrap(), w);
374    /// # }
375    /// # }));
376    /// # }
377    /// ```
378    pub fn flat_map_ordered<U, I, F>(
379        self,
380        f: impl IntoQuotedMut<'a, F, L>,
381    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
382    where
383        I: IntoIterator<Item = U>,
384        F: Fn(T) -> I + 'a,
385    {
386        let f = f.splice_fn1_ctx(&self.location).into();
387        Stream::new(
388            self.location.clone(),
389            HydroNode::FlatMap {
390                f,
391                input: Box::new(self.ir_node.into_inner()),
392                metadata: self.location.new_node_metadata(
393                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
394                ),
395            },
396        )
397    }
398
399    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
400    /// for the output type `I` to produce items in any order.
401    ///
402    /// If the optional is empty, the output stream is also empty. If the optional contains
403    /// a value, `f` is applied to produce an iterator, and all items from that iterator
404    /// are emitted in the output stream in non-deterministic order.
405    ///
406    /// # Example
407    /// ```rust
408    /// # #[cfg(feature = "deploy")] {
409    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
410    /// # use futures::StreamExt;
411    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
412    /// let tick = process.tick();
413    /// let optional = tick.optional_first_tick(q!(
414    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
415    /// ));
416    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, but in no particular order
419    /// # let mut results = Vec::new();
420    /// # for _ in 0..3 {
421    /// #     results.push(stream.next().await.unwrap());
422    /// # }
423    /// # results.sort();
424    /// # assert_eq!(results, vec![1, 2, 3]);
425    /// # }));
426    /// # }
427    /// ```
428    pub fn flat_map_unordered<U, I, F>(
429        self,
430        f: impl IntoQuotedMut<'a, F, L>,
431    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
432    where
433        I: IntoIterator<Item = U>,
434        F: Fn(T) -> I + 'a,
435    {
436        let f = f.splice_fn1_ctx(&self.location).into();
437        Stream::new(
438            self.location.clone(),
439            HydroNode::FlatMap {
440                f,
441                input: Box::new(self.ir_node.into_inner()),
442                metadata: self
443                    .location
444                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
445            },
446        )
447    }
448
449    /// Flattens the optional value into a stream, preserving the order of elements.
450    ///
451    /// If the optional is empty, the output stream is also empty. If the optional contains
452    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
453    /// in the output stream in deterministic order.
454    ///
455    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
456    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
457    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// let tick = process.tick();
466    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
467    /// optional.flatten_ordered().all_ticks()
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3
470    /// # for w in vec![1, 2, 3] {
471    /// #     assert_eq!(stream.next().await.unwrap(), w);
472    /// # }
473    /// # }));
474    /// # }
475    /// ```
476    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
477    where
478        T: IntoIterator<Item = U>,
479    {
480        self.flat_map_ordered(q!(|v| v))
481    }
482
483    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
484    /// for the element type `T` to produce items in any order.
485    ///
486    /// If the optional is empty, the output stream is also empty. If the optional contains
487    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
488    /// in the output stream in non-deterministic order.
489    ///
490    /// # Example
491    /// ```rust
492    /// # #[cfg(feature = "deploy")] {
493    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
494    /// # use futures::StreamExt;
495    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
496    /// let tick = process.tick();
497    /// let optional = tick.optional_first_tick(q!(
498    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
499    /// ));
500    /// optional.flatten_unordered().all_ticks()
501    /// # }, |mut stream| async move {
502    /// // 1, 2, 3, but in no particular order
503    /// # let mut results = Vec::new();
504    /// # for _ in 0..3 {
505    /// #     results.push(stream.next().await.unwrap());
506    /// # }
507    /// # results.sort();
508    /// # assert_eq!(results, vec![1, 2, 3]);
509    /// # }));
510    /// # }
511    /// ```
512    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
513    where
514        T: IntoIterator<Item = U>,
515    {
516        self.flat_map_unordered(q!(|v| v))
517    }
518
519    /// Creates an optional containing only the value if it satisfies a predicate `f`.
520    ///
521    /// If the optional is empty, the output optional is also empty. If the optional contains
522    /// a value and the predicate returns `true`, the output optional contains the same value.
523    /// If the predicate returns `false`, the output optional is empty.
524    ///
525    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
526    /// not modify or take ownership of the value. If you need to modify the value while filtering
527    /// use [`Optional::filter_map`] instead.
528    ///
529    /// # Example
530    /// ```rust
531    /// # #[cfg(feature = "deploy")] {
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let optional = tick.optional_first_tick(q!(5));
537    /// optional.filter(q!(|&x| x > 3)).all_ticks()
538    /// # }, |mut stream| async move {
539    /// // 5
540    /// # assert_eq!(stream.next().await.unwrap(), 5);
541    /// # }));
542    /// # }
543    /// ```
544    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
545    where
546        F: Fn(&T) -> bool + 'a,
547    {
548        let f = f.splice_fn1_borrow_ctx(&self.location).into();
549        Optional::new(
550            self.location.clone(),
551            HydroNode::Filter {
552                f,
553                input: Box::new(self.ir_node.into_inner()),
554                metadata: self.location.new_node_metadata(Self::collection_kind()),
555            },
556        )
557    }
558
559    /// An operator that both filters and maps. It yields only the value if the supplied
560    /// closure `f` returns `Some(value)`.
561    ///
562    /// If the optional is empty, the output optional is also empty. If the optional contains
563    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
564    /// If the closure returns `None`, the output optional is empty.
565    ///
566    /// # Example
567    /// ```rust
568    /// # #[cfg(feature = "deploy")] {
569    /// # use hydro_lang::prelude::*;
570    /// # use futures::StreamExt;
571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572    /// let tick = process.tick();
573    /// let optional = tick.optional_first_tick(q!("42"));
574    /// optional
575    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
576    ///     .all_ticks()
577    /// # }, |mut stream| async move {
578    /// // 42
579    /// # assert_eq!(stream.next().await.unwrap(), 42);
580    /// # }));
581    /// # }
582    /// ```
583    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
584    where
585        F: Fn(T) -> Option<U> + 'a,
586    {
587        let f = f.splice_fn1_ctx(&self.location).into();
588        Optional::new(
589            self.location.clone(),
590            HydroNode::FilterMap {
591                f,
592                input: Box::new(self.ir_node.into_inner()),
593                metadata: self
594                    .location
595                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
596            },
597        )
598    }
599
600    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
601    ///
602    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
603    /// non-null. This is useful for combining several pieces of state together.
604    ///
605    /// # Example
606    /// ```rust
607    /// # #[cfg(feature = "deploy")] {
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// let tick = process.tick();
612    /// let numbers = process
613    ///   .source_iter(q!(vec![123, 456, 789]))
614    ///   .batch(&tick, nondet!(/** test */));
615    /// let min = numbers.clone().min(); // Optional
616    /// let max = numbers.max(); // Optional
617    /// min.zip(max).all_ticks()
618    /// # }, |mut stream| async move {
619    /// // [(123, 789)]
620    /// # for w in vec![(123, 789)] {
621    /// #     assert_eq!(stream.next().await.unwrap(), w);
622    /// # }
623    /// # }));
624    /// # }
625    /// ```
626    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
627    where
628        O: Clone,
629    {
630        let other: Optional<O, L, B> = other.into();
631        check_matching_location(&self.location, &other.location);
632
633        if L::is_top_level()
634            && let Some(tick) = self.location.try_tick()
635        {
636            let out = zip_inside_tick(
637                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639            )
640            .latest();
641
642            Optional::new(out.location, out.ir_node.into_inner())
643        } else {
644            zip_inside_tick(self, other)
645        }
646    }
647
648    /// Passes through `self` when it has a value, otherwise passes through `other`.
649    ///
650    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
651    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
652    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
653    ///
654    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
655    /// of the inputs change (including to/from null states).
656    ///
657    /// # Example
658    /// ```rust
659    /// # #[cfg(feature = "deploy")] {
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663    /// let tick = process.tick();
664    /// // ticks are lazy by default, forces the second tick to run
665    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
666    ///
667    /// let some_first_tick = tick.optional_first_tick(q!(123));
668    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
669    /// some_first_tick.or(some_second_tick).all_ticks()
670    /// # }, |mut stream| async move {
671    /// // [123 /* first tick */, 456 /* second tick */]
672    /// # for w in vec![123, 456] {
673    /// #     assert_eq!(stream.next().await.unwrap(), w);
674    /// # }
675    /// # }));
676    /// # }
677    /// ```
678    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
679        check_matching_location(&self.location, &other.location);
680
681        if L::is_top_level()
682            && !B::BOUNDED // only if unbounded we need to use a tick
683            && let Some(tick) = self.location.try_tick()
684        {
685            let out = or_inside_tick(
686                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
687                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
688            )
689            .latest();
690
691            Optional::new(out.location, out.ir_node.into_inner())
692        } else {
693            Optional::new(
694                self.location.clone(),
695                HydroNode::ChainFirst {
696                    first: Box::new(self.ir_node.into_inner()),
697                    second: Box::new(other.ir_node.into_inner()),
698                    metadata: self.location.new_node_metadata(Self::collection_kind()),
699                },
700            )
701        }
702    }
703
704    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
705    ///
706    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
707    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
708    ///
709    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
710    /// of the inputs change (including to/from null states).
711    ///
712    /// # Example
713    /// ```rust
714    /// # #[cfg(feature = "deploy")] {
715    /// # use hydro_lang::prelude::*;
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718    /// let tick = process.tick();
719    /// // ticks are lazy by default, forces the later ticks to run
720    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
721    ///
722    /// let some_first_tick = tick.optional_first_tick(q!(123));
723    /// some_first_tick
724    ///     .unwrap_or(tick.singleton(q!(456)))
725    ///     .all_ticks()
726    /// # }, |mut stream| async move {
727    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
728    /// # for w in vec![123, 456, 456, 456] {
729    /// #     assert_eq!(stream.next().await.unwrap(), w);
730    /// # }
731    /// # }));
732    /// # }
733    /// ```
734    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
735        let res_option = self.or(other.into());
736        Singleton::new(
737            res_option.location.clone(),
738            HydroNode::Cast {
739                inner: Box::new(res_option.ir_node.into_inner()),
740                metadata: res_option
741                    .location
742                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
743            },
744        )
745    }
746
747    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
748    ///
749    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
750    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
751    /// so that Hydro can skip any computation on null values.
752    ///
753    /// # Example
754    /// ```rust
755    /// # #[cfg(feature = "deploy")] {
756    /// # use hydro_lang::prelude::*;
757    /// # use futures::StreamExt;
758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
759    /// let tick = process.tick();
760    /// // ticks are lazy by default, forces the later ticks to run
761    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
762    ///
763    /// let some_first_tick = tick.optional_first_tick(q!(123));
764    /// some_first_tick.into_singleton().all_ticks()
765    /// # }, |mut stream| async move {
766    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
767    /// # for w in vec![Some(123), None, None, None] {
768    /// #     assert_eq!(stream.next().await.unwrap(), w);
769    /// # }
770    /// # }));
771    /// # }
772    /// ```
773    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
774    where
775        T: Clone,
776    {
777        let none: syn::Expr = parse_quote!(::std::option::Option::None);
778
779        let none_singleton = Singleton::new(
780            self.location.clone(),
781            HydroNode::SingletonSource {
782                value: none.into(),
783                metadata: self
784                    .location
785                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
786            },
787        );
788
789        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
790    }
791
792    /// An operator which allows you to "name" a `HydroNode`.
793    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
794    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
795        {
796            let mut node = self.ir_node.borrow_mut();
797            let metadata = node.metadata_mut();
798            metadata.tag = Some(name.to_owned());
799        }
800        self
801    }
802
803    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
804    /// implies that `B == Bounded`.
805    pub fn make_bounded(self) -> Optional<T, L, Bounded>
806    where
807        B: IsBounded,
808    {
809        Optional::new(self.location, self.ir_node.into_inner())
810    }
811
812    /// Clones this bounded optional into a tick, returning a optional that has the
813    /// same value as the outer optional. Because the outer optional is bounded, this
814    /// is deterministic because there is only a single immutable version.
815    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
816    where
817        B: IsBounded,
818        T: Clone,
819    {
820        // TODO(shadaj): avoid printing simulator logs for this snapshot
821        self.snapshot(
822            tick,
823            nondet!(/** bounded top-level optional so deterministic */),
824        )
825    }
826
827    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
828    /// non-null. Otherwise, the stream is empty.
829    ///
830    /// # Example
831    /// ```rust
832    /// # #[cfg(feature = "deploy")] {
833    /// # use hydro_lang::prelude::*;
834    /// # use futures::StreamExt;
835    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836    /// # let tick = process.tick();
837    /// # // ticks are lazy by default, forces the second tick to run
838    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
839    /// # let batch_first_tick = process
840    /// #   .source_iter(q!(vec![]))
841    /// #   .batch(&tick, nondet!(/** test */));
842    /// # let batch_second_tick = process
843    /// #   .source_iter(q!(vec![123, 456]))
844    /// #   .batch(&tick, nondet!(/** test */))
845    /// #   .defer_tick(); // appears on the second tick
846    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
847    /// input_batch // first tick: [], second tick: [123, 456]
848    ///     .clone()
849    ///     .max()
850    ///     .into_stream()
851    ///     .chain(input_batch)
852    ///     .all_ticks()
853    /// # }, |mut stream| async move {
854    /// // [456, 123, 456]
855    /// # for w in vec![456, 123, 456] {
856    /// #     assert_eq!(stream.next().await.unwrap(), w);
857    /// # }
858    /// # }));
859    /// # }
860    /// ```
861    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
862    where
863        B: IsBounded,
864    {
865        Stream::new(
866            self.location.clone(),
867            HydroNode::Cast {
868                inner: Box::new(self.ir_node.into_inner()),
869                metadata: self.location.new_node_metadata(Stream::<
870                    T,
871                    Tick<L>,
872                    Bounded,
873                    TotalOrder,
874                    ExactlyOnce,
875                >::collection_kind()),
876            },
877        )
878    }
879
880    /// Filters this optional, passing through the optional value if it is non-null **and** the
881    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
882    ///
883    /// Useful for conditionally processing, such as only emitting an optional's value outside
884    /// a tick if some other condition is satisfied.
885    ///
886    /// # Example
887    /// ```rust
888    /// # #[cfg(feature = "deploy")] {
889    /// # use hydro_lang::prelude::*;
890    /// # use futures::StreamExt;
891    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
892    /// let tick = process.tick();
893    /// // ticks are lazy by default, forces the second tick to run
894    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
895    ///
896    /// let batch_first_tick = process
897    ///   .source_iter(q!(vec![]))
898    ///   .batch(&tick, nondet!(/** test */));
899    /// let batch_second_tick = process
900    ///   .source_iter(q!(vec![456]))
901    ///   .batch(&tick, nondet!(/** test */))
902    ///   .defer_tick(); // appears on the second tick
903    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
904    /// batch_first_tick.chain(batch_second_tick).first()
905    ///   .filter_if_some(some_on_first_tick)
906    ///   .unwrap_or(tick.singleton(q!(789)))
907    ///   .all_ticks()
908    /// # }, |mut stream| async move {
909    /// // [789, 789]
910    /// # for w in vec![789, 789] {
911    /// #     assert_eq!(stream.next().await.unwrap(), w);
912    /// # }
913    /// # }));
914    /// # }
915    /// ```
916    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
917    where
918        B: IsBounded,
919    {
920        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
921    }
922
923    /// Filters this optional, passing through the optional value if it is non-null **and** the
924    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
925    ///
926    /// Useful for conditionally processing, such as only emitting an optional's value outside
927    /// a tick if some other condition is satisfied.
928    ///
929    /// # Example
930    /// ```rust
931    /// # #[cfg(feature = "deploy")] {
932    /// # use hydro_lang::prelude::*;
933    /// # use futures::StreamExt;
934    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
935    /// let tick = process.tick();
936    /// // ticks are lazy by default, forces the second tick to run
937    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
938    ///
939    /// let batch_first_tick = process
940    ///   .source_iter(q!(vec![]))
941    ///   .batch(&tick, nondet!(/** test */));
942    /// let batch_second_tick = process
943    ///   .source_iter(q!(vec![456]))
944    ///   .batch(&tick, nondet!(/** test */))
945    ///   .defer_tick(); // appears on the second tick
946    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
947    /// batch_first_tick.chain(batch_second_tick).first()
948    ///   .filter_if_none(some_on_first_tick)
949    ///   .unwrap_or(tick.singleton(q!(789)))
950    ///   .all_ticks()
951    /// # }, |mut stream| async move {
952    /// // [789, 789]
953    /// # for w in vec![789, 456] {
954    /// #     assert_eq!(stream.next().await.unwrap(), w);
955    /// # }
956    /// # }));
957    /// # }
958    /// ```
959    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
960    where
961        B: IsBounded,
962    {
963        self.filter_if_some(
964            other
965                .map(q!(|_| ()))
966                .into_singleton()
967                .filter(q!(|o| o.is_none())),
968        )
969    }
970
971    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
972    ///
973    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
974    /// having a value, such as only releasing a piece of state if the node is the leader.
975    ///
976    /// # Example
977    /// ```rust
978    /// # #[cfg(feature = "deploy")] {
979    /// # use hydro_lang::prelude::*;
980    /// # use futures::StreamExt;
981    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
982    /// let tick = process.tick();
983    /// // ticks are lazy by default, forces the second tick to run
984    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
985    ///
986    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
987    /// some_on_first_tick
988    ///     .if_some_then(tick.singleton(q!(456)))
989    ///     .unwrap_or(tick.singleton(q!(123)))
990    /// # .all_ticks()
991    /// # }, |mut stream| async move {
992    /// // 456 (first tick) ~> 123 (second tick onwards)
993    /// # for w in vec![456, 123, 123] {
994    /// #     assert_eq!(stream.next().await.unwrap(), w);
995    /// # }
996    /// # }));
997    /// # }
998    /// ```
999    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1000    where
1001        B: IsBounded,
1002    {
1003        value.filter_if_some(self)
1004    }
1005}
1006
1007impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1008where
1009    L: Location<'a> + NoTick,
1010{
1011    /// Returns an optional value corresponding to the latest snapshot of the optional
1012    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1013    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1014    /// all snapshots of this optional into the atomic-associated tick will observe the
1015    /// same value each tick.
1016    ///
1017    /// # Non-Determinism
1018    /// Because this picks a snapshot of a optional whose value is continuously changing,
1019    /// the output optional has a non-deterministic value since the snapshot can be at an
1020    /// arbitrary point in time.
1021    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1022        Optional::new(
1023            self.location.clone().tick,
1024            HydroNode::Batch {
1025                inner: Box::new(self.ir_node.into_inner()),
1026                metadata: self
1027                    .location
1028                    .tick
1029                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1030            },
1031        )
1032    }
1033
1034    /// Returns this optional back into a top-level, asynchronous execution context where updates
1035    /// to the value will be asynchronously propagated.
1036    pub fn end_atomic(self) -> Optional<T, L, B> {
1037        Optional::new(
1038            self.location.tick.l.clone(),
1039            HydroNode::EndAtomic {
1040                inner: Box::new(self.ir_node.into_inner()),
1041                metadata: self
1042                    .location
1043                    .tick
1044                    .l
1045                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1046            },
1047        )
1048    }
1049}
1050
1051impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1052where
1053    L: Location<'a>,
1054{
1055    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1056    /// will observe the same version of the value and will be executed synchronously before any
1057    /// outputs are yielded (in [`Optional::end_atomic`]).
1058    ///
1059    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1060    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1061    /// a different version).
1062    ///
1063    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1064    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1065    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1066    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1067        let out_location = Atomic { tick: tick.clone() };
1068        Optional::new(
1069            out_location.clone(),
1070            HydroNode::BeginAtomic {
1071                inner: Box::new(self.ir_node.into_inner()),
1072                metadata: out_location
1073                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1074            },
1075        )
1076    }
1077
1078    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1079    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1080    /// relevant data that contributed to the snapshot at tick `t`.
1081    ///
1082    /// # Non-Determinism
1083    /// Because this picks a snapshot of a optional whose value is continuously changing,
1084    /// the output optional has a non-deterministic value since the snapshot can be at an
1085    /// arbitrary point in time.
1086    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1087        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1088        Optional::new(
1089            tick.clone(),
1090            HydroNode::Batch {
1091                inner: Box::new(self.ir_node.into_inner()),
1092                metadata: tick
1093                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1094            },
1095        )
1096    }
1097
1098    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1099    /// with order corresponding to increasing prefixes of data contributing to the optional.
1100    ///
1101    /// # Non-Determinism
1102    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1103    /// to non-deterministic batching and arrival of inputs, the output stream is
1104    /// non-deterministic.
1105    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1106    where
1107        L: NoTick,
1108    {
1109        let tick = self.location.tick();
1110        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1111    }
1112
1113    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1114    /// value taken at various points in time. Because the input optional may be
1115    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1116    /// represent the value of the optional given some prefix of the streams leading up to
1117    /// it.
1118    ///
1119    /// # Non-Determinism
1120    /// The output stream is non-deterministic in which elements are sampled, since this
1121    /// is controlled by a clock.
1122    pub fn sample_every(
1123        self,
1124        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1125        nondet: NonDet,
1126    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1127    where
1128        L: NoTick + NoAtomic,
1129    {
1130        let samples = self.location.source_interval(interval, nondet);
1131        let tick = self.location.tick();
1132
1133        self.snapshot(&tick, nondet)
1134            .filter_if_some(samples.batch(&tick, nondet).first())
1135            .all_ticks()
1136            .weaken_retries()
1137    }
1138}
1139
1140impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1141where
1142    L: Location<'a>,
1143{
1144    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1145    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1146    /// null values).
1147    ///
1148    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1149    /// producing one element in the output for each (non-null) tick. This is useful for batched
1150    /// computations, where the results from each tick must be combined together.
1151    ///
1152    /// # Example
1153    /// ```rust
1154    /// # #[cfg(feature = "deploy")] {
1155    /// # use hydro_lang::prelude::*;
1156    /// # use futures::StreamExt;
1157    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158    /// # let tick = process.tick();
1159    /// # // ticks are lazy by default, forces the second tick to run
1160    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1161    /// # let batch_first_tick = process
1162    /// #   .source_iter(q!(vec![]))
1163    /// #   .batch(&tick, nondet!(/** test */));
1164    /// # let batch_second_tick = process
1165    /// #   .source_iter(q!(vec![1, 2, 3]))
1166    /// #   .batch(&tick, nondet!(/** test */))
1167    /// #   .defer_tick(); // appears on the second tick
1168    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1169    /// input_batch // first tick: [], second tick: [1, 2, 3]
1170    ///     .max()
1171    ///     .all_ticks()
1172    /// # }, |mut stream| async move {
1173    /// // [3]
1174    /// # for w in vec![3] {
1175    /// #     assert_eq!(stream.next().await.unwrap(), w);
1176    /// # }
1177    /// # }));
1178    /// # }
1179    /// ```
1180    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1181        self.into_stream().all_ticks()
1182    }
1183
1184    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1185    /// which will stream the value computed in _each_ tick as a separate stream element.
1186    ///
1187    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1188    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1189    /// optional's [`Tick`] context.
1190    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1191        self.into_stream().all_ticks_atomic()
1192    }
1193
1194    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1195    /// be asynchronously updated with the latest value of the optional inside the tick, including
1196    /// whether the optional is null or not.
1197    ///
1198    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1199    /// tick that tracks the inner value. This is useful for getting the value as of the
1200    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1201    ///
1202    /// # Example
1203    /// ```rust
1204    /// # #[cfg(feature = "deploy")] {
1205    /// # use hydro_lang::prelude::*;
1206    /// # use futures::StreamExt;
1207    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1208    /// # let tick = process.tick();
1209    /// # // ticks are lazy by default, forces the second tick to run
1210    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1211    /// # let batch_first_tick = process
1212    /// #   .source_iter(q!(vec![]))
1213    /// #   .batch(&tick, nondet!(/** test */));
1214    /// # let batch_second_tick = process
1215    /// #   .source_iter(q!(vec![1, 2, 3]))
1216    /// #   .batch(&tick, nondet!(/** test */))
1217    /// #   .defer_tick(); // appears on the second tick
1218    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1219    /// input_batch // first tick: [], second tick: [1, 2, 3]
1220    ///     .max()
1221    ///     .latest()
1222    /// # .into_singleton()
1223    /// # .sample_eager(nondet!(/** test */))
1224    /// # }, |mut stream| async move {
1225    /// // asynchronously changes from None ~> 3
1226    /// # for w in vec![None, Some(3)] {
1227    /// #     assert_eq!(stream.next().await.unwrap(), w);
1228    /// # }
1229    /// # }));
1230    /// # }
1231    /// ```
1232    pub fn latest(self) -> Optional<T, L, Unbounded> {
1233        Optional::new(
1234            self.location.outer().clone(),
1235            HydroNode::YieldConcat {
1236                inner: Box::new(self.ir_node.into_inner()),
1237                metadata: self
1238                    .location
1239                    .outer()
1240                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1241            },
1242        )
1243    }
1244
1245    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1246    /// be updated with the latest value of the optional inside the tick.
1247    ///
1248    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1249    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1250    /// optional's [`Tick`] context.
1251    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1252        let out_location = Atomic {
1253            tick: self.location.clone(),
1254        };
1255
1256        Optional::new(
1257            out_location.clone(),
1258            HydroNode::YieldConcat {
1259                inner: Box::new(self.ir_node.into_inner()),
1260                metadata: out_location
1261                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1262            },
1263        )
1264    }
1265
1266    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1267    /// always has the state of `self` at tick `T - 1`.
1268    ///
1269    /// At tick `0`, the output optional is null, since there is no previous tick.
1270    ///
1271    /// This operator enables stateful iterative processing with ticks, by sending data from one
1272    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1273    ///
1274    /// # Example
1275    /// ```rust
1276    /// # #[cfg(feature = "deploy")] {
1277    /// # use hydro_lang::prelude::*;
1278    /// # use futures::StreamExt;
1279    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1280    /// let tick = process.tick();
1281    /// // ticks are lazy by default, forces the second tick to run
1282    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1283    ///
1284    /// let batch_first_tick = process
1285    ///   .source_iter(q!(vec![1, 2]))
1286    ///   .batch(&tick, nondet!(/** test */));
1287    /// let batch_second_tick = process
1288    ///   .source_iter(q!(vec![3, 4]))
1289    ///   .batch(&tick, nondet!(/** test */))
1290    ///   .defer_tick(); // appears on the second tick
1291    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1292    ///   .reduce(q!(|state, v| *state += v));
1293    ///
1294    /// current_tick_sum.clone().into_singleton().zip(
1295    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1296    /// ).all_ticks()
1297    /// # }, |mut stream| async move {
1298    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1299    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1300    /// #     assert_eq!(stream.next().await.unwrap(), w);
1301    /// # }
1302    /// # }));
1303    /// # }
1304    /// ```
1305    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1306        Optional::new(
1307            self.location.clone(),
1308            HydroNode::DeferTick {
1309                input: Box::new(self.ir_node.into_inner()),
1310                metadata: self.location.new_node_metadata(Self::collection_kind()),
1311            },
1312        )
1313    }
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318    #[cfg(feature = "deploy")]
1319    use futures::StreamExt;
1320    #[cfg(feature = "deploy")]
1321    use hydro_deploy::Deployment;
1322    #[cfg(any(feature = "deploy", feature = "sim"))]
1323    use stageleft::q;
1324
1325    #[cfg(feature = "deploy")]
1326    use super::Optional;
1327    #[cfg(any(feature = "deploy", feature = "sim"))]
1328    use crate::compile::builder::FlowBuilder;
1329    #[cfg(any(feature = "deploy", feature = "sim"))]
1330    use crate::location::Location;
1331    #[cfg(feature = "deploy")]
1332    use crate::nondet::nondet;
1333
1334    #[cfg(feature = "deploy")]
1335    #[tokio::test]
1336    async fn optional_or_cardinality() {
1337        let mut deployment = Deployment::new();
1338
1339        let mut flow = FlowBuilder::new();
1340        let node = flow.process::<()>();
1341        let external = flow.external::<()>();
1342
1343        let node_tick = node.tick();
1344        let tick_singleton = node_tick.singleton(q!(123));
1345        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1346        let counts = tick_optional_inhabited
1347            .clone()
1348            .or(tick_optional_inhabited)
1349            .into_stream()
1350            .count()
1351            .all_ticks()
1352            .send_bincode_external(&external);
1353
1354        let nodes = flow
1355            .with_process(&node, deployment.Localhost())
1356            .with_external(&external, deployment.Localhost())
1357            .deploy(&mut deployment);
1358
1359        deployment.deploy().await.unwrap();
1360
1361        let mut external_out = nodes.connect(counts).await;
1362
1363        deployment.start().await.unwrap();
1364
1365        assert_eq!(external_out.next().await.unwrap(), 1);
1366    }
1367
1368    #[cfg(feature = "deploy")]
1369    #[tokio::test]
1370    async fn into_singleton_top_level_none_cardinality() {
1371        let mut deployment = Deployment::new();
1372
1373        let mut flow = FlowBuilder::new();
1374        let node = flow.process::<()>();
1375        let external = flow.external::<()>();
1376
1377        let node_tick = node.tick();
1378        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1379        let into_singleton = top_level_none.into_singleton();
1380
1381        let tick_driver = node.spin();
1382
1383        let counts = into_singleton
1384            .snapshot(&node_tick, nondet!(/** test */))
1385            .into_stream()
1386            .count()
1387            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1388            .map(q!(|(c, _)| c))
1389            .all_ticks()
1390            .send_bincode_external(&external);
1391
1392        let nodes = flow
1393            .with_process(&node, deployment.Localhost())
1394            .with_external(&external, deployment.Localhost())
1395            .deploy(&mut deployment);
1396
1397        deployment.deploy().await.unwrap();
1398
1399        let mut external_out = nodes.connect(counts).await;
1400
1401        deployment.start().await.unwrap();
1402
1403        assert_eq!(external_out.next().await.unwrap(), 1);
1404        assert_eq!(external_out.next().await.unwrap(), 1);
1405        assert_eq!(external_out.next().await.unwrap(), 1);
1406    }
1407
1408    #[cfg(feature = "deploy")]
1409    #[tokio::test]
1410    async fn into_singleton_unbounded_top_level_none_cardinality() {
1411        let mut deployment = Deployment::new();
1412
1413        let mut flow = FlowBuilder::new();
1414        let node = flow.process::<()>();
1415        let external = flow.external::<()>();
1416
1417        let node_tick = node.tick();
1418        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1419        let into_singleton = top_level_none.into_singleton();
1420
1421        let tick_driver = node.spin();
1422
1423        let counts = into_singleton
1424            .snapshot(&node_tick, nondet!(/** test */))
1425            .into_stream()
1426            .count()
1427            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1428            .map(q!(|(c, _)| c))
1429            .all_ticks()
1430            .send_bincode_external(&external);
1431
1432        let nodes = flow
1433            .with_process(&node, deployment.Localhost())
1434            .with_external(&external, deployment.Localhost())
1435            .deploy(&mut deployment);
1436
1437        deployment.deploy().await.unwrap();
1438
1439        let mut external_out = nodes.connect(counts).await;
1440
1441        deployment.start().await.unwrap();
1442
1443        assert_eq!(external_out.next().await.unwrap(), 1);
1444        assert_eq!(external_out.next().await.unwrap(), 1);
1445        assert_eq!(external_out.next().await.unwrap(), 1);
1446    }
1447
1448    #[cfg(feature = "sim")]
1449    #[test]
1450    fn top_level_optional_some_into_stream_no_replay() {
1451        let mut flow = FlowBuilder::new();
1452        let node = flow.process::<()>();
1453
1454        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1455        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1456        let filtered_some = folded.filter(q!(|_| true));
1457
1458        let out_recv = filtered_some.into_stream().sim_output();
1459
1460        flow.sim().exhaustive(async || {
1461            out_recv.assert_yields_only([10]).await;
1462        });
1463    }
1464
1465    #[cfg(feature = "sim")]
1466    #[test]
1467    fn top_level_optional_none_into_stream_no_replay() {
1468        let mut flow = FlowBuilder::new();
1469        let node = flow.process::<()>();
1470
1471        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1472        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1473        let filtered_none = folded.filter(q!(|_| false));
1474
1475        let out_recv = filtered_none.into_stream().sim_output();
1476
1477        flow.sim().exhaustive(async || {
1478            out_recv.assert_yields_only([] as [i32; 0]).await;
1479        });
1480    }
1481}