Fixing avro-1.8.1 serialization errors for BigDecimal and Short
1. Create a MySQL table:
CREATE TABLE `test` (
  `a` tinyint(4) NOT NULL DEFAULT '0',
  `b` decimal(12,0) DEFAULT NULL,
  `c` decimal(5,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
2. Start Kafka Connect with the Debezium MySQL connector, using the following connector config:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "223344",
    "database.server.name": "localhost",
    "database.whitelist": "inventory",
    "table.whitelist": "inventory.test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "false",
    "transforms": "extractField",
    "transforms.extractField.type": "com.centchain.kafka.connect.mysql.DebeziumMysql$Value",
    "transforms.extractField.field": "after"
  }
}
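The transform class com.centchain.kafka.connect.mysql.DebeziumMysql$Value is a custom single message transform whose source is not shown in this post. Judging by its name and the "field": "after" setting, it behaves like Kafka's built-in org.apache.kafka.connect.transforms.ExtractField$Value and pulls the "after" struct out of the Debezium change envelope (the real class evidently also adds fields such as operation_type, pt_log_d and last_update_timestamp, which show up in the output in step 5). A minimal sketch of such a transform, with hypothetical package and class names:

// Hypothetical sketch only: the actual com.centchain.kafka.connect.mysql.DebeziumMysql$Value
// source is not part of this post. This version simply extracts the configured envelope field.
package com.example.connect.transforms;   // assumed package name

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class ExtractAfterValue<R extends ConnectRecord<R>> implements Transformation<R> {

  private String fieldName;   // set from "transforms.extractField.field", e.g. "after"

  @Override
  public void configure(Map<String, ?> configs) {
    Object field = configs.get("field");
    fieldName = field == null ? "after" : field.toString();
  }

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Struct)) {
      return record;   // e.g. tombstones pass through untouched
    }
    Struct envelope = (Struct) record.value();
    Struct after = envelope.getStruct(fieldName);
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(),
        envelope.schema().field(fieldName).schema(), after,
        record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef().define("field", ConfigDef.Type.STRING, "after",
        ConfigDef.Importance.HIGH, "Envelope field to extract");
  }

  @Override
  public void close() {
  }
}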
3. The source task fails with the following error:
[2019-05-09 15:23:10,310] INFO WorkerSourceTask{id=cashier-201905091402-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2019-05-09 15:23:10,311] ERROR WorkerSourceTask{id=cashier-201905091402-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
    at org.apache.avro.util.internal.JacksonUtils.toJson(JacksonUtils.java:87)
    at org.apache.avro.util.internal.JacksonUtils.toJsonNode(JacksonUtils.java:48)
    at org.apache.avro.Schema$Field.<init>(Schema.java:423)
    at org.apache.avro.Schema$Field.<init>(Schema.java:415)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:964)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:847)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroDa...
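What is happening: Debezium maps the tinyint(4) column to Connect's INT16 type, so its default value reaches AvroData.fromConnectSchema as a java.lang.Short, and the decimal columns likewise produce java.math.BigDecimal values that can reach the same code path. JacksonUtils.toJson in stock avro-1.8.1 has no branch for either class, so building the Avro field throws. A standalone sketch (assumed, outside Kafka Connect) that triggers the same exception against unpatched avro-1.8.1:

// Standalone repro sketch (assumed): each of the two failing calls below raises the same
// AvroRuntimeException as the stack trace above when run against unpatched avro-1.8.1.
import java.math.BigDecimal;
import org.apache.avro.Schema;

public class ReproUnknownDatum {
  public static void main(String[] args) {
    // An Integer default is fine: JacksonUtils.toJson has an Integer branch.
    new Schema.Field("ok", Schema.create(Schema.Type.INT), null, 0);

    // A Short default (what a Debezium INT16/tinyint field carries) fails with
    // "Unknown datum class: class java.lang.Short".
    new Schema.Field("boom1", Schema.create(Schema.Type.INT), null, (short) 0);

    // A BigDecimal default fails with "Unknown datum class: class java.math.BigDecimal".
    new Schema.Field("boom2", Schema.create(Schema.Type.BYTES), null, new BigDecimal("1"));
  }
}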
4. Fix
File location: avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/util/internal/JacksonUtils.java

  1 /**
  2  * Licensed to the Apache Software Foundation (ASF) under one
  3  * or more contributor license agreements.  See the NOTICE file
  4  * distributed with this work for additional information
  5  * regarding copyright ownership.  The ASF licenses this file
  6  * to you under the Apache License, Version 2.0 (the
  7  * "License"); you may not use this file except in compliance
  8  * with the License.  You may obtain a copy of the License at
  9  *
 10  *     http://www.apache.org/licenses/LICENSE-2.0
 11  *
 12  * Unless required by applicable law or agreed to in writing, software
 13  * distributed under the License is distributed on an "AS IS" BASIS,
 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  * See the License for the specific language governing permissions and
 16  * limitations under the License.
 17  */
 18 package org.apache.avro.util.internal;
 19 
 20 import java.io.IOException;
 21 import java.io.UnsupportedEncodingException;
 22 import java.math.BigDecimal;
 23 import java.util.ArrayList;
 24 import java.util.Collection;
 25 import java.util.Iterator;
 26 import java.util.LinkedHashMap;
 27 import java.util.List;
 28 import java.util.Map;
 29 import org.apache.avro.AvroRuntimeException;
 30 import org.apache.avro.JsonProperties;
 31 import org.apache.avro.Schema;
 32 import org.codehaus.jackson.JsonGenerator;
 33 import org.codehaus.jackson.JsonNode;
 34 import org.codehaus.jackson.map.ObjectMapper;
 35 import org.codehaus.jackson.util.TokenBuffer;
 36 
 37 public class JacksonUtils {
 38   static final String BYTES_CHARSET = "ISO-8859-1";
 39 
 40   private JacksonUtils() {
 41   }
 42 
 43   public static JsonNode toJsonNode(Object datum) {
 44     if (datum == null) {
 45       return null;
 46     }
 47     try {
 48       TokenBuffer generator = new TokenBuffer(new ObjectMapper());
 49       toJson(datum, generator);
 50       return new ObjectMapper().readTree(generator.asParser());
 51     } catch (IOException e) {
 52       throw new AvroRuntimeException(e);
 53     }
 54   }
 55 
 56   @SuppressWarnings(value="unchecked")
 57   static void toJson(Object datum, JsonGenerator generator) throws IOException {
 58     if (datum == JsonProperties.NULL_VALUE) { // null
 59       generator.writeNull();
 60     } else if (datum instanceof Map) { // record, map
 61       generator.writeStartObject();
 62       for (Map.Entry<Object,Object> entry : ((Map<Object,Object>) datum).entrySet()) {
 63         generator.writeFieldName(entry.getKey().toString());
 64         toJson(entry.getValue(), generator);
 65       }
 66       generator.writeEndObject();
 67     } else if (datum instanceof Collection) { // array
 68       generator.writeStartArray();
 69       for (Object element : (Collection<?>) datum) {
 70         toJson(element, generator);
 71       }
 72       generator.writeEndArray();
 73     } else if (datum instanceof byte[]) { // bytes, fixed
 74       generator.writeString(new String((byte[]) datum, BYTES_CHARSET));
 75     } else if (datum instanceof CharSequence || datum instanceof Enum<?>) { // string, enum
 76       generator.writeString(datum.toString());
 77     } else if (datum instanceof Double) { // double
 78       generator.writeNumber((Double) datum);
 79     } else if (datum instanceof Float) { // float
 80       generator.writeNumber((Float) datum);
 81     } else if (datum instanceof Long) { // long
 82       generator.writeNumber((Long) datum);
 83     } else if (datum instanceof Integer) { // int
 84       generator.writeNumber((Integer) datum);
 85     } else if (datum instanceof Short) { // short
 86       generator.writeNumber(new Integer(datum.toString()));
 87     } else if (datum instanceof Boolean) { // boolean
 88       generator.writeBoolean((Boolean) datum);
 89     }
 90     else if (datum instanceof BigDecimal) {
 91       generator.writeNumber((BigDecimal) datum);
 92     } else {
 93       throw new AvroRuntimeException("Unknown datum class: " + datum.getClass());
 94     }
 95   }
 96 
 97   public static Object toObject(JsonNode jsonNode) {
 98     return toObject(jsonNode, null);
 99   }
100 
101   public static Object toObject(JsonNode jsonNode, Schema schema) {
102     if (schema != null && schema.getType().equals(Schema.Type.UNION)) {
103       return toObject(jsonNode, schema.getTypes().get(0));
104     }
105     if (jsonNode == null) {
106       return null;
107     } else if (jsonNode.isNull()) {
108       return JsonProperties.NULL_VALUE;
109     } else if (jsonNode.isBoolean()) {
110       return jsonNode.asBoolean();
111     } else if (jsonNode.isInt()) {
112       if (schema == null || schema.getType().equals(Schema.Type.INT)) {
113         return jsonNode.asInt();
114       } else if (schema.getType().equals(Schema.Type.LONG)) {
115         return jsonNode.asLong();
116       }
117     } else if (jsonNode.isBigDecimal()) {
118       return jsonNode.asDouble();
119     } else if (jsonNode.isLong()) {
120       return jsonNode.asLong();
121     } else if (jsonNode.isDouble()) {
122       if (schema == null || schema.getType().equals(Schema.Type.DOUBLE)) {
123         return jsonNode.asDouble();
124       } else if (schema.getType().equals(Schema.Type.FLOAT)) {
125         return (float) jsonNode.asDouble();
126       }
127     } else if (jsonNode.isTextual()) {
128       if (schema == null || schema.getType().equals(Schema.Type.STRING) ||
129           schema.getType().equals(Schema.Type.ENUM)) {
130         return jsonNode.asText();
131       } else if (schema.getType().equals(Schema.Type.BYTES)) {
132         try {
133           return jsonNode.getTextValue().getBytes(BYTES_CHARSET);
134         } catch (UnsupportedEncodingException e) {
135           throw new AvroRuntimeException(e);
136         }
137       }
138     } else if (jsonNode.isArray()) {
139       List l = new ArrayList();
140       for (JsonNode node : jsonNode) {
141         l.add(toObject(node, schema == null ? null : schema.getElementType()));
142       }
143       return l;
144     } else if (jsonNode.isObject()) {
145       Map m = new LinkedHashMap();
146       for (Iterator<String> it = jsonNode.getFieldNames(); it.hasNext(); ) {
147         String key = it.next();
148         Schema s = null;
149         if (schema == null) {
150           s = null;
151         } else if (schema.getType().equals(Schema.Type.MAP)) {
152           s = schema.getValueType();
153         } else if (schema.getType().equals(Schema.Type.RECORD)) {
154           s = schema.getField(key).schema();
155         }
156         Object value = toObject(jsonNode.get(key), s);
157         m.put(key, value);
158       }
159       return m;
160     }
161     return null;
162   }
163 }
The key changes are:
lines 85-86, which fix the error for Short;
lines 90-91 and 117-118, which fix the error for BigDecimal.
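A quick way to exercise the patched behaviour (a sketch only; JacksonUtils is an internal Avro class, so this is just a smoke test against the rebuilt avro-1.8.1 jar):

import java.math.BigDecimal;
import org.apache.avro.util.internal.JacksonUtils;

public class PatchSmokeTest {
  public static void main(String[] args) {
    // Lines 85-86: a Short is now written as an int node instead of throwing.
    System.out.println(JacksonUtils.toJsonNode((short) 1));

    // Lines 90-91: a BigDecimal is written as a numeric node; lines 117-118 map it
    // back to a double in toObject, so precision beyond double is not preserved there.
    System.out.println(JacksonUtils.toJsonNode(new BigDecimal("12.5")));
  }
}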
5. Results
5.1 MySQL -> Kafka
lenmom@M1701:~/workspace/software/confluent-community-5.1.0-2.11$ bin/kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic localhost.a.test
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0001"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0002"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0003"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
{"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0004"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
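The decimal columns b and c appear as {"bytes":"\u0001"} because Connect's Decimal logical type stores the unscaled value as big-endian two's-complement bytes in Avro, so a value of 1 prints as the single byte \u0001. A small sketch of decoding such a value back (the scale of 0 comes from the decimal(12,0)/decimal(5,0) column definitions):

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecodeConnectDecimal {
  public static void main(String[] args) {
    byte[] unscaled = new byte[] {0x01};   // the single \u0001 byte seen in the console output
    int scale = 0;                         // decimal(12,0) and decimal(5,0) both have scale 0
    BigDecimal value = new BigDecimal(new BigInteger(unscaled), scale);
    System.out.println(value);             // prints 1, matching the Hive result below
  }
}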
5.2 Kafka -> Hive
Connector config:
{ "name": "hive-sink", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "1", "topics": "localhost.a.test", "hdfs.url": "hdfs://127.0.0.1:9000/", "logs.dir": "/logs", "topics.dir": "/inventory/", "hadoop.conf.dir": "/home/lenmom/workspace/software/hadoop-2.7.3/etc/hadoop/", "flush.size": "1", "rotate.interval.ms": "5000", "hive.integration": true, "hive.database": "inventory", "partitioner.class":"io.confluent.connect.hdfs.partitioner.FieldPartitioner", "partition.field.name":"pt_log_d", "hive.metastore.uris": "thrift://127.0.0.1:9083", "schema.compatibility": "BACKWARD" } }
Result:
hive> select * from localhost_a_test;
OK
1    1    1    c    20190513    2019-05-13 00:01:17.029
1    1    2    c    20190513    2019-05-13 00:01:17.029
1    1    3    c    20190513    2019-05-13 00:01:17.029
1    1    4    c    20190513    2019-05-13 00:01:17.029
Time taken: 0.168 seconds, Fetched: 4 row(s)
