第 2 部分:添加持久性和集成测试
在这一部分中,我们将使用 Spring Data JDBC 和 H2 作为数据库来实现接口的持久版本。我们将介绍以下类:MessageService
-
PersistentMessageService
– 接口的实现,它将通过 Spring 数据存储库 API 与真实数据存储进行交互。MessageService
-
MessageRepository
– 使用的存储库实现MessageService.
添加新依赖项
首先,我们必须将所需的依赖项添加到项目中。为此,我们需要将以下行添加到“build.gradle.kts”文件中的块中:dependencies
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("com.h2database:h2")
⚠️ 请注意,在这个例子中,我们使用一种轻量级和直接的方式来在 Spring Framework 中使用 JDBC。如果您希望查看 JPA 用法的示例,请参阅以下内容spring-data-jdbc
博客文章.
⚠️ 要刷新项目依赖项列表,请单击编辑器右上角显示的小象图标。
创建数据库架构和配置
添加并解析依赖项后,我们可以开始对数据库架构进行建模。由于这是一个演示项目,我们不会设计任何复杂的东西,我们将坚持以下结构:
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(60) DEFAULT RANDOM_UUID() PRIMARY KEY,
content VARCHAR NOT NULL,
content_type VARCHAR(128) NOT NULL,
sent TIMESTAMP NOT NULL,
username VARCHAR(60) NOT NULL,
user_avatar_image_link VARCHAR(256) NOT NULL
);
⌨️ 创建一个在目录中调用的新文件夹。然后将上面的SQL代码放入文件中。sql
src/main/resources
src/main/resources/sql/schema.sql
此外,您应该进行修改,使其包含以下属性:application.properties
spring.datasource.schema=classpath:sql/schema.sql
spring.datasource.url=jdbc:h2:file:./build/data/testdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.datasource.initialization-mode=always
使用数据
使用 Spring Data,上面提到的表格可以使用以下域类来表示,这些域类应该放在 'src/main/kotlin/com/example/kotlin/chat/repository/DomainModel.kt '文件中:
import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Table
import java.time.Instant
@Table("MESSAGES")
data class Message(
val content: String,
val contentType: ContentType,
val sent: Instant,
val username: String,
val userAvatarImageLink: String,
@Id var id: String? = null)
enum class ContentType {
PLAIN
}
这里有一些事情需要解释。字段如 、 和 镜像类。但是,为了减少表的数量并简化最终的关系结构,我们展平了对象并使其字段成为类的一部分。除此之外,还有一个名为 的新额外字段,它指示存储消息的内容类型。由于大多数新式聊天支持不同的标记语言,因此通常支持不同的消息内容编码。起初我们只支持文本,但稍后我们也将扩展到支持该类型。content
sent
id
MessageVM
User
Message
contentType
PLAIN
ContentType
MARKDOWN
一旦我们将表表示为一个类,我们就可以通过 .Repository
⌨️ 放入文件夹。MessageRepository.kt
src/main/kotlin/com/example/kotlin/chat/repository
import org.springframework.data.jdbc.repository.query.Query
import org.springframework.data.repository.CrudRepository
import org.springframework.data.repository.query.Param
interface MessageRepository : CrudRepository<Message, String> {
// language=SQL
@Query("""
SELECT * FROM (
SELECT * FROM MESSAGES
ORDER BY "SENT" DESC
LIMIT 10
) ORDER BY "SENT"
""")
fun findLatest(): List<Message>
// language=SQL
@Query("""
SELECT * FROM (
SELECT * FROM MESSAGES
WHERE SENT > (SELECT SENT FROM MESSAGES WHERE ID = :id)
ORDER BY "SENT" DESC
) ORDER BY "SENT"
""")
fun findLatest(@Param("id") id: String): List<Message>
}
我们扩展了一个普通方法,并提供了两种不同的自定义查询方法,用于检索最新消息和检索与特定消息 ID 关联的消息。MessageRepository
CrudRepository
💡 你有没有注意到多行字符串用于以可读格式表示 SQL 查询?Kotlin 为 String 提供了一组有用的补充。您可以在 Kotlin 语言中了解有关这些新增内容的更多信息文档
我们的下一步是实现与类集成的类。MessageService
MessageRepository
⌨️ 将类放入文件夹中,替换以前的实现。PersistentMessageService
src/main/kotlin/com/example/kotlin/chat/service
FakeMessageService
package com.example.kotlin.chat.service
import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.repository.MessageRepository
import org.springframework.context.annotation.Primary
import org.springframework.stereotype.Service
import java.net.URL
@Service
@Primary
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
override fun latest(): List<MessageVM> =
messageRepository.findLatest()
.map { with(it) { MessageVM(content, UserVM(username,
URL(userAvatarImageLink)), sent, id) } }
override fun after(lastMessageId: String): List<MessageVM> =
messageRepository.findLatest(lastMessageId)
.map { with(it) { MessageVM(content, UserVM(username,
URL(userAvatarImageLink)), sent, id) } }
override fun post(message: MessageVM) {
messageRepository.save(
with(message) { Message(content, ContentType.PLAIN, sent,
user.name, user.avatarImageLink.toString()) }
)
}
}
PersistentMessageService
是 的薄层,因为这里我们只是在做一些简单的对象映射。所有业务查询都发生在该级别。另一方面,这种实现的简单性是 Kotlin 语言的优点,它提供了扩展函数,如 和 。MessageRepository
Repository
map
with
如果我们现在启动该应用程序,我们将再次看到一个空的聊天页面。但是,如果我们在文本输入中输入一条消息并发送它,我们将在几分钟后看到它出现在屏幕上。如果我们打开一个新的浏览器页面,我们将再次看到此消息作为消息历史记录的一部分。
最后,我们可以编写一些集成测试,以确保我们的代码随着时间的推移继续正常工作。
添加集成测试
首先,我们必须修改文件并添加我们需要在测试中使用的字段:ChatKotlinApplicationTests
/src/test
import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.repository.MessageRepository
import com.example.kotlin.chat.service.MessageVM
import com.example.kotlin.chat.service.UserVM
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.boot.test.web.client.postForEntity
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.HttpMethod
import org.springframework.http.RequestEntity
import java.net.URI
import java.net.URL
import java.time.Instant
import java.time.temporal.ChronoUnit.MILLIS
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = [
"spring.datasource.url=jdbc:h2:mem:testdb"
]
)
class ChatKotlinApplicationTests {
@Autowired
lateinit var client: TestRestTemplate
@Autowired
lateinit var messageRepository: MessageRepository
lateinit var lastMessageId: String
val now: Instant = Instant.now()
}
我们使用后期关键字,非常适合必须延迟非空字段初始化的情况。在我们的例子中,我们将其用于现场并解决.@Autowire
MessageRepository
TestRestTemplate
为简单起见,我们将测试三种一般情况:
- 在不可用时解析消息。
lastMessageId
- 解析消息(如果存在)。
lastMessageId
- 并发送消息。
为了测试消息解析,我们必须准备一些测试消息,并在每种情况完成后清理存储。将以下内容添加到:ChatKotlinApplicationTests
@BeforeEach
fun setUp() {
val secondBeforeNow = now.minusSeconds(1)
val twoSecondBeforeNow = now.minusSeconds(2)
val savedMessages = messageRepository.saveAll(listOf(
Message(
"*testMessage*",
ContentType.PLAIN,
twoSecondBeforeNow,
"test",
"http://test.com"
),
Message(
"**testMessage2**",
ContentType.PLAIN,
secondBeforeNow,
"test1",
"http://test.com"
),
Message(
"`testMessage3`",
ContentType.PLAIN,
now,
"test2",
"http://test.com"
)
))
lastMessageId = savedMessages.first().id ?: ""
}
@AfterEach
fun tearDown() {
messageRepository.deleteAll()
}
准备完成后,我们可以创建第一个消息检索测试用例:
@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `test that messages API returns latest messages`(withLastMessageId: Boolean) {
val messages: List<MessageVM>? = client.exchange(
RequestEntity<Any>(
HttpMethod.GET,
URI("/api/v1/messages?lastMessageId=${if (withLastMessageId) lastMessageId else ""}")
),
object : ParameterizedTypeReference<List<MessageVM>>() {}).body
if (!withLastMessageId) {
assertThat(messages?.map { with(it) { copy(id = null, sent = sent.truncatedTo(MILLIS))}})
.first()
.isEqualTo(MessageVM(
"*testMessage*",
UserVM("test", URL("http://test.com")),
now.minusSeconds(2).truncatedTo(MILLIS)
))
}
assertThat(messages?.map { with(it) { copy(id = null, sent = sent.truncatedTo(MILLIS))}})
.containsSubsequence(
MessageVM(
"**testMessage2**",
UserVM("test1", URL("http://test.com")),
now.minusSeconds(1).truncatedTo(MILLIS)
),
MessageVM(
"`testMessage3`",
UserVM("test2", URL("http://test.com")),
now.truncatedTo(MILLIS)
)
)
}
💡 所有数据类都有一个copy方法,允许您创建实例的完整副本,同时根据需要自定义某些字段。这在我们的例子中非常有用,因为我们希望将发送的消息时间截断为相同的时间单位,以便我们可以比较时间戳。
💡 Kotlin 对字符串模板是测试的绝佳补充。
一旦我们实现了这个测试,我们必须实现的最后一部分是消息发布测试。将以下代码添加到:ChatKotlinApplicationTests
@Test
fun `test that messages posted to the API is stored`() {
client.postForEntity<Any>(
URI("/api/v1/messages"),
MessageVM(
"`HelloWorld`",
UserVM("test", URL("http://test.com")),
now.plusSeconds(1)
)
)
messageRepository.findAll()
.first { it.content.contains("HelloWorld") }
.apply {
assertThat(this.copy(id = null, sent = sent.truncatedTo(MILLIS)))
.isEqualTo(Message(
"`HelloWorld`",
ContentType.PLAIN,
now.plusSeconds(1).truncatedTo(MILLIS),
"test",
"http://test.com"
))
}
}
💡 在测试中使用带有反引号括起来的空格的函数名称是可以接受的。查看相关文档.
上面的测试看起来与上一个测试相似,只是我们检查发布的消息是否存储在数据库中。在这个例子中,我们可以看到runscope 函数,这使得可以将调用范围内的目标对象用作 。this
一旦我们实施了所有这些测试,我们就可以运行它们并查看它们是否通过。
在此阶段,我们向聊天应用程序添加了消息持久性。现在可以将消息传递到连接到应用程序的所有活动客户端。此外,我们现在可以访问历史数据,因此每个人都可以在需要时阅读以前的消息。
这个实现可能看起来很完整,但我们编写的代码还有一些改进的余地。因此,我们将在下一步中看到如何使用 Kotlin 扩展改进我们的代码。
第 3 部分:实现扩展
在这一部分中,我们将实现扩展函数以减少一些位置的代码重复量。
例如,您可能会注意到 <-→ 转换当前显式发生在 .我们可能还希望通过添加对 Markdown 的支持来扩展对不同内容类型的支持。Message
MessageVM
PersistableMessageService
首先,我们为 和 创建扩展方法。新方法实现了从 到 的转换逻辑,反之亦然:Message
MessageVM
Message
MessageVM
import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.service.MessageVM
import com.example.kotlin.chat.service.UserVM
import java.net.URL
fun MessageVM.asDomainObject(contentType: ContentType = ContentType.PLAIN): Message = Message(
content,
contentType,
sent,
user.name,
user.avatarImageLink.toString(),
id
)
fun Message.asViewModel(): MessageVM = MessageVM(
content,
UserVM(username, URL(userAvatarImageLink)),
sent,
id
)
⌨️ 我们将上述函数存储在文件中。src/main/kotlin/com/example/kotlin/chat/Extensions.kt
现在我们有了 的扩展方法和转换,我们可以在 :MessageVM
Message
PersistentMessageService
@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
override fun latest(): List<MessageVM> =
messageRepository.findLatest()
.map { it.asViewModel() }
override fun after(lastMessageId: String): List<MessageVM> =
messageRepository.findLatest(lastMessageId)
.map { it.asViewModel() }
override fun post(message: MessageVM) {
messageRepository.save(message.asDomainObject())
}
}
上面的代码比以前更好。它更简洁,读起来更好。但是,我们可以进一步改进。如我们所见,我们对具有特定泛型类型的 a 使用相同的函数。将以下行添加到文件中:map()`operators with the same function mapper twice. In fact, we can improve that by adding a custom `map
List
Extensions.kt
fun List<Message>.mapToViewModel(): List<MessageVM> = map { it.asViewModel() }
包含此行后,Kotlin 将向泛型类型对应于指定泛型类型的任何方法提供上述扩展方法:List
@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
override fun latest(): List<MessageVM> =
messageRepository.findLatest()
.mapToViewModel() // now we can use the mentioned extension on List<Message>
override fun after(lastMessageId: String): List<MessageVM> =
messageRepository.findLatest(lastMessageId)
.mapToViewModel()
//...
}
⚠️ 请注意,不能对具有不同泛型类型的同一类使用相同的扩展名。原因是键入擦除,这意味着在运行时,两个类将使用相同的方法,并且无法猜测应该调用哪个类。
应用所有扩展后,我们可以执行类似的技巧,并声明支持扩展以在测试类中使用。将以下内容放入文件中src/test/kotlin/com/example/kotlin/chat/TestExtensions.kt
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.service.MessageVM
import java.time.temporal.ChronoUnit.MILLIS
fun MessageVM.prepareForTesting() = copy(id = null, sent = sent.truncatedTo(MILLIS))
fun Message.prepareForTesting() = copy(id = null, sent = sent.truncatedTo(MILLIS))
现在,我们可以继续前进并实现对内容类型的支持。首先,我们需要添加用于 Markdown 内容渲染的实用程序。为此,我们可以添加一个MARKDOWN
官方降价库从 JetBrains 到文件:build.gradle.kts
dependencies {
...
implementation("org.jetbrains:markdown:0.2.2")
...
}
由于我们已经学会了如何使用扩展,让我们在文件中为枚举创建另一个扩展名,以便每个枚举值都知道如何呈现特定内容。Extensions.kt
ContentType
fun ContentType.render(content: String): String = when (this) {
ContentType.PLAIN -> content
}
在上面的示例中,我们使用when表达式,在 Kotlin 中提供模式匹配。如果用作表达式,则分支是必需的。但是,如果表达式与穷举值一起使用(例如 具有恒定数量的结果或定义的子类数量),则不需要分支。上面的例子正是我们在编译时知道所有可能的结果(并且所有结果都已处理)的情况之一,因此我们不必指定分支。when
else
when
enum
sealed classes
else
else
现在我们知道了表达式的工作原理,让我们最后向枚举添加第二个选项:when
ContentType
enum class ContentType {
PLAIN, MARKDOWN
}
表达的力量伴随着对详尽无遗的强烈要求。每当添加新值时,我们都必须在将软件推送到生产环境之前修复编译问题:when
enum
fun ContentType.render(content: String): String = when (this) {
ContentType.PLAIN -> content
ContentType.MARKDOWN -> {
val flavour = CommonMarkFlavourDescriptor()
HtmlGenerator(content, MarkdownParser(flavour).buildMarkdownTreeFromString(content),
flavour).generateHtml()
}
}
一旦我们修复了支持新方法的方法,我们就可以修改和扩展方法以启用该类型的使用并相应地呈现其内容:render
ContentType
Message
MessageVM
MARKDOWN
fun MessageVM.asDomainObject(contentType: ContentType = ContentType.MARKDOWN): Message = Message(
content,
contentType,
sent,
user.name,
user.avatarImageLink.toString(),
id
)
fun Message.asViewModel(): MessageVM = MessageVM(
contentType.render(content),
UserVM(username, URL(userAvatarImageLink)),
sent,
id
)
我们还需要修改测试以确保内容类型正确呈现。为此,我们必须更改和更改以下内容:MARKDOWN
ChatKotlinApplicationTests.kt
@BeforeEach
fun setUp() {
//...
Message(
"*testMessage*",
ContentType.PLAIN,
twoSecondBeforeNow,
"test",
"http://test.com"
),
Message(
"**testMessage2**",
ContentType.MARKDOWN,
secondBeforeNow,
"test1",
"http://test.com"
),
Message(
"`testMessage3`",
ContentType.MARKDOWN,
now,
"test2",
"http://test.com"
)
//...
}
@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `test that messages API returns latest messages`(withLastMessageId: Boolean) {
//...
assertThat(messages?.map { it.prepareForTesting() })
.containsSubsequence(
MessageVM(
"<body><p><strong>testMessage2</strong></p></body>",
UserVM("test1", URL("http://test.com")),
now.minusSeconds(1).truncatedTo(MILLIS)
),
MessageVM(
"<body><p><code>testMessage3</code></p></body>",
UserVM("test2", URL("http://test.com")),
now.truncatedTo(MILLIS)
)
)
}
@Test
fun `test that messages posted to the API are stored`() {
//...
messageRepository.findAll()
.first { it.content.contains("HelloWorld") }
.apply {
assertThat(this.prepareForTesting())
.isEqualTo(Message(
"`HelloWorld`",
ContentType.MARKDOWN,
now.plusSeconds(1).truncatedTo(MILLIS),
"test",
"http://test.com"
))
}
}
完成此操作后,我们将看到所有测试仍在通过,并且具有内容类型的消息将按预期呈现。MARKDOWN
在此步骤中,我们学习了如何使用扩展来提高代码质量。我们还学习了该表达式以及它在添加新业务功能时如何减少人为错误。when
第 4 部分:使用 Kotlin 协程重构到 Spring WebFlux
在本教程的这一部分中,我们将修改代码库以添加对协程.
本质上,协程是轻量级线程,可以以命令式方式表达异步代码。这解决了各种问题与上面用于实现相同效果的回调(观察者)模式相关联。
⚠️ 在本教程中,我们不会仔细研究协程和标准 kotlinx.coroutines 库。要了解有关协程及其功能的更多信息,请查看以下内容教程.
添加协程
要开始使用 Kotlin 协程,我们必须将三个额外的库添加到:build.gradle.kts
dependencies {
...
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
...
}
添加依赖项后,我们可以开始使用与主协程相关的关键字:。关键字指示正在调用的函数是异步函数。与其他通过 or 关键字公开类似概念的语言不同,该函数必须在协程上下文中处理,该上下文可以是另一个函数或显式协程suspend
suspend
async
await
suspend
suspend
Job使用CoroutineScope.launch或runBlocking功能。
因此,作为将协程引入项目的第一步,我们将向项目的所有控制器和服务方法添加关键字。例如,修改后,界面应如下所示:suspend
MessageService
interface MessageService {
suspend fun latest(): List<MessageVM>
suspend fun after(lastMessageId: String): List<MessageVM>
suspend fun post(message: MessageVM)
}
上面的更改也会影响代码中使用的位置。中的所有函数都必须通过添加关键字进行相应的更新。MessageService
PersistentMessageService
suspend
@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
override suspend fun latest(): List<MessageVM> =
messageRepository.findLatest()
.mapToViewModel()
override suspend fun after(messageId: String): List<MessageVM> =
messageRepository.findLatest(messageId)
.mapToViewModel()
override suspend fun post(message: MessageVM) {
messageRepository.save(message.asDomainObject())
}
}
请求处理程序和 也必须进行调整:HtmlController
MessageResource
// src/main/kotlin/com/example/kotlin/chat/controller/HtmlController.kt
@Controller
class HtmlController(val messageService: MessageService) {
@GetMapping("/")
suspend fun index(model: Model): String {
//...
}
}
// src/main/kotlin/com/example/kotlin/chat/controller/MessageResource.kt
@RestController
@RequestMapping("/api/v1/messages")
class MessageResource(val messageService: MessageService) {
@GetMapping
suspend fun latest(@RequestParam(value = "lastMessageId", defaultValue = "") lastMessageId: String): ResponseEntity<List<MessageVM>> {
//...
}
@PostMapping
suspend fun post(@RequestBody message: MessageVM) {
//...
}
}
我们已经准备好了迁移到反应式 Spring 堆栈的代码,Spring WebFlux.继续阅读!
添加 WebFlux 和 R2DBC
尽管在大多数情况下添加依赖项就足够了,但要与 Spring 框架正确集成,我们需要替换 Web 和数据库模块:org.jetbrains.kotlinx:kotlinx-coroutines-core
dependencies {
...
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
...
}
与以下内容:
dependencies {
...
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("io.r2dbc:r2dbc-h2")
...
}
通过添加上述依赖项,我们替换了标准阻塞网络MVC具有完全反应和无阻塞网络通量.此外,JDBC 被完全反应和非阻塞所取代R2DBC.
感谢所有 Spring Framework 工程师的辛勤工作,从 Spring Web MVC 迁移到 Spring WebFlux 是无缝的,我们根本不需要重写任何东西!但是,对于R2DBC,我们还有一些额外的步骤。首先,我们需要添加一个配置类。
⌨️ 我们将此类放入文件,即应用程序的方法所在的位置。com/example/kotlin/chat/ChatKotlinApplication.kt
main()
@Configuration
class Config {
@Bean
fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
val initializer = ConnectionFactoryInitializer()
initializer.setConnectionFactory(connectionFactory)
val populator = CompositeDatabasePopulator()
populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("./sql/schema.sql")))
initializer.setDatabasePopulator(populator)
return initializer
}
}
上述配置可确保在应用程序启动时初始化表的架构。
接下来,我们需要修改 中的属性以仅包含一个属性:application.properties
spring.r2dbc.url=r2dbc:h2:file:///./build/data/testdb;USER=sa;PASSWORD=password
一旦我们做了一些与配置相关的基本更改,我们将执行从Spring Data JDBC到Spring Data R2DBC的迁移。为此,我们需要更新 MessageRepository 接口以派生并用关键字标记其方法。我们这样做如下:CoroutineCrudRepository
suspend
interface MessageRepository : CoroutineCrudRepository<Message, String> {
// language=SQL
@Query("""
SELECT * FROM (
SELECT * FROM MESSAGES
ORDER BY "SENT" DESC
LIMIT 10
) ORDER BY "SENT"
""")
suspend fun findLatest(): List<Message>
// language=SQL
@Query("""
SELECT * FROM (
SELECT * FROM MESSAGES
WHERE SENT > (SELECT SENT FROM MESSAGES WHERE ID = :id)
ORDER BY "SENT" DESC
) ORDER BY "SENT"
""")
suspend fun findLatest(@Param("id") id: String): List<Message>
}
的所有方法在设计时都考虑到了 Kotlin 协程。CoroutineCrudRepository
⚠️ 请注意,注释现在位于不同的包中,因此应按如下方式导入:@Query
import org.springframework.data.r2dbc.repository.Query
在此阶段,这些更改应该足以使应用程序异步且无阻塞。重新运行应用程序后,从功能角度来看,不应有任何更改,但执行现在将是异步和非阻塞的。
最后,我们还需要在测试中应用更多修复程序。由于我们现在是异步的,我们需要更改数据源 URL 并在协程上下文中运行所有相关操作,如下所示(在文件中):MessageRepository
runBlocking
ChatKotlinApplicationTests.kt
// ...
// new imports
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = [
"spring.r2dbc.url=r2dbc:h2:mem:///testdb;USER=sa;PASSWORD=password"
]
)
class ChatKotlinApplicationTests {
//...
@BeforeEach
fun setUp() {
runBlocking {
//...
}
}
@AfterEach
fun tearDown() {
runBlocking {
//...
}
}
//...
@Test
fun `test that messages posted to the API is stored`() {
runBlocking {
//...
}
}
}
我们的应用程序现在是异步和非阻塞的。但它仍然使用轮询将消息从后端传递到 UI。在下一部分中,我们将修改应用程序以使用 RSocket 将消息流式传输到所有连接的客户端。
第 5 部分:使用 RSocket 进行流式传输
我们将使用RSocket将消息传递转换为类似流式处理的方法。
RSocket 是一种二进制协议,用于字节流传输,如 TCP 和 WebSockets。该 API 适用于各种编程语言,包括科特林.但是,在我们的示例中,我们不需要直接使用 API。相反,我们将使用春季消息传递,它与 RSocket 集成,并提供一种方便的基于注释的配置方法。
要开始使用 RSocket 和 Spring,我们需要添加并导入一个新的依赖项:build.gradle.kts
dependencies {
....
implementation("org.springframework.boot:spring-boot-starter-rsocket")
....
}
接下来,我们将更新以返回通过而不是“List”公开的异步消息流。MessageRepository
Flow<Messages>
interface MessageRepository : CoroutineCrudRepository<Message, String> {
//...
fun findLatest(): Flow<Message>
//...
fun findLatest(@Param("id") id: String): Flow<Message>
}
我们需要对界面进行类似的更改,以准备流式传输。我们不再需要关键字。相反,我们将使用表示异步数据流的接口。任何产生结果的函数现在都会生成一个。post 方法也将接收该类型作为参数。MessageService
suspend
Flow
List
Flow
Flow
import kotlinx.coroutines.flow.Flow
interface MessageService {
fun latest(): Flow<MessageVM>
fun after(messageId: String): Flow<MessageVM>
fun stream(): Flow<MessageVM>
suspend fun post(messages: Flow<MessageVM>)
}
现在我们可以连接点并更新类以集成上述更改。PersistentMessageService
import com.example.kotlin.chat.asDomainObject
import com.example.kotlin.chat.asRendered
import com.example.kotlin.chat.mapToViewModel
import com.example.kotlin.chat.repository.MessageRepository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.collect
import org.springframework.stereotype.Service
@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
val sender: MutableSharedFlow<MessageVM> = MutableSharedFlow()
override fun latest(): Flow<MessageVM> =
messageRepository.findLatest()
.mapToViewModel()
override fun after(messageId: String): Flow<MessageVM> =
messageRepository.findLatest(messageId)
.mapToViewModel()
override fun stream(): Flow<MessageVM> = sender
override suspend fun post(messages: Flow<MessageVM>) =
messages
.onEach { sender.emit(it.asRendered()) }
.map { it.asDomainObject() }
.let { messageRepository.saveAll(it) }
.collect()
}
首先,由于接口已经更改,我们需要在相应的实现中更新方法签名。因此,该类型现在需要该类型的文件。MessageService
mapToViewModel `extension method that we defined previously in the `Extension.kt
List
Flow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
fun Flow<Message>.mapToViewModel(): Flow<MessageVM> = map { it.asViewModel() }
为了更好的可读性,我们还为 MessageVM 类添加了扩展函数。在文件中:asRendered
Extensions.kt
fun MessageVM.asRendered(contentType: ContentType = ContentType.MARKDOWN): MessageVM =
this.copy(content = contentType.render(this.content))
接下来,我们将使用MutableSharedFlow从协程 API 将消息广播到连接的客户端。
通过更改,我们越来越接近所需的 UI。接下来,我们将更新和.MessageResource
HtmlController
MessageResource
获得一个全新的实现。首先,我们将使用此类通过应用注释而不是 .新方法,并通过双工通信映射到同一端点。@MessageMapping
@RequestMapping
send()
receive(),
@MessageMapping("stream")
@Controller
@MessageMapping("api.v1.messages")
class MessageResource(val messageService: MessageService) {
@MessageMapping("stream")
suspend fun receive(@Payload inboundMessages: Flow<MessageVM>) =
messageService.post(inboundMessages)
@MessageMapping("stream")
fun send(): Flow<MessageVM> = messageService
.stream()
.onStart {
emitAll(messageService.latest())
}
}
若要将消息发送到 UI,我们打开 ,由该方法实现以开始流式传输事件。当新客户端连接到服务时,它将首先从历史记录中接收消息,这要归功于作为参数提供给该方法的代码块:。然后,通道保持打开状态以流式传输新消息。stream
messageService
PersistentMessageService `class, and call the `onStart
onStart
emitAll(messageService.latest())
该类不再需要处理任何流式处理逻辑。它现在的目的是为静态页面提供服务,因此实现变得微不足道:HtmlController
@Controller
class HtmlController() {
@GetMapping("/")
fun index(): String {
// implemented in src/main/resources/templates/chatrs.html
return "chatrs"
}
}
请注意,UI 模板现在代替 .新模板包括配置 WebSocket 连接并直接与类实现的终结点交互的 JavaScript 代码。chatrs.html
chat.html
api.v1.messages.stream
MessageResource
我们需要对文件进行最后一次更改才能使 RSocket 正常工作。将以下属性添加到配置中:application.properties
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
应用程序已准备好启动!借助 RSocket,消息现在无需轮询即可传递到聊天 UI。此外,应用程序的后端是完全异步和非阻塞的,这要归功于Spring WebFlux和Kotlin Coroutines。
本教程的最后一步是更新测试。
我们将添加一个专门用于测试的依赖项。涡轮是一个小型测试库。它通过为 kotlinx.coroutines 的接口提供一些有用的扩展来简化测试。Flow
dependencies {
...
testImplementation("app.cash.turbine:turbine:0.4.1")
...
}
库的入口点是 的扩展,它接受实现验证逻辑的代码块。扩展是一个挂起功能,在流完成或取消之前不会返回。我们稍后将研究其应用。test()
Flow<T>
test()
接下来,更新测试依赖项。我们将使用构造函数来注入依赖项,而不是通过字段自动连接。
class ChatKotlinApplicationTests {
@Autowired
lateinit var client: TestRestTemplate
@Autowired
lateinit var messageRepository: MessageRepository
class ChatKotlinApplicationTests(
@Autowired val rsocketBuilder: RSocketRequester.Builder,
@Autowired val messageRepository: MessageRepository,
@LocalServerPort val serverPort: Int
) {
我们使用而不是自 通过 RSocket 协议进行对话实现的端点。在测试中,我们需要构造一个实例并使用它来发出请求。将旧测试替换为以下新代码:RSocketRequest.Builder
TestRestTemplate
MessageResource
RSocketRequester
@ExperimentalTime
@ExperimentalCoroutinesApi
@Test
fun `test that messages API streams latest messages`() {
runBlocking {
val rSocketRequester =
rsocketBuilder.websocket(URI("ws://localhost:${serverPort}/rsocket"))
rSocketRequester
.route("api.v1.messages.stream")
.retrieveFlow<MessageVM>()
.test {
assertThat(expectItem().prepareForTesting())
.isEqualTo(
MessageVM(
"*testMessage*",
UserVM("test", URL("http://test.com")),
now.minusSeconds(2).truncatedTo(MILLIS)
)
)
assertThat(expectItem().prepareForTesting())
.isEqualTo(
MessageVM(
"<body><p><strong>testMessage2</strong></p></body>",
UserVM("test1", URL("http://test.com")),
now.minusSeconds(1).truncatedTo(MILLIS)
)
)
assertThat(expectItem().prepareForTesting())
.isEqualTo(
MessageVM(
"<body><p><code>testMessage3</code></p></body>",
UserVM("test2", URL("http://test.com")),
now.truncatedTo(MILLIS)
)
)
expectNoEvents()
launch {
rSocketRequester.route("api.v1.messages.stream")
.dataWithType(flow {
emit(
MessageVM(
"`HelloWorld`",
UserVM("test", URL("http://test.com")),
now.plusSeconds(1)
)
)
})
.retrieveFlow<Void>()
.collect()
}
assertThat(expectItem().prepareForTesting())
.isEqualTo(
MessageVM(
"<body><p><code>HelloWorld</code></p></body>",
UserVM("test", URL("http://test.com")),
now.plusSeconds(1).truncatedTo(MILLIS)
)
)
cancelAndIgnoreRemainingEvents()
}
}
}
@ExperimentalTime
@Test
fun `test that messages streamed to the API is stored`() {
runBlocking {
launch {
val rSocketRequester =
rsocketBuilder.websocket(URI("ws://localhost:${serverPort}/rsocket"))
rSocketRequester.route("api.v1.messages.stream")
.dataWithType(flow {
emit(
MessageVM(
"`HelloWorld`",
UserVM("test", URL("http://test.com")),
now.plusSeconds(1)
)
)
})
.retrieveFlow<Void>()
.collect()
}
delay(2.seconds)
messageRepository.findAll()
.first { it.content.contains("HelloWorld") }
.apply {
assertThat(this.prepareForTesting())
.isEqualTo(
Message(
"`HelloWorld`",
ContentType.MARKDOWN,
now.plusSeconds(1).truncatedTo(MILLIS),
"test",
"http://test.com"
)
)
}
}
}
总结
这是本教程的最后一部分。我们从一个简单的聊天应用程序开始,其中 UI 轮询新消息,而后端在运行数据库查询时阻塞。我们逐渐向应用程序添加了功能,并将其迁移到反应式 Spring 堆栈。后端现在是完全异步的,利用了Spring WebFlux和Kotlin协程。