1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77 Joined,
78 Left,
79}
80
81#[expect(missing_docs, reason = "TODO")]
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
83pub enum NetworkHint {
84 Auto,
85 TcpPort(Option<u16>),
86}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
92#[stageleft::export(LocationKey)]
93new_key_type! {
94 pub struct LocationKey;
96}
97
98impl std::fmt::Display for LocationKey {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "loc{:?}", self.data()) }
102}
103
104impl std::str::FromStr for LocationKey {
107 type Err = Option<ParseIntError>;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 let nvn = s.strip_prefix("loc").ok_or(None)?;
111 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112 let idx: u64 = idx.parse()?;
113 let ver: u64 = ver.parse()?;
114 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115 }
116}
117
118impl LocationKey {
119 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
125 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
129 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
131
132impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134 type O = LocationKey;
135
136 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137 where
138 Self: Sized,
139 {
140 let root = get_this_crate();
141 let n = Key::data(&self).as_ffi();
142 (
143 QuoteTokens {
144 prelude: None,
145 expr: Some(quote! {
146 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147 }),
148 },
149 (),
150 )
151 }
152}
153
154#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
156pub enum LocationType {
157 Process,
159 Cluster,
161 External,
163}
164
165#[expect(
179 private_bounds,
180 reason = "only internal Hydro code can define location types"
181)]
182pub trait Location<'a>: dynamic::DynLocation {
183 type Root: Location<'a>;
188
189 fn root(&self) -> Self::Root;
194
195 fn try_tick(&self) -> Option<Tick<Self>> {
202 if Self::is_top_level() {
203 let id = self.flow_state().borrow_mut().next_clock_id();
204 Some(Tick {
205 id,
206 l: self.clone(),
207 })
208 } else {
209 None
210 }
211 }
212
213 fn id(&self) -> LocationId {
215 dynamic::DynLocation::id(self)
216 }
217
218 fn tick(&self) -> Tick<Self>
244 where
245 Self: NoTick,
246 {
247 let id = self.flow_state().borrow_mut().next_clock_id();
248 Tick {
249 id,
250 l: self.clone(),
251 }
252 }
253
254 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
279 where
280 Self: Sized + NoTick,
281 {
282 Stream::new(
283 self.clone(),
284 HydroNode::Source {
285 source: HydroSource::Spin(),
286 metadata: self.new_node_metadata(Stream::<
287 (),
288 Self,
289 Unbounded,
290 TotalOrder,
291 ExactlyOnce,
292 >::collection_kind()),
293 },
294 )
295 }
296
297 fn source_stream<T, E>(
318 &self,
319 e: impl QuotedWithContext<'a, E, Self>,
320 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
321 where
322 E: FuturesStream<Item = T> + Unpin,
323 Self: Sized + NoTick,
324 {
325 let e = e.splice_untyped_ctx(self);
326
327 Stream::new(
328 self.clone(),
329 HydroNode::Source {
330 source: HydroSource::Stream(e.into()),
331 metadata: self.new_node_metadata(Stream::<
332 T,
333 Self,
334 Unbounded,
335 TotalOrder,
336 ExactlyOnce,
337 >::collection_kind()),
338 },
339 )
340 }
341
342 fn source_iter<T, E>(
364 &self,
365 e: impl QuotedWithContext<'a, E, Self>,
366 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
367 where
368 E: IntoIterator<Item = T>,
369 Self: Sized + NoTick,
370 {
371 let e = e.splice_typed_ctx(self);
372
373 Stream::new(
374 self.clone(),
375 HydroNode::Source {
376 source: HydroSource::Iter(e.into()),
377 metadata: self.new_node_metadata(
378 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
379 ),
380 },
381 )
382 }
383
384 fn source_cluster_members<C: 'a>(
418 &self,
419 cluster: &Cluster<'a, C>,
420 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
421 where
422 Self: Sized + NoTick,
423 {
424 Stream::new(
425 self.clone(),
426 HydroNode::Source {
427 source: HydroSource::ClusterMembers(cluster.id()),
428 metadata: self.new_node_metadata(Stream::<
429 (TaglessMemberId, MembershipEvent),
430 Self,
431 Unbounded,
432 TotalOrder,
433 ExactlyOnce,
434 >::collection_kind()),
435 },
436 )
437 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
438 .into_keyed()
439 }
440
441 fn source_external_bytes<L>(
449 &self,
450 from: &External<L>,
451 ) -> (
452 ExternalBytesPort,
453 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
454 )
455 where
456 Self: Sized + NoTick,
457 {
458 let (port, stream, sink) =
459 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
460
461 sink.complete(self.source_iter(q!([])));
462
463 (port, stream)
464 }
465
466 #[expect(clippy::type_complexity, reason = "stream markers")]
473 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
474 &self,
475 from: &External<L>,
476 ) -> (
477 ExternalBincodeSink<T, NotMany, O, R>,
478 Stream<T, Self, Unbounded, O, R>,
479 )
480 where
481 Self: Sized + NoTick,
482 T: Serialize + DeserializeOwned,
483 {
484 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
485 sink.complete(self.source_iter(q!([])));
486
487 (
488 ExternalBincodeSink {
489 process_key: from.key,
490 port_id: port.port_id,
491 _phantom: PhantomData,
492 },
493 stream.weaken_ordering().weaken_retries(),
494 )
495 }
496
497 #[cfg(feature = "sim")]
502 #[expect(clippy::type_complexity, reason = "stream markers")]
503 fn sim_input<T, O: Ordering, R: Retries>(
504 &self,
505 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
506 where
507 Self: Sized + NoTick,
508 T: Serialize + DeserializeOwned,
509 {
510 let external_location: External<'a, ()> = External {
511 key: LocationKey::FIRST,
512 flow_state: self.flow_state().clone(),
513 _phantom: PhantomData,
514 };
515
516 let (external, stream) = self.source_external_bincode(&external_location);
517
518 (SimSender(external.port_id, PhantomData), stream)
519 }
520
521 fn embedded_input<T>(
527 &self,
528 name: impl Into<String>,
529 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
530 where
531 Self: Sized + NoTick,
532 {
533 let ident = syn::Ident::new(&name.into(), Span::call_site());
534
535 Stream::new(
536 self.clone(),
537 HydroNode::Source {
538 source: HydroSource::Embedded(ident),
539 metadata: self.new_node_metadata(Stream::<
540 T,
541 Self,
542 Unbounded,
543 TotalOrder,
544 ExactlyOnce,
545 >::collection_kind()),
546 },
547 )
548 }
549
550 #[expect(clippy::type_complexity, reason = "stream markers")]
595 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
596 &self,
597 from: &External<L>,
598 port_hint: NetworkHint,
599 ) -> (
600 ExternalBytesPort<NotMany>,
601 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
602 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
603 )
604 where
605 Self: Sized + NoTick,
606 {
607 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
608
609 let (fwd_ref, to_sink) =
610 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
611 let mut flow_state_borrow = self.flow_state().borrow_mut();
612
613 flow_state_borrow.push_root(HydroRoot::SendExternal {
614 to_external_key: from.key,
615 to_port_id: next_external_port_id,
616 to_many: false,
617 unpaired: false,
618 serialize_fn: None,
619 instantiate_fn: DebugInstantiate::Building,
620 input: Box::new(to_sink.ir_node.into_inner()),
621 op_metadata: HydroIrOpMetadata::new(),
622 });
623
624 let raw_stream: Stream<
625 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
626 Self,
627 Unbounded,
628 TotalOrder,
629 ExactlyOnce,
630 > = Stream::new(
631 self.clone(),
632 HydroNode::ExternalInput {
633 from_external_key: from.key,
634 from_port_id: next_external_port_id,
635 from_many: false,
636 codec_type: quote_type::<Codec>().into(),
637 port_hint,
638 instantiate_fn: DebugInstantiate::Building,
639 deserialize_fn: None,
640 metadata: self.new_node_metadata(Stream::<
641 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
642 Self,
643 Unbounded,
644 TotalOrder,
645 ExactlyOnce,
646 >::collection_kind()),
647 },
648 );
649
650 (
651 ExternalBytesPort {
652 process_key: from.key,
653 port_id: next_external_port_id,
654 _phantom: PhantomData,
655 },
656 raw_stream.flatten_ordered(),
657 fwd_ref,
658 )
659 }
660
661 #[expect(clippy::type_complexity, reason = "stream markers")]
671 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
672 &self,
673 from: &External<L>,
674 ) -> (
675 ExternalBincodeBidi<InT, OutT, NotMany>,
676 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
677 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
678 )
679 where
680 Self: Sized + NoTick,
681 {
682 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
683
684 let (fwd_ref, to_sink) =
685 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
686 let mut flow_state_borrow = self.flow_state().borrow_mut();
687
688 let root = get_this_crate();
689
690 let out_t_type = quote_type::<OutT>();
691 let ser_fn: syn::Expr = syn::parse_quote! {
692 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
693 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
694 )
695 };
696
697 flow_state_borrow.push_root(HydroRoot::SendExternal {
698 to_external_key: from.key,
699 to_port_id: next_external_port_id,
700 to_many: false,
701 unpaired: false,
702 serialize_fn: Some(ser_fn.into()),
703 instantiate_fn: DebugInstantiate::Building,
704 input: Box::new(to_sink.ir_node.into_inner()),
705 op_metadata: HydroIrOpMetadata::new(),
706 });
707
708 let in_t_type = quote_type::<InT>();
709
710 let deser_fn: syn::Expr = syn::parse_quote! {
711 |res| {
712 let b = res.unwrap();
713 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
714 }
715 };
716
717 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
718 self.clone(),
719 HydroNode::ExternalInput {
720 from_external_key: from.key,
721 from_port_id: next_external_port_id,
722 from_many: false,
723 codec_type: quote_type::<LengthDelimitedCodec>().into(),
724 port_hint: NetworkHint::Auto,
725 instantiate_fn: DebugInstantiate::Building,
726 deserialize_fn: Some(deser_fn.into()),
727 metadata: self.new_node_metadata(Stream::<
728 InT,
729 Self,
730 Unbounded,
731 TotalOrder,
732 ExactlyOnce,
733 >::collection_kind()),
734 },
735 );
736
737 (
738 ExternalBincodeBidi {
739 process_key: from.key,
740 port_id: next_external_port_id,
741 _phantom: PhantomData,
742 },
743 raw_stream,
744 fwd_ref,
745 )
746 }
747
748 #[expect(clippy::type_complexity, reason = "stream markers")]
760 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
761 &self,
762 from: &External<L>,
763 port_hint: NetworkHint,
764 ) -> (
765 ExternalBytesPort<Many>,
766 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
767 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
768 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
769 )
770 where
771 Self: Sized + NoTick,
772 {
773 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
774
775 let (fwd_ref, to_sink) =
776 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
777 let mut flow_state_borrow = self.flow_state().borrow_mut();
778
779 flow_state_borrow.push_root(HydroRoot::SendExternal {
780 to_external_key: from.key,
781 to_port_id: next_external_port_id,
782 to_many: true,
783 unpaired: false,
784 serialize_fn: None,
785 instantiate_fn: DebugInstantiate::Building,
786 input: Box::new(to_sink.entries().ir_node.into_inner()),
787 op_metadata: HydroIrOpMetadata::new(),
788 });
789
790 let raw_stream: Stream<
791 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
792 Self,
793 Unbounded,
794 TotalOrder,
795 ExactlyOnce,
796 > = Stream::new(
797 self.clone(),
798 HydroNode::ExternalInput {
799 from_external_key: from.key,
800 from_port_id: next_external_port_id,
801 from_many: true,
802 codec_type: quote_type::<Codec>().into(),
803 port_hint,
804 instantiate_fn: DebugInstantiate::Building,
805 deserialize_fn: None,
806 metadata: self.new_node_metadata(Stream::<
807 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
808 Self,
809 Unbounded,
810 TotalOrder,
811 ExactlyOnce,
812 >::collection_kind()),
813 },
814 );
815
816 let membership_stream_ident = syn::Ident::new(
817 &format!(
818 "__hydro_deploy_many_{}_{}_membership",
819 from.key, next_external_port_id
820 ),
821 Span::call_site(),
822 );
823 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
824 let raw_membership_stream: KeyedStream<
825 u64,
826 bool,
827 Self,
828 Unbounded,
829 TotalOrder,
830 ExactlyOnce,
831 > = KeyedStream::new(
832 self.clone(),
833 HydroNode::Source {
834 source: HydroSource::Stream(membership_stream_expr.into()),
835 metadata: self.new_node_metadata(KeyedStream::<
836 u64,
837 bool,
838 Self,
839 Unbounded,
840 TotalOrder,
841 ExactlyOnce,
842 >::collection_kind()),
843 },
844 );
845
846 (
847 ExternalBytesPort {
848 process_key: from.key,
849 port_id: next_external_port_id,
850 _phantom: PhantomData,
851 },
852 raw_stream
853 .flatten_ordered() .into_keyed(),
855 raw_membership_stream.map(q!(|join| {
856 if join {
857 MembershipEvent::Joined
858 } else {
859 MembershipEvent::Left
860 }
861 })),
862 fwd_ref,
863 )
864 }
865
866 #[expect(clippy::type_complexity, reason = "stream markers")]
882 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
883 &self,
884 from: &External<L>,
885 ) -> (
886 ExternalBincodeBidi<InT, OutT, Many>,
887 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
888 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
889 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
890 )
891 where
892 Self: Sized + NoTick,
893 {
894 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
895
896 let (fwd_ref, to_sink) =
897 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
898 let mut flow_state_borrow = self.flow_state().borrow_mut();
899
900 let root = get_this_crate();
901
902 let out_t_type = quote_type::<OutT>();
903 let ser_fn: syn::Expr = syn::parse_quote! {
904 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
905 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
906 )
907 };
908
909 flow_state_borrow.push_root(HydroRoot::SendExternal {
910 to_external_key: from.key,
911 to_port_id: next_external_port_id,
912 to_many: true,
913 unpaired: false,
914 serialize_fn: Some(ser_fn.into()),
915 instantiate_fn: DebugInstantiate::Building,
916 input: Box::new(to_sink.entries().ir_node.into_inner()),
917 op_metadata: HydroIrOpMetadata::new(),
918 });
919
920 let in_t_type = quote_type::<InT>();
921
922 let deser_fn: syn::Expr = syn::parse_quote! {
923 |res| {
924 let (id, b) = res.unwrap();
925 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
926 }
927 };
928
929 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
930 KeyedStream::new(
931 self.clone(),
932 HydroNode::ExternalInput {
933 from_external_key: from.key,
934 from_port_id: next_external_port_id,
935 from_many: true,
936 codec_type: quote_type::<LengthDelimitedCodec>().into(),
937 port_hint: NetworkHint::Auto,
938 instantiate_fn: DebugInstantiate::Building,
939 deserialize_fn: Some(deser_fn.into()),
940 metadata: self.new_node_metadata(KeyedStream::<
941 u64,
942 InT,
943 Self,
944 Unbounded,
945 TotalOrder,
946 ExactlyOnce,
947 >::collection_kind()),
948 },
949 );
950
951 let membership_stream_ident = syn::Ident::new(
952 &format!(
953 "__hydro_deploy_many_{}_{}_membership",
954 from.key, next_external_port_id
955 ),
956 Span::call_site(),
957 );
958 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
959 let raw_membership_stream: KeyedStream<
960 u64,
961 bool,
962 Self,
963 Unbounded,
964 TotalOrder,
965 ExactlyOnce,
966 > = KeyedStream::new(
967 self.clone(),
968 HydroNode::Source {
969 source: HydroSource::Stream(membership_stream_expr.into()),
970 metadata: self.new_node_metadata(KeyedStream::<
971 u64,
972 bool,
973 Self,
974 Unbounded,
975 TotalOrder,
976 ExactlyOnce,
977 >::collection_kind()),
978 },
979 );
980
981 (
982 ExternalBincodeBidi {
983 process_key: from.key,
984 port_id: next_external_port_id,
985 _phantom: PhantomData,
986 },
987 raw_stream,
988 raw_membership_stream.map(q!(|join| {
989 if join {
990 MembershipEvent::Joined
991 } else {
992 MembershipEvent::Left
993 }
994 })),
995 fwd_ref,
996 )
997 }
998
999 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1017 where
1018 T: Clone,
1019 Self: Sized,
1020 {
1021 let e = e.splice_untyped_ctx(self);
1022
1023 Singleton::new(
1024 self.clone(),
1025 HydroNode::SingletonSource {
1026 value: e.into(),
1027 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1028 },
1029 )
1030 }
1031
1032 fn source_interval(
1042 &self,
1043 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1044 _nondet: NonDet,
1045 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1046 where
1047 Self: Sized + NoTick,
1048 {
1049 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1050 tokio::time::interval(interval)
1051 )))
1052 }
1053
1054 fn source_interval_delayed(
1065 &self,
1066 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1067 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1068 _nondet: NonDet,
1069 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1070 where
1071 Self: Sized + NoTick,
1072 {
1073 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1074 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1075 )))
1076 }
1077
1078 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1112 where
1113 S: CycleCollection<'a, ForwardRef, Location = Self>,
1114 {
1115 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1116 (
1117 ForwardHandle::new(cycle_id, Location::id(self)),
1118 S::create_source(cycle_id, self.clone()),
1119 )
1120 }
1121}
1122
1123#[cfg(feature = "deploy")]
1124#[cfg(test)]
1125mod tests {
1126 use std::collections::HashSet;
1127
1128 use futures::{SinkExt, StreamExt};
1129 use hydro_deploy::Deployment;
1130 use stageleft::q;
1131 use tokio_util::codec::LengthDelimitedCodec;
1132
1133 use crate::compile::builder::FlowBuilder;
1134 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1135 use crate::location::{Location, NetworkHint};
1136 use crate::nondet::nondet;
1137
1138 #[tokio::test]
1139 async fn top_level_singleton_replay_cardinality() {
1140 let mut deployment = Deployment::new();
1141
1142 let mut flow = FlowBuilder::new();
1143 let node = flow.process::<()>();
1144 let external = flow.external::<()>();
1145
1146 let (in_port, input) =
1147 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1148 let singleton = node.singleton(q!(123));
1149 let tick = node.tick();
1150 let out = input
1151 .batch(&tick, nondet!())
1152 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1153 .cross_singleton(
1154 singleton
1155 .snapshot(&tick, nondet!())
1156 .into_stream()
1157 .count(),
1158 )
1159 .all_ticks()
1160 .send_bincode_external(&external);
1161
1162 let nodes = flow
1163 .with_process(&node, deployment.Localhost())
1164 .with_external(&external, deployment.Localhost())
1165 .deploy(&mut deployment);
1166
1167 deployment.deploy().await.unwrap();
1168
1169 let mut external_in = nodes.connect(in_port).await;
1170 let mut external_out = nodes.connect(out).await;
1171
1172 deployment.start().await.unwrap();
1173
1174 external_in.send(1).await.unwrap();
1175 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1176
1177 external_in.send(2).await.unwrap();
1178 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1179 }
1180
1181 #[tokio::test]
1182 async fn tick_singleton_replay_cardinality() {
1183 let mut deployment = Deployment::new();
1184
1185 let mut flow = FlowBuilder::new();
1186 let node = flow.process::<()>();
1187 let external = flow.external::<()>();
1188
1189 let (in_port, input) =
1190 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1191 let tick = node.tick();
1192 let singleton = tick.singleton(q!(123));
1193 let out = input
1194 .batch(&tick, nondet!())
1195 .cross_singleton(singleton.clone())
1196 .cross_singleton(singleton.into_stream().count())
1197 .all_ticks()
1198 .send_bincode_external(&external);
1199
1200 let nodes = flow
1201 .with_process(&node, deployment.Localhost())
1202 .with_external(&external, deployment.Localhost())
1203 .deploy(&mut deployment);
1204
1205 deployment.deploy().await.unwrap();
1206
1207 let mut external_in = nodes.connect(in_port).await;
1208 let mut external_out = nodes.connect(out).await;
1209
1210 deployment.start().await.unwrap();
1211
1212 external_in.send(1).await.unwrap();
1213 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1214
1215 external_in.send(2).await.unwrap();
1216 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1217 }
1218
1219 #[tokio::test]
1220 async fn external_bytes() {
1221 let mut deployment = Deployment::new();
1222
1223 let mut flow = FlowBuilder::new();
1224 let first_node = flow.process::<()>();
1225 let external = flow.external::<()>();
1226
1227 let (in_port, input) = first_node.source_external_bytes(&external);
1228 let out = input.send_bincode_external(&external);
1229
1230 let nodes = flow
1231 .with_process(&first_node, deployment.Localhost())
1232 .with_external(&external, deployment.Localhost())
1233 .deploy(&mut deployment);
1234
1235 deployment.deploy().await.unwrap();
1236
1237 let mut external_in = nodes.connect(in_port).await.1;
1238 let mut external_out = nodes.connect(out).await;
1239
1240 deployment.start().await.unwrap();
1241
1242 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1243
1244 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1245 }
1246
1247 #[tokio::test]
1248 async fn multi_external_source() {
1249 let mut deployment = Deployment::new();
1250
1251 let mut flow = FlowBuilder::new();
1252 let first_node = flow.process::<()>();
1253 let external = flow.external::<()>();
1254
1255 let (in_port, input, _membership, complete_sink) =
1256 first_node.bidi_external_many_bincode(&external);
1257 let out = input.entries().send_bincode_external(&external);
1258 complete_sink.complete(
1259 first_node
1260 .source_iter::<(u64, ()), _>(q!([]))
1261 .into_keyed()
1262 .weaken_ordering(),
1263 );
1264
1265 let nodes = flow
1266 .with_process(&first_node, deployment.Localhost())
1267 .with_external(&external, deployment.Localhost())
1268 .deploy(&mut deployment);
1269
1270 deployment.deploy().await.unwrap();
1271
1272 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1273 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1274 let external_out = nodes.connect(out).await;
1275
1276 deployment.start().await.unwrap();
1277
1278 external_in_1.send(123).await.unwrap();
1279 external_in_2.send(456).await.unwrap();
1280
1281 assert_eq!(
1282 external_out.take(2).collect::<HashSet<_>>().await,
1283 vec![(0, 123), (1, 456)].into_iter().collect()
1284 );
1285 }
1286
1287 #[tokio::test]
1288 async fn second_connection_only_multi_source() {
1289 let mut deployment = Deployment::new();
1290
1291 let mut flow = FlowBuilder::new();
1292 let first_node = flow.process::<()>();
1293 let external = flow.external::<()>();
1294
1295 let (in_port, input, _membership, complete_sink) =
1296 first_node.bidi_external_many_bincode(&external);
1297 let out = input.entries().send_bincode_external(&external);
1298 complete_sink.complete(
1299 first_node
1300 .source_iter::<(u64, ()), _>(q!([]))
1301 .into_keyed()
1302 .weaken_ordering(),
1303 );
1304
1305 let nodes = flow
1306 .with_process(&first_node, deployment.Localhost())
1307 .with_external(&external, deployment.Localhost())
1308 .deploy(&mut deployment);
1309
1310 deployment.deploy().await.unwrap();
1311
1312 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1314 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1315 let mut external_out = nodes.connect(out).await;
1316
1317 deployment.start().await.unwrap();
1318
1319 external_in_2.send(456).await.unwrap();
1320
1321 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1322 }
1323
1324 #[tokio::test]
1325 async fn multi_external_bytes() {
1326 let mut deployment = Deployment::new();
1327
1328 let mut flow = FlowBuilder::new();
1329 let first_node = flow.process::<()>();
1330 let external = flow.external::<()>();
1331
1332 let (in_port, input, _membership, complete_sink) = first_node
1333 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1334 let out = input.entries().send_bincode_external(&external);
1335 complete_sink.complete(
1336 first_node
1337 .source_iter(q!([]))
1338 .into_keyed()
1339 .weaken_ordering(),
1340 );
1341
1342 let nodes = flow
1343 .with_process(&first_node, deployment.Localhost())
1344 .with_external(&external, deployment.Localhost())
1345 .deploy(&mut deployment);
1346
1347 deployment.deploy().await.unwrap();
1348
1349 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1350 let mut external_in_2 = nodes.connect(in_port).await.1;
1351 let external_out = nodes.connect(out).await;
1352
1353 deployment.start().await.unwrap();
1354
1355 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1356 external_in_2.send(vec![4, 5].into()).await.unwrap();
1357
1358 assert_eq!(
1359 external_out.take(2).collect::<HashSet<_>>().await,
1360 vec![
1361 (0, (&[1u8, 2, 3] as &[u8]).into()),
1362 (1, (&[4u8, 5] as &[u8]).into())
1363 ]
1364 .into_iter()
1365 .collect()
1366 );
1367 }
1368
1369 #[tokio::test]
1370 async fn single_client_external_bytes() {
1371 let mut deployment = Deployment::new();
1372 let mut flow = FlowBuilder::new();
1373 let first_node = flow.process::<()>();
1374 let external = flow.external::<()>();
1375 let (port, input, complete_sink) = first_node
1376 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1377 complete_sink.complete(input.map(q!(|data| {
1378 let mut resp: Vec<u8> = data.into();
1379 resp.push(42);
1380 resp.into() })));
1382
1383 let nodes = flow
1384 .with_process(&first_node, deployment.Localhost())
1385 .with_external(&external, deployment.Localhost())
1386 .deploy(&mut deployment);
1387
1388 deployment.deploy().await.unwrap();
1389 deployment.start().await.unwrap();
1390
1391 let (mut external_out, mut external_in) = nodes.connect(port).await;
1392
1393 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1394 assert_eq!(
1395 external_out.next().await.unwrap().unwrap(),
1396 vec![1, 2, 3, 42]
1397 );
1398 }
1399
1400 #[tokio::test]
1401 async fn echo_external_bytes() {
1402 let mut deployment = Deployment::new();
1403
1404 let mut flow = FlowBuilder::new();
1405 let first_node = flow.process::<()>();
1406 let external = flow.external::<()>();
1407
1408 let (port, input, _membership, complete_sink) = first_node
1409 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1410 complete_sink
1411 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1412
1413 let nodes = flow
1414 .with_process(&first_node, deployment.Localhost())
1415 .with_external(&external, deployment.Localhost())
1416 .deploy(&mut deployment);
1417
1418 deployment.deploy().await.unwrap();
1419
1420 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1421 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1422
1423 deployment.start().await.unwrap();
1424
1425 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1426 external_in_2.send(vec![4, 5].into()).await.unwrap();
1427
1428 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1429 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1430 }
1431
1432 #[tokio::test]
1433 async fn echo_external_bincode() {
1434 let mut deployment = Deployment::new();
1435
1436 let mut flow = FlowBuilder::new();
1437 let first_node = flow.process::<()>();
1438 let external = flow.external::<()>();
1439
1440 let (port, input, _membership, complete_sink) =
1441 first_node.bidi_external_many_bincode(&external);
1442 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1443
1444 let nodes = flow
1445 .with_process(&first_node, deployment.Localhost())
1446 .with_external(&external, deployment.Localhost())
1447 .deploy(&mut deployment);
1448
1449 deployment.deploy().await.unwrap();
1450
1451 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1452 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1453
1454 deployment.start().await.unwrap();
1455
1456 external_in_1.send("hi".to_owned()).await.unwrap();
1457 external_in_2.send("hello".to_owned()).await.unwrap();
1458
1459 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1460 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1461 }
1462}