// es7.observable.js (5.3 KB)
  'use strict';
  // https://github.com/zenparsing/es-observable
  // Module-local helpers. Each `./_name` module is an internal helper of this
  // package; only the way they are used below is visible from this file.
  var $export     = require('./_export')
    , global      = require('./_global')
    , core        = require('./_core')
    // `_microtask` exports a factory; calling it yields the scheduling function.
    , microtask   = require('./_microtask')()
    // Property key under which an object exposes its observable
    // (presumably a well-known-symbol lookup -- confirm in `./_wks`).
    , OBSERVABLE  = require('./_wks')('observable')
    , aFunction   = require('./_a-function')
    , anObject    = require('./_an-object')
    , anInstance  = require('./_an-instance')
    , redefineAll = require('./_redefine-all')
    , hide        = require('./_hide')
    , forOf       = require('./_for-of')
    // Sentinel compared against the result of `forOf` in `Observable.from`.
    , RETURN      = forOf.RETURN;
  // Normalises an optional method slot.
  // Returns `undefined` when `fn` is `null`/`undefined`; otherwise returns the
  // result of `aFunction(fn)` (the helper is expected to reject non-callables --
  // see `./_a-function`).
  var getMethod = function(fn){
    return fn == null ? undefined : aFunction(fn);
  };
  // Runs the cleanup callback stored on `subscription._c`, if there is one.
  // The slot is cleared *before* the callback is invoked, so a re-entrant or
  // repeated call cannot run the same cleanup twice.
  var cleanupSubscription = function(subscription){
    var cleanup = subscription._c;
    if(cleanup){
      subscription._c = undefined;
      cleanup();
    }
  };
  // A subscription counts as closed once its observer slot `_o` is `undefined`.
  var subscriptionClosed = function(subscription){
    return subscription._o === undefined;
  };
  // Closes an open subscription: drops its observer (`_o`) and then runs the
  // stored cleanup. Does nothing when the subscription is already closed.
  var closeSubscription = function(subscription){
    if(!subscriptionClosed(subscription)){
      subscription._o = undefined;
      cleanupSubscription(subscription);
    }
  };
  // Subscription constructor: connects `observer` to a `subscriber` function.
  //
  //   observer   - validated with `anObject`; stored in `_o` while the
  //                subscription is open.
  //   subscriber - called once with a SubscriptionObserver wrapping this
  //                subscription. It may return nothing, a cleanup function, or
  //                an object with an `unsubscribe` method.
  //
  // Anything thrown by `subscriber`, or by validating its return value, is
  // passed to the wrapping observer's `error` method.
  var Subscription = function(observer, subscriber){
    anObject(observer);
    this._c = undefined;
    this._o = observer;
    observer = new SubscriptionObserver(this);
    try {
      var cleanup      = subscriber(observer)
        , subscription = cleanup;
      if(cleanup != null){
        // A subscription-like return value is adapted into a cleanup function;
        // any other non-nullish value has to pass `aFunction`.
        if(typeof cleanup.unsubscribe === 'function')cleanup = function(){ subscription.unsubscribe(); };
        else aFunction(cleanup);
        this._c = cleanup;
      }
    } catch(e){
      observer.error(e);
      return;
    }
    // The subscriber may already have closed the subscription synchronously,
    // before its cleanup was stored; in that case run the cleanup right away.
    if(subscriptionClosed(this))cleanupSubscription(this);
  };
  // Public surface of a subscription. The prototype is a fresh object populated
  // through `redefineAll`.
  Subscription.prototype = redefineAll({}, {
    // Closes the subscription and runs its cleanup; a no-op once closed.
    unsubscribe: function unsubscribe(){ closeSubscription(this); }
  });
  // Wrapper handed to subscriber functions. It only keeps a reference to the
  // owning subscription in `_s`; the real observer is read from that
  // subscription on each notification.
  var SubscriptionObserver = function(subscription){
    this._s = subscription;
  };
  // Notification methods of the wrapper passed to subscriber functions.
  // Each one looks up the matching method on the real observer
  // (`subscription._o`) at call time via `getMethod`.
  SubscriptionObserver.prototype = redefineAll({}, {
    // Forwards `value` to the observer's `next` method and returns its result.
    // Ignored (returns `undefined`) when the subscription is closed or the
    // observer has no `next`. If the lookup or the call throws, the subscription
    // is closed and the original exception is rethrown.
    next: function next(value){
      var subscription = this._s;
      if(!subscriptionClosed(subscription)){
        var observer = subscription._o;
        try {
          var m = getMethod(observer.next);
          if(m)return m.call(observer, value);
        } catch(e){
          try {
            closeSubscription(subscription);
          } finally {
            // Throwing from `finally` makes `e` win over any error raised
            // while closing.
            throw e;
          }
        }
      }
    },
    // Terminates the subscription with an error.
    // Throws `value` when the subscription is already closed or when the
    // observer has no `error` method. Otherwise returns the result of the
    // observer's `error` method. Cleanup runs whether or not the handler throws.
    error: function error(value){
      var subscription = this._s;
      if(subscriptionClosed(subscription))throw value;
      var observer = subscription._o;
      // Mark closed before calling out, so re-entrant notifications are ignored.
      subscription._o = undefined;
      try {
        var m = getMethod(observer.error);
        if(!m)throw value;
        value = m.call(observer, value);
      } catch(e){
        try {
          cleanupSubscription(subscription);
        } finally {
          throw e;
        }
      }
      cleanupSubscription(subscription);
      return value;
    },
    // Terminates the subscription normally.
    // Returns the result of the observer's `complete` method, or `undefined`
    // when the observer has none or the subscription was already closed.
    // Cleanup runs whether or not the handler throws.
    complete: function complete(value){
      var subscription = this._s;
      if(!subscriptionClosed(subscription)){
        var observer = subscription._o;
        // Mark closed before calling out, so re-entrant notifications are ignored.
        subscription._o = undefined;
        try {
          var m = getMethod(observer.complete);
          value = m ? m.call(observer, value) : undefined;
        } catch(e){
          try {
            cleanupSubscription(subscription);
          } finally {
            throw e;
          }
        }
        cleanupSubscription(subscription);
        return value;
      }
    }
  });
  // Observable constructor. Stores the subscriber function in `_f` after
  // checking it with `aFunction`. `anInstance` is given the receiver, the
  // constructor, the name 'Observable' and the field name '_f' (presumably to
  // reject calls without `new` or on an already-initialised object -- confirm
  // in `./_an-instance`).
  var $Observable = function Observable(subscriber){
    anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
  };
  // Instance methods of Observable.
  redefineAll($Observable.prototype, {
    // Starts a new subscription of `observer` to this observable's subscriber
    // function (`_f`) and returns the Subscription.
    subscribe: function subscribe(observer){
      return new Subscription(observer, this._f);
    },
    // Subscribes and calls `fn` with every emitted value.
    // Returns a promise (`core.Promise`, falling back to `global.Promise`) that
    // resolves with the completion value, or rejects with the source's error,
    // with whatever `fn` throws, or with the `aFunction(fn)` failure.
    forEach: function forEach(fn){
      var that = this;
      return new (core.Promise || global.Promise)(function(resolve, reject){
        aFunction(fn);
        // Set once `fn` has thrown: later values are ignored and the
        // subscription is torn down as soon as a handle to it is available.
        var failed = false;
        var subscription = that.subscribe({
          next : function(value){
            if(failed)return;
            try {
              return fn(value);
            } catch(e){
              failed = true;
              reject(e);
              // When the source emits synchronously inside `subscribe`, the
              // handle is not assigned yet; unsubscribing is then deferred to
              // just after `subscribe` returns instead of throwing a TypeError
              // into the producer.
              if(subscription)subscription.unsubscribe();
            }
          },
          error: reject,
          complete: resolve
        });
        if(failed)subscription.unsubscribe();
      });
    }
  });
  // Static factory methods of Observable. Both use the receiver as the
  // constructor when it is a function and fall back to `$Observable` otherwise.
  redefineAll($Observable, {
    // Converts `x` into an observable.
    // If `x` has a method under the `OBSERVABLE` key, its result is returned
    // as-is when its `constructor` is the target constructor, and wrapped in a
    // new instance that delegates `subscribe` otherwise. Without such a method,
    // `x` is walked with `forOf` from a scheduled `microtask` callback.
    from: function from(x){
      var C = typeof this === 'function' ? this : $Observable;
      var method = getMethod(anObject(x)[OBSERVABLE]);
      if(method){
        var observable = anObject(method.call(x));
        return observable.constructor === C ? observable : new C(function(observer){
          return observable.subscribe(observer);
        });
      }
      return new C(function(observer){
        // Flipped by the returned cleanup function when the subscription ends.
        var done = false;
        microtask(function(){
          if(!done){
            try {
              // The callback returns RETURN once the subscription has ended;
              // a RETURN result from `forOf` then skips `complete`
              // (presumably `forOf` stops early on that value -- confirm in
              // `./_for-of`).
              if(forOf(x, false, function(it){
                observer.next(it);
                if(done)return RETURN;
              }) === RETURN)return;
            } catch(e){
              // After the subscription has ended the error cannot be delivered
              // to the observer, so it is rethrown.
              if(done)throw e;
              observer.error(e);
              return;
            }
            observer.complete();
          }
        });
        return function(){ done = true; };
      });
    },
    // Creates an observable that emits each argument in order and then
    // completes. Emission happens from a scheduled `microtask` callback and
    // stops as soon as the subscription's cleanup has run.
    of: function of(){
      // Copy `arguments` into a real array so the subscriber closure can keep it.
      for(var i = 0, l = arguments.length, items = Array(l); i < l;)items[i] = arguments[i++];
      return new (typeof this === 'function' ? this : $Observable)(function(observer){
        var done = false;
        microtask(function(){
          if(!done){
            for(var i = 0; i < items.length; ++i){
              observer.next(items[i]);
              if(done)return;
            }
            observer.complete();
          }
        });
        return function(){ done = true; };
      });
    }
  });
  // An observable answers the OBSERVABLE protocol with itself
  // (installed through the `hide` helper -- see `./_hide` for the property
  // attributes it uses).
  hide($Observable.prototype, OBSERVABLE, function(){ return this; });
  // Publish the constructor under the name `Observable`
  // (`$export.G` presumably selects the global target -- confirm in `./_export`).
  $export($export.G, {Observable: $Observable});
  // Delegates species setup for 'Observable' to `./_set-species`.
  require('./_set-species')('Observable');