1use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
/// Type-level marker for the Maelstrom deployment backend.
///
/// The enum has no variants, so no value of it can exist; it is used only as the
/// implementor of [`Deploy`] in this file.
pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39 type Meta = ();
40 type InstantiateEnv = MaelstromDeployment;
41
42 type Process = MaelstromProcess;
43 type Cluster = MaelstromCluster;
44 type External = MaelstromExternal;
45
46 fn o2o_sink_source(
47 _env: &mut Self::InstantiateEnv,
48 _p1: &Self::Process,
49 _p1_port: &<Self::Process as Node>::Port,
50 _p2: &Self::Process,
51 _p2_port: &<Self::Process as Node>::Port,
52 _name: Option<&str>,
53 ) -> (syn::Expr, syn::Expr) {
54 panic!("Maelstrom deployment does not support processes, only clusters")
55 }
56
57 fn o2o_connect(
58 _p1: &Self::Process,
59 _p1_port: &<Self::Process as Node>::Port,
60 _p2: &Self::Process,
61 _p2_port: &<Self::Process as Node>::Port,
62 ) -> Box<dyn FnOnce()> {
63 panic!("Maelstrom deployment does not support processes, only clusters")
64 }
65
66 fn o2m_sink_source(
67 _p1: &Self::Process,
68 _p1_port: &<Self::Process as Node>::Port,
69 _c2: &Self::Cluster,
70 _c2_port: &<Self::Cluster as Node>::Port,
71 ) -> (syn::Expr, syn::Expr) {
72 panic!("Maelstrom deployment does not support processes, only clusters")
73 }
74
75 fn o2m_connect(
76 _p1: &Self::Process,
77 _p1_port: &<Self::Process as Node>::Port,
78 _c2: &Self::Cluster,
79 _c2_port: &<Self::Cluster as Node>::Port,
80 ) -> Box<dyn FnOnce()> {
81 panic!("Maelstrom deployment does not support processes, only clusters")
82 }
83
84 fn m2o_sink_source(
85 _c1: &Self::Cluster,
86 _c1_port: &<Self::Cluster as Node>::Port,
87 _p2: &Self::Process,
88 _p2_port: &<Self::Process as Node>::Port,
89 ) -> (syn::Expr, syn::Expr) {
90 panic!("Maelstrom deployment does not support processes, only clusters")
91 }
92
93 fn m2o_connect(
94 _c1: &Self::Cluster,
95 _c1_port: &<Self::Cluster as Node>::Port,
96 _p2: &Self::Process,
97 _p2_port: &<Self::Process as Node>::Port,
98 ) -> Box<dyn FnOnce()> {
99 panic!("Maelstrom deployment does not support processes, only clusters")
100 }
101
102 fn m2m_sink_source(
103 _c1: &Self::Cluster,
104 _c1_port: &<Self::Cluster as Node>::Port,
105 _c2: &Self::Cluster,
106 _c2_port: &<Self::Cluster as Node>::Port,
107 ) -> (syn::Expr, syn::Expr) {
108 deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
109 }
110
111 fn m2m_connect(
112 _c1: &Self::Cluster,
113 _c1_port: &<Self::Cluster as Node>::Port,
114 _c2: &Self::Cluster,
115 _c2_port: &<Self::Cluster as Node>::Port,
116 ) -> Box<dyn FnOnce()> {
117 Box::new(|| {})
119 }
120
121 fn e2o_many_source(
122 _extra_stmts: &mut Vec<syn::Stmt>,
123 _p2: &Self::Process,
124 _p2_port: &<Self::Process as Node>::Port,
125 _codec_type: &syn::Type,
126 _shared_handle: String,
127 ) -> syn::Expr {
128 panic!("Maelstrom deployment does not support processes, only clusters")
129 }
130
131 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
132 panic!("Maelstrom deployment does not support processes, only clusters")
133 }
134
135 fn e2o_source(
136 _extra_stmts: &mut Vec<syn::Stmt>,
137 _p1: &Self::External,
138 _p1_port: &<Self::External as Node>::Port,
139 _p2: &Self::Process,
140 _p2_port: &<Self::Process as Node>::Port,
141 _codec_type: &syn::Type,
142 _shared_handle: String,
143 ) -> syn::Expr {
144 panic!("Maelstrom deployment does not support processes, only clusters")
145 }
146
147 fn e2o_connect(
148 _p1: &Self::External,
149 _p1_port: &<Self::External as Node>::Port,
150 _p2: &Self::Process,
151 _p2_port: &<Self::Process as Node>::Port,
152 _many: bool,
153 _server_hint: NetworkHint,
154 ) -> Box<dyn FnOnce()> {
155 panic!("Maelstrom deployment does not support processes, only clusters")
156 }
157
158 fn o2e_sink(
159 _p1: &Self::Process,
160 _p1_port: &<Self::Process as Node>::Port,
161 _p2: &Self::External,
162 _p2_port: &<Self::External as Node>::Port,
163 _shared_handle: String,
164 ) -> syn::Expr {
165 panic!("Maelstrom deployment does not support processes, only clusters")
166 }
167
168 fn cluster_ids(
169 _of_cluster: LocationKey,
170 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
171 cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
172 }
173
174 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
175 cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
176 }
177
178 fn cluster_membership_stream(
179 location_id: &LocationId,
180 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
181 {
182 cluster_membership_stream(location_id)
183 }
184}
185
/// Process node type of the Maelstrom backend.
///
/// Its `Node::next_port` and `Node::instantiate` implementations in this file panic.
#[derive(Clone)]
pub struct MaelstromProcess {
    // Private unit field: the struct cannot be built with a struct literal outside this module.
    _private: (),
}
191
/// [`Node`] implementation for [`MaelstromProcess`]: port allocation and instantiation panic,
/// `update_meta` is a no-op.
impl Node for MaelstromProcess {
    type Port = String;
    type Meta = ();
    type InstantiateEnv = MaelstromDeployment;

    /// # Panics
    /// Always.
    fn next_port(&self) -> Self::Port {
        panic!("Maelstrom deployment does not support processes")
    }

    /// Does nothing; `Meta` is `()`.
    fn update_meta(&self, _meta: &Self::Meta) {}

    /// # Panics
    /// Always.
    fn instantiate(
        &self,
        _env: &mut Self::InstantiateEnv,
        _meta: &mut Self::Meta,
        _graph: DfirGraph,
        _extra_stmts: &[syn::Stmt],
        _sidecars: &[syn::Expr],
    ) {
        panic!("Maelstrom deployment does not support processes")
    }
}
214
/// Cluster node type of the Maelstrom backend.
#[derive(Clone)]
pub struct MaelstromCluster {
    // Counter used to number ports; held in an `Rc`, so clones of this struct share it.
    next_port: Rc<RefCell<usize>>,
    // Optional name, forwarded to `create_graph_trybuild` when the cluster is instantiated.
    name_hint: Option<String>,
}
221
222impl Node for MaelstromCluster {
223 type Port = String;
224 type Meta = ();
225 type InstantiateEnv = MaelstromDeployment;
226
227 fn next_port(&self) -> Self::Port {
228 let next_port = *self.next_port.borrow();
229 *self.next_port.borrow_mut() += 1;
230 format!("port_{}", next_port)
231 }
232
233 fn update_meta(&self, _meta: &Self::Meta) {}
234
235 fn instantiate(
236 &self,
237 env: &mut Self::InstantiateEnv,
238 _meta: &mut Self::Meta,
239 graph: DfirGraph,
240 extra_stmts: &[syn::Stmt],
241 sidecars: &[syn::Expr],
242 ) {
243 let (bin_name, config) = create_graph_trybuild(
244 graph,
245 extra_stmts,
246 sidecars,
247 self.name_hint.as_deref(),
248 crate::compile::trybuild::generate::DeployMode::Maelstrom,
249 LinkingMode::Static,
250 );
251
252 env.bin_name = Some(bin_name);
253 env.project_dir = Some(config.project_dir);
254 env.target_dir = Some(config.target_dir);
255 env.features = config.features;
256 }
257}
258
/// External node type of the Maelstrom backend.
///
/// The enum has no variants, so no value of it can ever be constructed.
#[derive(Clone)]
pub enum MaelstromExternal {}
262
/// [`Node`] implementation for [`MaelstromExternal`].
///
/// The methods take `&self`, and `MaelstromExternal` has no variants, so none of them can
/// actually be invoked; the bodies are `unreachable!()` placeholders.
impl Node for MaelstromExternal {
    type Port = String;
    type Meta = ();
    type InstantiateEnv = MaelstromDeployment;

    fn next_port(&self) -> Self::Port {
        unreachable!()
    }

    fn update_meta(&self, _meta: &Self::Meta) {}

    fn instantiate(
        &self,
        _env: &mut Self::InstantiateEnv,
        _meta: &mut Self::Meta,
        _graph: DfirGraph,
        _extra_stmts: &[syn::Stmt],
        _sidecars: &[syn::Expr],
    ) {
        unreachable!()
    }
}
285
/// [`RegisterPort`] implementation for [`MaelstromExternal`].
///
/// Every method takes `&self` on a type with no variants, so none can be invoked; each body
/// (or returned future) is an `unreachable!()` placeholder.
impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
        unreachable!()
    }

    // Returns a future whose body is `unreachable!()`.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bytes_bidi(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = (
            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
        ),
    > + 'a {
        async move { unreachable!() }
    }

    // Returns a future whose body is `unreachable!()`.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_bidi<InT, OutT>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = (
            Pin<Box<dyn Stream<Item = OutT>>>,
            Pin<Box<dyn Sink<InT, Error = Error>>>,
        ),
    > + 'a
    where
        InT: Serialize + 'static,
        OutT: DeserializeOwned + 'static,
    {
        async move { unreachable!() }
    }

    // Returns a future whose body is `unreachable!()`.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_sink<T: Serialize + 'static>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
        async move { unreachable!() }
    }

    // Returns a future whose body is `unreachable!()`.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_source<T: DeserializeOwned + 'static>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
        async move { unreachable!() }
    }
}
337
/// Field-less cluster specification for the Maelstrom backend; its `ClusterSpec::build`
/// implementation in this file produces a [`MaelstromCluster`].
#[derive(Clone)]
pub struct MaelstromClusterSpec;
341
342impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
343 fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
344 assert_eq!(
345 key,
346 LocationKey::FIRST,
347 "there should only be one location for a Maelstrom deployment"
348 );
349 MaelstromCluster {
350 next_port: Rc::new(RefCell::new(0)),
351 name_hint: Some(name_hint.to_owned()),
352 }
353 }
354}
355
/// Settings for a `maelstrom test` run plus the build information recorded when the cluster is
/// instantiated.
///
/// The public fields map onto the command line assembled by [`MaelstromDeployment::run`]; the
/// `pub(crate)` fields are read by [`MaelstromDeployment::build`] and
/// [`MaelstromDeployment::binary_path`].
pub struct MaelstromDeployment {
    /// Passed as `--node-count`.
    pub node_count: usize,
    /// Program that is executed to start Maelstrom.
    pub maelstrom_path: PathBuf,
    /// Passed as `-w`.
    pub workload: String,
    /// Passed as `--time-limit` when set.
    pub time_limit: Option<u64>,
    /// Passed as `--rate` when set.
    pub rate: Option<u64>,
    /// Passed as `--availability` when set.
    pub availability: Option<String>,
    /// Passed as `--nemesis` when set.
    pub nemesis: Option<String>,
    /// Appended verbatim after all other arguments.
    pub extra_args: Vec<String>,

    /// Name of the cargo example to build.
    pub(crate) bin_name: Option<String>,
    /// Directory in which `cargo build` is run.
    pub(crate) project_dir: Option<PathBuf>,
    /// Exported as `CARGO_TARGET_DIR` for the build.
    pub(crate) target_dir: Option<PathBuf>,
    /// Extra cargo features enabled for the build.
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Creates a deployment for `workload` with one node, `maelstrom` as the executable, and no
    /// optional settings or build information.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Sets the value passed as `--node-count`.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Sets the program executed to start Maelstrom.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Sets the value passed as `--time-limit`.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Sets the value passed as `--rate`.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Sets the value passed as `--availability`.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Sets the value passed as `--nemesis`.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Appends `args` to the extra arguments; earlier extra arguments are kept.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Runs `cargo build --example <bin_name> --no-default-features --features ...` in the
    /// project directory and returns the path of the built example.
    ///
    /// # Errors
    /// If cargo cannot be started or exits unsuccessfully.
    ///
    /// # Panics
    /// If the binary name, project directory or target directory has not been recorded yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_ref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_ref().expect("No project dir set");
        let target_dir = self.target_dir.as_ref().expect("No target dir set");

        // The runtime feature is always enabled, so `--features` is passed unconditionally.
        let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
        if let Some(features) = &self.features {
            all_features.extend(features.iter().cloned());
        }

        let status = std::process::Command::new("cargo")
            .arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(all_features.join(","))
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1")
            .status()?;
        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::example_path(target_dir, bin_name))
    }

    /// Builds the binary, runs `maelstrom test` against it, and reports the verdict.
    ///
    /// Maelstrom's stdout is echoed line by line to stderr. The child is read until it closes
    /// stdout and is then reaped, so no process is left behind when this returns.
    ///
    /// # Errors
    /// If the build fails, Maelstrom cannot be started, its output cannot be read, the output
    /// contains a line starting with `Analysis invalid!`, or no verdict line is found.
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }

        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }

        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }

        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }

        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("stdout is always present because it was configured as piped");

        let scanned = Self::scan_output(BufReader::new(stdout));
        if scanned.is_err() {
            // Reading stopped before end-of-output, so the child may still be running; stop it
            // instead of blocking in `wait` until it finishes by itself. The result is ignored
            // because the child may already have exited.
            let _ = child.kill();
        }
        // Always reap the child so it does not linger as a zombie.
        let reaped = child.wait();

        let verdict = scanned?;
        reaped?;

        match verdict {
            Some(true) => Ok(()),
            Some(false) => Err(Error::other("Analysis was invalid")),
            None => Err(Error::other("Maelstrom produced an unexpected result")),
        }
    }

    /// Returns where the built example is (or will be) located, or `None` while the binary name
    /// or target directory is still unknown.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_ref()?;
        let target_dir = self.target_dir.as_ref()?;
        Some(Self::example_path(target_dir, bin_name))
    }

    /// Location of the example `bin_name` below `target_dir`: `<target_dir>/debug/examples/<bin_name>`.
    fn example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Classifies one line of Maelstrom output: `Some(true)` for a passing verdict,
    /// `Some(false)` for a failing verdict, `None` for any other line.
    fn verdict_for_line(line: &str) -> Option<bool> {
        if line.starts_with("Analysis invalid!") {
            Some(false)
        } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
            || line.starts_with("Everything looks good!")
        {
            Some(true)
        } else {
            None
        }
    }

    /// Echoes every line of `output` to stderr and returns the first verdict found, if any.
    ///
    /// A read error is returned only when no verdict has been seen yet; once a verdict is known
    /// it stays authoritative and reading simply stops.
    fn scan_output(output: impl BufRead) -> Result<Option<bool>, Error> {
        let mut verdict = None;
        for line in output.lines() {
            let line = match line {
                Ok(line) => line,
                Err(_) if verdict.is_some() => break,
                Err(err) => return Err(err),
            };
            eprintln!("{}", &line);

            if verdict.is_none() {
                verdict = Self::verdict_for_line(&line);
            }
        }
        Ok(verdict)
    }
}