Clusters
A key restriction of processes in Hydroflow+ is that there can only be one instance of the computation assigned to each process across the entire distributed system. This is fine for simple applications with only pipelined computation, but for scaling out we need the ability to have multiple instances of the same computation running in parallel.
Clusters solve this by providing a nearly identical API to processes, but representing a set of instances running the same computation instead of a single one. The main difference when using a cluster is in sending data, since we need to specify which instance(s) of the computation the data should go to.
Computing on Clusters
Clusters are instantiated using the cluster method on FlowBuilder, which takes a ClusterSpec:
pub fn my_flow<'a, D: Deploy<'a>>(
    flow: &FlowBuilder<'a, D>,
    cluster_spec: &impl ClusterSpec<'a, D>
) {
    // `cluster` stands for every instance launched from `cluster_spec`
    let cluster = flow.cluster(cluster_spec);
}
This API follows the same pattern as processes, where a cluster spec represents a template for a cluster, which can be instantiated multiple times to create multiple clusters.
Streams on clusters are instantiated with the same APIs as on processes: source_iter and source_stream are both available. When these APIs are used with a cluster, however, the root streams are instantiated on every instance in the cluster.
let stream = flow.source_iter(&cluster, q!(vec![1, 2, 3]));
// prints 1, 2, 3 on each instance of the cluster
stream.for_each(q!(|x| println!("{}", x)));
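To make the per-instance behavior concrete, here is a plain-Rust sketch (std only, not the Hydroflow+ API) that models a cluster as the IDs 0..n and runs the same pipeline body once per instance. The function run_on_each_instance is hypothetical and exists only for illustration.

```rust
// Sketch only: models cluster semantics in std Rust; not a Hydroflow+ API.

/// Runs the same pipeline body on every instance of a simulated cluster.
/// `num_instances` stands in for the cluster size; `body` plays the role of
/// the per-instance computation and receives the instance's ID.
fn run_on_each_instance<T, F>(num_instances: u32, body: F) -> Vec<(u32, Vec<T>)>
where
    F: Fn(u32) -> Vec<T>,
{
    (0..num_instances).map(|id| (id, body(id))).collect()
}

fn main() {
    // Every instance materializes its own copy of the root stream [1, 2, 3].
    let outputs = run_on_each_instance(3, |_id| vec![1, 2, 3]);
    for (id, items) in &outputs {
        for x in items {
            println!("instance {}: {}", id, x);
        }
    }
}
```

Each simulated instance produces the full [1, 2, 3], which is why the print appears once per instance rather than once overall.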
Sending Data
Because clusters represent a set of instances, adding networking requires us to specify which instance(s) to send data to. The types of the sent and received streams depend on whether the sender, the receiver, or both are clusters rather than processes.
Instances in a cluster are identified by a cluster ID (a u32). To get the IDs of all instances in a cluster, use the cluster_members method on FlowBuilder, passing it the cluster. This returns a runtime expression of type &Vec<u32>, which can only be used inside q!() or as an argument to source_iter. IDs always range from 0 up to, but not including, the length of the IDs vector.
This expression can be passed into source_iter to load the IDs into the graph:
let stream = flow.source_iter(&process, flow.cluster_members(&cluster)).cloned();
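A minimal std-Rust sketch of the ID layout described above, assuming a cluster of n instances has IDs 0..n. The helper cluster_ids is hypothetical; it also shows why iterating a &Vec<u32> needs .cloned() to yield owned u32 values.

```rust
// Sketch only: a hypothetical stand-in for the cluster's ID list.

/// Returns the IDs of a simulated cluster with `num_instances` instances.
fn cluster_ids(num_instances: u32) -> Vec<u32> {
    (0..num_instances).collect()
}

fn main() {
    let ids = cluster_ids(4);
    // Iterating a &Vec<u32> yields &u32; .cloned() turns them into owned u32s.
    let owned: Vec<u32> = ids.iter().cloned().collect();
    // Every ID is strictly less than the number of instances.
    assert!(owned.iter().all(|&id| (id as usize) < ids.len()));
    println!("{:?}", owned); // [0, 1, 2, 3]
}
```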
One-to-Many
When sending data from a process to a cluster, the source must be a stream of tuples of the form (u32, T); each T element is sent to the instance whose ID matches the u32.
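The routing rule can be modeled in plain Rust as delivering each (id, value) pair to a per-instance inbox. This is a sketch under the assumption of a cluster with IDs 0..n; route_to_cluster is a hypothetical name, and the sketch simply panics on an out-of-range ID without claiming anything about how Hydroflow+ treats one.

```rust
// Sketch only: std Rust model of one-to-many routing, not the Hydroflow+ API.

/// Delivers each `(id, value)` pair to the inbox of instance `id`.
/// Indexing panics if `id >= num_instances`.
fn route_to_cluster<T>(messages: Vec<(u32, T)>, num_instances: u32) -> Vec<Vec<T>> {
    // One empty inbox per instance (built with map to avoid a Clone bound on T).
    let mut inboxes: Vec<Vec<T>> = (0..num_instances).map(|_| Vec::new()).collect();
    for (id, value) in messages {
        inboxes[id as usize].push(value);
    }
    inboxes
}

fn main() {
    let inboxes = route_to_cluster(vec![(0, "a"), (1, "b"), (0, "c")], 2);
    for (id, inbox) in inboxes.iter().enumerate() {
        println!("instance {}: {:?}", id, inbox);
    }
}
```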
This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion: use enumerate to add a sequence number to each element, reduce that number modulo the number of instances, and then use send_bincode to send each element to the instance with the resulting ID:
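let cluster_ids = flow.cluster_members(&cluster);
let stream = flow.source_iter(&process, q!(vec![123, 456, 789]))
    .enumerate()
    // enumerate yields a usize index: take it modulo the cluster size,
    // then convert the result to a u32 cluster ID
    .map(q!(|(i, x)| (
        (i % cluster_ids.len()) as u32,
        x
    )))
    .send_bincode(cluster);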
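The partitioning step itself can be checked outside of Hydroflow+. This std-Rust sketch (the function round_robin is hypothetical) produces the (u32, T) pairs a one-to-many send expects, doing the modulo in usize before casting, as in the corrected example.

```rust
// Sketch only: plain Rust version of the round-robin tagging step.

/// Assigns the i-th element to instance `i % num_instances`.
/// `num_instances` must be nonzero, otherwise the modulo panics.
fn round_robin<T>(items: Vec<T>, num_instances: usize) -> Vec<(u32, T)> {
    items
        .into_iter()
        .enumerate()
        // enumerate yields a usize index, so take the modulo in usize
        // and convert the resulting ID to u32 afterwards.
        .map(|(i, x)| ((i % num_instances) as u32, x))
        .collect()
}

fn main() {
    let tagged = round_robin(vec![123, 456, 789], 2);
    println!("{:?}", tagged); // [(0, 123), (1, 456), (0, 789)]
}
```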
To broadcast data to all instances in a cluster, use broadcast_{bincode,bytes}, which acts as a shortcut for taking the cross product of the stream with the cluster's IDs, so that every element is sent to every instance.
let stream = flow.source_iter(&process, q!(vec![123, 456, 789]))
.broadcast_bincode(cluster);
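In terms of (u32, T) messages, a broadcast is the cross product of instance IDs and elements. The following std-Rust sketch (broadcast is a hypothetical helper, not the Hydroflow+ method) builds that product explicitly for a cluster assumed to have IDs 0..n.

```rust
// Sketch only: models broadcast as a cross product in std Rust.

/// Pairs every element with every instance ID.
fn broadcast<T: Clone>(items: &[T], num_instances: u32) -> Vec<(u32, T)> {
    let mut out = Vec::with_capacity(items.len() * num_instances as usize);
    for id in 0..num_instances {
        for x in items {
            // Each instance gets its own copy of every element.
            out.push((id, x.clone()));
        }
    }
    out
}

fn main() {
    let msgs = broadcast(&[123, 456, 789], 2);
    println!("{:?}", msgs);
}
```

Feeding these pairs to a one-to-many send delivers all three elements to both instances.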
Many-to-One
In the other direction, when sending data from a cluster to a process, the sender has a stream of elements of type T, but the recipient gets a stream of tuples of the form (u32, T), where the u32 is the ID of the instance that sent the element. Elements received from different instances will be interleaved.
This is useful for aggregating data from multiple instances into a single stream. For example, we can use send_bincode to send data from all instances to a single process, and then print it all out:
flow.source_iter(&cluster, q!(vec![123, 456, 789]))
    .send_bincode(process)
    // each received element is tagged with the sender's cluster ID
    .for_each(q!(|(id, x)| println!("{}: {}", id, x)));
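A std-Rust sketch of the receiving side: each instance's output is tagged with its sender ID and the per-sender streams are merged. The round-robin merge is an assumption chosen only to keep the sketch deterministic; actual arrival order depends on the network. The function gather_tagged is hypothetical.

```rust
// Sketch only: simulates a cluster-to-process send in std Rust.

/// Tags each instance's outputs with the sender's ID and interleaves the
/// per-sender streams by taking one element from each sender in turn.
fn gather_tagged<T>(per_instance: Vec<Vec<T>>) -> Vec<(u32, T)> {
    let mut queues: Vec<std::vec::IntoIter<T>> =
        per_instance.into_iter().map(|v| v.into_iter()).collect();
    let mut received = Vec::new();
    loop {
        let mut progressed = false;
        for (id, queue) in queues.iter_mut().enumerate() {
            if let Some(x) = queue.next() {
                received.push((id as u32, x));
                progressed = true;
            }
        }
        // Stop once every sender's stream is exhausted.
        if !progressed {
            break;
        }
    }
    received
}

fn main() {
    // Three instances, each emitting [123, 456, 789].
    let outputs = vec![vec![123, 456, 789]; 3];
    for (id, x) in gather_tagged(outputs) {
        println!("{}: {}", id, x);
    }
}
```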
If you don't care which instance sent the data, you can use send_{bincode,bytes}_interleaved. The recipient then receives a stream of plain T elements, and elements from different instances are still interleaved.
flow.source_iter(&cluster, q!(vec![123, 456, 789]))
    .send_bincode_interleaved(process)
    // sender IDs are dropped, so each element arrives as a plain T
    .for_each(q!(|x| println!("{}", x)));
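Conceptually, the interleaved variants hand the recipient the same elements with the sender ID projected away. A minimal std-Rust sketch of that projection (strip_sender is a hypothetical name, not a Hydroflow+ function):

```rust
// Sketch only: models dropping sender IDs from received (u32, T) pairs.

/// Keeps only the payload of each received `(sender_id, value)` pair.
fn strip_sender<T>(received: Vec<(u32, T)>) -> Vec<T> {
    received.into_iter().map(|(_, x)| x).collect()
}

fn main() {
    let received = vec![(0, 123), (1, 123), (0, 456), (1, 456)];
    println!("{:?}", strip_sender(received)); // [123, 123, 456, 456]
}
```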
Many-to-Many
Finally, when sending data from one cluster to another (or from a cluster to itself, as in distributed protocols), the sender emits tuples of the form (u32, T) and each T element is sent to the instance with the matching u32 ID, as in the one-to-many case. The recipient, as in the many-to-one case, receives (u32, T) tuples carrying the ID of the sender.
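Both rules at once can be sketched in std Rust: the destination ID in each outgoing pair is consumed for routing and replaced by the sender's ID on arrival. This assumes both clusters use IDs 0..n; many_to_many is a hypothetical helper, not part of Hydroflow+.

```rust
// Sketch only: std Rust model of cluster-to-cluster routing.

/// `outgoing[s]` holds the `(dest, value)` pairs emitted by sender `s`.
/// Entry `d` of the result holds the `(sender, value)` pairs delivered
/// to receiver `d`. Panics if a destination is out of range.
fn many_to_many<T>(outgoing: Vec<Vec<(u32, T)>>, num_receivers: u32) -> Vec<Vec<(u32, T)>> {
    let mut inboxes: Vec<Vec<(u32, T)>> = (0..num_receivers).map(|_| Vec::new()).collect();
    for (sender, messages) in outgoing.into_iter().enumerate() {
        for (dest, value) in messages {
            // Route by destination, then tag with the sender's ID.
            inboxes[dest as usize].push((sender as u32, value));
        }
    }
    inboxes
}

fn main() {
    // Sender 0 sends "x" to receiver 1; sender 1 sends "y" to 0 and "z" to 1.
    let outgoing = vec![vec![(1, "x")], vec![(0, "y"), (1, "z")]];
    for (receiver, inbox) in many_to_many(outgoing, 2).iter().enumerate() {
        println!("receiver {}: {:?}", receiver, inbox);
    }
}
```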
The same shortcuts are available here. For example, we can use broadcast_bincode_interleaved to send data from every instance in one cluster to every instance in another, and then print it all out:
flow.source_iter(&cluster1, q!(vec![123, 456, 789]))
    .broadcast_bincode_interleaved(cluster2)
    // every instance of cluster2 prints every element from every instance of cluster1
    .for_each(q!(|x| println!("{}", x)));
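To see how many elements each receiver ends up with, here is a std-Rust sketch of an interleaved broadcast between two simulated clusters: every receiver gets every element from every sender, with sender IDs dropped. The name broadcast_interleaved is hypothetical, and senders' outputs are simply concatenated rather than interleaved in any particular network order.

```rust
// Sketch only: std Rust model of an interleaved cluster-to-cluster broadcast.

/// `per_sender[s]` holds the elements emitted by sender `s`.
/// Returns one Vec per receiver, each containing all senders' elements.
fn broadcast_interleaved<T: Clone>(per_sender: &[Vec<T>], num_receivers: u32) -> Vec<Vec<T>> {
    (0..num_receivers)
        .map(|_| per_sender.iter().flat_map(|items| items.iter().cloned()).collect())
        .collect()
}

fn main() {
    // Two senders each emit [123, 456, 789]; three receivers.
    let senders = vec![vec![123, 456, 789]; 2];
    for (id, items) in broadcast_interleaved(&senders, 3).iter().enumerate() {
        println!("receiver {} got {} elements", id, items.len());
    }
}
```

With two senders emitting three elements each, every receiver gets six elements.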