개발 이야기/Spring

[Spring] PostgreSQL Read/Write Replication with R2DBC

제이온 (Jayon) 2023. 3. 5. 22:40

안녕하세요? 제이온입니다.

오늘은 인프라 환경에서 PostgreSQL Read/Write Replication 세팅이 되어 있다고 가정할 때, 스프링 애플리케이션 쪽에서 Replication을 위한 설정을 어떤 것을 해야 하는지, 그리고 테스트는 어떻게 해야 하는지에 관한 가이드를 작성해 보겠습니다.

 

Read/Write End-Point 분리하기

먼저 프로덕션 코드 상에서 읽기 작업 시 Read용 데이터베이스로, 쓰기 작업시 Write용 데이터베이스로 End-Point를 분리하여 쿼리를 요청해야 합니다.

 

가장 쉬운 방법은 ReadConnectionFactory, WriteConnectionFactory 2개의 빈을 생성하고 Repository에 그 빈들을 모두 넣어준 후, 적절히 읽기 작업시 ReadConnectionFactory를 쓰고 쓰기 작업시 WriteConnectionFactory를 쓰는 것입니다.

 

하지만 위 방법은 Repository 내부에서 개발자가 실수로 Connection을 관리할 위험성이 있으므로, 저는 ConnectionFactory는 하나만 두고 특정 기준에 따라 동적으로 ReadConnectionFactory나 WriteConnectionFactory로 라우팅하는 방법을 채택했습니다.

 

class MultiTenantRoutingConnectionFactory : AbstractRoutingConnectionFactory() {

    companion object {
        const val READ_DATA_SOURCE_KEY = "readDataSourceKey"
        const val WRITE_DATA_SOURCE_KEY = "writeDataSourceKey"
    }

    private val logger = PortLoggerFactory.getPortLogger(this.javaClass)

    override fun determineCurrentLookupKey(): Mono<Any> =
        TransactionSynchronizationManager.forCurrentTransaction().map { transactionSynchronizeManager ->
            logger.info("it.getCurrentTransactionName() : ${transactionSynchronizeManager.currentTransactionName}")
            logger.info("it.isActualTransactionActive() : ${transactionSynchronizeManager.isActualTransactionActive}")
            logger.info("it.isCurrentTransactionReadOnly() : ${transactionSynchronizeManager.isCurrentTransactionReadOnly}")

            val dataSourceType = if (transactionSynchronizeManager.isActualTransactionActive) {
                logger.info("isActualTransactionActive....")
                if (transactionSynchronizeManager.isCurrentTransactionReadOnly) {
                    READ_DATA_SOURCE_KEY
                } else {
                    WRITE_DATA_SOURCE_KEY
                }
            } else {
                READ_DATA_SOURCE_KEY
            }
            logger.info("> current dataSourceType: $dataSourceType")
            dataSourceType
        }
}

 

r2dbc는 AbstractRoutingConnectionFactory 클래스를 상속하고 determineCurrentLookupKey() 메서드를 재정의함으로써 ConnectionFactory를 라우팅할 수 있습니다.

 

저는 Transaction의 Read-Only가 true일 때는 ReadConnectionFactory를, false일 때는 WriteConnectionFactory로 선택하도록 구현하였습니다.

 

이렇게 세팅한 MultiTenantRoutingConnectionFactory를 살짝만 응용하여 아래와 같이 ConnectionFactory를 빈으로 등록합니다.

 

    @Primary
    @Bean
    override fun connectionFactory(): ConnectionFactory {
        val multiTenantRoutingConnectionFactory = MultiTenantRoutingConnectionFactory()

        val factories = mapOf(
            MultiTenantRoutingConnectionFactory.READ_DATA_SOURCE_KEY to readConnectionFactory(),
            MultiTenantRoutingConnectionFactory.WRITE_DATA_SOURCE_KEY to writeConnectionFactory()
        )

        multiTenantRoutingConnectionFactory.setLenientFallback(true)
        multiTenantRoutingConnectionFactory.setDefaultTargetConnectionFactory(readConnectionFactory())
        multiTenantRoutingConnectionFactory.setTargetConnectionFactories(factories)
        return multiTenantRoutingConnectionFactory
    }

    @Bean
    fun readConnectionFactory(): ConnectionFactory {
        val connectionFactory = ConnectionFactories.get(
            builder()
                .option(DRIVER, "pool")
                .option(PROTOCOL, "postgresql")
                .option(HOST, env.getProperty("spring.r2dbc.postgresql.read.host", "localhost"))
                .option(PORT, env.getProperty("spring.r2dbc.postgresql.read.port", "25432").toInt())
                .option(DATABASE, env.getProperty("spring.r2dbc.postgresql.read.name", "port"))
                .option(USER, env.getProperty("spring.r2dbc.postgresql.read.username", "postgres"))
                .option(PASSWORD, env.getProperty("spring.r2dbc.postgresql.read.password", "postgres"))
                .build()
        )
        println(env.getProperty("spring.r2dbc.postgresql.read.port", "25432").toInt())
        return configurateConnectionPool(connectionFactory)
    }

    @Bean
    fun writeConnectionFactory(): ConnectionFactory {
        val connectionFactory = ConnectionFactories.get(
            builder()
                .option(DRIVER, "pool")
                .option(PROTOCOL, "postgresql")
                .option(HOST, env.getProperty("spring.r2dbc.postgresql.write.host", "localhost"))
                .option(PORT, env.getProperty("spring.r2dbc.postgresql.write.port", "5432").toInt())
                .option(DATABASE, env.getProperty("spring.r2dbc.postgresql.write.name", "port"))
                .option(USER, env.getProperty("spring.r2dbc.postgresql.write.username", "postgres"))
                .option(PASSWORD, env.getProperty("spring.r2dbc.postgresql.write.password", "postgres"))
                .build()
        )
        println(env.getProperty("spring.r2dbc.postgresql.write.port", "5432").toInt())
        return configurateConnectionPool(connectionFactory)
    }

    @Bean
    fun reactiveTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(TransactionAwareConnectionFactoryProxy(connectionFactory))
    }

 

코드는 간단합니다. 아까 라우팅할 때 쓰인 KEY값을 실제 ConnectionFactory에 연결만 하면 끝입니다. 참고로, 스프링 트랜잭션을 사용하기 위해 ReactiveTransactionManager 빈은 꼭 등록이 되어야 합니다.

 

Repository는 위의 connectionFactory 객체를 주입받고, Application은 트랜잭션을 설정할 때 read-only 값을 설정한다면 성공적으로 Read/Write End-Point를 나눌 수 있게 됩니다.

 

@Repository
class SpringDataTestRepository(
    connectionFactory: ConnectionFactory
) : TestRepository {
    val template: R2dbcEntityTemplate = R2dbcEntityTemplate(connectionFactory)

    // ...
}

@Service
class TestService(
    val testRepository: TestRepository,
) {
    @Transactional(readOnly = true)
    suspend fun getTest(request: TestRqeust): TestResult =
        // ...
}

 

테스트 컨테이너를 활용하여 Read/Write Replication 검증하기

위 코드처럼 DB Read/Write End-Point를 나누는 것은 단순히 쿼리 요청 커넥션만 분리하는 일이므로 어렵지 않습니다. 하지만 실제로 테스트 상에서 End-Point가 잘 분리되는지, 더 나아가 Write 서버가 Read 서버로 동기화를 잘 해주는지 까지 테스트하고 싶다면 어떻게 할 수 있을까요?

 

저는 실제 운영 DB와의 스펙을 일치시키기 위해 TestContainers 라이브러리를 활용한 테스트 컨테이너로 테스트해 보겠습니다.

 

먼저 아래와 같이 gradle에 의존성을 추가합니다. 현 프로젝트는 테스트 컨테이너를 testFixture 모듈 안에서 구현하였으므로 testFixturesImplementation을 사용하였습니다.

 

testFixturesImplementation("org.testcontainers:testcontainers:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:r2dbc:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:postgresql:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:junit-jupiter:${Version.testContainers}")
testFixturesImplementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
testFixturesImplementation("io.kotest:kotest-runner-junit5:${Version.kotest}")
testFixturesImplementation("io.r2dbc:r2dbc-postgresql:${Version.r2dbcPostgresql}")

 

만약 트랜잭션의 read-only 값에 따라 커넥션이 잘 라우팅되어 전달되는지 확인하고 싶다면, 아래처럼 테스트 컨테이너를 심플하게 2개 띄우는 코드를 작성하면 됩니다.

 

class TestPostgreSQLContainer : PostgreSQLContainer<TestPostgreSQLContainer>("docker.io/bitnami/postgresql:13") {

    companion object {

        private lateinit var writeInstance: TestPostgreSQLContainer
        private lateinit var readInstance: TestPostgreSQLContainer

        private lateinit var writeDatabaseClient: DatabaseClient
        private lateinit var readDatabaseClient: DatabaseClient

        private lateinit var writeConnectionFactory: PostgresqlConnectionFactory
        private lateinit var readConnectionFactory: PostgresqlConnectionFactory

        init {
            start()
        }

        private fun getConnectionFactoryOption(instance: TestPostgreSQLContainer): ConnectionFactoryOptions {
            return PostgreSQLR2DBCDatabaseContainer.getOptions(instance)
        }

        fun sql(sql: String) {
            writeDatabaseClient.sql(sql).then().block()
        }

        fun start() {
            if (!Companion::writeInstance.isInitialized) {
                writeInstance = TestPostgreSQLContainer()
                    .withUsername("postgres")
                    .withPassword("postgres")
                    .withStartupTimeout(Duration.ofSeconds(60))
                    .withDatabaseName("master-database")
                    .apply { start() }

                writeConnectionFactory = PostgresqlConnectionFactoryProvider().create(getConnectionFactoryOption(writeInstance))
                writeDatabaseClient = DatabaseClient.create(writeConnectionFactory)
            }

            if (!Companion::readInstance.isInitialized) {
                readInstance = TestPostgreSQLContainer()
                    .withUsername("postgres")
                    .withPassword("postgres")
                    .withStartupTimeout(Duration.ofSeconds(60))
                    .withDatabaseName("slave-database")
                    .apply { start() }

                readConnectionFactory = PostgresqlConnectionFactoryProvider().create(getConnectionFactoryOption(readInstance))
                readDatabaseClient = DatabaseClient.create(readConnectionFactory)
            }
        }

        fun stop() {
            writeInstance.stop()
            readInstance.stop()
        }
    }
}

 

이 코드에 문제점은 무엇일까요? Transaction의 read-only 값에 따라 Connection 라우팅 테스트는 성공하겠지만, Write 데이터베이스에 데이터를 업데이트하면, Read 데이터베이스에는 그 데이터가 동기화되지 않아서 통합 테스트시 대부분의 테스트가 실패하게 됩니다. 따라서 Write 데이터베이스의 데이터를 Read 데이터베이스에 반영해 주는 설정이 필요합니다.

 

PostgreSQL에서 Read/Write Replication를 구현하는 원리

테스트를 위한 세팅에 앞서, PostgreSQL은 Read/Write Replication을 어떻게 하는지 살펴 봅시다.

 

Log Shipping

 

  • file-based 복제로 master 서버의 WAL file이 생성되면 이 파일을 scp를 통해 standby 서버로 전달하여 반영합니다.
  • master 서버의 wal file이 생길 때 까지 replication gap이 발생할 수 있다는 단점이 있습니다.

 

Logical Replication

 

  • pub / sub 구조로 양방향 replication이 가능합니다.
  • 특정 테이블만 복제하는 partial replication만 가능합니다.
  • DDL 복제는 안되기 때문에 각각 별도로 수행해 줘야 한다는 단점이 있습니다.

 

Streaming Replication

 

  • Master 서버는 standby 서버에게 transaction log entires를 전달하고 standby 서버는 WAL file을 기다리지 않고 record 단위로 복제를 수행하는 방식입니다.
  • 일반적으로 가장 많이 사용되는 복제 방식이지만, standby 서버에 긴 장애가 발생할 경우 master 서버의 wal 파일을 재활용하므로 데이터가 유실될 가능성이 있습니다. 가령, standby 서버에 긴 장애가 발생하였고, 그동안 master 서버가 가장 오래된 wal 파일을 덮어 써 버린다면, standby 서버가 복구된 이후에도 유실된 wal 내용을 복원할 방법이 없습니다.
  • 따라서 일반적으로 Streaming Replication 방식을 사용하되, 만약을 위해 Log Shipping 방식도 사용하는 편입니다.

 

TestContainers에서 PostgreSQL Read/Write Replication 구현하기

위에 PostgreSQL Replication 원리를 소개하였지만, 실제로 세팅하는 과정은 여러 conf 파일을 수정하고 wal를 위한 디렉토리를 만들고, master 서버와 slave 서버 간의 포트를 열어주고, 리플리케이션을 위한 유저를 만드는 등 꽤나 복잡한 설정이 들어갑니다.

 

이들을 하려면 할 수 있겠지만, 테스트 자동화하는 입장에서 제대로 동작하도록 만드는 것은 꽤나 까다롭습니다. 그렇다면, 우리는 어떻게 테스트 컨테이너에서 PostgreSQL Replication을 구현할 수 있을까요?

 

다행히, Replication을 편하게 할 수 있도록 이미 만들어진 bitnami/postgresql:13라는 이미지가 존재합니다! (https://hub.docker.com/r/bitnami/postgresql)

 

해당 사이트에 접속해 보면 아주 친절하게도 Streaming Replication 방식의 Replication 세팅 가이드가 나와 있습니다.

 

 

저는 이 포스팅에 상세한 Replication 세팅법을 적지는 않고, 바로 docker-compose.yml을 사용하여 테스트 컨테이너를 설정해 보겠습니다.

 

우선 다음과 같이 docker-compose.yml을 작성하고, classpath에 해당하는 디렉토리에 저장해 줍니다.

 

version: '3'

networks:
  my-network:

services:
  postgresql-master:
    image: 'docker.io/bitnami/postgresql:13'
    environment:
      - POSTGRESQL_USERNAME=postgres
      - POSTGRESQL_DATABASE=my_database
      - POSTGRESQL_PASSWORD=postgres
      - POSTGRESQL_PGAUDIT_LOG=READ,WRITE
      - POSTGRESQL_LOG_HOSTNAME=true
      - POSTGRESQL_REPLICATION_MODE=master
      - POSTGRESQL_REPLICATION_USER=repl_user
      - POSTGRESQL_REPLICATION_PASSWORD=repl_password
    networks:
      my-network:
        aliases:
          - postgresql
  postgresql-slave:
    image: 'docker.io/bitnami/postgresql:13'
    depends_on:
      - postgresql-master
    environment:
      - POSTGRESQL_USERNAME=postgres
      - POSTGRESQL_PASSWORD=postgres
      - POSTGRESQL_MASTER_HOST=postgresql-master
      - POSTGRESQL_PGAUDIT_LOG=READ,WRITE
      - POSTGRESQL_LOG_HOSTNAME=true
      - POSTGRESQL_REPLICATION_MODE=slave
      - POSTGRESQL_REPLICATION_USER=repl_user
      - POSTGRESQL_REPLICATION_PASSWORD=repl_password
      - POSTGRESQL_MASTER_PORT_NUMBER=5432
    networks:
      my-network:

 

기존 bitnami/postgresql 세팅과 거의 유사합니다. 저는 두 DB를 하나의 도커 네트워크로 속하도록 설정하였고, 명시적인 포트 포워딩은 하지 않았습니다. 특히 포트 포워딩 쪽은 병렬 테스트할 때 컨테이너가 2개 이상 띄워지는 것을 대비해야 하기 때문입니다. (안 그러면 포트가 겹치는 ㅠㅠ)

 

그리고 다음과 같이 Docker-Compose를 활용한 테스트 컨테이너 코드를 작성해 줍니다.

 

class TestDockerComposeContainer :
    DockerComposeContainer<TestDockerComposeContainer>(
        File(MountableFile.forClasspathResource("testcontainers/docker-compose.yml").resolvedPath)
    ) {

    companion object {

        private lateinit var testDockerComposeContainer: TestDockerComposeContainer
        private lateinit var masterDatabaseClient: DatabaseClient

        lateinit var masterConnectionFactory: PostgresqlConnectionFactory

        init {
            start()
        }

        fun sql(sql: String) {
            masterDatabaseClient.sql(sql).then().block()
            Thread.sleep(100)
        }

        fun start() {
            if (!Companion::testDockerComposeContainer.isInitialized) {
                val masterContainerName = "postgresql-master"
                val slaveContainerName = "postgresql-slave"

                testDockerComposeContainer = TestDockerComposeContainer()
                    .withExposedService(masterContainerName, 5432)
                    .withExposedService(slaveContainerName, 5432)
                    .apply {
                        start()
                    }
                Thread.sleep(3000)

                val masterContainerState = testDockerComposeContainer.getContainerByServiceName("${masterContainerName}_1").get()
                val masterContainerPort = testDockerComposeContainer.getServicePort(masterContainerName, 5432)

                masterConnectionFactory = masterContainerState.run {
                    PostgresqlConnectionFactoryProvider().create(
                        ConnectionFactoryOptions.builder()
                            .option(ConnectionFactoryOptions.DRIVER, "pool")
                            .option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
                            .option(ConnectionFactoryOptions.HOST, this.host.toString())
                            .option(ConnectionFactoryOptions.PORT, masterContainerPort)
                            .option(ConnectionFactoryOptions.DATABASE, "my_database")
                            .option(ConnectionFactoryOptions.USER, "postgres")
                            .option(ConnectionFactoryOptions.PASSWORD, "postgres")
                            .build()
                    )
                }
                masterDatabaseClient = DatabaseClient.create(masterConnectionFactory)

                val slaveContainerState = testDockerComposeContainer.getContainerByServiceName("${slaveContainerName}_1").get()
                val slaveContainerPort = testDockerComposeContainer.getServicePort(slaveContainerName, 5432)

                System.setProperty("spring.r2dbc.postgresql.write.host", masterContainerState.host)
                System.setProperty("spring.r2dbc.postgresql.write.port", masterContainerPort.toString())
                System.setProperty("spring.r2dbc.postgresql.write.name", "my_database")
                System.setProperty("spring.r2dbc.postgresql.write.username", "postgres")
                System.setProperty("spring.r2dbc.postgresql.write.password", "postgres")

                System.setProperty("spring.r2dbc.postgresql.read.host", slaveContainerState.host)
                System.setProperty("spring.r2dbc.postgresql.read.port", slaveContainerPort.toString())
                System.setProperty("spring.r2dbc.postgresql.read.name", "my_database")
                System.setProperty("spring.r2dbc.postgresql.read.username", "postgres")
                System.setProperty("spring.r2dbc.postgresql.read.password", "postgres")
            }
        }

        fun stop() {
            testDockerComposeContainer.stop()
        }
    }
}

 

코드 자체는 어렵지 않으나, 몇 가지 주의해야 할 세팅이 있습니다.

 

  • docker-compose.yml 파일 명시
    • DockerComposeContainer 객체를 생성할 때 docker-compose.yml이 담긴 java.io.File 객체를 전달해야 하는데, File 객체 생성 시 classpath를 지정하는 설정을 못 찾았습니다.
    • 그래서 TestContainers에서 제공하는 MountableFile를 활용하여 classpath로 docker-compsoe.yml을 전달하고 실제 파일 경로를 얻어 왔습니다.
  • withExposedService
    • 특정 컨테이너에 대해 랜덤 포트 포워딩을 할 수 있습니다.
    • 위와 같이 설정하면 xxxx:5432와 같이 포트 포워딩됩니다.
  • start
    • start() 메서드를 실행하면 docker-compose up이 되는 것이지, 실제로 테스트 컨테이너가 구동될 때까지 기다리지는 않습니다.
    • 그래서 명시적으로 3초를 대기해 줬습니다. Wait 객체를 활용하여 더 우아하게 기다리는 방법이 있었으나 제대로 동작하게끔 세팅을 못해서 일단 조금 비효율적인 방법으로 뒀습니다.
  • getContainerByServiceName("${masterContainerName}_1")
    • 이상하게도 getServicePort()나 getServiceHost()는 컨테이너의 이름을 넣어도 잘 포트나 호스트 정보를 얻어올 수 있는데, getContainerByServiceName()할 때만 컨테이너 이름 뒤에 _1을 붙여야 합니다.
    • 이건 뾰족한 우회법을 못 찾아서 이렇게 뒀습니다.

 

이렇게 세팅을 완료하면 테스트 컨테이너가 2개 띄워지고, 쓰기 작업 시 Read 데이터 베이스에도 정상적으로 동기화가 성공합니다.