fix: 整合群聊列表数据、服务重启时禁止ws链接连接

This commit is contained in:
乾乾
2025-08-29 18:54:37 +08:00
parent e2c19eaa2b
commit 69b475ce64
34 changed files with 303 additions and 240 deletions

View File

@@ -25,7 +25,6 @@ import java.util.Map;
public class GrayVersionLoadBalancer implements GrayscaleLoadBalancer {
private DiscoveryClient discoveryClient;
/**
* 根据serviceId 筛选可用服务
*

View File

@@ -72,7 +72,8 @@ public class GroupMemberAddListener {
pushService.sendPushMsg(MemberAdapter.buildMemberAddWS(roomId, onlineUids, memberResps, map), memberUidList, event.getUid());
// 移除缓存
groupMemberCache.evictMemberUidList(roomId);
groupMemberCache.evictMemberList(roomId);
groupMemberCache.evictExceptMemberList(roomId);
}
}

View File

@@ -1,7 +1,6 @@
package com.luohuo.flex.im.core.chat.consumer;
import com.luohuo.basic.context.ContextUtil;
import com.luohuo.basic.utils.TimeUtils;
import com.luohuo.flex.common.constant.MqConstant;
import com.luohuo.flex.im.core.chat.dao.ContactDao;
import com.luohuo.flex.im.core.chat.dao.MessageDao;
@@ -32,10 +31,8 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* 发送消息更新房间收信箱,并同步给房间成员信箱
@@ -78,7 +75,7 @@ public class MsgSendConsumer implements RocketMQListener<MsgSendMessageDTO> {
}
// 2. 更新所有群成员的会话时间, 并推送房间成员
refreshContactActiveTimeEnhanced(room.getId(), memberUidList, message.getId(), TimeUtils.getTime(message.getCreateTime()));
contactDao.refreshOrCreateActiveTime(room.getId(), memberUidList, message.getId(), message.getCreateTime());
// 3. 与在线人员交集并进行路由
switch (MessageTypeEnum.of(message.getType())) {
@@ -120,35 +117,4 @@ public class MsgSendConsumer implements RocketMQListener<MsgSendMessageDTO> {
}
}
}
// 增强版防抖控制
private final Map<Long, DebounceInfo> roomDebounceInfo = new ConcurrentHashMap<>();
private static final long MAX_DEBOUNCE_TIME = 10000;
private static final int MAX_DEBOUNCE_COUNT = 1000;
private void refreshContactActiveTimeEnhanced(Long roomId, List<Long> memberUidList, Long messageId, Long createTime) {
Long currentTime = System.currentTimeMillis();
DebounceInfo info = roomDebounceInfo.get(roomId);
if (info == null) {
// 首次更新
contactDao.refreshOrCreateActiveTime(roomId, memberUidList, messageId, TimeUtils.timestampToLocalDateTime(createTime));
roomDebounceInfo.put(roomId, new DebounceInfo(messageId, currentTime, 0));
return;
}
// 检查是否需要立即更新
boolean shouldUpdate = (currentTime - info.getLastUpdateTime()) >= MAX_DEBOUNCE_TIME || info.getPendingCount() >= MAX_DEBOUNCE_COUNT;
if (shouldUpdate) {
// 执行更新并使用最新消息ID
contactDao.refreshOrCreateActiveTime(roomId, memberUidList, messageId, TimeUtils.timestampToLocalDateTime(createTime));
roomDebounceInfo.put(roomId, new DebounceInfo(messageId, currentTime, 0));
log.debug("强制更新会话时间 roomId: {}, 消息ID: {}", roomId, messageId);
} else {
// 累积防抖计数
roomDebounceInfo.put(roomId, new DebounceInfo(messageId, info.getLastUpdateTime(), info.getPendingCount() + 1));
log.debug("防抖中 roomId: {}, 累积消息: {}", roomId, info.getPendingCount() + 1);
}
}
}

View File

@@ -161,7 +161,8 @@ public class ContactDao extends ServiceImpl<ContactMapper, Contact> {
* @return
*/
public List<Contact> getAllContactsByUid(Long uid) {
return cachePlusOps.hGet(UserContactCacheKeyBuilder.build(uid), x -> lambdaQuery().eq(Contact::getUid, uid).list(), true).getValue();
// return cachePlusOps.hGet(UserContactCacheKeyBuilder.build(uid), x -> lambdaQuery().eq(Contact::getUid, uid).list(), true).getValue();
return lambdaQuery().eq(Contact::getUid, uid).list();
}
}

View File

@@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.luohuo.flex.im.domain.enums.GroupRoleEnum.ROLE_LIST;
@@ -255,4 +256,10 @@ public class GroupMemberDao extends ServiceImpl<GroupMemberMapper, GroupMember>
public List<ChatMember> getMemberListByUid(List<Long> memberList) {
return baseMapper.getMemberListByUid(memberList);
}
public List<GroupMember> getGroupMemberByGroupIdListAndUid(Long uid, Set<Long> groupIdList) {
QueryWrapper<GroupMember> wrapper = new QueryWrapper<>();
wrapper.in("group_id", groupIdList).eq("uid", uid);
return baseMapper.selectList(wrapper);
}
}

View File

@@ -1,11 +1,10 @@
package com.luohuo.flex.im.core.chat.dao;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.luohuo.flex.im.domain.entity.Room;
import com.luohuo.flex.im.core.chat.mapper.RoomMapper;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import com.luohuo.flex.im.domain.vo.response.MemberResp;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@@ -29,7 +28,7 @@ public class RoomDao extends ServiceImpl<RoomMapper, Room> implements IService<R
.update();
}
public List<GroupListVO> groupList(Long uid, IPage<GroupListVO> page) {
return baseMapper.groupList(uid,page).getRecords();
public List<MemberResp> groupList(Long uid) {
return baseMapper.groupList(uid);
}
}

View File

@@ -1,11 +1,12 @@
package com.luohuo.flex.im.core.chat.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import com.luohuo.flex.im.domain.vo.response.MemberResp;
import org.springframework.stereotype.Repository;
import com.luohuo.flex.im.domain.entity.Room;
import java.util.List;
/**
* <p>
* 房间表 Mapper 接口
@@ -16,5 +17,5 @@ import com.luohuo.flex.im.domain.entity.Room;
@Repository
public interface RoomMapper extends BaseMapper<Room> {
IPage<GroupListVO> groupList(Long uid, IPage<GroupListVO> page);
List<MemberResp> groupList(Long uid);
}

View File

@@ -6,7 +6,6 @@ import com.luohuo.flex.im.domain.vo.request.admin.AdminAddReq;
import com.luohuo.flex.im.domain.vo.request.admin.AdminRevokeReq;
import com.luohuo.flex.im.domain.vo.request.contact.ContactAddReq;
import com.luohuo.flex.im.domain.vo.request.member.MemberExitReq;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import jakarta.validation.Valid;
import com.luohuo.flex.im.domain.vo.req.CursorPageBaseReq;
import com.luohuo.flex.im.domain.vo.res.CursorPageBaseResp;
@@ -74,11 +73,10 @@ public interface RoomAppService {
ChatRoomResp getContactDetailByFriend(Long uid, @Valid ContactFriendReq req);
IPage<GroupListVO> groupList(Long uid, IPage<GroupListVO> page);
List<MemberResp> groupList(Long uid);
void asyncOnline(List<Long> uidList, Long roomId, boolean online);
/**
* 申请加群列表
* @param uid 登录用户id

View File

@@ -7,8 +7,8 @@ import com.luohuo.flex.im.domain.entity.Room;
import com.luohuo.flex.im.domain.entity.RoomFriend;
import com.luohuo.flex.im.domain.entity.RoomGroup;
import com.luohuo.flex.im.domain.vo.request.GroupAddReq;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import com.luohuo.flex.im.domain.vo.response.AnnouncementsResp;
import com.luohuo.flex.im.domain.vo.response.MemberResp;
import java.util.List;
@@ -49,7 +49,7 @@ public interface RoomService {
* 群聊列表
* @param uid 登录用户id
*/
void groupList(Long uid, IPage<GroupListVO> page);
List<MemberResp> groupList(Long uid);
/**
* 校验当前用户是否在群里

View File

@@ -62,7 +62,7 @@ public class GroupMemberCache {
* @param memberUid 成员用户ID
* @return 成员详细信息
*/
@Cacheable(cacheNames = "luohuo:member:info", key = "#roomId + ':' + #memberUid")
@Cacheable(cacheNames = "luohuo:member:info", key = "#roomId + ':' + #memberUid", unless = "#result == null")
public GroupMember getMemberDetail(Long roomId, Long memberUid) {
return groupMemberDao.getMember(roomId, memberUid);
}
@@ -76,16 +76,6 @@ public class GroupMemberCache {
// 清理所有成员详情缓存(慎用)
}
/**
* 这里的操作要双清
* @param roomId
*/
public void evictMemberUidList(Long roomId) {
evictMemberList(roomId);
evictExceptMemberList(roomId);
}
@CacheEvict(cacheNames = "luohuo:member:except", key = "#roomId")
public void evictExceptMemberList(Long roomId) {
}

View File

@@ -2,8 +2,6 @@ package com.luohuo.flex.im.core.chat.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -28,7 +26,6 @@ import com.luohuo.flex.im.domain.vo.request.admin.AdminRevokeReq;
import com.luohuo.flex.im.domain.vo.request.member.MemberExitReq;
import com.luohuo.flex.model.enums.ChatActiveStatusEnum;
import com.luohuo.flex.im.domain.vo.request.contact.ContactAddReq;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
@@ -108,6 +105,7 @@ import com.luohuo.flex.im.core.user.service.impl.PushService;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -275,9 +273,27 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
}
@Override
public IPage<GroupListVO> groupList(Long uid, IPage<GroupListVO> page) {
roomService.groupList(uid,page);
return page;
public List<MemberResp> groupList(Long uid) {
List<MemberResp> voList = roomService.groupList(uid);
Set<Long> groupIdList = voList.stream().map(MemberResp::getGroupId).collect(Collectors.toSet());
List<Long> roomIdList = voList.stream().map(MemberResp::getRoomId).collect(Collectors.toList());
Map<Long, Long> onlineMap = presenceApi.getBatchGroupOnlineCounts(roomIdList).getData();
List<GroupMember> members = groupMemberDao.getGroupMemberByGroupIdListAndUid(uid, groupIdList);
Map<Long, GroupMember> map = members.stream().collect(Collectors.toMap(GroupMember::getGroupId, Function.identity()));
// 渲染群信息
voList.forEach(item -> {
Long memberNum = (long) groupMemberCache.getMemberUidList(item.getRoomId()).size();
GroupMember member = map.get(item.getGroupId());
item.setOnlineNum(onlineMap.get(item.getRoomId()));
item.setRoleId(getGroupRole(uid, item.getGroupId()));
item.setMemberNum(memberNum);
item.setRemark(member.getRemark());
item.setMyName(member.getMyName());
});
return voList;
}
/**
@@ -384,7 +400,7 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
}
CacheKey ogmKey = PresenceCacheKeyBuilder.onlineGroupMembersKey(roomId);
for (Long uid : uidList) {
for (Long uid : onlineList) {
CacheKey ougKey = PresenceCacheKeyBuilder.onlineUserGroupsKey(uid);
if(online) {
@@ -703,7 +719,7 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
.account(roomGroup.getAccount())
.remark(member.getRemark())
.myName(member.getMyName())
.roleId(getGroupRole(uid, roomGroup, room))
.roleId(getGroupRole(uid, roomGroup.getId()))
.build();
}
@@ -796,7 +812,7 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
RoomGroup roomGroup = roomGroupCache.get(request.getRoomId());
AssertUtil.isNotEmpty(roomGroup, "房间号有误");
GroupMember self = groupMemberDao.getMemberByGroupId(roomGroup.getId(), uid);
AssertUtil.isNotEmpty(self, GroupErrorEnum.USER_NOT_IN_GROUP, "groupMember");
AssertUtil.isNotEmpty(self, GroupErrorEnum.USER_NOT_IN_GROUP, "");
// 如果房间人员小于3人 那么直接解散群聊
CacheKey membersKey = PresenceCacheKeyBuilder.groupMembersKey(request.getRoomId());
@@ -830,14 +846,14 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
contactDao.removeByRoomId(room.getId(), Collections.singletonList(request.getUid()));
return true;
})){
List<Long> memberUidList = groupMemberCache.getMemberExceptUidList(roomGroup.getRoomId());
if(!memberUidList.contains(request.getUid())){
memberUidList.add(request.getUid());
}
WsBaseResp<WSMemberChange> ws = MemberAdapter.buildMemberRemoveWS(roomGroup.getRoomId(), Arrays.asList(member.getUid()), WSMemberChange.CHANGE_TYPE_REMOVE);
pushService.sendPushMsg(ws, memberUidList, uid);
groupMemberCache.evictMemberUidList(room.getId());
groupMemberCache.evictMemberList(room.getId());
groupMemberCache.evictExceptMemberList(room.getId());
groupMemberCache.evictMemberDetail(room.getId(), removedUid);
// 移除群聊缓存
@@ -940,7 +956,8 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
return true;
});
// 4.5 告知所有人群已经被解散, 这里要走groupMemberDao查询缓存中可能没有屏蔽群的用户
groupMemberCache.evictMemberUidList(room.getId());
groupMemberCache.evictMemberList(room.getId());
groupMemberCache.evictExceptMemberList(room.getId());
groupMemberCache.evictAllMemberDetails();
// 新版解散群聊
CacheKey uKey = PresenceCacheKeyBuilder.userGroupsKey(uid);
@@ -949,7 +966,7 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
asyncOnline(memberUidList, room.getId(), false);
pushService.sendPushMsg(RoomAdapter.buildGroupDissolution(roomGroup.getName()), memberUidList, uid);
} else {
transactionTemplate.execute(e -> {
if(transactionTemplate.execute(e -> {
// 4.6 删除会话
Boolean isDelContact = contactDao.removeByRoomId(roomId, Collections.singletonList(uid));
AssertUtil.isTrue(isDelContact, "会话移除异常");
@@ -957,51 +974,59 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
Boolean isDelGroupMember = groupMemberDao.removeByGroupId(roomGroup.getId(), Collections.singletonList(uid));
AssertUtil.isTrue(isDelGroupMember, "群成员移除失败");
return true;
});
// 4.8 发送移除事件告知群成员
WsBaseResp<WSMemberChange> ws = MemberAdapter.buildMemberRemoveWS(roomGroup.getRoomId(), Arrays.asList(uid), WSMemberChange.CHANGE_TYPE_QUIT);
pushService.sendPushMsg(ws, memberUidList, uid);
groupMemberCache.evictMemberUidList(room.getId());
groupMemberCache.evictMemberDetail(room.getId(), uid);
})){
// 4.8 发送移除事件告知群成员
WsBaseResp<WSMemberChange> ws = MemberAdapter.buildMemberRemoveWS(roomGroup.getRoomId(), Arrays.asList(uid), WSMemberChange.CHANGE_TYPE_QUIT);
pushService.sendPushMsg(ws, memberUidList, uid);
groupMemberCache.evictMemberList(room.getId());
groupMemberCache.evictExceptMemberList(room.getId());
groupMemberCache.evictMemberDetail(room.getId(), uid);
// 新版退出群聊
CacheKey uKey = PresenceCacheKeyBuilder.userGroupsKey(uid);
CacheKey gKey = PresenceCacheKeyBuilder.groupMembersKey(room.getId());
cachePlusOps.sRem(gKey, uid);
cachePlusOps.sRem(uKey, room.getId());
asyncOnline(Arrays.asList(uid), room.getId(), false);
// 新版退出群聊
CacheKey uKey = PresenceCacheKeyBuilder.userGroupsKey(uid);
CacheKey gKey = PresenceCacheKeyBuilder.groupMembersKey(room.getId());
cachePlusOps.sRem(gKey, uid);
cachePlusOps.sRem(uKey, room.getId());
asyncOnline(Arrays.asList(uid), room.getId(), false);
}
}
}
@Override
@RedissonLock(prefixKey = "addGroup:", key = "#uid")
@Transactional(rollbackFor = Exception.class)
public Long addGroup(Long uid, GroupAddReq request) {
List<Long> userIdList = request.getUidList();
AssertUtil.isTrue(userIdList.size() > 1,"群聊人数应大于2人");
RoomGroup roomGroup = roomService.createGroupRoom(uid, request);
Map<Long, User> userMap = userCache.getUserInfoBatch(request.getUidList().stream().collect(Collectors.toSet()));
AssertUtil.isTrue(userMap.size() > 1,"群聊人数应大于2人");
// 批量保存群成员
List<GroupMember> groupMembers = RoomAdapter.buildGroupMemberBatch(userIdList, roomGroup.getId());
groupMemberDao.saveBatch(groupMembers);
List<Long> uidList = new ArrayList<>(userMap.keySet());
AtomicReference<Long> roomIdAtomic = new AtomicReference(0L);
// 发送邀请加群消息 ==> 触发每个人的会话
roomGroupCache.evictAllCaches();
// 创建群组数据并推送数据到前端
if(transactionTemplate.execute(e -> {
RoomGroup roomGroup = roomService.createGroupRoom(uid, request);
// 批量保存群成员
List<GroupMember> groupMembers = RoomAdapter.buildGroupMemberBatch(uidList, roomGroup.getId());
groupMemberDao.saveBatch(groupMembers);
roomIdAtomic.set(roomGroup.getRoomId());
return true;
})){
// 发送邀请加群消息 ==> 触发每个人的会话
roomGroupCache.evictAllCaches();
// 处理新房间里面所有在线人员
uidList.add(uid);
groupMemberCache.evictMemberList(roomIdAtomic.get());
groupMemberCache.evictExceptMemberList(roomIdAtomic.get());
// 处理新房间里面所有在线人员
List<Long> uidList = new ArrayList<>(request.getUidList());
uidList.add(uid);
groupMemberCache.evictMemberUidList(roomGroup.getRoomId());
CacheKey gKey = PresenceCacheKeyBuilder.groupMembersKey(roomGroup.getRoomId());
uidList.forEach(id -> {
cachePlusOps.sAdd(gKey, id);
cachePlusOps.sAdd(PresenceCacheKeyBuilder.userGroupsKey(id), roomGroup.getRoomId());
});
asyncOnline(uidList, roomGroup.getRoomId(), true);
SpringUtils.publishEvent(new GroupMemberAddEvent(this, roomGroup.getRoomId(), request.getUidList(), uid));
return roomGroup.getRoomId();
// 更新在线缓存
CacheKey gKey = PresenceCacheKeyBuilder.groupMembersKey(roomIdAtomic.get());
uidList.forEach(id -> {
cachePlusOps.sAdd(gKey, id);
cachePlusOps.sAdd(PresenceCacheKeyBuilder.userGroupsKey(id), roomIdAtomic.get());
});
SpringUtils.publishEvent(new GroupMemberAddEvent(this, roomIdAtomic.get(), request.getUidList(), uid));
asyncOnline(uidList, roomIdAtomic.get(), true);
}
return roomIdAtomic.get();
}
private boolean hasPower(GroupMember self) {
@@ -1013,12 +1038,10 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
/**
* 获取群角色
*/
private Integer getGroupRole(Long uid, RoomGroup roomGroup, Room room) {
GroupMember member = Objects.isNull(uid) ? null : groupMemberDao.getMemberByGroupId(roomGroup.getId(), uid);
private Integer getGroupRole(Long uid, Long groupId) {
GroupMember member = Objects.isNull(uid) ? null : groupMemberDao.getMemberByGroupId(groupId, uid);
if (Objects.nonNull(member)) {
return GroupRoleAPPEnum.of(member.getRoleId()).getType();
} else if (isHotGroup(room)) {
return GroupRoleAPPEnum.MEMBER.getType();
} else {
return GroupRoleAPPEnum.REMOVE.getType();
}
@@ -1049,7 +1072,7 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
List<Long> msgIds = roomBaseInfoMap.values().stream().map(RoomBaseInfo::getLastMsgId).collect(Collectors.toList());
List<Message> messages = CollectionUtil.isEmpty(msgIds) ? new ArrayList<>() : messageDao.listByIds(msgIds);
Map<Long, Message> msgMap = messages.stream().collect(Collectors.toMap(Message::getId, Function.identity()));
Map<Long, User> lastMsgUidMap = userInfoCache.getBatch(messages.stream().map(Message::getFromUid).collect(Collectors.toList()));
// Map<Long, User> lastMsgUidMap = userInfoCache.getBatch(messages.stream().map(Message::getFromUid).collect(Collectors.toList()));
// 消息未读数
Map<Long, Integer> unReadCountMap = getUnReadCountMap(contactMap.values());
@@ -1089,18 +1112,13 @@ public class RoomAppServiceImpl implements RoomAppService, InitializingBean {
AbstractMsgHandler strategyNoNull = MsgHandlerFactory.getStrategyNoNull(message.getType());
// 判断是群聊还是单聊
if (Objects.equals(roomBaseInfo.getType(), RoomTypeEnum.GROUP.getType())) {
resp.setText(strategyNoNull.showContactMsg(message));
GroupMember messageUser = groupMemberCache.getMemberDetail(roomId, message.getFromUid());
if (ObjectUtil.isNotNull(messageUser)) {
if (StrUtil.isNotEmpty(messageUser.getMyName())) {
resp.setText(messageUser.getMyName() + ":" + strategyNoNull.showContactMsg(message));
}
// 当自己查看时,且最后一条消息是自己发送的,那么显示群备注
if (uid.equals(message.getFromUid()) && StrUtil.isNotEmpty(messageUser.getRemark())){
resp.setRemark(messageUser.getRemark());
}
} else {
resp.setText((lastMsgUidMap.get(message.getFromUid()).getName()) + ":" + strategyNoNull.showContactMsg(message));
}
} else {
resp.setText(strategyNoNull.showContactMsg(message));

View File

@@ -11,7 +11,6 @@ import com.luohuo.flex.im.core.chat.dao.AnnouncementsReadRecordDao;
import com.luohuo.flex.im.domain.entity.Announcements;
import com.luohuo.flex.im.domain.entity.AnnouncementsReadRecord;
import com.luohuo.flex.im.domain.vo.request.GroupAddReq;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import com.luohuo.flex.im.domain.vo.response.AnnouncementsResp;
import com.luohuo.flex.im.core.chat.dao.GroupMemberDao;
import com.luohuo.flex.im.core.chat.dao.RoomDao;
@@ -27,6 +26,7 @@ import com.luohuo.flex.im.core.chat.service.RoomService;
import com.luohuo.flex.im.core.chat.service.adapter.ChatAdapter;
import com.luohuo.flex.im.domain.entity.User;
import com.luohuo.flex.im.core.user.service.cache.UserInfoCache;
import com.luohuo.flex.im.domain.vo.response.MemberResp;
import lombok.AllArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@@ -83,10 +83,9 @@ public class RoomServiceImpl implements RoomService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public RoomGroup createGroupRoom(Long uid, GroupAddReq groupAddReq) {
List<GroupMember> selfGroup = groupMemberDao.getSelfGroup(uid);
AssertUtil.isTrue(selfGroup.size() < 5, "每个人只能创建个群");
AssertUtil.isTrue(selfGroup.size() < 20, "每个人只能创建20个群");
User user = userInfoCache.get(uid);
Room room = createRoom(RoomTypeEnum.GROUP);
// 插入群
@@ -104,8 +103,8 @@ public class RoomServiceImpl implements RoomService {
}
@Override
public void groupList(Long uid, IPage<GroupListVO> page) {
roomDao.groupList(uid, page);
public List<MemberResp> groupList(Long uid) {
return roomDao.groupList(uid);
}
@Override

View File

@@ -25,7 +25,7 @@ public class ItemCache {
return itemConfigDao.getByType(type);
}
@Cacheable(cacheNames = "luohuo:item", key = "'item:'+#itemId")
@Cacheable(cacheNames = "luohuo:item", key = "'item:'+#itemId", unless = "#result == null")
public ItemConfig getById(Long itemId) {
return itemConfigDao.getById(itemId);
}

View File

@@ -183,8 +183,8 @@ public class ApplyServiceImpl implements ApplyService {
if(invite.getType().equals(RoomTypeEnum.FRIEND.getType())){
AssertUtil.equal(invite.getStatus(), AGREE.getCode(), "已同意好友申请");
// 同意申请
AtomicReference<Long> atomicRoomId = null;
AtomicReference<Boolean> atomicIsFromTempSession = null;
AtomicReference<Long> atomicRoomId = new AtomicReference(0L);
AtomicReference<Boolean> atomicIsFromTempSession = new AtomicReference(false);
transactionTemplate.execute(e -> {
userApplyDao.agree(request.getApplyId());
UserFriend userFriend = userFriendDao.getByFriend(uid, invite.getUid());

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.luohuo.basic.cache.redis2.CacheResult;
import com.luohuo.basic.cache.repository.CachePlusOps;
import com.luohuo.basic.exception.BizException;
import com.luohuo.basic.model.cache.CacheHashKey;
import com.luohuo.flex.common.cache.common.FeedMediaRelCacheKeyBuilder;
import com.luohuo.flex.im.common.constant.RedisKey;
@@ -118,7 +119,7 @@ public class FeedServiceImpl implements FeedService {
* @param uid 操作人
* @param feed 朋友圈
*/
private void saveFeed(FeedParam param, Long uid, Feed feed) {
public void saveFeed(FeedParam param, Long uid, Feed feed) {
List<Long> pushList = new ArrayList<>();
List<FeedTarget> feedTargets = new ArrayList<>();
List<FeedMedia> mediaList = new ArrayList<>();
@@ -304,11 +305,11 @@ public class FeedServiceImpl implements FeedService {
public Boolean editFeed(Long uid, FeedParam param){
Feed feed = feedDao.getById(param.getId());
if(ObjectUtil.isNull(feed)){
throw new RuntimeException("请选择一条朋友圈!");
throw new BizException("请选择一条朋友圈!");
}
if(!feed.getUid().equals(uid)){
throw new RuntimeException("只能编辑自己的朋友圈!");
throw new BizException("只能编辑自己的朋友圈!");
}
// 1. 更新朋友圈基础信息

View File

@@ -2,11 +2,12 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.luohuo.flex.im.core.chat.mapper.RoomMapper">
<select id="groupList" resultType="com.luohuo.flex.im.domain.vo.res.GroupListVO">
<select id="groupList" resultType="com.luohuo.flex.im.domain.vo.response.MemberResp">
select distinct r.id as roomId,
g.name as roomName,
g.name as groupName,
g.avatar as avatar,
g.id as groupId,
g.account,
r.update_time
from im_room_group g
inner join im_room r on g.room_id = r.id

View File

@@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.luohuo.flex.im.domain.vo.req.IdReqVO;
import com.luohuo.flex.im.domain.vo.req.room.UserApplyResp;
import com.luohuo.flex.im.domain.vo.res.GroupListVO;
import com.luohuo.flex.im.domain.vo.res.IdRespVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -67,10 +66,9 @@ public class RoomController {
@Operation(summary = "群聊列表")
@GetMapping("/group/list")
public R<IPage<GroupListVO>> groupList(@RequestParam("current") Long current, @RequestParam("size") Long size){
public R<List<MemberResp>> groupList(){
Long uid = ContextUtil.getUid();
IPage<GroupListVO> page = new Page<>(current,size);
return R.success(roomService.groupList(uid,page));
return R.success(roomService.groupList(uid));
}
@GetMapping("/group/member/page")

View File

@@ -1,32 +0,0 @@
package com.luohuo.flex.im.domain.vo.res;
import lombok.Data;
/**
* 群聊列表 VO
*
* @author Tian
* @date 2024/12/31
*/
@Data
public class GroupListVO {
/**
* 组 ID
*/
private Long groupId;
/**
* 房间 ID
*/
private Long roomId;
/**
* 房间名称
*/
private String roomName;
/**
* 群头像
*/
private String avatar;
}

View File

@@ -14,6 +14,8 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
public class MemberResp {
@Schema(description ="群聊id")
private Long groupId;
@Schema(description ="房间id")
private Long roomId;
@Schema(description ="群名称")
@@ -24,8 +26,6 @@ public class MemberResp {
private Long onlineNum;
@Schema(description ="成员角色 1群主 2管理员 3普通成员 4踢出群聊")
private Integer roleId;
@Schema(description = "群号")
private String accountCode;
@Schema(description = "群号")
private String account;
@Schema(description = "群成员数")

View File

@@ -8,6 +8,7 @@ luohuo:
username: ${NACOS_USERNAME:@nacos.username@}
password: ${NACOS_PASSWORD:@nacos.password@}
web-port: ${NACOS_WEB_PORT:@nacos.web-port@}
local-ip: ${NACOS_LOCAL_IP:@nacos.local-ip@}
seata:
ip: ${SEATA_IP:@seata.ip@}
port: ${SEATA_PORT:@seata.port@}
@@ -52,6 +53,7 @@ spring:
username: ${luohuo.nacos.username}
password: ${luohuo.nacos.password}
server-addr: ${luohuo.nacos.ip}:${luohuo.nacos.port}
ip: ${luohuo.nacos.local-ip}
namespace: ${luohuo.nacos.namespace}
metadata: # 元数据,用于权限服务实时获取各个服务的所有接口
management.context-path: ${server.servlet.context-path:}${spring.mvc.servlet.path:}${management.endpoints.web.base-path:}

View File

@@ -387,7 +387,7 @@ public abstract class AbstractTokenGranter implements TokenGranter {
tokenSession.setLoginId(userInfo.getId());
tokenSession.set(JWT_KEY_SYSTEM_TYPE, userInfo.getSystemType());
tokenSession.set(JWT_KEY_DEVICE, deviceType);
tokenSession.set(CLIENT_HEADER, clientId);
tokenSession.set(CLIENT_ID, clientId);
if (org.getCurrentTopCompanyId() != null) {
tokenSession.set(JWT_KEY_TOP_COMPANY_ID, org.getCurrentTopCompanyId());
} else {
@@ -473,7 +473,7 @@ public abstract class AbstractTokenGranter implements TokenGranter {
if (CollUtil.isNotEmpty(sameDeviceTokens)) {
for (String token : sameDeviceTokens) {
try {
String clientId = StpUtil.getTokenSessionByToken(token).getString(CLIENT_HEADER);
String clientId = StpUtil.getTokenSessionByToken(token).getString(CLIENT_ID);
StpUtil.kickout(token);
log.info("已踢出会话: token={}", token);

View File

@@ -44,6 +44,7 @@ public class LoginParamVO {
private String mobile;
@Schema(description = "客户端指纹信息")
@NotNull(message = "设备指纹信息异常")
private String clientId = "";
/**

View File

@@ -7,11 +7,13 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.luohuo.basic.exception.BizException;
import com.luohuo.basic.model.cache.CacheHashKey;
import com.luohuo.basic.model.cache.CacheKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -25,27 +27,27 @@ import java.util.stream.Collectors;
@Component
public class NacosRouterService {
private final NamingService namingService;
private final StringRedisTemplate stringRedisTemplate;
RedisTemplate<String, Object> redisTemplate;
@Autowired
public NacosRouterService(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties discoveryProperties,
StringRedisTemplate stringRedisTemplate
RedisTemplate<String, Object> redisTemplate
) {
this.namingService = nacosServiceManager.getNamingService(discoveryProperties.getNacosProperties());
this.stringRedisTemplate = stringRedisTemplate;
this.redisTemplate = redisTemplate;
}
// 查询用户设备
public Set<String> getUserDevices(Long uid) {
String globalKey = "luohuo:router:device-node-mapping";
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap(uid + ":*");
// 扫描全局Hash中属于该用户的设备字段 (uid:*)
ScanOptions options = ScanOptions.scanOptions().match(uid + ":*").count(100).build();
Set<String> deviceFields = new HashSet<>();
try (Cursor<Map.Entry<Object, Object>> cursor =
stringRedisTemplate.opsForHash().scan(globalKey, options)) {
redisTemplate.opsForHash().scan(deviceNodeMap.getKey(), options)) {
while (cursor.hasNext()) {
Map.Entry<Object, Object> entry = cursor.next();
deviceFields.add((String) entry.getKey());
@@ -64,9 +66,9 @@ public class NacosRouterService {
*/
public String getDeviceNode(Long uid, String clientId) {
// 1. 直接从全局Hash中获取设备对应的节点
String globalKey = "luohuo:router:device-node-mapping";
String deviceField = uid + ":" + clientId;
String nodeId = (String) stringRedisTemplate.opsForHash().get(globalKey, deviceField);
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap(deviceField);
String nodeId = (String) redisTemplate.opsForHash().get(deviceNodeMap.getKey(), deviceField);
// 2. 如果节点不存在直接返回null
if (nodeId == null) {
@@ -83,9 +85,9 @@ public class NacosRouterService {
* @param nodeId 节点值
*/
public Map<Long, List<String>> getDevicesByNode(String nodeId) {
String key = "luohuo:router:node-devices:" + nodeId;
return stringRedisTemplate.opsForSet().members(key).stream()
.map(entry -> entry.split(":"))
CacheKey cacheKey = RouterCacheKeyBuilder.buildNodeDevices(nodeId);
return redisTemplate.opsForSet().members(cacheKey.getKey()).stream()
.map(entry -> entry.toString().split(":"))
.collect(Collectors.groupingBy(
parts -> Long.parseLong(parts[0]),
Collectors.mapping(parts -> parts[1], Collectors.toList())
@@ -100,13 +102,13 @@ public class NacosRouterService {
*/
public void removeDeviceRoute(Long uid, String clientId, String nodeId) {
// 1. 从全局Hash中删除设备-节点映射
String globalKey = "luohuo:router:device-node-mapping";
String deviceField = uid + ":" + clientId;
stringRedisTemplate.opsForHash().delete(globalKey, deviceField);
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap(deviceField);
redisTemplate.opsForHash().delete(deviceNodeMap.getKey(), deviceField);
// 2. 从节点设备集合中删除设备标识
String nodeDevicesKey = "luohuo:router:node-devices:" + nodeId;
stringRedisTemplate.opsForSet().remove(nodeDevicesKey, deviceField);
CacheKey cacheKey = RouterCacheKeyBuilder.buildNodeDevices(nodeId);
redisTemplate.opsForSet().remove(cacheKey.getKey(), deviceField);
log.debug("移除设备路由: uid={}, clientId={}, nodeId={}", uid, clientId, nodeId);
}
@@ -123,7 +125,7 @@ public class NacosRouterService {
Set<Long> targetUids = new HashSet<>(uids);
// 2. 获取全局设备-节点映射改用HSCAN分批加载
String globalKey = "luohuo:router:device-node-mapping";
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap("");
Map<String, Map<String, Long>> result = new ConcurrentHashMap<>();
// 3. 过滤活跃节点
@@ -131,7 +133,7 @@ public class NacosRouterService {
// 5. 使用HSCAN游标分批遍历
ScanOptions options = ScanOptions.scanOptions().count(500).build();
try (Cursor<Map.Entry<Object, Object>> cursor = stringRedisTemplate.opsForHash().scan(globalKey, options)) {
try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(deviceNodeMap.getKey(), options)) {
while (cursor.hasNext()) {
Map.Entry<Object, Object> entry = cursor.next();

View File

@@ -0,0 +1,85 @@
package com.luohuo.flex.router;
import com.luohuo.basic.model.cache.CacheHashKey;
import com.luohuo.basic.model.cache.CacheKey;
import com.luohuo.basic.model.cache.CacheKeyBuilder;
import java.time.Duration;
/**
* 路由模块缓存键构建器
*/
public class RouterCacheKeyBuilder {
public static CacheHashKey buildDeviceNodeMap(String clientId) {
return new DeviceNodeMapping().hashFieldKey(clientId, "device-node-mapping");
}
public static CacheKey buildNodeDevices(String nodeId) {
return new NodeDevices().key(nodeId);
}
/**
* 设备-节点映射表
*/
public static class DeviceNodeMapping implements CacheKeyBuilder {
@Override
public String getPrefix() {
return "luohuo";
}
@Override
public String getTenant() {
return null;
}
@Override
public String getTable() {
return "router";
}
@Override
public ValueType getValueType() {
return ValueType.string;
}
@Override
public Duration getExpire() {
return Duration.ofSeconds(-1);
}
}
/**
* 节点设备集合
*/
public static class NodeDevices implements CacheKeyBuilder {
@Override
public String getPrefix() {
return "luohuo";
}
@Override
public String getTenant() {
return null;
}
@Override
public String getModular() {
return "router";
}
@Override
public String getTable() {
return "node-devices";
}
@Override
public ValueType getValueType() {
return ValueType.string;
}
@Override
public Duration getExpire() {
return Duration.ofSeconds(-1);
}
}
}

View File

@@ -30,6 +30,11 @@ public class ReactiveWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// 0. 检查服务状态
if (!sessionManager.isAcceptingNewConnections()) {
return session.close(CloseStatus.SERVICE_RESTARTED);
}
// 1. 处理消息用户id、指纹信息
String clientId = extractClientId(session);
Long uid = ReactiveContextUtil.getUid();

View File

@@ -61,6 +61,8 @@ public class SessionManager {
// 线程池实例数组(按节点分片)
private ThreadPoolExecutor[] sessionExecutors;
// 服务状态 -> 默认可用
private final AtomicBoolean acceptingNewConnections = new AtomicBoolean(true);
// Session -> clientId, 通过用户会话反向查找用户设备指纹
public final ConcurrentHashMap<String, String> SESSION_CLIENT_MAP = new ConcurrentHashMap<>();
@@ -69,6 +71,15 @@ public class SessionManager {
// uid → (clientId → 会话集合) 管理的是单个用户在此服务上所有ws链接CopyOnWriteArrayList 频繁写入性能较差 所以用Set
private final ConcurrentHashMap<Long, Map<String, Set<WebSocketSession>>> USER_DEVICE_SESSION_MAP = new ConcurrentHashMap<>();
public void setAcceptingNewConnections(boolean accepting) {
acceptingNewConnections.set(accepting);
log.info("新连接接入状态: {}", accepting);
}
public boolean isAcceptingNewConnections() {
return acceptingNewConnections.get();
}
/**
* 获取会话数量,[同一个设备可能有多个会话]
*/
@@ -356,6 +367,8 @@ public class SessionManager {
* 清空所有会话
*/
public void clean() {
// 1. 标记服务不可用状态
setAcceptingNewConnections(false);
// 1. 收集所有设备信息
Map<Long, Set<String>> offlineDevices = new HashMap<>();
USER_DEVICE_SESSION_MAP.forEach((uid, deviceMap) -> offlineDevices.put(uid, new HashSet<>(deviceMap.keySet())));

View File

@@ -6,6 +6,10 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.luohuo.basic.cache.repository.CachePlusOps;
import com.luohuo.basic.model.cache.CacheHashKey;
import com.luohuo.basic.model.cache.CacheKey;
import com.luohuo.flex.router.RouterCacheKeyBuilder;
import com.luohuo.flex.ws.websocket.SessionManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -13,7 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -44,7 +48,10 @@ public class NacosSessionRegistry {
@Resource
private SessionManager sessionManager;
private StringRedisTemplate stringRedisTemplate;
@Resource
private CachePlusOps cachePlusOps;
RedisTemplate<String, Object> redisTemplate;
// 节点唯一标识
private String nodeId;
@@ -54,9 +61,9 @@ public class NacosSessionRegistry {
private int nodePort;
@Autowired
public NacosSessionRegistry(NacosServiceManager nacosServiceManager, StringRedisTemplate stringRedisTemplate,
public NacosSessionRegistry(NacosServiceManager nacosServiceManager, RedisTemplate<String, Object> redisTemplate,
@Value("${luohuo.node-id}") String nodeId, @Value("${server.port}") int nodePort, NacosDiscoveryProperties discoveryProperties) {
this.stringRedisTemplate = stringRedisTemplate;
this.redisTemplate = redisTemplate;
this.nodeId = nodeId;
this.nodePort = nodePort;
@@ -115,14 +122,13 @@ public class NacosSessionRegistry {
* @param uid 用户id
*/
public void addUserRoute(Long uid, String clientId) {
// 1. 设备指纹→节点映射
String globalKey = "luohuo:router:device-node-mapping";
String deviceField = uid + ":" + clientId;
stringRedisTemplate.opsForHash().put(globalKey, deviceField, nodeId);
// 1. 设备指纹→节点映射
cachePlusOps.hSet(RouterCacheKeyBuilder.buildDeviceNodeMap(deviceField), nodeId);
// 2. 节点→设备指纹映射
String nodeDevicesKey = "luohuo:router:node-devices:" + nodeId;
stringRedisTemplate.opsForSet().add(nodeDevicesKey, deviceField);
cachePlusOps.sAdd(RouterCacheKeyBuilder.buildNodeDevices(nodeId), deviceField);
// 3. 更新节点元数据
Map<String, String> metadata = nodeInstance.getMetadata();
@@ -136,15 +142,15 @@ public class NacosSessionRegistry {
* @param uid 用户id
*/
public void removeDeviceRoute(Long uid, String clientId) {
String globalKey = "luohuo:router:device-node-mapping";
String deviceField = uid + ":" + clientId;
// 清理设备→节点映射
stringRedisTemplate.opsForHash().delete(globalKey, deviceField);
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap(deviceField);
cachePlusOps.hDel(deviceNodeMap);
// 清理节点→设备映射
String nodeKey = "luohuo:router:node-devices:" + nodeId;
stringRedisTemplate.opsForSet().remove(nodeKey, deviceField);
CacheKey nodeDevices = RouterCacheKeyBuilder.buildNodeDevices(nodeId);
cachePlusOps.sRem(nodeDevices, deviceField);
}
/**
@@ -152,16 +158,16 @@ public class NacosSessionRegistry {
*/
public void cleanupNodeRoutes() {
// 1. 清理节点→设备映射
String nodeDevicesKey = "luohuo:router:node-devices:" + nodeId;
Set<String> deviceFields = stringRedisTemplate.opsForSet().members(nodeDevicesKey);
CacheKey cacheKey = RouterCacheKeyBuilder.buildNodeDevices(nodeId);
Set<Object> deviceFields = redisTemplate.opsForSet().members(cacheKey.getKey());
if (!CollectionUtils.isEmpty(deviceFields)) {
// 批量删除全局Hash中的映射
String globalKey = "luohuo:router:device-node-mapping";
stringRedisTemplate.opsForHash().delete(globalKey, deviceFields.toArray());
CacheHashKey deviceNodeMap = RouterCacheKeyBuilder.buildDeviceNodeMap("");
redisTemplate.opsForHash().delete(deviceNodeMap.getKey(), deviceFields.toArray());
// 清理节点本地映射
stringRedisTemplate.delete(nodeDevicesKey);
redisTemplate.delete(cacheKey.getKey());
}
log.info("节点路由清理完成: nodeId={}, 清理设备数={}", nodeId, deviceFields != null ? deviceFields.size() : 0);
}

View File

@@ -3,10 +3,11 @@ package com.luohuo.flex.ws.websocket.nacos.listener;
import com.luohuo.basic.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import com.luohuo.flex.model.entity.WsBaseResp;
import com.luohuo.flex.router.NacosRouterService;
import com.luohuo.flex.router.RouterCacheKeyBuilder;
import com.luohuo.flex.ws.websocket.SessionManager;
import com.luohuo.flex.ws.websocket.entity.NodeDownMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@@ -23,12 +24,12 @@ public class NodeDownMessageListener extends AbstractRedisChannelMessageListener
private final SessionManager sessionManager;
private final NacosRouterService routerService;
private final StringRedisTemplate stringRedisTemplate;
RedisTemplate<String, Object> redisTemplate;
public NodeDownMessageListener(SessionManager sessionManager, NacosRouterService routerService, StringRedisTemplate stringRedisTemplate) {
public NodeDownMessageListener(SessionManager sessionManager, NacosRouterService routerService, RedisTemplate<String, Object> redisTemplate) {
this.sessionManager = sessionManager;
this.routerService = routerService;
this.stringRedisTemplate = stringRedisTemplate;
this.redisTemplate = redisTemplate;
}
@Override
@@ -42,7 +43,7 @@ public class NodeDownMessageListener extends AbstractRedisChannelMessageListener
// 1. 获取受影响用户
Map<Long, List<String>> deviceMap = routerService.getDevicesByNode(downNodeId);
deviceMap.forEach((uid, clientIds) -> {
deviceMap.forEach((uid, clientIds) ->
clientIds.forEach(clientId -> {
sessionManager.syncOnline(uid, clientId, false); // 标记设备下线
// 1. 按设备通知重连
@@ -50,11 +51,10 @@ public class NodeDownMessageListener extends AbstractRedisChannelMessageListener
// 2. 清理路由
routerService.removeDeviceRoute(uid, clientId, downNodeId);
});
});
})
);
String nodeKey = "luohuo:router:node-devices:" + downNodeId;
stringRedisTemplate.delete(nodeKey);
redisTemplate.delete(RouterCacheKeyBuilder.buildNodeDevices(downNodeId).getKey());
} catch (Exception e) {
log.error("处理节点下线失败", e);
}

View File

@@ -16,7 +16,7 @@ import java.util.List;
* 功能其他没有依赖luohuo-router的服务比如oauth服务; 需要将消息推送给任何一个用户就需要先将消息推送到当前消费者,再由当前消费者将消息推送到目标 uidList 所在的ws节点
* 直接走 PushService 的服务:直接转发到 websocket_push 子节点从而跳过当前消息信箱节省一次网络io + 序列化
* MessageRouterService 目前有TokenExpireListener事件、私聊群聊音视频在用; TokenExpireListener在 oauth服务oauth无法知道目标在那个节点因此需要经过当前路由工具
* 动态路由: 使用 Redis 存储 luohuo:router:user-device-nodes:{uid}:{clientId} 映射关系, NacosRouterService.findNodeDeviceUser() 批量查询用户所在节点
* 动态路由: 使用 Redis 存储 com.luohuo.flex.router.RouterCacheKeyBuilder.DeviceNodeMapping({uid}:{clientId}) 映射关系, NacosRouterService.findNodeDeviceUser() 批量查询用户所在节点
* 高效分发: 从本质上避免广播风暴,减少网络开销
* 节点隔离: 每个节点只处理自己的 websocket_push{nodeId} 消息, 推送时只处理本节点连接的用户
*/

View File

@@ -9,6 +9,7 @@ luohuo:
username: ${NACOS_USERNAME:@nacos.username@}
password: ${NACOS_PASSWORD:@nacos.password@}
web-port: ${NACOS_WEB_PORT:@nacos.web-port@}
local-ip: ${NACOS_LOCAL_IP:@nacos.local-ip@}
seata:
ip: ${SEATA_IP:@seata.ip@}
port: ${SEATA_PORT:@seata.port@}
@@ -53,6 +54,7 @@ spring:
username: ${luohuo.nacos.username}
password: ${luohuo.nacos.password}
server-addr: ${luohuo.nacos.ip}:${luohuo.nacos.port}
ip: ${luohuo.nacos.local-ip}
namespace: ${luohuo.nacos.namespace}
metadata: # 元数据,用于权限服务实时获取各个服务的所有接口
management.context-path: ${server.servlet.context-path:}${spring.mvc.servlet.path:}${management.endpoints.web.base-path:}

View File

@@ -95,7 +95,7 @@ public abstract class BaseRedis {
}
protected void setExpire(CacheKey key) {
if (key != null && key.getExpire() != null) {
if (key != null && key.getExpire() != null && key.getExpire().getSeconds() > 0) {
redisTemplate.expire(key.getKey(), key.getExpire());
}
}

View File

@@ -253,7 +253,7 @@ public class RedisOpsImpl implements CacheOps, CachePlusOps {
@Override
public Long sAdd(@NonNull CacheKey key, Object value) {
Long result = redisOps.sAdd(key, value);
if (key.getExpire() != null) {
if (key.getExpire() != null && key.getExpire().getSeconds() > 0) {
redisOps.expire(key.getKey(), key.getExpire());
}
return result;

View File

@@ -96,7 +96,7 @@ public final class ContextConstants {
/**
* 请求头和线程变量中的 客户端指纹信息
*/
public static final String CLIENT_HEADER = "client_header";
public static final String CLIENT_ID = "clientId";
/**
* 是否boot项目
*/

View File

@@ -49,14 +49,6 @@ public final class JsonUtil {
return StrPool.EMPTY;
}
public static byte[] toJsonAsBytes(Object object) {
try {
return getInstance().writeValueAsBytes(object);
} catch (JsonProcessingException e) {
throw new BizException(ResponseEnum.JSON_PARSE_ERROR.getCode(), e);
}
}
public static <T> T parse(String content, Class<T> valueType) {
if (StrUtil.isEmpty(content)) {
return null;
@@ -92,18 +84,26 @@ public final class JsonUtil {
return getInstance().convertValue(in, typeReference);
}
public static <T> List<T> parseArray(String content, Class<T> valueTypeRef) {
if (StrUtil.isEmpty(content)) {
return Collections.emptyList();
}
if (!StrUtil.startWith(content, StrPool.LEFT_SQ_BRACKET)) {
content = StrPool.LEFT_SQ_BRACKET + content + StrPool.RIGHT_SQ_BRACKET;
/**
* 这个方法解析纯json字符串的
* @param content json字符串
* @param valueTypeRef 对应的类型
*/
public static <T> List<T> parseArray(String content, Class<T> valueTypeRef) {
if (StrUtil.isEmpty(content)) {
return Collections.emptyList();
}
List<Map<String, Object>> list = getInstance().convertValue(content, new TypeReference<>() {
});
return list.stream().map((map) -> toPojo(map, valueTypeRef)).toList();
}
try {
if (!StrUtil.startWith(content, StrPool.LEFT_SQ_BRACKET)) {
content = StrPool.LEFT_SQ_BRACKET + content + StrPool.RIGHT_SQ_BRACKET;
}
List<Map<String, Object>> list = getInstance().readValue(content, new TypeReference<>() {});
return list.stream().map((map) -> toPojo(map, valueTypeRef)).toList();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return Collections.emptyList();
}
public static Map<String, Object> toMap(String content) {
return getInstance().convertValue(content, Map.class);