/***************************************************************************
 * ========================================================================
 * Copyright 2024 VMware, Inc. All rights reserved. VMware Confidential
 * ========================================================================
 */

import {
    BehaviorSubject,
    defer,
    Observable,
    of,
} from 'rxjs';

import {
    concat,
    concatMap,
    delay,
    skip,
    tap,
} from 'rxjs/operators';

/**
 * @description
 *     Factory used to continuously emit on a stream after an interval of time. In other words, this
 *     can be used to make a polling request like AsyncFactory but with observable streams. The
 *     difference between this and a regular interval() is that this waits for the observable to
 *     complete before starting the timer for the next poll, ie. in an HTTP request scenario, it
 *     will wait for the request to return before starting the timer for the next poll instead of
 *     starting the timer when the request is made.
 *
 *     Each subscription to the returned observable runs its own independent polling loop, so the
 *     returned observable can safely be subscribed to more than once (concurrently or after an
 *     earlier subscription has been torn down).
 *
 *     If the source observable errors, the error is forwarded to the subscriber and polling stops.
 *     If the source observable never completes, the next poll is never scheduled.
 * @param observable$ - Source observable that is (re)subscribed to on every poll. Its emissions
 *     are forwarded to the subscriber unchanged.
 * @param pollingInterval - Time in milliseconds to wait, after the source observable completes,
 *     before subscribing to it again.
 * @returns Observable emitting every value produced by each poll of the source observable. It
 *     keeps polling until it is unsubscribed from or the source observable errors.
 * @author alextsg
 * @example
 *
 *     // Logs 'foo' every 3 seconds.
 *     const subscription = poll(of(true), 3000).subscribe(() => {
 *         console.log('foo');
 *     });
 *
 *     // Unsubscribes and console logs terminate.
 *     subscription.unsubscribe();
 *
 */
export function poll(
    observable$: Observable<any>,
    pollingInterval: number,
): Observable<any> {
    // defer() creates a fresh trigger subject for every subscription. Sharing a single subject
    // between subscribers would make each subscriber's timer queue extra polls for all of the
    // other subscribers.
    return defer(() => {
        // Emits once immediately (the initial value) to start the first poll, and once more
        // every time the post-poll delay elapses.
        const request$ = new BehaviorSubject(true);

        return request$.pipe(
            // concatMap keeps the trigger queued until the current poll and its delay finish.
            concatMap(() => observable$.pipe(
                // Once the source completes, append a delayed marker that schedules the next poll.
                concat(of(true).pipe(
                    delay(pollingInterval),
                    tap(() => request$.next(true)),
                    // Drop the marker value so subscribers only see the source's emissions.
                    skip(1),
                )),
            )),
        );
    });
}
