hydro_lang/
forward_handle.rs1use sealed::sealed;
4
5use crate::compile::builder::CycleId;
6use crate::location::Location;
7use crate::location::dynamic::LocationId;
8use crate::staging_util::Invariant;
9
/// Marker trait for the kinds of cycle receiver this module distinguishes.
///
/// It declares no items; it exists only to bound the `Marker` / `Kind` type
/// parameters of the traits below. In this file it is implemented by
/// [`ForwardRef`] and [`TickCycle`].
// NOTE(review): `#[sealed]` comes from the `sealed` crate and is presumably
// there to prevent implementations outside this crate — confirm.
#[sealed]
pub(crate) trait ReceiverKind {}
12
/// Uninhabited marker type naming the forward-reference kind of receiver.
///
/// It has no variants, so it is only ever used at the type level; in this file
/// it is the `Marker` argument of `ReceiverComplete` in `ForwardHandle`'s bound.
pub enum ForwardRef {}

// Registers `ForwardRef` as one of the `ReceiverKind` markers.
#[sealed]
impl ReceiverKind for ForwardRef {}
21
/// Uninhabited marker type naming the tick-cycle kind of receiver.
///
/// It has no variants, so it is only ever used at the type level; in this file
/// it is the `Marker` argument of `ReceiverComplete` in `TickCycleHandle`'s bound.
pub enum TickCycle {}

// Registers `TickCycle` as one of the `ReceiverKind` markers.
#[sealed]
impl ReceiverKind for TickCycle {}
30
31pub(crate) trait ReceiverComplete<'a, Marker>
32where
33 Marker: ReceiverKind,
34{
35 fn complete(self, cycle_id: CycleId, expected_location: LocationId);
36}
37
38pub(crate) trait CycleCollection<'a, Kind>: ReceiverComplete<'a, Kind>
39where
40 Kind: ReceiverKind,
41{
42 type Location: Location<'a>;
43
44 fn create_source(id: CycleId, location: Self::Location) -> Self;
45}
46
47pub(crate) trait CycleCollectionWithInitial<'a, Kind>: ReceiverComplete<'a, Kind>
48where
49 Kind: ReceiverKind,
50{
51 type Location: Location<'a>;
52
53 fn create_source_with_initial(
54 cycle_id: CycleId,
55 initial: Self,
56 location: Self::Location,
57 ) -> Self;
58}
59
60#[expect(
64 private_bounds,
65 reason = "only Hydro collections can implement ReceiverComplete"
66)]
67pub struct ForwardHandle<'a, C: ReceiverComplete<'a, ForwardRef>> {
68 completed: bool,
69 cycle_id: CycleId,
70 expected_location: LocationId,
71 _phantom: Invariant<'a, C>,
72}
73
74#[expect(
75 private_bounds,
76 reason = "only Hydro collections can implement ReceiverComplete"
77)]
78impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
79 pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
80 Self {
81 completed: false,
82 cycle_id,
83 expected_location,
84 _phantom: std::marker::PhantomData,
85 }
86 }
87}
88
89impl<'a, C: ReceiverComplete<'a, ForwardRef>> Drop for ForwardHandle<'a, C> {
90 fn drop(&mut self) {
91 if !self.completed && !std::thread::panicking() {
92 panic!("ForwardHandle dropped without being completed");
93 }
94 }
95}
96
97#[expect(
98 private_bounds,
99 reason = "only Hydro collections can implement ReceiverComplete"
100)]
101impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
102 pub fn complete(mut self, stream: impl Into<C>) {
109 self.completed = true;
110 C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
111 }
112}
113
114#[expect(
118 private_bounds,
119 reason = "only Hydro collections can implement ReceiverComplete"
120)]
121pub struct TickCycleHandle<'a, C: ReceiverComplete<'a, TickCycle>> {
122 completed: bool,
123 cycle_id: CycleId,
124 expected_location: LocationId,
125 _phantom: Invariant<'a, C>,
126}
127
128#[expect(
129 private_bounds,
130 reason = "only Hydro collections can implement ReceiverComplete"
131)]
132impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
133 pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
134 Self {
135 completed: false,
136 cycle_id,
137 expected_location,
138 _phantom: std::marker::PhantomData,
139 }
140 }
141}
142
143impl<'a, C: ReceiverComplete<'a, TickCycle>> Drop for TickCycleHandle<'a, C> {
144 fn drop(&mut self) {
145 if !self.completed && !std::thread::panicking() {
146 panic!("TickCycleHandle dropped without being completed");
147 }
148 }
149}
150
151#[expect(
152 private_bounds,
153 reason = "only Hydro collections can implement ReceiverComplete"
154)]
155impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
156 pub fn complete_next_tick(mut self, stream: impl Into<C>) {
160 self.completed = true;
161 C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
162 }
163}
164
/// Trait form of `complete_next_tick`, generic over the state type `S`.
///
/// In this file it is implemented for `TickCycleHandle<'a, C>` with `S = C`.
// NOTE(review): hidden from rustdoc; presumably exists so that other code
// (e.g. macros or tuple impls) can complete handles generically — confirm.
#[doc(hidden)]
pub trait CompleteCycle<S> {
    /// Consumes the handle and completes it with `state`.
    fn complete_next_tick(self, state: S);
}
172
173impl<'a, C: ReceiverComplete<'a, TickCycle>> CompleteCycle<C> for TickCycleHandle<'a, C> {
174 fn complete_next_tick(self, state: C) {
175 TickCycleHandle::complete_next_tick(self, state)
176 }
177}