1use sealed::sealed;
2use stageleft::{QuotedWithContext, q};
3
4#[cfg(stageleft_runtime)]
5use super::dynamic::DynLocation;
6use super::{Cluster, Location, LocationId, Process};
7use crate::compile::builder::{ClockId, FlowState};
8use crate::compile::ir::{HydroNode, HydroSource};
9#[cfg(stageleft_runtime)]
10use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
11use crate::forward_handle::{TickCycle, TickCycleHandle};
12use crate::live_collections::boundedness::{Bounded, Unbounded};
13use crate::live_collections::optional::Optional;
14use crate::live_collections::singleton::Singleton;
15use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
16use crate::nondet::nondet;
17
18#[sealed]
19pub trait NoTick {}
20#[sealed]
21impl<T> NoTick for Process<'_, T> {}
22#[sealed]
23impl<T> NoTick for Cluster<'_, T> {}
24
25#[sealed]
26pub trait NoAtomic {}
27#[sealed]
28impl<T> NoAtomic for Process<'_, T> {}
29#[sealed]
30impl<T> NoAtomic for Cluster<'_, T> {}
31#[sealed]
32impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
33
34#[derive(Clone)]
35pub struct Atomic<Loc> {
36 pub(crate) tick: Tick<Loc>,
37}
38
39impl<L: DynLocation> DynLocation for Atomic<L> {
40 fn id(&self) -> LocationId {
41 LocationId::Atomic(Box::new(self.tick.id()))
42 }
43
44 fn flow_state(&self) -> &FlowState {
45 self.tick.flow_state()
46 }
47
48 fn is_top_level() -> bool {
49 L::is_top_level()
50 }
51
52 fn multiversioned(&self) -> bool {
53 self.tick.multiversioned()
54 }
55}
56
57impl<'a, L> Location<'a> for Atomic<L>
58where
59 L: Location<'a>,
60{
61 type Root = L::Root;
62
63 fn root(&self) -> Self::Root {
64 self.tick.root()
65 }
66}
67
68#[sealed]
69impl<L> NoTick for Atomic<L> {}
70
/// Implemented by values that can be deferred across a tick boundary.
///
/// [`Tick::cycle`] requires this of its collection type and calls
/// [`DeferTick::defer_tick`] on the freshly created cycle source.
pub trait DeferTick {
    /// Consumes `self` and returns the deferred value of the same type.
    ///
    /// NOTE(review): presumably this delays the value by one tick; confirm
    /// against the implementors.
    fn defer_tick(self) -> Self;
}
74
75#[derive(Clone)]
77pub struct Tick<L> {
78 pub(crate) id: ClockId,
79 pub(crate) l: L,
81}
82
83impl<L: DynLocation> DynLocation for Tick<L> {
84 fn id(&self) -> LocationId {
85 LocationId::Tick(self.id, Box::new(self.l.id()))
86 }
87
88 fn flow_state(&self) -> &FlowState {
89 self.l.flow_state()
90 }
91
92 fn is_top_level() -> bool {
93 false
94 }
95
96 fn multiversioned(&self) -> bool {
97 self.l.multiversioned()
98 }
99}
100
101impl<'a, L> Location<'a> for Tick<L>
102where
103 L: Location<'a>,
104{
105 type Root = L::Root;
106
107 fn root(&self) -> Self::Root {
108 self.l.root()
109 }
110}
111
112impl<'a, L> Tick<L>
113where
114 L: Location<'a>,
115{
116 pub fn outer(&self) -> &L {
117 &self.l
118 }
119
120 pub fn spin_batch(
121 &self,
122 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
123 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
124 where
125 L: NoTick,
126 {
127 let out = self
128 .l
129 .spin()
130 .flat_map_ordered(q!(move |_| 0..batch_size))
131 .map(q!(|_| ()));
132
133 out.batch(self, nondet!())
134 }
135
136 pub fn singleton<T>(
137 &self,
138 e: impl QuotedWithContext<'a, T, Tick<L>>,
139 ) -> Singleton<T, Self, Bounded>
140 where
141 T: Clone,
142 {
143 let e = e.splice_untyped_ctx(self);
144
145 Singleton::new(
146 self.clone(),
147 HydroNode::SingletonSource {
148 value: e.into(),
149 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
150 },
151 )
152 }
153
154 pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
173 let e = q!([]);
174 let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
175
176 let unit_optional: Optional<(), Self, Bounded> = Optional::new(
177 self.clone(),
178 HydroNode::Source {
179 source: HydroSource::Iter(e.into()),
180 metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
181 },
182 );
183
184 unit_optional.map(q!(|_| unreachable!())) }
186
187 pub fn optional_first_tick<T: Clone>(
213 &self,
214 e: impl QuotedWithContext<'a, T, Tick<L>>,
215 ) -> Optional<T, Self, Bounded> {
216 let e_arr = q!([e]);
217 let e = e_arr.splice_untyped_ctx(self);
218
219 Optional::new(
220 self.clone(),
221 HydroNode::Batch {
222 inner: Box::new(HydroNode::Source {
223 source: HydroSource::Iter(e.into()),
224 metadata: self
225 .outer()
226 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
227 }),
228 metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
229 },
230 )
231 }
232
233 #[expect(
234 private_bounds,
235 reason = "only Hydro collections can implement ReceiverComplete"
236 )]
237 pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
238 where
239 S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
240 L: NoTick,
241 {
242 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
243 (
244 TickCycleHandle::new(cycle_id, Location::id(self)),
245 S::create_source(cycle_id, self.clone()).defer_tick(),
246 )
247 }
248
249 #[expect(
250 private_bounds,
251 reason = "only Hydro collections can implement ReceiverComplete"
252 )]
253 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
254 where
255 S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
256 {
257 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
258 (
259 TickCycleHandle::new(cycle_id, Location::id(self)),
260 S::create_source_with_initial(cycle_id, initial, self.clone()),
262 )
263 }
264}
265
/// Simulator tests contrasting atomic and non-atomic reads of folded state.
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// With the write stream made atomic, a read issued after the write
    /// has been acknowledged must observe that write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let tick = node.tick();
        let atomic_write = write_req.atomic(&tick);
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            // Exactly one write of `1` has been acknowledged, so the folded
            // state seen by the read must be exactly 1, not merely >= 1.
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v == 1));
        });

        assert_eq!(instances, 1);

        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        // NOTE(review): presumably the three explored schedules are read
        // before write, write before read, and both in the same tick.
        assert_eq!(instances_read_before_write, 3);
    }

    /// Without atomicity the same scenario is expected to fail: the
    /// `assert_eq!` on the read value must panic in some explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}