1. Create a MySQL table:

CREATE TABLE `test` (
`a` tinyint(4) NOT NULL DEFAULT '0',
`b` decimal(12,0) DEFAULT NULL,
`c` decimal(5,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

 


2. Start Kafka Connect with the Debezium MySQL connector plugin, registering the following connector configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "223344",
    "database.server.name": "localhost",
    "database.whitelist": "inventory",
    "table.whitelist":"inventory.test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes":"false",
    "transforms": "extractField",
    "transforms.extractField.type": "com.centchain.kafka.connect.mysql.DebeziumMysql$Value",
    "transforms.extractField.field": "after"
  }
}
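The transform class com.centchain.kafka.connect.mysql.DebeziumMysql$Value is a custom single message transform whose source is not shown here. Judging from the config and the output in section 5, it extracts the "after" struct from the Debezium change-event envelope (the real implementation evidently also appends fields such as operation_type, pt_log_d and last_update_timestamp). A minimal sketch of just the extraction part might look like the following; the class and package names come from the config above, everything else is an assumption:

package com.centchain.kafka.connect.mysql;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class DebeziumMysql {

  /** Value-side transform: keep only the configured envelope field (default "after"). */
  public static class Value<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
      Object field = configs.get("field");
      fieldName = field == null ? "after" : field.toString();
    }

    @Override
    public R apply(R record) {
      if (!(record.value() instanceof Struct)) {
        return record; // tombstones and non-struct values pass through unchanged
      }
      Struct envelope = (Struct) record.value();
      Schema afterSchema = envelope.schema().field(fieldName).schema();
      Object after = envelope.get(fieldName);
      return record.newRecord(record.topic(), record.kafkaPartition(),
          record.keySchema(), record.key(), afterSchema, after, record.timestamp());
    }

    @Override
    public ConfigDef config() {
      return new ConfigDef().define("field", ConfigDef.Type.STRING, "after",
          ConfigDef.Importance.HIGH, "Debezium envelope field to extract");
    }

    @Override
    public void close() {
    }
  }
}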

 

3. The connector task fails with the following error:

 

[2019-05-09 15:23:10,310] INFO WorkerSourceTask{id=cashier-201905091402-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2019-05-09 15:23:10,311] ERROR WorkerSourceTask{id=cashier-201905091402-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
    at org.apache.avro.util.internal.JacksonUtils.toJson(JacksonUtils.java:87)
    at org.apache.avro.util.internal.JacksonUtils.toJsonNode(JacksonUtils.java:48)
    at org.apache.avro.Schema$Field.<init>(Schema.java:423)
    at org.apache.avro.Schema$Field.<init>(Schema.java:415)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:964)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:847)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData...
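The stack trace points at the root cause: Debezium maps the TINYINT column to Connect's INT16 type, so its DEFAULT '0' reaches the Avro converter as a java.lang.Short, and the DECIMAL columns (with the default decimal.handling.mode=precise) are represented as java.math.BigDecimal. When AvroData.fromConnectSchema builds the Avro record schema it passes these default values to org.apache.avro.Schema.Field, which converts them via JacksonUtils.toJsonNode, and the stock avro-1.8.1 JacksonUtils handles neither Short nor BigDecimal. A minimal reproduction against an unpatched avro-1.8.1 jar (hypothetical class name):

import org.apache.avro.util.internal.JacksonUtils;

public class ReproUnknownDatum {
  public static void main(String[] args) {
    // throws org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
    JacksonUtils.toJsonNode((short) 0);
  }
}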

 

4. Fix

File to patch: avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/util/internal/JacksonUtils.java

  1 /**
  2  * Licensed to the Apache Software Foundation (ASF) under one
  3  * or more contributor license agreements.  See the NOTICE file
  4  * distributed with this work for additional information
  5  * regarding copyright ownership.  The ASF licenses this file
  6  * to you under the Apache License, Version 2.0 (the
  7  * "License"); you may not use this file except in compliance
  8  * with the License.  You may obtain a copy of the License at
  9  *
 10  *     http://www.apache.org/licenses/LICENSE-2.0
 11  *
 12  * Unless required by applicable law or agreed to in writing, software
 13  * distributed under the License is distributed on an "AS IS" BASIS,
 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  * See the License for the specific language governing permissions and
 16  * limitations under the License.
 17  */
 18 package org.apache.avro.util.internal;
 19 
 20 import java.io.IOException;
 21 import java.io.UnsupportedEncodingException;
 22 import java.math.BigDecimal;
 23 import java.util.ArrayList;
 24 import java.util.Collection;
 25 import java.util.Iterator;
 26 import java.util.LinkedHashMap;
 27 import java.util.List;
 28 import java.util.Map;
 29 import org.apache.avro.AvroRuntimeException;
 30 import org.apache.avro.JsonProperties;
 31 import org.apache.avro.Schema;
 32 import org.codehaus.jackson.JsonGenerator;
 33 import org.codehaus.jackson.JsonNode;
 34 import org.codehaus.jackson.map.ObjectMapper;
 35 import org.codehaus.jackson.util.TokenBuffer;
 36 
 37 public class JacksonUtils {
 38   static final String BYTES_CHARSET = "ISO-8859-1";
 39 
 40   private JacksonUtils() {
 41   }
 42 
 43   public static JsonNode toJsonNode(Object datum) {
 44     if (datum == null) {
 45       return null;
 46     }
 47     try {
 48       TokenBuffer generator = new TokenBuffer(new ObjectMapper());
 49       toJson(datum, generator);
 50       return new ObjectMapper().readTree(generator.asParser());
 51     } catch (IOException e) {
 52       throw new AvroRuntimeException(e);
 53     }
 54   }
 55 
 56   @SuppressWarnings(value="unchecked")
 57   static void toJson(Object datum, JsonGenerator generator) throws IOException {
 58     if (datum == JsonProperties.NULL_VALUE) { // null
 59       generator.writeNull();
 60     } else if (datum instanceof Map) { // record, map
 61       generator.writeStartObject();
 62       for (Map.Entry<Object,Object> entry : ((Map<Object,Object>) datum).entrySet()) {
 63         generator.writeFieldName(entry.getKey().toString());
 64         toJson(entry.getValue(), generator);
 65       }
 66       generator.writeEndObject();
 67     } else if (datum instanceof Collection) { // array
 68       generator.writeStartArray();
 69       for (Object element : (Collection<?>) datum) {
 70         toJson(element, generator);
 71       }
 72       generator.writeEndArray();
 73     } else if (datum instanceof byte[]) { // bytes, fixed
 74       generator.writeString(new String((byte[]) datum, BYTES_CHARSET));
 75     } else if (datum instanceof CharSequence || datum instanceof Enum<?>) { // string, enum
 76       generator.writeString(datum.toString());
 77     } else if (datum instanceof Double) { // double
 78       generator.writeNumber((Double) datum);
 79     } else if (datum instanceof Float) { // float
 80       generator.writeNumber((Float) datum);
 81     } else if (datum instanceof Long) { // long
 82       generator.writeNumber((Long) datum);
 83     } else if (datum instanceof Integer) { // int
 84       generator.writeNumber((Integer) datum);
 85     } else if (datum instanceof Short) { // short
 86       generator.writeNumber(((Short) datum).intValue());
 87     } else if (datum instanceof Boolean) { // boolean
 88       generator.writeBoolean((Boolean) datum);
 89     }
 90     else if (datum instanceof BigDecimal) { // big decimal
 91       generator.writeNumber((BigDecimal) datum);
 92     } else {
 93       throw new AvroRuntimeException("Unknown datum class: " + datum.getClass());
 94     }
 95   }
 96 
 97   public static Object toObject(JsonNode jsonNode) {
 98     return toObject(jsonNode, null);
 99   }
100 
101   public static Object toObject(JsonNode jsonNode, Schema schema) {
102     if (schema != null && schema.getType().equals(Schema.Type.UNION)) {
103       return toObject(jsonNode, schema.getTypes().get(0));
104     }
105     if (jsonNode == null) {
106       return null;
107     } else if (jsonNode.isNull()) {
108       return JsonProperties.NULL_VALUE;
109     } else if (jsonNode.isBoolean()) {
110       return jsonNode.asBoolean();
111     } else if (jsonNode.isInt()) {
112       if (schema == null || schema.getType().equals(Schema.Type.INT)) {
113         return jsonNode.asInt();
114       } else if (schema.getType().equals(Schema.Type.LONG)) {
115         return jsonNode.asLong();
116       }
117     } else if (jsonNode.isBigDecimal()) {
118       return jsonNode.asDouble();
119     } else if (jsonNode.isLong()) {
120       return jsonNode.asLong();
121     } else if (jsonNode.isDouble()) {
122       if (schema == null || schema.getType().equals(Schema.Type.DOUBLE)) {
123         return jsonNode.asDouble();
124       } else if (schema.getType().equals(Schema.Type.FLOAT)) {
125         return (float) jsonNode.asDouble();
126       }
127     } else if (jsonNode.isTextual()) {
128       if (schema == null || schema.getType().equals(Schema.Type.STRING) ||
129           schema.getType().equals(Schema.Type.ENUM)) {
130         return jsonNode.asText();
131       } else if (schema.getType().equals(Schema.Type.BYTES)) {
132         try {
133           return jsonNode.getTextValue().getBytes(BYTES_CHARSET);
134         } catch (UnsupportedEncodingException e) {
135           throw new AvroRuntimeException(e);
136         }
137       }
138     } else if (jsonNode.isArray()) {
139       List l = new ArrayList();
140       for (JsonNode node : jsonNode) {
141         l.add(toObject(node, schema == null ? null : schema.getElementType()));
142       }
143       return l;
144     } else if (jsonNode.isObject()) {
145       Map m = new LinkedHashMap();
146       for (Iterator<String> it = jsonNode.getFieldNames(); it.hasNext(); ) {
147         String key = it.next();
148         Schema s = null;
149         if (schema == null) {
150           s = null;
151         } else if (schema.getType().equals(Schema.Type.MAP)) {
152           s = schema.getValueType();
153         } else if (schema.getType().equals(Schema.Type.RECORD)) {
154           s = schema.getField(key).schema();
155         }
156         Object value = toObject(jsonNode.get(key), s);
157         m.put(key, value);
158       }
159       return m;
160     }
161     return null;
162   }
163 }

The key changes are:

lines 85-86, which fix the error for java.lang.Short;

lines 90-91 and 117-118, which fix the error for java.math.BigDecimal.
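After rebuilding the patched avro jar (the Java modules under lang/java build with Maven) and replacing the stock avro-1.8.1 jar on the Kafka Connect worker's classpath, the converter no longer fails on these defaults. A quick sanity check against the patched jar (hypothetical class name):

import java.math.BigDecimal;
import org.apache.avro.util.internal.JacksonUtils;

public class JacksonUtilsPatchCheck {
  public static void main(String[] args) {
    // both calls threw "Unknown datum class" before the patch; now they return numeric JsonNodes
    System.out.println(JacksonUtils.toJsonNode((short) 1));           // prints 1
    System.out.println(JacksonUtils.toJsonNode(new BigDecimal("4"))); // a numeric node as well
  }
}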

 

5. Results

5.1 MySQL -> Kafka

lenmom@M1701:~/workspace/software/confluent-community-5.1.0-2.11$ bin/kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning   --topic localhost.a.test
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0001"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0002"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0003"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0004"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}

5.2 Kafka -> Hive

Connector configuration for the HDFS sink (with Hive integration):

{
  "name": "hive-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "localhost.a.test",
    "hdfs.url": "hdfs://127.0.0.1:9000/",
    "logs.dir": "/logs",
    "topics.dir": "/inventory/",
    "hadoop.conf.dir": "/home/lenmom/workspace/software/hadoop-2.7.3/etc/hadoop/",
    "flush.size": "1",
    "rotate.interval.ms": "5000",
    "hive.integration": true,
    "hive.database": "inventory",
    "partitioner.class":"io.confluent.connect.hdfs.partitioner.FieldPartitioner",
    "partition.field.name":"pt_log_d",
    "hive.metastore.uris": "thrift://127.0.0.1:9083",
    "schema.compatibility": "BACKWARD"
  }
}

Result in Hive:

hive> select * from localhost_a_test;
OK
1       1       1       c       20190513        2019-05-13 00:01:17.029
1       1       2       c       20190513        2019-05-13 00:01:17.029
1       1       3       c       20190513        2019-05-13 00:01:17.029
1       1       4       c       20190513        2019-05-13 00:01:17.029
Time taken: 0.168 seconds, Fetched: 4 row(s)

 
