帝游网提供最新手游APP下载和游戏攻略!

实现高并发WebSocket服务:Spring Boot结合Redis Pub/Sub和Streams的水平扩展策略

发布时间:2024-10-27浏览:94

使用 Spring Boot、Redis Pub/Sub 和 Redis Streams 水平扩展 WebSocket 服务器

我的 WebSocket 服务器系列

01:在微服务架构中构建 WebSocket 服务器

02:使用发布-订阅模式横向扩展 WebSocket 服务器的设计注意事项

03:使用 Spring Boot、Redis Pub/Sub 和 Redis Streams 实现可扩展的 WebSocket 服务器

快速回顾

使用发布-订阅模式在微服务架构中扩展 WebSocket 服务器的完整设计

在上一篇文章中,我们确定了水平扩展 WebSocket 服务器和后端微服务时会出现的两个问题:

问题 #1:由于负载均衡器导致的消息丢失

问题 #2:由于多个订阅者导致的重复消息处理

解决方案是将带有消费者群体概念的发布-订阅消息传递模式应用于架构设计。

开始

接下来使用 Spring Boot、Stomp、Redis Pub/Sub 和 Redis Streams 构建可扩展的 WebSocket 服务器。

第 1 步:构建 WebSocket 服务器

按照我之前文章的第 1 步和第 2 步,使用 Spring Boot 和 STOMP 消息传递协议初始化 WebSocket 服务器。

第二步:启动 Redis 服务器

如需快速设置,请使用 docker 在本地运行 Redis 服务器。

docker run --name redis -p 6379:6379 -d redis

第 3 步:配置与 Redis 服务器的连接

在WebSocket服务器的文件中添加如下配置,application.yml连接Redis服务器。

# application.yml

spring.redis:

主机: localhost

端口: 6379

第 4 步:为单向实时通信实施 Pub/Sub(广播频道)

使用 API 和 Pub/Sub(广播)的单向实时通信设计

步骤 4.1:创建 BroadcastEvent 类

BroadcastEvent是一个自定义对象,用于将消息从 WebSocket 服务器的一个实例广播到 WebSocket 服务器的所有实例。

data class BroadcastEvent( @JsonProperty("topic") val topic: String, @JsonProperty("message") val message: String): Serializable

步骤 4.2:配置 Redis Pub/Sub — ReactiveRedisTemplate

ReactiveRedisTemplate是一个帮助类,用于简化 Redis 数据访问代码。在我们的配置中,我们发布/订阅该值BroadcastEvent并Jackson2JsonRedisSerializer用于执行该值的自动序列化/反序列化。

@Configurationclass RedisConfig { @Bean fun reactiveRedisTemplate(factory: LettuceConnectionFactory): ReactiveRedisTemplate<String, BroadcastEvent> { val serializer = Jackson2JsonRedisSerializer(BroadcastEvent::class.java) val builder = RedisSerializationContext.newSerializationContext<String, BroadcastEvent>(StringRedisSerializer()) val context = builder.value(serializer).build() return ReactiveRedisTemplate(factory, context) }}

步骤 4.3:配置 Redis Pub/Sub — 广播服务

RedisBroadcastService包含发布和订阅自定义频道 ( BROADCAST-CHANNEL) 的逻辑。这是用于将消息从 WebSocket 服务器的一个实例广播到 WebSocket 服务器的所有实例的通道。

@Serviceclass RedisBroadcastService( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, BroadcastEvent>, private val websocketTemplate: SimpMessagingTemplate) { fun publish(event: BroadcastEvent) { reactiveRedisTemplate.convertAndSend("BROADCAST-CHANNEL", event).subscribe() } @PostConstruct fun subscribe() { reactiveRedisTemplate.listenTo(ChannelTopic.of("BROADCAST-CHANNEL")) .map(ReactiveSubscription.Message<String, BroadcastEvent>::getMessage) .subscribe { message -> websocketTemplate.convertAndSend(message.topic, message.message) } }}

注意:@PostConstruct是一个 Spring 注释,它允许我们将自定义操作附加到 bean 创建,并且这些方法只运行一次。在我们的例子中,我们订阅了BROADCAST-CHANNELon bean 创建。

步骤 4.4:创建 API 端点

下面的代码创建一个带有 POST 请求端点的 REST 控制器,该端点接收请求正文NewMessageRequest。topic是客户端(前端)订阅的 STOMP 目的地,是message字符串格式的实际消息。

@RestController@RequestMapping("/api/notification")class NotificationController(private val redisBroadcastService: RedisBroadcastService) { @PostMapping fun newMessage(@RequestBody request: NewMessageRequest) { val event = BroadcastEvent(request.topic, request.message) redisBroadcastService.publish(event) }}

API 请求将被广播到上面步骤 4.3 中配置的所有 WebSocket 服务器实例。

步骤 4.5:通过 API 测试单向实时通信

启动 WebSocket 服务器,使用WebSocket 调试工具ws://localhost:8080/stomp通过 STOMP 协议连接到 WebSocket 服务器。连接后,配置 WebSocket 调试器工具以订阅主题。

接下来,使用以下 curl 命令向 WebSocket 服务器发送 HTTP POST 请求:

curl -X POST -d '{"topic": "/topic/frontend", "message": "测试 API 端点" }' -H 'Content-Type: application/json' localhost:8080/api/notification

WebSocket 调试器工具应具有如下所示的输出:

WebSocket 调试器工具的输出屏幕截图

这表明我们已经成功地为 WebSocket 服务器配置了 Redis Pub/Sub,以实现后端微服务和 Web 应用程序(前端)之间的可扩展单向实时通信。

第 5 步:实施 Pub/Sub 与消费者组的双向实时通信

使用 Pub/Sub 和消费者组的双向实时通信设计

在第 5 步中,我们将使用 Redis Streams 作为我们的 Pub/Sub 系统,用于后端微服务和 Web 应用程序(前端)之间的双向实时通信。我们没有使用 Redis Pub/Sub,因为它不支持消费者组的概念。

步骤 5.1:创建 StreamDataEvent 类

StreamDataEvent是订阅者和发布者之间数据交换的自定义对象。message是字符串格式的实际消息,是topicWebSocket 服务器知道要将消息发送到哪个 STOMP 目标的必填字段。

data class StreamDataEvent ( @JsonProperty("message") val message: String, @JsonProperty("topic") val topic: String? = null,)

步骤 5.2:WebSocket 服务器——实现 Redis 流消费者

注意:不需要广播消息,因为所有 WebSocket 服务器实例都会从 Redis Streams 接收消息。

@Serviceclass RedisStreamConsumer( private val websocketTemplate: SimpMessagingTemplate): StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>) { logger.info("[NEW] --> received message: ${record.value} from stream: ${record.stream}") record.value.topic?.let { destination -> websocketTemplate.convertAndSend(destination, record.value.message) } }}

步骤 5.3:WebSocket 服务器——实现 Redis 流配置

以下代码包含订阅 Redis 流的配置,其中消息将由RedisStreamConsumer我们在步骤 5.2 中配置的处理。

在这里,我们将 WebSocket 服务器配置为侦听由 key 标识的流TEST_EVENT_TO_WEBSOCKET_SERVER。您可以根据您的用例创建更多订阅。

@Configurationclass RedisStreamConfig(private val streamListener: StreamListener<String, ObjectRecord<String, StreamDataEvent>>) { private fun initListenerContainer(redisConnectionFactory: RedisConnectionFactory): StreamMessageListenerContainer<String, ObjectRecord<String, StreamDataEvent>> { val options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(StreamDataEvent::class.java) .build() return StreamMessageListenerContainer.create(redisConnectionFactory, options) } @Bean("TestEventSubscription") fun subscription(redisConnectionFactory: RedisConnectionFactory): Subscription { val listenerContainer = initListenerContainer(redisConnectionFactory) val subscription = listenerContainer.receive(StreamOffset.latest("TEST_EVENT_TO_WEBSOCKET_SERVER"), streamListener) listenerContainer.start() return subscription }}

步骤 5.4:WebSocket 服务器——实现 Redis 流生产者

步骤 5.5 WebSocket 服务器——实现 WebSocket 配置

注意:不需要将消息广播到所有 WebSocket 实例,因为发布到 Redis Streams 已经确保所有后端微服务都接收到消息。有关详细信息,请参阅步骤 5 中的图表。

步骤 5.6:后端微服务——实现 Redis 流消费者

同样,在示例后端微服务中,实现 Redis 流消费者。

@Serviceclass RedisStreamConsumer: StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>?) { logger.info("[NEW] --> received message: ${record?.value} from stream: ${record?.stream}") }}

步骤 5.6:后端微服务——实现 Redis 流配置

这里的配置与 WebSocket 服务器的配置类似。唯一的区别是我们添加了消费者组 ( CONSUMER_GROUP),它确保只有一个后端微服务实例会使用来自 Redis 流的数据。

注意:也可以使用代码来实现这一点,但我将使用 Redis CLI 命令来保持简单。

步骤 5.7:后端微服务——实现 Redis 流生产者

生产者配置类似于 WebSocket 服务器配置。

请注意,微服务有一个定期发布到 Redis 流的计划作业,并且/topic/to-frontend作为我们示例的一部分,该消息经过精心设计以发送到目标主题的 Web 应用程序(前端)。

@Serviceclass RedisStreamProducer( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>, @Value("\${spring.application.name}") private val applicationName: String,) { companion object { private val atomicInteger = AtomicInteger(0) private val logger = LoggerFactory.getLogger(RedisStreamProducer::class.java) } fun publishEvent(streamTopic: String, data: StreamDataEvent) { val record = StreamRecords.newRecord().ofObject(data).withStreamKey(streamTopic) reactiveRedisTemplate.opsForStream<String, String>().add(record).subscribe() } @Scheduled(initialDelay = 10000, fixedRate = 5000) fun publishTestMessageToBackend() { val data = StreamDataEvent( topic = "/topic/to-frontend", message = "New Message from $applicationName -- ID = ${atomicInteger.incrementAndGet()}" ) logger.info("Publishing Message: $data to Stream: TEST_EVENT_TO_WEBSOCKET_SERVER") publishEvent("TEST_EVENT_TO_WEBSOCKET_SERVER", data) }}

步骤 5.8:通过 Pub/Sub 测试双向实时通信

我们已经配置了 WebSocket 服务器和示例后端微服务。让我们使用我们在两者中进行的计划数据发布配置来测试来自 Redis 流的数据的发布和订阅RedisStreamProducer。

启动 WebSocket 服务器的两个实例和示例后端微服务的两个实例。您应该注意到输出日志与下面的类似。

Figure 6 后端微服务的输出日志(实例 A)

Figure 7 后端微服务的输出日志(实例 B)

Figure 8 WebSocket 服务器的输出日志(实例 A)

Figure 9 WebSocket 的输出日志(实例 B)

如果您要使用WebSocket 调试器工具连接到 WebSocket 服务器并订阅主题/topic/to-frontend,您应该会看到以下日志:

Figure 10 WebSocket 调试器工具的输出日志(前端)

这表明我们已经成功地为 WebSocket 服务器配置了 Redis Streams,以实现后端微服务和 Web 应用程序(前端)之间的可扩展双向实时通信。

用户评论

浅嫣婉语

这个游戏开发技术听起来真的很前沿啊!Spring Boot和WebSocket的结合简直太赞了。

    有20位网友表示赞同!

摩天轮的依恋

之前都没见过redis用在WebSocket扩展上,真是学到了新的知识。

    有7位网友表示赞同!

纯真ブ已不复存在

使用Streams进行数据流管理,这样处理大量连接应该更流畅吧。

    有11位网友表示赞同!

ー半忧伤

听说游戏性能很重要,这个配置估计能提升很多。

    有6位网友表示赞同!

回忆未来

Pub/Sub模式的应用很合理,可以实现即时消息推送。

    有16位网友表示赞同!

滴在键盘上的泪

Spring Boot真的是开发神器,简化了太多工作流程。

    有7位网友表示赞同!

灵魂摆渡人

WebSocket服务器水平扩展的关键点在这呢,学起来。

    有9位网友表示赞同!

太易動情也是罪名

Redis作为中间件在这个项目中的角色确实重要。

    有6位网友表示赞同!

青衫故人

这样的技术栈,应该能满足大多数大规模游戏的开发了。

    有5位网友表示赞同!

命硬

对游戏后端开发有兴趣的朋友肯定不能错过这个教程。

    有15位网友表示赞同!

鹿叹

了解到了Streams的用武之地,太实用了。

    有13位网友表示赞同!

枫无痕

Redis Pub/Sub在游戏中的应用让我眼前一亮。

    有5位网友表示赞同!

七级床震

对于喜欢研究技术的我来说,这样的课程非常有吸引力。

    有15位网友表示赞同!

淡写薰衣草的香

从Zero到Hero的过程,一步步学习WebSocket服务器水平扩展。

    有19位网友表示赞同!

残留の笑颜

这个项目的配置步骤清晰明了,实操性很高。

    有15位网友表示赞同!

命该如此

感觉通过这个游戏项目可以学到很多关于系统架构的知识。

    有13位网友表示赞同!

又落空

Spring Boot的微服务架构在游戏开发中很有前景。

    有12位网友表示赞同!

£烟消云散

学习了这些技术后,自己尝试做一个小型游戏都不成问题吧。

    有11位网友表示赞同!

余笙南吟

这样的水平扩展方案对线上游戏的优化很关键。

    有5位网友表示赞同!

万象皆为过客

Redis Pub/Sub的应用可以大大提高游戏的响应速度。

    有14位网友表示赞同!

热点资讯