hydro_lang/deploy/maelstrom/
mod.rs1use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::live_collections::stream::TotalOrder;
9use crate::location::{Cluster, NoTick};
10use crate::nondet::nondet;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19#[expect(clippy::type_complexity, reason = "stream markers")]
37pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
38 cluster: &Cluster<'a, C>,
39) -> (
40 KeyedStream<String, In, Cluster<'a, C>>,
41 ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42)
43where
44 Cluster<'a, C>: NoTick,
45{
46 use stageleft::q;
47
48 use crate::location::Location;
49
50 let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
51 stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
52
53 let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
55 .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
56 .into_keyed()
57 .map(q!(|b| serde_json::from_value(b).unwrap()));
58
59 let (fwd_handle, output_stream) =
61 cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
62
63 output_stream
65 .entries()
66 .assume_ordering::<TotalOrder>(nondet!())
67 .for_each(q!(|(client_id, body)| {
68 deploy_runtime_maelstrom::maelstrom_send_response(
69 &meta.node_id,
70 &client_id,
71 serde_json::to_value(body).unwrap(),
72 );
73 }));
74
75 (input, fwd_handle)
76}