본문 바로가기
Spring | Java | Kotlin

[Debezium] Kafka Connect Custom SMT로 JSONB 처리하기

by eFFx 2025. 4. 21.

저번 포스팅에 이어서 JSONB를 사용하는 경우 이를 처리 하는 방법에 대해서 포스팅한다.

 

일단은 자신이 사용하는 데이터의 구조를 파악해서 ElasticSearch Mapping을 작성해야한다.

 

일단 나는 귀찮으니 JSON to ElasticSearch Mapping 도구를 사용했다.(Nested를 처리 못해서 내가 직접 다 수정해주긴 했다.)

 

만약 먼저 생성된 인덱스가 있으면 삭제해주자

curl -X DELETE 'http://localhost:9200/<Index Name>'

 

그리고 새로운 인덱스를 생성 해주자.

{
  "settings": {
    // 만약 사용할 Analyzer가 있으면 설정 해주자.(Nori 등)
  },
  "mappings": {
    "properties": {
      // 작성한 매핑
    }
}

 

다음과 같이 Index 설정을 작성하고

curl -i -X PUT http://localhost:9200/<Index Name> \
          -H "Accept:application/json" \
          -H "Content-Type:application/json" \
          --data ./index.json

 

 

curl 또는 Postman으로 인덱스를 생성 해준다.

 

이제 SMT를 작성 해볼것이다.

IntelliJ를 사용해서 개발을 할 것이기에, New -> Project로 새로운 프로젝트를 파주자 (Gradle DSL은 필자가 Kotlin을 선호해서 선택했으나 다른것을 선택해도 된다.)

 

 

그리고 build.gradle.kts를 다음처럼 수정해준다.

plugins {
    id("java")
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

group = "one.effx"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:connect-api:3.8.1")
    implementation("org.apache.kafka:connect-json:3.8.1")

    testImplementation(platform("org.junit:junit-bom:5.10.0"))
    testImplementation("org.junit.jupiter:junit-jupiter")
}

tasks.test {
    useJUnitPlatform()
}

tasks.shadowJar {
    archiveClassifier.set("")
    mergeServiceFiles()
    manifest {
        attributes(
            mapOf(
                "Implementation-Title" to "MyCustom SMT",
                "Implementation-Version" to version
            )
        )
    }
}

tasks.build {
    dependsOn(tasks.shadowJar)
}

 

만약 필요한 의존성이 있으면 추가로 작성해주자.

 

다음으로 ParseJsonField.java를 작성한다.

public class ParseJsonField<R extends ConnectRecord<R>> implements Transformation<R> {
    public static final String FIELDS_CONFIG = "fields";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.HIGH, "List of fields to parse from JSON");

    public List<String> fields;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs) {
        Object rawFields = configs.get(FIELDS_CONFIG);
        if (rawFields instanceof List) {
            fields = (List<String>) rawFields;
        } else if (rawFields instanceof String) {
            fields = Arrays.asList(((String) rawFields).split("\\s*,\\s*"));
        } else {
            System.err.println("Invalid type for 'fields' config: " + rawFields);
        }
    }

    @Override
    public R apply(R record) {
        Object value = record.value();

        if (value instanceof Map) {
            Map<String, Object> originalMap = (Map<String, Object>) value;
            Map<String, Object> updatedMap = new HashMap<>(originalMap);

            for (String field : fields) {
                Object raw = originalMap.get(field);
                if (raw instanceof String) {
                    try {
                        JsonNode node = objectMapper.readTree((String) raw);
                        Object parsed = objectMapper.convertValue(node, Map.class);
                        updatedMap.put(field, parsed);
                    } catch (Exception e) {
                        System.err.println("Failed to parse field '" + field + "': " + e.getMessage());
                    }
                }
            }

            return record.newRecord(
                    record.topic(), 
                    record.kafkaPartition(),
                    record.keySchema(), 
                    record.key(),
                    record.valueSchema(), 
                    updatedMap,
                    record.timestamp()
            );
        }

        return record;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

 

configure 메서드는 설정에서 어떤 필드를 파싱할지 입력받는 메서드이다.

그리고 apply 메서드는 Jackson ObjectMapper를 통해 String을 파싱하여 전달하는 메서드이다.

 

마지막으로 src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation에 파일을 생성해준다.(슬래쉬는 실제로 폴더를 생성해야하고 .은 파일명이다!)

 

그리고 파일에 생성한 클래스의 패키지 경로를 입력한다. (필자의 경우 one.effx.kafka.transforms.ParseJsonField)

 

이렇게 한뒤 다음 명령을 실행한다.

./gradlew clean shadowJar

 

이렇게하면 하나의 파일로 build/libs/test-kafka-connector-1.0-SNAPSHOT.jar 같은 형식으로 파일이 생겼을 것이다.

 

이를 Kafka Connect Plugin에 추가해준다. (저번 포스팅에서 docker-compose 볼륨을 마운트 했으므로 그냥 추가해준다.)

그리고 Debezium 컨테이너를 재시작한다.

 

컨테이너 로그에서 오류 없이 정상적으로 로딩이 되는지 확인해주자.

2025-04-21 12:40:29 2025-04-21 03:40:29,236 INFO   ||  Added plugin 'one.effx.kafka.transforms.ParseJsonField'   [org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader]

 

 

그리고 이전 포스팅에서 작성한 elasticsearch-sink-connector Config를 수정한다.

 

 

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "<앞서 설정한 토픽>",
    "key.ignore": "false",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "kafka",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "schema.ignore": "false",
    "transforms": "parsejson,key",
    "transforms.parsejson.type": "one.effx.kafka.transforms.ParseJsonField",
    "transforms.parsejson.fields": "<JSONB 필드 (콤마로 여러개 작성)>",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id"
  }
}

 

추가된 부분은 다음과 같다.

  • transforms : parsejson을 추가해준다.(이름은 자유)
  • transfroms.parsejson.type : 아까 생성한 패키지 경로를 추가해준다.
  • transforms.parsejson.fields : 원하는 필드 명을 추가한다. (예 "member, raw_data")

 

이렇게 하고 기존 Connector를 지우고 새로 생성해줘야한다.

curl -X DELETE http://localhost:8083/connectors/elasticsearch-sink-connector
curl -i -X POST http://localhost:8083/connectors \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  --data kafka-elasticsearch-sink-connector.json

 

이제 다시 Postgres에 Update 또는 Insert를 해보면?

 

ElaticSearch 응답

 

정상적으로 JSONB 필드가 파싱이 된 것을 확인할 수 있다!

 

만약에 오류가 발생하면 Debezium의 로그를 확인해보고 그래도 안되면, ElasticSearch의 로그를 TRACE로 바꾸고 디버깅해보자.