RACStream.m 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. //
  2. // RACStream.m
  3. // ReactiveObjC
  4. //
  5. // Created by Justin Spahr-Summers on 2012-10-31.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACStream.h"
  9. #import "NSObject+RACDescription.h"
  10. #import "RACBlockTrampoline.h"
  11. #import "RACTuple.h"
  12. @implementation RACStream
  13. #pragma mark Lifecycle
  14. - (instancetype)init {
  15. self = [super init];
  16. self.name = @"";
  17. return self;
  18. }
  19. #pragma mark Abstract methods
  20. + (__kindof RACStream *)empty {
  21. NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
  22. @throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
  23. }
  24. - (__kindof RACStream *)bind:(RACStreamBindBlock (^)(void))block {
  25. NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
  26. @throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
  27. }
  28. + (__kindof RACStream *)return:(id)value {
  29. NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
  30. @throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
  31. }
  32. - (__kindof RACStream *)concat:(RACStream *)stream {
  33. NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
  34. @throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
  35. }
  36. - (__kindof RACStream *)zipWith:(RACStream *)stream {
  37. NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
  38. @throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
  39. }
  40. #pragma mark Naming
  41. - (instancetype)setNameWithFormat:(NSString *)format, ... {
  42. if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;
  43. NSCParameterAssert(format != nil);
  44. va_list args;
  45. va_start(args, format);
  46. NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
  47. va_end(args);
  48. self.name = str;
  49. return self;
  50. }
  51. @end
  52. @implementation RACStream (Operations)
  53. - (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
  54. Class class = self.class;
  55. return [[self bind:^{
  56. return ^(id value, BOOL *stop) {
  57. id stream = block(value) ?: [class empty];
  58. NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
  59. return stream;
  60. };
  61. }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
  62. }
  63. - (__kindof RACStream *)flatten {
  64. return [[self flattenMap:^(id value) {
  65. return value;
  66. }] setNameWithFormat:@"[%@] -flatten", self.name];
  67. }
  68. - (__kindof RACStream *)map:(id (^)(id value))block {
  69. NSCParameterAssert(block != nil);
  70. Class class = self.class;
  71. return [[self flattenMap:^(id value) {
  72. return [class return:block(value)];
  73. }] setNameWithFormat:@"[%@] -map:", self.name];
  74. }
  75. - (__kindof RACStream *)mapReplace:(id)object {
  76. return [[self map:^(id _) {
  77. return object;
  78. }] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
  79. }
  80. - (__kindof RACStream *)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
  81. NSCParameterAssert(reduceBlock != NULL);
  82. return [[[self
  83. scanWithStart:RACTuplePack(start)
  84. reduce:^(RACTuple *previousTuple, id next) {
  85. id value = reduceBlock(previousTuple[0], next);
  86. return RACTuplePack(next, value);
  87. }]
  88. map:^(RACTuple *tuple) {
  89. return tuple[1];
  90. }]
  91. setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, RACDescription(start)];
  92. }
  93. - (__kindof RACStream *)filter:(BOOL (^)(id value))block {
  94. NSCParameterAssert(block != nil);
  95. Class class = self.class;
  96. return [[self flattenMap:^ id (id value) {
  97. if (block(value)) {
  98. return [class return:value];
  99. } else {
  100. return class.empty;
  101. }
  102. }] setNameWithFormat:@"[%@] -filter:", self.name];
  103. }
  104. - (__kindof RACStream *)ignore:(id)value {
  105. return [[self filter:^ BOOL (id innerValue) {
  106. return innerValue != value && ![innerValue isEqual:value];
  107. }] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)];
  108. }
  109. - (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock {
  110. NSCParameterAssert(reduceBlock != nil);
  111. __weak RACStream *stream __attribute__((unused)) = self;
  112. return [[self map:^(RACTuple *t) {
  113. NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
  114. return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
  115. }] setNameWithFormat:@"[%@] -reduceEach:", self.name];
  116. }
  117. - (__kindof RACStream *)startWith:(id)value {
  118. return [[[self.class return:value]
  119. concat:self]
  120. setNameWithFormat:@"[%@] -startWith: %@", self.name, RACDescription(value)];
  121. }
  122. - (__kindof RACStream *)skip:(NSUInteger)skipCount {
  123. Class class = self.class;
  124. return [[self bind:^{
  125. __block NSUInteger skipped = 0;
  126. return ^(id value, BOOL *stop) {
  127. if (skipped >= skipCount) return [class return:value];
  128. skipped++;
  129. return class.empty;
  130. };
  131. }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
  132. }
  133. - (__kindof RACStream *)take:(NSUInteger)count {
  134. Class class = self.class;
  135. if (count == 0) return class.empty;
  136. return [[self bind:^{
  137. __block NSUInteger taken = 0;
  138. return ^ id (id value, BOOL *stop) {
  139. if (taken < count) {
  140. ++taken;
  141. if (taken == count) *stop = YES;
  142. return [class return:value];
  143. } else {
  144. return nil;
  145. }
  146. };
  147. }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
  148. }
  149. + (__kindof RACStream *)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
  150. RACStream *current = nil;
  151. // Creates streams of successively larger tuples by combining the input
  152. // streams one-by-one.
  153. for (RACStream *stream in streams) {
  154. // For the first stream, just wrap its values in a RACTuple. That way,
  155. // if only one stream is given, the result is still a stream of tuples.
  156. if (current == nil) {
  157. current = [stream map:^(id x) {
  158. return RACTuplePack(x);
  159. }];
  160. continue;
  161. }
  162. current = block(current, stream);
  163. }
  164. if (current == nil) return [self empty];
  165. return [current map:^(RACTuple *xs) {
  166. // Right now, each value is contained in its own tuple, sorta like:
  167. //
  168. // (((1), 2), 3)
  169. //
  170. // We need to unwrap all the layers and create a tuple out of the result.
  171. NSMutableArray *values = [[NSMutableArray alloc] init];
  172. while (xs != nil) {
  173. [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
  174. xs = (xs.count > 1 ? xs.first : nil);
  175. }
  176. return [RACTuple tupleWithObjectsFromArray:values];
  177. }];
  178. }
  179. + (__kindof RACStream *)zip:(id<NSFastEnumeration>)streams {
  180. return [[self join:streams block:^(RACStream *left, RACStream *right) {
  181. return [left zipWith:right];
  182. }] setNameWithFormat:@"+zip: %@", streams];
  183. }
  184. + (__kindof RACStream *)zip:(id<NSFastEnumeration>)streams reduce:(RACGenericReduceBlock)reduceBlock {
  185. NSCParameterAssert(reduceBlock != nil);
  186. RACStream *result = [self zip:streams];
  187. // Although we assert this condition above, older versions of this method
  188. // supported this argument being nil. Avoid crashing Release builds of
  189. // apps that depended on that.
  190. if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
  191. return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
  192. }
  193. + (__kindof RACStream *)concat:(id<NSFastEnumeration>)streams {
  194. RACStream *result = self.empty;
  195. for (RACStream *stream in streams) {
  196. result = [result concat:stream];
  197. }
  198. return [result setNameWithFormat:@"+concat: %@", streams];
  199. }
  200. - (__kindof RACStream *)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {
  201. NSCParameterAssert(reduceBlock != nil);
  202. return [[self
  203. scanWithStart:startingValue
  204. reduceWithIndex:^(id running, id next, NSUInteger index) {
  205. return reduceBlock(running, next);
  206. }]
  207. setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, RACDescription(startingValue)];
  208. }
  209. - (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
  210. NSCParameterAssert(reduceBlock != nil);
  211. Class class = self.class;
  212. return [[self bind:^{
  213. __block id running = startingValue;
  214. __block NSUInteger index = 0;
  215. return ^(id value, BOOL *stop) {
  216. running = reduceBlock(running, value, index++);
  217. return [class return:running];
  218. };
  219. }] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, RACDescription(startingValue)];
  220. }
  221. - (__kindof RACStream *)takeUntilBlock:(BOOL (^)(id x))predicate {
  222. NSCParameterAssert(predicate != nil);
  223. Class class = self.class;
  224. return [[self bind:^{
  225. return ^ id (id value, BOOL *stop) {
  226. if (predicate(value)) return nil;
  227. return [class return:value];
  228. };
  229. }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
  230. }
  231. - (__kindof RACStream *)takeWhileBlock:(BOOL (^)(id x))predicate {
  232. NSCParameterAssert(predicate != nil);
  233. return [[self takeUntilBlock:^ BOOL (id x) {
  234. return !predicate(x);
  235. }] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
  236. }
  237. - (__kindof RACStream *)skipUntilBlock:(BOOL (^)(id x))predicate {
  238. NSCParameterAssert(predicate != nil);
  239. Class class = self.class;
  240. return [[self bind:^{
  241. __block BOOL skipping = YES;
  242. return ^ id (id value, BOOL *stop) {
  243. if (skipping) {
  244. if (predicate(value)) {
  245. skipping = NO;
  246. } else {
  247. return class.empty;
  248. }
  249. }
  250. return [class return:value];
  251. };
  252. }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
  253. }
  254. - (__kindof RACStream *)skipWhileBlock:(BOOL (^)(id x))predicate {
  255. NSCParameterAssert(predicate != nil);
  256. return [[self skipUntilBlock:^ BOOL (id x) {
  257. return !predicate(x);
  258. }] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
  259. }
  260. - (__kindof RACStream *)distinctUntilChanged {
  261. Class class = self.class;
  262. return [[self bind:^{
  263. __block id lastValue = nil;
  264. __block BOOL initial = YES;
  265. return ^(id x, BOOL *stop) {
  266. if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
  267. initial = NO;
  268. lastValue = x;
  269. return [class return:x];
  270. };
  271. }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
  272. }
  273. @end