import { createAsyncThunk, createSlice, PayloadAction } from '@reduxjs/toolkit'
import {
    addMessage,
    createThread as createThreadApiCall,
    getMessages,
    getRun,
    startRun
} from '../../api/conversation/api'
import { lipSync } from '../../api/audio/api'
import {
    AssistantMessage,
    AvatarAction,
    conversationPersistConfig,
    Customizations,
    InitializeConversationInput,
    initialState,
    Role,
    SliceState,
    VisemeInformation,
    SerializedActionNode,
    UserMessage,
    Conversation,
    WordBoundaries
} from './data'
import { Avatar, avatarMap } from '../../components/ConversationPage/Avatar/Avatar.data'
import { persistReducer } from 'redux-persist'
import { scoreResponse } from '../../hooks/conversation/prompting/evaluation/evaluation'
import { config } from '../../config/config'
import OpenAI from 'openai'
import { TextContentBlock } from 'openai/resources/beta/threads/messages'
import { RootState } from '..'

/**
 * Redux slice for the active conversation.
 *
 * `reducers` holds the synchronous setters; `extraReducers` reacts to the lifecycle
 * (pending / fulfilled / rejected) of the async thunks declared further down in this file.
 */
const conversationSlice = createSlice({
    name: 'conversation',
    initialState,
    reducers: {
        setConversationId(state, action: PayloadAction<number>) {
            state.conversationId = action.payload
        },
        /** Puts the avatar back into its standing pose. */
        stopTalking(state, _action: PayloadAction<void>) {
            state.avatarAction = AvatarAction.STANDING
        },
        /**
         * Switches the avatar to the action stored on the most recent assistant message that
         * already has its `auxillaryInformation` filled in. Does nothing when there is no
         * conversation or no such message.
         */
        startTalking(state, _action: PayloadAction<void>) {
            if (state.conversation === undefined) return
            const messagesWithAvatarState = state.conversation
                .filter((message) => message.role === Role.ASSISTANT)
                .filter((message) => (message as AssistantMessage).auxillaryInformation !== undefined)
            const lastMessageWithAvatarState = messagesWithAvatarState[messagesWithAvatarState.length - 1]
            if (lastMessageWithAvatarState === undefined) return
            state.avatarAction = (lastMessageWithAvatarState as AssistantMessage).avatarAction
        },
        setConversation(state, action: PayloadAction<Conversation | undefined>) {
            state.conversation = action.payload
        },
        setCustomizations(state, action: PayloadAction<Customizations | undefined>) {
            state.customizations = action.payload
        },
        setActionNodes(state, action: PayloadAction<SerializedActionNode[] | undefined>) {
            state.actionNodes = action.payload
        },
        setMostRecentInstruction(state, action: PayloadAction<string | undefined>) {
            state.mostRecentInstruction = action.payload
        },
        setError(state, action: PayloadAction<string>) {
            state.error = action.payload
        },
        /** Restores every conversation field to its value in `initialState`. */
        resetConversation(state, _action: PayloadAction<void>) {
            state.conversationId = initialState.conversationId
            // each field is reset from its own initial value (these two previously copied `conversationId`)
            state.avatarAction = initialState.avatarAction
            state.avatarId = initialState.avatarId
            state.customizations = initialState.customizations
            state.runIds = initialState.runIds
            state.threadId = initialState.threadId
            state.conversation = initialState.conversation
            state.actionNodes = initialState.actionNodes
            state.mostRecentInstruction = initialState.mostRecentInstruction
            state.startTime = initialState.startTime
            state.error = initialState.error
        }
    },
    extraReducers: (builder) => {
        // initializeConversation
        builder.addCase(initializeConversation.fulfilled, (state, action) => {
            const newConversationParameters = action.meta.arg

            // state updates for both new and existing conversations
            state.avatarAction = AvatarAction.STANDING
            state.runIds = initialState.runIds
            state.error = initialState.error

            if (newConversationParameters !== undefined) {
                // state updates for new conversations only
                state.conversationId = newConversationParameters.conversationId
                state.avatarId = newConversationParameters.avatarId
                state.customizations = newConversationParameters.customizations
                state.conversation = []
                state.actionNodes = initialState.actionNodes
                state.mostRecentInstruction = initialState.mostRecentInstruction
                state.startTime = new Date().getTime()
            } else {
                /* state updates for existing conversations only */
            }

            const payload = action.payload as unknown as { threadId: string }
            state.threadId = payload.threadId
        })
        builder.addCase(initializeConversation.rejected, (state, action) => {
            state.conversationId = initialState.conversationId
            // each field is reset from its own initial value (these two previously copied `conversationId`)
            state.avatarAction = initialState.avatarAction
            state.avatarId = initialState.avatarId
            state.customizations = initialState.customizations
            state.threadId = initialState.threadId
            state.conversation = initialState.conversation
            state.actionNodes = initialState.actionNodes
            state.mostRecentInstruction = initialState.mostRecentInstruction
            state.startTime = initialState.startTime
            state.error = initialState.error
            // an aborted initialization is not reported as an error
            if (!action.meta.aborted) {
                state.error = 'Failed to create your conversation.'
            }
        })

        // runThread
        builder.addCase(runThread.pending, (state, _action) => {
            state.avatarAction = AvatarAction.THINKING
        })
        builder.addCase(runThread.fulfilled, (state, action) => {
            state.runIds = action.payload
        })
        builder.addCase(runThread.rejected, (state, action) => {
            if (action.meta.aborted) return
            state.error = 'There was a problem with processing your message.'
        })

        // sendMessage
        builder.addCase(sendMessage.pending, (state, action) => {
            // append the user's message right away, flagged as not yet added to the thread
            state.conversation = [
                ...(state.conversation ?? []),
                {
                    role: Role.USER,
                    content: action.meta.arg.message,
                    addedToThread: false,
                    numResponsesToGenerate: action.meta.arg.numResponsesToGenerate
                }
            ]
            state.avatarAction = AvatarAction.THINKING
        })
        builder.addCase(sendMessage.fulfilled, (state, action) => {
            // replace the last entry (the one appended by the pending case) with a confirmed copy
            const newConversation = [
                ...(state.conversation?.slice(0, -1) ?? []),
                {
                    role: Role.USER,
                    content: action.meta.arg.message,
                    addedToThread: true,
                    numResponsesToGenerate: action.meta.arg.numResponsesToGenerate
                }
            ]
            state.conversation = newConversation
        })
        builder.addCase(sendMessage.rejected, (state, action) => {
            if (action.meta.aborted) return
            state.error = 'There was a problem with sending your message.'
        })

        // getResponse
        builder.addCase(getResponse.pending, (state, _action) => {
            state.avatarAction = AvatarAction.THINKING
        })
        builder.addCase(getResponse.fulfilled, (state, action) => {
            state.conversation = [...(state.conversation ?? []), action.payload.nextMessage]
            // continue on the thread that produced the chosen response
            state.threadId = action.payload.threadId
            state.runIds = initialState.runIds
        })
        builder.addCase(getResponse.rejected, (state, action) => {
            if (action.meta.aborted) return
            state.error = 'There was a problem with getting a response.'
        })

        // hydrateAudioInformation
        builder.addCase(hydrateAudioInformation.fulfilled, (state, action) => {
            const toUpdate = action.payload
            const newConversation = [...(state.conversation ?? [])]
            for (const { ix, audio, visemes, wordBoundaries } of toUpdate) {
                const message = newConversation[ix]
                // The indices were computed when the thunk started. If the conversation was replaced or
                // reset in the meantime, skip the entry instead of writing a message without role/content.
                if (message === undefined) continue
                newConversation[ix] = {
                    ...message,
                    auxillaryInformation: { audio, visemes, wordBoundaries }
                }
            }

            state.conversation = newConversation
        })
        builder.addCase(hydrateAudioInformation.rejected, (state, _action) => {
            state.error = 'There was a problem getting the audio information.'
        })
    }
})

/**
 * Sets up the information in redux to be able to start a conversation (new or existing).
 * Sanitizes the redux store to ensure that we are in a good state to start a conversation.
 *
 * - New conversation (`newConversationParameters` is defined): creates an empty thread.
 * - Existing conversation (`newConversationParameters` is undefined): reads the messages of the
 *   thread currently in state, copies them into a fresh thread, rebuilds `conversation` from them
 *   and waits for the audio information to be hydrated.
 *
 * Every API call is attempted up to 5 times when it answers with status code 500.
 * Fulfils with `{ threadId }`; rejects (via `rejectWithValue`) with a short description of the step that failed.
 */
const initializeConversation = createAsyncThunk(
    '/conversation/initialize',
    async (
        newConversationParameters: InitializeConversationInput,
        { rejectWithValue, getState, dispatch, fulfillWithValue }
    ) => {
        const maxAttempts = 5
        // randomized pause of 100-300ms between attempts
        const backOff = () => new Promise<void>((res) => setTimeout(res, 200 * (Math.random() + 0.5)))

        if (newConversationParameters !== undefined) {
            // need to generate new thread!
            for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
                const { statusCode, body } = await createThreadApiCall({})
                if (statusCode === 500) {
                    if (attempt < maxAttempts) await backOff()
                    continue
                }
                return fulfillWithValue({ threadId: body.threadId })
            }

            return rejectWithValue('failed at creating new thread')
        }

        // want to keep the conversation consistent with the current state.
        const { threadId } = (getState() as RootState).conversation as SliceState
        if (threadId === undefined) {
            // without a thread there is nothing to resume; fail fast instead of issuing doomed requests
            return rejectWithValue('failed at getting existing conversation messages')
        }

        let messages: OpenAI.Beta.Threads.Messages.Message[] = []
        for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
            const result = await getMessages({ threadId })
            if (result.statusCode === 500) {
                if (attempt === maxAttempts) {
                    return rejectWithValue('failed at getting existing conversation messages')
                }
                await backOff()
                continue
            }

            // Reverse a copy (the API response itself is left untouched) and drop messages whose
            // text is a single space.
            // NOTE(review): presumably the API returns newest-first and ' ' marks placeholder messages - confirm.
            messages = [...result.body.messages]
                .reverse()
                .filter((message) => (message.content[0] as TextContentBlock).text.value !== ' ')
            break
        }

        // create a fresh thread, using these messages
        let newThreadId: string | undefined = undefined
        for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
            const { statusCode, body } = await createThreadApiCall({
                messages: messages.map((message) => {
                    return { content: (message.content[0] as TextContentBlock).text.value, role: message.role }
                })
            })

            if (statusCode === 500) {
                if (attempt === maxAttempts) return rejectWithValue('failed at creating new thread')
                await backOff()
                continue
            }

            newThreadId = body.threadId
            break
        }

        // now that we have a new thread, we need to update the conversation state and populate the
        // auxiliary information so that we can resume the conversation
        const { avatarId } = (getState() as RootState).conversation as SliceState
        const newConversation = messages.map((message) => {
            if (message.role === 'user') {
                return {
                    role: Role.USER,
                    content: (message.content[0] as TextContentBlock).text.value,
                    addedToThread: true,
                    numResponsesToGenerate: 3
                }
            } else {
                return {
                    role: Role.ASSISTANT,
                    content: (message.content[0] as TextContentBlock).text.value,
                    avatarAction: AvatarAction.TALKING,
                    avatarId: avatarId!
                }
            }
        })

        // update conversation
        dispatch(setConversation(newConversation))

        // fill in the auxiliary information, and stall until this process is done
        // NOTE(review): the outcome is not inspected; an error set by a rejected hydration is cleared
        // again when this thunk fulfils (the fulfilled reducer resets `error`).
        await dispatch(hydrateAudioInformation())

        return fulfillWithValue({ threadId: newThreadId })
    }
)

/**
 * Runs a thread with an assistant.
 *
 * Besides the given thread, `numResponsesToGenerate - 1` "branch" threads are created from the
 * current conversation (taken from the latest user message, defaulting to 3 when there is none)
 * and a run is started on every thread so that several candidate responses are produced.
 *
 * Fulfils with the `{ threadId, runId }` pairs of the runs that were started (status code 200).
 * Rejects with 'failed at running thread' when the conversation is missing or no run could be
 * started within 3 attempts.
 */
const runThread = createAsyncThunk(
    '/conversation/runThread',
    async (
        { threadId, assistantId, instructions }: { threadId: string; assistantId: string; instructions?: string },
        { getState, rejectWithValue }
    ) => {
        const { conversation } = (getState() as any).conversation as SliceState
        if (conversation === undefined) {
            return rejectWithValue('failed at running thread')
        }

        const userMessages = conversation.filter((message) => message.role === Role.USER) as UserMessage[]
        const numResponsesToGenerate =
            userMessages.length === 0 ? 3 : userMessages[userMessages.length - 1].numResponsesToGenerate

        // one branch thread per additional response; branches that fail to be created are dropped
        const branchTasks = []
        for (let branchIx = 0; branchIx < numResponsesToGenerate - 1; branchIx += 1) {
            branchTasks.push(
                createThreadApiCall({
                    metadata: { type: 'branch' },
                    messages: conversation.map((message) => {
                        return { content: message.content, role: message.role }
                    })
                })
            )
        }
        const branchThreads = (await Promise.all(branchTasks))
            .filter((result) => result.statusCode === 200)
            .map((result) => (result.body as any).threadId)

        const threadsToRun: string[] = [threadId, ...branchThreads]

        for (let attempt = 0; attempt < 3; attempt += 1) {
            try {
                const startResults = await Promise.all(
                    threadsToRun.map(async (thread) => {
                        return { thread, result: await startRun({ threadId: thread, assistantId, instructions }) }
                    })
                )
                const records: { threadId: string; runId: string }[] = startResults
                    .filter(({ result }) => result.statusCode === 200)
                    .map(({ thread, result }) => {
                        return { threadId: thread, runId: (result.body as any).runId }
                    })

                // Only succeed when at least one run was started: an empty list would leave
                // nothing for getResponse to poll or pick a response from.
                if (records.length > 0) return records
            } catch (e) {
                // a thrown request failure counts as a failed attempt; fall through and try again
            }
        }
        return rejectWithValue('failed at running thread')
    }
)

/**
 * Posts the user's message to the thread stored in the conversation state.
 *
 * The request is attempted at most three times; a status code of 500 triggers a 200ms pause
 * followed by the next attempt. Rejects with 'failed at sending message' when no thread is in
 * state or when every attempt answered with 500.
 */
const sendMessage = createAsyncThunk(
    '/conversation/sendMessage',
    async (
        { message, numResponsesToGenerate }: { message: string; numResponsesToGenerate: number },
        { getState, rejectWithValue }
    ) => {
        const sliceState = (getState() as any).conversation as SliceState
        const activeThreadId = sliceState.threadId
        if (activeThreadId === undefined) {
            return rejectWithValue('failed at sending message')
        }

        let attemptsLeft = 3
        while (attemptsLeft > 0) {
            attemptsLeft -= 1
            const response = await addMessage({ threadId: activeThreadId, content: message })
            if (response.statusCode !== 500) {
                // anything other than a 500 ends the thunk successfully
                return
            }
            await new Promise<void>((resolve) => setTimeout(resolve, 200))
        }

        return rejectWithValue('failed at sending message')
    }
)

/**
 * Reasons the `getResponse` thunk rejects with; the value is handed to `rejectWithValue`.
 */
enum GetResponseFailureType {
    // OpenAI-side failure, e.g. a run ended cancelled/failed/expired or fetching a thread's messages returned a 500
    OPENAI = 'openai',
    // a run did not complete within the polling time limit
    TIMEOUT = 'timeout',
    // NOTE(review): not referenced anywhere in the visible part of this file
    EMPTY_CONVERSATION = 'empty conversation',
    // the conversation state has no avatar id
    EMPTY_AVATAR = 'empty avatar'
}

/**
 * Polls every run listed in `runIds` until it completes, then reads the newest message of each
 * run's thread, scores it against the most recent instruction and fulfils with the
 * highest-scoring one (the first one wins on ties) together with the thread it came from.
 *
 * Runs are polled one after another, every half second, and each run gets its own 30 second
 * budget. A 500 from the polling request is retried after the same half-second pause and counts
 * towards that budget.
 *
 * Rejects (via `rejectWithValue`) with a `GetResponseFailureType`:
 * - EMPTY_AVATAR when no avatar id is in state,
 * - OPENAI when a run ends cancelled/failed/expired, a message request returns 500, or no
 *   candidate response was obtained,
 * - TIMEOUT when a run does not complete in time.
 */
const getResponse = createAsyncThunk(
    '/conversation/getResponse',
    async (_args: void, { getState, rejectWithValue }) => {
        const { avatarId, mostRecentInstruction, runIds } = (getState() as any).conversation as SliceState

        // checked up front so that we do not poll for a response that could not be used anyway
        if (avatarId === undefined) {
            return rejectWithValue(GetResponseFailureType.EMPTY_AVATAR)
        }

        const pendingRuns = runIds ?? []

        for (const { threadId, runId } of pendingRuns) {
            const startTime = new Date().getTime()
            while (true) {
                const { statusCode, body } = await getRun({ runId, threadId })
                if (statusCode !== 500) {
                    const status = body.status
                    if (status === 'completed') {
                        break
                    }

                    if (['cancelling', 'cancelled', 'failed', 'expired'].includes(status)) {
                        return rejectWithValue(GetResponseFailureType.OPENAI)
                    }
                }

                // Reached for runs still in progress AND for 500 responses, so that a persistently
                // failing endpoint is neither hammered in a tight loop nor polled forever.
                const now = new Date().getTime()
                if (now - startTime >= 30 * 1000) {
                    // if we haven't gotten a response in 30 seconds, then fail the request.
                    return rejectWithValue(GetResponseFailureType.TIMEOUT)
                }

                // space out requests to be every half second
                await new Promise<void>((res) => setTimeout(res, 500))
            }
        }

        // at this point, the runs should be done, so we can pick up the messages!
        const messageResults = await Promise.all(
            pendingRuns.map(async ({ threadId }) => {
                return { threadId, result: await getMessages({ threadId }) }
            })
        )
        if (messageResults.some((result) => result.result.statusCode === 500)) {
            return rejectWithValue(GetResponseFailureType.OPENAI)
        }

        let bestResponse: { threadId: string; content: string; score: number } | undefined
        for (const { result, threadId } of messageResults) {
            const messages = (result.body as any).messages
            const content = messages[0].content[0].text.value
            const score = scoreResponse(mostRecentInstruction!, content)
            if (config.name !== 'prod') console.log({ content, score })
            // strict comparison keeps the earliest candidate when scores are equal
            if (bestResponse === undefined || score > bestResponse.score) {
                bestResponse = { threadId, content, score }
            }
        }

        if (bestResponse === undefined) {
            // there were no runs, hence no candidate responses to choose from
            return rejectWithValue(GetResponseFailureType.OPENAI)
        }

        return {
            threadId: bestResponse.threadId,
            nextMessage: {
                content: bestResponse.content,
                role: Role.ASSISTANT,
                avatarAction: AvatarAction.TALKING,
                avatarId
            }
        }
    }
)

/**
 * Looks for any "gaps" in the conversation. That is, looks for any messages from the assistant
 * that have no `auxillaryInformation` key and requests the audio, visemes and word boundaries
 * for them, one message at a time in chronological order.
 *
 * Fulfils with one entry per hydrated message, carrying the message's index in the conversation
 * (`ix`), the JSON-serialized audio data, the visemes and the word boundaries. Fulfils with an
 * empty list when there is no conversation.
 * Rejects with 'Failed to hydrate audio' when a message's avatar is unknown or the lip-sync
 * request returns a 500.
 */
const hydrateAudioInformation = createAsyncThunk(
    '/conversation/hydrateAudioInformation',
    async (_args: void, { getState, rejectWithValue }) => {
        const { conversation } = (getState() as any).conversation as SliceState

        if (conversation === undefined || conversation.length === 0) {
            return []
        }

        // remember each message's position so the reducer can write the result back to the same slot
        const messagesToHydrate = conversation
            .map((message, ix) => {
                return { ...message, ix }
            })
            .filter((message) => message.role === Role.ASSISTANT)
            .filter((message) => !Object.keys(message).includes('auxillaryInformation')) as (AssistantMessage & {
            ix: number
        })[]

        const out: Array<{
            ix: number
            audio: string
            visemes: VisemeInformation[]
            wordBoundaries: WordBoundaries[]
        }> = []
        for (const { content, avatarId, ix } of messagesToHydrate) {
            const avatar = avatarMap.get(avatarId)
            if (avatar === undefined) {
                // an avatar id missing from the avatar map has no voice to synthesize with
                return rejectWithValue('Failed to hydrate audio')
            }

            const { statusCode, body } = await lipSync({ text: content, voice: avatar.voice })

            if (statusCode === 500) {
                return rejectWithValue('Failed to hydrate audio')
            }

            const { audio, visemes, wordBoundaries } = body

            // the audio is kept in state as a JSON string of its `data` field
            const serializedAudio = JSON.stringify((audio as any).data)
            const visemesInformation = visemes.map((viseme: { audioOffset: number; visemeId: number }) => {
                return { timestamp: viseme.audioOffset, visemeId: viseme.visemeId }
            }) as VisemeInformation[]

            out.push({ ix, audio: serializedAudio, visemes: visemesInformation, wordBoundaries })
        }

        return out
    }
)

// synchronous action creators generated from the slice's `reducers`
export const {
    setConversationId,
    setActionNodes,
    setCustomizations,
    setConversation,
    stopTalking,
    startTalking,
    setError,
    resetConversation,
    setMostRecentInstruction
} = conversationSlice.actions
// async thunks
export { runThread, sendMessage, getResponse, hydrateAudioInformation, initializeConversation }
// the slice reducer wrapped with redux-persist, using the conversation persist config
export default persistReducer(conversationPersistConfig, conversationSlice.reducer)
