본문 바로가기

Framework/Spring

Kotlin + SpringBoot + Kafka 연동

 

 이번 포스트에서는 코틀린 기반의 스프링 부트 프로젝트 설정 및 카프카 연동의 과정을 정리 하려고 한다. 내가 생각하는 코틀린의 큰 강점 중 하나는, 자바로 개발을 해오던 방식을 거의 그대로 유지하면서 코틀린만의 유연한 문법적 혜택을 누릴 수 있다는 것이다. 그러한 맥락에서, 사실 코틀린으로 스프링 부트 프로젝트를 설정하는 것은 자바와 거의 다를 것이 없다. 일단 스프링 부트 프로젝트 부터 만들어 보자.

 

1. 스프링 부트 프로젝트

프로젝트 생성

'Spring Initializr' 를 사용해 프로젝트를 생성하게 되면, 각종 스프링 부트의 starter 의존성을 선택하는 창이 노출 되는데,

여기선 아무런 의존성도 선택하지 않고 그냥 프로젝트를 생성하자. (어짜피 직접 그래들 의존성 추가해주는 것과 똑같다)

 

그래들 설정 변경

일단 프로젝트를 생성하면, 자바기반의 `build.gradle` 파일이 만들어질 것이다. 

자동 생성된 설정을 다 무시하고, 아래의 설정으로 바꿔주자.

buildscript {

    ext {
        kotlinVersion = '1.3.72'
        springBootVersion = '2.0.9.RELEASE'
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
    }
}

apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.asuraiv.kotlinbackend'
version = '0.0.1-SNAPSHOT'

sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

dependencies {

    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'

    implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
    implementation 'org.jetbrains.kotlin:kotlin-reflect'
    implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

compileKotlin {
    kotlinOptions {
        jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
}

compileTestKotlin {
    kotlinOptions {
        jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
}


test {
    useJUnitPlatform()
}

 

SpringBootApplication 클래스 작성

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KotlinBackendApplication

fun main(args: Array<String>) {
    runApplication<KotlinBackendApplication>(*args)
}

 참고로 코틀린의 `main` 메서드는 'top level' 함수이다. 따라서 같은 파일에 `@SpringBootApplication` 어노테이션을 적용할 클래스를 선언하고, `runApplication` 함수 제네릭 타입으로 해당 클래스를 지정한 후, 스프레드 연산자 `*` 로 arguments를 전달하면 어플리케이션이 구동 된다.

 

 

2. Kafka

 카프카는 '주키퍼(Zookeeper)'를 통해 클러스터가 코디네이팅 되므로 주키퍼를 먼저 설치한다. Homebrew(MacOS 기준)로 쉽게 설치 가능하며, 도커 이미지를 pull 받아 구동시키는 것 또한 간단하므로 그 방법을 사용해도 좋다.

 

주키퍼 설치 및 구동

$ brew install zookeeper
$ brew services start zookeeper

 

카프카 설치 및 구동

$ brew install kafka
$ brew services start kafka

 

토픽 생성

$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 partitions 1 --topic sample-topic

 

 

3. 연동 테스트

이제 스프링 설정 클래스와 컨슈머를 작성하고, kafka-producer 커맨드를 이용해 메시지를 인입시켜 컨슈머가 정상적으로 동작하는 것을 확인하자.

 

Kafka Configuration 작성

Configuration 클래스를 작성하기 전에, 'application.yml' 에 카프카 'hosts' 프로퍼티를 추가한다.

kafka:
  hosts: localhost:9092
@Configuration
class KafkaConfig {

    @Value("\${kafka.hosts}")
    lateinit var hosts: String

    @Primary
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {

        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()

        return containerFactory
    }

    @Primary
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String>? {

        return DefaultKafkaConsumerFactory(consumerProperties())
    }

    @Bean
    fun consumerProperties(): Map<String, Any> {
    
        val hostName = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }

        return hashMapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
                ConsumerConfig.GROUP_ID_CONFIG to hostName,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
        )
    }
}

값으로 평가(evaluation)되는 `try-catch` 덕에 호스트 이름을 구하는 구문이 간결해진 것이 눈에 띈다. 컨슈머 속성이 담긴 `Map` 은 불변 Map 컨테이너를 생성하는 `hashMapOf` 함수를 통해 생성했으며, 중위 연산자 `to` 를 사용해 각 속성을 매핑한다.

 

컨슈머 작성

스프링 부트 어플리케이션에서 컨슈머(consumer)로 동작할 메서드를 `@KafkaListener` 어노테이션을 추가하여 작성한다.

@Component
class SampleTopicListener {

    val log = LoggerFactory.getLogger(SampleTopicListener::class.java)

    @KafkaListener(topics = ["sample-topic"])
    fun consume(@Payload data: String) {

        log.info("Message: $data")
    }
}

 

연동 테스트

'kafka-console-producer' 를 이용하여 커맨드라인을 통해 'sample-topic' 으로 메시지를 보내보자

$ kafka-console-producer --bootstrap-server localhost:9092 --topic sample-topic
> Hello Kafka!

 

아래와 같이 consume 한 메시지가 출력되면 성공이다.