123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- 'use strict';
- // https://github.com/zenparsing/es-observable
- var $export = require('./_export')
- , global = require('./_global')
- , core = require('./_core')
- , microtask = require('./_microtask')()
- , OBSERVABLE = require('./_wks')('observable')
- , aFunction = require('./_a-function')
- , anObject = require('./_an-object')
- , anInstance = require('./_an-instance')
- , redefineAll = require('./_redefine-all')
- , hide = require('./_hide')
- , forOf = require('./_for-of')
- , RETURN = forOf.RETURN;
- var getMethod = function(fn){
- return fn == null ? undefined : aFunction(fn);
- };
- var cleanupSubscription = function(subscription){
- var cleanup = subscription._c;
- if(cleanup){
- subscription._c = undefined;
- cleanup();
- }
- };
- var subscriptionClosed = function(subscription){
- return subscription._o === undefined;
- };
- var closeSubscription = function(subscription){
- if(!subscriptionClosed(subscription)){
- subscription._o = undefined;
- cleanupSubscription(subscription);
- }
- };
- var Subscription = function(observer, subscriber){
- anObject(observer);
- this._c = undefined;
- this._o = observer;
- observer = new SubscriptionObserver(this);
- try {
- var cleanup = subscriber(observer)
- , subscription = cleanup;
- if(cleanup != null){
- if(typeof cleanup.unsubscribe === 'function')cleanup = function(){ subscription.unsubscribe(); };
- else aFunction(cleanup);
- this._c = cleanup;
- }
- } catch(e){
- observer.error(e);
- return;
- } if(subscriptionClosed(this))cleanupSubscription(this);
- };
- Subscription.prototype = redefineAll({}, {
- unsubscribe: function unsubscribe(){ closeSubscription(this); }
- });
- var SubscriptionObserver = function(subscription){
- this._s = subscription;
- };
- SubscriptionObserver.prototype = redefineAll({}, {
- next: function next(value){
- var subscription = this._s;
- if(!subscriptionClosed(subscription)){
- var observer = subscription._o;
- try {
- var m = getMethod(observer.next);
- if(m)return m.call(observer, value);
- } catch(e){
- try {
- closeSubscription(subscription);
- } finally {
- throw e;
- }
- }
- }
- },
- error: function error(value){
- var subscription = this._s;
- if(subscriptionClosed(subscription))throw value;
- var observer = subscription._o;
- subscription._o = undefined;
- try {
- var m = getMethod(observer.error);
- if(!m)throw value;
- value = m.call(observer, value);
- } catch(e){
- try {
- cleanupSubscription(subscription);
- } finally {
- throw e;
- }
- } cleanupSubscription(subscription);
- return value;
- },
- complete: function complete(value){
- var subscription = this._s;
- if(!subscriptionClosed(subscription)){
- var observer = subscription._o;
- subscription._o = undefined;
- try {
- var m = getMethod(observer.complete);
- value = m ? m.call(observer, value) : undefined;
- } catch(e){
- try {
- cleanupSubscription(subscription);
- } finally {
- throw e;
- }
- } cleanupSubscription(subscription);
- return value;
- }
- }
- });
- var $Observable = function Observable(subscriber){
- anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
- };
- redefineAll($Observable.prototype, {
- subscribe: function subscribe(observer){
- return new Subscription(observer, this._f);
- },
- forEach: function forEach(fn){
- var that = this;
- return new (core.Promise || global.Promise)(function(resolve, reject){
- aFunction(fn);
- var subscription = that.subscribe({
- next : function(value){
- try {
- return fn(value);
- } catch(e){
- reject(e);
- subscription.unsubscribe();
- }
- },
- error: reject,
- complete: resolve
- });
- });
- }
- });
- redefineAll($Observable, {
- from: function from(x){
- var C = typeof this === 'function' ? this : $Observable;
- var method = getMethod(anObject(x)[OBSERVABLE]);
- if(method){
- var observable = anObject(method.call(x));
- return observable.constructor === C ? observable : new C(function(observer){
- return observable.subscribe(observer);
- });
- }
- return new C(function(observer){
- var done = false;
- microtask(function(){
- if(!done){
- try {
- if(forOf(x, false, function(it){
- observer.next(it);
- if(done)return RETURN;
- }) === RETURN)return;
- } catch(e){
- if(done)throw e;
- observer.error(e);
- return;
- } observer.complete();
- }
- });
- return function(){ done = true; };
- });
- },
- of: function of(){
- for(var i = 0, l = arguments.length, items = Array(l); i < l;)items[i] = arguments[i++];
- return new (typeof this === 'function' ? this : $Observable)(function(observer){
- var done = false;
- microtask(function(){
- if(!done){
- for(var i = 0; i < items.length; ++i){
- observer.next(items[i]);
- if(done)return;
- } observer.complete();
- }
- });
- return function(){ done = true; };
- });
- }
- });
- hide($Observable.prototype, OBSERVABLE, function(){ return this; });
- $export($export.G, {Observable: $Observable});
- require('./_set-species')('Observable');
|