Question

I want implement a Multiplexer/Demultiplexer in rust. It should send the data of several 'upstream' DuplexStreams via one single 'downstream' DuplexStream by simply prepending a port_num identifier of the upstream DuplexStream to the packet. Of course this should also work the other way round: reading the port_num from a packet received from downstream and sent it to the correct upstream Stream.

I started to implement such a MultiplexStream (below code will not compile). However, I'm facing a problem: The open_ports variable that maps port_num to the corresponding upstream DuplexStream must be accessible to several tasks which is not allowed in Rust.

What design pattern could be applied here to resolve my issue?

impl MultiplexStream<T,U> {
    fn new(downstream: DuplexStream<(u32,T), U>) -> MultiplexStream<T,U> {
        let mut open_ports = HashMap::<u32, DuplexStream<(u32,T), U>>new();

        spawn do {
            let res = try {
                loop {
                    let (port_num, data) = downstream.recv();

                    match open_ports.find(port_num) {
                        Some(intermediate) => {
                            let res = try {
                                intermediate.send(data)
                            }

                            if res.is_err() {
                                open_ports.remove(port_num);
                            }
                        }
                        None => {}
                    }
                }
            }

            // downstream was closed => cleanup
            for intermediate in open_ports.values() {
                intermediate.close();
            }
            open_ports.clear();
        }

    }

    fn open<V: Send>(port_num: u32) -> Result(DuplexStream<V,T>, ()) {
        if open_ports.contains_key(port_num) {
            return Err(());
        }

        let (upstream, intermediate) = DuplexStream<V,T>::new();
        open_ports.insert(port_num, intermediate);

        spawn do {
            let res = try {
                loop {
                    let data = intermediate.recv();
                    downstream.send(~(port_num, data));
                }
            }

            // upstream was closed => cleanup
            intermediate.close();
            open_ports.remove(port_num);
        }

        return Ok(upstream);
    }
}
Was it helpful?

Solution

In rust sharing data is done via Arc (from libsync). Basic Arc is for sharing immutable data, for mutable there are MutexArc and RWArc. Sharing with Arc is copy free.

I put together a small example:

extern mod sync;

use std::hashmap::HashMap;

fn main() {
    let arc = sync::RWArc::new(HashMap::<~str, int>::new());
    arc.write(|m| m.insert(~"a", 0));
    for num in range(1, 10) {
        let arc = arc.clone();
        spawn(proc() {
            println!("[task {}] Value before is: {}", num, arc.read(|m| m.get(&~"a").clone()));
            arc.write(|m| { m.insert_or_update_with(~"a", 0, |_, val| *val += 1); });
            println!("[task {}] Value after is: {}", num, arc.read(|m| m.get(&~"a").clone()));
        });
    }
}

For the latest version of rust (0.10pre) use

extern crate collections;
extern crate sync;
use collections::hashmap::HashMap;
use sync::RWArc;

fn main() {
    let arc = RWArc::new(HashMap::<~str, int>::new());
    arc.write(|m| m.insert(~"a", 0));
    for num in range(1, 10) {
        let arc = arc.clone();
        spawn(proc() {
            println!("[task {}] Value before is: {}", num, arc.read(|m| m.get(&~"a").clone()));
            arc.write(|m| { m.insert_or_update_with(~"a", 0, |_, val| *val += 1); });
            println!("[task {}] Value after is: {}", num, arc.read(|m| m.get(&~"a").clone()));
        });
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top