import { Observable, merge } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

declare module 'rxjs/internal/Observable' {
    interface Observable<T> {
        /**
         * Drops source emissions while the given boolean Observable last emitted `true`.
         *
         * `R` defaults to the source element type `T`, so the result type follows
         * the source without an explicit type argument. Passing a type argument
         * explicitly is still accepted for existing call sites.
         *
         * @param other Observable whose `true` emissions block the source and whose
         * `false` emissions unblock it.
         */
        blockStream<R = T>(other: Observable<boolean>): Observable<R>;

        /**
         * Emits only the source values that are neither `null` nor `undefined`.
         */
        notNull(): Observable<NonNullable<T>>;
    }
}

// Install the augmented methods on the Observable prototype when this module is evaluated.
// `blockStream` and `notNullPrototype` are function declarations defined further down in
// this file; they are hoisted, so referencing them here is safe.
Observable.prototype.blockStream = blockStream;
Observable.prototype.notNull = notNullPrototype;

/**
 * Mirrors the source Observable but skips its emissions while a gate is closed.
 *
 * The gate is driven by `other`: each `true` it emits closes the gate (source
 * values are dropped), each `false` re-opens it. The gate starts open.
 *
 * The gate flag is created per subscription, so independent subscribers and
 * re-subscriptions never observe a flag left behind by an earlier subscription.
 *
 * Notes:
 * - Values emitted by `other` are never forwarded; its errors are.
 * - Both inputs are merged, so the result completes only after the source
 *   and `other` have both completed.
 *
 * @param other Observable whose latest value decides whether source emissions
 * are blocked (`true`) or let through (`false`).
 * @returns An Observable emitting the source values that arrive while the gate is open.
 */
function blockStream<T>(this: Observable<T>, other: Observable<boolean>): Observable<T> {
    return new Observable<T>(subscriber => {
        // Scoped to this subscription only.
        let letThrough = true;

        const gate = other.pipe(
            tap(blocked => {
                letThrough = !blocked;
            }),
            // Only the side effect above matters. The `never` guard discards every
            // value and keeps the merged stream typed as Observable<T> without a cast.
            filter((_signal: boolean): _signal is never => false),
        );

        // The source is subscribed before the gate, matching the argument order of merge.
        return merge(this, gate)
            .pipe(filter(() => letThrough))
            .subscribe(subscriber);
    });
}

/**
 * Prototype flavour of the null filter, usable as `observable.notNull()`.
 *
 * Delegates to the pipeable `notNull()` operator so both entry points share a
 * single implementation and the same narrowed result type.
 *
 * @returns An Observable emitting only the source values that are neither
 * `null` nor `undefined`.
 */
function notNullPrototype<T>(this: Observable<T>): Observable<NonNullable<T>> {
    return this.pipe(notNull<T>());
}

/**
 * Pipeable operator that removes `null` and `undefined` from a stream.
 * Usage: `source$.pipe(notNull())`.
 *
 * @returns An operator function mapping an `Observable<T>` to an
 * `Observable<NonNullable<T>>` that carries only the non-nullish values.
 */
export function notNull<T>(): (source: Observable<T>) => Observable<NonNullable<T>> {
    // Type guard so the downstream element type is narrowed, not just filtered.
    const isPresent = (candidate: T): candidate is NonNullable<T> =>
        candidate !== null && candidate !== undefined;

    return function dropNullish(source: Observable<T>): Observable<NonNullable<T>> {
        return source.pipe(filter(isPresent));
    };
}
