1use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
/// Marker type that selects the embedded deployment backend.
///
/// The enum has no variants, so no value of it can ever be constructed; it is used purely at the
/// type level, as the implementor of the deploy trait and as the type parameter of the spec and
/// port-registration traits implemented in this file.
pub enum EmbeddedDeploy {}
50
51#[derive(Clone)]
53pub struct EmbeddedNode {
54 pub fn_name: String,
56 pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61 type Port = ();
62 type Meta = ();
63 type InstantiateEnv = EmbeddedInstantiateEnv;
64
65 fn next_port(&self) -> Self::Port {}
66
67 fn update_meta(&self, _meta: &Self::Meta) {}
68
69 fn instantiate(
70 &self,
71 _env: &mut Self::InstantiateEnv,
72 _meta: &mut Self::Meta,
73 _graph: DfirGraph,
74 _extra_stmts: &[syn::Stmt],
75 _sidecars: &[syn::Expr],
76 ) {
77 }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83 panic!("EmbeddedDeploy does not support external ports");
84 }
85
86 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87 fn as_bytes_bidi(
88 &self,
89 _external_port_id: ExternalPortId,
90 ) -> impl Future<
91 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92 > + 'a {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_bidi<InT, OutT>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101 where
102 InT: Serialize + 'static,
103 OutT: DeserializeOwned + 'static,
104 {
105 async { panic!("EmbeddedDeploy does not support external ports") }
106 }
107
108 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109 fn as_bincode_sink<T>(
110 &self,
111 _external_port_id: ExternalPortId,
112 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113 where
114 T: Serialize + 'static,
115 {
116 async { panic!("EmbeddedDeploy does not support external ports") }
117 }
118
119 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120 fn as_bincode_source<T>(
121 &self,
122 _external_port_id: ExternalPortId,
123 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124 where
125 T: DeserializeOwned + 'static,
126 {
127 async { panic!("EmbeddedDeploy does not support external ports") }
128 }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133 EmbeddedNode {
134 fn_name: self.into(),
135 location_key,
136 }
137 }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142 EmbeddedNode {
143 fn_name: self.into(),
144 location_key,
145 }
146 }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151 EmbeddedNode {
152 fn_name: self.into(),
153 location_key,
154 }
155 }
156}
157
158#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166 pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168 pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170 pub network_outputs: SparseSecondaryMap<LocationKey, Vec<String>>,
172 pub network_inputs: SparseSecondaryMap<LocationKey, Vec<String>>,
174}
175
176impl<'a> Deploy<'a> for EmbeddedDeploy {
177 type Meta = ();
178 type InstantiateEnv = EmbeddedInstantiateEnv;
179
180 type Process = EmbeddedNode;
181 type Cluster = EmbeddedNode;
182 type External = EmbeddedNode;
183
184 fn o2o_sink_source(
185 env: &mut Self::InstantiateEnv,
186 p1: &Self::Process,
187 _p1_port: &(),
188 p2: &Self::Process,
189 _p2_port: &(),
190 name: Option<&str>,
191 ) -> (syn::Expr, syn::Expr) {
192 let name = name.expect(
193 "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
194 );
195
196 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
197 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
198
199 env.network_outputs
200 .entry(p1.location_key)
201 .unwrap()
202 .or_default()
203 .push(name.to_owned());
204 env.network_inputs
205 .entry(p2.location_key)
206 .unwrap()
207 .or_default()
208 .push(name.to_owned());
209
210 (
211 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
212 syn::parse_quote!(#source_ident),
213 )
214 }
215
216 fn o2o_connect(
217 _p1: &Self::Process,
218 _p1_port: &(),
219 _p2: &Self::Process,
220 _p2_port: &(),
221 ) -> Box<dyn FnOnce()> {
222 Box::new(|| {})
223 }
224
225 fn o2m_sink_source(
226 _p1: &Self::Process,
227 _p1_port: &(),
228 _c2: &Self::Cluster,
229 _c2_port: &(),
230 ) -> (syn::Expr, syn::Expr) {
231 panic!("EmbeddedDeploy does not support networking (o2m)")
232 }
233
234 fn o2m_connect(
235 _p1: &Self::Process,
236 _p1_port: &(),
237 _c2: &Self::Cluster,
238 _c2_port: &(),
239 ) -> Box<dyn FnOnce()> {
240 panic!("EmbeddedDeploy does not support networking (o2m)")
241 }
242
243 fn m2o_sink_source(
244 _c1: &Self::Cluster,
245 _c1_port: &(),
246 _p2: &Self::Process,
247 _p2_port: &(),
248 ) -> (syn::Expr, syn::Expr) {
249 panic!("EmbeddedDeploy does not support networking (m2o)")
250 }
251
252 fn m2o_connect(
253 _c1: &Self::Cluster,
254 _c1_port: &(),
255 _p2: &Self::Process,
256 _p2_port: &(),
257 ) -> Box<dyn FnOnce()> {
258 panic!("EmbeddedDeploy does not support networking (m2o)")
259 }
260
261 fn m2m_sink_source(
262 _c1: &Self::Cluster,
263 _c1_port: &(),
264 _c2: &Self::Cluster,
265 _c2_port: &(),
266 ) -> (syn::Expr, syn::Expr) {
267 panic!("EmbeddedDeploy does not support networking (m2m)")
268 }
269
270 fn m2m_connect(
271 _c1: &Self::Cluster,
272 _c1_port: &(),
273 _c2: &Self::Cluster,
274 _c2_port: &(),
275 ) -> Box<dyn FnOnce()> {
276 panic!("EmbeddedDeploy does not support networking (m2m)")
277 }
278
279 fn e2o_many_source(
280 _extra_stmts: &mut Vec<syn::Stmt>,
281 _p2: &Self::Process,
282 _p2_port: &(),
283 _codec_type: &syn::Type,
284 _shared_handle: String,
285 ) -> syn::Expr {
286 panic!("EmbeddedDeploy does not support networking (e2o)")
287 }
288
289 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
290 panic!("EmbeddedDeploy does not support networking (e2o)")
291 }
292
293 fn e2o_source(
294 _extra_stmts: &mut Vec<syn::Stmt>,
295 _p1: &Self::External,
296 _p1_port: &(),
297 _p2: &Self::Process,
298 _p2_port: &(),
299 _codec_type: &syn::Type,
300 _shared_handle: String,
301 ) -> syn::Expr {
302 panic!("EmbeddedDeploy does not support networking (e2o)")
303 }
304
305 fn e2o_connect(
306 _p1: &Self::External,
307 _p1_port: &(),
308 _p2: &Self::Process,
309 _p2_port: &(),
310 _many: bool,
311 _server_hint: NetworkHint,
312 ) -> Box<dyn FnOnce()> {
313 panic!("EmbeddedDeploy does not support networking (e2o)")
314 }
315
316 fn o2e_sink(
317 _p1: &Self::Process,
318 _p1_port: &(),
319 _p2: &Self::External,
320 _p2_port: &(),
321 _shared_handle: String,
322 ) -> syn::Expr {
323 panic!("EmbeddedDeploy does not support networking (o2e)")
324 }
325
326 #[expect(
327 unreachable_code,
328 reason = "panic before q! which is only for return type"
329 )]
330 fn cluster_ids(
331 _of_cluster: LocationKey,
332 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
333 panic!("EmbeddedDeploy does not support cluster IDs");
334 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
335 }
336
337 #[expect(
338 unreachable_code,
339 reason = "panic before q! which is only for return type"
340 )]
341 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
342 panic!("EmbeddedDeploy does not support cluster self ID");
343 q!(unreachable!(
344 "EmbeddedDeploy does not support cluster self ID"
345 ))
346 }
347
348 #[expect(
349 unreachable_code,
350 reason = "panic before q! which is only for return type"
351 )]
352 fn cluster_membership_stream(
353 _location_id: &LocationId,
354 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
355 {
356 panic!("EmbeddedDeploy does not support cluster membership streams");
357 q!(unreachable!(
358 "EmbeddedDeploy does not support cluster membership streams"
359 ))
360 }
361
362 fn register_embedded_input(
363 env: &mut Self::InstantiateEnv,
364 location_key: LocationKey,
365 ident: &syn::Ident,
366 element_type: &syn::Type,
367 ) {
368 env.inputs
369 .entry(location_key)
370 .unwrap()
371 .or_default()
372 .push((ident.clone(), element_type.clone()));
373 }
374
375 fn register_embedded_output(
376 env: &mut Self::InstantiateEnv,
377 location_key: LocationKey,
378 ident: &syn::Ident,
379 element_type: &syn::Type,
380 ) {
381 env.outputs
382 .entry(location_key)
383 .unwrap()
384 .or_default()
385 .push((ident.clone(), element_type.clone()));
386 }
387}
388
389impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
390 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
421 let mut env = EmbeddedInstantiateEnv::default();
422 let compiled = self.compile_internal(&mut env);
423
424 let root = crate::staging_util::get_this_crate();
425 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
426
427 let mut items: Vec<syn::Item> = Vec::new();
428
429 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
431 location_keys.sort();
432
433 for location_key in location_keys {
434 let graph = &compiled.all_dfir()[location_key];
435
436 let fn_name = self
438 .processes
439 .get(location_key)
440 .map(|n| &n.fn_name)
441 .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
442 .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
443 .expect("location key not found in any node map");
444
445 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
446
447 let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
449 loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
450
451 let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
453 loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
454
455 let mut loc_net_outputs = env
457 .network_outputs
458 .get(location_key)
459 .cloned()
460 .unwrap_or_default();
461 loc_net_outputs.sort();
462 loc_net_outputs.dedup();
463
464 let mut loc_net_inputs = env
466 .network_inputs
467 .get(location_key)
468 .cloned()
469 .unwrap_or_default();
470 loc_net_inputs.sort();
471 loc_net_inputs.dedup();
472
473 let mut diagnostics = Diagnostics::new();
474 let dfir_tokens = graph
475 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
476 .expect("DFIR code generation failed with diagnostics.");
477
478 let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
480 .iter()
481 .map(|(ident, element_type)| {
482 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
483 })
484 .collect();
485
486 let has_outputs = !loc_outputs.is_empty();
487 let has_net_out = !loc_net_outputs.is_empty();
488 let has_net_in = !loc_net_inputs.is_empty();
489
490 let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
492 let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
493 let mut extra_fn_params: Vec<proc_macro2::TokenStream> = Vec::new();
494 let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
495
496 if has_outputs {
498 let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
499
500 let output_generic_idents: Vec<syn::Ident> = loc_outputs
501 .iter()
502 .enumerate()
503 .map(|(i, _)| quote::format_ident!("__Out{}", i))
504 .collect();
505
506 let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
507 .iter()
508 .zip(output_generic_idents.iter())
509 .map(|((ident, _), generic)| {
510 quote! { pub #ident: #generic }
511 })
512 .collect();
513
514 let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
515 .iter()
516 .zip(output_generic_idents.iter())
517 .map(|((_, element_type), generic)| {
518 quote! { #generic: FnMut(#element_type) }
519 })
520 .collect();
521
522 for ((_, element_type), generic) in
523 loc_outputs.iter().zip(output_generic_idents.iter())
524 {
525 extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
526 }
527
528 extra_fn_params.push(quote! {
529 __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
530 });
531
532 for (ident, _) in &loc_outputs {
533 extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
534 }
535
536 mod_items.push(quote! {
537 pub struct #output_struct_ident<#(#struct_generics),*> {
538 #(#struct_fields),*
539 }
540 });
541 }
542
543 if has_net_out {
545 let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
546
547 let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
548 .iter()
549 .enumerate()
550 .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
551 .collect();
552
553 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
554 .iter()
555 .zip(net_out_generic_idents.iter())
556 .map(|(name, generic)| {
557 let field_ident = syn::Ident::new(name, Span::call_site());
558 quote! { pub #field_ident: #generic }
559 })
560 .collect();
561
562 let struct_generics: Vec<proc_macro2::TokenStream> = net_out_generic_idents
563 .iter()
564 .map(|generic| {
565 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
566 })
567 .collect();
568
569 for generic in &net_out_generic_idents {
570 extra_fn_generics.push(
571 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
572 );
573 }
574
575 extra_fn_params.push(quote! {
576 __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
577 });
578
579 for name in &loc_net_outputs {
580 let field_ident = syn::Ident::new(name, Span::call_site());
581 let var_ident =
582 syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
583 extra_destructure
584 .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
585 }
586
587 mod_items.push(quote! {
588 pub struct #net_out_struct_ident<#(#struct_generics),*> {
589 #(#struct_fields),*
590 }
591 });
592 }
593
594 if has_net_in {
596 let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
597
598 let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
599 .iter()
600 .enumerate()
601 .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
602 .collect();
603
604 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
605 .iter()
606 .zip(net_in_generic_idents.iter())
607 .map(|(name, generic)| {
608 let field_ident = syn::Ident::new(name, Span::call_site());
609 quote! { pub #field_ident: #generic }
610 })
611 .collect();
612
613 let struct_generics: Vec<proc_macro2::TokenStream> = net_in_generic_idents
614 .iter()
615 .map(|generic| {
616 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
617 })
618 .collect();
619
620 for generic in &net_in_generic_idents {
621 extra_fn_generics.push(
622 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
623 );
624 }
625
626 extra_fn_params.push(quote! {
627 __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
628 });
629
630 for name in &loc_net_inputs {
631 let field_ident = syn::Ident::new(name, Span::call_site());
632 let var_ident =
633 syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
634 extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
635 }
636
637 mod_items.push(quote! {
638 pub struct #net_in_struct_ident<#(#struct_generics),*> {
639 #(#struct_fields),*
640 }
641 });
642 }
643
644 if !mod_items.is_empty() {
646 let output_mod: syn::Item = syn::parse_quote! {
647 pub mod #fn_ident {
648 use super::*;
649 #(#mod_items)*
650 }
651 };
652 items.push(output_mod);
653 }
654
655 let all_params: Vec<proc_macro2::TokenStream> =
657 input_params.into_iter().chain(extra_fn_params).collect();
658
659 let has_generics = !extra_fn_generics.is_empty();
660
661 if has_generics {
662 let func: syn::Item = syn::parse_quote! {
663 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
664 pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
665 #(#extra_destructure)*
666 #dfir_tokens
667 }
668 };
669 items.push(func);
670 } else {
671 let func: syn::Item = syn::parse_quote! {
672 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
673 pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
674 #dfir_tokens
675 }
676 };
677 items.push(func);
678 }
679 }
680
681 syn::parse_quote! {
682 use #orig_crate_name::__staged::__deps::*;
683 use #root::prelude::*;
684 use #root::runtime_support::dfir_rs as __root_dfir_rs;
685 pub use #orig_crate_name::__staged;
686
687 #( #items )*
688 }
689 }
690}