import { put, take, fork, call, takeEvery, race, select, retry } from 'redux-saga/effects';
import { channel } from 'redux-saga';
import { take as takeFromArray, chunk, uniqBy, flatMap, difference } from 'lodash';
import actions from './actions';
import types from './types';
import { getFromConfig, configurationSelectors } from '../configuration';
import { GetQueue, GetConversationsById } from '../services/ModerationApi';
import { usersActions } from '../users';
import { conversationActions } from '../conversations';
import { websocketTypes } from '../websocket';
import selectors from './selectors';
import { reservedConversationTypes } from '../reservedConversation';
import { agentSelectors, agentTypes } from '../agent';
import ChannelDescription from '../ChannelDescription';

/**
 * Worker saga that services queue descriptions arriving on `sagaChannel`.
 *
 * For every description taken from the channel it:
 *   1. reads the agent language via `agentSelectors.getLanguage`,
 *   2. announces the queue with `actions.createQueue`,
 *   3. fetches the queue through `GetQueue` and reports either
 *      `retrieveQueueSuccess` (only conversations whose `language` equals
 *      the agent language) or `retrieveQueueFailure` with the caught error.
 *
 * @param {object} sagaChannel - redux-saga channel delivering queue descriptions.
 */
function* createQueue(sagaChannel) {
  for (;;) {
    const description = yield take(sagaChannel);
    const agentLanguage = yield select(agentSelectors.getLanguage);
    yield put(actions.createQueue(description.toString()));
    try {
      const response = yield call(
        GetQueue,
        description.messageType,
        description.queueType,
        description.id
      );
      // Keep only the conversations written in the agent's language.
      const matching = response.conversations.filter(
        (conversation) => conversation.language === agentLanguage
      );
      yield put(actions.retrieveQueueSuccess(description.toString(), matching));
    } catch (failure) {
      yield put(actions.retrieveQueueFailure(description.toString(), failure));
    }
  }
}

/**
 * Builds the list of queues described by the configuration and hands every
 * queue that appears in the work order to a small pool of `createQueue`
 * workers.
 *
 * @param {object} configuration
 * @param {Array<{name: string}>} configuration.workOrder - entries whose
 *   `name` is compared against `queue.toString()`.
 * @param {object} configuration.connections
 * @param {Array} configuration.connections.messageTypes - one queue is built
 *   per message type and usable channel.
 * @param {Array<{id: *, type: *, url: *}>} configuration.connections.channels -
 *   channels without a truthy `url` are skipped.
 */
function* createQueueHandler({ workOrder, connections: { messageTypes, channels } }) {
  const workerCount = 3;
  const sagaChannel = yield call(channel);
  for (let i = 0; i < workerCount; i += 1) {
    yield fork(createQueue, sagaChannel);
  }

  // Only channels that expose a URL produce queues.
  const usableChannels = channels.filter((channelInfo) => channelInfo.url);
  const queues = flatMap(messageTypes, (messageType) =>
    usableChannels.map(
      (channelInfo) => new ChannelDescription(channelInfo.id, channelInfo.type, messageType)
    )
  );

  console.log(queues);

  // Build the lookup once instead of scanning the work order for every queue.
  const workOrderNames = new Set(workOrder.map(({ name }) => name));
  for (let i = 0; i < queues.length; i += 1) {
    const queue = queues[i];
    if (workOrderNames.has(queue.toString())) {
      yield put(sagaChannel, queue);
    }
  }

  // Nothing else will be put on the channel. Closing it lets the workers
  // drain whatever is still buffered and then terminate, instead of staying
  // blocked on `take` forever (which leaked a worker pool per load cycle).
  sagaChannel.close();
}

/**
 * Tracks outstanding queue loads and signals when they have all settled.
 *
 * After each `websocketTypes.connected` action it keeps a running balance:
 * `types.createQueues` raises it, `types.retrieveQueuesSuccess` and
 * `types.retrieveQueuesFailure` lower it. Once the balance returns to zero
 * (at least one action has been observed by then) `actions.loaded()` is put.
 */
function* checkQueueLoaded() {
  for (;;) {
    yield take(websocketTypes.connected);
    console.log('Load started', new Date());
    let outstanding = 0;
    do {
      const outcome = yield race({
        add: take(types.createQueues),
        decrease: take([types.retrieveQueuesSuccess, types.retrieveQueuesFailure]),
      });
      outstanding += outcome.add ? 1 : 0;
      outstanding -= outcome.decrease ? 1 : 0;
    } while (outstanding !== 0);
    console.log('Load ended', new Date());
    yield put(actions.loaded());
  }
}

/**
 * Entry saga for queue loading.
 *
 * Each cycle waits for `types.load`, reads the configuration through
 * `getFromConfig`, waits for `websocketTypes.connected`, and then forks
 * `createQueueHandler` with that configuration (non-blocking, so the next
 * `types.load` can be awaited immediately).
 */
function* handleConnect() {
  for (;;) {
    yield take(types.load);
    const loadedConfiguration = yield* getFromConfig(configurationSelectors.getConfiguration);
    yield take(websocketTypes.connected);
    console.log('Connect', loadedConfiguration);
    yield fork(createQueueHandler, loadedConfiguration);
  }
}

// Threshold for perpetuallyLoadConversations: more conversations are requested
// while queued + already-requested conversations stay below this number.
const minQueueLength = 20;

// Number of conversation ids sent to GetConversationsById per request.
const conversationBatchSize = 5;

// Number of getConversations workers forked per load cycle.
const openChannels = 2;

/**
 * Worker saga that fetches conversations for batches of ids taken from `chan`.
 *
 * Each batch is requested through `GetConversationsById`, retried up to
 * 3 times with a 3000 ms delay. On success the users found on the
 * conversations (de-duplicated by `id`) are stored via `usersActions.addUsers`
 * and the conversations are stored with their `users` replaced by user ids.
 * On failure `conversationActions.loadConversationsFailure` is put instead.
 *
 * @param {object} chan - redux-saga channel delivering arrays of conversation ids.
 */
function* getConversations(chan) {
  for (;;) {
    const requestedIds = yield take(chan);
    try {
      const fetched = yield retry(3, 3000, GetConversationsById, requestedIds);
      const distinctUsers = uniqBy(flatMap(fetched, 'users'), 'id');
      yield put(usersActions.addUsers(distinctUsers));
      // Store conversations with user references (ids) rather than full user objects.
      const withUserIds = fetched.map((item) => {
        const userIds = item.users.map((user) => user.id);
        return { ...item, users: userIds };
      });
      yield put(conversationActions.addConversations(withUserIds));
    } catch (failure) {
      yield put(conversationActions.loadConversationsFailure(failure));
    }
  }
}

/**
 * Keeps the conversation queue topped up after the queues have been loaded.
 *
 * Each cycle starts on `types.loaded`, forks `openChannels` `getConversations`
 * workers on a fresh request channel, and then loops:
 *   - drops already-arrived ids from the in-flight list,
 *   - if queued + in-flight conversations are below `minQueueLength`, requests
 *     more ids in chunks of `conversationBatchSize`,
 *   - waits for a reset, a removal, or a logout action.
 * A logout ends the cycle; the request channel is then closed so the workers
 * of this cycle terminate instead of lingering on a channel nobody writes to.
 */
function* perpetuallyLoadConversations() {
  while (true) {
    yield take(types.loaded);
    // Create the channel through a `call` effect rather than yielding the raw
    // channel object, which only worked because the middleware passes
    // non-effect values through unchanged.
    const requestChannel = yield call(channel);
    for (let i = 0; i < openChannels; i += 1) {
      yield fork(getConversations, requestChannel);
    }
    let requestedConversations = [];
    try {
      while (true) {
        const conversationQueueIds = yield select(selectors.getQueuedConversationsIds);
        // Ids that have reached the queue are no longer in flight.
        requestedConversations = difference(requestedConversations, conversationQueueIds);
        const conversationQueueLength = conversationQueueIds.length;
        if (conversationQueueLength + requestedConversations.length < minQueueLength) {
          const withoutConversation = yield select(selectors.getIdsWithoutConversation);
          console.log('No Conversation', withoutConversation);
          const potentialIds = difference(withoutConversation, requestedConversations);
          // Round the shortfall up to a whole number of batches.
          // NOTE(review): the shortfall ignores ids already in flight, so this
          // can request more than needed to reach minQueueLength — confirm
          // whether that over-fetch is intended.
          const takeIdLength =
            Math.ceil((minQueueLength - conversationQueueLength) / conversationBatchSize) *
            conversationBatchSize;
          const takeIds = takeFromArray(potentialIds, takeIdLength);
          requestedConversations = [...requestedConversations, ...takeIds];
          const idChunks = chunk(takeIds, conversationBatchSize);
          for (let i = 0; i < idChunks.length; i += 1) {
            yield put(requestChannel, idChunks[i]);
          }
        }
        const { remove, logout, reset } = yield race({
          reset: take(types.resetConversationLoad),
          remove: take([
            reservedConversationTypes.reserveConversationSuccess,
            reservedConversationTypes.deleted,
            types.removeFromQueue,
          ]),
          logout: take(agentTypes.logoutSuccess),
        });
        if (reset) {
          requestedConversations = [];
        }
        if (remove) {
          // Assumes the removal actions carry the conversation id as `id` —
          // TODO confirm against the action creators.
          requestedConversations = requestedConversations.filter((item) => remove.id !== item);
        }
        if (logout) {
          break;
        }
      }
    } finally {
      // Closing the channel ends the workers forked for this cycle once they
      // have drained any buffered chunks; without it every loaded/logout
      // cycle left `openChannels` workers blocked on `take` forever.
      requestChannel.close();
    }
  }
}

// Sagas exposed by this module; the worker sagas above are started by these
// entry points and are intentionally not exported.
export default {
  handleConnect,
  checkQueueLoaded,
  perpetuallyLoadConversations,
};
