Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Networking
11//!
12//! Process-to-process (o2o) networking is supported. When a location has network
13//! sends or receives, the generated function takes additional `network_out` and
14//! `network_in` parameters whose types are generated structs with one field per
15//! network port (named after the channel). Network channels must be named via
16//! `.name()` on the networking config.
17//!
18//! - Sinks (`EmbeddedNetworkOut`): one `FnMut(Bytes)` field per outgoing channel.
19//! - Sources (`EmbeddedNetworkIn`): one `Stream<Item = Result<BytesMut, io::Error>>`
20//!   field per incoming channel.
21//!
22//! The caller is responsible for wiring these together (e.g. via in-memory channels,
23//! sockets, etc.). Cluster networking and external ports are not supported.
24
25use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
46/// Marker type for the embedded deployment backend.
47///
48/// All networking methods panic — this backend only supports pure local computation.
49pub enum EmbeddedDeploy {}
50
51/// A trivial node type for embedded deployment. Stores a user-provided function name.
52#[derive(Clone)]
53pub struct EmbeddedNode {
54    /// The function name to use in the generated code for this location.
55    pub fn_name: String,
56    /// The location key for this node, used to register network ports.
57    pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61    type Port = ();
62    type Meta = ();
63    type InstantiateEnv = EmbeddedInstantiateEnv;
64
65    fn next_port(&self) -> Self::Port {}
66
67    fn update_meta(&self, _meta: &Self::Meta) {}
68
69    fn instantiate(
70        &self,
71        _env: &mut Self::InstantiateEnv,
72        _meta: &mut Self::Meta,
73        _graph: DfirGraph,
74        _extra_stmts: &[syn::Stmt],
75        _sidecars: &[syn::Expr],
76    ) {
77        // No-op: embedded mode doesn't instantiate nodes at deploy time.
78    }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83        panic!("EmbeddedDeploy does not support external ports");
84    }
85
86    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87    fn as_bytes_bidi(
88        &self,
89        _external_port_id: ExternalPortId,
90    ) -> impl Future<
91        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92    > + 'a {
93        async { panic!("EmbeddedDeploy does not support external ports") }
94    }
95
96    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97    fn as_bincode_bidi<InT, OutT>(
98        &self,
99        _external_port_id: ExternalPortId,
100    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101    where
102        InT: Serialize + 'static,
103        OutT: DeserializeOwned + 'static,
104    {
105        async { panic!("EmbeddedDeploy does not support external ports") }
106    }
107
108    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109    fn as_bincode_sink<T>(
110        &self,
111        _external_port_id: ExternalPortId,
112    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113    where
114        T: Serialize + 'static,
115    {
116        async { panic!("EmbeddedDeploy does not support external ports") }
117    }
118
119    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120    fn as_bincode_source<T>(
121        &self,
122        _external_port_id: ExternalPortId,
123    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124    where
125        T: DeserializeOwned + 'static,
126    {
127        async { panic!("EmbeddedDeploy does not support external ports") }
128    }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133        EmbeddedNode {
134            fn_name: self.into(),
135            location_key,
136        }
137    }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142        EmbeddedNode {
143            fn_name: self.into(),
144            location_key,
145        }
146    }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151        EmbeddedNode {
152            fn_name: self.into(),
153            location_key,
154        }
155    }
156}
157
158/// Collected embedded input/output registrations, keyed by location.
159///
160/// During `compile_network`, each `HydroSource::Embedded` and `HydroRoot::EmbeddedOutput`
161/// IR node registers its ident, element type, and location key here.
162/// `generate_embedded` then uses this to add the appropriate parameters
163/// to each generated function.
164#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166    /// (ident name, element type) pairs per location key, for inputs.
167    pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168    /// (ident name, element type) pairs per location key, for outputs.
169    pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170    /// Network output port names per location key (sender side of o2o channels).
171    pub network_outputs: SparseSecondaryMap<LocationKey, Vec<String>>,
172    /// Network input port names per location key (receiver side of o2o channels).
173    pub network_inputs: SparseSecondaryMap<LocationKey, Vec<String>>,
174}
175
176impl<'a> Deploy<'a> for EmbeddedDeploy {
177    type Meta = ();
178    type InstantiateEnv = EmbeddedInstantiateEnv;
179
180    type Process = EmbeddedNode;
181    type Cluster = EmbeddedNode;
182    type External = EmbeddedNode;
183
184    fn o2o_sink_source(
185        env: &mut Self::InstantiateEnv,
186        p1: &Self::Process,
187        _p1_port: &(),
188        p2: &Self::Process,
189        _p2_port: &(),
190        name: Option<&str>,
191    ) -> (syn::Expr, syn::Expr) {
192        let name = name.expect(
193            "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
194        );
195
196        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
197        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
198
199        env.network_outputs
200            .entry(p1.location_key)
201            .unwrap()
202            .or_default()
203            .push(name.to_owned());
204        env.network_inputs
205            .entry(p2.location_key)
206            .unwrap()
207            .or_default()
208            .push(name.to_owned());
209
210        (
211            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
212            syn::parse_quote!(#source_ident),
213        )
214    }
215
216    fn o2o_connect(
217        _p1: &Self::Process,
218        _p1_port: &(),
219        _p2: &Self::Process,
220        _p2_port: &(),
221    ) -> Box<dyn FnOnce()> {
222        Box::new(|| {})
223    }
224
225    fn o2m_sink_source(
226        _p1: &Self::Process,
227        _p1_port: &(),
228        _c2: &Self::Cluster,
229        _c2_port: &(),
230    ) -> (syn::Expr, syn::Expr) {
231        panic!("EmbeddedDeploy does not support networking (o2m)")
232    }
233
234    fn o2m_connect(
235        _p1: &Self::Process,
236        _p1_port: &(),
237        _c2: &Self::Cluster,
238        _c2_port: &(),
239    ) -> Box<dyn FnOnce()> {
240        panic!("EmbeddedDeploy does not support networking (o2m)")
241    }
242
243    fn m2o_sink_source(
244        _c1: &Self::Cluster,
245        _c1_port: &(),
246        _p2: &Self::Process,
247        _p2_port: &(),
248    ) -> (syn::Expr, syn::Expr) {
249        panic!("EmbeddedDeploy does not support networking (m2o)")
250    }
251
252    fn m2o_connect(
253        _c1: &Self::Cluster,
254        _c1_port: &(),
255        _p2: &Self::Process,
256        _p2_port: &(),
257    ) -> Box<dyn FnOnce()> {
258        panic!("EmbeddedDeploy does not support networking (m2o)")
259    }
260
261    fn m2m_sink_source(
262        _c1: &Self::Cluster,
263        _c1_port: &(),
264        _c2: &Self::Cluster,
265        _c2_port: &(),
266    ) -> (syn::Expr, syn::Expr) {
267        panic!("EmbeddedDeploy does not support networking (m2m)")
268    }
269
270    fn m2m_connect(
271        _c1: &Self::Cluster,
272        _c1_port: &(),
273        _c2: &Self::Cluster,
274        _c2_port: &(),
275    ) -> Box<dyn FnOnce()> {
276        panic!("EmbeddedDeploy does not support networking (m2m)")
277    }
278
279    fn e2o_many_source(
280        _extra_stmts: &mut Vec<syn::Stmt>,
281        _p2: &Self::Process,
282        _p2_port: &(),
283        _codec_type: &syn::Type,
284        _shared_handle: String,
285    ) -> syn::Expr {
286        panic!("EmbeddedDeploy does not support networking (e2o)")
287    }
288
289    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
290        panic!("EmbeddedDeploy does not support networking (e2o)")
291    }
292
293    fn e2o_source(
294        _extra_stmts: &mut Vec<syn::Stmt>,
295        _p1: &Self::External,
296        _p1_port: &(),
297        _p2: &Self::Process,
298        _p2_port: &(),
299        _codec_type: &syn::Type,
300        _shared_handle: String,
301    ) -> syn::Expr {
302        panic!("EmbeddedDeploy does not support networking (e2o)")
303    }
304
305    fn e2o_connect(
306        _p1: &Self::External,
307        _p1_port: &(),
308        _p2: &Self::Process,
309        _p2_port: &(),
310        _many: bool,
311        _server_hint: NetworkHint,
312    ) -> Box<dyn FnOnce()> {
313        panic!("EmbeddedDeploy does not support networking (e2o)")
314    }
315
316    fn o2e_sink(
317        _p1: &Self::Process,
318        _p1_port: &(),
319        _p2: &Self::External,
320        _p2_port: &(),
321        _shared_handle: String,
322    ) -> syn::Expr {
323        panic!("EmbeddedDeploy does not support networking (o2e)")
324    }
325
326    #[expect(
327        unreachable_code,
328        reason = "panic before q! which is only for return type"
329    )]
330    fn cluster_ids(
331        _of_cluster: LocationKey,
332    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
333        panic!("EmbeddedDeploy does not support cluster IDs");
334        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
335    }
336
337    #[expect(
338        unreachable_code,
339        reason = "panic before q! which is only for return type"
340    )]
341    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
342        panic!("EmbeddedDeploy does not support cluster self ID");
343        q!(unreachable!(
344            "EmbeddedDeploy does not support cluster self ID"
345        ))
346    }
347
348    #[expect(
349        unreachable_code,
350        reason = "panic before q! which is only for return type"
351    )]
352    fn cluster_membership_stream(
353        _location_id: &LocationId,
354    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
355    {
356        panic!("EmbeddedDeploy does not support cluster membership streams");
357        q!(unreachable!(
358            "EmbeddedDeploy does not support cluster membership streams"
359        ))
360    }
361
362    fn register_embedded_input(
363        env: &mut Self::InstantiateEnv,
364        location_key: LocationKey,
365        ident: &syn::Ident,
366        element_type: &syn::Type,
367    ) {
368        env.inputs
369            .entry(location_key)
370            .unwrap()
371            .or_default()
372            .push((ident.clone(), element_type.clone()));
373    }
374
375    fn register_embedded_output(
376        env: &mut Self::InstantiateEnv,
377        location_key: LocationKey,
378        ident: &syn::Ident,
379        element_type: &syn::Type,
380    ) {
381        env.outputs
382            .entry(location_key)
383            .unwrap()
384            .or_default()
385            .push((ident.clone(), element_type.clone()));
386    }
387}
388
389impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
390    /// Generates a `syn::File` containing one function per location in the flow.
391    ///
392    /// Each generated function has the signature:
393    /// ```ignore
394    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
395    /// ```
396    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
397    ///
398    /// The returned `Dfir` can be manually executed by the caller.
399    ///
400    /// # Arguments
401    ///
402    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
403    ///   re-exports). Hyphens will be replaced with underscores.
404    ///
405    /// # Usage
406    ///
407    /// Typically called from a `build.rs` in a wrapper crate:
408    /// ```ignore
409    /// // build.rs
410    /// let deploy = flow.with_process(&process, "my_fn".to_string());
411    /// let code = deploy.generate_embedded("my_hydro_crate");
412    /// let out_dir = std::env::var("OUT_DIR").unwrap();
413    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
414    /// ```
415    ///
416    /// Then in `lib.rs`:
417    /// ```ignore
418    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
419    /// ```
420    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
421        let mut env = EmbeddedInstantiateEnv::default();
422        let compiled = self.compile_internal(&mut env);
423
424        let root = crate::staging_util::get_this_crate();
425        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
426
427        let mut items: Vec<syn::Item> = Vec::new();
428
429        // Sort location keys for deterministic output.
430        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
431        location_keys.sort();
432
433        for location_key in location_keys {
434            let graph = &compiled.all_dfir()[location_key];
435
436            // Get the user-provided function name from the node.
437            let fn_name = self
438                .processes
439                .get(location_key)
440                .map(|n| &n.fn_name)
441                .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
442                .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
443                .expect("location key not found in any node map");
444
445            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
446
447            // Get inputs for this location, sorted by name.
448            let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
449            loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
450
451            // Get outputs for this location, sorted by name.
452            let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
453            loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
454
455            // Get network outputs (sinks) for this location, sorted by name.
456            let mut loc_net_outputs = env
457                .network_outputs
458                .get(location_key)
459                .cloned()
460                .unwrap_or_default();
461            loc_net_outputs.sort();
462            loc_net_outputs.dedup();
463
464            // Get network inputs (sources) for this location, sorted by name.
465            let mut loc_net_inputs = env
466                .network_inputs
467                .get(location_key)
468                .cloned()
469                .unwrap_or_default();
470            loc_net_inputs.sort();
471            loc_net_inputs.dedup();
472
473            let mut diagnostics = Diagnostics::new();
474            let dfir_tokens = graph
475                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
476                .expect("DFIR code generation failed with diagnostics.");
477
478            // Build the input parameters.
479            let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
480                .iter()
481                .map(|(ident, element_type)| {
482                    quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
483                })
484                .collect();
485
486            let has_outputs = !loc_outputs.is_empty();
487            let has_net_out = !loc_net_outputs.is_empty();
488            let has_net_in = !loc_net_inputs.is_empty();
489
490            // --- Build module items (output struct, network structs) ---
491            let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
492            let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
493            let mut extra_fn_params: Vec<proc_macro2::TokenStream> = Vec::new();
494            let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
495
496            // Embedded outputs (FnMut callbacks).
497            if has_outputs {
498                let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
499
500                let output_generic_idents: Vec<syn::Ident> = loc_outputs
501                    .iter()
502                    .enumerate()
503                    .map(|(i, _)| quote::format_ident!("__Out{}", i))
504                    .collect();
505
506                let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
507                    .iter()
508                    .zip(output_generic_idents.iter())
509                    .map(|((ident, _), generic)| {
510                        quote! { pub #ident: #generic }
511                    })
512                    .collect();
513
514                let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
515                    .iter()
516                    .zip(output_generic_idents.iter())
517                    .map(|((_, element_type), generic)| {
518                        quote! { #generic: FnMut(#element_type) }
519                    })
520                    .collect();
521
522                for ((_, element_type), generic) in
523                    loc_outputs.iter().zip(output_generic_idents.iter())
524                {
525                    extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
526                }
527
528                extra_fn_params.push(quote! {
529                    __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
530                });
531
532                for (ident, _) in &loc_outputs {
533                    extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
534                }
535
536                mod_items.push(quote! {
537                    pub struct #output_struct_ident<#(#struct_generics),*> {
538                        #(#struct_fields),*
539                    }
540                });
541            }
542
543            // Network outputs (FnMut(Bytes) sinks).
544            if has_net_out {
545                let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
546
547                let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
548                    .iter()
549                    .enumerate()
550                    .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
551                    .collect();
552
553                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
554                    .iter()
555                    .zip(net_out_generic_idents.iter())
556                    .map(|(name, generic)| {
557                        let field_ident = syn::Ident::new(name, Span::call_site());
558                        quote! { pub #field_ident: #generic }
559                    })
560                    .collect();
561
562                let struct_generics: Vec<proc_macro2::TokenStream> = net_out_generic_idents
563                    .iter()
564                    .map(|generic| {
565                        quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
566                    })
567                    .collect();
568
569                for generic in &net_out_generic_idents {
570                    extra_fn_generics.push(
571                        quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
572                    );
573                }
574
575                extra_fn_params.push(quote! {
576                    __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
577                });
578
579                for name in &loc_net_outputs {
580                    let field_ident = syn::Ident::new(name, Span::call_site());
581                    let var_ident =
582                        syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
583                    extra_destructure
584                        .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
585                }
586
587                mod_items.push(quote! {
588                    pub struct #net_out_struct_ident<#(#struct_generics),*> {
589                        #(#struct_fields),*
590                    }
591                });
592            }
593
594            // Network inputs (Stream<Item = Result<BytesMut, io::Error>> sources).
595            if has_net_in {
596                let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
597
598                let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
599                    .iter()
600                    .enumerate()
601                    .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
602                    .collect();
603
604                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
605                    .iter()
606                    .zip(net_in_generic_idents.iter())
607                    .map(|(name, generic)| {
608                        let field_ident = syn::Ident::new(name, Span::call_site());
609                        quote! { pub #field_ident: #generic }
610                    })
611                    .collect();
612
613                let struct_generics: Vec<proc_macro2::TokenStream> = net_in_generic_idents
614                    .iter()
615                    .map(|generic| {
616                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
617                    })
618                    .collect();
619
620                for generic in &net_in_generic_idents {
621                    extra_fn_generics.push(
622                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
623                    );
624                }
625
626                extra_fn_params.push(quote! {
627                    __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
628                });
629
630                for name in &loc_net_inputs {
631                    let field_ident = syn::Ident::new(name, Span::call_site());
632                    let var_ident =
633                        syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
634                    extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
635                }
636
637                mod_items.push(quote! {
638                    pub struct #net_in_struct_ident<#(#struct_generics),*> {
639                        #(#struct_fields),*
640                    }
641                });
642            }
643
644            // Emit the module if there are any structs.
645            if !mod_items.is_empty() {
646                let output_mod: syn::Item = syn::parse_quote! {
647                    pub mod #fn_ident {
648                        use super::*;
649                        #(#mod_items)*
650                    }
651                };
652                items.push(output_mod);
653            }
654
655            // Build the function.
656            let all_params: Vec<proc_macro2::TokenStream> =
657                input_params.into_iter().chain(extra_fn_params).collect();
658
659            let has_generics = !extra_fn_generics.is_empty();
660
661            if has_generics {
662                let func: syn::Item = syn::parse_quote! {
663                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
664                    pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
665                        #(#extra_destructure)*
666                        #dfir_tokens
667                    }
668                };
669                items.push(func);
670            } else {
671                let func: syn::Item = syn::parse_quote! {
672                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
673                    pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
674                        #dfir_tokens
675                    }
676                };
677                items.push(func);
678            }
679        }
680
681        syn::parse_quote! {
682            use #orig_crate_name::__staged::__deps::*;
683            use #root::prelude::*;
684            use #root::runtime_support::dfir_rs as __root_dfir_rs;
685            pub use #orig_crate_name::__staged;
686
687            #( #items )*
688        }
689    }
690}