From 7faba645b0c170e216047dd3bc8b266c4ff375d4 Mon Sep 17 00:00:00 2001 From: molvqingtai Date: Tue, 7 Oct 2025 05:31:54 +0800 Subject: [PATCH] refactor: optimize protocol structure and naming consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove redundant peerId from SiteMetaSchema (use message-level peerId) - Rename FromInfo to FromSite, fromInfos to fromSites - Fix unused imports and parameters πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../content/components/danmaku-message.tsx | 4 +- src/app/content/components/message-item.tsx | 32 +- src/app/content/components/prompt-item.tsx | 8 +- src/app/content/views/footer/index.tsx | 30 +- src/app/content/views/header/index.tsx | 54 +- src/app/content/views/main/index.tsx | 16 +- src/app/content/views/setup/index.tsx | 30 +- src/domain/AppStatus.ts | 4 +- src/domain/ChatRoom.ts | 480 +++++++++--------- src/domain/HLCClock.ts | 110 ++++ src/domain/MessageList.ts | 64 +-- src/domain/Notification.ts | 4 +- src/domain/WorldRoom.ts | 82 +-- src/protocol/ChatRoom.ts | 105 +--- src/protocol/Message.ts | 259 ++++++++++ src/protocol/WorldRoom.ts | 38 +- src/protocol/index.ts | 1 + src/service/Notification/index.ts | 4 +- src/utils/hlc.ts | 109 ++++ src/utils/index.ts | 11 + 20 files changed, 954 insertions(+), 491 deletions(-) create mode 100644 src/domain/HLCClock.ts create mode 100644 src/protocol/Message.ts create mode 100644 src/utils/hlc.ts diff --git a/src/app/content/components/danmaku-message.tsx b/src/app/content/components/danmaku-message.tsx index 99b8f12..a4acd72 100644 --- a/src/app/content/components/danmaku-message.tsx +++ b/src/app/content/components/danmaku-message.tsx @@ -25,8 +25,8 @@ const DanmakuMessage: FC = ({ data, className, onClick, onMouse )} > - - {data.username.at(0)} + + {data.sender.name.at(0)}
{data.body}
diff --git a/src/app/content/components/message-item.tsx b/src/app/content/components/message-item.tsx index ac41b3b..4d339d0 100644 --- a/src/app/content/components/message-item.tsx +++ b/src/app/content/components/message-item.tsx @@ -5,11 +5,11 @@ import FormatDate from './format-date' import { Avatar, AvatarImage, AvatarFallback } from '@/components/ui/avatar' import { Markdown } from '@/components/markdown' -import { type NormalMessage } from '@/domain/MessageList' +import { type TextMessage } from '@/domain/MessageList' import { cn } from '@/utils' export interface MessageItemProps { - data: NormalMessage + data: TextMessage index?: number like: boolean hate: boolean @@ -28,18 +28,18 @@ const MessageItem: FC = memo((props) => { let content = props.data.body - // Check if the field exists, compatible with old data - if (props.data.atUsers) { - const atUserPositions = props.data.atUsers.flatMap((user) => - user.positions.map((position) => ({ username: user.username, userId: user.userId, position })) + // Check if mentions exist + if (props.data.mentions && props.data.mentions.length > 0) { + const mentionPositions = props.data.mentions.flatMap((user) => + user.positions.map((position) => ({ name: user.name, id: user.id, position })) ) // Replace from back to front according to position to avoid affecting previous indices - atUserPositions + mentionPositions .sort((a, b) => b.position[0] - a.position[0]) - .forEach(({ position, username }) => { + .forEach(({ position, name }) => { const [start, end] = position - content = `${content.slice(0, start)} **@${username}** ${content.slice(end + 1)}` + content = `${content.slice(0, start)} **@${name}** ${content.slice(end + 1)}` }) } @@ -52,13 +52,15 @@ const MessageItem: FC = memo((props) => { )} > - - {props.data.username.at(0)} + + {props.data.sender.name.at(0)}
-
{props.data.username}
- +
+ {props.data.sender.name} +
+
@@ -68,7 +70,7 @@ const MessageItem: FC = memo((props) => { handleLikeChange(checked)} - count={props.data.likeUsers.length} + count={props.data.reactions.likes.length} > @@ -77,7 +79,7 @@ const MessageItem: FC = memo((props) => { handleHateChange(checked)} - count={props.data.hateUsers.length} + count={props.data.reactions.hates.length} > diff --git a/src/app/content/components/prompt-item.tsx b/src/app/content/components/prompt-item.tsx index 61290e3..cf4b285 100644 --- a/src/app/content/components/prompt-item.tsx +++ b/src/app/content/components/prompt-item.tsx @@ -1,12 +1,12 @@ import { Avatar, AvatarFallback } from '@/components/ui/avatar' import { Badge } from '@/components/ui/badge' -import type { PromptMessage } from '@/domain/MessageList' +import type { SystemPromptMessage } from '@/domain/MessageList' import { cn } from '@/utils' import { AvatarImage } from '@radix-ui/react-avatar' import { type FC, memo } from 'react' export interface PromptItemProps { - data: PromptMessage + data: SystemPromptMessage className?: string } @@ -15,8 +15,8 @@ const PromptItem: FC = memo(({ data, className }) => {
- - {data.username.at(0)} + + {data.sender.name.at(0)} {data.body} diff --git a/src/app/content/views/footer/index.tsx b/src/app/content/views/footer/index.tsx index 1f6460f..fa0d209 100644 --- a/src/app/content/views/footer/index.tsx +++ b/src/app/content/views/footer/index.tsx @@ -84,8 +84,8 @@ const Footer: FC = () => { atUserRecord.current.forEach((item, userId) => { // Pre-calculate the offset after InputCommand const positionList = [...item].filter((item) => { - const username = message.slice(item[0], item[1] + 1) - return username === `@${userList.find((user) => user.userId === userId)?.username}` + const name = message.slice(item[0], item[1] + 1) + return name === `@${userList.find((user) => user.id === userId)?.name}` }) if (positionList.length) { atUserRecord.current.set(userId, new Set(positionList)) @@ -102,10 +102,10 @@ const Footer: FC = () => { const autoCompleteList = useMemo(() => { return userList - .filter((user) => user.userId !== userInfo?.id) + .filter((user) => user.id !== userInfo?.id) .map((item) => ({ ...item, - similarity: getTextSimilarity(searchNameKeyword.toLowerCase(), item.username.toLowerCase()) + similarity: getTextSimilarity(searchNameKeyword.toLowerCase(), item.name.toLowerCase()) })) .toSorted((a, b) => b.similarity - a.similarity) }, [searchNameKeyword, userList, userInfo]) @@ -136,14 +136,14 @@ const Footer: FC = () => { return } const transformedMessage = await transformMessage(message) - const atUsers = [...atUserRecord.current] + const mentions = [...atUserRecord.current] .map(([userId, positions]) => { - const user = userList.find((user) => user.userId === userId) + const user = userList.find((user) => user.id === userId) return (user ? { ...user, positions: [...positions] } : undefined)! }) .filter(Boolean) - const newMessage = { body: transformedMessage, atUsers } + const newMessage = { body: transformedMessage, mentions } const byteSize = getTextByteSize(JSON.stringify(newMessage)) if (byteSize > WEB_RTC_MAX_MESSAGE_SIZE) { @@ -151,7 +151,7 @@ const Footer: FC = () => { } send([ - chatRoomDomain.command.SendTextMessageCommand({ body: transformedMessage, atUsers }), + chatRoomDomain.command.SendTextMessageCommand({ body: transformedMessage, mentions }), messageInputDomain.command.ClearCommand() ]) } @@ -193,7 +193,7 @@ const Footer: FC = () => { if (isComposing.current) return if (autoCompleteListShow && autoCompleteList.length) { - handleInjectAtSyntax(selectedUser.username) + handleInjectAtSyntax(selectedUser.name) } else { handleSend() } @@ -314,7 +314,7 @@ const Footer: FC = () => { // Calculate the difference after replacing @text with @user const offset = newMessage.length - message.length - (atUserPosition[1] - atUserPosition[0]) - updateAtUserAtRecord(newMessage, ...atUserPosition, offset, selectedUser.userId) + updateAtUserAtRecord(newMessage, ...atUserPosition, offset, selectedUser.id) send(messageInputDomain.command.InputCommand(newMessage)) requestIdleCallback(() => { @@ -343,8 +343,8 @@ const Footer: FC = () => { customScrollParent={scrollParentRef!} itemContent={(index, user) => (
handleInjectAtSyntax(user.username)} + key={user.id} + onClick={() => handleInjectAtSyntax(user.name)} onMouseEnter={() => setSelectedUserIndex(index)} className={cn( 'flex cursor-pointer select-none items-center gap-x-2 rounded-md px-2 py-1.5 outline-none', @@ -354,10 +354,10 @@ const Footer: FC = () => { )} > - - {user.username.at(0)} + + {user.name.at(0)} -
{user.username}
+
{user.name}
)} > diff --git a/src/app/content/views/header/index.tsx b/src/app/content/views/header/index.tsx index f4097e7..8d4ffee 100644 --- a/src/app/content/views/header/index.tsx +++ b/src/app/content/views/header/index.tsx @@ -1,4 +1,4 @@ -import { useState, type FC } from 'react' +import { useState, useMemo, type FC } from 'react' import { Globe2Icon } from 'lucide-react' import { Avatar, AvatarFallback, AvatarImage } from '@/components/ui/avatar' import { HoverCard, HoverCardContent, HoverCardTrigger } from '@/components/ui/hover-card' @@ -6,7 +6,7 @@ import { Button } from '@/components/ui/button' import { cn, getSiteMeta } from '@/utils' import { useRemeshDomain, useRemeshQuery } from 'remesh-react' import ChatRoomDomain from '@/domain/ChatRoom' -import type { FromInfo, RoomUser } from '@/domain/WorldRoom' +import type { FromSite, RoomUser } from '@/domain/WorldRoom' import WorldRoomDomain from '@/domain/WorldRoom' import { ScrollArea } from '@/components/ui/scroll-area' import { Virtuoso } from 'react-virtuoso' @@ -22,19 +22,23 @@ const Header: FC = () => { const worldUserList = useRemeshQuery(worldRoomDomain.query.UserListQuery()) const chatOnlineCount = chatUserList.length - const worldOnlineGroup = worldUserList - .flatMap((user) => user.fromInfos.map((from) => ({ from, user }))) - .reduce<(FromInfo & { users: RoomUser[] })[]>((acc, item) => { - const existSite = acc.find((group) => group.origin === item.from.origin) - if (existSite) { - const existUser = existSite.users.find((user) => user.userId === item.user.userId) - !existUser && existSite.users.push(item.user) - } else { - acc.push({ ...item.from, users: [item.user] }) - } - return acc - }, []) - .sort((a, b) => b.users.length - a.users.length) + const worldOnlineGroup = useMemo( + () => + worldUserList + .flatMap((user) => user.fromSites.map((from) => ({ from, user }))) + .reduce<(FromSite & { users: RoomUser[] })[]>((acc, item) => { + const existSite = acc.find((group) => group.origin === item.from.origin) + if (existSite) { + const existUser = existSite.users.find((user: RoomUser) => user.id === item.user.id) + !existUser && existSite.users.push(item.user) + } else { + acc.push({ ...item.from, users: [item.user] }) + } + return acc + }, []) + .sort((a, b) => b.users.length - a.users.length), + [worldUserList] + ) const [chatUserListScrollParentRef, setChatUserListScrollParentRef] = useState(null) const [worldOnlineGroupScrollParentRef, setWorldOnlineGroupScrollParentRef] = useState(null) @@ -60,7 +64,11 @@ const Header: FC = () => { site.origin} + skipAnimationFrameInResizeObserver itemContent={(_index, site) => ( {
- user.userAvatar)} /> + user.avatar)} + />
)} @@ -154,14 +166,18 @@ const Header: FC = () => { user.id} + skipAnimationFrameInResizeObserver itemContent={(_index, user) => (
- - {user.username.at(0)} + + {user.name.at(0)} -
{user.username}
+
{user.name}
)} >
diff --git a/src/app/content/views/main/index.tsx b/src/app/content/views/main/index.tsx index b932b00..50f935a 100644 --- a/src/app/content/views/main/index.tsx +++ b/src/app/content/views/main/index.tsx @@ -8,7 +8,7 @@ import UserInfoDomain from '@/domain/UserInfo' import ChatRoomDomain from '@/domain/ChatRoom' import MessageListDomain from '@/domain/MessageList' import useDataId from '@/hooks/useDataId' -import { ChatRoomMessageType } from '@/protocol' +import { compareHLC } from '@/utils' const Main: FC = () => { const send = useRemeshSend() @@ -24,31 +24,31 @@ const Main: FC = () => { () => _messageList .map((message) => { - if (message.type === ChatRoomMessageType.Normal) { + if (message.type === 'text') { return { ...message, - like: message.likeUsers.some((likeUser) => likeUser.userId === userInfo?.id), - hate: message.hateUsers.some((hateUser) => hateUser.userId === userInfo?.id) + like: message.reactions.likes.some((likeUser) => likeUser.id === userInfo?.id), + hate: message.reactions.hates.some((hateUser) => hateUser.id === userInfo?.id) } } return message }) - .toSorted((a, b) => a.sendTime - b.sendTime), + .toSorted((a, b) => compareHLC(a.hlc, b.hlc)), [messageListId, userInfo?.id] ) const handleLikeChange = (messageId: string) => { - send(chatRoomDomain.command.SendLikeMessageCommand(messageId)) + send(chatRoomDomain.command.SendReactionCommand({ messageId, reaction: 'like' })) } const handleHateChange = (messageId: string) => { - send(chatRoomDomain.command.SendHateMessageCommand(messageId)) + send(chatRoomDomain.command.SendReactionCommand({ messageId, reaction: 'hate' })) } return ( {messageList.map((message, index) => - message.type === ChatRoomMessageType.Normal ? ( + message.type === 'text' ? ( => { } const generateMessage = async (userInfo: UserInfo): Promise => { - const { name: username, avatar: userAvatar, id: userId } = userInfo + const { name, avatar, id } = userInfo + const now = Date.now() return { id: nanoid(), + type: MESSAGE_TYPE.TEXT, + hlc: createHLC(), + sentAt: now, + receivedAt: now, + sender: { + id, + name, + avatar + }, body: mockTextList.shift()!, - sendTime: Date.now(), - receiveTime: Date.now(), - type: ChatRoomMessageType.Normal, - userId, - username, - userAvatar, - likeUsers: mockTextList.length ? [] : [{ userId, username, userAvatar }], - hateUsers: [], - atUsers: [] + mentions: [], + reactions: { + likes: mockTextList.length ? [] : [{ id, name, avatar }], + hates: [] + } } } diff --git a/src/domain/AppStatus.ts b/src/domain/AppStatus.ts index c9523fb..519a5d7 100644 --- a/src/domain/AppStatus.ts +++ b/src/domain/AppStatus.ts @@ -5,7 +5,7 @@ import { APP_STATUS_STORAGE_KEY } from '@/constants/config' import StorageEffect from './modules/StorageEffect' import ChatRoomDomain from '@/domain/ChatRoom' import { map } from 'rxjs' -import { ChatRoomSendType } from '@/protocol' +import { MESSAGE_TYPE } from '@/protocol/Message' export interface AppStatus { open: boolean @@ -136,7 +136,7 @@ const AppStatusDomain = Remesh.domain({ const onMessage$ = fromEvent(chatRoomDomain.event.OnMessageEvent).pipe( map((message) => { const status = get(StatusState()) - if (!status.open && message.type === ChatRoomSendType.Text) { + if (!status.open && message.type === MESSAGE_TYPE.TEXT) { return UpdateUnreadCommand(status.unread + 1) } return null diff --git a/src/domain/ChatRoom.ts b/src/domain/ChatRoom.ts index 9760977..326a1f3 100644 --- a/src/domain/ChatRoom.ts +++ b/src/domain/ChatRoom.ts @@ -1,34 +1,35 @@ import { Remesh } from 'remesh' import { map, merge, of, EMPTY, mergeMap, fromEventPattern, bufferTime, filter } from 'rxjs' -import type { AtUser, NormalMessage } from './MessageList' -import { type MessageUser } from './MessageList' +import type { MessageUser, MentionedUser } from './MessageList' import { ChatRoomExtern } from '@/domain/externs/ChatRoom' import MessageListDomain from '@/domain/MessageList' import UserInfoDomain from '@/domain/UserInfo' -import { desert, getTextByteSize, upsert } from '@/utils' +import HLCClockDomain from '@/domain/HLCClock' +import { desert, getTextByteSize, upsert, compareHLC, sendEvent } from '@/utils' import { nanoid } from 'nanoid' import StatusModule from '@/domain/modules/Status' import { SYNC_HISTORY_MAX_DAYS, WEB_RTC_MAX_MESSAGE_SIZE } from '@/constants/config' import hash from 'hash-it' import { - checkChatRoomMessage, - ChatRoomMessageType, - ChatRoomSendType, - type ChatRoomMessage, - type ChatRoomTextMessage, - type ChatRoomLikeMessage, - type ChatRoomHateMessage, - type ChatRoomSyncUserMessage, - type ChatRoomSyncHistoryMessage -} from '@/protocol' + validateNetworkMessage, + type NetworkMessage, + type TextMessage, + type ReactionMessage, + type PeerSyncMessage, + type HistorySyncMessage, + MESSAGE_TYPE, + REACTION_TYPE, + PROMPT_TYPE +} from '@/protocol/Message' -export type RoomUser = MessageUser & { peerIds: string[]; joinTime: number } +export type RoomUser = MessageUser & { peerIds: string[]; joinedAt: number } const ChatRoomDomain = Remesh.domain({ name: 'ChatRoomDomain', impl: (domain) => { const messageListDomain = domain.getDomain(MessageListDomain()) const userInfoDomain = domain.getDomain(UserInfoDomain()) + const hlcClockDomain = domain.getDomain(HLCClockDomain()) const chatRoomExtern = domain.getExtern(ChatRoomExtern) const PeerIdState = domain.state({ @@ -66,14 +67,18 @@ const ChatRoomDomain = Remesh.domain({ } }) - const LastMessageTimeQuery = domain.query({ - name: 'Room.LastMessageTimeQuery', + const LastMessageHLCQuery = domain.query({ + name: 'Room.LastMessageHLCQuery', impl: ({ get }) => { - return ( - get(messageListDomain.query.ListQuery()) - .filter((message) => message.type === ChatRoomMessageType.Normal) - .toSorted((a, b) => b.sendTime - a.sendTime)[0]?.sendTime ?? new Date(1970, 1, 1).getTime() + const messages = get(messageListDomain.query.ListQuery()).filter( + (message) => message.type === MESSAGE_TYPE.TEXT ) + + if (!messages.length) { + return { timestamp: 0, counter: 0 } + } + + return messages.reduce((latest, msg) => (compareHLC(msg.hlc, latest.hlc) > 0 ? msg : latest)).hlc } }) @@ -99,63 +104,66 @@ const ChatRoomDomain = Remesh.domain({ */ const HandleJoinLeaveMessageCommand = domain.command({ name: 'Room.HandleJoinLeaveMessageCommand', - impl: ( - { get }, - payload: { userId: string; username: string; userAvatar: string; messageType: 'join' | 'leave' } - ) => { - const { userId, username, userAvatar, messageType } = payload + impl: ({ get }, payload: { id: string; name: string; avatar: string; messageType: 'join' | 'leave' }) => { + const { id, name, avatar, messageType } = payload const now = Date.now() - const messageBody = messageType === 'join' ? `"${username}" joined the chat` : `"${username}" left the chat` + const currentHLC = get(hlcClockDomain.query.CurrentHLCQuery()) + const newHLC = sendEvent(currentHLC) + const messageBody = messageType === 'join' ? `"${name}" joined the chat` : `"${name}" left the chat` // Find user's most recent join/leave message const messageList = get(messageListDomain.query.ListQuery()) const userPromptMessages = messageList - .filter((msg) => msg.type === ChatRoomMessageType.Prompt && msg.userId === userId) - .toSorted((a, b) => b.sendTime - a.sendTime) + .filter((msg) => msg.type === MESSAGE_TYPE.SYSTEM_PROMPT && msg.sender.id === id) + .toSorted((a, b) => compareHLC(b.hlc, a.hlc)) const lastMessage = userPromptMessages[0] // If the previous message is from the same user, delete it if (lastMessage) { return [ + hlcClockDomain.command.SendEventCommand(), messageListDomain.command.DeleteItemCommand(lastMessage.id), messageListDomain.command.CreateItemCommand({ + type: MESSAGE_TYPE.SYSTEM_PROMPT, id: nanoid(), - userId, - username, - userAvatar, + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { id, name, avatar }, body: messageBody, - type: ChatRoomMessageType.Prompt, - sendTime: now, - receiveTime: now + promptType: messageType === 'join' ? PROMPT_TYPE.JOIN : PROMPT_TYPE.LEAVE }) ] } // Create new message (first message from this user) - return messageListDomain.command.CreateItemCommand({ - id: nanoid(), - userId, - username, - userAvatar, - body: messageBody, - type: ChatRoomMessageType.Prompt, - sendTime: now, - receiveTime: now - }) + return [ + hlcClockDomain.command.SendEventCommand(), + messageListDomain.command.CreateItemCommand({ + type: MESSAGE_TYPE.SYSTEM_PROMPT, + id: nanoid(), + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { id, name, avatar }, + body: messageBody, + promptType: messageType === 'join' ? PROMPT_TYPE.JOIN : PROMPT_TYPE.LEAVE + }) + ] } }) const JoinRoomCommand = domain.command({ name: 'Room.JoinRoomCommand', impl: ({ get }) => { - const { id: userId, name: username, avatar: userAvatar } = get(userInfoDomain.query.UserInfoQuery())! + const { id, name, avatar } = get(userInfoDomain.query.UserInfoQuery())! return [ UpdateUserListCommand({ type: 'create', - user: { peerId: chatRoomExtern.peerId, joinTime: Date.now(), userId, username, userAvatar } + user: { peerId: chatRoomExtern.peerId, joinedAt: Date.now(), id, name, avatar } }), - HandleJoinLeaveMessageCommand({ userId, username, userAvatar, messageType: 'join' }), + HandleJoinLeaveMessageCommand({ id, name, avatar, messageType: PROMPT_TYPE.JOIN }), JoinStatusModule.command.SetFinishedCommand(), JoinRoomEvent(chatRoomExtern.roomId), SelfJoinRoomEvent(chatRoomExtern.roomId) @@ -171,12 +179,12 @@ const ChatRoomDomain = Remesh.domain({ const LeaveRoomCommand = domain.command({ name: 'Room.LeaveRoomCommand', impl: ({ get }) => { - const { id: userId, name: username, avatar: userAvatar } = get(userInfoDomain.query.UserInfoQuery())! + const { id, name, avatar } = get(userInfoDomain.query.UserInfoQuery())! return [ - HandleJoinLeaveMessageCommand({ userId, username, userAvatar, messageType: 'leave' }), + HandleJoinLeaveMessageCommand({ id, name, avatar, messageType: PROMPT_TYPE.LEAVE }), UpdateUserListCommand({ type: 'delete', - user: { peerId: chatRoomExtern.peerId, joinTime: Date.now(), userId, username, userAvatar } + user: { peerId: chatRoomExtern.peerId, joinedAt: Date.now(), id, name, avatar } }), JoinStatusModule.command.SetInitialCommand(), LeaveRoomEvent(chatRoomExtern.roomId), @@ -192,25 +200,29 @@ const ChatRoomDomain = Remesh.domain({ const SendTextMessageCommand = domain.command({ name: 'Room.SendTextMessageCommand', - impl: ({ get }, message: string | { body: string; atUsers: AtUser[] }) => { + impl: ({ get }, message: string | { body: string; mentions: MentionedUser[] }) => { const self = get(SelfUserQuery()) + const now = Date.now() + const currentHLC = get(hlcClockDomain.query.CurrentHLCQuery()) + const newHLC = sendEvent(currentHLC) - const textMessage: ChatRoomTextMessage = { - ...self, + const textMessage: TextMessage = { + type: MESSAGE_TYPE.TEXT, id: nanoid(), - type: ChatRoomSendType.Text, - sendTime: Date.now(), + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { + id: self.id, + name: self.name, + avatar: self.avatar + }, body: typeof message === 'string' ? message : message.body, - atUsers: typeof message === 'string' ? [] : message.atUsers - } - - const listMessage: NormalMessage = { - ...textMessage, - type: ChatRoomMessageType.Normal, - receiveTime: Date.now(), - likeUsers: [], - hateUsers: [], - atUsers: typeof message === 'string' ? [] : message.atUsers + mentions: typeof message === 'string' ? [] : message.mentions, + reactions: { + likes: [], + hates: [] + } } /** @@ -228,25 +240,52 @@ const ChatRoomDomain = Remesh.domain({ // Only send to network if there are other peers, but always save to local peerIds.length && chatRoomExtern.sendMessage(textMessage, peerIds) - return [messageListDomain.command.CreateItemCommand(listMessage), SendTextMessageEvent(textMessage)] + return [ + hlcClockDomain.command.SendEventCommand(), + messageListDomain.command.CreateItemCommand(textMessage), + SendTextMessageEvent(textMessage) + ] } }) - const SendLikeMessageCommand = domain.command({ - name: 'Room.SendLikeMessageCommand', - impl: ({ get }, messageId: string) => { + const SendReactionCommand = domain.command({ + name: 'Room.SendReactionCommand', + impl: ({ get }, payload: { messageId: string; reaction: 'like' | 'hate' }) => { + const { messageId, reaction } = payload const self = get(SelfUserQuery()) - const localMessage = get(messageListDomain.query.ItemQuery(messageId)) as NormalMessage + const now = Date.now() + const currentHLC = get(hlcClockDomain.query.CurrentHLCQuery()) + const newHLC = sendEvent(currentHLC) + const localMessage = get(messageListDomain.query.ItemQuery(messageId)) as TextMessage - const likeMessage: ChatRoomLikeMessage = { - ...self, - id: messageId, - sendTime: Date.now(), - type: ChatRoomSendType.Like + const reactionMessage: ReactionMessage = { + type: MESSAGE_TYPE.REACTION, + id: nanoid(), + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { + id: self.id, + name: self.name, + avatar: self.avatar + }, + targetId: messageId, + reaction: reaction === REACTION_TYPE.LIKE ? REACTION_TYPE.LIKE : REACTION_TYPE.HATE } - const listMessage: NormalMessage = { + + const senderInfo = { id: self.id, name: self.name, avatar: self.avatar } + const updatedMessage: TextMessage = { ...localMessage, - likeUsers: desert(localMessage.likeUsers, likeMessage, 'userId') + reactions: { + likes: + reaction === REACTION_TYPE.LIKE + ? desert(localMessage.reactions.likes, senderInfo, 'id') + : localMessage.reactions.likes, + hates: + reaction === REACTION_TYPE.HATE + ? desert(localMessage.reactions.hates, senderInfo, 'id') + : localMessage.reactions.hates + } } /** @@ -256,39 +295,13 @@ const ChatRoomDomain = Remesh.domain({ const peerIds = get(PeerListQuery()) // Only send to network if there are other peers, but always save to local - peerIds.length && chatRoomExtern.sendMessage(likeMessage, peerIds) + peerIds.length && chatRoomExtern.sendMessage(reactionMessage, peerIds) - return [messageListDomain.command.UpdateItemCommand(listMessage), SendLikeMessageEvent(likeMessage)] - } - }) - - const SendHateMessageCommand = domain.command({ - name: 'Room.SendHateMessageCommand', - impl: ({ get }, messageId: string) => { - const self = get(SelfUserQuery()) - const localMessage = get(messageListDomain.query.ItemQuery(messageId)) as NormalMessage - - const hateMessage: ChatRoomHateMessage = { - ...self, - id: messageId, - sendTime: Date.now(), - type: ChatRoomSendType.Hate - } - const listMessage: NormalMessage = { - ...localMessage, - hateUsers: desert(localMessage.hateUsers, hateMessage, 'userId') - } - - /** - * Get all peerIds from UserList except self. - * @see SendTextMessageCommand for detailed explanation. - */ - const peerIds = get(PeerListQuery()) - - // Only send to network if there are other peers, but always save to local - peerIds.length && chatRoomExtern.sendMessage(hateMessage, peerIds) - - return [messageListDomain.command.UpdateItemCommand(listMessage), SendHateMessageEvent(hateMessage)] + return [ + hlcClockDomain.command.SendEventCommand(), + messageListDomain.command.UpdateItemCommand(updatedMessage), + SendReactionMessageEvent(reactionMessage) + ] } }) @@ -296,19 +309,29 @@ const ChatRoomDomain = Remesh.domain({ name: 'Room.SendSyncUserMessageCommand', impl: ({ get }, peerId: string) => { const self = get(SelfUserQuery()) - const lastMessageTime = get(LastMessageTimeQuery()) + const now = Date.now() + const currentHLC = get(hlcClockDomain.query.CurrentHLCQuery()) + const newHLC = sendEvent(currentHLC) + const lastMessageHLC = get(LastMessageHLCQuery()) - const syncUserMessage: ChatRoomSyncUserMessage = { - ...self, + const syncUserMessage: PeerSyncMessage = { + type: MESSAGE_TYPE.PEER_SYNC, id: nanoid(), + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { + id: self.id, + name: self.name, + avatar: self.avatar + }, peerId: chatRoomExtern.peerId, - sendTime: Date.now(), - lastMessageTime, - type: ChatRoomSendType.SyncUser + joinedAt: self.joinedAt, + lastMessageHLC } chatRoomExtern.sendMessage(syncUserMessage, peerId) - return [SendSyncUserMessageEvent(syncUserMessage)] + return [hlcClockDomain.command.SendEventCommand(), SendSyncUserMessageEvent(syncUserMessage)] } }) @@ -333,28 +356,41 @@ const ChatRoomDomain = Remesh.domain({ */ const SendSyncHistoryMessageCommand = domain.command({ name: 'Room.SendSyncHistoryMessageCommand', - impl: ({ get }, { peerId, lastMessageTime }: { peerId: string; lastMessageTime: number }) => { + impl: ( + { get }, + { peerId, lastMessageHLC }: { peerId: string; lastMessageHLC: { timestamp: number; counter: number } } + ) => { const self = get(SelfUserQuery()) + const now = Date.now() const historyMessages = get(messageListDomain.query.ListQuery()).filter((message) => { return ( - message.type === ChatRoomMessageType.Normal && - message.sendTime > lastMessageTime && - message.sendTime >= Date.now() - SYNC_HISTORY_MAX_DAYS * 24 * 60 * 60 * 1000 + message.type === MESSAGE_TYPE.TEXT && + compareHLC(message.hlc, lastMessageHLC) > 0 && + message.hlc.timestamp >= Date.now() - SYNC_HISTORY_MAX_DAYS * 24 * 60 * 60 * 1000 ) - }) + }) as TextMessage[] /** * Message chunking to ensure that each message does not exceed WEB_RTC_MAX_MESSAGE_SIZE * If the message itself exceeds the size limit, skip syncing that message directly. */ - const pushHistoryMessageList = historyMessages.reduce((acc, cur) => { - const pushHistoryMessage: ChatRoomSyncHistoryMessage = { - ...self, + const pushHistoryMessageList = historyMessages.reduce((acc, cur) => { + const currentHLC = get(hlcClockDomain.query.CurrentHLCQuery()) + const newHLC = sendEvent(currentHLC) + + const pushHistoryMessage: HistorySyncMessage = { + type: MESSAGE_TYPE.HISTORY_SYNC, id: nanoid(), - sendTime: Date.now(), - type: ChatRoomSendType.SyncHistory, - messages: [cur as NormalMessage] + hlc: newHLC, + sentAt: now, + receivedAt: now, + sender: { + id: self.id, + name: self.name, + avatar: self.avatar + }, + messages: [cur] } const pushHistoryMessageByteSize = getTextByteSize(JSON.stringify(pushHistoryMessage)) @@ -362,7 +398,7 @@ const ChatRoomDomain = Remesh.domain({ if (acc.length) { const mergedSize = getTextByteSize(JSON.stringify(acc[acc.length - 1])) + pushHistoryMessageByteSize if (mergedSize < WEB_RTC_MAX_MESSAGE_SIZE) { - acc[acc.length - 1].messages.push(cur as NormalMessage) + acc[acc.length - 1].messages.push(cur) } else { acc.push(pushHistoryMessage) } @@ -375,7 +411,7 @@ const ChatRoomDomain = Remesh.domain({ return pushHistoryMessageList.map((message) => { chatRoomExtern.sendMessage(message, peerId) - return SendSyncHistoryMessageEvent(message) + return [hlcClockDomain.command.SendEventCommand(), SendSyncHistoryMessageEvent(message)] }) } }) @@ -384,14 +420,14 @@ const ChatRoomDomain = Remesh.domain({ name: 'Room.UpdateUserListCommand', impl: ({ get }, action: { type: 'create' | 'delete'; user: Omit & { peerId: string } }) => { const userList = get(UserListState()) - const existUser = userList.find((user) => user.userId === action.user.userId) + const existUser = userList.find((user) => user.id === action.user.id) if (action.type === 'create') { return [ UserListState().new( upsert( userList, { ...action.user, peerIds: [...new Set(existUser?.peerIds || []), action.user.peerId] }, - 'userId' + 'id' ) ) ] @@ -404,7 +440,7 @@ const ChatRoomDomain = Remesh.domain({ ...action.user, peerIds: existUser?.peerIds?.filter((peerId) => peerId !== action.user.peerId) || [] }, - 'userId' + 'id' ).filter((user) => user.peerIds.length) ) ] @@ -412,24 +448,20 @@ const ChatRoomDomain = Remesh.domain({ } }) - const SendSyncHistoryMessageEvent = domain.event({ + const SendSyncHistoryMessageEvent = domain.event({ name: 'Room.SendSyncHistoryMessageEvent' }) - const SendSyncUserMessageEvent = domain.event({ + const SendSyncUserMessageEvent = domain.event({ name: 'Room.SendSyncUserMessageEvent' }) - const SendTextMessageEvent = domain.event({ + const SendTextMessageEvent = domain.event({ name: 'Room.SendTextMessageEvent' }) - const SendLikeMessageEvent = domain.event({ - name: 'Room.SendLikeMessageEvent' - }) - - const SendHateMessageEvent = domain.event({ - name: 'Room.SendHateMessageEvent' + const SendReactionMessageEvent = domain.event({ + name: 'Room.SendReactionMessageEvent' }) const JoinRoomEvent = domain.event({ @@ -440,32 +472,28 @@ const ChatRoomDomain = Remesh.domain({ name: 'Room.LeaveRoomEvent' }) - const OnMessageEvent = domain.event({ + const OnMessageEvent = domain.event({ name: 'Room.OnMessageEvent' }) - const OnTextMessageEvent = domain.event({ + const OnTextMessageEvent = domain.event({ name: 'Room.OnTextMessageEvent' }) - const OnSyncUserMessageEvent = domain.event({ + const OnSyncUserMessageEvent = domain.event({ name: 'Room.OnSyncUserMessageEvent' }) - const OnSyncHistoryMessageEvent = domain.event({ + const OnSyncHistoryMessageEvent = domain.event({ name: 'Room.OnSyncHistoryMessageEvent' }) - const OnSyncMessageEvent = domain.event({ + const OnSyncMessageEvent = domain.event({ name: 'Room.OnSyncMessageEvent' }) - const OnLikeMessageEvent = domain.event({ - name: 'Room.OnLikeMessageEvent' - }) - - const OnHateMessageEvent = domain.event({ - name: 'Room.OnHateMessageEvent' + const OnReactionMessageEvent = domain.event({ + name: 'Room.OnReactionMessageEvent' }) const OnJoinRoomEvent = domain.event({ @@ -508,10 +536,10 @@ const ChatRoomDomain = Remesh.domain({ domain.effect({ name: 'Room.OnMessageEffect', impl: () => { - const onMessage$ = fromEventPattern(chatRoomExtern.onMessage).pipe( + const onMessage$ = fromEventPattern(chatRoomExtern.onMessage).pipe( mergeMap((message) => { // Filter out messages that do not conform to the format - if (!checkChatRoomMessage(message)) { + if (!validateNetworkMessage(message)) { console.warn('Invalid message format', message) return EMPTY } @@ -521,16 +549,14 @@ const ChatRoomDomain = Remesh.domain({ // Emit specific message type events const specificEvent$ = (() => { switch (message.type) { - case ChatRoomSendType.Text: + case MESSAGE_TYPE.TEXT: return of(OnTextMessageEvent(message)) - case ChatRoomSendType.SyncUser: + case MESSAGE_TYPE.PEER_SYNC: return of(OnSyncUserMessageEvent(message)) - case ChatRoomSendType.SyncHistory: + case MESSAGE_TYPE.HISTORY_SYNC: return of(OnSyncHistoryMessageEvent(message)) - case ChatRoomSendType.Like: - return of(OnLikeMessageEvent(message)) - case ChatRoomSendType.Hate: - return of(OnHateMessageEvent(message)) + case MESSAGE_TYPE.REACTION: + return of(OnReactionMessageEvent(message)) default: console.warn('Unsupported message type', message) return EMPTY @@ -549,13 +575,15 @@ const ChatRoomDomain = Remesh.domain({ impl: ({ fromEvent }) => { return fromEvent(OnTextMessageEvent).pipe( map((message) => { - return messageListDomain.command.CreateItemCommand({ + // Update local HLC based on received message + const receivedMessage: TextMessage = { ...message, - type: ChatRoomMessageType.Normal, - receiveTime: Date.now(), - likeUsers: [], - hateUsers: [] - }) + receivedAt: Date.now() + } + return [ + hlcClockDomain.command.ReceiveEventCommand(message.hlc), + messageListDomain.command.CreateItemCommand(receivedMessage) + ] }) ) } @@ -569,26 +597,33 @@ const ChatRoomDomain = Remesh.domain({ const selfUser = get(SelfUserQuery()) // If a new user joins after the current user has entered the room, a join log message needs to be created. - const existUser = get(UserListQuery()).find((user) => user.userId === message.userId) - const isNewJoinUser = !existUser && message.joinTime > selfUser.joinTime + const existUser = get(UserListQuery()).find((user) => user.id === message.sender.id) + const isNewJoinUser = !existUser && message.joinedAt > selfUser.joinedAt - const lastMessageTime = get(LastMessageTimeQuery()) - const needSyncHistory = lastMessageTime > message.lastMessageTime + const lastMessageHLC = get(LastMessageHLCQuery()) + const needSyncHistory = compareHLC(lastMessageHLC, message.lastMessageHLC) > 0 + + const userForList = { + ...message.sender, + peerId: message.peerId, + joinedAt: message.joinedAt + } return of( - UpdateUserListCommand({ type: 'create', user: message }), + hlcClockDomain.command.ReceiveEventCommand(message.hlc), + UpdateUserListCommand({ type: 'create', user: userForList }), isNewJoinUser ? HandleJoinLeaveMessageCommand({ - userId: message.userId, - username: message.username, - userAvatar: message.userAvatar, - messageType: 'join' + id: message.sender.id, + name: message.sender.name, + avatar: message.sender.avatar, + messageType: PROMPT_TYPE.JOIN }) : null, needSyncHistory ? SendSyncHistoryMessageCommand({ peerId: message.peerId, - lastMessageTime: message.lastMessageTime + lastMessageHLC: message.lastMessageHLC }) : null ) @@ -609,7 +644,7 @@ const ChatRoomDomain = Remesh.domain({ // Deduplicate messages by id, keep the latest one const uniqueMessages = [ - ...allMessages.reduce((map, msg) => map.set(msg.id, msg), new Map()).values() + ...allMessages.reduce((map, msg) => map.set(msg.id, msg), new Map()).values() ] // Filter out messages that haven't changed @@ -622,9 +657,16 @@ const ChatRoomDomain = Remesh.domain({ } }) + // Update HLC for each received history message + const maxHLC = uniqueMessages.reduce((max, msg) => (compareHLC(msg.hlc, max) > 0 ? msg.hlc : max), { + timestamp: 0, + counter: 0 + }) + // Return batched upsert commands and single OnSyncMessageEvent for all sync messages return changedMessages.length ? of( + hlcClockDomain.command.ReceiveEventCommand(maxHLC), ...changedMessages.map((message) => messageListDomain.command.UpsertItemCommand(message)), OnSyncMessageEvent(syncMessages) ) @@ -635,57 +677,33 @@ const ChatRoomDomain = Remesh.domain({ }) domain.effect({ - name: 'Room.OnLikeMessageEffect', + name: 'Room.OnReactionMessageEffect', impl: ({ get, fromEvent }) => { - return fromEvent(OnLikeMessageEvent).pipe( + return fromEvent(OnReactionMessageEvent).pipe( mergeMap((message) => { - if (!get(messageListDomain.query.HasItemQuery(message.id))) { + if (!get(messageListDomain.query.HasItemQuery(message.targetId))) { return EMPTY } - const _message = get(messageListDomain.query.ItemQuery(message.id)) as NormalMessage - return of( - messageListDomain.command.UpdateItemCommand({ - ..._message, - receiveTime: Date.now(), - likeUsers: desert( - _message.likeUsers, - { - userId: message.userId, - username: message.username, - userAvatar: message.userAvatar - }, - 'userId' - ) - }) - ) - }) - ) - } - }) + const targetMessage = get(messageListDomain.query.ItemQuery(message.targetId)) as TextMessage - domain.effect({ - name: 'Room.OnHateMessageEffect', - impl: ({ get, fromEvent }) => { - return fromEvent(OnHateMessageEvent).pipe( - mergeMap((message) => { - if (!get(messageListDomain.query.HasItemQuery(message.id))) { - return EMPTY + const updatedMessage: TextMessage = { + ...targetMessage, + receivedAt: Date.now(), + reactions: { + likes: + message.reaction === REACTION_TYPE.LIKE + ? desert(targetMessage.reactions.likes, message.sender, 'id') + : targetMessage.reactions.likes, + hates: + message.reaction === REACTION_TYPE.HATE + ? desert(targetMessage.reactions.hates, message.sender, 'id') + : targetMessage.reactions.hates + } } - const _message = get(messageListDomain.query.ItemQuery(message.id)) as NormalMessage + return of( - messageListDomain.command.UpdateItemCommand({ - ..._message, - receiveTime: Date.now(), - hateUsers: desert( - _message.hateUsers, - { - userId: message.userId, - username: message.username, - userAvatar: message.userAvatar - }, - 'userId' - ) - }) + hlcClockDomain.command.ReceiveEventCommand(message.hlc), + messageListDomain.command.UpdateItemCommand(updatedMessage) ) }) ) @@ -709,10 +727,10 @@ const ChatRoomDomain = Remesh.domain({ UpdateUserListCommand({ type: 'delete', user: { ...existUser, peerId } }), existUser.peerIds.length === 1 ? HandleJoinLeaveMessageCommand({ - userId: existUser.userId, - username: existUser.username, - userAvatar: existUser.userAvatar, - messageType: 'leave' + id: existUser.id, + name: existUser.name, + avatar: existUser.avatar, + messageType: PROMPT_TYPE.LEAVE }) : null, OnLeaveRoomEvent(peerId) @@ -744,21 +762,20 @@ const ChatRoomDomain = Remesh.domain({ PeerIdQuery, UserListQuery, PeerListQuery, - JoinIsFinishedQuery + JoinIsFinishedQuery, + LastMessageHLCQuery }, command: { JoinRoomCommand, LeaveRoomCommand, SendTextMessageCommand, - SendLikeMessageCommand, - SendHateMessageCommand, + SendReactionCommand, SendSyncUserMessageCommand, SendSyncHistoryMessageCommand }, event: { SendTextMessageEvent, - SendLikeMessageEvent, - SendHateMessageEvent, + SendReactionMessageEvent, SendSyncUserMessageEvent, SendSyncHistoryMessageEvent, JoinRoomEvent, @@ -767,6 +784,7 @@ const ChatRoomDomain = Remesh.domain({ SelfLeaveRoomEvent, OnMessageEvent, OnTextMessageEvent, + OnReactionMessageEvent, OnSyncMessageEvent, OnJoinRoomEvent, OnLeaveRoomEvent, diff --git a/src/domain/HLCClock.ts b/src/domain/HLCClock.ts new file mode 100644 index 0000000..e8ac2bb --- /dev/null +++ b/src/domain/HLCClock.ts @@ -0,0 +1,110 @@ +import { Remesh } from 'remesh' +import { createHLC, sendEvent as hlcSendEvent, receiveEvent as hlcReceiveEvent, type HLC } from '@/utils' + +/** + * HLCClock Domain + * + * Manages the local Hybrid Logical Clock state for the application. + * This clock is used to: + * - Generate timestamps for outgoing messages + * - Update local time based on received messages + * - Ensure causal ordering across distributed peers + */ +const HLCClockDomain = Remesh.domain({ + name: 'HLCClockDomain', + impl: (domain) => { + /** + * Current HLC state + */ + const HLCState = domain.state({ + name: 'HLCClock.HLCState', + default: createHLC() + }) + + /** + * Query current HLC + */ + const CurrentHLCQuery = domain.query({ + name: 'HLCClock.CurrentHLCQuery', + impl: ({ get }) => { + return get(HLCState()) + } + }) + + /** + * Generate new HLC for sending a message + * This updates the local clock and returns the new value + */ + const SendEventCommand = domain.command({ + name: 'HLCClock.SendEventCommand', + impl: ({ get }) => { + const currentHLC = get(HLCState()) + const newHLC = hlcSendEvent(currentHLC) + return [HLCState().new(newHLC), SendEventEvent(newHLC)] + } + }) + + /** + * Update local HLC based on received message + * Takes the remote HLC and merges it with local clock + */ + const ReceiveEventCommand = domain.command({ + name: 'HLCClock.ReceiveEventCommand', + impl: ({ get }, remoteHLC: HLC) => { + const currentHLC = get(HLCState()) + const newHLC = hlcReceiveEvent(currentHLC, remoteHLC) + return [HLCState().new(newHLC), ReceiveEventEvent(newHLC)] + } + }) + + /** + * Reset HLC to initial state + */ + const ResetCommand = domain.command({ + name: 'HLCClock.ResetCommand', + impl: () => { + const initialHLC = createHLC() + return [HLCState().new(initialHLC), ResetEvent()] + } + }) + + /** + * Event emitted when HLC is updated by send + */ + const SendEventEvent = domain.event({ + name: 'HLCClock.SendEventEvent' + }) + + /** + * Event emitted when HLC is updated by receive + */ + const ReceiveEventEvent = domain.event({ + name: 'HLCClock.ReceiveEventEvent' + }) + + /** + * Event emitted when HLC is reset + */ + const ResetEvent = domain.event({ + name: 'HLCClock.ResetEvent' + }) + + return { + query: { + CurrentHLCQuery + }, + command: { + SendEventCommand, + ReceiveEventCommand, + ResetCommand + }, + event: { + SendEventEvent, + ReceiveEventEvent, + ResetEvent + } + } + } +}) + +export default HLCClockDomain diff --git a/src/domain/MessageList.ts b/src/domain/MessageList.ts index a2fcc43..3a9f7c2 100644 --- a/src/domain/MessageList.ts +++ b/src/domain/MessageList.ts @@ -4,38 +4,16 @@ import { IndexDBStorageExtern } from '@/domain/externs/Storage' import StorageEffect from '@/domain/modules/StorageEffect' import StatusModule from './modules/Status' import { MESSAGE_LIST_STORAGE_KEY } from '@/constants/config' -import type { ChatRoomMessageType } from '@/protocol' +import type { LocalMessage } from '@/protocol/Message' -export interface MessageUser { - userId: string - username: string - userAvatar: string -} - -export interface AtUser extends MessageUser { - positions: [number, number][] -} - -export interface NormalMessage extends MessageUser { - type: ChatRoomMessageType.Normal - id: string - body: string - sendTime: number - receiveTime: number - likeUsers: MessageUser[] - hateUsers: MessageUser[] - atUsers: AtUser[] -} - -export interface PromptMessage extends MessageUser { - type: ChatRoomMessageType.Prompt - id: string - body: string - sendTime: number - receiveTime: number -} - -export type Message = NormalMessage | PromptMessage +// Re-export types +export type { + LocalMessage as Message, + TextMessage, + SystemPromptMessage, + MentionedUser, + MessageUser +} from '@/protocol/Message' const MessageListDomain = Remesh.domain({ name: 'MessageListDomain', @@ -46,7 +24,7 @@ const MessageListDomain = Remesh.domain({ key: MESSAGE_LIST_STORAGE_KEY }) - const MessageListModule = ListModule(domain, { + const MessageListModule = ListModule(domain, { name: 'MessageListModule', key: (message) => message.id }) @@ -70,13 +48,13 @@ const MessageListDomain = Remesh.domain({ } }) - const CreateItemEvent = domain.event({ + const CreateItemEvent = domain.event({ name: 'MessageList.CreateItemEvent' }) const CreateItemCommand = domain.command({ name: 'MessageList.CreateItemCommand', - impl: (_, message: Message) => { + impl: (_, message: LocalMessage) => { return [ MessageListModule.command.AddItemCommand(message), CreateItemEvent(message), @@ -86,13 +64,13 @@ const MessageListDomain = Remesh.domain({ } }) - const UpdateItemEvent = domain.event({ + const UpdateItemEvent = domain.event({ name: 'MessageList.UpdateItemEvent' }) const UpdateItemCommand = domain.command({ name: 'MessageList.UpdateItemCommand', - impl: (_, message: Message) => { + impl: (_, message: LocalMessage) => { return [ MessageListModule.command.UpdateItemCommand(message), UpdateItemEvent(message), @@ -120,7 +98,7 @@ const MessageListDomain = Remesh.domain({ const UpsertItemCommand = domain.command({ name: 'MessageList.UpsertItemCommand', - impl: (_, message: Message) => { + impl: (_, message: LocalMessage) => { return [ MessageListModule.command.UpsertItemCommand(message), UpsertItemEvent(message), @@ -130,13 +108,13 @@ const MessageListDomain = Remesh.domain({ } }) - const UpsertItemEvent = domain.event({ + const UpsertItemEvent = domain.event({ name: 'MessageList.UpsertItemEvent' }) const ResetListCommand = domain.command({ name: 'MessageList.ResetListCommand', - impl: (_, messages: Message[]) => { + impl: (_, messages: LocalMessage[]) => { return [ MessageListModule.command.SetListCommand(messages), ResetListEvent(messages), @@ -146,7 +124,7 @@ const MessageListDomain = Remesh.domain({ } }) - const ResetListEvent = domain.event({ + const ResetListEvent = domain.event({ name: 'MessageList.ResetListEvent' }) @@ -168,20 +146,20 @@ const MessageListDomain = Remesh.domain({ } }) - const SyncToStateEvent = domain.event({ + const SyncToStateEvent = domain.event({ name: 'MessageList.SyncToStateEvent' }) const SyncToStateCommand = domain.command({ name: 'MessageList.SyncToStateCommand', - impl: (_, messages: Message[]) => { + impl: (_, messages: LocalMessage[]) => { return [MessageListModule.command.SetListCommand(messages), SyncToStateEvent(messages)] } }) storageEffect .set(SyncToStorageEvent) - .get((value) => [SyncToStateCommand(value ?? []), LoadStatusModule.command.SetFinishedCommand()]) + .get((value) => [SyncToStateCommand(value ?? []), LoadStatusModule.command.SetFinishedCommand()]) return { query: { diff --git a/src/domain/Notification.ts b/src/domain/Notification.ts index 54564c7..7bc0dbb 100644 --- a/src/domain/Notification.ts +++ b/src/domain/Notification.ts @@ -79,7 +79,7 @@ const NotificationDomain = Remesh.domain({ } const userInfo = get(userInfoDomain.query.UserInfoQuery()) - if (message.userId === userInfo?.id) { + if (message.sender.id === userInfo?.id) { return null } @@ -88,7 +88,7 @@ const NotificationDomain = Remesh.domain({ } if (userInfo?.notificationType === 'at') { - const hasAtSelf = message.atUsers.find((user) => user.userId === userInfo?.id) + const hasAtSelf = message.mentions.find((user) => user.id === userInfo?.id) if (hasAtSelf) { return PushCommand(message) } diff --git a/src/domain/WorldRoom.ts b/src/domain/WorldRoom.ts index 816f831..0f9f6e5 100644 --- a/src/domain/WorldRoom.ts +++ b/src/domain/WorldRoom.ts @@ -8,16 +8,16 @@ import { nanoid } from 'nanoid' import StatusModule from '@/domain/modules/Status' import getSiteMeta from '@/utils/getSiteMeta' import { - WorldRoomSendType, type WorldRoomMessage, - type WorldRoomSyncUserMessage, - type WorldRoomMessageFromInfo, + type WorldRoomPeerSyncMessage, + type WorldRoomSiteMeta, checkWorldRoomMessage } from '@/protocol' +import { MESSAGE_TYPE } from '@/protocol/Message' -export type FromInfo = WorldRoomMessageFromInfo +export type FromSite = WorldRoomSiteMeta & { peerId: string } -export type RoomUser = MessageUser & { peerIds: string[]; fromInfos: FromInfo[]; joinTime: number } +export type RoomUser = MessageUser & { peerIds: string[]; fromSites: FromSite[]; joinedAt: number } const WorldRoomDomain = Remesh.domain({ name: 'WorldRoomDomain', @@ -65,17 +65,17 @@ const WorldRoomDomain = Remesh.domain({ const JoinRoomCommand = domain.command({ name: 'Room.JoinRoomCommand', impl: ({ get }) => { - const { id: userId, name: username, avatar: userAvatar } = get(userInfoDomain.query.UserInfoQuery())! + const { id, name, avatar } = get(userInfoDomain.query.UserInfoQuery())! return [ UpdateUserListCommand({ type: 'create', user: { peerId: worldRoomExtern.peerId, - fromInfo: { ...getSiteMeta(), peerId: worldRoomExtern.peerId }, - joinTime: Date.now(), - userId, - username, - userAvatar + fromSite: { ...getSiteMeta(), peerId: worldRoomExtern.peerId }, + joinedAt: Date.now(), + id, + name, + avatar } }), @@ -94,17 +94,17 @@ const WorldRoomDomain = Remesh.domain({ const LeaveRoomCommand = domain.command({ name: 'Room.LeaveRoomCommand', impl: ({ get }) => { - const { id: userId, name: username, avatar: userAvatar } = get(userInfoDomain.query.UserInfoQuery())! + const { id, name, avatar } = get(userInfoDomain.query.UserInfoQuery())! return [ UpdateUserListCommand({ type: 'delete', user: { peerId: worldRoomExtern.peerId, - fromInfo: { ...getSiteMeta(), peerId: worldRoomExtern.peerId }, - joinTime: Date.now(), - userId, - username, - userAvatar + fromSite: { ...getSiteMeta(), peerId: worldRoomExtern.peerId }, + joinedAt: Date.now(), + id, + name, + avatar } }), JoinStatusModule.command.SetInitialCommand(), @@ -125,11 +125,11 @@ const WorldRoomDomain = Remesh.domain({ { get }, action: { type: 'create' | 'delete' - user: Omit & { peerId: string; fromInfo: FromInfo } + user: Omit & { peerId: string; fromSite: FromSite } } ) => { const userList = get(UserListState()) - const existUser = userList.find((user) => user.userId === action.user.userId) + const existUser = userList.find((user) => user.id === action.user.id) if (action.type === 'create') { return [ UserListState().new( @@ -138,9 +138,9 @@ const WorldRoomDomain = Remesh.domain({ { ...action.user, peerIds: [...new Set(existUser?.peerIds || []), action.user.peerId], - fromInfos: upsert(existUser?.fromInfos || [], action.user.fromInfo, 'peerId') + fromSites: upsert(existUser?.fromSites || [], action.user.fromSite, 'peerId') }, - 'userId' + 'id' ) ) ] @@ -152,9 +152,9 @@ const WorldRoomDomain = Remesh.domain({ { ...action.user, peerIds: existUser?.peerIds?.filter((peerId) => peerId !== action.user.peerId) || [], - fromInfos: existUser?.fromInfos?.filter((fromInfo) => fromInfo.peerId !== action.user.peerId) || [] + fromSites: existUser?.fromSites?.filter((fromSite) => fromSite.peerId !== action.user.peerId) || [] }, - 'userId' + 'id' ).filter((user) => user.peerIds.length) ) ] @@ -166,14 +166,22 @@ const WorldRoomDomain = Remesh.domain({ name: 'Room.SendSyncUserMessageCommand', impl: ({ get }, peerId: string) => { const self = get(SelfUserQuery()) + const now = Date.now() - const syncUserMessage: WorldRoomSyncUserMessage = { - ...self, + const syncUserMessage: WorldRoomPeerSyncMessage = { + type: MESSAGE_TYPE.PEER_SYNC, id: nanoid(), + hlc: { timestamp: now, counter: 0 }, + sentAt: now, + receivedAt: now, + sender: { + id: self.id, + name: self.name, + avatar: self.avatar + }, peerId: worldRoomExtern.peerId, - sendTime: Date.now(), - fromInfo: { ...getSiteMeta(), peerId: worldRoomExtern.peerId }, - type: WorldRoomSendType.SyncUser + joinedAt: self.joinedAt, + siteMeta: getSiteMeta() } worldRoomExtern.sendMessage(syncUserMessage, peerId) @@ -181,7 +189,7 @@ const WorldRoomDomain = Remesh.domain({ } }) - const SendSyncUserMessageEvent = domain.event({ + const SendSyncUserMessageEvent = domain.event({ name: 'Room.SendSyncUserMessageEvent' }) @@ -249,8 +257,18 @@ const WorldRoomDomain = Remesh.domain({ const messageCommand$ = (() => { switch (message.type) { - case WorldRoomSendType.SyncUser: { - return of(UpdateUserListCommand({ type: 'create', user: message })) + case MESSAGE_TYPE.PEER_SYNC: { + return of( + UpdateUserListCommand({ + type: 'create', + user: { + ...message.sender, + peerId: message.peerId, + fromSite: { ...message.siteMeta, peerId: message.peerId }, + joinedAt: message.joinedAt + } + }) + ) } default: @@ -282,7 +300,7 @@ const WorldRoomDomain = Remesh.domain({ return [ UpdateUserListCommand({ type: 'delete', - user: { ...existUser, peerId, fromInfo: { ...getSiteMeta(), peerId } } + user: { ...existUser, peerId, fromSite: { ...getSiteMeta(), peerId } } }), OnLeaveRoomEvent(peerId) ] diff --git a/src/protocol/ChatRoom.ts b/src/protocol/ChatRoom.ts index 973c200..07d66ac 100644 --- a/src/protocol/ChatRoom.ts +++ b/src/protocol/ChatRoom.ts @@ -1,94 +1,43 @@ import * as v from 'valibot' +import { MESSAGE_TYPE, REACTION_TYPE, HLCSchema, MessageMetaSchema, TextMessageSchema } from './Message' -// ChatRoom MessageType -export enum ChatRoomMessageType { - Normal = 'normal', - Prompt = 'prompt' -} - -// ChatRoom SendType -export enum ChatRoomSendType { - Text = 'Text', - Like = 'Like', - Hate = 'Hate', - SyncUser = 'SyncUser', - SyncHistory = 'SyncHistory' -} - -// ChatRoom Message Schemas -const ChatRoomMessageUserSchema = { - userId: v.string(), - username: v.string(), - userAvatar: v.string() -} - -const ChatRoomMessageAtUserSchema = { - positions: v.array(v.tuple([v.number(), v.number()])), - ...ChatRoomMessageUserSchema -} - -export const ChatRoomNormalMessageSchema = { - id: v.string(), - type: v.literal(ChatRoomMessageType.Normal), - body: v.string(), - sendTime: v.number(), - receiveTime: v.number(), - likeUsers: v.array(v.object(ChatRoomMessageUserSchema)), - hateUsers: v.array(v.object(ChatRoomMessageUserSchema)), - atUsers: v.array(v.object(ChatRoomMessageAtUserSchema)), - ...ChatRoomMessageUserSchema -} - -// ChatRoom Message Schema +// ChatRoom-specific message schemas export const ChatRoomMessageSchema = v.union([ + // Text Message (reuse from Message.ts) + TextMessageSchema, + + // Reaction Message v.object({ - type: v.literal(ChatRoomSendType.Text), - id: v.string(), - body: v.string(), - sendTime: v.number(), - atUsers: v.array(v.object(ChatRoomMessageAtUserSchema)), - ...ChatRoomMessageUserSchema + ...MessageMetaSchema.entries, + type: v.literal(MESSAGE_TYPE.REACTION), + targetId: v.string(), + reaction: v.union([v.literal(REACTION_TYPE.LIKE), v.literal(REACTION_TYPE.HATE)]) }), + + // Peer Sync Message v.object({ - type: v.literal(ChatRoomSendType.Like), - id: v.string(), - sendTime: v.number(), - ...ChatRoomMessageUserSchema - }), - v.object({ - type: v.literal(ChatRoomSendType.Hate), - id: v.string(), - sendTime: v.number(), - ...ChatRoomMessageUserSchema - }), - v.object({ - type: v.literal(ChatRoomSendType.SyncUser), - id: v.string(), + ...MessageMetaSchema.entries, + type: v.literal(MESSAGE_TYPE.PEER_SYNC), peerId: v.string(), - joinTime: v.number(), - sendTime: v.number(), - lastMessageTime: v.number(), - ...ChatRoomMessageUserSchema + joinedAt: v.number(), + lastMessageHLC: HLCSchema }), + + // History Sync Message v.object({ - type: v.literal(ChatRoomSendType.SyncHistory), - id: v.string(), - sendTime: v.number(), - messages: v.array(v.object(ChatRoomNormalMessageSchema)), - ...ChatRoomMessageUserSchema + ...MessageMetaSchema.entries, + type: v.literal(MESSAGE_TYPE.HISTORY_SYNC), + messages: v.array(TextMessageSchema) }) ]) // ChatRoom Types -export type ChatRoomMessageUser = v.InferOutput> -export type ChatRoomMessageAtUser = v.InferOutput> -export type ChatRoomNormalMessage = v.InferOutput> -export type ChatRoomTextMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[0]> -export type ChatRoomLikeMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[1]> -export type ChatRoomHateMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[2]> -export type ChatRoomSyncUserMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[3]> -export type ChatRoomSyncHistoryMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[4]> export type ChatRoomMessage = v.InferInput +export type ChatRoomTextMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[0]> +export type ChatRoomReactionMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[1]> +export type ChatRoomPeerSyncMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[2]> +export type ChatRoomHistorySyncMessage = v.InferOutput<(typeof ChatRoomMessageSchema.options)[3]> // Check if the message conforms to the format -export const checkChatRoomMessage = (message: ChatRoomMessage) => v.safeParse(ChatRoomMessageSchema, message).success +export const checkChatRoomMessage = (message: unknown): message is ChatRoomMessage => + v.safeParse(ChatRoomMessageSchema, message).success diff --git a/src/protocol/Message.ts b/src/protocol/Message.ts new file mode 100644 index 0000000..b0efcdf --- /dev/null +++ b/src/protocol/Message.ts @@ -0,0 +1,259 @@ +import * as v from 'valibot' + +// ============ Message Type Constants ============ + +/** + * Message type constants for type safety and consistency + */ +export const MESSAGE_TYPE = { + TEXT: 'text', + REACTION: 'reaction', + PEER_SYNC: 'peer-sync', + HISTORY_SYNC: 'history-sync', + SYSTEM_PROMPT: 'system-prompt' +} as const + +export type MessageType = (typeof MESSAGE_TYPE)[keyof typeof MESSAGE_TYPE] + +/** + * Reaction type constants + */ +export const REACTION_TYPE = { + LIKE: 'like', + HATE: 'hate' +} as const + +export type ReactionType = (typeof REACTION_TYPE)[keyof typeof REACTION_TYPE] + +/** + * System prompt type constants + */ +export const PROMPT_TYPE = { + JOIN: 'join', + LEAVE: 'leave', + INFO: 'info' +} as const + +export type PromptType = (typeof PROMPT_TYPE)[keyof typeof PROMPT_TYPE] + +// ============ Base Types ============ + +/** + * Hybrid Logical Clock + * Provides causal ordering in distributed systems + */ +export interface HLC { + timestamp: number // Physical time in milliseconds + counter: number // Logical counter for same timestamp +} + +/** + * User information + * Can represent sender, receiver, or mentioned user + */ +export interface MessageUser { + id: string + name: string + avatar: string +} + +/** + * Mentioned user with position information + */ +export interface MentionedUser extends MessageUser { + positions: [number, number][] // Position ranges in message body +} + +/** + * Base metadata for all messages + */ +export interface MessageMetadata { + id: string // Unique message identifier + hlc: HLC // Hybrid Logical Clock for ordering and sync + sentAt: number // Sender's local physical time (for display) + receivedAt: number // Receiver's local physical time (for local record) + sender: MessageUser // Sender information +} + +// ============ Message Types ============ + +/** + * Text message (used for both network transmission and local storage) + */ +export interface TextMessage extends MessageMetadata { + type: typeof MESSAGE_TYPE.TEXT + body: string + mentions: MentionedUser[] + reactions: { + likes: MessageUser[] // Users who liked this message + hates: MessageUser[] // Users who disliked this message + } +} + +/** + * Reaction message (network transmission only, not stored) + * Used to add/remove like/hate on a text message + */ +export interface ReactionMessage extends MessageMetadata { + type: typeof MESSAGE_TYPE.REACTION + targetId: string // Target message ID + reaction: ReactionType +} + +/** + * Peer sync message (network transmission only, not stored) + * Exchanged when peers connect to sync user info + */ +export interface PeerSyncMessage extends MessageMetadata { + type: typeof MESSAGE_TYPE.PEER_SYNC + peerId: string + joinedAt: number // Timestamp when user joined the room + lastMessageHLC: HLC // Last message HLC for history sync decision +} + +/** + * History sync message (network transmission only, not stored) + * Used to sync historical messages to newly joined peers + */ +export interface HistorySyncMessage extends MessageMetadata { + type: typeof MESSAGE_TYPE.HISTORY_SYNC + messages: TextMessage[] // Historical text messages +} + +/** + * System prompt message (local storage only, not transmitted) + * Used for system notifications like user join/leave + */ +export interface SystemPromptMessage extends MessageMetadata { + type: typeof MESSAGE_TYPE.SYSTEM_PROMPT + body: string + promptType: PromptType +} + +/** + * Network message union type + * Messages that can be transmitted over the network + */ +export type NetworkMessage = TextMessage | ReactionMessage | PeerSyncMessage | HistorySyncMessage + +/** + * Local message union type + * Messages that are stored locally + */ +export type LocalMessage = TextMessage | SystemPromptMessage + +// ============ Valibot Schemas ============ + +export const HLCSchema = v.object({ + timestamp: v.number(), + counter: v.number() +}) + +export const MessageUserSchema = v.object({ + id: v.string(), + name: v.string(), + avatar: v.string() +}) + +export const MentionedUserSchema = v.object({ + id: v.string(), + name: v.string(), + avatar: v.string(), + positions: v.array(v.tuple([v.number(), v.number()])) +}) + +export const MessageMetaSchema = v.object({ + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema +}) + +// ============ Network Message Schemas ============ + +export const TextMessageSchema = v.object({ + type: v.literal(MESSAGE_TYPE.TEXT), + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema, + body: v.string(), + mentions: v.array(MentionedUserSchema), + reactions: v.object({ + likes: v.array(MessageUserSchema), + hates: v.array(MessageUserSchema) + }) +}) + +export const ReactionMessageSchema = v.object({ + type: v.literal(MESSAGE_TYPE.REACTION), + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema, + targetId: v.string(), + reaction: v.union([v.literal(REACTION_TYPE.LIKE), v.literal(REACTION_TYPE.HATE)]) +}) + +export const PeerSyncMessageSchema = v.object({ + type: v.literal(MESSAGE_TYPE.PEER_SYNC), + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema, + peerId: v.string(), + joinedAt: v.number(), + lastMessageHLC: HLCSchema +}) + +export const HistorySyncMessageSchema = v.object({ + type: v.literal(MESSAGE_TYPE.HISTORY_SYNC), + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema, + messages: v.array(TextMessageSchema) +}) + +export const NetworkMessageSchema = v.union([ + TextMessageSchema, + ReactionMessageSchema, + PeerSyncMessageSchema, + HistorySyncMessageSchema +]) + +// ============ Stored Message Schemas ============ + +export const SystemPromptMessageSchema = v.object({ + type: v.literal(MESSAGE_TYPE.SYSTEM_PROMPT), + id: v.string(), + hlc: HLCSchema, + sentAt: v.number(), + receivedAt: v.number(), + sender: MessageUserSchema, + body: v.string(), + promptType: v.union([v.literal(PROMPT_TYPE.JOIN), v.literal(PROMPT_TYPE.LEAVE), v.literal(PROMPT_TYPE.INFO)]) +}) + +export const LocalMessageSchema = v.union([TextMessageSchema, SystemPromptMessageSchema]) + +// ============ Utility Functions ============ + +/** + * Validate network message format + */ +export const validateNetworkMessage = (message: unknown): message is NetworkMessage => { + return v.safeParse(NetworkMessageSchema, message).success +} + +/** + * Validate local message format + */ +export const validateLocalMessage = (message: unknown): message is LocalMessage => { + return v.safeParse(LocalMessageSchema, message).success +} diff --git a/src/protocol/WorldRoom.ts b/src/protocol/WorldRoom.ts index ade6f55..611bd0f 100644 --- a/src/protocol/WorldRoom.ts +++ b/src/protocol/WorldRoom.ts @@ -1,19 +1,8 @@ import * as v from 'valibot' +import { PeerSyncMessageSchema } from './Message' -// WorldRoom SendType -export enum WorldRoomSendType { - SyncUser = 'SyncUser' -} - -// WorldRoom Message Schemas -const WorldRoomMessageUserSchema = { - userId: v.string(), - username: v.string(), - userAvatar: v.string() -} - -const WorldRoomMessageFromInfoSchema = { - peerId: v.string(), +// Site metadata schema +const SiteMetaSchema = v.object({ host: v.string(), hostname: v.string(), href: v.string(), @@ -21,26 +10,23 @@ const WorldRoomMessageFromInfoSchema = { title: v.string(), icon: v.string(), description: v.string() -} +}) // WorldRoom Message Schema +// Extends PeerSyncMessageSchema with siteMeta field, but omits lastMessageHLC +// WorldRoom only handles user discovery, not message history sync export const WorldRoomMessageSchema = v.union([ v.object({ - type: v.literal(WorldRoomSendType.SyncUser), - id: v.string(), - peerId: v.string(), - joinTime: v.number(), - sendTime: v.number(), - fromInfo: v.object(WorldRoomMessageFromInfoSchema), - ...WorldRoomMessageUserSchema + ...v.omit(PeerSyncMessageSchema, ['lastMessageHLC']).entries, + siteMeta: SiteMetaSchema }) ]) // WorldRoom Types -export type WorldRoomMessageUser = v.InferOutput> -export type WorldRoomMessageFromInfo = v.InferOutput> -export type WorldRoomSyncUserMessage = v.InferOutput<(typeof WorldRoomMessageSchema.options)[0]> +export type WorldRoomSiteMeta = v.InferOutput +export type WorldRoomPeerSyncMessage = v.InferOutput<(typeof WorldRoomMessageSchema.options)[0]> export type WorldRoomMessage = v.InferInput // Check if the message conforms to the format -export const checkWorldRoomMessage = (message: WorldRoomMessage) => v.safeParse(WorldRoomMessageSchema, message).success +export const checkWorldRoomMessage = (message: unknown): message is WorldRoomMessage => + v.safeParse(WorldRoomMessageSchema, message).success diff --git a/src/protocol/index.ts b/src/protocol/index.ts index 02716c6..5283245 100644 --- a/src/protocol/index.ts +++ b/src/protocol/index.ts @@ -1,2 +1,3 @@ +export * from './Message' export * from './ChatRoom' export * from './WorldRoom' diff --git a/src/service/Notification/index.ts b/src/service/Notification/index.ts index 960ad07..f7e174f 100644 --- a/src/service/Notification/index.ts +++ b/src/service/Notification/index.ts @@ -47,8 +47,8 @@ export class Notification implements NotificationExternType { const id = await browser.notifications.create({ type: 'basic', - iconUrl: message.userAvatar, - title: message.username, + iconUrl: message.sender.avatar, + title: message.sender.name, message: message.body, contextMessage: messageTab?.url }) diff --git a/src/utils/hlc.ts b/src/utils/hlc.ts new file mode 100644 index 0000000..54cb0cb --- /dev/null +++ b/src/utils/hlc.ts @@ -0,0 +1,109 @@ +/** + * Hybrid Logical Clock (HLC) utilities + * + * HLC combines physical time with logical counters to provide: + * - Causal ordering in distributed systems + * - Tolerance to clock skew + * - Timestamps close to physical time + * + * @see https://www.cse.buffalo.edu/tech-reports/2014-04.pdf + */ + +export interface HLC { + timestamp: number // Physical time in milliseconds + counter: number // Logical counter for same timestamp +} + +/** + * Compare two HLCs + * @returns negative if a < b, 0 if a == b, positive if a > b + */ +export const compareHLC = (a: HLC, b: HLC): number => { + if (a.timestamp !== b.timestamp) { + return a.timestamp - b.timestamp + } + return a.counter - b.counter +} + +/** + * Create initial HLC with epoch time + */ +export const createHLC = (): HLC => { + return { timestamp: 0, counter: 0 } +} + +/** + * Update local HLC for a send event + * + * Rules: + * - If physical time advanced: use new time, reset counter + * - If physical time unchanged: increment counter + */ +export const sendEvent = (localHLC: HLC): HLC => { + const now = Date.now() + + if (now > localHLC.timestamp) { + // Physical clock advanced, reset counter + return { timestamp: now, counter: 0 } + } else { + // Physical clock unchanged, increment counter + return { timestamp: localHLC.timestamp, counter: localHLC.counter + 1 } + } +} + +/** + * Update local HLC for a receive event + * + * Rules: + * - Take max of (local time, local HLC, remote HLC) + * - If max timestamp appears in HLCs, increment counter + * - Otherwise reset counter + */ +export const receiveEvent = (localHLC: HLC, remoteHLC: HLC): HLC => { + const now = Date.now() + const maxTimestamp = Math.max(now, localHLC.timestamp, remoteHLC.timestamp) + + if (maxTimestamp === now && now > localHLC.timestamp && now > remoteHLC.timestamp) { + // Local physical time is newest + return { timestamp: now, counter: 0 } + } + + // Find max counter among HLCs with max timestamp + let maxCounter = 0 + if (maxTimestamp === localHLC.timestamp) { + maxCounter = Math.max(maxCounter, localHLC.counter) + } + if (maxTimestamp === remoteHLC.timestamp) { + maxCounter = Math.max(maxCounter, remoteHLC.counter) + } + + return { timestamp: maxTimestamp, counter: maxCounter + 1 } +} + +/** + * Check if HLC is within a time window + */ +export const isWithinTimeWindow = (hlc: HLC, windowMs: number): boolean => { + return hlc.timestamp >= Date.now() - windowMs +} + +/** + * Format HLC as readable string for debugging + */ +export const formatHLC = (hlc: HLC): string => { + return `${new Date(hlc.timestamp).toISOString()}:${hlc.counter}` +} + +/** + * Check if HLC is valid (non-negative values) + */ +export const isValidHLC = (hlc: HLC): boolean => { + return hlc.timestamp >= 0 && hlc.counter >= 0 && Number.isInteger(hlc.counter) +} + +/** + * Clone HLC (defensive copy) + */ +export const cloneHLC = (hlc: HLC): HLC => { + return { timestamp: hlc.timestamp, counter: hlc.counter } +} diff --git a/src/utils/index.ts b/src/utils/index.ts index 3e44576..00274f6 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -19,3 +19,14 @@ export { getTextByteSize } from './getTextByteSize' export { default as isEqual } from './isEqual' export { default as setIntervalImmediate } from './setIntervalImmediate' export { cleanURL, isAbsoluteURL, assembleURL, buildFullURL, safeUrl } from './url' +export { + type HLC, + compareHLC, + createHLC, + sendEvent, + receiveEvent, + isWithinTimeWindow, + formatHLC, + isValidHLC, + cloneHLC +} from './hlc'