개발 이야기/Spring

[Spring] Spring Event 정리

제이온 (Jayon) 2022. 11. 4.

주문 프로그램을 예시로 들어 설명하겠습니다.

 

Spring MVC 기준

Spring 4.2 이전 Spring Event 프로그래밍 방법

  • 개발 환경: Spring MVC + Spring Data JPA + Kotlin

 

// Domain
@Entity
@Table(name = "order_info")
data class Order (

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    var id: Long? = null,
    val productName: String,
    val customerName: String
)

// Repository
@Repository
interface OrderRepository : JpaRepository<Order, Long> {
}

// Application
@Service
class OrderService(val orderRepository: OrderRepository) {

    @Transactional
    fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
        val result = orderRepository.save(
            Order(
                productName = orderRequest.productName,
                customerName = orderRequest.customerName
            )
        )

        sendKakao()
        sendEmail()

        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }

    private fun sendKakao() {
        println("SEND KAKAO MESSAGE")
    }

    private fun sendEmail() {
        println("SEND EMAIL")
    }

    fun find(id: String): OrderController.Companion.OrderResponse {
        val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }
}

// Controller
@RestController
class OrderController(val orderService: OrderService) {

    companion object {
        data class OrderRequest(
            val productName: String,
            val customerName: String
        )

        data class OrderResponse(
            val id: Long,
            val productName: String,
            val customerName: String
        )
    }

    @ResponseStatus(HttpStatus.CREATED)
    @PostMapping("/orders")
    fun save(@RequestBody orderRequest: OrderRequest) : OrderResponse {
        return orderService.save(orderRequest)
    }

    @ResponseStatus(HttpStatus.OK)
    @GetMapping("/orders/{id}")
    fun view(@PathVariable id: String) : OrderResponse {
        return orderService.find(id)
    }
}

 

  • OrderService의 save() 메서드는 주문 정보를 저장한 뒤, 카카오 및 이메일 알림 전송하는 일을 수행하고 있습니다. 이렇게 할 경우 주문을 처리하는 로직과 메시지를 발송하는 로직이 섞이게 됩니다.
  • Event 방식으로 개선해 봅시다.

 

// domain
data class OrderEvent(val eventSource: Any, val data: Order) : ApplicationEvent(eventSource)

// Application
@Service
class MessageService {

    fun sendKakao() {
        println("SEND KAKAO MESSAGE")
    }

    fun sendEmail() {
        println("SEND EMAIL")
    }
}

@Component
class OrderHandler(val messageService: MessageService) : ApplicationListener<OrderEvent> {

    override fun onApplicationEvent(event: OrderEvent) {
        messageService.sendKakao()
        messageService.sendEmail()
    }
}

@Service
class OrderService(val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {

    @Transactional
    fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
        val result = orderRepository.save(
            Order(
                productName = orderRequest.productName,
                customerName = orderRequest.customerName
            )
        )

        applicationEventPublisher.publishEvent(OrderEvent(this, result))

        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }

    fun find(id: String): OrderController.Companion.OrderResponse {
        val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }
}

 

  • event를 위한 OrderEvent를 정의해 줍니다. 이때, ApplicationEvent를 상속 받아야 합니다.
  • 메시지 알림 로직을 MessageService로 위임합니다.
  • OrderHandler는 OrderEvent를 받는 이벤트 리스너입니다. ApplicationListener를 상속 받고, onApplicationEvent() 메서드를 오버라이딩하여 이벤트를 받았을 때 행동을 정의하면 됩니다.
  • OrderService는 ApplicationEventPublisher를 주입 받고, 이벤트를 publishEvent() 메서드 호출하여 퍼블리싱하면 됩니다.
  • DB 로직 직후, 메시지 알림이 성공한 것을 볼 수 있습니다.

 

 

Spring 4.2 이후 Spring Event 프로그래밍 방법

  • 위 코드로도 이벤트를 정상적으로 퍼블리싱 및 리스닝이 가능하지만, ApplicationEvent, ApplicationListener를 상속해야 한다는 단점이 있습니다.
  • 이것은 코드의 길이도 길어지고, 다중 상속이 불가능하다는 문제가 있습니다.
  • 다행히 Spring 4.2 이후부터는 @EventListener를 통해 좀 더 편하게 개발할 수 있게 되었습니다.

 

// application
@Component
class OrderHandler(val messageService: MessageService) {

    @EventListener
    fun sendMessage(order: Order) {
        messageService.sendKakao()
        messageService.sendEmail()
    }
}

@Service
class OrderService(val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {

    @Transactional
    fun save(orderRequest: OrderController.Companion.OrderRequest): OrderController.Companion.OrderResponse {
        val result = orderRepository.save(
            Order(
                productName = orderRequest.productName,
                customerName = orderRequest.customerName
            )
        )

        applicationEventPublisher.publishEvent(result)

        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }

    fun find(id: String): OrderController.Companion.OrderResponse {
        val result = orderRepository.findById(id.toLong()).orElseThrow { RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
        return OrderController.Companion.OrderResponse(
            id = result.id!!,
            productName = result.productName,
            customerName = result.customerName
        )
    }
}

 

  • OrderHandler에서 원하는 메소드를 정의하고 그 위에 @EventLister를 붙인 후, 파라미터에 리스닝할 이벤트를 정의하면 됩니다.  OrderEvent에서 Order로 바뀐 것을 알 수 있습니다.
  • OrderService에서는 퍼블리싱할 때 OrderEvent가 아니라 Order로 변경하면 됩니다.

 

개선 사항

  • save 내부 로직이 동기 & 블로킹 방식으로 처리되므로 save 메서드가 주문 처리 및 메시지 발송까지 마쳐야 종료가 됩니다.
  • save 로직은 어쨌든 데이터를 저장하는 것이 목적인데, 메시지 발송까지 대기해야하므로 비효율적인 측면이 있고, 메시지 발송 과정도 트랜잭션에 묶이므로 메시지 발송 실패 시 데이터 저장도 실패하게 됩니다.

 

 

동기 방식으로 트랜잭션 분리

@Component
class OrderHandler(val messageService: MessageService) {

    @TransactionalEventListener
    fun sendMessage(order: Order) {
        messageService.sendKakao()
        throw RuntimeException("test error")
        messageService.sendEmail()
    }
}

 

  • 기존 @EventListener 대신 @TransactionalEventListener로 갈아 끼우기만 하면 됩니다.

 

 

  • 메시지 전송 로직에 에러가 발생하더라도 트랜잭션은 성공적으로 커밋되는 것을 알 수 있습니다.
  • @TransactionalEventListener는 디폴트 설정이 이전 트랜잭션이 끝나면 해당 이벤트 리스너 로직을 수행하기 때문에 가능한 것입니다. 즉, save 트랜잭션이 성공적으로 끝나야 비로소 메시지 전송 로직이 수행됩니다.
  • 다만 여전히 동기 방식이므로 메시지 전송 로직을 수행할 동안 다른 작업을 할 수 없습니다.

 

비동기 방식으로 이벤트 처리

// application
@Component
class OrderHandler(val messageService: MessageService) {

    @Async
    @TransactionalEventListener
    fun sendMessage(order: Order) {
        messageService.sendKakao()
        messageService.sendEmail()
    }
}

// configuration
@EnableAsync
@Configuration
class AsyncConfiguration : AsyncConfigurer {

    override fun getAsyncExecutor(): Executor {
        val executor = ThreadPoolTaskExecutor()
        executor.setThreadNamePrefix("async-thread-")
        executor.corePoolSize = 2
        executor.maxPoolSize = 100
        executor.queueCapacity = 5
        executor.initialize()
        return executor
    }

    override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler? {
        return SimpleAsyncUncaughtExceptionHandler()
    }
}

 

  • 이벤트 리스너 코드에 @Async를 달아주고, @Async와 관련된 설정 클래스를 만들면 됩니다.
  • @Async를 이벤트 리스너 코드에 달고, main이나 설정 클래스에서 @EnableAsync만 달아도 비동기적으로 돌기는 하지만, @Async가 붙은 코드는 항상 새로운 스레드를 만들어서 할당하게 됩니다. 즉, 수많은 요청이 들어왔을 때 매번 스레드를 생성하여 OutOfMemoryError가 발생할 수 있습니다.
  • 따라서 @Async가 붙은 코드에 대해 새로운 스레드를 할당하되, 스레드 개수를 제한하는 스레드 풀 설정이 필요합니다. 그것이 위 AsyncConfiguration의 getAsyncExecutor()이 하는 역할입니다.
  • 메시지 전송 성공 여부와 관계 없이 DB 로직은 완수하고 트랜잭션 커밋까지 날리는 모습을 볼 수 있고, 메시지 전송 로직은 async-thread prefix가 붙은 스레드가 새로 할당되어 작업하는 것을 확인할 수 있습니다.

 

 

  • 만약 메시지 전송 중 에러가 발생한다고 하더라도, DB 로직은 정상적으로 트랜잭션 커밋이 실행되는 것을 알 수 있습니다. 또한, 메시지 전송 로직은 새로운 스레드가 수행하므로 다른 작업을 다른 스레드(DB 로직 수행하던 스레드)가 이어서 진행할 수 있습니다.

 

 

정리

  • 대부분 Spring 4.2 이상을 사용 중일 것으로 추측되므로 @EventListener 방식을 추천하며, 동기 방식일 때는 @TransactionalListener를 상황에 맞게 적용하고, 비동기 방식을 적용할 때는 @TransactionalListener, @Async 및 스레드 풀 설정을 적용해 주면 됩니다.
  • 트랜잭션을 사용하지 않는다면, @EventListener만 사용해도 무방합니다. (혹은 @Async + @EventListener)
    • @EventListener vs @TransactionalListener
      • @EventListener: 부모 메서드에서 예외가 발생하더라도 예외 발생 전에 이벤트가 publish되었다면 무조건 이벤트를 자식 메서드에서 listen하여 작업을 수행합니다.
      • @TransactionalListener: 부모 메서드에서 예외가 발생하였다면 이벤트가 publish되지 않으므로 자식 메서드에서 이벤트를 listen하지 않습니다.

 

Spring Webflux 기준

Spring 5 이후 Spring Event 프로그래밍 방법

  • 개발 환경: Spring Webflux + Spring Data R2DBC(R2dbc Repository 사용) + Kotlin

 

// domain
@Table("order_info")
data class Order (

    @Id
    var id: Long? = null,
    val productName: String,
    val customerName: String
)

// Repository
@Repository
interface OrderRepository : R2dbcRepository<Order, Long> {
}

// Configuration
@Configuration
class R2dbcConfiguration {

    @Bean
    fun mySQLTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(connectionFactory)
    }
}

@EnableAsync
@Configuration
class AsyncConfiguration : AsyncConfigurer {

    override fun getAsyncExecutor(): Executor {
        val executor = ThreadPoolTaskExecutor()
        executor.setThreadNamePrefix("async-thread-")
        executor.corePoolSize = 2
        executor.maxPoolSize = 100
        executor.queueCapacity = 5
        executor.initialize()
        return executor
    }

    override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler? {
        return SimpleAsyncUncaughtExceptionHandler()
    }
}

 

  • R2DBC Repository는 save할 때 ID가 null이면 ID를 auto-generate해 줍니다.
  • R2DBC는 사용할 트랜잭션 매니저를 지정해야, 추후 @Transactional을 정상적으로 동작시킬 수 있습니다.
  • @EventListener는 비동기로 사용하려면 @Async가 필요하므로 스레드 풀 설정을 해야 합니다.

 

// application
@Service
class OrderService(val logger: Logger = LoggerFactory.getLogger(OrderService::class.java), val applicationEventPublisher: ApplicationEventPublisher, val orderRepository: OrderRepository) {

    @Transactional
    fun save(orderRequest: OrderController.Companion.OrderRequest): Mono<OrderController.Companion.OrderResponse> {
        logger.info("save method start")
        return orderRepository.save(
            Order(
                productName = orderRequest.productName,
                customerName = orderRequest.customerName
            )
        ).log().doOnNext {
            applicationEventPublisher.publishEvent(it)
        }.log().map {
            OrderController.Companion.OrderResponse(
                id = it.id!!,
                productName = it.productName,
                customerName = it.customerName
            )
        }.log().doOnNext {
            logger.info("save method end")
        }
    }

    fun find(id: String): Mono<OrderController.Companion.OrderResponse> {
        return orderRepository.findById(id.toLong())
            .switchIfEmpty { throw RuntimeException("해당 ID에 맞는 주문이 없습니다.") }
            .map {
                OrderController.Companion.OrderResponse(
                    id = it.id!!,
                    productName = it.productName,
                    customerName = it.customerName
                )
            }
    }
}

@Component
class OrderHandler(val logger: Logger = LoggerFactory.getLogger(OrderHandler::class.java), val messageService: MessageService) {

    @Async
    @EventListener
    fun sendMessage(order: Order) : Mono<Unit> {
        logger.info("sendMessage start")
        messageService.sendKakao()
         
        var i = 0L
        while(i <= 1_000_000_000) {
            i++
        }

        messageService.sendEmail()
        logger.info("sendMessage end")
        return Mono.empty()
    }
}

@Service
class MessageService(val logger: Logger = LoggerFactory.getLogger(MessageService::class.java)) {

    fun sendKakao() {
        logger.info("SEND KAKAO MESSAGE")
    }

    fun sendEmail() {
        logger.info("SEND EMAIL")
    }
}

 

  • 기존 서비스 로직을 웹플럭스에 맞게 Reactor 코드로 변경하였습니다.
  • 이벤트 리스너는 @EventListener을 사용하였습니다. (@TransactionalEventListener을 사용하지 않는 이유는 아래에서 후술.)

 

// controller
@RestController
class OrderController(val orderService: OrderService) {

    companion object {
        data class OrderRequest(
            val productName: String,
            val customerName: String
        )

        data class OrderResponse(
            val id: Long,
            val productName: String,
            val customerName: String
        )
    }

    @ResponseStatus(HttpStatus.CREATED)
    @PostMapping("/orders")
    fun save(@RequestBody orderRequest: OrderRequest) : Mono<OrderResponse> {
        return orderService.save(orderRequest)
    }

    @ResponseStatus(HttpStatus.OK)
    @GetMapping("/orders/{id}")
    fun view(@PathVariable id: String) : Mono<OrderResponse> {
        return orderService.find(id)
    }
}

 

  • Mono를 반환해주도록 수정하였습니다.

 

테스트 결과

  • 우선 save 로직과 메시지 로직이 비동기로 동작하는지 확인해 봅시다.

 

 

  • DB 로직과 메시지 전송 로직이 서로 별개의 스레드에서 작동하므로 비동기 방식으로 잘 동작하는 것을 확인할 수 있습니다.
  • 만약 EventListener에 @Async를 붙여주지 않는다면, 같은 스레드가 DB 로직 작업을 하고 메시지 전송 작업을 하므로 비동기로 동작하지 않습니다. 또한 아래처럼 메시지 전송 로직에서 에러가 발생하면 트랜잭션이 롤백됩니다.

 

 

@TransactionalEventListener을 사용하지 않은 이유

  • 처음에 @TransactionalEventListener을 사용했으나 정상적으로 동작하지 않았습니다.

 

 

  • No transaction is active 메시지와 함께 이벤트 리스너 쪽에서 관련 로그가 찍히지 않는 것을 보니, 이벤트 리스너가 동작하지 않는다고 유추할 수 있습니다.

 

 

  • @TransactionalListener 내부 코드를 살펴보니, 위와 같이 ReactiveTransactionalManager는 트랜잭션의 상태를 스레드 로컬이 아닌 리액터 컨텍스트에 저장하므로 @TransactionalListener를 사용할 수 없다고 나와 있습니다. (2022/11/1 현재 기준으로도 개발자 피셜 @ReactiveTransactionalListener를 아직 제공할 생각이 없다고 함.)

 

현 구조의 문제점

  • 위 서비스는 부모 메서드가 트랜잭션을 갖고 있으므로 중간에 예외가 발생하더라도 자식 메서드를 실행한 상태라면 그대로 로직을 수행하게 됩니다. (@Async + @EventListener 방식이기 때문임.)
  • 가령, 데이터는 저장하지 못했는데 사용자에게 데이터는 저장되었다고 메시지가 전달되었다고 이해하면 됩니다.

 

개선 사항

  • 당장 생각나는 방법은 이벤트 발행 로직을 트랜잭션 최하단부에 배치시키는 것입니다.

 

@Transactional
fun save(orderRequest: OrderController.Companion.OrderRequest): Mono<OrderController.Companion.OrderResponse> {
    logger.info("save method start")
    return orderRepository.save(
        Order(
            productName = orderRequest.productName,
            customerName = orderRequest.customerName
        )
    ).log().map {
        OrderController.Companion.OrderResponse(
            id = it.id!!,
            productName = it.productName,
            customerName = it.customerName
        )
    }.log().doOnNext {
        applicationEventPublisher.publishEvent(Order(
            id = it.id,
            productName = it.productName,
            customerName = it.customerName
        ))
    }
}

 

  • 이렇게 하면 부모 메서드에서 예외가 발생하지 않아야 정상적으로 이벤트가 발행되도록 할 수 있습니다.
  • 혹은 TransactionalOperator를 사용하여 save하는 로직만 트랜잭션 영역으로 구분짓고, 그 외에는 비트랜잭션 영역으로 구분짓는 방법도 있겠습니다.

댓글

추천 글