1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34 HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
/// Marker type that selects the `hydro_deploy`-backed implementation of [`Deploy`].
///
/// The enum has no variants, so it can never be instantiated; it is only used
/// as a type-level tag (for example in `IntoProcessSpec<'_, HydroDeploy>`).
pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50 type InstantiateEnv = Deployment;
51
52 type Process = DeployNode;
53 type Cluster = DeployCluster;
54 type External = DeployExternal;
55
56 fn o2o_sink_source(
57 _env: &mut Self::InstantiateEnv,
58 _p1: &Self::Process,
59 p1_port: &<Self::Process as Node>::Port,
60 _p2: &Self::Process,
61 p2_port: &<Self::Process as Node>::Port,
62 _name: Option<&str>,
63 ) -> (syn::Expr, syn::Expr) {
64 let p1_port = p1_port.as_str();
65 let p2_port = p2_port.as_str();
66 deploy_o2o(
67 RuntimeData::new("__hydro_lang_trybuild_cli"),
68 p1_port,
69 p2_port,
70 )
71 }
72
73 fn o2o_connect(
74 p1: &Self::Process,
75 p1_port: &<Self::Process as Node>::Port,
76 p2: &Self::Process,
77 p2_port: &<Self::Process as Node>::Port,
78 ) -> Box<dyn FnOnce()> {
79 let p1 = p1.clone();
80 let p1_port = p1_port.clone();
81 let p2 = p2.clone();
82 let p2_port = p2_port.clone();
83
84 Box::new(move || {
85 let self_underlying_borrow = p1.underlying.borrow();
86 let self_underlying = self_underlying_borrow.as_ref().unwrap();
87 let source_port = self_underlying.get_port(p1_port.clone());
88
89 let other_underlying_borrow = p2.underlying.borrow();
90 let other_underlying = other_underlying_borrow.as_ref().unwrap();
91 let recipient_port = other_underlying.get_port(p2_port.clone());
92
93 source_port.send_to(&recipient_port)
94 })
95 }
96
97 fn o2m_sink_source(
98 _p1: &Self::Process,
99 p1_port: &<Self::Process as Node>::Port,
100 _c2: &Self::Cluster,
101 c2_port: &<Self::Cluster as Node>::Port,
102 ) -> (syn::Expr, syn::Expr) {
103 let p1_port = p1_port.as_str();
104 let c2_port = c2_port.as_str();
105 deploy_o2m(
106 RuntimeData::new("__hydro_lang_trybuild_cli"),
107 p1_port,
108 c2_port,
109 )
110 }
111
112 fn o2m_connect(
113 p1: &Self::Process,
114 p1_port: &<Self::Process as Node>::Port,
115 c2: &Self::Cluster,
116 c2_port: &<Self::Cluster as Node>::Port,
117 ) -> Box<dyn FnOnce()> {
118 let p1 = p1.clone();
119 let p1_port = p1_port.clone();
120 let c2 = c2.clone();
121 let c2_port = c2_port.clone();
122
123 Box::new(move || {
124 let self_underlying_borrow = p1.underlying.borrow();
125 let self_underlying = self_underlying_borrow.as_ref().unwrap();
126 let source_port = self_underlying.get_port(p1_port.clone());
127
128 let recipient_port = DemuxSink {
129 demux: c2
130 .members
131 .borrow()
132 .iter()
133 .enumerate()
134 .map(|(id, c)| {
135 (
136 id as u32,
137 Arc::new(c.underlying.get_port(c2_port.clone()))
138 as Arc<dyn RustCrateSink + 'static>,
139 )
140 })
141 .collect(),
142 };
143
144 source_port.send_to(&recipient_port)
145 })
146 }
147
148 fn m2o_sink_source(
149 _c1: &Self::Cluster,
150 c1_port: &<Self::Cluster as Node>::Port,
151 _p2: &Self::Process,
152 p2_port: &<Self::Process as Node>::Port,
153 ) -> (syn::Expr, syn::Expr) {
154 let c1_port = c1_port.as_str();
155 let p2_port = p2_port.as_str();
156 deploy_m2o(
157 RuntimeData::new("__hydro_lang_trybuild_cli"),
158 c1_port,
159 p2_port,
160 )
161 }
162
163 fn m2o_connect(
164 c1: &Self::Cluster,
165 c1_port: &<Self::Cluster as Node>::Port,
166 p2: &Self::Process,
167 p2_port: &<Self::Process as Node>::Port,
168 ) -> Box<dyn FnOnce()> {
169 let c1 = c1.clone();
170 let c1_port = c1_port.clone();
171 let p2 = p2.clone();
172 let p2_port = p2_port.clone();
173
174 Box::new(move || {
175 let other_underlying_borrow = p2.underlying.borrow();
176 let other_underlying = other_underlying_borrow.as_ref().unwrap();
177 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
178
179 for (i, node) in c1.members.borrow().iter().enumerate() {
180 let source_port = node.underlying.get_port(c1_port.clone());
181
182 TaggedSource {
183 source: Arc::new(source_port),
184 tag: i as u32,
185 }
186 .send_to(&recipient_port);
187 }
188 })
189 }
190
191 fn m2m_sink_source(
192 _c1: &Self::Cluster,
193 c1_port: &<Self::Cluster as Node>::Port,
194 _c2: &Self::Cluster,
195 c2_port: &<Self::Cluster as Node>::Port,
196 ) -> (syn::Expr, syn::Expr) {
197 let c1_port = c1_port.as_str();
198 let c2_port = c2_port.as_str();
199 deploy_m2m(
200 RuntimeData::new("__hydro_lang_trybuild_cli"),
201 c1_port,
202 c2_port,
203 )
204 }
205
206 fn m2m_connect(
207 c1: &Self::Cluster,
208 c1_port: &<Self::Cluster as Node>::Port,
209 c2: &Self::Cluster,
210 c2_port: &<Self::Cluster as Node>::Port,
211 ) -> Box<dyn FnOnce()> {
212 let c1 = c1.clone();
213 let c1_port = c1_port.clone();
214 let c2 = c2.clone();
215 let c2_port = c2_port.clone();
216
217 Box::new(move || {
218 for (i, sender) in c1.members.borrow().iter().enumerate() {
219 let source_port = sender.underlying.get_port(c1_port.clone());
220
221 let recipient_port = DemuxSink {
222 demux: c2
223 .members
224 .borrow()
225 .iter()
226 .enumerate()
227 .map(|(id, c)| {
228 (
229 id as u32,
230 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
231 as Arc<dyn RustCrateSink + 'static>,
232 )
233 })
234 .collect(),
235 };
236
237 TaggedSource {
238 source: Arc::new(source_port),
239 tag: i as u32,
240 }
241 .send_to(&recipient_port);
242 }
243 })
244 }
245
246 fn e2o_many_source(
247 extra_stmts: &mut Vec<syn::Stmt>,
248 _p2: &Self::Process,
249 p2_port: &<Self::Process as Node>::Port,
250 codec_type: &syn::Type,
251 shared_handle: String,
252 ) -> syn::Expr {
253 let connect_ident = syn::Ident::new(
254 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
255 Span::call_site(),
256 );
257 let source_ident = syn::Ident::new(
258 &format!("__hydro_deploy_many_{}_source", &shared_handle),
259 Span::call_site(),
260 );
261 let sink_ident = syn::Ident::new(
262 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
263 Span::call_site(),
264 );
265 let membership_ident = syn::Ident::new(
266 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
267 Span::call_site(),
268 );
269
270 let root = get_this_crate();
271
272 extra_stmts.push(syn::parse_quote! {
273 let #connect_ident = __hydro_lang_trybuild_cli
274 .port(#p2_port)
275 .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
276 });
277
278 extra_stmts.push(syn::parse_quote! {
279 let #source_ident = #connect_ident.source;
280 });
281
282 extra_stmts.push(syn::parse_quote! {
283 let #sink_ident = #connect_ident.sink;
284 });
285
286 extra_stmts.push(syn::parse_quote! {
287 let #membership_ident = #connect_ident.membership;
288 });
289
290 parse_quote!(#source_ident)
291 }
292
293 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
294 let sink_ident = syn::Ident::new(
295 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
296 Span::call_site(),
297 );
298 parse_quote!(#sink_ident)
299 }
300
301 fn e2o_source(
302 extra_stmts: &mut Vec<syn::Stmt>,
303 _p1: &Self::External,
304 _p1_port: &<Self::External as Node>::Port,
305 _p2: &Self::Process,
306 p2_port: &<Self::Process as Node>::Port,
307 codec_type: &syn::Type,
308 shared_handle: String,
309 ) -> syn::Expr {
310 let connect_ident = syn::Ident::new(
311 &format!("__hydro_deploy_{}_connect", &shared_handle),
312 Span::call_site(),
313 );
314 let source_ident = syn::Ident::new(
315 &format!("__hydro_deploy_{}_source", &shared_handle),
316 Span::call_site(),
317 );
318 let sink_ident = syn::Ident::new(
319 &format!("__hydro_deploy_{}_sink", &shared_handle),
320 Span::call_site(),
321 );
322
323 let root = get_this_crate();
324
325 extra_stmts.push(syn::parse_quote! {
326 let #connect_ident = __hydro_lang_trybuild_cli
327 .port(#p2_port)
328 .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
329 });
330
331 extra_stmts.push(syn::parse_quote! {
332 let #source_ident = #connect_ident.source;
333 });
334
335 extra_stmts.push(syn::parse_quote! {
336 let #sink_ident = #connect_ident.sink;
337 });
338
339 parse_quote!(#source_ident)
340 }
341
342 fn e2o_connect(
343 p1: &Self::External,
344 p1_port: &<Self::External as Node>::Port,
345 p2: &Self::Process,
346 p2_port: &<Self::Process as Node>::Port,
347 _many: bool,
348 server_hint: NetworkHint,
349 ) -> Box<dyn FnOnce()> {
350 let p1 = p1.clone();
351 let p1_port = p1_port.clone();
352 let p2 = p2.clone();
353 let p2_port = p2_port.clone();
354
355 Box::new(move || {
356 let self_underlying_borrow = p1.underlying.borrow();
357 let self_underlying = self_underlying_borrow.as_ref().unwrap();
358 let source_port = self_underlying.declare_many_client();
359
360 let other_underlying_borrow = p2.underlying.borrow();
361 let other_underlying = other_underlying_borrow.as_ref().unwrap();
362 let recipient_port = other_underlying.get_port_with_hint(
363 p2_port.clone(),
364 match server_hint {
365 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
366 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
367 },
368 );
369
370 source_port.send_to(&recipient_port);
371
372 p1.client_ports
373 .borrow_mut()
374 .insert(p1_port.clone(), source_port);
375 })
376 }
377
378 fn o2e_sink(
379 _p1: &Self::Process,
380 _p1_port: &<Self::Process as Node>::Port,
381 _p2: &Self::External,
382 _p2_port: &<Self::External as Node>::Port,
383 shared_handle: String,
384 ) -> syn::Expr {
385 let sink_ident = syn::Ident::new(
386 &format!("__hydro_deploy_{}_sink", &shared_handle),
387 Span::call_site(),
388 );
389 parse_quote!(#sink_ident)
390 }
391
392 fn cluster_ids(
393 of_cluster: LocationKey,
394 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
395 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
396 }
397
398 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
399 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
400 }
401
    /// Returns the quoted membership-event stream for `location_id`.
    ///
    /// This is a thin wrapper: the call resolves to the free function of the
    /// same name (an associated function would need a `Self::` prefix).
    fn cluster_membership_stream(
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        cluster_membership_stream(location_id)
    }
408}
409
410#[expect(missing_docs, reason = "TODO")]
411pub trait DeployCrateWrapper {
412 fn underlying(&self) -> Arc<RustCrateService>;
413
414 fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
415 self.underlying().stdout()
416 }
417
418 fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
419 self.underlying().stderr()
420 }
421
422 fn stdout_filter(
423 &self,
424 prefix: impl Into<String>,
425 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
426 self.underlying().stdout_filter(prefix.into())
427 }
428
429 fn stderr_filter(
430 &self,
431 prefix: impl Into<String>,
432 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
433 self.underlying().stderr_filter(prefix.into())
434 }
435}
436
// Build and display settings for a service that is generated through the
// trybuild pipeline and deployed to `host`. `create_trybuild_service` consumes
// these settings when it builds the `RustCrate`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct TrybuildHost {
    // Host the service is deployed to.
    host: Arc<dyn Host>,
    // Explicit display name; takes priority over the `name_hint` fallback.
    display_name: Option<String>,
    // Passed to `RustCrate::rustflags` when set.
    rustflags: Option<String>,
    // Passed to `RustCrate::profile` when set.
    profile: Option<String>,
    // Hydro runtime features; each is validated against `HYDRO_RUNTIME_FEATURES`
    // and enabled as `hydro___feature_<name>`.
    additional_hydro_features: Vec<String>,
    // Extra crate features, passed through unchanged.
    features: Vec<String>,
    // Passed to `RustCrate::tracing` when set.
    tracing: Option<TracingOptions>,
    // Environment variables applied to the build, as `(key, value)` pairs.
    build_envs: Vec<(String, String)>,
    // Filled in by the `ProcessSpec`/`ClusterSpec` `build` methods; used as the
    // display name when `display_name` is unset.
    name_hint: Option<String>,
    // Position inside a cluster (set by `ClusterSpec::build`); appended to
    // `name_hint` in the fallback display name.
    cluster_idx: Option<usize>,
}
451
452impl From<Arc<dyn Host>> for TrybuildHost {
453 fn from(host: Arc<dyn Host>) -> Self {
454 Self {
455 host,
456 display_name: None,
457 rustflags: None,
458 profile: None,
459 additional_hydro_features: vec![],
460 features: vec![],
461 tracing: None,
462 build_envs: vec![],
463 name_hint: None,
464 cluster_idx: None,
465 }
466 }
467}
468
469impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
470 fn from(host: Arc<H>) -> Self {
471 Self {
472 host,
473 display_name: None,
474 rustflags: None,
475 profile: None,
476 additional_hydro_features: vec![],
477 features: vec![],
478 tracing: None,
479 build_envs: vec![],
480 name_hint: None,
481 cluster_idx: None,
482 }
483 }
484}
485
486#[expect(missing_docs, reason = "TODO")]
487impl TrybuildHost {
488 pub fn new(host: Arc<dyn Host>) -> Self {
489 Self {
490 host,
491 display_name: None,
492 rustflags: None,
493 profile: None,
494 additional_hydro_features: vec![],
495 features: vec![],
496 tracing: None,
497 build_envs: vec![],
498 name_hint: None,
499 cluster_idx: None,
500 }
501 }
502
503 pub fn display_name(self, display_name: impl Into<String>) -> Self {
504 if self.display_name.is_some() {
505 panic!("{} already set", name_of!(display_name in Self));
506 }
507
508 Self {
509 display_name: Some(display_name.into()),
510 ..self
511 }
512 }
513
514 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
515 if self.rustflags.is_some() {
516 panic!("{} already set", name_of!(rustflags in Self));
517 }
518
519 Self {
520 rustflags: Some(rustflags.into()),
521 ..self
522 }
523 }
524
525 pub fn profile(self, profile: impl Into<String>) -> Self {
526 if self.profile.is_some() {
527 panic!("{} already set", name_of!(profile in Self));
528 }
529
530 Self {
531 profile: Some(profile.into()),
532 ..self
533 }
534 }
535
536 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
537 Self {
538 additional_hydro_features,
539 ..self
540 }
541 }
542
543 pub fn features(self, features: Vec<String>) -> Self {
544 Self {
545 features: self.features.into_iter().chain(features).collect(),
546 ..self
547 }
548 }
549
550 pub fn tracing(self, tracing: TracingOptions) -> Self {
551 if self.tracing.is_some() {
552 panic!("{} already set", name_of!(tracing in Self));
553 }
554
555 Self {
556 tracing: Some(tracing),
557 ..self
558 }
559 }
560
561 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
562 Self {
563 build_envs: self
564 .build_envs
565 .into_iter()
566 .chain(std::iter::once((key.into(), value.into())))
567 .collect(),
568 ..self
569 }
570 }
571}
572
573impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
574 type ProcessSpec = TrybuildHost;
575 fn into_process_spec(self) -> TrybuildHost {
576 TrybuildHost {
577 host: self,
578 display_name: None,
579 rustflags: None,
580 profile: None,
581 additional_hydro_features: vec![],
582 features: vec![],
583 tracing: None,
584 build_envs: vec![],
585 name_hint: None,
586 cluster_idx: None,
587 }
588 }
589}
590
591impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
592 type ProcessSpec = TrybuildHost;
593 fn into_process_spec(self) -> TrybuildHost {
594 TrybuildHost {
595 host: self,
596 display_name: None,
597 rustflags: None,
598 profile: None,
599 additional_hydro_features: vec![],
600 features: vec![],
601 tracing: None,
602 build_envs: vec![],
603 name_hint: None,
604 cluster_idx: None,
605 }
606 }
607}
608
// Handle for an external location backed by a `CustomService`.
// All mutable state sits behind `Rc<RefCell<..>>`, so clones of this handle
// share it.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployExternal {
    // Counter used by `next_port` to generate `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // Host the custom service is created on.
    host: Arc<dyn Host>,
    // The service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
    // Client ports keyed by port name; filled in by the closure returned from
    // `e2o_connect`.
    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
    // Port name recorded for each external port id by `register`.
    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
}
618
619impl DeployExternal {
620 pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
621 self.client_ports
622 .borrow()
623 .get(
624 self.allocated_ports
625 .borrow()
626 .get(&external_port_id)
627 .unwrap(),
628 )
629 .unwrap()
630 .clone()
631 }
632}
633
634impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
635 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
636 assert!(
637 self.allocated_ports
638 .borrow_mut()
639 .insert(external_port_id, port.clone())
640 .is_none_or(|old| old == port)
641 );
642 }
643
644 fn as_bytes_bidi(
645 &self,
646 external_port_id: ExternalPortId,
647 ) -> impl Future<
648 Output = (
649 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
650 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
651 ),
652 > + 'a {
653 let port = self.raw_port(external_port_id);
654
655 async move {
656 let (source, sink) = port.connect().await.into_source_sink();
657 (
658 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
659 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
660 )
661 }
662 }
663
664 fn as_bincode_bidi<InT, OutT>(
665 &self,
666 external_port_id: ExternalPortId,
667 ) -> impl Future<
668 Output = (
669 Pin<Box<dyn Stream<Item = OutT>>>,
670 Pin<Box<dyn Sink<InT, Error = Error>>>,
671 ),
672 > + 'a
673 where
674 InT: Serialize + 'static,
675 OutT: DeserializeOwned + 'static,
676 {
677 let port = self.raw_port(external_port_id);
678 async move {
679 let (source, sink) = port.connect().await.into_source_sink();
680 (
681 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
682 as Pin<Box<dyn Stream<Item = OutT>>>,
683 Box::pin(
684 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
685 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
686 )
687 }
688 }
689
690 fn as_bincode_sink<T: Serialize + 'static>(
691 &self,
692 external_port_id: ExternalPortId,
693 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
694 let port = self.raw_port(external_port_id);
695 async move {
696 let sink = port.connect().await.into_sink();
697 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
698 as Pin<Box<dyn Sink<T, Error = Error>>>
699 }
700 }
701
702 fn as_bincode_source<T: DeserializeOwned + 'static>(
703 &self,
704 external_port_id: ExternalPortId,
705 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
706 let port = self.raw_port(external_port_id);
707 async move {
708 let source = port.connect().await.into_source();
709 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
710 as Pin<Box<dyn Stream<Item = T>>>
711 }
712 }
713}
714
715impl Node for DeployExternal {
716 type Port = String;
717 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
719 type InstantiateEnv = Deployment;
720
721 fn next_port(&self) -> Self::Port {
722 let next_port = *self.next_port.borrow();
723 *self.next_port.borrow_mut() += 1;
724
725 format!("port_{}", next_port)
726 }
727
728 fn instantiate(
729 &self,
730 env: &mut Self::InstantiateEnv,
731 _meta: &mut Self::Meta,
732 _graph: DfirGraph,
733 extra_stmts: &[syn::Stmt],
734 sidecars: &[syn::Expr],
735 ) {
736 assert!(extra_stmts.is_empty());
737 assert!(sidecars.is_empty());
738 let service = env.CustomService(self.host.clone(), vec![]);
739 *self.underlying.borrow_mut() = Some(service);
740 }
741
742 fn update_meta(&self, _meta: &Self::Meta) {}
743}
744
745impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
746 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
747 DeployExternal {
748 next_port: Rc::new(RefCell::new(0)),
749 host: self,
750 underlying: Rc::new(RefCell::new(None)),
751 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
752 client_ports: Rc::new(RefCell::new(HashMap::new())),
753 }
754 }
755}
756
757impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
758 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
759 DeployExternal {
760 next_port: Rc::new(RefCell::new(0)),
761 host: self,
762 underlying: Rc::new(RefCell::new(None)),
763 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
764 client_ports: Rc::new(RefCell::new(HashMap::new())),
765 }
766 }
767}
768
/// How the service for a process or cluster member is obtained.
pub(crate) enum CrateOrTrybuild {
    /// A caller-supplied `RustCrate` together with the host it is added to.
    Crate(RustCrate, Arc<dyn Host>),
    /// A service built from the generated trybuild project using the settings
    /// in the [`TrybuildHost`].
    Trybuild(TrybuildHost),
}
773
// Handle for a single process. State is behind `Rc<RefCell<..>>`, so clones of
// this handle share it.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployNode {
    // Counter used by `next_port` to generate `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // Pending service description; taken (left as `None`) by `instantiate`.
    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
    // The deployed service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
}
781
782impl DeployCrateWrapper for DeployNode {
783 fn underlying(&self) -> Arc<RustCrateService> {
784 Arc::clone(self.underlying.borrow().as_ref().unwrap())
785 }
786}
787
788impl Node for DeployNode {
789 type Port = String;
790 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
792 type InstantiateEnv = Deployment;
793
794 fn next_port(&self) -> String {
795 let next_port = *self.next_port.borrow();
796 *self.next_port.borrow_mut() += 1;
797
798 format!("port_{}", next_port)
799 }
800
801 fn update_meta(&self, meta: &Self::Meta) {
802 let underlying_node = self.underlying.borrow();
803 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
804 clusters: meta.clone(),
805 cluster_id: None,
806 });
807 }
808
809 fn instantiate(
810 &self,
811 env: &mut Self::InstantiateEnv,
812 _meta: &mut Self::Meta,
813 graph: DfirGraph,
814 extra_stmts: &[syn::Stmt],
815 sidecars: &[syn::Expr],
816 ) {
817 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
818 CrateOrTrybuild::Crate(c, host) => (c, host),
819 CrateOrTrybuild::Trybuild(trybuild) => {
820 let linking_mode = if !cfg!(target_os = "windows")
822 && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
823 {
824 LinkingMode::Dynamic
827 } else {
828 LinkingMode::Static
829 };
830 let (bin_name, config) = create_graph_trybuild(
831 graph,
832 extra_stmts,
833 sidecars,
834 trybuild.name_hint.as_deref(),
835 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
836 linking_mode,
837 );
838 let host = trybuild.host.clone();
839 (
840 create_trybuild_service(
841 trybuild,
842 &config.project_dir,
843 &config.target_dir,
844 config.features.as_deref(),
845 &bin_name,
846 &config.linking_mode,
847 ),
848 host,
849 )
850 }
851 };
852
853 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
854 }
855}
856
// Handle for one member of a deployed cluster.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterNode {
    // The deployed service for this member.
    underlying: Arc<RustCrateService>,
}
862
863impl DeployCrateWrapper for DeployClusterNode {
864 fn underlying(&self) -> Arc<RustCrateService> {
865 self.underlying.clone()
866 }
867}
// Handle for a cluster. Mutable state is behind `Rc<RefCell<..>>`, so clones
// of this handle share it.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployCluster {
    // Key under which this cluster's member ids are stored in the metadata map.
    key: LocationKey,
    // Counter used by `next_port` to generate `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // Pending per-member service descriptions; taken by `instantiate`.
    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
    // Deployed members; empty until `instantiate` has run.
    members: Rc<RefCell<Vec<DeployClusterNode>>>,
    // Name hint passed to `create_graph_trybuild`; `None` for clusters built
    // from `DeployClusterSpec`.
    name_hint: Option<String>,
}
877
878impl DeployCluster {
879 #[expect(missing_docs, reason = "TODO")]
880 pub fn members(&self) -> Vec<DeployClusterNode> {
881 self.members.borrow().clone()
882 }
883}
884
885impl Node for DeployCluster {
886 type Port = String;
887 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
889 type InstantiateEnv = Deployment;
890
891 fn next_port(&self) -> String {
892 let next_port = *self.next_port.borrow();
893 *self.next_port.borrow_mut() += 1;
894
895 format!("port_{}", next_port)
896 }
897
898 fn instantiate(
899 &self,
900 env: &mut Self::InstantiateEnv,
901 meta: &mut Self::Meta,
902 graph: DfirGraph,
903 extra_stmts: &[syn::Stmt],
904 sidecars: &[syn::Expr],
905 ) {
906 let has_trybuild = self
907 .cluster_spec
908 .borrow()
909 .as_ref()
910 .unwrap()
911 .iter()
912 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
913
914 let linking_mode = if !cfg!(target_os = "windows")
916 && self
917 .cluster_spec
918 .borrow()
919 .as_ref()
920 .unwrap()
921 .iter()
922 .all(|spec| match spec {
923 CrateOrTrybuild::Crate(_, _) => true, CrateOrTrybuild::Trybuild(t) => {
925 t.host.target_type() == hydro_deploy::HostTargetType::Local
926 }
927 }) {
928 LinkingMode::Dynamic
930 } else {
931 LinkingMode::Static
932 };
933
934 let maybe_trybuild = if has_trybuild {
935 Some(create_graph_trybuild(
936 graph,
937 extra_stmts,
938 sidecars,
939 self.name_hint.as_deref(),
940 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
941 linking_mode,
942 ))
943 } else {
944 None
945 };
946
947 let cluster_nodes = self
948 .cluster_spec
949 .borrow_mut()
950 .take()
951 .unwrap()
952 .into_iter()
953 .map(|spec| {
954 let (service, host) = match spec {
955 CrateOrTrybuild::Crate(c, host) => (c, host),
956 CrateOrTrybuild::Trybuild(trybuild) => {
957 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
958 let host = trybuild.host.clone();
959 (
960 create_trybuild_service(
961 trybuild,
962 &config.project_dir,
963 &config.target_dir,
964 config.features.as_deref(),
965 bin_name,
966 &config.linking_mode,
967 ),
968 host,
969 )
970 }
971 };
972
973 env.add_service(service, host)
974 })
975 .collect::<Vec<_>>();
976 meta.insert(
977 self.key,
978 (0..(cluster_nodes.len() as u32))
979 .map(TaglessMemberId::from_raw_id)
980 .collect(),
981 );
982 *self.members.borrow_mut() = cluster_nodes
983 .into_iter()
984 .map(|n| DeployClusterNode { underlying: n })
985 .collect();
986 }
987
988 fn update_meta(&self, meta: &Self::Meta) {
989 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
990 node.underlying.update_meta(HydroMeta {
991 clusters: meta.clone(),
992 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
993 });
994 }
995 }
996}
997
// Process spec that deploys a caller-supplied `RustCrate` (field 0) onto a
// host (field 1) instead of going through the trybuild pipeline.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1001
impl DeployProcessSpec {
    // Pairs the crate `t` with the `host` it should be deployed to.
    #[expect(missing_docs, reason = "TODO")]
    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
        Self(t, host)
    }
}
1008
1009impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1010 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1011 DeployNode {
1012 next_port: Rc::new(RefCell::new(0)),
1013 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1014 underlying: Rc::new(RefCell::new(None)),
1015 }
1016 }
1017}
1018
1019impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1020 fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1021 self.name_hint = Some(format!("{} (process {})", name_hint, key));
1022 DeployNode {
1023 next_port: Rc::new(RefCell::new(0)),
1024 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1025 underlying: Rc::new(RefCell::new(None)),
1026 }
1027 }
1028}
1029
// Cluster spec made of caller-supplied `(RustCrate, host)` pairs, one per
// cluster member, bypassing the trybuild pipeline.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1033
impl DeployClusterSpec {
    // Wraps the list of `(crate, host)` pairs; each pair becomes one member.
    #[expect(missing_docs, reason = "TODO")]
    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
        Self(crates)
    }
}
1040
1041impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1042 fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1043 DeployCluster {
1044 key,
1045 next_port: Rc::new(RefCell::new(0)),
1046 cluster_spec: Rc::new(RefCell::new(Some(
1047 self.0
1048 .into_iter()
1049 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1050 .collect(),
1051 ))),
1052 members: Rc::new(RefCell::new(vec![])),
1053 name_hint: None,
1054 }
1055 }
1056}
1057
1058impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1059 fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1060 let name_hint = format!("{} (cluster {})", name_hint, key);
1061 DeployCluster {
1062 key,
1063 next_port: Rc::new(RefCell::new(0)),
1064 cluster_spec: Rc::new(RefCell::new(Some(
1065 self.into_iter()
1066 .enumerate()
1067 .map(|(idx, b)| {
1068 let mut b = b.into();
1069 b.name_hint = Some(name_hint.clone());
1070 b.cluster_idx = Some(idx);
1071 CrateOrTrybuild::Trybuild(b)
1072 })
1073 .collect(),
1074 ))),
1075 members: Rc::new(RefCell::new(vec![])),
1076 name_hint: Some(name_hint),
1077 }
1078 }
1079}
1080
1081fn create_trybuild_service(
1082 trybuild: TrybuildHost,
1083 dir: &std::path::Path,
1084 target_dir: &std::path::PathBuf,
1085 features: Option<&[String]>,
1086 bin_name: &str,
1087 linking_mode: &LinkingMode,
1088) -> RustCrate {
1089 let crate_dir = match linking_mode {
1091 LinkingMode::Dynamic => dir.join("dylib-examples"),
1092 LinkingMode::Static => dir.to_path_buf(),
1093 };
1094
1095 let mut ret = RustCrate::new(&crate_dir, dir)
1096 .target_dir(target_dir)
1097 .example(bin_name)
1098 .no_default_features();
1099
1100 ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1101
1102 if let Some(display_name) = trybuild.display_name {
1103 ret = ret.display_name(display_name);
1104 } else if let Some(name_hint) = trybuild.name_hint {
1105 if let Some(cluster_idx) = trybuild.cluster_idx {
1106 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1107 } else {
1108 ret = ret.display_name(name_hint);
1109 }
1110 }
1111
1112 if let Some(rustflags) = trybuild.rustflags {
1113 ret = ret.rustflags(rustflags);
1114 }
1115
1116 if let Some(profile) = trybuild.profile {
1117 ret = ret.profile(profile);
1118 }
1119
1120 if let Some(tracing) = trybuild.tracing {
1121 ret = ret.tracing(tracing);
1122 }
1123
1124 ret = ret.features(
1125 vec!["hydro___feature_deploy_integration".to_owned()]
1126 .into_iter()
1127 .chain(
1128 trybuild
1129 .additional_hydro_features
1130 .into_iter()
1131 .map(|runtime_feature| {
1132 assert!(
1133 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1134 "{runtime_feature} is not a valid Hydro runtime feature"
1135 );
1136 format!("hydro___feature_{runtime_feature}")
1137 }),
1138 )
1139 .chain(trybuild.features),
1140 );
1141
1142 for (key, value) in trybuild.build_envs {
1143 ret = ret.build_env(key, value);
1144 }
1145
1146 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1147 ret = ret.config("build.incremental = false");
1148
1149 if let Some(features) = features {
1150 ret = ret.features(features);
1151 }
1152
1153 ret
1154}