1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
26 env: &mut Self::InstantiateEnv,
27 p1: &Self::Process,
28 p1_port: &<Self::Process as Node>::Port,
29 p2: &Self::Process,
30 p2_port: &<Self::Process as Node>::Port,
31 name: Option<&str>,
32 ) -> (syn::Expr, syn::Expr);
33 fn o2o_connect(
34 p1: &Self::Process,
35 p1_port: &<Self::Process as Node>::Port,
36 p2: &Self::Process,
37 p2_port: &<Self::Process as Node>::Port,
38 ) -> Box<dyn FnOnce()>;
39
40 fn o2m_sink_source(
41 p1: &Self::Process,
42 p1_port: &<Self::Process as Node>::Port,
43 c2: &Self::Cluster,
44 c2_port: &<Self::Cluster as Node>::Port,
45 ) -> (syn::Expr, syn::Expr);
46 fn o2m_connect(
47 p1: &Self::Process,
48 p1_port: &<Self::Process as Node>::Port,
49 c2: &Self::Cluster,
50 c2_port: &<Self::Cluster as Node>::Port,
51 ) -> Box<dyn FnOnce()>;
52
53 fn m2o_sink_source(
54 c1: &Self::Cluster,
55 c1_port: &<Self::Cluster as Node>::Port,
56 p2: &Self::Process,
57 p2_port: &<Self::Process as Node>::Port,
58 ) -> (syn::Expr, syn::Expr);
59 fn m2o_connect(
60 c1: &Self::Cluster,
61 c1_port: &<Self::Cluster as Node>::Port,
62 p2: &Self::Process,
63 p2_port: &<Self::Process as Node>::Port,
64 ) -> Box<dyn FnOnce()>;
65
66 fn m2m_sink_source(
67 c1: &Self::Cluster,
68 c1_port: &<Self::Cluster as Node>::Port,
69 c2: &Self::Cluster,
70 c2_port: &<Self::Cluster as Node>::Port,
71 ) -> (syn::Expr, syn::Expr);
72 fn m2m_connect(
73 c1: &Self::Cluster,
74 c1_port: &<Self::Cluster as Node>::Port,
75 c2: &Self::Cluster,
76 c2_port: &<Self::Cluster as Node>::Port,
77 ) -> Box<dyn FnOnce()>;
78
79 fn e2o_many_source(
80 extra_stmts: &mut Vec<syn::Stmt>,
81 p2: &Self::Process,
82 p2_port: &<Self::Process as Node>::Port,
83 codec_type: &syn::Type,
84 shared_handle: String,
85 ) -> syn::Expr;
86 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
87
88 fn e2o_source(
89 extra_stmts: &mut Vec<syn::Stmt>,
90 p1: &Self::External,
91 p1_port: &<Self::External as Node>::Port,
92 p2: &Self::Process,
93 p2_port: &<Self::Process as Node>::Port,
94 codec_type: &syn::Type,
95 shared_handle: String,
96 ) -> syn::Expr;
97 fn e2o_connect(
98 p1: &Self::External,
99 p1_port: &<Self::External as Node>::Port,
100 p2: &Self::Process,
101 p2_port: &<Self::Process as Node>::Port,
102 many: bool,
103 server_hint: NetworkHint,
104 ) -> Box<dyn FnOnce()>;
105
106 fn o2e_sink(
107 p1: &Self::Process,
108 p1_port: &<Self::Process as Node>::Port,
109 p2: &Self::External,
110 p2_port: &<Self::External as Node>::Port,
111 shared_handle: String,
112 ) -> syn::Expr;
113
114 fn cluster_ids(
115 of_cluster: LocationKey,
116 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
117
118 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
119
120 fn cluster_membership_stream(
121 location_id: &LocationId,
122 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
123
124 fn register_embedded_input(
129 _env: &mut Self::InstantiateEnv,
130 _location_key: LocationKey,
131 _ident: &syn::Ident,
132 _element_type: &syn::Type,
133 ) {
134 panic!("register_embedded_input is only supported by EmbeddedDeploy");
135 }
136
137 fn register_embedded_output(
142 _env: &mut Self::InstantiateEnv,
143 _location_key: LocationKey,
144 _ident: &syn::Ident,
145 _element_type: &syn::Type,
146 ) {
147 panic!("register_embedded_output is only supported by EmbeddedDeploy");
148 }
149}
150
151pub trait ProcessSpec<'a, D>
152where
153 D: Deploy<'a> + ?Sized,
154{
155 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
156}
157
158pub trait IntoProcessSpec<'a, D>
159where
160 D: Deploy<'a> + ?Sized,
161{
162 type ProcessSpec: ProcessSpec<'a, D>;
163 fn into_process_spec(self) -> Self::ProcessSpec;
164}
165
166impl<'a, D, T> IntoProcessSpec<'a, D> for T
167where
168 D: Deploy<'a> + ?Sized,
169 T: ProcessSpec<'a, D>,
170{
171 type ProcessSpec = T;
172 fn into_process_spec(self) -> Self::ProcessSpec {
173 self
174 }
175}
176
177pub trait ClusterSpec<'a, D>
178where
179 D: Deploy<'a> + ?Sized,
180{
181 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
182}
183
184pub trait ExternalSpec<'a, D>
185where
186 D: Deploy<'a> + ?Sized,
187{
188 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
189}
190
191pub trait Node {
192 type Port: Clone;
199 type Meta: Default;
200 type InstantiateEnv;
201
202 fn next_port(&self) -> Self::Port;
204
205 fn update_meta(&self, meta: &Self::Meta);
206
207 fn instantiate(
208 &self,
209 env: &mut Self::InstantiateEnv,
210 meta: &mut Self::Meta,
211 graph: DfirGraph,
212 extra_stmts: &[syn::Stmt],
213 sidecars: &[syn::Expr],
214 );
215}
216
217pub type DynSourceSink<Out, In, InErr> = (
218 Pin<Box<dyn Stream<Item = Out>>>,
219 Pin<Box<dyn Sink<In, Error = InErr>>>,
220);
221
222pub trait RegisterPort<'a, D>: Node + Clone
223where
224 D: Deploy<'a> + ?Sized,
225{
226 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
227
228 fn as_bytes_bidi(
229 &self,
230 external_port_id: ExternalPortId,
231 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
232
233 fn as_bincode_bidi<InT, OutT>(
234 &self,
235 external_port_id: ExternalPortId,
236 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
237 where
238 InT: Serialize + 'static,
239 OutT: DeserializeOwned + 'static;
240
241 fn as_bincode_sink<T>(
242 &self,
243 external_port_id: ExternalPortId,
244 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
245 where
246 T: Serialize + 'static;
247
248 fn as_bincode_source<T>(
249 &self,
250 external_port_id: ExternalPortId,
251 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
252 where
253 T: DeserializeOwned + 'static;
254}