[Spring] PostgreSQL Read/Write Replication with R2DBC
안녕하세요? 제이온입니다.
오늘은 인프라 환경에서 PostgreSQL Read/Write Replication 세팅이 되어 있다고 가정할 때, 스프링 애플리케이션 쪽에서 Replication을 위한 설정을 어떤 것을 해야 하는지, 그리고 테스트는 어떻게 해야 하는지에 관한 가이드를 작성해 보겠습니다.
Read/Write End-Point 분리하기
먼저 프로덕션 코드 상에서 읽기 작업 시 Read용 데이터베이스로, 쓰기 작업시 Write용 데이터베이스로 End-Point를 분리하여 쿼리를 요청해야 합니다.
가장 쉬운 방법은 ReadConnectionFactory, WriteConnectionFactory 2개의 빈을 생성하고 Repository에 그 빈들을 모두 넣어준 후, 적절히 읽기 작업시 ReadConnectionFactory를 쓰고 쓰기 작업시 WriteConnectionFactory를 쓰는 것입니다.
하지만 위 방법은 Repository 내부에서 개발자가 실수로 Connection을 관리할 위험성이 있으므로, 저는 ConnectionFactory는 하나만 두고 특정 기준에 따라 동적으로 ReadConnectionFactory나 WriteConnectionFactory로 라우팅하는 방법을 채택했습니다.
class MultiTenantRoutingConnectionFactory : AbstractRoutingConnectionFactory() {
companion object {
const val READ_DATA_SOURCE_KEY = "readDataSourceKey"
const val WRITE_DATA_SOURCE_KEY = "writeDataSourceKey"
}
private val logger = PortLoggerFactory.getPortLogger(this.javaClass)
override fun determineCurrentLookupKey(): Mono<Any> =
TransactionSynchronizationManager.forCurrentTransaction().map { transactionSynchronizeManager ->
logger.info("it.getCurrentTransactionName() : ${transactionSynchronizeManager.currentTransactionName}")
logger.info("it.isActualTransactionActive() : ${transactionSynchronizeManager.isActualTransactionActive}")
logger.info("it.isCurrentTransactionReadOnly() : ${transactionSynchronizeManager.isCurrentTransactionReadOnly}")
val dataSourceType = if (transactionSynchronizeManager.isActualTransactionActive) {
logger.info("isActualTransactionActive....")
if (transactionSynchronizeManager.isCurrentTransactionReadOnly) {
READ_DATA_SOURCE_KEY
} else {
WRITE_DATA_SOURCE_KEY
}
} else {
READ_DATA_SOURCE_KEY
}
logger.info("> current dataSourceType: $dataSourceType")
dataSourceType
}
}
r2dbc는 AbstractRoutingConnectionFactory 클래스를 상속하고 determineCurrentLookupKey()
메서드를 재정의함으로써 ConnectionFactory를 라우팅할 수 있습니다.
저는 Transaction의 Read-Only가 true일 때는 ReadConnectionFactory를, false일 때는 WriteConnectionFactory로 선택하도록 구현하였습니다.
이렇게 세팅한 MultiTenantRoutingConnectionFactory를 살짝만 응용하여 아래와 같이 ConnectionFactory를 빈으로 등록합니다.
@Primary
@Bean
override fun connectionFactory(): ConnectionFactory {
val multiTenantRoutingConnectionFactory = MultiTenantRoutingConnectionFactory()
val factories = mapOf(
MultiTenantRoutingConnectionFactory.READ_DATA_SOURCE_KEY to readConnectionFactory(),
MultiTenantRoutingConnectionFactory.WRITE_DATA_SOURCE_KEY to writeConnectionFactory()
)
multiTenantRoutingConnectionFactory.setLenientFallback(true)
multiTenantRoutingConnectionFactory.setDefaultTargetConnectionFactory(readConnectionFactory())
multiTenantRoutingConnectionFactory.setTargetConnectionFactories(factories)
return multiTenantRoutingConnectionFactory
}
@Bean
fun readConnectionFactory(): ConnectionFactory {
val connectionFactory = ConnectionFactories.get(
builder()
.option(DRIVER, "pool")
.option(PROTOCOL, "postgresql")
.option(HOST, env.getProperty("spring.r2dbc.postgresql.read.host", "localhost"))
.option(PORT, env.getProperty("spring.r2dbc.postgresql.read.port", "25432").toInt())
.option(DATABASE, env.getProperty("spring.r2dbc.postgresql.read.name", "port"))
.option(USER, env.getProperty("spring.r2dbc.postgresql.read.username", "postgres"))
.option(PASSWORD, env.getProperty("spring.r2dbc.postgresql.read.password", "postgres"))
.build()
)
println(env.getProperty("spring.r2dbc.postgresql.read.port", "25432").toInt())
return configurateConnectionPool(connectionFactory)
}
@Bean
fun writeConnectionFactory(): ConnectionFactory {
val connectionFactory = ConnectionFactories.get(
builder()
.option(DRIVER, "pool")
.option(PROTOCOL, "postgresql")
.option(HOST, env.getProperty("spring.r2dbc.postgresql.write.host", "localhost"))
.option(PORT, env.getProperty("spring.r2dbc.postgresql.write.port", "5432").toInt())
.option(DATABASE, env.getProperty("spring.r2dbc.postgresql.write.name", "port"))
.option(USER, env.getProperty("spring.r2dbc.postgresql.write.username", "postgres"))
.option(PASSWORD, env.getProperty("spring.r2dbc.postgresql.write.password", "postgres"))
.build()
)
println(env.getProperty("spring.r2dbc.postgresql.write.port", "5432").toInt())
return configurateConnectionPool(connectionFactory)
}
@Bean
fun reactiveTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
return R2dbcTransactionManager(TransactionAwareConnectionFactoryProxy(connectionFactory))
}
코드는 간단합니다. 아까 라우팅할 때 쓰인 KEY값을 실제 ConnectionFactory에 연결만 하면 끝입니다. 참고로, 스프링 트랜잭션을 사용하기 위해 ReactiveTransactionManager 빈은 꼭 등록이 되어야 합니다.
Repository는 위의 connectionFactory 객체를 주입받고, Application은 트랜잭션을 설정할 때 read-only 값을 설정한다면 성공적으로 Read/Write End-Point를 나눌 수 있게 됩니다.
@Repository
class SpringDataTestRepository(
connectionFactory: ConnectionFactory
) : TestRepository {
val template: R2dbcEntityTemplate = R2dbcEntityTemplate(connectionFactory)
// ...
}
@Service
class TestService(
val testRepository: TestRepository,
) {
@Transactional(readOnly = true)
suspend fun getTest(request: TestRqeust): TestResult =
// ...
}
테스트 컨테이너를 활용하여 Read/Write Replication 검증하기
위 코드처럼 DB Read/Write End-Point를 나누는 것은 단순히 쿼리 요청 커넥션만 분리하는 일이므로 어렵지 않습니다. 하지만 실제로 테스트 상에서 End-Point가 잘 분리되는지, 더 나아가 Write 서버가 Read 서버로 동기화를 잘 해주는지 까지 테스트하고 싶다면 어떻게 할 수 있을까요?
저는 실제 운영 DB와의 스펙을 일치시키기 위해 TestContainers 라이브러리를 활용한 테스트 컨테이너로 테스트해 보겠습니다.
먼저 아래와 같이 gradle에 의존성을 추가합니다. 현 프로젝트는 테스트 컨테이너를 testFixture 모듈 안에서 구현하였으므로 testFixturesImplementation
을 사용하였습니다.
testFixturesImplementation("org.testcontainers:testcontainers:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:r2dbc:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:postgresql:${Version.testContainers}")
testFixturesImplementation("org.testcontainers:junit-jupiter:${Version.testContainers}")
testFixturesImplementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
testFixturesImplementation("io.kotest:kotest-runner-junit5:${Version.kotest}")
testFixturesImplementation("io.r2dbc:r2dbc-postgresql:${Version.r2dbcPostgresql}")
만약 트랜잭션의 read-only 값에 따라 커넥션이 잘 라우팅되어 전달되는지 확인하고 싶다면, 아래처럼 테스트 컨테이너를 심플하게 2개 띄우는 코드를 작성하면 됩니다.
class TestPostgreSQLContainer : PostgreSQLContainer<TestPostgreSQLContainer>("docker.io/bitnami/postgresql:13") {
companion object {
private lateinit var writeInstance: TestPostgreSQLContainer
private lateinit var readInstance: TestPostgreSQLContainer
private lateinit var writeDatabaseClient: DatabaseClient
private lateinit var readDatabaseClient: DatabaseClient
private lateinit var writeConnectionFactory: PostgresqlConnectionFactory
private lateinit var readConnectionFactory: PostgresqlConnectionFactory
init {
start()
}
private fun getConnectionFactoryOption(instance: TestPostgreSQLContainer): ConnectionFactoryOptions {
return PostgreSQLR2DBCDatabaseContainer.getOptions(instance)
}
fun sql(sql: String) {
writeDatabaseClient.sql(sql).then().block()
}
fun start() {
if (!Companion::writeInstance.isInitialized) {
writeInstance = TestPostgreSQLContainer()
.withUsername("postgres")
.withPassword("postgres")
.withStartupTimeout(Duration.ofSeconds(60))
.withDatabaseName("master-database")
.apply { start() }
writeConnectionFactory = PostgresqlConnectionFactoryProvider().create(getConnectionFactoryOption(writeInstance))
writeDatabaseClient = DatabaseClient.create(writeConnectionFactory)
}
if (!Companion::readInstance.isInitialized) {
readInstance = TestPostgreSQLContainer()
.withUsername("postgres")
.withPassword("postgres")
.withStartupTimeout(Duration.ofSeconds(60))
.withDatabaseName("slave-database")
.apply { start() }
readConnectionFactory = PostgresqlConnectionFactoryProvider().create(getConnectionFactoryOption(readInstance))
readDatabaseClient = DatabaseClient.create(readConnectionFactory)
}
}
fun stop() {
writeInstance.stop()
readInstance.stop()
}
}
}
이 코드에 문제점은 무엇일까요? Transaction의 read-only 값에 따라 Connection 라우팅 테스트는 성공하겠지만, Write 데이터베이스에 데이터를 업데이트하면, Read 데이터베이스에는 그 데이터가 동기화되지 않아서 통합 테스트시 대부분의 테스트가 실패하게 됩니다. 따라서 Write 데이터베이스의 데이터를 Read 데이터베이스에 반영해 주는 설정이 필요합니다.
PostgreSQL에서 Read/Write Replication를 구현하는 원리
테스트를 위한 세팅에 앞서, PostgreSQL은 Read/Write Replication을 어떻게 하는지 살펴 봅시다.
Log Shipping
- file-based 복제로 master 서버의 WAL file이 생성되면 이 파일을 scp를 통해 standby 서버로 전달하여 반영합니다.
- master 서버의 wal file이 생길 때 까지 replication gap이 발생할 수 있다는 단점이 있습니다.
Logical Replication
- pub / sub 구조로 양방향 replication이 가능합니다.
- 특정 테이블만 복제하는 partial replication만 가능합니다.
- DDL 복제는 안되기 때문에 각각 별도로 수행해 줘야 한다는 단점이 있습니다.
Streaming Replication
- Master 서버는 standby 서버에게 transaction log entires를 전달하고 standby 서버는 WAL file을 기다리지 않고 record 단위로 복제를 수행하는 방식입니다.
- 일반적으로 가장 많이 사용되는 복제 방식이지만, standby 서버에 긴 장애가 발생할 경우 master 서버의 wal 파일을 재활용하므로 데이터가 유실될 가능성이 있습니다. 가령, standby 서버에 긴 장애가 발생하였고, 그동안 master 서버가 가장 오래된 wal 파일을 덮어 써 버린다면, standby 서버가 복구된 이후에도 유실된 wal 내용을 복원할 방법이 없습니다.
- 따라서 일반적으로 Streaming Replication 방식을 사용하되, 만약을 위해 Log Shipping 방식도 사용하는 편입니다.
TestContainers에서 PostgreSQL Read/Write Replication 구현하기
위에 PostgreSQL Replication 원리를 소개하였지만, 실제로 세팅하는 과정은 여러 conf 파일을 수정하고 wal를 위한 디렉토리를 만들고, master 서버와 slave 서버 간의 포트를 열어주고, 리플리케이션을 위한 유저를 만드는 등 꽤나 복잡한 설정이 들어갑니다.
이들을 하려면 할 수 있겠지만, 테스트 자동화하는 입장에서 제대로 동작하도록 만드는 것은 꽤나 까다롭습니다. 그렇다면, 우리는 어떻게 테스트 컨테이너에서 PostgreSQL Replication을 구현할 수 있을까요?
다행히, Replication을 편하게 할 수 있도록 이미 만들어진 bitnami/postgresql:13
라는 이미지가 존재합니다! (https://hub.docker.com/r/bitnami/postgresql)
해당 사이트에 접속해 보면 아주 친절하게도 Streaming Replication 방식의 Replication 세팅 가이드가 나와 있습니다.
저는 이 포스팅에 상세한 Replication 세팅법을 적지는 않고, 바로 docker-compose.yml을 사용하여 테스트 컨테이너를 설정해 보겠습니다.
우선 다음과 같이 docker-compose.yml을 작성하고, classpath에 해당하는 디렉토리에 저장해 줍니다.
version: '3'
networks:
my-network:
services:
postgresql-master:
image: 'docker.io/bitnami/postgresql:13'
environment:
- POSTGRESQL_USERNAME=postgres
- POSTGRESQL_DATABASE=my_database
- POSTGRESQL_PASSWORD=postgres
- POSTGRESQL_PGAUDIT_LOG=READ,WRITE
- POSTGRESQL_LOG_HOSTNAME=true
- POSTGRESQL_REPLICATION_MODE=master
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
networks:
my-network:
aliases:
- postgresql
postgresql-slave:
image: 'docker.io/bitnami/postgresql:13'
depends_on:
- postgresql-master
environment:
- POSTGRESQL_USERNAME=postgres
- POSTGRESQL_PASSWORD=postgres
- POSTGRESQL_MASTER_HOST=postgresql-master
- POSTGRESQL_PGAUDIT_LOG=READ,WRITE
- POSTGRESQL_LOG_HOSTNAME=true
- POSTGRESQL_REPLICATION_MODE=slave
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRESQL_MASTER_PORT_NUMBER=5432
networks:
my-network:
기존 bitnami/postgresql 세팅과 거의 유사합니다. 저는 두 DB를 하나의 도커 네트워크로 속하도록 설정하였고, 명시적인 포트 포워딩은 하지 않았습니다. 특히 포트 포워딩 쪽은 병렬 테스트할 때 컨테이너가 2개 이상 띄워지는 것을 대비해야 하기 때문입니다. (안 그러면 포트가 겹치는 ㅠㅠ)
그리고 다음과 같이 Docker-Compose를 활용한 테스트 컨테이너 코드를 작성해 줍니다.
class TestDockerComposeContainer :
DockerComposeContainer<TestDockerComposeContainer>(
File(MountableFile.forClasspathResource("testcontainers/docker-compose.yml").resolvedPath)
) {
companion object {
private lateinit var testDockerComposeContainer: TestDockerComposeContainer
private lateinit var masterDatabaseClient: DatabaseClient
lateinit var masterConnectionFactory: PostgresqlConnectionFactory
init {
start()
}
fun sql(sql: String) {
masterDatabaseClient.sql(sql).then().block()
Thread.sleep(100)
}
fun start() {
if (!Companion::testDockerComposeContainer.isInitialized) {
val masterContainerName = "postgresql-master"
val slaveContainerName = "postgresql-slave"
testDockerComposeContainer = TestDockerComposeContainer()
.withExposedService(masterContainerName, 5432)
.withExposedService(slaveContainerName, 5432)
.apply {
start()
}
Thread.sleep(3000)
val masterContainerState = testDockerComposeContainer.getContainerByServiceName("${masterContainerName}_1").get()
val masterContainerPort = testDockerComposeContainer.getServicePort(masterContainerName, 5432)
masterConnectionFactory = masterContainerState.run {
PostgresqlConnectionFactoryProvider().create(
ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.DRIVER, "pool")
.option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
.option(ConnectionFactoryOptions.HOST, this.host.toString())
.option(ConnectionFactoryOptions.PORT, masterContainerPort)
.option(ConnectionFactoryOptions.DATABASE, "my_database")
.option(ConnectionFactoryOptions.USER, "postgres")
.option(ConnectionFactoryOptions.PASSWORD, "postgres")
.build()
)
}
masterDatabaseClient = DatabaseClient.create(masterConnectionFactory)
val slaveContainerState = testDockerComposeContainer.getContainerByServiceName("${slaveContainerName}_1").get()
val slaveContainerPort = testDockerComposeContainer.getServicePort(slaveContainerName, 5432)
System.setProperty("spring.r2dbc.postgresql.write.host", masterContainerState.host)
System.setProperty("spring.r2dbc.postgresql.write.port", masterContainerPort.toString())
System.setProperty("spring.r2dbc.postgresql.write.name", "my_database")
System.setProperty("spring.r2dbc.postgresql.write.username", "postgres")
System.setProperty("spring.r2dbc.postgresql.write.password", "postgres")
System.setProperty("spring.r2dbc.postgresql.read.host", slaveContainerState.host)
System.setProperty("spring.r2dbc.postgresql.read.port", slaveContainerPort.toString())
System.setProperty("spring.r2dbc.postgresql.read.name", "my_database")
System.setProperty("spring.r2dbc.postgresql.read.username", "postgres")
System.setProperty("spring.r2dbc.postgresql.read.password", "postgres")
}
}
fun stop() {
testDockerComposeContainer.stop()
}
}
}
코드 자체는 어렵지 않으나, 몇 가지 주의해야 할 세팅이 있습니다.
- docker-compose.yml 파일 명시
- DockerComposeContainer 객체를 생성할 때 docker-compose.yml이 담긴 java.io.File 객체를 전달해야 하는데, File 객체 생성 시 classpath를 지정하는 설정을 못 찾았습니다.
- 그래서 TestContainers에서 제공하는 MountableFile를 활용하여 classpath로 docker-compsoe.yml을 전달하고 실제 파일 경로를 얻어 왔습니다.
- withExposedService
- 특정 컨테이너에 대해 랜덤 포트 포워딩을 할 수 있습니다.
- 위와 같이 설정하면 xxxx:5432와 같이 포트 포워딩됩니다.
- start
- start() 메서드를 실행하면 docker-compose up이 되는 것이지, 실제로 테스트 컨테이너가 구동될 때까지 기다리지는 않습니다.
- 그래서 명시적으로 3초를 대기해 줬습니다. Wait 객체를 활용하여 더 우아하게 기다리는 방법이 있었으나 제대로 동작하게끔 세팅을 못해서 일단 조금 비효율적인 방법으로 뒀습니다.
- getContainerByServiceName("${masterContainerName}_1")
- 이상하게도 getServicePort()나 getServiceHost()는 컨테이너의 이름을 넣어도 잘 포트나 호스트 정보를 얻어올 수 있는데, getContainerByServiceName()할 때만 컨테이너 이름 뒤에
_1
을 붙여야 합니다. - 이건 뾰족한 우회법을 못 찾아서 이렇게 뒀습니다.
- 이상하게도 getServicePort()나 getServiceHost()는 컨테이너의 이름을 넣어도 잘 포트나 호스트 정보를 얻어올 수 있는데, getContainerByServiceName()할 때만 컨테이너 이름 뒤에
이렇게 세팅을 완료하면 테스트 컨테이너가 2개 띄워지고, 쓰기 작업 시 Read 데이터 베이스에도 정상적으로 동기화가 성공합니다.