Skip to main content

hydro_lang/deploy/maelstrom/
deploy_maelstrom.rs

1//! Deployment backend for Hydro that targets Maelstrom for distributed systems testing.
2//!
3//! Maelstrom is a workbench for learning distributed systems by writing your own.
4//! This backend compiles Hydro programs to binaries that communicate via Maelstrom's
5//! stdin/stdout JSON protocol.
6
7use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
30/// Deployment backend that targets Maelstrom for distributed systems testing.
31///
32/// This backend compiles Hydro programs to binaries that communicate via Maelstrom's
33/// stdin/stdout JSON protocol. It is restricted to programs with:
34/// - Exactly one cluster (no processes)
35/// - A single external input channel for client communication
36pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39    type Meta = ();
40    type InstantiateEnv = MaelstromDeployment;
41
42    type Process = MaelstromProcess;
43    type Cluster = MaelstromCluster;
44    type External = MaelstromExternal;
45
46    fn o2o_sink_source(
47        _env: &mut Self::InstantiateEnv,
48        _p1: &Self::Process,
49        _p1_port: &<Self::Process as Node>::Port,
50        _p2: &Self::Process,
51        _p2_port: &<Self::Process as Node>::Port,
52        _name: Option<&str>,
53    ) -> (syn::Expr, syn::Expr) {
54        panic!("Maelstrom deployment does not support processes, only clusters")
55    }
56
57    fn o2o_connect(
58        _p1: &Self::Process,
59        _p1_port: &<Self::Process as Node>::Port,
60        _p2: &Self::Process,
61        _p2_port: &<Self::Process as Node>::Port,
62    ) -> Box<dyn FnOnce()> {
63        panic!("Maelstrom deployment does not support processes, only clusters")
64    }
65
66    fn o2m_sink_source(
67        _p1: &Self::Process,
68        _p1_port: &<Self::Process as Node>::Port,
69        _c2: &Self::Cluster,
70        _c2_port: &<Self::Cluster as Node>::Port,
71    ) -> (syn::Expr, syn::Expr) {
72        panic!("Maelstrom deployment does not support processes, only clusters")
73    }
74
75    fn o2m_connect(
76        _p1: &Self::Process,
77        _p1_port: &<Self::Process as Node>::Port,
78        _c2: &Self::Cluster,
79        _c2_port: &<Self::Cluster as Node>::Port,
80    ) -> Box<dyn FnOnce()> {
81        panic!("Maelstrom deployment does not support processes, only clusters")
82    }
83
84    fn m2o_sink_source(
85        _c1: &Self::Cluster,
86        _c1_port: &<Self::Cluster as Node>::Port,
87        _p2: &Self::Process,
88        _p2_port: &<Self::Process as Node>::Port,
89    ) -> (syn::Expr, syn::Expr) {
90        panic!("Maelstrom deployment does not support processes, only clusters")
91    }
92
93    fn m2o_connect(
94        _c1: &Self::Cluster,
95        _c1_port: &<Self::Cluster as Node>::Port,
96        _p2: &Self::Process,
97        _p2_port: &<Self::Process as Node>::Port,
98    ) -> Box<dyn FnOnce()> {
99        panic!("Maelstrom deployment does not support processes, only clusters")
100    }
101
102    fn m2m_sink_source(
103        _c1: &Self::Cluster,
104        _c1_port: &<Self::Cluster as Node>::Port,
105        _c2: &Self::Cluster,
106        _c2_port: &<Self::Cluster as Node>::Port,
107    ) -> (syn::Expr, syn::Expr) {
108        deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
109    }
110
111    fn m2m_connect(
112        _c1: &Self::Cluster,
113        _c1_port: &<Self::Cluster as Node>::Port,
114        _c2: &Self::Cluster,
115        _c2_port: &<Self::Cluster as Node>::Port,
116    ) -> Box<dyn FnOnce()> {
117        // No runtime connection needed for Maelstrom - all routing is via stdin/stdout
118        Box::new(|| {})
119    }
120
121    fn e2o_many_source(
122        _extra_stmts: &mut Vec<syn::Stmt>,
123        _p2: &Self::Process,
124        _p2_port: &<Self::Process as Node>::Port,
125        _codec_type: &syn::Type,
126        _shared_handle: String,
127    ) -> syn::Expr {
128        panic!("Maelstrom deployment does not support processes, only clusters")
129    }
130
131    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
132        panic!("Maelstrom deployment does not support processes, only clusters")
133    }
134
135    fn e2o_source(
136        _extra_stmts: &mut Vec<syn::Stmt>,
137        _p1: &Self::External,
138        _p1_port: &<Self::External as Node>::Port,
139        _p2: &Self::Process,
140        _p2_port: &<Self::Process as Node>::Port,
141        _codec_type: &syn::Type,
142        _shared_handle: String,
143    ) -> syn::Expr {
144        panic!("Maelstrom deployment does not support processes, only clusters")
145    }
146
147    fn e2o_connect(
148        _p1: &Self::External,
149        _p1_port: &<Self::External as Node>::Port,
150        _p2: &Self::Process,
151        _p2_port: &<Self::Process as Node>::Port,
152        _many: bool,
153        _server_hint: NetworkHint,
154    ) -> Box<dyn FnOnce()> {
155        panic!("Maelstrom deployment does not support processes, only clusters")
156    }
157
158    fn o2e_sink(
159        _p1: &Self::Process,
160        _p1_port: &<Self::Process as Node>::Port,
161        _p2: &Self::External,
162        _p2_port: &<Self::External as Node>::Port,
163        _shared_handle: String,
164    ) -> syn::Expr {
165        panic!("Maelstrom deployment does not support processes, only clusters")
166    }
167
168    fn cluster_ids(
169        _of_cluster: LocationKey,
170    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
171        cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
172    }
173
174    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
175        cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
176    }
177
178    fn cluster_membership_stream(
179        location_id: &LocationId,
180    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
181    {
182        cluster_membership_stream(location_id)
183    }
184}
185
186/// A dummy process type for Maelstrom (processes are not supported).
187#[derive(Clone)]
188pub struct MaelstromProcess {
189    _private: (),
190}
191
192impl Node for MaelstromProcess {
193    type Port = String;
194    type Meta = ();
195    type InstantiateEnv = MaelstromDeployment;
196
197    fn next_port(&self) -> Self::Port {
198        panic!("Maelstrom deployment does not support processes")
199    }
200
201    fn update_meta(&self, _meta: &Self::Meta) {}
202
203    fn instantiate(
204        &self,
205        _env: &mut Self::InstantiateEnv,
206        _meta: &mut Self::Meta,
207        _graph: DfirGraph,
208        _extra_stmts: &[syn::Stmt],
209        _sidecars: &[syn::Expr],
210    ) {
211        panic!("Maelstrom deployment does not support processes")
212    }
213}
214
215/// Represents a cluster in Maelstrom deployment.
216#[derive(Clone)]
217pub struct MaelstromCluster {
218    next_port: Rc<RefCell<usize>>,
219    name_hint: Option<String>,
220}
221
222impl Node for MaelstromCluster {
223    type Port = String;
224    type Meta = ();
225    type InstantiateEnv = MaelstromDeployment;
226
227    fn next_port(&self) -> Self::Port {
228        let next_port = *self.next_port.borrow();
229        *self.next_port.borrow_mut() += 1;
230        format!("port_{}", next_port)
231    }
232
233    fn update_meta(&self, _meta: &Self::Meta) {}
234
235    fn instantiate(
236        &self,
237        env: &mut Self::InstantiateEnv,
238        _meta: &mut Self::Meta,
239        graph: DfirGraph,
240        extra_stmts: &[syn::Stmt],
241        sidecars: &[syn::Expr],
242    ) {
243        let (bin_name, config) = create_graph_trybuild(
244            graph,
245            extra_stmts,
246            sidecars,
247            self.name_hint.as_deref(),
248            crate::compile::trybuild::generate::DeployMode::Maelstrom,
249            LinkingMode::Static,
250        );
251
252        env.bin_name = Some(bin_name);
253        env.project_dir = Some(config.project_dir);
254        env.target_dir = Some(config.target_dir);
255        env.features = config.features;
256    }
257}
258
259/// Represents an external client in Maelstrom deployment.
260#[derive(Clone)]
261pub enum MaelstromExternal {}
262
263impl Node for MaelstromExternal {
264    type Port = String;
265    type Meta = ();
266    type InstantiateEnv = MaelstromDeployment;
267
268    fn next_port(&self) -> Self::Port {
269        unreachable!()
270    }
271
272    fn update_meta(&self, _meta: &Self::Meta) {}
273
274    fn instantiate(
275        &self,
276        _env: &mut Self::InstantiateEnv,
277        _meta: &mut Self::Meta,
278        _graph: DfirGraph,
279        _extra_stmts: &[syn::Stmt],
280        _sidecars: &[syn::Expr],
281    ) {
282        unreachable!()
283    }
284}
285
286impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
287    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
288        unreachable!()
289    }
290
291    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
292    fn as_bytes_bidi(
293        &self,
294        _external_port_id: ExternalPortId,
295    ) -> impl Future<
296        Output = (
297            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
298            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
299        ),
300    > + 'a {
301        async move { unreachable!() }
302    }
303
304    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
305    fn as_bincode_bidi<InT, OutT>(
306        &self,
307        _external_port_id: ExternalPortId,
308    ) -> impl Future<
309        Output = (
310            Pin<Box<dyn Stream<Item = OutT>>>,
311            Pin<Box<dyn Sink<InT, Error = Error>>>,
312        ),
313    > + 'a
314    where
315        InT: Serialize + 'static,
316        OutT: DeserializeOwned + 'static,
317    {
318        async move { unreachable!() }
319    }
320
321    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
322    fn as_bincode_sink<T: Serialize + 'static>(
323        &self,
324        _external_port_id: ExternalPortId,
325    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
326        async move { unreachable!() }
327    }
328
329    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
330    fn as_bincode_source<T: DeserializeOwned + 'static>(
331        &self,
332        _external_port_id: ExternalPortId,
333    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
334        async move { unreachable!() }
335    }
336}
337
338/// Specification for building a Maelstrom cluster.
339#[derive(Clone)]
340pub struct MaelstromClusterSpec;
341
342impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
343    fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
344        assert_eq!(
345            key,
346            LocationKey::FIRST,
347            "there should only be one location for a Maelstrom deployment"
348        );
349        MaelstromCluster {
350            next_port: Rc::new(RefCell::new(0)),
351            name_hint: Some(name_hint.to_owned()),
352        }
353    }
354}
355
/// The Maelstrom deployment environment.
///
/// Holds the configuration of a Maelstrom run (set through the builder-style methods)
/// and accumulates compilation artifacts while the cluster is instantiated.
pub struct MaelstromDeployment {
    /// Number of nodes in the cluster (passed as `--node-count`).
    pub node_count: usize,
    /// Path to the maelstrom binary.
    pub maelstrom_path: PathBuf,
    /// Workload to run (e.g., "echo", "broadcast", "g-counter"); passed as `-w`.
    pub workload: String,
    /// Time limit in seconds (passed as `--time-limit` when set).
    pub time_limit: Option<u64>,
    /// Rate of requests per second (passed as `--rate` when set).
    pub rate: Option<u64>,
    /// The availability of nodes (passed as `--availability` when set).
    pub availability: Option<String>,
    /// Nemesis to run during tests (passed as `--nemesis` when set).
    pub nemesis: Option<String>,
    /// Additional maelstrom arguments, appended verbatim after all other options.
    pub extra_args: Vec<String>,

    // Populated during deployment
    pub(crate) bin_name: Option<String>,
    pub(crate) project_dir: Option<PathBuf>,
    pub(crate) target_dir: Option<PathBuf>,
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Create a new Maelstrom deployment that runs the given workload.
    ///
    /// Defaults: one node, `maelstrom` as the binary path, no time limit, rate,
    /// availability or nemesis, and no extra arguments.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Set the node count.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Set the path to the maelstrom binary.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Set the time limit in seconds.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Set the request rate per second.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Set the availability for the test.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Set the nemesis for the test.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Add extra arguments to pass to maelstrom.
    ///
    /// Arguments accumulate across calls; earlier ones are kept.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Location of an example binary built in dev mode under `target_dir`.
    fn dev_example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Classifies one line of Maelstrom output: `Some(true)` for a passing verdict,
    /// `Some(false)` for a failing one, `None` for any other line.
    fn verdict_for_line(line: &str) -> Option<bool> {
        if line.starts_with("Analysis invalid!") {
            Some(false)
        } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
            || line.starts_with("Everything looks good!")
        {
            Some(true)
        } else {
            None
        }
    }

    /// Build the compiled binary in dev mode.
    /// Returns the path to the compiled binary.
    ///
    /// # Errors
    ///
    /// Returns an error if `cargo` cannot be started or exits unsuccessfully.
    ///
    /// # Panics
    ///
    /// Panics if the binary name, project directory or target directory have not
    /// been populated by a deployment yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_ref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_ref().expect("No project dir set");
        let target_dir = self.target_dir.as_ref().expect("No target dir set");

        // Always include maelstrom_runtime feature for runtime support; features
        // recorded during deployment are appended after it.
        let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
        all_features.extend(self.features.iter().flatten().cloned());

        let mut cmd = std::process::Command::new("cargo");
        cmd.arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(all_features.join(","))
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");

        let status = cmd.status()?;
        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::dev_example_path(target_dir, bin_name))
    }

    /// Run Maelstrom with the compiled binary, return Ok(()) if all checks pass.
    ///
    /// Maelstrom's stdout is echoed line by line to stderr. The first line that
    /// carries a verdict decides the result; the remaining output is still drained
    /// and the child process is waited on, so it is always reaped before returning.
    ///
    /// This will block until Maelstrom completes.
    ///
    /// # Errors
    ///
    /// Returns an error if the build fails, Maelstrom cannot be spawned, its output
    /// cannot be read, the analysis is reported invalid, or no verdict line is seen.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::build`].
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }

        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }

        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }

        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }

        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("stdout was configured as piped above");

        let mut verdict = None;
        let mut read_error = None;
        for line in BufReader::new(stdout).lines() {
            match line {
                Ok(line) => {
                    eprintln!("{line}");
                    // Only the first verdict counts; later lines are just echoed.
                    if verdict.is_none() {
                        verdict = Self::verdict_for_line(&line);
                    }
                }
                Err(err) => {
                    read_error = Some(err);
                    break;
                }
            }
        }

        if let Some(err) = read_error {
            // The pipe is unusable, so stop the child rather than risk blocking on
            // it; failures here are secondary to the read error being reported.
            let _ = child.kill();
            let _ = child.wait();
            return Err(err);
        }

        // `Child` is not reaped on drop, so wait explicitly before returning.
        child.wait()?;

        match verdict {
            Some(true) => Ok(()),
            Some(false) => Err(Error::other("Analysis was invalid")),
            None => Err(Error::other("Maelstrom produced an unexpected result")),
        }
    }

    /// Get the path to the compiled binary (after building).
    ///
    /// Returns `None` until both the binary name and the target directory have been
    /// populated by a deployment. The path is computed, not checked for existence.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_ref()?;
        let target_dir = self.target_dir.as_ref()?;
        Some(Self::dev_example_path(target_dir, bin_name))
    }
}