1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::{CycleId, ExternalPortId};
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can carry custom formatting: the `Debug` impl in
/// this file prints the expression's token stream, and the `Display` impl prints a
/// simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
/// Mutable AST visitor that looks for a call to a `stageleft::runtime_support::*_type_hint*`
/// function and records the closure found among that call's arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The first closure (or closure-containing block) extracted so far; `None` until the
    /// visitor finds a matching call.
    pub simplified_result: Option<syn::Expr>,
}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
/// Read-only AST visitor that stops at the first closure it encounters.
struct ClosureFinder {
    /// The expression captured by the search: either the closure itself or, when
    /// `prefer_inner_blocks` is set, a nested block that contains a closure.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression first checks its directly nested block statements
    /// and captures such a nested block whole if it contains a closure.
    prefer_inner_blocks: bool,
}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
/// Newtype around a boxed [`syn::Type`] whose `Debug` impl (in this file) prints the
/// type's token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
/// Instantiation state of a network-like IR element.
///
/// `HydroRoot::compile_network` replaces `Building` with `Finalized`, and
/// `HydroRoot::connect_network` panics if it still finds `Building`.
pub enum DebugInstantiate {
    /// Network endpoints have not been instantiated yet.
    Building,
    /// Endpoints were instantiated; the boxed payload holds the sink/source expressions
    /// and the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
258
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side (read back by `HydroRoot::emit_core`).
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot connect callback; `HydroRoot::connect_network` takes it out of the
    /// option and invokes it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
284impl Hash for DebugInstantiate {
285 fn hash<H: Hasher>(&self, _state: &mut H) {
286 }
288}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
/// The kinds of source carried by a `HydroNode::Source` node.
// NOTE(review): the per-variant notes below are inferred mostly from names and payload
// types; only `Embedded` is exercised in the visible part of this file — confirm the rest.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Presumably a stream produced by the wrapped expression.
    Stream(DebugExpr),
    /// Presumably input arriving from an external network; carries no payload.
    ExternalNetwork(),
    /// Presumably an iterator produced by the wrapped expression.
    Iter(DebugExpr),
    /// Carries no payload.
    Spin(),
    /// Presumably the membership of the cluster identified by the location.
    ClusterMembers(LocationId),
    /// Embedded input named by the identifier; `HydroRoot::compile_network` registers it
    /// through `Deploy::register_embedded_input`.
    Embedded(syn::Ident),
}
311
/// Sink for DFIR statements produced while emitting the Hydro IR.
///
/// This file provides an implementation for `SecondaryMap<LocationKey, FlatGraphBuilder>`
/// that keeps one `FlatGraphBuilder` per root location.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Reports whether this builder uses singleton intermediates.
    // NOTE(review): the exact meaning is not visible here; the `SecondaryMap` impl in this
    // file returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`, of kind `in_kind`) to `out_ident` at
    /// `out_location` for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` for a value leaving a tick.
    // NOTE(review): semantics inferred from the method name — confirm.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic section.
    // NOTE(review): semantics inferred from the method name — confirm.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic section.
    // NOTE(review): semantics inferred from the method name — confirm.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` for a nondeterminism observation at `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Creates a network link from location `from` to location `to`.
    ///
    /// `input_ident` feeds `sink` on the sending side (optionally through `serialize`),
    /// and `out_ident` is bound to `source` on the receiving side (optionally through
    /// `deserialize`). `tag_id` identifies the generated statements.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Binds `out_ident` at location `on` to the external source `source_expr`,
    /// optionally passing items through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Sends `input_ident` at location `on` to the external sink `sink_expr`, optionally
    /// passing items through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
404
/// Appends the plain aliasing statement `#out_ident = #in_ident;` to `builder`, passing
/// `None` for both remaining `add_dfir` arguments.
#[cfg(feature = "build")]
fn add_passthrough_stmt(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// [`DfirBuilder`] backed by one [`FlatGraphBuilder`] per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This builder never uses singleton intermediates.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, inserting a default
    /// one if absent.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when the map's `entry` lookup yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Emits the batch connection from `in_ident` to `out_ident`.
    ///
    /// Bounded `Singleton`/`Optional`/`KeyedSingleton` inputs are routed through
    /// `persist::<'static>()` (and must be at a top-level location); everything else is
    /// a plain alias.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            add_passthrough_stmt(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` in the builder for the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` in the builder for the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` in the builder for the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` in the builder for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        add_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Emits the sending half (tagged `send<tag_id>`) into the builder for `from` and the
    /// receiving half (tagged `recv<tag_id>`) into the builder for `to`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // Sending side.
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        // Receiving side.
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Binds `out_ident` to `source_stream(#source_expr)` (optionally mapped through
    /// `deserialize`) in the builder for `on`, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Pipes `input_ident` into `dest_sink(#sink_expr)` (optionally mapped through
    /// `serialize`) in the builder for `on`, tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }
}
637
/// Target of an IR emission pass: either real DFIR builders or a pair of callbacks.
///
/// `HydroRoot::emit_core` emits DFIR statements in the `Builders` case, and invokes the
/// root callback with the root and the current statement id in the `Callback` case.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call back instead of emitting: `L` receives roots, `N` receives nodes
    /// (presumably from `HydroNode::emit_core`, which is outside this view).
    Callback(L, N),
}
647
/// A root (terminal consumer) of the Hydro IR graph. Every variant owns the `input`
/// node it consumes plus operator metadata.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Applies `f` to every item of `input` (emitted as `input -> for_each(f)`).
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port on an external location.
    SendExternal {
        /// Key of the destination external location.
        to_external_key: LocationKey,
        /// Port on the destination external.
        to_port_id: ExternalPortId,
        /// Selects the "many" sink variant in `compile_network`.
        to_many: bool,
        /// When set, `compile_network` additionally registers the port and sets up the
        /// reverse (external-to-process) source and connection.
        unpaired: bool,
        /// Optional serializer applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// Instantiation state of the underlying network link.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Pipes `input` into `sink` (emitted as `input -> dest_sink(sink)`).
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds `input` to the identifier derived from `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the embedded output named `ident`
    /// (emitted as `input -> for_each(&mut ident)`).
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
684
685impl HydroRoot {
686 #[cfg(feature = "build")]
687 pub fn compile_network<'a, D>(
688 &mut self,
689 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
690 seen_tees: &mut SeenTees,
691 processes: &SparseSecondaryMap<LocationKey, D::Process>,
692 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
693 externals: &SparseSecondaryMap<LocationKey, D::External>,
694 env: &mut D::InstantiateEnv,
695 ) where
696 D: Deploy<'a>,
697 {
698 let refcell_extra_stmts = RefCell::new(extra_stmts);
699 let refcell_env = RefCell::new(env);
700 self.transform_bottom_up(
701 &mut |l| {
702 if let HydroRoot::SendExternal {
703 input,
704 to_external_key,
705 to_port_id,
706 to_many,
707 unpaired,
708 instantiate_fn,
709 ..
710 } = l
711 {
712 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
713 DebugInstantiate::Building => {
714 let to_node = externals
715 .get(*to_external_key)
716 .unwrap_or_else(|| {
717 panic!("A external used in the graph was not instantiated: {}", to_external_key)
718 })
719 .clone();
720
721 match input.metadata().location_id.root() {
722 &LocationId::Process(process_key) => {
723 if *to_many {
724 (
725 (
726 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
727 parse_quote!(DUMMY),
728 ),
729 Box::new(|| {}) as Box<dyn FnOnce()>,
730 )
731 } else {
732 let from_node = processes
733 .get(process_key)
734 .unwrap_or_else(|| {
735 panic!("A process used in the graph was not instantiated: {}", process_key)
736 })
737 .clone();
738
739 let sink_port = from_node.next_port();
740 let source_port = to_node.next_port();
741
742 if *unpaired {
743 use stageleft::quote_type;
744 use tokio_util::codec::LengthDelimitedCodec;
745
746 to_node.register(*to_port_id, source_port.clone());
747
748 let _ = D::e2o_source(
749 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
750 &to_node, &source_port,
751 &from_node, &sink_port,
752 "e_type::<LengthDelimitedCodec>(),
753 format!("{}_{}", *to_external_key, *to_port_id)
754 );
755 }
756
757 (
758 (
759 D::o2e_sink(
760 &from_node,
761 &sink_port,
762 &to_node,
763 &source_port,
764 format!("{}_{}", *to_external_key, *to_port_id)
765 ),
766 parse_quote!(DUMMY),
767 ),
768 if *unpaired {
769 D::e2o_connect(
770 &to_node,
771 &source_port,
772 &from_node,
773 &sink_port,
774 *to_many,
775 NetworkHint::Auto,
776 )
777 } else {
778 Box::new(|| {}) as Box<dyn FnOnce()>
779 },
780 )
781 }
782 }
783 LocationId::Cluster(_) => todo!(),
784 _ => panic!()
785 }
786 },
787
788 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
789 };
790
791 *instantiate_fn = DebugInstantiateFinalized {
792 sink: sink_expr,
793 source: source_expr,
794 connect_fn: Some(connect_fn),
795 }
796 .into();
797 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
798 let element_type = match &input.metadata().collection_kind {
799 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
800 _ => panic!("Embedded output must have Stream collection kind"),
801 };
802 let location_key = match input.metadata().location_id.root() {
803 LocationId::Process(key) | LocationId::Cluster(key) => *key,
804 _ => panic!("Embedded output must be on a process or cluster"),
805 };
806 D::register_embedded_output(
807 &mut refcell_env.borrow_mut(),
808 location_key,
809 ident,
810 &element_type,
811 );
812 }
813 },
814 &mut |n| {
815 if let HydroNode::Network {
816 name,
817 input,
818 instantiate_fn,
819 metadata,
820 ..
821 } = n
822 {
823 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
824 DebugInstantiate::Building => instantiate_network::<D>(
825 &mut refcell_env.borrow_mut(),
826 input.metadata().location_id.root(),
827 metadata.location_id.root(),
828 processes,
829 clusters,
830 name.as_deref(),
831 ),
832
833 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
834 };
835
836 *instantiate_fn = DebugInstantiateFinalized {
837 sink: sink_expr,
838 source: source_expr,
839 connect_fn: Some(connect_fn),
840 }
841 .into();
842 } else if let HydroNode::ExternalInput {
843 from_external_key,
844 from_port_id,
845 from_many,
846 codec_type,
847 port_hint,
848 instantiate_fn,
849 metadata,
850 ..
851 } = n
852 {
853 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
854 DebugInstantiate::Building => {
855 let from_node = externals
856 .get(*from_external_key)
857 .unwrap_or_else(|| {
858 panic!(
859 "A external used in the graph was not instantiated: {}",
860 from_external_key,
861 )
862 })
863 .clone();
864
865 match metadata.location_id.root() {
866 &LocationId::Process(process_key) => {
867 let to_node = processes
868 .get(process_key)
869 .unwrap_or_else(|| {
870 panic!("A process used in the graph was not instantiated: {}", process_key)
871 })
872 .clone();
873
874 let sink_port = from_node.next_port();
875 let source_port = to_node.next_port();
876
877 from_node.register(*from_port_id, sink_port.clone());
878
879 (
880 (
881 parse_quote!(DUMMY),
882 if *from_many {
883 D::e2o_many_source(
884 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
885 &to_node, &source_port,
886 codec_type.0.as_ref(),
887 format!("{}_{}", *from_external_key, *from_port_id)
888 )
889 } else {
890 D::e2o_source(
891 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
892 &from_node, &sink_port,
893 &to_node, &source_port,
894 codec_type.0.as_ref(),
895 format!("{}_{}", *from_external_key, *from_port_id)
896 )
897 },
898 ),
899 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
900 )
901 }
902 LocationId::Cluster(_) => todo!(),
903 _ => panic!()
904 }
905 },
906
907 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
908 };
909
910 *instantiate_fn = DebugInstantiateFinalized {
911 sink: sink_expr,
912 source: source_expr,
913 connect_fn: Some(connect_fn),
914 }
915 .into();
916 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
917 let element_type = match &metadata.collection_kind {
918 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
919 _ => panic!("Embedded source must have Stream collection kind"),
920 };
921 let location_key = match metadata.location_id.root() {
922 LocationId::Process(key) | LocationId::Cluster(key) => *key,
923 _ => panic!("Embedded source must be on a process or cluster"),
924 };
925 D::register_embedded_input(
926 &mut refcell_env.borrow_mut(),
927 location_key,
928 ident,
929 &element_type,
930 );
931 }
932 },
933 seen_tees,
934 false,
935 );
936 }
937
938 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
939 self.transform_bottom_up(
940 &mut |l| {
941 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
942 match instantiate_fn {
943 DebugInstantiate::Building => panic!("network not built"),
944
945 DebugInstantiate::Finalized(finalized) => {
946 (finalized.connect_fn.take().unwrap())();
947 }
948 }
949 }
950 },
951 &mut |n| {
952 if let HydroNode::Network { instantiate_fn, .. }
953 | HydroNode::ExternalInput { instantiate_fn, .. } = n
954 {
955 match instantiate_fn {
956 DebugInstantiate::Building => panic!("network not built"),
957
958 DebugInstantiate::Finalized(finalized) => {
959 (finalized.connect_fn.take().unwrap())();
960 }
961 }
962 }
963 },
964 seen_tees,
965 false,
966 );
967 }
968
969 pub fn transform_bottom_up(
970 &mut self,
971 transform_root: &mut impl FnMut(&mut HydroRoot),
972 transform_node: &mut impl FnMut(&mut HydroNode),
973 seen_tees: &mut SeenTees,
974 check_well_formed: bool,
975 ) {
976 self.transform_children(
977 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
978 seen_tees,
979 );
980
981 transform_root(self);
982 }
983
984 pub fn transform_children(
985 &mut self,
986 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
987 seen_tees: &mut SeenTees,
988 ) {
989 match self {
990 HydroRoot::ForEach { input, .. }
991 | HydroRoot::SendExternal { input, .. }
992 | HydroRoot::DestSink { input, .. }
993 | HydroRoot::CycleSink { input, .. }
994 | HydroRoot::EmbeddedOutput { input, .. } => {
995 transform(input, seen_tees);
996 }
997 }
998 }
999
1000 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1001 match self {
1002 HydroRoot::ForEach {
1003 f,
1004 input,
1005 op_metadata,
1006 } => HydroRoot::ForEach {
1007 f: f.clone(),
1008 input: Box::new(input.deep_clone(seen_tees)),
1009 op_metadata: op_metadata.clone(),
1010 },
1011 HydroRoot::SendExternal {
1012 to_external_key,
1013 to_port_id,
1014 to_many,
1015 unpaired,
1016 serialize_fn,
1017 instantiate_fn,
1018 input,
1019 op_metadata,
1020 } => HydroRoot::SendExternal {
1021 to_external_key: *to_external_key,
1022 to_port_id: *to_port_id,
1023 to_many: *to_many,
1024 unpaired: *unpaired,
1025 serialize_fn: serialize_fn.clone(),
1026 instantiate_fn: instantiate_fn.clone(),
1027 input: Box::new(input.deep_clone(seen_tees)),
1028 op_metadata: op_metadata.clone(),
1029 },
1030 HydroRoot::DestSink {
1031 sink,
1032 input,
1033 op_metadata,
1034 } => HydroRoot::DestSink {
1035 sink: sink.clone(),
1036 input: Box::new(input.deep_clone(seen_tees)),
1037 op_metadata: op_metadata.clone(),
1038 },
1039 HydroRoot::CycleSink {
1040 cycle_id,
1041 input,
1042 op_metadata,
1043 } => HydroRoot::CycleSink {
1044 cycle_id: *cycle_id,
1045 input: Box::new(input.deep_clone(seen_tees)),
1046 op_metadata: op_metadata.clone(),
1047 },
1048 HydroRoot::EmbeddedOutput {
1049 ident,
1050 input,
1051 op_metadata,
1052 } => HydroRoot::EmbeddedOutput {
1053 ident: ident.clone(),
1054 input: Box::new(input.deep_clone(seen_tees)),
1055 op_metadata: op_metadata.clone(),
1056 },
1057 }
1058 }
1059
1060 #[cfg(feature = "build")]
1061 pub fn emit<'a, D: Deploy<'a>>(
1062 &mut self,
1063 graph_builders: &mut dyn DfirBuilder,
1064 seen_tees: &mut SeenTees,
1065 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1066 next_stmt_id: &mut usize,
1067 ) {
1068 self.emit_core::<D>(
1069 &mut BuildersOrCallback::<
1070 fn(&mut HydroRoot, &mut usize),
1071 fn(&mut HydroNode, &mut usize),
1072 >::Builders(graph_builders),
1073 seen_tees,
1074 built_tees,
1075 next_stmt_id,
1076 );
1077 }
1078
    /// Emits this root after first emitting its input subtree.
    ///
    /// In `Builders` mode a DFIR statement consuming the input identifier is added to
    /// the builder for the input's location. In `Callback` mode the root callback is
    /// invoked with `self` and the current statement id instead.
    ///
    /// `next_stmt_id` is incremented once per root for every variant except
    /// `CycleSink`, which neither invokes the callback nor consumes an id.
    #[cfg(feature = "build")]
    pub fn emit_core<'a, D: Deploy<'a>>(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenTees,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Placeholder expressions are used while the network has not been
                        // instantiated; only the sink half is needed here.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; the rest flow
                        // as their element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Cycle sinks get no callback and (below) no statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1246
1247 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1248 match self {
1249 HydroRoot::ForEach { op_metadata, .. }
1250 | HydroRoot::SendExternal { op_metadata, .. }
1251 | HydroRoot::DestSink { op_metadata, .. }
1252 | HydroRoot::CycleSink { op_metadata, .. }
1253 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1254 }
1255 }
1256
1257 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1258 match self {
1259 HydroRoot::ForEach { op_metadata, .. }
1260 | HydroRoot::SendExternal { op_metadata, .. }
1261 | HydroRoot::DestSink { op_metadata, .. }
1262 | HydroRoot::CycleSink { op_metadata, .. }
1263 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1264 }
1265 }
1266
1267 pub fn input(&self) -> &HydroNode {
1268 match self {
1269 HydroRoot::ForEach { input, .. }
1270 | HydroRoot::SendExternal { input, .. }
1271 | HydroRoot::DestSink { input, .. }
1272 | HydroRoot::CycleSink { input, .. }
1273 | HydroRoot::EmbeddedOutput { input, .. } => input,
1274 }
1275 }
1276
1277 pub fn input_metadata(&self) -> &HydroIrMetadata {
1278 self.input().metadata()
1279 }
1280
1281 pub fn print_root(&self) -> String {
1282 match self {
1283 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1284 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1285 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1286 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1287 HydroRoot::EmbeddedOutput { ident, .. } => {
1288 format!("EmbeddedOutput({})", ident)
1289 }
1290 }
1291 }
1292
1293 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1294 match self {
1295 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1296 transform(f);
1297 }
1298 HydroRoot::SendExternal { .. }
1299 | HydroRoot::CycleSink { .. }
1300 | HydroRoot::EmbeddedOutput { .. } => {}
1301 }
1302 }
1303}
1304
/// Emits every root in `ir` into a fresh set of per-location graph builders and returns
/// those builders.
///
/// Statement ids start at `0` and are shared across all roots, as are the tee
/// bookkeeping maps.
///
/// `ir` is taken as a mutable slice because the roots are only iterated, never added or
/// removed; existing callers passing `&mut Vec<HydroRoot>` keep working via deref coercion.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir.iter_mut() {
        leaf.emit::<D>(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
    builders
}
1323
/// Walks `ir` in emission order without emitting anything, invoking `transform_root` and
/// `transform_node` (each with the current statement id) through
/// [`HydroRoot::emit_core`] in callback mode.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core::<D>(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1343
1344pub fn transform_bottom_up(
1345 ir: &mut [HydroRoot],
1346 transform_root: &mut impl FnMut(&mut HydroRoot),
1347 transform_node: &mut impl FnMut(&mut HydroNode),
1348 check_well_formed: bool,
1349) {
1350 let mut seen_tees = HashMap::new();
1351 ir.iter_mut().for_each(|leaf| {
1352 leaf.transform_bottom_up(
1353 transform_root,
1354 transform_node,
1355 &mut seen_tees,
1356 check_well_formed,
1357 );
1358 });
1359}
1360
1361pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1362 let mut seen_tees = HashMap::new();
1363 ir.iter()
1364 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1365 .collect()
1366}
1367
/// State used to de-duplicate tees while `Debug`-printing.
///
/// `None` means de-duplication is off. `Some((next_id, ids))` holds the next id to
/// hand out and the ids already assigned, keyed by the address of the tee's shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread de-duplication state; starts out disabled (`None`).
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1372
1373pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1374 PRINTED_TEES.with(|printed_tees| {
1375 let mut printed_tees_mut = printed_tees.borrow_mut();
1376 *printed_tees_mut = Some((0, HashMap::new()));
1377 drop(printed_tees_mut);
1378
1379 let ret = f();
1380
1381 let mut printed_tees_mut = printed_tees.borrow_mut();
1382 *printed_tees_mut = None;
1383
1384 ret
1385 })
1386}
1387
1388pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1389
1390impl TeeNode {
1391 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1392 Rc::as_ptr(&self.0)
1393 }
1394}
1395
1396impl Debug for TeeNode {
1397 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1398 PRINTED_TEES.with(|printed_tees| {
1399 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1400 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1401
1402 if let Some(printed_tees_mut) = printed_tees_mut {
1403 if let Some(existing) = printed_tees_mut
1404 .1
1405 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1406 {
1407 write!(f, "<tee {}>", existing)
1408 } else {
1409 let next_id = printed_tees_mut.0;
1410 printed_tees_mut.0 += 1;
1411 printed_tees_mut
1412 .1
1413 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1414 drop(printed_tees_mut_borrow);
1415 write!(f, "<tee {}>: ", next_id)?;
1416 Debug::fmt(&self.0.borrow(), f)
1417 }
1418 } else {
1419 drop(printed_tees_mut_borrow);
1420 write!(f, "<tee>: ")?;
1421 Debug::fmt(&self.0.borrow(), f)
1422 }
1423 })
1424 }
1425}
1426
1427impl Hash for TeeNode {
1428 fn hash<H: Hasher>(&self, state: &mut H) {
1429 self.0.borrow_mut().hash(state);
1430 }
1431}
1432
/// Boundedness marker stored in the `bound` field of most [`CollectionKind`] variants.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The only value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
1438
/// Ordering marker stored on stream-like [`CollectionKind`] variants
/// (`Stream::order`, `KeyedStream::value_order`).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1444
/// Retry marker stored on stream-like [`CollectionKind`] variants
/// (`Stream::retry`, `KeyedStream::value_retry`).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1450
/// Boundedness marker used by `CollectionKind::KeyedSingleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`; `BoundedValue`
/// does not.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1457
/// Describes the kind of collection a node produces, together with its
/// boundedness and element / key / value types.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream, with ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton; uses its own three-state boundedness marker.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1487
1488impl CollectionKind {
1489 pub fn is_bounded(&self) -> bool {
1490 matches!(
1491 self,
1492 CollectionKind::Stream {
1493 bound: BoundKind::Bounded,
1494 ..
1495 } | CollectionKind::Singleton {
1496 bound: BoundKind::Bounded,
1497 ..
1498 } | CollectionKind::Optional {
1499 bound: BoundKind::Bounded,
1500 ..
1501 } | CollectionKind::KeyedStream {
1502 bound: BoundKind::Bounded,
1503 ..
1504 } | CollectionKind::KeyedSingleton {
1505 bound: KeyedSingletonBoundKind::Bounded,
1506 ..
1507 }
1508 )
1509 }
1510}
1511
/// Metadata attached to every `HydroNode`.
///
/// The hand-written `Hash`, `PartialEq` and `Debug` impls below treat this data as
/// mostly transparent: it never affects hashing or equality, and only
/// `location_id` and `collection_kind` are printed.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node that owns this metadata.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1520
1521impl Hash for HydroIrMetadata {
1523 fn hash<H: Hasher>(&self, _: &mut H) {}
1524}
1525
1526impl PartialEq for HydroIrMetadata {
1527 fn eq(&self, _: &Self) -> bool {
1528 true
1529 }
1530}
1531
1532impl Eq for HydroIrMetadata {}
1533
1534impl Debug for HydroIrMetadata {
1535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536 f.debug_struct("HydroIrMetadata")
1537 .field("location_id", &self.location_id)
1538 .field("collection_kind", &self.collection_kind)
1539 .finish()
1540 }
1541}
1542
/// Per-operator metadata stored in [`HydroIrMetadata::op`].
///
/// Freshly constructed values have a captured backtrace and every optional field
/// set to `None` (see `new_with_skip`).
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1552
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields
    /// unset.
    ///
    /// Delegates to `new_with_skip(1)`. NOTE(review): the extra 1 presumably hides
    /// this function's own frame from the backtrace — confirm against
    /// `Backtrace::get_backtrace`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`.
    ///
    /// The value depends on the exact call depth between the caller and
    /// `get_backtrace`, so do not inline or wrap this function without adjusting it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1571
1572impl Debug for HydroIrOpMetadata {
1573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574 f.debug_struct("HydroIrOpMetadata").finish()
1575 }
1576}
1577
1578impl Hash for HydroIrOpMetadata {
1579 fn hash<H: Hasher>(&self, _: &mut H) {}
1580}
1581
/// A node of the Hydro IR tree.
///
/// Every variant except `Placeholder` carries a `metadata` field. Child nodes are
/// owned through `Box`, except for `Tee`, whose child is shared via [`TeeNode`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in left in a tee cell while its contents are being
    /// transformed or cloned. Traversals (`transform_children`, `emit_core`) panic
    /// if they are asked to process it directly.
    Placeholder,

    /// Wraps `inner` without emitting any DFIR of its own in builder mode.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted via the builders' `observe_nondet`, which receives `trusted`.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf that produces data from a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf emitted as `source_iter([value])`, followed by `persist::<'static>()`
    /// unless the node is both top-level and bounded.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// A leaf that reads from the cycle named by `cycle_id`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared upstream node. The DFIR `tee()` statement is
    /// emitted once per shared cell; later references reuse its identifier.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted via the builders' `begin_atomic`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted via the builders' `end_atomic`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted via the builders' `batch`.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted via the builders' `yield_from_tick`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as DFIR `chain()`: `first` feeds port 0 and `second` port 1.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as DFIR `chain_first_n(1)`: `first` feeds port 0 and `second` port 1.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as DFIR `cross_singleton()`: `left` feeds the `input` port and
    /// `right` the `single` port.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input operators parameterised by a user expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Aggregations that take an initial-value expression and an accumulator expression.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Aggregations parameterised by a single expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with a second child node `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location check in `transform_bottom_up`:
    /// its input may live at a different root location than the node itself.
    Network {
        name: Option<String>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf (no child nodes) identified by an external location key and port id.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1811
/// Maps the address of an original tee cell to the cell that replaces it, so each
/// shared tee is transformed or cloned exactly once during a traversal.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1814
1815impl HydroNode {
1816 pub fn transform_bottom_up(
1817 &mut self,
1818 transform: &mut impl FnMut(&mut HydroNode),
1819 seen_tees: &mut SeenTees,
1820 check_well_formed: bool,
1821 ) {
1822 self.transform_children(
1823 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1824 seen_tees,
1825 );
1826
1827 transform(self);
1828
1829 let self_location = self.metadata().location_id.root();
1830
1831 if check_well_formed {
1832 match &*self {
1833 HydroNode::Network { .. } => {}
1834 _ => {
1835 self.input_metadata().iter().for_each(|i| {
1836 if i.location_id.root() != self_location {
1837 panic!(
1838 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1839 i,
1840 i.location_id.root(),
1841 self,
1842 self_location
1843 )
1844 }
1845 });
1846 }
1847 }
1848 }
1849 }
1850
1851 #[inline(always)]
1852 pub fn transform_children(
1853 &mut self,
1854 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1855 seen_tees: &mut SeenTees,
1856 ) {
1857 match self {
1858 HydroNode::Placeholder => {
1859 panic!();
1860 }
1861
1862 HydroNode::Source { .. }
1863 | HydroNode::SingletonSource { .. }
1864 | HydroNode::CycleSource { .. }
1865 | HydroNode::ExternalInput { .. } => {}
1866
1867 HydroNode::Tee { inner, .. } => {
1868 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1869 *inner = TeeNode(transformed.clone());
1870 } else {
1871 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1872 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1873 let mut orig = inner.0.replace(HydroNode::Placeholder);
1874 transform(&mut orig, seen_tees);
1875 *transformed_cell.borrow_mut() = orig;
1876 *inner = TeeNode(transformed_cell);
1877 }
1878 }
1879
1880 HydroNode::Cast { inner, .. }
1881 | HydroNode::ObserveNonDet { inner, .. }
1882 | HydroNode::BeginAtomic { inner, .. }
1883 | HydroNode::EndAtomic { inner, .. }
1884 | HydroNode::Batch { inner, .. }
1885 | HydroNode::YieldConcat { inner, .. } => {
1886 transform(inner.as_mut(), seen_tees);
1887 }
1888
1889 HydroNode::Chain { first, second, .. } => {
1890 transform(first.as_mut(), seen_tees);
1891 transform(second.as_mut(), seen_tees);
1892 }
1893
1894 HydroNode::ChainFirst { first, second, .. } => {
1895 transform(first.as_mut(), seen_tees);
1896 transform(second.as_mut(), seen_tees);
1897 }
1898
1899 HydroNode::CrossSingleton { left, right, .. }
1900 | HydroNode::CrossProduct { left, right, .. }
1901 | HydroNode::Join { left, right, .. } => {
1902 transform(left.as_mut(), seen_tees);
1903 transform(right.as_mut(), seen_tees);
1904 }
1905
1906 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1907 transform(pos.as_mut(), seen_tees);
1908 transform(neg.as_mut(), seen_tees);
1909 }
1910
1911 HydroNode::ReduceKeyedWatermark {
1912 input, watermark, ..
1913 } => {
1914 transform(input.as_mut(), seen_tees);
1915 transform(watermark.as_mut(), seen_tees);
1916 }
1917
1918 HydroNode::Map { input, .. }
1919 | HydroNode::ResolveFutures { input, .. }
1920 | HydroNode::ResolveFuturesOrdered { input, .. }
1921 | HydroNode::FlatMap { input, .. }
1922 | HydroNode::Filter { input, .. }
1923 | HydroNode::FilterMap { input, .. }
1924 | HydroNode::Sort { input, .. }
1925 | HydroNode::DeferTick { input, .. }
1926 | HydroNode::Enumerate { input, .. }
1927 | HydroNode::Inspect { input, .. }
1928 | HydroNode::Unique { input, .. }
1929 | HydroNode::Network { input, .. }
1930 | HydroNode::Fold { input, .. }
1931 | HydroNode::Scan { input, .. }
1932 | HydroNode::FoldKeyed { input, .. }
1933 | HydroNode::Reduce { input, .. }
1934 | HydroNode::ReduceKeyed { input, .. }
1935 | HydroNode::Counter { input, .. } => {
1936 transform(input.as_mut(), seen_tees);
1937 }
1938 }
1939 }
1940
1941 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1942 match self {
1943 HydroNode::Placeholder => HydroNode::Placeholder,
1944 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1945 inner: Box::new(inner.deep_clone(seen_tees)),
1946 metadata: metadata.clone(),
1947 },
1948 HydroNode::ObserveNonDet {
1949 inner,
1950 trusted,
1951 metadata,
1952 } => HydroNode::ObserveNonDet {
1953 inner: Box::new(inner.deep_clone(seen_tees)),
1954 trusted: *trusted,
1955 metadata: metadata.clone(),
1956 },
1957 HydroNode::Source { source, metadata } => HydroNode::Source {
1958 source: source.clone(),
1959 metadata: metadata.clone(),
1960 },
1961 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1962 value: value.clone(),
1963 metadata: metadata.clone(),
1964 },
1965 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
1966 cycle_id: *cycle_id,
1967 metadata: metadata.clone(),
1968 },
1969 HydroNode::Tee { inner, metadata } => {
1970 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1971 HydroNode::Tee {
1972 inner: TeeNode(transformed.clone()),
1973 metadata: metadata.clone(),
1974 }
1975 } else {
1976 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1977 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1978 let cloned = inner.0.borrow().deep_clone(seen_tees);
1979 *new_rc.borrow_mut() = cloned;
1980 HydroNode::Tee {
1981 inner: TeeNode(new_rc),
1982 metadata: metadata.clone(),
1983 }
1984 }
1985 }
1986 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1987 inner: Box::new(inner.deep_clone(seen_tees)),
1988 metadata: metadata.clone(),
1989 },
1990 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1991 inner: Box::new(inner.deep_clone(seen_tees)),
1992 metadata: metadata.clone(),
1993 },
1994 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1995 inner: Box::new(inner.deep_clone(seen_tees)),
1996 metadata: metadata.clone(),
1997 },
1998 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1999 inner: Box::new(inner.deep_clone(seen_tees)),
2000 metadata: metadata.clone(),
2001 },
2002 HydroNode::Chain {
2003 first,
2004 second,
2005 metadata,
2006 } => HydroNode::Chain {
2007 first: Box::new(first.deep_clone(seen_tees)),
2008 second: Box::new(second.deep_clone(seen_tees)),
2009 metadata: metadata.clone(),
2010 },
2011 HydroNode::ChainFirst {
2012 first,
2013 second,
2014 metadata,
2015 } => HydroNode::ChainFirst {
2016 first: Box::new(first.deep_clone(seen_tees)),
2017 second: Box::new(second.deep_clone(seen_tees)),
2018 metadata: metadata.clone(),
2019 },
2020 HydroNode::CrossProduct {
2021 left,
2022 right,
2023 metadata,
2024 } => HydroNode::CrossProduct {
2025 left: Box::new(left.deep_clone(seen_tees)),
2026 right: Box::new(right.deep_clone(seen_tees)),
2027 metadata: metadata.clone(),
2028 },
2029 HydroNode::CrossSingleton {
2030 left,
2031 right,
2032 metadata,
2033 } => HydroNode::CrossSingleton {
2034 left: Box::new(left.deep_clone(seen_tees)),
2035 right: Box::new(right.deep_clone(seen_tees)),
2036 metadata: metadata.clone(),
2037 },
2038 HydroNode::Join {
2039 left,
2040 right,
2041 metadata,
2042 } => HydroNode::Join {
2043 left: Box::new(left.deep_clone(seen_tees)),
2044 right: Box::new(right.deep_clone(seen_tees)),
2045 metadata: metadata.clone(),
2046 },
2047 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2048 pos: Box::new(pos.deep_clone(seen_tees)),
2049 neg: Box::new(neg.deep_clone(seen_tees)),
2050 metadata: metadata.clone(),
2051 },
2052 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2053 pos: Box::new(pos.deep_clone(seen_tees)),
2054 neg: Box::new(neg.deep_clone(seen_tees)),
2055 metadata: metadata.clone(),
2056 },
2057 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2058 input: Box::new(input.deep_clone(seen_tees)),
2059 metadata: metadata.clone(),
2060 },
2061 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2062 HydroNode::ResolveFuturesOrdered {
2063 input: Box::new(input.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 }
2066 }
2067 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2068 f: f.clone(),
2069 input: Box::new(input.deep_clone(seen_tees)),
2070 metadata: metadata.clone(),
2071 },
2072 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2073 f: f.clone(),
2074 input: Box::new(input.deep_clone(seen_tees)),
2075 metadata: metadata.clone(),
2076 },
2077 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2078 f: f.clone(),
2079 input: Box::new(input.deep_clone(seen_tees)),
2080 metadata: metadata.clone(),
2081 },
2082 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2083 f: f.clone(),
2084 input: Box::new(input.deep_clone(seen_tees)),
2085 metadata: metadata.clone(),
2086 },
2087 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2088 input: Box::new(input.deep_clone(seen_tees)),
2089 metadata: metadata.clone(),
2090 },
2091 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2092 input: Box::new(input.deep_clone(seen_tees)),
2093 metadata: metadata.clone(),
2094 },
2095 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2096 f: f.clone(),
2097 input: Box::new(input.deep_clone(seen_tees)),
2098 metadata: metadata.clone(),
2099 },
2100 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2101 input: Box::new(input.deep_clone(seen_tees)),
2102 metadata: metadata.clone(),
2103 },
2104 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2105 input: Box::new(input.deep_clone(seen_tees)),
2106 metadata: metadata.clone(),
2107 },
2108 HydroNode::Fold {
2109 init,
2110 acc,
2111 input,
2112 metadata,
2113 } => HydroNode::Fold {
2114 init: init.clone(),
2115 acc: acc.clone(),
2116 input: Box::new(input.deep_clone(seen_tees)),
2117 metadata: metadata.clone(),
2118 },
2119 HydroNode::Scan {
2120 init,
2121 acc,
2122 input,
2123 metadata,
2124 } => HydroNode::Scan {
2125 init: init.clone(),
2126 acc: acc.clone(),
2127 input: Box::new(input.deep_clone(seen_tees)),
2128 metadata: metadata.clone(),
2129 },
2130 HydroNode::FoldKeyed {
2131 init,
2132 acc,
2133 input,
2134 metadata,
2135 } => HydroNode::FoldKeyed {
2136 init: init.clone(),
2137 acc: acc.clone(),
2138 input: Box::new(input.deep_clone(seen_tees)),
2139 metadata: metadata.clone(),
2140 },
2141 HydroNode::ReduceKeyedWatermark {
2142 f,
2143 input,
2144 watermark,
2145 metadata,
2146 } => HydroNode::ReduceKeyedWatermark {
2147 f: f.clone(),
2148 input: Box::new(input.deep_clone(seen_tees)),
2149 watermark: Box::new(watermark.deep_clone(seen_tees)),
2150 metadata: metadata.clone(),
2151 },
2152 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2153 f: f.clone(),
2154 input: Box::new(input.deep_clone(seen_tees)),
2155 metadata: metadata.clone(),
2156 },
2157 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2158 f: f.clone(),
2159 input: Box::new(input.deep_clone(seen_tees)),
2160 metadata: metadata.clone(),
2161 },
2162 HydroNode::Network {
2163 name,
2164 serialize_fn,
2165 instantiate_fn,
2166 deserialize_fn,
2167 input,
2168 metadata,
2169 } => HydroNode::Network {
2170 name: name.clone(),
2171 serialize_fn: serialize_fn.clone(),
2172 instantiate_fn: instantiate_fn.clone(),
2173 deserialize_fn: deserialize_fn.clone(),
2174 input: Box::new(input.deep_clone(seen_tees)),
2175 metadata: metadata.clone(),
2176 },
2177 HydroNode::ExternalInput {
2178 from_external_key,
2179 from_port_id,
2180 from_many,
2181 codec_type,
2182 port_hint,
2183 instantiate_fn,
2184 deserialize_fn,
2185 metadata,
2186 } => HydroNode::ExternalInput {
2187 from_external_key: *from_external_key,
2188 from_port_id: *from_port_id,
2189 from_many: *from_many,
2190 codec_type: codec_type.clone(),
2191 port_hint: *port_hint,
2192 instantiate_fn: instantiate_fn.clone(),
2193 deserialize_fn: deserialize_fn.clone(),
2194 metadata: metadata.clone(),
2195 },
2196 HydroNode::Counter {
2197 tag,
2198 duration,
2199 prefix,
2200 input,
2201 metadata,
2202 } => HydroNode::Counter {
2203 tag: tag.clone(),
2204 duration: duration.clone(),
2205 prefix: prefix.clone(),
2206 input: Box::new(input.deep_clone(seen_tees)),
2207 metadata: metadata.clone(),
2208 },
2209 }
2210 }
2211
2212 #[cfg(feature = "build")]
2213 pub fn emit_core<'a, D: Deploy<'a>>(
2214 &mut self,
2215 builders_or_callback: &mut BuildersOrCallback<
2216 impl FnMut(&mut HydroRoot, &mut usize),
2217 impl FnMut(&mut HydroNode, &mut usize),
2218 >,
2219 seen_tees: &mut SeenTees,
2220 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2221 next_stmt_id: &mut usize,
2222 ) -> syn::Ident {
2223 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2224
2225 self.transform_bottom_up(
2226 &mut |node: &mut HydroNode| {
2227 let out_location = node.metadata().location_id.clone();
2228 match node {
2229 HydroNode::Placeholder => {
2230 panic!()
2231 }
2232
2233 HydroNode::Cast { .. } => {
2234 match builders_or_callback {
2237 BuildersOrCallback::Builders(_) => {}
2238 BuildersOrCallback::Callback(_, node_callback) => {
2239 node_callback(node, next_stmt_id);
2240 }
2241 }
2242
2243 *next_stmt_id += 1;
2244 }
2246
2247 HydroNode::ObserveNonDet {
2248 inner,
2249 trusted,
2250 metadata,
2251 ..
2252 } => {
2253 let inner_ident = ident_stack.pop().unwrap();
2254
2255 let observe_ident =
2256 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2257
2258 match builders_or_callback {
2259 BuildersOrCallback::Builders(graph_builders) => {
2260 graph_builders.observe_nondet(
2261 *trusted,
2262 &inner.metadata().location_id,
2263 inner_ident,
2264 &inner.metadata().collection_kind,
2265 &observe_ident,
2266 &metadata.collection_kind,
2267 &metadata.op,
2268 );
2269 }
2270 BuildersOrCallback::Callback(_, node_callback) => {
2271 node_callback(node, next_stmt_id);
2272 }
2273 }
2274
2275 *next_stmt_id += 1;
2276
2277 ident_stack.push(observe_ident);
2278 }
2279
2280 HydroNode::Batch {
2281 inner, metadata, ..
2282 } => {
2283 let inner_ident = ident_stack.pop().unwrap();
2284
2285 let batch_ident =
2286 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2287
2288 match builders_or_callback {
2289 BuildersOrCallback::Builders(graph_builders) => {
2290 graph_builders.batch(
2291 inner_ident,
2292 &inner.metadata().location_id,
2293 &inner.metadata().collection_kind,
2294 &batch_ident,
2295 &out_location,
2296 &metadata.op,
2297 );
2298 }
2299 BuildersOrCallback::Callback(_, node_callback) => {
2300 node_callback(node, next_stmt_id);
2301 }
2302 }
2303
2304 *next_stmt_id += 1;
2305
2306 ident_stack.push(batch_ident);
2307 }
2308
2309 HydroNode::YieldConcat { inner, .. } => {
2310 let inner_ident = ident_stack.pop().unwrap();
2311
2312 let yield_ident =
2313 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2314
2315 match builders_or_callback {
2316 BuildersOrCallback::Builders(graph_builders) => {
2317 graph_builders.yield_from_tick(
2318 inner_ident,
2319 &inner.metadata().location_id,
2320 &inner.metadata().collection_kind,
2321 &yield_ident,
2322 &out_location,
2323 );
2324 }
2325 BuildersOrCallback::Callback(_, node_callback) => {
2326 node_callback(node, next_stmt_id);
2327 }
2328 }
2329
2330 *next_stmt_id += 1;
2331
2332 ident_stack.push(yield_ident);
2333 }
2334
2335 HydroNode::BeginAtomic { inner, metadata } => {
2336 let inner_ident = ident_stack.pop().unwrap();
2337
2338 let begin_ident =
2339 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2340
2341 match builders_or_callback {
2342 BuildersOrCallback::Builders(graph_builders) => {
2343 graph_builders.begin_atomic(
2344 inner_ident,
2345 &inner.metadata().location_id,
2346 &inner.metadata().collection_kind,
2347 &begin_ident,
2348 &out_location,
2349 &metadata.op,
2350 );
2351 }
2352 BuildersOrCallback::Callback(_, node_callback) => {
2353 node_callback(node, next_stmt_id);
2354 }
2355 }
2356
2357 *next_stmt_id += 1;
2358
2359 ident_stack.push(begin_ident);
2360 }
2361
2362 HydroNode::EndAtomic { inner, .. } => {
2363 let inner_ident = ident_stack.pop().unwrap();
2364
2365 let end_ident =
2366 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2367
2368 match builders_or_callback {
2369 BuildersOrCallback::Builders(graph_builders) => {
2370 graph_builders.end_atomic(
2371 inner_ident,
2372 &inner.metadata().location_id,
2373 &inner.metadata().collection_kind,
2374 &end_ident,
2375 );
2376 }
2377 BuildersOrCallback::Callback(_, node_callback) => {
2378 node_callback(node, next_stmt_id);
2379 }
2380 }
2381
2382 *next_stmt_id += 1;
2383
2384 ident_stack.push(end_ident);
2385 }
2386
2387 HydroNode::Source {
2388 source, metadata, ..
2389 } => {
2390 if let HydroSource::ExternalNetwork() = source {
2391 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2392 } else {
2393 let source_ident =
2394 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2395
2396 let source_stmt = match source {
2397 HydroSource::Stream(expr) => {
2398 debug_assert!(metadata.location_id.is_top_level());
2399 parse_quote! {
2400 #source_ident = source_stream(#expr);
2401 }
2402 }
2403
2404 HydroSource::ExternalNetwork() => {
2405 unreachable!()
2406 }
2407
2408 HydroSource::Iter(expr) => {
2409 if metadata.location_id.is_top_level() {
2410 parse_quote! {
2411 #source_ident = source_iter(#expr);
2412 }
2413 } else {
2414 parse_quote! {
2416 #source_ident = source_iter(#expr) -> persist::<'static>();
2417 }
2418 }
2419 }
2420
2421 HydroSource::Spin() => {
2422 debug_assert!(metadata.location_id.is_top_level());
2423 parse_quote! {
2424 #source_ident = spin();
2425 }
2426 }
2427
2428 HydroSource::ClusterMembers(location_id) => {
2429 debug_assert!(metadata.location_id.is_top_level());
2430
2431 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2432 D::cluster_membership_stream(location_id),
2433 &(),
2434 );
2435
2436 parse_quote! {
2437 #source_ident = source_stream(#expr);
2438 }
2439 }
2440
2441 HydroSource::Embedded(ident) => {
2442 parse_quote! {
2443 #source_ident = source_stream(#ident);
2444 }
2445 }
2446 };
2447
2448 match builders_or_callback {
2449 BuildersOrCallback::Builders(graph_builders) => {
2450 let builder = graph_builders.get_dfir_mut(&out_location);
2451 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2452 }
2453 BuildersOrCallback::Callback(_, node_callback) => {
2454 node_callback(node, next_stmt_id);
2455 }
2456 }
2457
2458 *next_stmt_id += 1;
2459
2460 ident_stack.push(source_ident);
2461 }
2462 }
2463
2464 HydroNode::SingletonSource { value, metadata } => {
2465 let source_ident =
2466 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2467
2468 match builders_or_callback {
2469 BuildersOrCallback::Builders(graph_builders) => {
2470 let builder = graph_builders.get_dfir_mut(&out_location);
2471
2472 if metadata.location_id.is_top_level()
2473 && metadata.collection_kind.is_bounded()
2474 {
2475 builder.add_dfir(
2476 parse_quote! {
2477 #source_ident = source_iter([#value]);
2478 },
2479 None,
2480 Some(&next_stmt_id.to_string()),
2481 );
2482 } else {
2483 builder.add_dfir(
2484 parse_quote! {
2485 #source_ident = source_iter([#value]) -> persist::<'static>();
2486 },
2487 None,
2488 Some(&next_stmt_id.to_string()),
2489 );
2490 }
2491 }
2492 BuildersOrCallback::Callback(_, node_callback) => {
2493 node_callback(node, next_stmt_id);
2494 }
2495 }
2496
2497 *next_stmt_id += 1;
2498
2499 ident_stack.push(source_ident);
2500 }
2501
2502 HydroNode::CycleSource { cycle_id, .. } => {
2503 let ident = cycle_id.as_ident();
2504
2505 match builders_or_callback {
2506 BuildersOrCallback::Builders(_) => {}
2507 BuildersOrCallback::Callback(_, node_callback) => {
2508 node_callback(node, next_stmt_id);
2509 }
2510 }
2511
2512 *next_stmt_id += 1;
2514
2515 ident_stack.push(ident);
2516 }
2517
2518 HydroNode::Tee { inner, .. } => {
2519 let ret_ident = if let Some(teed_from) =
2520 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2521 {
2522 match builders_or_callback {
2523 BuildersOrCallback::Builders(_) => {}
2524 BuildersOrCallback::Callback(_, node_callback) => {
2525 node_callback(node, next_stmt_id);
2526 }
2527 }
2528
2529 teed_from.clone()
2530 } else {
2531 let inner_ident = ident_stack.pop().unwrap();
2534
2535 let tee_ident =
2536 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538 built_tees.insert(
2539 inner.0.as_ref() as *const RefCell<HydroNode>,
2540 tee_ident.clone(),
2541 );
2542
2543 match builders_or_callback {
2544 BuildersOrCallback::Builders(graph_builders) => {
2545 let builder = graph_builders.get_dfir_mut(&out_location);
2546 builder.add_dfir(
2547 parse_quote! {
2548 #tee_ident = #inner_ident -> tee();
2549 },
2550 None,
2551 Some(&next_stmt_id.to_string()),
2552 );
2553 }
2554 BuildersOrCallback::Callback(_, node_callback) => {
2555 node_callback(node, next_stmt_id);
2556 }
2557 }
2558
2559 tee_ident
2560 };
2561
2562 *next_stmt_id += 1;
2566 ident_stack.push(ret_ident);
2567 }
2568
2569 HydroNode::Chain { .. } => {
2570 let second_ident = ident_stack.pop().unwrap();
2572 let first_ident = ident_stack.pop().unwrap();
2573
2574 let chain_ident =
2575 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2576
2577 match builders_or_callback {
2578 BuildersOrCallback::Builders(graph_builders) => {
2579 let builder = graph_builders.get_dfir_mut(&out_location);
2580 builder.add_dfir(
2581 parse_quote! {
2582 #chain_ident = chain();
2583 #first_ident -> [0]#chain_ident;
2584 #second_ident -> [1]#chain_ident;
2585 },
2586 None,
2587 Some(&next_stmt_id.to_string()),
2588 );
2589 }
2590 BuildersOrCallback::Callback(_, node_callback) => {
2591 node_callback(node, next_stmt_id);
2592 }
2593 }
2594
2595 *next_stmt_id += 1;
2596
2597 ident_stack.push(chain_ident);
2598 }
2599
2600 HydroNode::ChainFirst { .. } => {
2601 let second_ident = ident_stack.pop().unwrap();
2602 let first_ident = ident_stack.pop().unwrap();
2603
2604 let chain_ident =
2605 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2606
2607 match builders_or_callback {
2608 BuildersOrCallback::Builders(graph_builders) => {
2609 let builder = graph_builders.get_dfir_mut(&out_location);
2610 builder.add_dfir(
2611 parse_quote! {
2612 #chain_ident = chain_first_n(1);
2613 #first_ident -> [0]#chain_ident;
2614 #second_ident -> [1]#chain_ident;
2615 },
2616 None,
2617 Some(&next_stmt_id.to_string()),
2618 );
2619 }
2620 BuildersOrCallback::Callback(_, node_callback) => {
2621 node_callback(node, next_stmt_id);
2622 }
2623 }
2624
2625 *next_stmt_id += 1;
2626
2627 ident_stack.push(chain_ident);
2628 }
2629
2630 HydroNode::CrossSingleton { right, .. } => {
2631 let right_ident = ident_stack.pop().unwrap();
2632 let left_ident = ident_stack.pop().unwrap();
2633
2634 let cross_ident =
2635 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2636
2637 match builders_or_callback {
2638 BuildersOrCallback::Builders(graph_builders) => {
2639 let builder = graph_builders.get_dfir_mut(&out_location);
2640
2641 if right.metadata().location_id.is_top_level()
2642 && right.metadata().collection_kind.is_bounded()
2643 {
2644 builder.add_dfir(
2645 parse_quote! {
2646 #cross_ident = cross_singleton();
2647 #left_ident -> [input]#cross_ident;
2648 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2649 },
2650 None,
2651 Some(&next_stmt_id.to_string()),
2652 );
2653 } else {
2654 builder.add_dfir(
2655 parse_quote! {
2656 #cross_ident = cross_singleton();
2657 #left_ident -> [input]#cross_ident;
2658 #right_ident -> [single]#cross_ident;
2659 },
2660 None,
2661 Some(&next_stmt_id.to_string()),
2662 );
2663 }
2664 }
2665 BuildersOrCallback::Callback(_, node_callback) => {
2666 node_callback(node, next_stmt_id);
2667 }
2668 }
2669
2670 *next_stmt_id += 1;
2671
2672 ident_stack.push(cross_ident);
2673 }
2674
2675 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2676 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2677 parse_quote!(cross_join_multiset)
2678 } else {
2679 parse_quote!(join_multiset)
2680 };
2681
2682 let (HydroNode::CrossProduct { left, right, .. }
2683 | HydroNode::Join { left, right, .. }) = node
2684 else {
2685 unreachable!()
2686 };
2687
2688 let is_top_level = left.metadata().location_id.is_top_level()
2689 && right.metadata().location_id.is_top_level();
2690 let left_lifetime = if left.metadata().location_id.is_top_level() {
2691 quote!('static)
2692 } else {
2693 quote!('tick)
2694 };
2695
2696 let right_lifetime = if right.metadata().location_id.is_top_level() {
2697 quote!('static)
2698 } else {
2699 quote!('tick)
2700 };
2701
2702 let right_ident = ident_stack.pop().unwrap();
2703 let left_ident = ident_stack.pop().unwrap();
2704
2705 let stream_ident =
2706 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2707
2708 match builders_or_callback {
2709 BuildersOrCallback::Builders(graph_builders) => {
2710 let builder = graph_builders.get_dfir_mut(&out_location);
2711 builder.add_dfir(
2712 if is_top_level {
2713 parse_quote! {
2716 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2717 #left_ident -> [0]#stream_ident;
2718 #right_ident -> [1]#stream_ident;
2719 }
2720 } else {
2721 parse_quote! {
2722 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2723 #left_ident -> [0]#stream_ident;
2724 #right_ident -> [1]#stream_ident;
2725 }
2726 }
2727 ,
2728 None,
2729 Some(&next_stmt_id.to_string()),
2730 );
2731 }
2732 BuildersOrCallback::Callback(_, node_callback) => {
2733 node_callback(node, next_stmt_id);
2734 }
2735 }
2736
2737 *next_stmt_id += 1;
2738
2739 ident_stack.push(stream_ident);
2740 }
2741
2742 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2743 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2744 parse_quote!(difference)
2745 } else {
2746 parse_quote!(anti_join)
2747 };
2748
2749 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2750 node
2751 else {
2752 unreachable!()
2753 };
2754
2755 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2756 quote!('static)
2757 } else {
2758 quote!('tick)
2759 };
2760
2761 let neg_ident = ident_stack.pop().unwrap();
2762 let pos_ident = ident_stack.pop().unwrap();
2763
2764 let stream_ident =
2765 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2766
2767 match builders_or_callback {
2768 BuildersOrCallback::Builders(graph_builders) => {
2769 let builder = graph_builders.get_dfir_mut(&out_location);
2770 builder.add_dfir(
2771 parse_quote! {
2772 #stream_ident = #operator::<'tick, #neg_lifetime>();
2773 #pos_ident -> [pos]#stream_ident;
2774 #neg_ident -> [neg]#stream_ident;
2775 },
2776 None,
2777 Some(&next_stmt_id.to_string()),
2778 );
2779 }
2780 BuildersOrCallback::Callback(_, node_callback) => {
2781 node_callback(node, next_stmt_id);
2782 }
2783 }
2784
2785 *next_stmt_id += 1;
2786
2787 ident_stack.push(stream_ident);
2788 }
2789
2790 HydroNode::ResolveFutures { .. } => {
2791 let input_ident = ident_stack.pop().unwrap();
2792
2793 let futures_ident =
2794 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2795
2796 match builders_or_callback {
2797 BuildersOrCallback::Builders(graph_builders) => {
2798 let builder = graph_builders.get_dfir_mut(&out_location);
2799 builder.add_dfir(
2800 parse_quote! {
2801 #futures_ident = #input_ident -> resolve_futures();
2802 },
2803 None,
2804 Some(&next_stmt_id.to_string()),
2805 );
2806 }
2807 BuildersOrCallback::Callback(_, node_callback) => {
2808 node_callback(node, next_stmt_id);
2809 }
2810 }
2811
2812 *next_stmt_id += 1;
2813
2814 ident_stack.push(futures_ident);
2815 }
2816
2817 HydroNode::ResolveFuturesOrdered { .. } => {
2818 let input_ident = ident_stack.pop().unwrap();
2819
2820 let futures_ident =
2821 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2822
2823 match builders_or_callback {
2824 BuildersOrCallback::Builders(graph_builders) => {
2825 let builder = graph_builders.get_dfir_mut(&out_location);
2826 builder.add_dfir(
2827 parse_quote! {
2828 #futures_ident = #input_ident -> resolve_futures_ordered();
2829 },
2830 None,
2831 Some(&next_stmt_id.to_string()),
2832 );
2833 }
2834 BuildersOrCallback::Callback(_, node_callback) => {
2835 node_callback(node, next_stmt_id);
2836 }
2837 }
2838
2839 *next_stmt_id += 1;
2840
2841 ident_stack.push(futures_ident);
2842 }
2843
2844 HydroNode::Map { f, .. } => {
2845 let input_ident = ident_stack.pop().unwrap();
2846
2847 let map_ident =
2848 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2849
2850 match builders_or_callback {
2851 BuildersOrCallback::Builders(graph_builders) => {
2852 let builder = graph_builders.get_dfir_mut(&out_location);
2853 builder.add_dfir(
2854 parse_quote! {
2855 #map_ident = #input_ident -> map(#f);
2856 },
2857 None,
2858 Some(&next_stmt_id.to_string()),
2859 );
2860 }
2861 BuildersOrCallback::Callback(_, node_callback) => {
2862 node_callback(node, next_stmt_id);
2863 }
2864 }
2865
2866 *next_stmt_id += 1;
2867
2868 ident_stack.push(map_ident);
2869 }
2870
2871 HydroNode::FlatMap { f, .. } => {
2872 let input_ident = ident_stack.pop().unwrap();
2873
2874 let flat_map_ident =
2875 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2876
2877 match builders_or_callback {
2878 BuildersOrCallback::Builders(graph_builders) => {
2879 let builder = graph_builders.get_dfir_mut(&out_location);
2880 builder.add_dfir(
2881 parse_quote! {
2882 #flat_map_ident = #input_ident -> flat_map(#f);
2883 },
2884 None,
2885 Some(&next_stmt_id.to_string()),
2886 );
2887 }
2888 BuildersOrCallback::Callback(_, node_callback) => {
2889 node_callback(node, next_stmt_id);
2890 }
2891 }
2892
2893 *next_stmt_id += 1;
2894
2895 ident_stack.push(flat_map_ident);
2896 }
2897
2898 HydroNode::Filter { f, .. } => {
2899 let input_ident = ident_stack.pop().unwrap();
2900
2901 let filter_ident =
2902 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2903
2904 match builders_or_callback {
2905 BuildersOrCallback::Builders(graph_builders) => {
2906 let builder = graph_builders.get_dfir_mut(&out_location);
2907 builder.add_dfir(
2908 parse_quote! {
2909 #filter_ident = #input_ident -> filter(#f);
2910 },
2911 None,
2912 Some(&next_stmt_id.to_string()),
2913 );
2914 }
2915 BuildersOrCallback::Callback(_, node_callback) => {
2916 node_callback(node, next_stmt_id);
2917 }
2918 }
2919
2920 *next_stmt_id += 1;
2921
2922 ident_stack.push(filter_ident);
2923 }
2924
2925 HydroNode::FilterMap { f, .. } => {
2926 let input_ident = ident_stack.pop().unwrap();
2927
2928 let filter_map_ident =
2929 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931 match builders_or_callback {
2932 BuildersOrCallback::Builders(graph_builders) => {
2933 let builder = graph_builders.get_dfir_mut(&out_location);
2934 builder.add_dfir(
2935 parse_quote! {
2936 #filter_map_ident = #input_ident -> filter_map(#f);
2937 },
2938 None,
2939 Some(&next_stmt_id.to_string()),
2940 );
2941 }
2942 BuildersOrCallback::Callback(_, node_callback) => {
2943 node_callback(node, next_stmt_id);
2944 }
2945 }
2946
2947 *next_stmt_id += 1;
2948
2949 ident_stack.push(filter_map_ident);
2950 }
2951
2952 HydroNode::Sort { .. } => {
2953 let input_ident = ident_stack.pop().unwrap();
2954
2955 let sort_ident =
2956 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958 match builders_or_callback {
2959 BuildersOrCallback::Builders(graph_builders) => {
2960 let builder = graph_builders.get_dfir_mut(&out_location);
2961 builder.add_dfir(
2962 parse_quote! {
2963 #sort_ident = #input_ident -> sort();
2964 },
2965 None,
2966 Some(&next_stmt_id.to_string()),
2967 );
2968 }
2969 BuildersOrCallback::Callback(_, node_callback) => {
2970 node_callback(node, next_stmt_id);
2971 }
2972 }
2973
2974 *next_stmt_id += 1;
2975
2976 ident_stack.push(sort_ident);
2977 }
2978
2979 HydroNode::DeferTick { .. } => {
2980 let input_ident = ident_stack.pop().unwrap();
2981
2982 let defer_tick_ident =
2983 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2984
2985 match builders_or_callback {
2986 BuildersOrCallback::Builders(graph_builders) => {
2987 let builder = graph_builders.get_dfir_mut(&out_location);
2988 builder.add_dfir(
2989 parse_quote! {
2990 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2991 },
2992 None,
2993 Some(&next_stmt_id.to_string()),
2994 );
2995 }
2996 BuildersOrCallback::Callback(_, node_callback) => {
2997 node_callback(node, next_stmt_id);
2998 }
2999 }
3000
3001 *next_stmt_id += 1;
3002
3003 ident_stack.push(defer_tick_ident);
3004 }
3005
3006 HydroNode::Enumerate { input, .. } => {
3007 let input_ident = ident_stack.pop().unwrap();
3008
3009 let enumerate_ident =
3010 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012 match builders_or_callback {
3013 BuildersOrCallback::Builders(graph_builders) => {
3014 let builder = graph_builders.get_dfir_mut(&out_location);
3015 let lifetime = if input.metadata().location_id.is_top_level() {
3016 quote!('static)
3017 } else {
3018 quote!('tick)
3019 };
3020 builder.add_dfir(
3021 parse_quote! {
3022 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3023 },
3024 None,
3025 Some(&next_stmt_id.to_string()),
3026 );
3027 }
3028 BuildersOrCallback::Callback(_, node_callback) => {
3029 node_callback(node, next_stmt_id);
3030 }
3031 }
3032
3033 *next_stmt_id += 1;
3034
3035 ident_stack.push(enumerate_ident);
3036 }
3037
3038 HydroNode::Inspect { f, .. } => {
3039 let input_ident = ident_stack.pop().unwrap();
3040
3041 let inspect_ident =
3042 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3043
3044 match builders_or_callback {
3045 BuildersOrCallback::Builders(graph_builders) => {
3046 let builder = graph_builders.get_dfir_mut(&out_location);
3047 builder.add_dfir(
3048 parse_quote! {
3049 #inspect_ident = #input_ident -> inspect(#f);
3050 },
3051 None,
3052 Some(&next_stmt_id.to_string()),
3053 );
3054 }
3055 BuildersOrCallback::Callback(_, node_callback) => {
3056 node_callback(node, next_stmt_id);
3057 }
3058 }
3059
3060 *next_stmt_id += 1;
3061
3062 ident_stack.push(inspect_ident);
3063 }
3064
3065 HydroNode::Unique { input, .. } => {
3066 let input_ident = ident_stack.pop().unwrap();
3067
3068 let unique_ident =
3069 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3070
3071 match builders_or_callback {
3072 BuildersOrCallback::Builders(graph_builders) => {
3073 let builder = graph_builders.get_dfir_mut(&out_location);
3074 let lifetime = if input.metadata().location_id.is_top_level() {
3075 quote!('static)
3076 } else {
3077 quote!('tick)
3078 };
3079
3080 builder.add_dfir(
3081 parse_quote! {
3082 #unique_ident = #input_ident -> unique::<#lifetime>();
3083 },
3084 None,
3085 Some(&next_stmt_id.to_string()),
3086 );
3087 }
3088 BuildersOrCallback::Callback(_, node_callback) => {
3089 node_callback(node, next_stmt_id);
3090 }
3091 }
3092
3093 *next_stmt_id += 1;
3094
3095 ident_stack.push(unique_ident);
3096 }
3097
3098 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3099 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3100 if input.metadata().location_id.is_top_level()
3101 && input.metadata().collection_kind.is_bounded()
3102 {
3103 parse_quote!(fold_no_replay)
3104 } else {
3105 parse_quote!(fold)
3106 }
3107 } else if matches!(node, HydroNode::Scan { .. }) {
3108 parse_quote!(scan)
3109 } else if let HydroNode::FoldKeyed { input, .. } = node {
3110 if input.metadata().location_id.is_top_level()
3111 && input.metadata().collection_kind.is_bounded()
3112 {
3113 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3114 } else {
3115 parse_quote!(fold_keyed)
3116 }
3117 } else {
3118 unreachable!()
3119 };
3120
3121 let (HydroNode::Fold { input, .. }
3122 | HydroNode::FoldKeyed { input, .. }
3123 | HydroNode::Scan { input, .. }) = node
3124 else {
3125 unreachable!()
3126 };
3127
3128 let lifetime = if input.metadata().location_id.is_top_level() {
3129 quote!('static)
3130 } else {
3131 quote!('tick)
3132 };
3133
3134 let input_ident = ident_stack.pop().unwrap();
3135
3136 let (HydroNode::Fold { init, acc, .. }
3137 | HydroNode::FoldKeyed { init, acc, .. }
3138 | HydroNode::Scan { init, acc, .. }) = &*node
3139 else {
3140 unreachable!()
3141 };
3142
3143 let fold_ident =
3144 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146 match builders_or_callback {
3147 BuildersOrCallback::Builders(graph_builders) => {
3148 if matches!(node, HydroNode::Fold { .. })
3149 && node.metadata().location_id.is_top_level()
3150 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3151 && graph_builders.singleton_intermediates()
3152 && !node.metadata().collection_kind.is_bounded()
3153 {
3154 let builder = graph_builders.get_dfir_mut(&out_location);
3155
3156 let acc: syn::Expr = parse_quote!({
3157 let mut __inner = #acc;
3158 move |__state, __value| {
3159 __inner(__state, __value);
3160 Some(__state.clone())
3161 }
3162 });
3163
3164 builder.add_dfir(
3165 parse_quote! {
3166 source_iter([(#init)()]) -> [0]#fold_ident;
3167 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3168 #fold_ident = chain();
3169 },
3170 None,
3171 Some(&next_stmt_id.to_string()),
3172 );
3173 } else if matches!(node, HydroNode::FoldKeyed { .. })
3174 && node.metadata().location_id.is_top_level()
3175 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3176 && graph_builders.singleton_intermediates()
3177 && !node.metadata().collection_kind.is_bounded()
3178 {
3179 let builder = graph_builders.get_dfir_mut(&out_location);
3180
3181 let acc: syn::Expr = parse_quote!({
3182 let mut __init = #init;
3183 let mut __inner = #acc;
3184 move |__state, __kv: (_, _)| {
3185 let __state = __state
3187 .entry(::std::clone::Clone::clone(&__kv.0))
3188 .or_insert_with(|| (__init)());
3189 __inner(__state, __kv.1);
3190 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3191 }
3192 });
3193
3194 builder.add_dfir(
3195 parse_quote! {
3196 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3197 },
3198 None,
3199 Some(&next_stmt_id.to_string()),
3200 );
3201 } else {
3202 let builder = graph_builders.get_dfir_mut(&out_location);
3203 builder.add_dfir(
3204 parse_quote! {
3205 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3206 },
3207 None,
3208 Some(&next_stmt_id.to_string()),
3209 );
3210 }
3211 }
3212 BuildersOrCallback::Callback(_, node_callback) => {
3213 node_callback(node, next_stmt_id);
3214 }
3215 }
3216
3217 *next_stmt_id += 1;
3218
3219 ident_stack.push(fold_ident);
3220 }
3221
3222 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3223 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3224 if input.metadata().location_id.is_top_level()
3225 && input.metadata().collection_kind.is_bounded()
3226 {
3227 parse_quote!(reduce_no_replay)
3228 } else {
3229 parse_quote!(reduce)
3230 }
3231 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3232 if input.metadata().location_id.is_top_level()
3233 && input.metadata().collection_kind.is_bounded()
3234 {
3235 todo!(
3236 "Calling keyed reduce on a top-level bounded collection is not supported"
3237 )
3238 } else {
3239 parse_quote!(reduce_keyed)
3240 }
3241 } else {
3242 unreachable!()
3243 };
3244
3245 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3246 else {
3247 unreachable!()
3248 };
3249
3250 let lifetime = if input.metadata().location_id.is_top_level() {
3251 quote!('static)
3252 } else {
3253 quote!('tick)
3254 };
3255
3256 let input_ident = ident_stack.pop().unwrap();
3257
3258 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3259 else {
3260 unreachable!()
3261 };
3262
3263 let reduce_ident =
3264 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3265
3266 match builders_or_callback {
3267 BuildersOrCallback::Builders(graph_builders) => {
3268 if matches!(node, HydroNode::Reduce { .. })
3269 && node.metadata().location_id.is_top_level()
3270 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3271 && graph_builders.singleton_intermediates()
3272 && !node.metadata().collection_kind.is_bounded()
3273 {
3274 todo!(
3275 "Reduce with optional intermediates is not yet supported in simulator"
3276 );
3277 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3278 && node.metadata().location_id.is_top_level()
3279 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3280 && graph_builders.singleton_intermediates()
3281 && !node.metadata().collection_kind.is_bounded()
3282 {
3283 todo!(
3284 "Reduce keyed with optional intermediates is not yet supported in simulator"
3285 );
3286 } else {
3287 let builder = graph_builders.get_dfir_mut(&out_location);
3288 builder.add_dfir(
3289 parse_quote! {
3290 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3291 },
3292 None,
3293 Some(&next_stmt_id.to_string()),
3294 );
3295 }
3296 }
3297 BuildersOrCallback::Callback(_, node_callback) => {
3298 node_callback(node, next_stmt_id);
3299 }
3300 }
3301
3302 *next_stmt_id += 1;
3303
3304 ident_stack.push(reduce_ident);
3305 }
3306
3307 HydroNode::ReduceKeyedWatermark {
3308 f,
3309 input,
3310 metadata,
3311 ..
3312 } => {
3313 let lifetime = if input.metadata().location_id.is_top_level() {
3314 quote!('static)
3315 } else {
3316 quote!('tick)
3317 };
3318
3319 let watermark_ident = ident_stack.pop().unwrap();
3321 let input_ident = ident_stack.pop().unwrap();
3322
3323 let chain_ident = syn::Ident::new(
3324 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3325 Span::call_site(),
3326 );
3327
3328 let fold_ident =
3329 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3330
3331 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3332 && input.metadata().collection_kind.is_bounded()
3333 {
3334 parse_quote!(fold_no_replay)
3335 } else {
3336 parse_quote!(fold)
3337 };
3338
3339 match builders_or_callback {
3340 BuildersOrCallback::Builders(graph_builders) => {
3341 if metadata.location_id.is_top_level()
3342 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3343 && graph_builders.singleton_intermediates()
3344 && !metadata.collection_kind.is_bounded()
3345 {
3346 todo!(
3347 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3348 )
3349 } else {
3350 let builder = graph_builders.get_dfir_mut(&out_location);
3351 builder.add_dfir(
3352 parse_quote! {
3353 #chain_ident = chain();
3354 #input_ident
3355 -> map(|x| (Some(x), None))
3356 -> [0]#chain_ident;
3357 #watermark_ident
3358 -> map(|watermark| (None, Some(watermark)))
3359 -> [1]#chain_ident;
3360
3361 #fold_ident = #chain_ident
3362 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3363 let __reduce_keyed_fn = #f;
3364 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3365 if let Some((k, v)) = opt_payload {
3366 if let Some(curr_watermark) = *opt_curr_watermark {
3367 if k <= curr_watermark {
3368 return;
3369 }
3370 }
3371 match map.entry(k) {
3372 ::std::collections::hash_map::Entry::Vacant(e) => {
3373 e.insert(v);
3374 }
3375 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3376 __reduce_keyed_fn(e.get_mut(), v);
3377 }
3378 }
3379 } else {
3380 let watermark = opt_watermark.unwrap();
3381 if let Some(curr_watermark) = *opt_curr_watermark {
3382 if watermark <= curr_watermark {
3383 return;
3384 }
3385 }
3386 *opt_curr_watermark = opt_watermark;
3387 map.retain(|k, _| *k > watermark);
3388 }
3389 }
3390 })
3391 -> flat_map(|(map, _curr_watermark)| map);
3392 },
3393 None,
3394 Some(&next_stmt_id.to_string()),
3395 );
3396 }
3397 }
3398 BuildersOrCallback::Callback(_, node_callback) => {
3399 node_callback(node, next_stmt_id);
3400 }
3401 }
3402
3403 *next_stmt_id += 1;
3404
3405 ident_stack.push(fold_ident);
3406 }
3407
3408 HydroNode::Network {
3409 serialize_fn: serialize_pipeline,
3410 instantiate_fn,
3411 deserialize_fn: deserialize_pipeline,
3412 input,
3413 ..
3414 } => {
3415 let input_ident = ident_stack.pop().unwrap();
3416
3417 let receiver_stream_ident =
3418 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3419
3420 match builders_or_callback {
3421 BuildersOrCallback::Builders(graph_builders) => {
3422 let (sink_expr, source_expr) = match instantiate_fn {
3423 DebugInstantiate::Building => (
3424 syn::parse_quote!(DUMMY_SINK),
3425 syn::parse_quote!(DUMMY_SOURCE),
3426 ),
3427
3428 DebugInstantiate::Finalized(finalized) => {
3429 (finalized.sink.clone(), finalized.source.clone())
3430 }
3431 };
3432
3433 graph_builders.create_network(
3434 &input.metadata().location_id,
3435 &out_location,
3436 input_ident,
3437 &receiver_stream_ident,
3438 serialize_pipeline.as_ref(),
3439 sink_expr,
3440 source_expr,
3441 deserialize_pipeline.as_ref(),
3442 *next_stmt_id,
3443 );
3444 }
3445 BuildersOrCallback::Callback(_, node_callback) => {
3446 node_callback(node, next_stmt_id);
3447 }
3448 }
3449
3450 *next_stmt_id += 1;
3451
3452 ident_stack.push(receiver_stream_ident);
3453 }
3454
3455 HydroNode::ExternalInput {
3456 instantiate_fn,
3457 deserialize_fn: deserialize_pipeline,
3458 ..
3459 } => {
3460 let receiver_stream_ident =
3461 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3462
3463 match builders_or_callback {
3464 BuildersOrCallback::Builders(graph_builders) => {
3465 let (_, source_expr) = match instantiate_fn {
3466 DebugInstantiate::Building => (
3467 syn::parse_quote!(DUMMY_SINK),
3468 syn::parse_quote!(DUMMY_SOURCE),
3469 ),
3470
3471 DebugInstantiate::Finalized(finalized) => {
3472 (finalized.sink.clone(), finalized.source.clone())
3473 }
3474 };
3475
3476 graph_builders.create_external_source(
3477 &out_location,
3478 source_expr,
3479 &receiver_stream_ident,
3480 deserialize_pipeline.as_ref(),
3481 *next_stmt_id,
3482 );
3483 }
3484 BuildersOrCallback::Callback(_, node_callback) => {
3485 node_callback(node, next_stmt_id);
3486 }
3487 }
3488
3489 *next_stmt_id += 1;
3490
3491 ident_stack.push(receiver_stream_ident);
3492 }
3493
3494 HydroNode::Counter {
3495 tag,
3496 duration,
3497 prefix,
3498 ..
3499 } => {
3500 let input_ident = ident_stack.pop().unwrap();
3501
3502 let counter_ident =
3503 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3504
3505 match builders_or_callback {
3506 BuildersOrCallback::Builders(graph_builders) => {
3507 let arg = format!("{}({})", prefix, tag);
3508 let builder = graph_builders.get_dfir_mut(&out_location);
3509 builder.add_dfir(
3510 parse_quote! {
3511 #counter_ident = #input_ident -> _counter(#arg, #duration);
3512 },
3513 None,
3514 Some(&next_stmt_id.to_string()),
3515 );
3516 }
3517 BuildersOrCallback::Callback(_, node_callback) => {
3518 node_callback(node, next_stmt_id);
3519 }
3520 }
3521
3522 *next_stmt_id += 1;
3523
3524 ident_stack.push(counter_ident);
3525 }
3526 }
3527 },
3528 seen_tees,
3529 false,
3530 );
3531
3532 ident_stack
3533 .pop()
3534 .expect("ident_stack should have exactly one element after traversal")
3535 }
3536
3537 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3538 match self {
3539 HydroNode::Placeholder => {
3540 panic!()
3541 }
3542 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3543 HydroNode::Source { source, .. } => match source {
3544 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3545 HydroSource::ExternalNetwork()
3546 | HydroSource::Spin()
3547 | HydroSource::ClusterMembers(_)
3548 | HydroSource::Embedded(_) => {} },
3550 HydroNode::SingletonSource { value, .. } => {
3551 transform(value);
3552 }
3553 HydroNode::CycleSource { .. }
3554 | HydroNode::Tee { .. }
3555 | HydroNode::YieldConcat { .. }
3556 | HydroNode::BeginAtomic { .. }
3557 | HydroNode::EndAtomic { .. }
3558 | HydroNode::Batch { .. }
3559 | HydroNode::Chain { .. }
3560 | HydroNode::ChainFirst { .. }
3561 | HydroNode::CrossProduct { .. }
3562 | HydroNode::CrossSingleton { .. }
3563 | HydroNode::ResolveFutures { .. }
3564 | HydroNode::ResolveFuturesOrdered { .. }
3565 | HydroNode::Join { .. }
3566 | HydroNode::Difference { .. }
3567 | HydroNode::AntiJoin { .. }
3568 | HydroNode::DeferTick { .. }
3569 | HydroNode::Enumerate { .. }
3570 | HydroNode::Unique { .. }
3571 | HydroNode::Sort { .. } => {}
3572 HydroNode::Map { f, .. }
3573 | HydroNode::FlatMap { f, .. }
3574 | HydroNode::Filter { f, .. }
3575 | HydroNode::FilterMap { f, .. }
3576 | HydroNode::Inspect { f, .. }
3577 | HydroNode::Reduce { f, .. }
3578 | HydroNode::ReduceKeyed { f, .. }
3579 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3580 transform(f);
3581 }
3582 HydroNode::Fold { init, acc, .. }
3583 | HydroNode::Scan { init, acc, .. }
3584 | HydroNode::FoldKeyed { init, acc, .. } => {
3585 transform(init);
3586 transform(acc);
3587 }
3588 HydroNode::Network {
3589 serialize_fn,
3590 deserialize_fn,
3591 ..
3592 } => {
3593 if let Some(serialize_fn) = serialize_fn {
3594 transform(serialize_fn);
3595 }
3596 if let Some(deserialize_fn) = deserialize_fn {
3597 transform(deserialize_fn);
3598 }
3599 }
3600 HydroNode::ExternalInput { deserialize_fn, .. } => {
3601 if let Some(deserialize_fn) = deserialize_fn {
3602 transform(deserialize_fn);
3603 }
3604 }
3605 HydroNode::Counter { duration, .. } => {
3606 transform(duration);
3607 }
3608 }
3609 }
3610
3611 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3612 &self.metadata().op
3613 }
3614
3615 pub fn metadata(&self) -> &HydroIrMetadata {
3616 match self {
3617 HydroNode::Placeholder => {
3618 panic!()
3619 }
3620 HydroNode::Cast { metadata, .. } => metadata,
3621 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3622 HydroNode::Source { metadata, .. } => metadata,
3623 HydroNode::SingletonSource { metadata, .. } => metadata,
3624 HydroNode::CycleSource { metadata, .. } => metadata,
3625 HydroNode::Tee { metadata, .. } => metadata,
3626 HydroNode::YieldConcat { metadata, .. } => metadata,
3627 HydroNode::BeginAtomic { metadata, .. } => metadata,
3628 HydroNode::EndAtomic { metadata, .. } => metadata,
3629 HydroNode::Batch { metadata, .. } => metadata,
3630 HydroNode::Chain { metadata, .. } => metadata,
3631 HydroNode::ChainFirst { metadata, .. } => metadata,
3632 HydroNode::CrossProduct { metadata, .. } => metadata,
3633 HydroNode::CrossSingleton { metadata, .. } => metadata,
3634 HydroNode::Join { metadata, .. } => metadata,
3635 HydroNode::Difference { metadata, .. } => metadata,
3636 HydroNode::AntiJoin { metadata, .. } => metadata,
3637 HydroNode::ResolveFutures { metadata, .. } => metadata,
3638 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3639 HydroNode::Map { metadata, .. } => metadata,
3640 HydroNode::FlatMap { metadata, .. } => metadata,
3641 HydroNode::Filter { metadata, .. } => metadata,
3642 HydroNode::FilterMap { metadata, .. } => metadata,
3643 HydroNode::DeferTick { metadata, .. } => metadata,
3644 HydroNode::Enumerate { metadata, .. } => metadata,
3645 HydroNode::Inspect { metadata, .. } => metadata,
3646 HydroNode::Unique { metadata, .. } => metadata,
3647 HydroNode::Sort { metadata, .. } => metadata,
3648 HydroNode::Scan { metadata, .. } => metadata,
3649 HydroNode::Fold { metadata, .. } => metadata,
3650 HydroNode::FoldKeyed { metadata, .. } => metadata,
3651 HydroNode::Reduce { metadata, .. } => metadata,
3652 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3653 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3654 HydroNode::ExternalInput { metadata, .. } => metadata,
3655 HydroNode::Network { metadata, .. } => metadata,
3656 HydroNode::Counter { metadata, .. } => metadata,
3657 }
3658 }
3659
3660 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3661 &mut self.metadata_mut().op
3662 }
3663
3664 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3665 match self {
3666 HydroNode::Placeholder => {
3667 panic!()
3668 }
3669 HydroNode::Cast { metadata, .. } => metadata,
3670 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3671 HydroNode::Source { metadata, .. } => metadata,
3672 HydroNode::SingletonSource { metadata, .. } => metadata,
3673 HydroNode::CycleSource { metadata, .. } => metadata,
3674 HydroNode::Tee { metadata, .. } => metadata,
3675 HydroNode::YieldConcat { metadata, .. } => metadata,
3676 HydroNode::BeginAtomic { metadata, .. } => metadata,
3677 HydroNode::EndAtomic { metadata, .. } => metadata,
3678 HydroNode::Batch { metadata, .. } => metadata,
3679 HydroNode::Chain { metadata, .. } => metadata,
3680 HydroNode::ChainFirst { metadata, .. } => metadata,
3681 HydroNode::CrossProduct { metadata, .. } => metadata,
3682 HydroNode::CrossSingleton { metadata, .. } => metadata,
3683 HydroNode::Join { metadata, .. } => metadata,
3684 HydroNode::Difference { metadata, .. } => metadata,
3685 HydroNode::AntiJoin { metadata, .. } => metadata,
3686 HydroNode::ResolveFutures { metadata, .. } => metadata,
3687 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3688 HydroNode::Map { metadata, .. } => metadata,
3689 HydroNode::FlatMap { metadata, .. } => metadata,
3690 HydroNode::Filter { metadata, .. } => metadata,
3691 HydroNode::FilterMap { metadata, .. } => metadata,
3692 HydroNode::DeferTick { metadata, .. } => metadata,
3693 HydroNode::Enumerate { metadata, .. } => metadata,
3694 HydroNode::Inspect { metadata, .. } => metadata,
3695 HydroNode::Unique { metadata, .. } => metadata,
3696 HydroNode::Sort { metadata, .. } => metadata,
3697 HydroNode::Scan { metadata, .. } => metadata,
3698 HydroNode::Fold { metadata, .. } => metadata,
3699 HydroNode::FoldKeyed { metadata, .. } => metadata,
3700 HydroNode::Reduce { metadata, .. } => metadata,
3701 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3702 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3703 HydroNode::ExternalInput { metadata, .. } => metadata,
3704 HydroNode::Network { metadata, .. } => metadata,
3705 HydroNode::Counter { metadata, .. } => metadata,
3706 }
3707 }
3708
3709 pub fn input(&self) -> Vec<&HydroNode> {
3710 match self {
3711 HydroNode::Placeholder => {
3712 panic!()
3713 }
3714 HydroNode::Source { .. }
3715 | HydroNode::SingletonSource { .. }
3716 | HydroNode::ExternalInput { .. }
3717 | HydroNode::CycleSource { .. }
3718 | HydroNode::Tee { .. } => {
3719 vec![]
3721 }
3722 HydroNode::Cast { inner, .. }
3723 | HydroNode::ObserveNonDet { inner, .. }
3724 | HydroNode::YieldConcat { inner, .. }
3725 | HydroNode::BeginAtomic { inner, .. }
3726 | HydroNode::EndAtomic { inner, .. }
3727 | HydroNode::Batch { inner, .. } => {
3728 vec![inner]
3729 }
3730 HydroNode::Chain { first, second, .. } => {
3731 vec![first, second]
3732 }
3733 HydroNode::ChainFirst { first, second, .. } => {
3734 vec![first, second]
3735 }
3736 HydroNode::CrossProduct { left, right, .. }
3737 | HydroNode::CrossSingleton { left, right, .. }
3738 | HydroNode::Join { left, right, .. } => {
3739 vec![left, right]
3740 }
3741 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3742 vec![pos, neg]
3743 }
3744 HydroNode::Map { input, .. }
3745 | HydroNode::FlatMap { input, .. }
3746 | HydroNode::Filter { input, .. }
3747 | HydroNode::FilterMap { input, .. }
3748 | HydroNode::Sort { input, .. }
3749 | HydroNode::DeferTick { input, .. }
3750 | HydroNode::Enumerate { input, .. }
3751 | HydroNode::Inspect { input, .. }
3752 | HydroNode::Unique { input, .. }
3753 | HydroNode::Network { input, .. }
3754 | HydroNode::Counter { input, .. }
3755 | HydroNode::ResolveFutures { input, .. }
3756 | HydroNode::ResolveFuturesOrdered { input, .. }
3757 | HydroNode::Fold { input, .. }
3758 | HydroNode::FoldKeyed { input, .. }
3759 | HydroNode::Reduce { input, .. }
3760 | HydroNode::ReduceKeyed { input, .. }
3761 | HydroNode::Scan { input, .. } => {
3762 vec![input]
3763 }
3764 HydroNode::ReduceKeyedWatermark {
3765 input, watermark, ..
3766 } => {
3767 vec![input, watermark]
3768 }
3769 }
3770 }
3771
3772 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3773 self.input()
3774 .iter()
3775 .map(|input_node| input_node.metadata())
3776 .collect()
3777 }
3778
3779 pub fn print_root(&self) -> String {
3780 match self {
3781 HydroNode::Placeholder => {
3782 panic!()
3783 }
3784 HydroNode::Cast { .. } => "Cast()".to_owned(),
3785 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3786 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3787 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3788 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3789 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3790 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3791 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3792 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3793 HydroNode::Batch { .. } => "Batch()".to_owned(),
3794 HydroNode::Chain { first, second, .. } => {
3795 format!("Chain({}, {})", first.print_root(), second.print_root())
3796 }
3797 HydroNode::ChainFirst { first, second, .. } => {
3798 format!(
3799 "ChainFirst({}, {})",
3800 first.print_root(),
3801 second.print_root()
3802 )
3803 }
3804 HydroNode::CrossProduct { left, right, .. } => {
3805 format!(
3806 "CrossProduct({}, {})",
3807 left.print_root(),
3808 right.print_root()
3809 )
3810 }
3811 HydroNode::CrossSingleton { left, right, .. } => {
3812 format!(
3813 "CrossSingleton({}, {})",
3814 left.print_root(),
3815 right.print_root()
3816 )
3817 }
3818 HydroNode::Join { left, right, .. } => {
3819 format!("Join({}, {})", left.print_root(), right.print_root())
3820 }
3821 HydroNode::Difference { pos, neg, .. } => {
3822 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3823 }
3824 HydroNode::AntiJoin { pos, neg, .. } => {
3825 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3826 }
3827 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3828 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3829 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3830 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3831 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3832 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3833 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3834 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3835 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3836 HydroNode::Unique { .. } => "Unique()".to_owned(),
3837 HydroNode::Sort { .. } => "Sort()".to_owned(),
3838 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3839 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3840 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3841 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3842 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3843 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3844 HydroNode::Network { .. } => "Network()".to_owned(),
3845 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3846 HydroNode::Counter { tag, duration, .. } => {
3847 format!("Counter({:?}, {:?})", tag, duration)
3848 }
3849 }
3850 }
3851}
3852
/// Instantiates one network edge between two top-level locations.
///
/// Both endpoints are looked up in `processes` / `clusters`, a fresh port is
/// taken from each via `next_port()` (sender first, then receiver), and the
/// deploy backend `D` is asked for the pair of expressions and the connect
/// closure that match the endpoint kinds:
///
/// | from    | to      | backend calls                        |
/// |---------|---------|--------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`    |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`    |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`    |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`    |
///
/// `env` and `name` are only forwarded in the process-to-process case; the
/// other `*_sink_source` calls do not take them.
///
/// Returns `(sink, source, connect_fn)`, where `sink` and `source` are the
/// two expressions produced by `*_sink_source` and `connect_fn` is the boxed
/// closure produced by `*_connect`.
///
/// # Panics
/// * If a referenced process or cluster is missing from the supplied maps.
/// * If either location is a `LocationId::Tick` or `LocationId::Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (a clone of) the instantiated process for `key`, or panics.
    let process_at = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches (a clone of) the instantiated cluster for `key`, or panics.
    let cluster_at = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) =
                D::o2o_sink_source(env, &sender, &sink_port, &receiver, &source_port, name);
            let connect_fn = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::o2m_sink_source(&sender, &sink_port, &receiver, &source_port);
            let connect_fn = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2o_sink_source(&sender, &sink_port, &receiver, &source_port);
            let connect_fn = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2m_sink_source(&sender, &sink_port, &receiver, &source_port);
            let connect_fn = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        // Only top-level process/cluster locations are accepted as endpoints.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    }
}
3961
// Unit tests for the IR types' memory footprint and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Asserts that `HydroNode` occupies exactly 240 bytes.
    ///
    /// Ignored unless the `build` feature is enabled: per the ignore reason,
    /// the expected size assumes the feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 240);
    }

    /// Asserts that `HydroRoot` occupies exactly 136 bytes.
    ///
    /// Ignored unless the `build` feature is enabled, for the same reason as
    /// `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// A plain expression (`x + y`) must come back from `simplify_q_macro`
    /// unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Runs `simplify_q_macro` on the expression produced by splicing a real
    /// `q!` closure and compares the resulting token string against the
    /// stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted expression is a block that binds a local
    /// and then yields a `move` closure, so the quoted code does not begin
    /// with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}