fix:分布式系统下的websocket通信

This commit is contained in:
ivmiku
2024-08-28 23:14:52 +08:00
parent be3c8728fb
commit e6048a8568
5 changed files with 30 additions and 14 deletions

View File

@@ -11,11 +11,7 @@ import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,7 +24,6 @@ import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static jakarta.websocket.CloseReason.CloseCodes.CLOSED_ABNORMALLY;
@@ -70,6 +65,7 @@ public class WebSocketServer implements ApplicationContextAware {
if (relationService.ifIgnored(msg.getToId(), msg.getFromId())) {
session.getBasicRemote().sendText("您已被对方屏蔽");
} else {
messageService.insertMessage(msg);
rabbitTemplate.convertAndSend("exchange1", "", JSON.toJSONString(msg));
}
}
@@ -133,10 +129,11 @@ public class WebSocketServer implements ApplicationContextAware {
value = @Queue(),
exchange = @Exchange(value = "exchange1",type = ExchangeTypes.FANOUT)
))
public void sendMsg(String message) throws IOException, ParseException {
public void sendMsg(String message) throws IOException {
Message msg = JSON.parseObject(message, Message.class);
if (sessionMap.containsKey(msg.getToId())) {
sessionMap.get(msg.getToId()).getBasicRemote().sendText(JSON.toJSONString(msg));
messageService.deleteMessage(msg);
}
}
}

View File

@@ -104,9 +104,12 @@ public class MessageService {
* @return 查询结果
*/
public List<Message> getUnreadMsg(String userId) {
List<Message> result = redisUtil.listGet("unread:" + userId, 0, -1);
redisUtil.listClear(userId);
return result;
if (redisUtil.ifExist("unread:" + userId)) {
List<Message> result = redisUtil.listGet("unread:" + userId, 0, -1);
redisUtil.listClear("unread:" + userId);
return result;
}
return new ArrayList<>();
}
/**
@@ -233,4 +236,13 @@ public class MessageService {
}
}
}
public void insertMessage(Message message) {
String id = message.getToId();
redisUtil.listAdd("unread:" + id, message);
}
public void deleteMessage(Message message) {
redisUtil.deleteFromList(message);
}
}

View File

@@ -122,4 +122,9 @@ public class RedisUtil {
public String getKey(String userId) {
return (String) redisTemplate.opsForValue().get("sessionkey:" + userId);
}
public void deleteFromList(Message message) {
String id = message.getToId();
redisTemplate.opsForList().remove("unread:" + id, 0, message);
}
}

View File

@@ -11,8 +11,10 @@ server.port=8072
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.password=12345abcde
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/tutorial?useUnicode=true&characterEncoding=utf8&useSSL=false&ServerTimezone=Asia/Shanghai
dubbo.application.qos-enable=false
dubbo.application.qos-enable=false
spring.rabbitmq.port=5672