Question

I'm trying to understand how the following zip function (esp. the invoke function) can be made more functional. The issue I've got is that the invoke method has to wait for both the left and right side to be filled before it can dispatch the values. The values have to be called in order so that the correct values are zipped, otherwise I would consider a curry/partial function to fulfill this.

Is there anything I could use to remove this hindrance?

function zip(state, a, b) {
    var left = [];
    var right = [];

    function invoke() {
        if (left.length > 0 && right.length > 0) {
            state([left.shift(), right.shift()]);
        }
    }

    a.foreach(function(v) {
        left.push(v);
        invoke();
    });

    b.foreach(function(v) {
        right.push(v);
        invoke();
    });
}

Below is a simple example of a stream that satisfies the zip function.

function Stream() {
    var env = this;
    env.subs = [];
    env.id = setInterval(function() {
        env.subs.forEach(function(f) {
            f(Math.random()); 
        });
    }, ((Math.random() * 100) + 500) | 0);
}
Stream.prototype.foreach = function(f) {
    this.subs.push(f);
}
zip(function(v) {
    console.log(v);
}, new Stream(), new Stream());

Bonus: Removing mutable array.


Solution

A more functional approach would be possible if the Stream had some kind of iterator interface that splits the list into its first element and its successors (the way Haskell lists are built, which you seem to know).

I know this code is more complex (and at least longer) at first, but using the structures gets more convenient:

function Promise(resolver) {
    // you know better promise libs of course
    // this one is not even monadic
    var subs = [],
        res = null;
    resolver(function resolve() {
        res = arguments;
        while (subs.length) subs.shift().apply(null, res);
    });
    this.onData = function(f) {
        if (res)
            f.apply(null, res);
        else
            subs.push(f);
        return this;
    };
}
Promise.all = function() {
    var ps = Array.prototype.concat.apply([], arguments);
    return new Promise(function(resolve) {
        var res = [],
            l = ps.length;
        ps.forEach(function(p, i) {
            p.onData(function() {
                while(res.length < arguments.length) res.push([]);
                for (var j=0; j<arguments.length; j++)
                    res[j][i] = arguments[j];
                if (--l == 0)
                    resolve.apply(null, res);
            });
        });
    });
};
function Stream() {
    // an asynchronous (random) list
    var that = this,
        interval = (Math.random() * 100 + 500) | 0;
    this.first = new Promise(function create(resolve) {
        that.id = setTimeout(function() {
            resolve(Math.random(), new Promise(create));
        }, interval);
    });
}
// this is how to consume a stream:
Stream.prototype.forEach = function(f) {
    this.first.onData(function fire(res, next) {
        f(res);
        next.onData(fire);
    });
    return this;
};
Stream.prototype.end = function() { clearTimeout(this.id); return this; };

But zipping them is easy now:

function zip() {
    var res = Object.create(Stream.prototype); // inherit the Stream interface
    res.first = (function create(firsts) {
        return new Promise(function(resolve) {
            Promise.all(firsts).onData(function(results, nexts) {
                resolve(results, create(nexts));
            });
        });
    })(Array.prototype.map.call(arguments, function(stream) {
        return stream.first;
    }));
    return res;
}
zip(new Stream, new Stream).forEach(console.log.bind(console));

Basically I've generalized your waiting for the first items into the Promise pattern, where Promise.all features parallel waiting, and your mutable arrays of results into nested lists of promises. And I've avoided code duplication (for left and right) by making all functions work with any number of arguments.
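To see the head-and-successors idea without any asynchrony, here is a minimal synchronous sketch. It is not the code above: the names `cons`, `fromArray`, `zipLists` and `toArray` are hypothetical, and a plain thunk stands in for the promise of the next cell.

```javascript
// A list is either null (empty) or a { head, tail } cell, where tail is a
// thunk returning the rest of the list. All names here are hypothetical.
function cons(head, tail) {
    return { head: head, tail: tail };
}

function fromArray(xs, i) {
    i = i || 0;
    return i < xs.length
        ? cons(xs[i], function () { return fromArray(xs, i + 1); })
        : null;
}

// Pair the two heads, then recurse on the two tails. No mutable buffers.
function zipLists(a, b) {
    if (a === null || b === null) return null;
    return cons([a.head, b.head], function () {
        return zipLists(a.tail(), b.tail());
    });
}

// Walk the cells and collect the heads, only for inspecting the result.
function toArray(list) {
    var out = [];
    for (var node = list; node !== null; node = node.tail()) out.push(node.head);
    return out;
}

var zipped = toArray(zipLists(fromArray([1, 2, 3]), fromArray(["a", "b"])));
console.log(zipped); // [[1, "a"], [2, "b"]]
```

The promise-based version has the same shape: each resolved value carries the current item plus a handle to the rest of the stream, and zipping waits on both heads before recursing.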

OTHER TIPS

I don't see why you want to make your code more functional. Nevertheless, I improved your zip function by removing the invoke function entirely. You really don't need it:

function zip(a, b, callback) {
    var left = [], right = [];

    a.forEach(function (value) {
        if (right.length)
            callback([value, right.shift()]);
        else left.push(value);
    });

    b.forEach(function (value) {
        if (left.length)
            callback([left.shift(), value]);
        else right.push(value);
    });
}

See the output for yourself: http://jsfiddle.net/Tw6K2/

The code is more imperative than functional. However I doubt it gets any better than this.
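If you want to check the pairing order without timers, a small deterministic harness helps. The sketch below assumes a hypothetical `ManualStream` (not part of the question) whose `forEach` registers a listener and whose `emit` pushes a value to it. The zip function is repeated so the snippet runs on its own.

```javascript
// Hypothetical push-based stream: forEach registers a listener, emit feeds it.
function ManualStream() {
    this.subs = [];
}
ManualStream.prototype.forEach = function (f) {
    this.subs.push(f);
};
ManualStream.prototype.emit = function (v) {
    this.subs.forEach(function (f) { f(v); });
};

// Same zip as above.
function zip(a, b, callback) {
    var left = [], right = [];

    a.forEach(function (value) {
        if (right.length) callback([value, right.shift()]);
        else left.push(value);
    });

    b.forEach(function (value) {
        if (left.length) callback([left.shift(), value]);
        else right.push(value);
    });
}

var a = new ManualStream(), b = new ManualStream(), pairs = [];
zip(a, b, function (pair) { pairs.push(pair); });

a.emit(1);   // buffered on the left
a.emit(2);   // buffered on the left
b.emit("x"); // pairs with 1
b.emit("y"); // pairs with 2
b.emit("z"); // buffered on the right
a.emit(3);   // pairs with "z"

console.log(pairs); // [[1, "x"], [2, "y"], [3, "z"]]
```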

After cogitating on your problem a little more I believe I found a more general solution. Let's start with the EventStream constructor (which is more general than your Stream constructor):

function EventStream() {
    this.listeners = [];
}

Then we create a dispatch method to add events to the stream:

EventStream.prototype.dispatch = function (event) {
    return this.listeners.map(function (listener) {
        return listener(event);
    });
};

Next we'll create a map method which is again more general than your foreach method:

EventStream.prototype.map = function (f) {
    var stream = new EventStream;

    this.listeners.push(function (x) {
        return stream.dispatch(f(x));
    });

    return stream;
};

Now when you map a function over an event stream you get an entirely new event stream. For example if your stream is [0,1,3,5..] and you map (+2) over it then the new stream would be [2,3,5,7..].

We'll also create a few more beneficial utility methods like filter, scan and merge as follows:

EventStream.prototype.filter = function (f) {
    var stream = new EventStream;

    this.listeners.push(function (x) {
        if (f(x)) return stream.dispatch(x);
    });

    return stream;
};

The filter method filters out certain events in an event stream to create an entirely new event stream. For example, given [2,3,5,7..] and the function odd, the filtered event stream would be [3,5,7..].
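The two examples so far can be reproduced by dispatching values by hand. This is only a sketch: the definitions are repeated so it runs on its own, and the variables `source` and `seen` are illustrative names. A final map is used as the subscriber, the same way the usage example further down does.

```javascript
function EventStream() {
    this.listeners = [];
}
EventStream.prototype.dispatch = function (event) {
    return this.listeners.map(function (listener) { return listener(event); });
};
EventStream.prototype.map = function (f) {
    var stream = new EventStream;
    this.listeners.push(function (x) { return stream.dispatch(f(x)); });
    return stream;
};
EventStream.prototype.filter = function (f) {
    var stream = new EventStream;
    this.listeners.push(function (x) {
        if (f(x)) return stream.dispatch(x);
    });
    return stream;
};

var source = new EventStream, seen = [];

source
    .map(function (x) { return x + 2; })          // [0,1,3,5] becomes [2,3,5,7]
    .filter(function (x) { return x % 2 === 1; }) // keep the odd values
    .map(function (x) { seen.push(x); });         // record what arrives

[0, 1, 3, 5].forEach(function (x) { source.dispatch(x); });
console.log(seen); // [3, 5, 7]
```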

EventStream.prototype.scan = function (a, f) {
    var stream = new EventStream;

    setTimeout(function () {
        stream.dispatch(a);
    });

    this.listeners.push(function (x) {
        return stream.dispatch(a = f(a, x));
    });

    return stream;
};

The scan method is used to cumulatively create a new event stream. For example given the stream [3,5,7..], the initial value 0 and the scanning function (+) the new event stream would be [0,3,8,15..].
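The same accumulation is easier to see on a plain array. The helper `scanArray` below is a hypothetical array analogue, not part of EventStream. In the stream version the initial value is dispatched through setTimeout, so it arrives on a later tick rather than immediately.

```javascript
// Array analogue of scan: keep every intermediate accumulator, seed first.
function scanArray(xs, seed, f) {
    return xs.reduce(function (out, x) {
        // Combine the latest accumulator with x and append the result.
        return out.concat([f(out[out.length - 1], x)]);
    }, [seed]);
}

var sums = scanArray([3, 5, 7], 0, function (a, b) { return a + b; });
console.log(sums); // [0, 3, 8, 15]
```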

EventStream.prototype.merge = function (that) {
    var stream = new EventStream;

    this.listeners.push(function (x) {
        return stream.dispatch(new Left(x));
    });

    that.listeners.push(function (y) {
        return stream.dispatch(new Right(y));
    });

    return stream;
};

function Left(x) {
    this.left = x;
}

function Right(x) {
    this.right = x;
}

The merge method combines two separate event streams into one. To differentiate which stream generated each event we tag all the events as either left or right.
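A quick way to see the tagging is to dispatch by hand on both sides. In this sketch the second listener is registered on `that`, which is what the description implies, and the observer is pushed directly onto `listeners` to keep the snippet short. The variable names are illustrative.

```javascript
function EventStream() {
    this.listeners = [];
}
EventStream.prototype.dispatch = function (event) {
    return this.listeners.map(function (listener) { return listener(event); });
};

function Left(x) { this.left = x; }
function Right(x) { this.right = x; }

EventStream.prototype.merge = function (that) {
    var stream = new EventStream;
    // Events from this stream are tagged Left, events from that stream Right.
    this.listeners.push(function (x) { return stream.dispatch(new Left(x)); });
    that.listeners.push(function (y) { return stream.dispatch(new Right(y)); });
    return stream;
};

var a = new EventStream, b = new EventStream, tags = [];

a.merge(b).listeners.push(function (event) {
    tags.push(event instanceof Left ? "L" + event.left : "R" + event.right);
});

a.dispatch(1);
b.dispatch(2);
a.dispatch(3);
console.log(tags); // ["L1", "R2", "L3"]
```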


Alright, now onto bigger problems. Let's create a zip method. The really cool thing is that we can create zip using the map, filter, scan and merge methods as follows:

EventStream.prototype.zip = function (that) {
    return this.merge(that).scan([[], [], null], function (acc, event) {
        var left = acc[0], right = acc[1];

        if (event instanceof Left) {
            var value = event.left;

            return right.length ?
                [left, right.slice(1), new Just([value, right[0]])] :
                [left.concat(value), right, null];
        } else {
            var value = event.right;

            return left.length ?
                [left.slice(1), right, new Just([left[0], value])] :
                [left, right.concat(value), null];
        }
    })

    .filter(function (a) {
        return a[2] instanceof Just;
    })

    .map(function (a) {
        return a[2].just;
    });
};

function Just(x) {
    this.just = x;
}
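The scanning function is a pure state transition, so it can be exercised without any stream. The sketch below pulls it out as a hypothetical standalone function `step` and folds it over a hand-written list of tagged events. It uses `concat([value])` rather than `concat(value)` so that array-valued events are not flattened, which is an assumption about the intended behavior.

```javascript
function Left(x) { this.left = x; }
function Right(x) { this.right = x; }
function Just(x) { this.just = x; }

// State is [pending lefts, pending rights, pair emitted by this step or null].
function step(acc, event) {
    var left = acc[0], right = acc[1], value;

    if (event instanceof Left) {
        value = event.left;
        return right.length ?
            [left, right.slice(1), new Just([value, right[0]])] :
            [left.concat([value]), right, null];
    } else {
        value = event.right;
        return left.length ?
            [left.slice(1), right, new Just([left[0], value])] :
            [left, right.concat([value]), null];
    }
}

var events = [
    new Left(1), new Left(2),
    new Right("x"), new Right("y"), new Right("z"),
    new Left(3)
];
var pairs = [];

events.reduce(function (acc, event) {
    var next = step(acc, event);
    if (next[2] instanceof Just) pairs.push(next[2].just);
    return next;
}, [[], [], null]);

console.log(pairs); // [[1, "x"], [2, "y"], [3, "z"]]
```

Each step returns fresh arrays instead of mutating the old ones, which is how this version gets rid of the shared mutable buffers.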

Now you can use it as follows:

stream1.zip(stream2).map(function (v) {
    console.log(v);
});

You can define stream1 and stream2 as follows:

var stream1 = getRandomStream();
var stream2 = getRandomStream();

function getRandomStream() {
    var stream = new EventStream;

    setInterval(function () {
        stream.dispatch(Math.random());
    }, ((Math.random() * 100) + 500) | 0);

    return stream;
}

That's all there is to it. No need for promises.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow