import { isFunction } from 'lodash';
// eslint-disable-next-line no-restricted-imports
import Rx from 'rx';

/**
 * An error paired with its position in a stream of errors.
 * `createRetryStream` wraps every raw error it receives into this shape before
 * handing it to `onError` / `filterErrorsFunction`, and rethrows it in this shape
 * when retries are exhausted.
 *
 * @deprecated
 */
export type StreamError = {
  /** Zero-based index of this error within the errors stream it came from. */
  errorCount: number;
  /** The original, unwrapped error value. */
  error: any;
};

/**
 * Options accepted by `createPollingStream`.
 */
export interface CreatePollingStreamParams<T> {
  /** Invoked for every poll; must return a Promise of the polled value. */
  pollingFunction: () => Promise<T>;
  /**
   * Gate evaluated on every interval tick; returning `false` skips that poll.
   * The initial, immediate poll is not gated. Defaults to always `true`.
   */
  filterEmitsFunction?: () => boolean;
  /**
   * Decides whether a given error may be retried; returning `false` prevents the retry.
   * Defaults to always `true`.
   */
  filterErrorsFunction?: (error: StreamError) => boolean;
  /** Polling interval in milliseconds. Defaults to 1000. */
  interval?: number;
  /** Whether the stream should try to recover from errors. Defaults to `true`. */
  retry?: boolean;
  /** Retry limit forwarded to `createRetryStream`. Defaults to 60. */
  retryCount?: number;
  /** Whether the resulting stream is shared across observers (`.share()`). Defaults to `true`. */
  shared?: boolean;
}

/** Upper bound, in milliseconds, on the delay `createRetryStream` waits before a retry (2 minutes). */
const MAX_RETRY_INTERVAL = 2 * 60 * 1000;

/**
 * @deprecated use usePolling hook from @neptune/shared/common-business-logic instead
 * Creates RxJS Observable that emits results of polling.
 * The first poll runs immediately on subscription (it is not gated by filterEmitsFunction);
 * later polls run on every interval tick that filterEmitsFunction lets through.
 * @param {Function} pollingFunction - function that will be used to get result of polling
 *        must return Promise
 * @param {Function} filterEmitsFunction - function that can prevent a stream to make calls
 *        to pollingFunction by default this function passes all emits down to pollingFunction
 * @param {Function} filterErrorsFunction - function that can prevent a stream to make a retry.
 *        Useful when there is no sense to retry.
 *        Like there is bad request and without changing it pollingFunction will return the same error.
 * @param {Number = 1000} interval - interval of polling in milliseconds
 * @param {Boolean = true} retry - should stream recover from error
 *        (for example HTTP error or exception in pollingFunction)
 * @param {Number = 60} retryCount - how many times in a row stream should try to recover.
 *        The count (and the retry back-off derived from it) starts over after every
 *        successfully emitted polling result.
 * @param {Boolean = true} shared - indicates if this stream logic should be shared
 *        across all observers
 * @returns Observable of polling results
 * @throws {TypeError} when pollingFunction is not a function
 */
export function createPollingStream<T>({
  pollingFunction,
  filterEmitsFunction = () => true,
  filterErrorsFunction = () => true,
  interval = 1000,
  retry = true,
  retryCount = 60,
  shared = true,
}: CreatePollingStreamParams<T>) {
  if (!isFunction(pollingFunction)) {
    throw new TypeError('pollingFunction is not a function');
  }

  const polls = Rx.Observable.interval(interval)
    .filter(() => filterEmitsFunction())
    // startWith is placed after filter on purpose: the very first poll must always run.
    // @ts-ignore
    .startWith('run right away')
    .flatMap<T>(() => {
      const result = pollingFunction();

      if (!isThenable(result)) {
        // eslint-disable-next-line no-console
        console.warn(
          'pollingFunction does not return Promise like object!',
          'There can be unexpected results of polling stream!',
        );
      }

      return Rx.Observable.fromPromise(result);
    });

  let stream = polls;

  if (retry) {
    // defer gives every subscription its own failure counter.
    stream = Rx.Observable.defer(() => {
      // Number of failures since the last successful poll. Without this counter the
      // retry logic would fall back to the error's index in the errors stream, which
      // never resets, so retryCount would limit the total number of errors instead of
      // the number of errors in a row.
      let failuresInRow = 0;

      return polls
        .do(() => {
          failuresInRow = 0;
        })
        .retryWhen((errorsStream: Rx.Observable<any>) =>
          createRetryStream({
            errorsStream,
            retryCount,
            filterErrorsFunction,
            // onError runs before attempt is read for the same error, hence the "- 1":
            // the first failure in a row counts as 0, which keeps the first retry immediate.
            onError: () => {
              failuresInRow += 1;
            },
            attempt: () => failuresInRow - 1,
          }),
        );
    });
  }

  if (shared) {
    stream = stream.share();
  }

  return stream;
}

/**
 * Options accepted by `createRetryStream`.
 */
interface CreateRetryStreamParams {
  /** Stream of raw errors, e.g. the one handed to a `retryWhen` callback. */
  errorsStream: Rx.Observable<any>;
  /**
   * Optional supplier of the current retry count. When omitted, the error's
   * zero-based index in `errorsStream` is used instead.
   */
  attempt?: () => number;
  /** Once the retry count reaches this value the error is thrown instead of retried. Defaults to 60. */
  retryCount?: number;
  /** Base delay in milliseconds; multiplied by the retry count to get the wait before a retry. Defaults to 1000. */
  interval?: number;
  /** Only errors for which this returns `true` are considered for a retry. Defaults to always `true`. */
  filterErrorsFunction?: (error: StreamError) => boolean;
  /** Called with every wrapped error, before `filterErrorsFunction` is applied. Defaults to a no-op. */
  onError?: (value: StreamError) => void;
}

/**
 * @deprecated use retryBackoff from `backoff-rx-js`
 *
 * Turns a stream of errors into a stream of retry signals with a linearly growing delay.
 *
 * Every incoming error is wrapped as `{ error, errorCount }` (errorCount being its
 * zero-based index), reported to `onError`, and then checked by `filterErrorsFunction`.
 * For each error that passes, the retry count is taken from `attempt()` when provided,
 * otherwise from `errorCount`:
 *  - if it has reached `retryCount`, the wrapped error is thrown;
 *  - otherwise a timer of `retry count * interval` ms (capped at MAX_RETRY_INTERVAL) is emitted.
 */
export function createRetryStream({
  errorsStream,
  attempt,
  retryCount = 60,
  interval = 1000,
  filterErrorsFunction = () => true,
  onError = () => {},
}: CreateRetryStreamParams) {
  const wrappedErrors = errorsStream.map((cause, index) => ({ error: cause, errorCount: index }));
  const retryableErrors = wrappedErrors.do(onError).filter(filterErrorsFunction);

  return retryableErrors.flatMap((streamError) => {
    const retriesSoFar = attempt ? attempt() : streamError.errorCount;
    const exhausted = retriesSoFar >= retryCount;

    if (exhausted) {
      throw streamError;
    }

    const delay = Math.min(retriesSoFar * interval, MAX_RETRY_INTERVAL);

    return Rx.Observable.timer(delay);
  });
}

/**
 * Tells whether a value looks like a promise: any falsy value is rejected,
 * otherwise the value qualifies when it is a native Promise instance or
 * exposes a `then` function.
 */
function isThenable(something: any) {
  return something ? something instanceof Promise || isFunction(something.then) : false;
}
