개발 이야기/Spring
[Spring] Spring Event 정리
제이온 (Jayon)
2022. 11. 4. 00:14
주문 프로그램을 예시로 들어 설명하겠습니다.
Spring MVC 기준
Spring 4.2 이전 Spring Event 프로그래밍 방법
- 개발 환경: Spring MVC + Spring Data JPA + Kotlin
// Domain
@Entity
@Table(name = "order_info")
data class Order (
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
var id: Long? = null,
val productName: String,
val customerName: String
)
// Repository
@Repository
interface OrderRepository : JpaRepository<Order, Long> {
}
// Application
@Service
class OrderService(val orderRepository: OrderRepository) {
@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
val result = orderRepository.save(
Order(
productName = orderRequest.productName,
customerName = orderRequest.customerName
)
)
sendKakao()
sendEmail()
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
private fun sendKakao() {
println("SEND KAKAO MESSAGE")
}
private fun sendEmail() {
println("SEND EMAIL")
}
fun find(id: String): OrderController.Companion.OrderResponse {
val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
}
// Controller
@RestController
class OrderController(val orderService: OrderService) {
companion object {
data class OrderRequest(
val productName: String,
val customerName: String
)
data class OrderResponse(
val id: Long,
val productName: String,
val customerName: String
)
}
@ResponseStatus(HttpStatus.CREATED)
@PostMapping("/orders")
fun save(@RequestBody orderRequest: OrderRequest) : OrderResponse {
return orderService.save(orderRequest)
}
@ResponseStatus(HttpStatus.OK)
@GetMapping("/orders/{id}")
fun view(@PathVariable id: String) : OrderResponse {
return orderService.find(id)
}
}
- OrderService의 save() 메서드는 주문 정보를 저장한 뒤, 카카오 및 이메일 알림 전송하는 일을 수행하고 있습니다. 이렇게 할 경우 주문을 처리하는 로직과 메시지를 발송하는 로직이 섞이게 됩니다.
- Event 방식으로 개선해 봅시다.
// domain
data class OrderEvent(val eventSource: Any, val data: Order) : ApplicationEvent(eventSource)
// Application
@Service
class MessageService {
fun sendKakao() {
println("SEND KAKAO MESSAGE")
}
fun sendEmail() {
println("SEND EMAIL")
}
}
@Component
class OrderHandler(val messageService: MessageService) : ApplicationListener<OrderEvent> {
override fun onApplicationEvent(event: OrderEvent) {
messageService.sendKakao()
messageService.sendEmail()
}
}
@Service
class OrderService(val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {
@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
val result = orderRepository.save(
Order(
productName = orderRequest.productName,
customerName = orderRequest.customerName
)
)
applicationEventPublisher.publishEvent(OrderEvent(this, result))
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
fun find(id: String): OrderController.Companion.OrderResponse {
val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
}
- event를 위한 OrderEvent를 정의해 줍니다. 이때, ApplicationEvent를 상속 받아야 합니다.
- 메시지 알림 로직을 MessageService로 위임합니다.
- OrderHandler는 OrderEvent를 받는 이벤트 리스너입니다. ApplicationListener를 상속 받고, onApplicationEvent() 메서드를 오버라이딩하여 이벤트를 받았을 때 행동을 정의하면 됩니다.
- OrderService는 ApplicationEventPublisher를 주입 받고, 이벤트를 publishEvent() 메서드 호출하여 퍼블리싱하면 됩니다.
- DB 로직 직후, 메시지 알림이 성공한 것을 볼 수 있습니다.
Spring 4.2 이후 Spring Event 프로그래밍 방법
- 위 코드로도 이벤트를 정상적으로 퍼블리싱 및 리스닝이 가능하지만, ApplicationEvent, ApplicationListener를 상속해야 한다는 단점이 있습니다.
- 이것은 코드의 길이도 길어지고, 다중 상속이 불가능하다는 문제가 있습니다.
- 다행히 Spring 4.2 이후부터는 @EventListener를 통해 좀 더 편하게 개발할 수 있게 되었습니다.
// application
@Component
class OrderHandler(val messageService: MessageService) {
@EventListener
fun sendMessage(order: Order) {
messageService.sendKakao()
messageService.sendEmail()
}
}
@Service
class OrderService(val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {
@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
val result = orderRepository.save(
Order(
productName = orderRequest.productName,
customerName = orderRequest.customerName
)
)
applicationEventPublisher.publishEvent(result)
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
fun find(id: String): OrderController.Companion.OrderResponse {
val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
return OrderController.Companion.OrderResponse(
id = result.id!!,
productName = result.productName,
customerName = result.customerName
)
}
}
- OrderHandler에서 원하는 메소드를 정의하고 그 위에 @EventLister를 붙인 후, 파라미터에 리스닝할 이벤트를 정의하면 됩니다. OrderEvent에서 Order로 바뀐 것을 알 수 있습니다.
- OrderService에서는 퍼블리싱할 때 OrderEvent가 아니라 Order로 변경하면 됩니다.
개선 사항
- save 내부 로직이 동기 & 블로킹 방식으로 처리되므로 save 메서드가 주문 처리 및 메시지 발송까지 마쳐야 종료가 됩니다.
- save 로직은 어쨌든 데이터를 저장하는 것이 목적인데, 메시지 발송까지 대기해야하므로 비효율적인 측면이 있고, 메시지 발송 과정도 트랜잭션에 묶이므로 메시지 발송 실패 시 데이터 저장도 실패하게 됩니다.
동기 방식으로 트랜잭션 분리
@Component
class OrderHandler(val messageService: MessageService) {
@TransactionalEventListener
fun sendMessage(order: Order) {
messageService.sendKakao()
throw RuntimeException("test error")
messageService.sendEmail()
}
}
- 기존 @EventListener 대신 @TransactionalEventListener로 갈아 끼우기만 하면 됩니다.
- 메시지 전송 로직에 에러가 발생하더라도 트랜잭션은 성공적으로 커밋되는 것을 알 수 있습니다.
- @TransactionalEventListener는 디폴트 설정이 이전 트랜잭션이 끝나면 해당 이벤트 리스너 로직을 수행하기 때문에 가능한 것입니다. 즉, save 트랜잭션이 성공적으로 끝나야 비로소 메시지 전송 로직이 수행됩니다.
- 다만 여전히 동기 방식이므로 메시지 전송 로직을 수행할 동안 다른 작업을 할 수 없습니다.
비동기 방식으로 이벤트 처리
// application
@Component
class OrderHandler(val messageService: MessageService) {
@Async
@TransactionalEventListener
fun sendMessage(order: Order) {
messageService.sendKakao()
messageService.sendEmail()
}
}
// configuration
@EnableAsync
@Configuration
class AsyncConfiguration : AsyncConfigurer {
override fun getAsyncExecutor(): Executor {
val executor = ThreadPoolTaskExecutor()
executor.setThreadNamePrefix("async-thread-")
executor.corePoolSize = 2
executor.maxPoolSize = 100
executor.queueCapacity = 5
executor.initialize()
return executor
}
override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler? {
return SimpleAsyncUncaughtExceptionHandler()
}
}
- 이벤트 리스너 코드에 @Async를 달아주고, @Async와 관련된 설정 클래스를 만들면 됩니다.
- @Async를 이벤트 리스너 코드에 달고, main이나 설정 클래스에서 @EnableAsync만 달아도 비동기적으로 돌기는 하지만, @Async가 붙은 코드는 항상 새로운 스레드를 만들어서 할당하게 됩니다. 즉, 수많은 요청이 들어왔을 때 매번 스레드를 생성하여 OutOfMemoryError가 발생할 수 있습니다.
- 따라서 @Async가 붙은 코드에 대해 새로운 스레드를 할당하되, 스레드 개수를 제한하는 스레드 풀 설정이 필요합니다. 그것이 위 AsyncConfiguration의 getAsyncExecutor()이 하는 역할입니다.
- 메시지 전송 성공 여부와 관계 없이 DB 로직은 완수하고 트랜잭션 커밋까지 날리는 모습을 볼 수 있고, 메시지 전송 로직은 async-thread prefix가 붙은 스레드가 새로 할당되어 작업하는 것을 확인할 수 있습니다.
- 만약 메시지 전송 중 에러가 발생한다고 하더라도, DB 로직은 정상적으로 트랜잭션 커밋이 실행되는 것을 알 수 있습니다. 또한, 메시지 전송 로직은 새로운 스레드가 수행하므로 다른 작업을 다른 스레드(DB 로직 수행하던 스레드)가 이어서 진행할 수 있습니다.
정리
- 대부분 Spring 4.2 이상을 사용 중일 것으로 추측되므로 @EventListener 방식을 추천하며, 동기 방식일 때는 @TransactionalListener를 상황에 맞게 적용하고, 비동기 방식을 적용할 때는 @TransactionalListener, @Async 및 스레드 풀 설정을 적용해 주면 됩니다.
- 트랜잭션을 사용하지 않는다면, @EventListener만 사용해도 무방합니다. (혹은 @Async + @EventListener)
- @EventListener vs @TransactionalListener
- @EventListener: 부모 메서드에서 예외가 발생하더라도 예외 발생 전에 이벤트가 publish되었다면 무조건 이벤트를 자식 메서드에서 listen하여 작업을 수행합니다.
- @TransactionalListener: 부모 메서드에서 예외가 발생하였다면 이벤트가 publish되지 않으므로 자식 메서드에서 이벤트를 listen하지 않습니다.
- @EventListener vs @TransactionalListener
Spring Webflux 기준
Spring 5 이후 Spring Event 프로그래밍 방법
- 개발 환경: Spring Webflux + Spring Data R2DBC(R2dbc Repository 사용) + Kotlin
// domain
@Table("order_info")
data class Order (
@Id
var id: Long? = null,
val productName: String,
val customerName: String
)
// Repository
@Repository
interface OrderRepository : R2dbcRepository<Order, Long> {
}
// Configuration
@Configuration
class R2dbcConfiguration {
@Bean
fun mySQLTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
return R2dbcTransactionManager(connectionFactory)
}
}
@EnableAsync
@Configuration
class AsyncConfiguration : AsyncConfigurer {
override fun getAsyncExecutor(): Executor {
val executor = ThreadPoolTaskExecutor()
executor.setThreadNamePrefix("async-thread-")
executor.corePoolSize = 2
executor.maxPoolSize = 100
executor.queueCapacity = 5
executor.initialize()
return executor
}
override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler? {
return SimpleAsyncUncaughtExceptionHandler()
}
}
- R2DBC Repository는 save할 때 ID가 null이면 ID를 auto-generate해 줍니다.
- R2DBC는 사용할 트랜잭션 매니저를 지정해야, 추후 @Transactional을 정상적으로 동작시킬 수 있습니다.
- @EventListener는 비동기로 사용하려면 @Async가 필요하므로 스레드 풀 설정을 해야 합니다.
// application
@Service
class OrderService(val logger: Logger = LoggerFactory.getLogger(OrderService::class.java), val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {
@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): Mono<OrderController.Companion.OrderResponse> {
logger.info("save method start")
return orderRepository.save(
Order(
productName = orderRequest.productName,
customerName = orderRequest.customerName
)
).log().doOnNext {
applicationEventPublisher.publishEvent(it)
}.log().map {
OrderController.Companion.OrderResponse(
id = it.id!!,
productName = it.productName,
customerName = it.customerName
)
}.log().doOnNext {
logger.info("save method end")
}
}
fun find(id: String): Mono<OrderController.Companion.OrderResponse> {
return orderRepository.findById(id.toLong())
.switchIfEmpty { throw RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
.map {
OrderController.Companion.OrderResponse(
id = it.id!!,
productName = it.productName,
customerName = it.customerName
)
}
}
}
@Component
class OrderHandler(val logger: Logger = LoggerFactory.getLogger(OrderHandler::class.java), val messageService: MessageService) {
@Async
@EventListener
fun sendMessage(order: Order) : Mono<Unit> {
logger.info("sendMessage start")
messageService.sendKakao()
var i = 0L
while(i <= 1_000_000_000) {
i++
}
messageService.sendEmail()
logger.info("sendMessage end")
return Mono.empty()
}
}
@Service
class MessageService(val logger: Logger = LoggerFactory.getLogger(MessageService::class.java)) {
fun sendKakao() {
logger.info("SEND KAKAO MESSAGE")
}
fun sendEmail() {
logger.info("SEND EMAIL")
}
}
- 기존 서비스 로직을 웹플럭스에 맞게 Reactor 코드로 변경하였습니다.
- 이벤트 리스너는 @EventListener을 사용하였습니다. (@TransactionalEventListener을 사용하지 않는 이유는 아래에서 후술.)
// controller
@RestController
class OrderController(val orderService: OrderService) {
companion object {
data class OrderRequest(
val productName: String,
val customerName: String
)
data class OrderResponse(
val id: Long,
val productName: String,
val customerName: String
)
}
@ResponseStatus(HttpStatus.CREATED)
@PostMapping("/orders")
fun save(@RequestBody orderRequest: OrderRequest) : Mono<OrderResponse> {
return orderService.save(orderRequest)
}
@ResponseStatus(HttpStatus.OK)
@GetMapping("/orders/{id}")
fun view(@PathVariable id: String) : Mono<OrderResponse> {
return orderService.find(id)
}
}
- Mono를 반환해주도록 수정하였습니다.
테스트 결과
- 우선 save 로직과 메시지 로직이 비동기로 동작하는지 확인해 봅시다.
- DB 로직과 메시지 전송 로직이 서로 별개의 스레드에서 작동하므로 비동기 방식으로 잘 동작하는 것을 확인할 수 있습니다.
- 만약 EventListener에 @Async를 붙여주지 않는다면, 같은 스레드가 DB 로직 작업을 하고 메시지 전송 작업을 하므로 비동기로 동작하지 않습니다. 또한 아래처럼 메시지 전송 로직에서 에러가 발생하면 트랜잭션이 롤백됩니다.
@TransactionalEventListener을 사용하지 않은 이유
- 처음에 @TransactionalEventListener을 사용했으나 정상적으로 동작하지 않았습니다.
- No transaction is active 메시지와 함께 이벤트 리스너 쪽에서 관련 로그가 찍히지 않는 것을 보니, 이벤트 리스너가 동작하지 않는다고 유추할 수 있습니다.
- @TransactionalListener 내부 코드를 살펴보니, 위와 같이 ReactiveTransactionalManager는 트랜잭션의 상태를 스레드 로컬이 아닌 리액터 컨텍스트에 저장하므로 @TransactionalListener를 사용할 수 없다고 나와 있습니다. (2022/11/1 현재 기준으로도 개발자 피셜 @ReactiveTransactionalListener를 아직 제공할 생각이 없다고 함.)
현 구조의 문제점
- 위 서비스는 부모 메서드가 트랜잭션을 갖고 있으므로 중간에 예외가 발생하더라도 자식 메서드를 실행한 상태라면 그대로 로직을 수행하게 됩니다. (@Async + @EventListener 방식이기 때문임.)
- 가령, 데이터는 저장하지 못했는데 사용자에게 데이터는 저장되었다고 메시지가 전달되었다고 이해하면 됩니다.
개선 사항
- 당장 생각나는 방법은 이벤트 발행 로직을 트랜잭션 최하단부에 배치시키는 것입니다.
@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): Mono<OrderController.Companion.OrderResponse> {
logger.info("save method start")
return orderRepository.save(
Order(
productName = orderRequest.productName,
customerName = orderRequest.customerName
)
).log().map {
OrderController.Companion.OrderResponse(
id = it.id!!,
productName = it.productName,
customerName = it.customerName
)
}.log().doOnNext {
applicationEventPublisher.publishEvent(Order(
id = it.id,
productName = it.productName,
customerName = it.customerName
))
}
}
- 이렇게 하면 부모 메서드에서 예외가 발생하지 않아야 정상적으로 이벤트가 발행되도록 할 수 있습니다.
- 혹은 TransactionalOperator를 사용하여 save하는 로직만 트랜잭션 영역으로 구분짓고, 그 외에는 비트랜잭션 영역으로 구분짓는 방법도 있겠습니다.