Question

I've been trying to come up with a way to create a queue in RAC, but so far I haven't found a viable construct. I need simple queue functionality: a producer provides input regardless of whether anyone is reading, and the queue stores all of that input as long as there are no consumers. When a subscription happens, the queue should send the stored messages out one by one, emptying itself in the process.

In theory I could solve this with intermediate storage: a mutable array that stores the input and from which the output is produced. But that is far from an idiomatic solution.

Maybe something could be set up using replay:. This would require activating the replay functionality only when there are no subscribers.

So the question is: is there a way to implement a queue with RAC idioms?


Solution

There are problems for which ReactiveCocoa is not directly well suited. I say "directly" because I've found that there is often plenty of opportunity to use RAC near the solution (often by wrapping it), even if it doesn't make sense to use RAC within the solution. Your situation is somewhat at odds with RAC best practices in that it involves delivering values that come from shared mutable state.

RAC works best when you can think of your problem in terms of "cold signals". A cold signal is one in which each subscription causes values to be generated, and those values are sent uniquely to that subscriber. This is in contrast to a "hot signal". A hot signal is one in which subscription does not actually cause the values to be generated; it is presumed that the values are generated regardless of whether or not there are subscribers, and the values are delivered to all subscribers. One way in which hot signals introduce problems is that they can expose race conditions and raise questions about when a single value is delivered to different subscribers. Even with only one subscriber, much of the flexibility of signals is lost when the code responsible for sending values lives outside the signal subscription.

For example, many signal operations will have some effect on the timing of value delivery. If the values sent to a subscriber are unique to that subscriber (as is the case with a cold signal), then a race condition is impossible, since there's only one contestant in the race (the subscriber). An example of how this is useful is in the use of RACSchedulers: RAC makes it easy to work with multiple threads without writing explicit synchronization, because the values are only ever visible to one subscriber.

On the other hand, if multiple subscribers receive shared "global" values (because those values are not unique to each subscriber, but rather come from some shared source), it can be hard to reason about or synchronize the delivery of those values to subscribers. The events happen at time "A", but one subscriber sees them at time "B" and another subscriber sees them at time "C". This can happen even if both subscribers are using the same signal operations. While this may be acceptable in some cases, you would be surprised at how easily this can cause problems in the correctness of (or in reasoning about) your code.
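The difference can be seen in a small sketch. It assumes ReactiveCocoa 2.x (the Objective-C API) is linked and that the function is called on the main thread, so subscription work runs synchronously; the function name `ColdVersusHotLog` and the log strings are made up for illustration.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns a log of what each subscriber observed.
static NSArray *ColdVersusHotLog(void) {
    NSMutableArray *log = [NSMutableArray array];

    // Cold: the creation block runs again for every subscription, so each
    // subscriber gets values generated just for it.
    __block NSUInteger runs = 0;
    RACSignal *cold = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        runs += 1;
        [subscriber sendNext:@(runs)];
        [subscriber sendCompleted];
        return nil;
    }];
    [cold subscribeNext:^(id x) {
        [log addObject:[NSString stringWithFormat:@"cold A: %@", x]];
    }];
    [cold subscribeNext:^(id x) {
        [log addObject:[NSString stringWithFormat:@"cold B: %@", x]];
    }];

    // Hot: a plain RACSubject sends values whether or not anyone listens.
    // Values sent before a subscription are not buffered.
    RACSubject *hot = [RACSubject subject];
    [hot sendNext:@"early"]; // no subscribers yet, so nobody sees this
    [hot subscribeNext:^(id x) {
        [log addObject:[NSString stringWithFormat:@"hot A: %@", x]];
    }];
    [hot subscribeNext:^(id x) {
        [log addObject:[NSString stringWithFormat:@"hot B: %@", x]];
    }];
    [hot sendNext:@"late"]; // the same shared value goes to both subscribers

    return log;
}
```

The two cold subscribers each trigger their own run of the creation block, while the two hot subscribers share a single externally produced value and miss anything sent before they subscribed. That missing "early" value is exactly what the queue in the question is meant to keep.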

The queue that you describe is an instance of the latter. If you actually built it entirely with RAC, you would end up with some form of hot signal, which you generally want to avoid. This doesn't mean you can't use ReactiveCocoa. It means if you want to use ReactiveCocoa, you should try to find the cold signal in this problem. There are many examples in the RAC framework itself that work with shared mutable state; an example of this is RACObserve(): Any code, anywhere, could call -setFoo: on the observed object, and the signal returned from RACObserve() will send that value. That's shared mutable state. But if you look carefully at how RACObserve() actually works, you'll see that it creates a cold signal, because each subscription has a unique effect: it adds a new observer to the observed object at that key path. It's true that the values delivered to one subscriber may be the same as the values delivered to another subscriber in this case, but it's understood that the subscriptions are separate, and have added unique key/value observers to the object being observed.

You can do the same thing to implement your queue. Consider designing your queue without any RAC concepts "built-in", and then use RAC as needed to make it easier to use the queue by adding and removing consumers when subscriptions are created and disposed. For example:

@interface MyQueue : NSObject
+ (instancetype)queueWithProducer:(id<MyProducer>)producer;
- (void)addConsumer:(id<MyQueueConsumer>)queueConsumer;
- (void)removeConsumer:(id<MyQueueConsumer>)queueConsumer;
@end

(Presumably, you will have some logic to distribute values in the queue to multiple consumers, but that's getting beyond the subject of this question.)
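As a rough idea of what such a RAC-free queue could look like, here is a minimal Foundation-only sketch (ARC assumed). It is not the interface above: the class name `MyBufferingQueue`, the `-enqueue:` method (standing in for a producer), and the `-consumeValue:` protocol method are assumptions made for illustration, and the policy of handing every value to the first registered consumer is just one possible choice. `MyBlockConsumerImpl` is one guess at the block-based consumer that the later snippet assumes already exists.

```objc
#import <Foundation/Foundation.h>

// Hypothetical consumer protocol; the answer names it but does not define it.
@protocol MyQueueConsumer <NSObject>
- (void)consumeValue:(id)value;
@end

// Block-backed consumer, a guess at the class the later snippet relies on.
@interface MyBlockConsumerImpl : NSObject <MyQueueConsumer>
- (instancetype)initWithConsumeBlock:(void (^)(id value))block;
@end

@implementation MyBlockConsumerImpl {
    void (^_block)(id);
}
- (instancetype)initWithConsumeBlock:(void (^)(id value))block {
    if ((self = [super init])) { _block = [block copy]; }
    return self;
}
- (void)consumeValue:(id)value { _block(value); }
@end

// Buffers values while there is no consumer and drains them in FIFO order
// as soon as one is available.
@interface MyBufferingQueue : NSObject
- (void)enqueue:(id)value;
- (void)addConsumer:(id<MyQueueConsumer>)consumer;
- (void)removeConsumer:(id<MyQueueConsumer>)consumer;
@end

@implementation MyBufferingQueue {
    NSMutableArray *_pending;   // values not yet delivered
    NSMutableArray *_consumers; // registered consumers
}

- (instancetype)init {
    if ((self = [super init])) {
        _pending = [NSMutableArray array];
        _consumers = [NSMutableArray array];
    }
    return self;
}

- (void)enqueue:(id)value {
    @synchronized (self) {
        [_pending addObject:value];
        [self drain];
    }
}

- (void)addConsumer:(id<MyQueueConsumer>)consumer {
    @synchronized (self) {
        [_consumers addObject:consumer];
        [self drain];
    }
}

- (void)removeConsumer:(id<MyQueueConsumer>)consumer {
    @synchronized (self) {
        [_consumers removeObjectIdenticalTo:consumer];
    }
}

// Must be called with the lock held. Re-checks the consumer list on every
// pass in case a consumer removes itself while handling a value.
- (void)drain {
    while (_consumers.count > 0 && _pending.count > 0) {
        id<MyQueueConsumer> consumer = _consumers.firstObject;
        id value = _pending.firstObject;
        [_pending removeObjectAtIndex:0];
        [consumer consumeValue:value];
    }
}
@end
```

The mutable array is the "intermediate storage" from the question; the point is that it stays inside an ordinary object, and nothing here depends on RAC.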

Once you have a working queue that behaves according to your specification, you can think of ways to use ReactiveCocoa to make working with it easier. Following the precedent of RACObserve(), you might add a category method that returns a signal, which adds a consumer for each subscriber to that signal (you could also just add it directly to the MyQueue interface instead of using categories):

@interface MyQueue (SignalSupport)
- (RACSignal *)consumerSignal;
+ (id<MyQueueConsumer>)consumerForSubscriber:(id<RACSubscriber>)subscriber;
@end

@implementation MyQueue (SignalSupport)
- (RACSignal *)consumerSignal
{
    @weakify(self);
    return [RACSignal createSignal:^RACDisposable * (id<RACSubscriber> subscriber) {
        @strongify(self);
        id<MyQueueConsumer> consumer = [MyQueue consumerForSubscriber:subscriber];
        [self addConsumer:consumer];

        return [RACDisposable disposableWithBlock:^() {
            @strongify(self);
            [self removeConsumer:consumer];
        }];
    }];
}

+ (id<MyQueueConsumer>)consumerForSubscriber:(id<RACSubscriber>)subscriber
{
    // Assuming you had already created a `MyBlockConsumerImpl` class:
    MyBlockConsumerImpl *impl = [[MyBlockConsumerImpl alloc] initWithConsumeBlock:^(id value) {
        [subscriber sendNext:value];
    }];
    return impl;
}
@end

You can see that ReactiveCocoa isn't actually used to implement the queue itself, but queue consumers (via signal subscriptions) still benefit from signal composition.

I've been down the path of trying to build a complex and stateful system entirely from signal operations. It can be done, but you can end up with code that is difficult to understand and maintain. You will likely end up someday replacing it with a straightforward implementation, and finding an appropriate way to complement it with ReactiveCocoa. It's always better to use ReactiveCocoa to find ways to eliminate state entirely, but there are times when you won't be able to avoid it, because this is Objective-C, and Cocoa is a stateful environment. In those cases, you can usually still find ways to benefit from ReactiveCocoa, but don't get carried away.

OTHER TIPS

RACCommand seems to match your requirements:

RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
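        NSString *value = [NSString stringWithFormat:@"value:%@", input];
        [subscriber sendNext:value];
        // Complete the signal so that this execution of the command can finish.
        [subscriber sendCompleted];
        return nil;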
    }];
}];

command.allowsConcurrentExecution = YES;

[[command.executionSignals switchToLatest] subscribeNext:^(id x) {
    NSLog(@"got %@", x);
}];

for (int i = 0; i < 10; i++) {
    [command execute:@(i)];
}

---- update ----

Use replayLazily:

__block id<RACSubscriber> producer;

RACSignal *signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    producer = subscriber;
    return nil;
}] replayLazily];

// Subscribe once so that the creation block runs and `producer` gets assigned
[signal subscribeNext:^(id x) {}];

[producer sendNext:@"foo"];
[producer sendNext:@"bar"];

[signal subscribeNext:^(id x) {
    NSLog(@"x:%@", x);
}];

@limboy pointed me toward serializing RACSignals with RACCommand.

However, RACCommand doesn't actually provide any serialization, so I wrote RACSerialCommand to serialize command execution.

Feel free to try it.

Licensed under: CC-BY-SA with attribution