diff --git a/core/src/main/job/stream2tdengine.json b/core/src/main/job/stream2tdengine.json new file mode 100644 index 00000000..6af68323 --- /dev/null +++ b/core/src/main/job/stream2tdengine.json @@ -0,0 +1,56 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "random": "2021-01-01 00:00:00, 2021-01-01 23:59:59", + "type": "date" + }, + { + "random": "0, 10000", + "type": "long" + }, + { + "random": "0, 10", + "type": "string" + }, + { + "random": "0, 5", + "type": "bool" + }, + { + "random": "0, 10", + "type": "double" + }, + { + "random": "0, 10", + "type": "bytes" + } + ], + "sliceRecordCount": 100 + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "192.168.56.105", + "port": 6030, + "dbname": "test", + "user": "root", + "password": "taosdata", + "batchSize": 1000 + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/core/src/test/java/com/alibaba/datax/core/EngineTest.java b/core/src/test/java/com/alibaba/datax/core/EngineTest.java index bbc5bdc5..92c50a77 100644 --- a/core/src/test/java/com/alibaba/datax/core/EngineTest.java +++ b/core/src/test/java/com/alibaba/datax/core/EngineTest.java @@ -8,7 +8,8 @@ public class EngineTest { @Test public void test() { System.out.println(System.getProperty("java.library.path")); - String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"}; +// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"}; + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/stream2tdengine.json"}; System.setProperty("datax.home", "../target/datax/datax"); try { Engine.entry(params); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java new file mode 100644 index 00000000..94d1db30 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.plugin.writer; + +import com.alibaba.datax.common.plugin.RecordReceiver; + +import java.util.Properties; + +public interface DataHandler { + + long handle(RecordReceiver lineReceiver, Properties properties); +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java new file mode 100644 index 00000000..a488e7d5 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.plugin.writer; + +public class DataHandlerFactory { + + public static DataHandler build(String peerPluginName) { + if (peerPluginName.equals("opentsdbreader")) + return new OpentsdbDataHandler(); + return new DefaultDataHandler(); + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java new file mode 100644 index 00000000..a1d52d75 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java @@ -0,0 +1,34 @@ +package com.alibaba.datax.plugin.writer; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class DefaultDataHandler implements DataHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + + @Override + public long handle(RecordReceiver lineReceiver, Properties properties) { + long count = 0; + Record record; + while ((record = lineReceiver.getFromReader()) != null) { + + int recordLength = record.getColumnNumber(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < recordLength; i++) { + Column column = record.getColumn(i); + sb.append(column.asString()).append("\t"); + } + sb.setLength(sb.length() - 1); + LOG.debug(sb.toString()); + + count++; + } + return count; + } + +} \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java new file mode 100644 index 00000000..599e5f3e --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java @@ -0,0 +1,98 @@ +package com.alibaba.datax.plugin.writer; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class OpentsdbDataHandler implements DataHandler { + private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class); + private static final String DEFAULT_BATCH_SIZE = "1"; + + @Override + public long handle(RecordReceiver lineReceiver, Properties properties) { + // opentsdb json protocol use JNI and schemaless API to write + String host = properties.getProperty(Key.HOST); + int port = Integer.parseInt(properties.getProperty(Key.PORT)); + String dbname = properties.getProperty(Key.DBNAME); + String user = properties.getProperty(Key.USER); + String password = properties.getProperty(Key.PASSWORD); + + JniConnection conn = null; + long count = 0; + try { + conn = new JniConnection(properties); + conn.open(host, port, dbname, user, password); + LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user); + int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE)); + count = writeOpentsdb(lineReceiver, conn, batchSize); + } catch (Exception e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + } finally { + try { + if (conn != null) + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TDengine connection closed"); + } + + return count; + } + + private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) { + long recordIndex = 1; + try { + Record record; + StringBuilder sb = new StringBuilder(); + while ((record = lineReceiver.getFromReader()) != null) { + if (batchSize == 1) { + String jsonData = recordToString(record); + LOG.debug(">>> " + jsonData); + conn.insertOpentsdbJson(jsonData); + } else if (recordIndex % batchSize == 1) { + sb.append("[").append(recordToString(record)).append(","); + } else if (recordIndex % batchSize == 0) { + sb.append(recordToString(record)).append("]"); + String jsonData = sb.toString(); + LOG.debug(">>> " + jsonData); + conn.insertOpentsdbJson(jsonData); + sb.delete(0, sb.length()); + } else { + sb.append(recordToString(record)).append(","); + } + recordIndex++; + } + if (sb.length() != 0 && sb.charAt(0) == '[') { + String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString(); + LOG.debug(">>> " + jsonData); + conn.insertOpentsdbJson(jsonData); + } + } catch (Exception e) { + LOG.error("TDengineWriter ERROR: " + e.getMessage()); + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e); + } + return recordIndex - 1; + } + + private String recordToString(Record record) { + int recordLength = record.getColumnNumber(); + if (0 == recordLength) { + return ""; + } + Column column; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < recordLength; i++) { + column = record.getColumn(i); + sb.append(column.asString()).append("\t"); + } + sb.setLength(sb.length() - 1); + return sb.toString(); + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java index 9dc42d9d..84600802 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java @@ -1,21 +1,20 @@ package com.alibaba.datax.plugin.writer; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; public class TDengineWriter extends Writer { private static final String PEER_PLUGIN_NAME = "peerPluginName"; - private static final String DEFAULT_BATCH_SIZE = "1"; public static class Job extends Writer.Job { @@ -67,85 +66,11 @@ public class TDengineWriter extends Writer { } String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); - if (peerPluginName.equals("opentsdbreader")) { - // opentsdb json protocol use JNI and schemaless API to write - String host = properties.getProperty(Key.HOST); - int port = Integer.parseInt(properties.getProperty(Key.PORT)); - String dbname = properties.getProperty(Key.DBNAME); - String user = properties.getProperty(Key.USER); - String password = properties.getProperty(Key.PASSWORD); - - JniConnection conn = null; - try { - conn = new JniConnection(properties); - conn.open(host, port, dbname, user, password); - LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user); - int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE)); - writeOpentsdb(lineReceiver, conn, batchSize); - } catch (Exception e) { - LOG.error(e.getMessage()); - e.printStackTrace(); - } finally { - try { - if (conn != null) - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - } - LOG.info("TDengine connection closed"); - } - } else { - // other - } + LOG.debug("start to handle record from: " + peerPluginName); + DataHandler handler = DataHandlerFactory.build(peerPluginName); + long records = handler.handle(lineReceiver, properties); + LOG.debug("handle data finished, records: " + records); } - private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) { - try { - Record record; - StringBuilder sb = new StringBuilder(); - long recordIndex = 1; - while ((record = lineReceiver.getFromReader()) != null) { - if (batchSize == 1) { - String jsonData = recordToString(record); - LOG.debug(">>> " + jsonData); - conn.insertOpentsdbJson(jsonData); - } else if (recordIndex % batchSize == 1) { - sb.append("[").append(recordToString(record)).append(","); - } else if (recordIndex % batchSize == 0) { - sb.append(recordToString(record)).append("]"); - String jsonData = sb.toString(); - LOG.debug(">>> " + jsonData); - conn.insertOpentsdbJson(jsonData); - sb.delete(0, sb.length()); - } else { - sb.append(recordToString(record)).append(","); - } - recordIndex++; - } - if (sb.length() != 0 && sb.charAt(0) == '[') { - String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString(); - LOG.debug(">>> " + jsonData); - conn.insertOpentsdbJson(jsonData); - } - } catch (Exception e) { - LOG.error("TDengineWriter ERROR: " + e.getMessage()); - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e); - } - } - - private String recordToString(Record record) { - int recordLength = record.getColumnNumber(); - if (0 == recordLength) { - return ""; - } - Column column; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < recordLength; i++) { - column = record.getColumn(i); - sb.append(column.asString()).append("\t"); - } - sb.setLength(sb.length() - 1); - return sb.toString(); - } } }