
apache kafka - What kind of data gets routed to a dead letter queue topic?

I have implemented dead letter queue (DLQ) error handling in Kafka Connect. It works, and failed records are sent to the DLQ topic, but I do not understand what kind of data gets routed there. This is the data in my DLQ topic:

And this is the normal data that was sunk:

The first picture shows the data that was routed to the DLQ topic, and the second shows the normal data that was sunk into the database. Does anyone have any idea how the key got changed, given that I used id as the key?

Here are my source and sink properties.

Source properties:

    "name": "jdbc_source_postgresql_analytics",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://192.168.5.40:5432/abc",
        "connection.user": "abc",
        "connection.password": "********",
        "topic.prefix": "test_",
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "id",
        "timestamp.column.name": "updatedAt",
        "validate.non.null": true,
        "table.whitelist": "test",
        "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter.schemas.enable": false,
        "catalog.pattern": "public",
        "transforms": "createKey,extractInt",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "id",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "id",
        "errors.tolerance": "all"

    }
}
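
To see what key the createKey/extractInt SMT chain actually produces, it can help to read the topic the source connector writes to before anything reaches the sink. The following is only a minimal sketch, with several assumptions that are not in the original post: the broker is reachable at localhost:9092, the source topic is test_test (topic.prefix plus the whitelisted table name; the sink below reads TEST, so substitute whichever topic your sink actually consumes), and the confluent-kafka Python client is installed. Since the source connector uses IntegerConverter for keys, each key should arrive as a 4-byte big-endian integer equal to the row's id.

import struct

from confluent_kafka import Consumer

# Minimal sketch: inspect keys and values on the source topic.
# Assumptions (not from the original post): broker at localhost:9092,
# topic "test_test", confluent-kafka Python client installed.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "key-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["test_test"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        # IntegerConverter writes the key as a 4-byte big-endian integer,
        # so this should print the row's id if the SMT chain behaved as expected.
        key = struct.unpack(">i", msg.key())[0] if msg.key() else None
        value = msg.value().decode("utf-8", "replace") if msg.value() else None
        print("key:", key, "value:", value)
finally:
    consumer.close()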

Sink properties:
{
    "name": "es_sink_analytics",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "key.converter.schemas.enable": "false",
        "topics": "TEST",
        "topic.index.map": "TEST:te_test",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://192.168.10.40:9200",
        "connection.username": "******",
        "connection.password": "********",
        "key.ignore": "false",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq-error-es",
        "errors.deadletterqueue.topic.replication.factor": "1",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
        "schema.ignore": "true",
        "error.tolerance":"all"
    }
}
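
To see why particular records end up in dlq-error-es, Kafka Connect can attach error-context headers to every DLQ record when errors.deadletterqueue.context.headers.enable is set to "true" on the sink connector; the config above does not set it, so those headers will be missing until it is added. Below is a minimal sketch of reading the DLQ with the confluent-kafka Python client and printing those headers; the broker address and consumer group id are assumptions, not values from the original post.

from confluent_kafka import Consumer

# Minimal sketch: inspect the DLQ topic and its error-context headers.
# Assumptions (not from the original post): broker at localhost:9092,
# confluent-kafka Python client installed, and
# "errors.deadletterqueue.context.headers.enable": "true" added to the sink config.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "dlq-inspector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["dlq-error-es"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        # A DLQ record carries the key and value bytes of the record that failed,
        # so comparing them with the source topic shows exactly what was rejected.
        print("key bytes:", msg.key())
        print("value    :", msg.value())
        # Error-context headers (e.g. __connect.errors.exception.message) say which
        # stage failed; they only appear when context headers are enabled.
        for name, value in (msg.headers() or []):
            if name.startswith("__connect.errors"):
                print(" ", name, "=", value.decode("utf-8", "replace"))
        print("---")
finally:
    consumer.close()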


1 Answer

Waiting for an expert to reply.

...