sinProject-Inc/talk

View on GitHub
server/socket-handler.ts

Summary

Maintainability
A
0 mins
Test Coverage
import { PrismaClient, type ChatLog, type ChatMember, type Text } from '@prisma/client'
import { ChatMemberRepositoryPrisma } from '../src/lib/chat/chat_member_repository_prisma'
// import { createAdapter } from '@socket.io/cluster-adapter'
// import { setupWorker } from '@socket.io/sticky'
import { createAdapter } from '@socket.io/redis-adapter'
import type http from 'http'
import { createClient } from 'redis'
import { Server, Socket } from 'socket.io'
import { logger } from '../src/lib/app/logger'
import type { ChatMemberEntity, MessageSet } from '../src/lib/chat/chat'
import { ChatEntity } from '../src/lib/chat/chat_entity'
import { ChatLogRepositoryPrisma } from '../src/lib/chat/chat_log_repository_prisma'
import { LocaleCode } from '../src/lib/locale/locale_code'
import { SpeechText } from '../src/lib/speech/speech_text'
import { GetTextService } from '../src/lib/text/get_text_service'
import { TextRepositoryPrisma } from '../src/lib/text/text_repository_prisma'
import { GetTranslationService } from '../src/lib/translation/get_translation_service'
import { TranslationRepositoryPrisma } from '../src/lib/translation/translation_repository_prisma'
import { SocketClientAddress } from '../src/lib/network/socket_client_address'

// Single Prisma client instance shared by every repository created in this module.
const prisma_client = new PrismaClient()
// Repositories used by the socket handlers below; each one is constructed with the shared client.
const chat_member_repository = new ChatMemberRepositoryPrisma(prisma_client)
const chat_log_repository = new ChatLogRepositoryPrisma(prisma_client)
const text_repository = new TextRepositoryPrisma(prisma_client)
const translation_repository = new TranslationRepositoryPrisma(prisma_client)

/**
 * Stores a chat entity through the chat log repository.
 *
 * @param chat_entity - Chat message to store.
 * @returns The chat log produced by `chat_log_repository.save`.
 */
async function save_chat_log(chat_entity: ChatEntity): Promise<ChatLog> {
    return chat_log_repository.save(chat_entity)
}

/**
 * Runs `GetTextService` for the message and locale code carried by a chat log.
 *
 * @param chat_log - Chat log whose `message` and `locale_code` are used.
 * @returns The `Text` resolved by the service.
 */
async function get_text(chat_log: ChatLog): Promise<Text> {
    // Build the value objects in the same order as before: message first, then locale.
    const message_text = new SpeechText(chat_log.message)
    const message_locale = new LocaleCode(chat_log.locale_code)

    return new GetTextService(text_repository, message_locale, message_text).execute()
}

/**
 * Asks the chat member repository for the translation locale codes of a room.
 *
 * @param room_id - Room to look up.
 * @param locale_code - Locale code passed through to `find_translation_codes`.
 * @returns The locale codes returned by the repository.
 */
async function get_translation_locale_codes(
    room_id: string,
    locale_code: string
): Promise<string[]> {
    return chat_member_repository.find_translation_codes(room_id, locale_code)
}

/**
 * Runs `GetTranslationService` for one text and one target locale.
 * The service result is discarded; only completion is awaited.
 *
 * @param text - Text passed to the translation service.
 * @param translation_locale - Locale code string wrapped in a `LocaleCode`.
 */
async function get_translation(text: Text, translation_locale: string): Promise<void> {
    const target_locale = new LocaleCode(translation_locale)

    await new GetTranslationService(translation_repository, text, target_locale).execute()
}

/**
 * Requests a translation of `text` for every given locale code, in parallel.
 *
 * @param text - Text passed to `get_translation`.
 * @param translation_locale_codes - Locale codes to request translations for.
 * @returns Resolves once every `get_translation` call has resolved; rejects with
 * the first rejection, as `Promise.all` does.
 */
async function get_translations(text: Text, translation_locale_codes: string[]): Promise<void> {
    // `map` collects the promises directly; an async `forEach` callback would hide them
    // from the caller as soon as it awaited anything.
    await Promise.all(
        translation_locale_codes.map((locale_code) => get_translation(text, locale_code))
    )
}

/**
 * Stores a chat message and then requests translations for it.
 *
 * Steps run one after another: save the log, resolve its text, look up the
 * room's translation locale codes, then request translations for those codes.
 *
 * @param chat_entity - Chat message to store.
 * @param room_id - Room used to look up translation locale codes.
 * @returns The saved chat log.
 */
async function save(chat_entity: ChatEntity, room_id: string): Promise<ChatLog> {
    const saved_log = await save_chat_log(chat_entity)
    const source_text = await get_text(saved_log)
    const target_locales = await get_translation_locale_codes(room_id, saved_log.locale_code)

    await get_translations(source_text, target_locales)

    return saved_log
}

/**
 * Queries chat logs of a room, ordered by `created_at` descending, limited to 10 rows.
 *
 * @param room_id - Room whose logs are queried.
 * @returns The chat logs returned by Prisma.
 */
async function find_logs(room_id: string): Promise<ChatLog[]> {
    const max_logs = 10

    return prisma_client.chatLog.findMany({
        where: { room_id },
        orderBy: { created_at: 'desc' },
        take: max_logs,
    })
}

/**
 * Returns the first entry of `socket.rooms` that differs from the socket's own id.
 *
 * @param socket - Socket whose `rooms` and `id` are inspected.
 * @returns The first room other than the socket id, or `''` when there is none.
 */
function get_room_id(socket: Socket): string {
    const joined_room = [...socket.rooms].find((room) => room !== socket.id)

    return joined_room ?? ''
}

/**
 * Handles an incoming chat message from a socket.
 *
 * When the socket is in a room, the message is logged, saved via `save`,
 * broadcast to the room as `'message'`, and acknowledged to the sender with
 * `'message_acknowledged'`. Errors raised inside the `try` are logged, not rethrown.
 *
 * @param io - Server used to broadcast to the room.
 * @param socket - Sending socket.
 * @param received_message_set - Message payload received from the client.
 */
async function on_message(
    io: Server,
    socket: Socket,
    received_message_set: MessageSet
): Promise<void> {
    const client_address = new SocketClientAddress(socket).value

    try {
        const room_id = get_room_id(socket)

        // Sockets that are not in any room are ignored silently.
        if (room_id) {
            logger.info(
                `${client_address} [SOCKET][${room_id}] ${received_message_set.name}: ${received_message_set.message}`
            )

            const entity = new ChatEntity(
                room_id,
                received_message_set.locale_code,
                received_message_set.name,
                received_message_set.sender_id,
                received_message_set.message
            )
            const saved_log = await save(entity, room_id)

            io.to(room_id).emit('message', saved_log)
            socket.emit('message_acknowledged')
        }
    } catch (error) {
        logger.error(`${client_address} [SOCKET] on_message error`, error)
    }
}

/**
 * Emits the room's chat logs to a single socket as a `'logs'` event.
 * Does nothing when the socket is not in a room.
 *
 * @param socket - Socket that receives the logs.
 */
async function send_logs(socket: Socket): Promise<void> {
    const room_id = get_room_id(socket)

    if (room_id) {
        const room_logs = await find_logs(room_id)

        socket.emit('logs', room_logs)
    }
}

/**
 * Loads the members of a room after calling `delete_ghost` with the ids of the
 * sockets currently in that room.
 *
 * @param io - Server used to fetch the sockets in the room.
 * @param room_id - Room to inspect.
 * @returns The members returned by `chat_member_repository.find_many`.
 */
async function find_room_members(io: Server, room_id: string): Promise<ChatMember[]> {
    const connected_sockets = await io.in(room_id).fetchSockets()
    const connected_ids = connected_sockets.map(({ id }) => id)

    await chat_member_repository.delete_ghost(room_id, connected_ids)

    return chat_member_repository.find_many(room_id)
}

/**
 * Broadcasts the current member list of a room as a `'members'` event.
 *
 * @param io - Server used to look up members and broadcast.
 * @param room_id - Room whose members are sent.
 */
async function send_members(io: Server, room_id: string): Promise<void> {
    const members = await find_room_members(io, room_id)

    // Convert the stored members to ChatMemberEntity[], copying only these four fields.
    const member_entities = members.map(
        (member) =>
            ({
                room_id: member.room_id,
                name: member.name,
                locale_code: member.locale_code,
                is_mobile_device: member.is_mobile_device,
            } as ChatMemberEntity)
    )

    io.to(room_id).emit('members', member_entities)
}

/**
 * Broadcasts a `'join'` event carrying the member to everyone in the room.
 *
 * @param io - Server used to broadcast.
 * @param room_id - Target room.
 * @param chat_member_entity - Member sent as the event payload.
 */
function send_joined_member(
    io: Server,
    room_id: string,
    chat_member_entity: ChatMemberEntity
): void {
    const room = io.to(room_id)

    room.emit('join', chat_member_entity)
}

/**
 * Broadcasts a `'leave'` event carrying the member to everyone in the room.
 *
 * @param io - Server used to broadcast.
 * @param room_id - Target room.
 * @param chat_member_entity - Member sent as the event payload.
 */
function send_leaved_member(
    io: Server,
    room_id: string,
    chat_member_entity: ChatMemberEntity
): void {
    const room = io.to(room_id)

    room.emit('leave', chat_member_entity)
}

/**
 * Registers a socket as a member of a chat room and announces it.
 *
 * Saves the member, joins the socket to the room, broadcasts `'join'` and the
 * refreshed member list to the room, sends the room's logs to the joining
 * socket, and logs the event.
 *
 * @param io - Server used for room broadcasts.
 * @param socket - Socket that is joining.
 * @param chat_member_entity - Member data; its `room_id` selects the room.
 * @throws Error with message `'Room id is required'` when `room_id` is falsy.
 */
async function join(
    io: Server,
    socket: Socket,
    chat_member_entity: ChatMemberEntity
): Promise<void> {
    const room_id = chat_member_entity.room_id
    const socket_id = socket.id

    if (!room_id) throw new Error('Room id is required')

    await chat_member_repository.save(socket_id, chat_member_entity)

    // `socket.join` is typed `Promise<void> | void`. Wait for it so the broadcasts
    // below reach this socket and `send_logs`, which resolves the room from
    // `socket.rooms`, sees the room.
    await socket.join(room_id)

    send_joined_member(io, room_id, chat_member_entity)
    await send_members(io, room_id)
    await send_logs(socket)

    const client_address = new SocketClientAddress(socket).value

    logger.info(`${client_address} [SOCKET][${room_id}] ${chat_member_entity.name} joined`)
}

/**
 * Removes a socket from its chat room and announces the departure.
 *
 * Does nothing when the socket is not in a room or has no stored member.
 * Otherwise deletes the member, leaves the room, broadcasts `'leave'` and the
 * refreshed member list to the room, and logs the event.
 *
 * @param io - Server used for room broadcasts.
 * @param socket - Socket that is leaving.
 */
async function leave(io: Server, socket: Socket): Promise<void> {
    const room_id = get_room_id(socket)

    if (!room_id) return

    const socket_id = socket.id
    const chat_member = await chat_member_repository.find_unique(socket_id)

    if (!chat_member) return

    await chat_member_repository.delete(socket_id)

    // `socket.leave` is typed `Promise<void> | void`. Wait for it so the member list
    // built from `fetchSockets()` in `send_members` no longer includes this socket.
    await socket.leave(room_id)

    send_leaved_member(io, room_id, chat_member)
    await send_members(io, room_id)

    const client_address = new SocketClientAddress(socket).value

    logger.info(`${client_address} [SOCKET][${room_id}] ${chat_member.name} leaved`)
}

/**
 * Registers the chat event handlers on a newly connected socket.
 *
 * `'join'`, `'message'`, `'leave'` and `'disconnecting'` are wired to `join`,
 * `on_message` and `leave`. Every handler runs through `run_safely`, so a
 * rejection (for example `join` throwing on a missing room id) is logged
 * instead of being left as an unhandled promise rejection.
 *
 * @param io - Server passed through to the handlers.
 * @param socket - Newly connected socket.
 */
async function on_connection(io: Server, socket: Socket): Promise<void> {
    // Awaits the handler and logs any failure, since nothing observes the promise
    // an event listener returns.
    const run_safely = async (event_name: string, handler: () => Promise<void>): Promise<void> => {
        try {
            await handler()
        } catch (error) {
            logger.error(`[SOCKET][${socket.id}] ${event_name} handler error`, error)
        }
    }

    socket.on('join', (chat_member_entity) =>
        run_safely('join', () => join(io, socket, chat_member_entity))
    )
    socket.on('message', (received_data) =>
        run_safely('message', () => on_message(io, socket, received_data))
    )
    socket.on('leave', () => run_safely('leave', () => leave(io, socket)))
    socket.on('disconnecting', () => run_safely('disconnecting', () => leave(io, socket)))
}

/**
 * Connects a Redis pub/sub client pair and installs the Redis adapter on the server.
 *
 * @param io - Server that receives the adapter.
 * @returns Resolves after both clients have connected and the adapter is set.
 */
async function adapt_redis(io: Server): Promise<void> {
    const pub_client = createClient()
    const sub_client = pub_client.duplicate()

    // Each client needs its own 'error' listener; without one, an emitted
    // 'error' event is thrown as an uncaught exception.
    pub_client.on('error', (error: unknown) => {
        logger.error('[SOCKET] Redis pub client error', error)
    })
    sub_client.on('error', (error: unknown) => {
        logger.error('[SOCKET] Redis sub client error', error)
    })

    // The two connections are independent, so establish them in parallel.
    await Promise.all([pub_client.connect(), sub_client.connect()])

    io.adapter(createAdapter(pub_client, sub_client))
}

/**
 * Attaches a Socket.IO server to the given HTTP server and wires up the chat handlers.
 *
 * Redis adapter setup is started but not awaited, so the `'connection'` handler
 * is registered without waiting for Redis. A failure of that setup is logged
 * instead of being left as an unhandled promise rejection.
 *
 * @param server - HTTP server the Socket.IO server is attached to.
 */
export default async function inject_socket_io(server: http.Server): Promise<void> {
    const io = new Server(server)

    void adapt_redis(io).catch((error: unknown) => {
        logger.error('[SOCKET] failed to set up the Redis adapter', error)
    })

    // Alternative kept for reference: cluster adapter with sticky sessions.
    // io.adapter(createAdapter())
    // setupWorker(io)

    io.on('connection', (socket) => on_connection(io, socket))
}