使用 Spring Boot、Redis Pub/Sub 和 Redis Streams 水平扩展 WebSocket 服务器
我的 WebSocket 服务器系列
01:在微服务架构中构建 WebSocket 服务器
02:使用发布-订阅模式横向扩展 WebSocket 服务器的设计注意事项
03:使用 Spring Boot、Redis Pub/Sub 和 Redis Streams 实现可扩展的 WebSocket 服务器
快速回顾
使用发布-订阅模式在微服务架构中扩展 WebSocket 服务器的完整设计
在上一篇文章中,我们确定了水平扩展 WebSocket 服务器和后端微服务时会出现的两个问题:
问题 #1:由于负载均衡器导致的消息丢失
问题 #2:由于多个订阅者导致的重复消息处理
解决方案是将带有消费者群体概念的发布-订阅消息传递模式应用于架构设计。
开始
接下来使用 Spring Boot、Stomp、Redis Pub/Sub 和 Redis Streams 构建可扩展的 WebSocket 服务器。
第 1 步:构建 WebSocket 服务器
按照我之前文章的第 1 步和第 2 步,使用 Spring Boot 和 STOMP 消息传递协议初始化 WebSocket 服务器。
第二步:启动 Redis 服务器
如需快速设置,请使用 docker 在本地运行 Redis 服务器。
docker run --name redis -p 6379:6379 -d redis
第 3 步:配置与 Redis 服务器的连接
在WebSocket服务器的文件中添加如下配置,application.yml连接Redis服务器。
# application.yml
spring.redis:
主机: localhost
端口: 6379
第 4 步:为单向实时通信实施 Pub/Sub(广播频道)
使用 API 和 Pub/Sub(广播)的单向实时通信设计
步骤 4.1:创建 BroadcastEvent 类
BroadcastEvent是一个自定义对象,用于将消息从 WebSocket 服务器的一个实例广播到 WebSocket 服务器的所有实例。
data class BroadcastEvent( @JsonProperty("topic") val topic: String, @JsonProperty("message") val message: String): Serializable
步骤 4.2:配置 Redis Pub/Sub — ReactiveRedisTemplate
ReactiveRedisTemplate是一个帮助类,用于简化 Redis 数据访问代码。在我们的配置中,我们发布/订阅该值BroadcastEvent并Jackson2JsonRedisSerializer用于执行该值的自动序列化/反序列化。
@Configurationclass RedisConfig { @Bean fun reactiveRedisTemplate(factory: LettuceConnectionFactory): ReactiveRedisTemplate<String, BroadcastEvent> { val serializer = Jackson2JsonRedisSerializer(BroadcastEvent::class.java) val builder = RedisSerializationContext.newSerializationContext<String, BroadcastEvent>(StringRedisSerializer()) val context = builder.value(serializer).build() return ReactiveRedisTemplate(factory, context) }}
步骤 4.3:配置 Redis Pub/Sub — 广播服务
RedisBroadcastService包含发布和订阅自定义频道 ( BROADCAST-CHANNEL) 的逻辑。这是用于将消息从 WebSocket 服务器的一个实例广播到 WebSocket 服务器的所有实例的通道。
@Serviceclass RedisBroadcastService( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, BroadcastEvent>, private val websocketTemplate: SimpMessagingTemplate) { fun publish(event: BroadcastEvent) { reactiveRedisTemplate.convertAndSend("BROADCAST-CHANNEL", event).subscribe() } @PostConstruct fun subscribe() { reactiveRedisTemplate.listenTo(ChannelTopic.of("BROADCAST-CHANNEL")) .map(ReactiveSubscription.Message<String, BroadcastEvent>::getMessage) .subscribe { message -> websocketTemplate.convertAndSend(message.topic, message.message) } }}
注意:@PostConstruct是一个 Spring 注释,它允许我们将自定义操作附加到 bean 创建,并且这些方法只运行一次。在我们的例子中,我们订阅了BROADCAST-CHANNELon bean 创建。
步骤 4.4:创建 API 端点
下面的代码创建一个带有 POST 请求端点的 REST 控制器,该端点接收请求正文NewMessageRequest。topic是客户端(前端)订阅的 STOMP 目的地,是message字符串格式的实际消息。
@RestController@RequestMapping("/api/notification")class NotificationController(private val redisBroadcastService: RedisBroadcastService) { @PostMapping fun newMessage(@RequestBody request: NewMessageRequest) { val event = BroadcastEvent(request.topic, request.message) redisBroadcastService.publish(event) }}
API 请求将被广播到上面步骤 4.3 中配置的所有 WebSocket 服务器实例。
步骤 4.5:通过 API 测试单向实时通信
启动 WebSocket 服务器,使用WebSocket 调试工具ws://localhost:8080/stomp通过 STOMP 协议连接到 WebSocket 服务器。连接后,配置 WebSocket 调试器工具以订阅主题。
接下来,使用以下 curl 命令向 WebSocket 服务器发送 HTTP POST 请求:
curl -X POST -d '{"topic": "/topic/frontend", "message": "测试 API 端点" }' -H 'Content-Type: application/json' localhost:8080/api/notification
WebSocket 调试器工具应具有如下所示的输出:
WebSocket 调试器工具的输出屏幕截图
这表明我们已经成功地为 WebSocket 服务器配置了 Redis Pub/Sub,以实现后端微服务和 Web 应用程序(前端)之间的可扩展单向实时通信。
第 5 步:实施 Pub/Sub 与消费者组的双向实时通信
使用 Pub/Sub 和消费者组的双向实时通信设计
在第 5 步中,我们将使用 Redis Streams 作为我们的 Pub/Sub 系统,用于后端微服务和 Web 应用程序(前端)之间的双向实时通信。我们没有使用 Redis Pub/Sub,因为它不支持消费者组的概念。
步骤 5.1:创建 StreamDataEvent 类
StreamDataEvent是订阅者和发布者之间数据交换的自定义对象。message是字符串格式的实际消息,是topicWebSocket 服务器知道要将消息发送到哪个 STOMP 目标的必填字段。
data class StreamDataEvent ( @JsonProperty("message") val message: String, @JsonProperty("topic") val topic: String? = null,)
步骤 5.2:WebSocket 服务器——实现 Redis 流消费者
注意:不需要广播消息,因为所有 WebSocket 服务器实例都会从 Redis Streams 接收消息。
@Serviceclass RedisStreamConsumer( private val websocketTemplate: SimpMessagingTemplate): StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>) { logger.info("[NEW] --> received message: ${record.value} from stream: ${record.stream}") record.value.topic?.let { destination -> websocketTemplate.convertAndSend(destination, record.value.message) } }}
步骤 5.3:WebSocket 服务器——实现 Redis 流配置
以下代码包含订阅 Redis 流的配置,其中消息将由RedisStreamConsumer我们在步骤 5.2 中配置的处理。
在这里,我们将 WebSocket 服务器配置为侦听由 key 标识的流TEST_EVENT_TO_WEBSOCKET_SERVER。您可以根据您的用例创建更多订阅。
@Configurationclass RedisStreamConfig(private val streamListener: StreamListener<String, ObjectRecord<String, StreamDataEvent>>) { private fun initListenerContainer(redisConnectionFactory: RedisConnectionFactory): StreamMessageListenerContainer<String, ObjectRecord<String, StreamDataEvent>> { val options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(StreamDataEvent::class.java) .build() return StreamMessageListenerContainer.create(redisConnectionFactory, options) } @Bean("TestEventSubscription") fun subscription(redisConnectionFactory: RedisConnectionFactory): Subscription { val listenerContainer = initListenerContainer(redisConnectionFactory) val subscription = listenerContainer.receive(StreamOffset.latest("TEST_EVENT_TO_WEBSOCKET_SERVER"), streamListener) listenerContainer.start() return subscription }}
步骤 5.4:WebSocket 服务器——实现 Redis 流生产者
步骤 5.5 WebSocket 服务器——实现 WebSocket 配置
注意:不需要将消息广播到所有 WebSocket 实例,因为发布到 Redis Streams 已经确保所有后端微服务都接收到消息。有关详细信息,请参阅步骤 5 中的图表。
步骤 5.6:后端微服务——实现 Redis 流消费者
同样,在示例后端微服务中,实现 Redis 流消费者。
@Serviceclass RedisStreamConsumer: StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>?) { logger.info("[NEW] --> received message: ${record?.value} from stream: ${record?.stream}") }}
步骤 5.6:后端微服务——实现 Redis 流配置
这里的配置与 WebSocket 服务器的配置类似。唯一的区别是我们添加了消费者组 ( CONSUMER_GROUP),它确保只有一个后端微服务实例会使用来自 Redis 流的数据。
注意:也可以使用代码来实现这一点,但我将使用 Redis CLI 命令来保持简单。
步骤 5.7:后端微服务——实现 Redis 流生产者
生产者配置类似于 WebSocket 服务器配置。
请注意,微服务有一个定期发布到 Redis 流的计划作业,并且/topic/to-frontend作为我们示例的一部分,该消息经过精心设计以发送到目标主题的 Web 应用程序(前端)。
@Serviceclass RedisStreamProducer( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>, @Value("\${spring.application.name}") private val applicationName: String,) { companion object { private val atomicInteger = AtomicInteger(0) private val logger = LoggerFactory.getLogger(RedisStreamProducer::class.java) } fun publishEvent(streamTopic: String, data: StreamDataEvent) { val record = StreamRecords.newRecord().ofObject(data).withStreamKey(streamTopic) reactiveRedisTemplate.opsForStream<String, String>().add(record).subscribe() } @Scheduled(initialDelay = 10000, fixedRate = 5000) fun publishTestMessageToBackend() { val data = StreamDataEvent( topic = "/topic/to-frontend", message = "New Message from $applicationName -- ID = ${atomicInteger.incrementAndGet()}" ) logger.info("Publishing Message: $data to Stream: TEST_EVENT_TO_WEBSOCKET_SERVER") publishEvent("TEST_EVENT_TO_WEBSOCKET_SERVER", data) }}
步骤 5.8:通过 Pub/Sub 测试双向实时通信
我们已经配置了 WebSocket 服务器和示例后端微服务。让我们使用我们在两者中进行的计划数据发布配置来测试来自 Redis 流的数据的发布和订阅RedisStreamProducer。
启动 WebSocket 服务器的两个实例和示例后端微服务的两个实例。您应该注意到输出日志与下面的类似。
Figure 6 后端微服务的输出日志(实例 A)
Figure 7 后端微服务的输出日志(实例 B)
Figure 8 WebSocket 服务器的输出日志(实例 A)
Figure 9 WebSocket 的输出日志(实例 B)
如果您要使用WebSocket 调试器工具连接到 WebSocket 服务器并订阅主题/topic/to-frontend,您应该会看到以下日志:
Figure 10 WebSocket 调试器工具的输出日志(前端)
这表明我们已经成功地为 WebSocket 服务器配置了 Redis Streams,以实现后端微服务和 Web 应用程序(前端)之间的可扩展双向实时通信。
用户评论
这个游戏开发技术听起来真的很前沿啊!Spring Boot和WebSocket的结合简直太赞了。
有20位网友表示赞同!
之前都没见过redis用在WebSocket扩展上,真是学到了新的知识。
有7位网友表示赞同!
使用Streams进行数据流管理,这样处理大量连接应该更流畅吧。
有11位网友表示赞同!
听说游戏性能很重要,这个配置估计能提升很多。
有6位网友表示赞同!
Pub/Sub模式的应用很合理,可以实现即时消息推送。
有16位网友表示赞同!
Spring Boot真的是开发神器,简化了太多工作流程。
有7位网友表示赞同!
WebSocket服务器水平扩展的关键点在这呢,学起来。
有9位网友表示赞同!
Redis作为中间件在这个项目中的角色确实重要。
有6位网友表示赞同!
这样的技术栈,应该能满足大多数大规模游戏的开发了。
有5位网友表示赞同!
对游戏后端开发有兴趣的朋友肯定不能错过这个教程。
有15位网友表示赞同!
了解到了Streams的用武之地,太实用了。
有13位网友表示赞同!
Redis Pub/Sub在游戏中的应用让我眼前一亮。
有5位网友表示赞同!
对于喜欢研究技术的我来说,这样的课程非常有吸引力。
有15位网友表示赞同!
从Zero到Hero的过程,一步步学习WebSocket服务器水平扩展。
有19位网友表示赞同!
这个项目的配置步骤清晰明了,实操性很高。
有15位网友表示赞同!
感觉通过这个游戏项目可以学到很多关于系统架构的知识。
有13位网友表示赞同!
Spring Boot的微服务架构在游戏开发中很有前景。
有12位网友表示赞同!
学习了这些技术后,自己尝试做一个小型游戏都不成问题吧。
有11位网友表示赞同!
这样的水平扩展方案对线上游戏的优化很关键。
有5位网友表示赞同!
Redis Pub/Sub的应用可以大大提高游戏的响应速度。
有14位网友表示赞同!