diff --git a/core/src/test/java/com/alibaba/datax/core/EngineTest.java b/core/src/test/java/com/alibaba/datax/core/EngineTest.java index 3f36263b..47ce268e 100644 --- a/core/src/test/java/com/alibaba/datax/core/EngineTest.java +++ b/core/src/test/java/com/alibaba/datax/core/EngineTest.java @@ -1,9 +1,12 @@ package com.alibaba.datax.core; +import org.junit.Test; + public class EngineTest { - public static void main(String[] args) { + @Test + public void test() { System.out.println(System.getProperty("java.library.path")); // String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2stream.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2tdengine.json"}; diff --git a/job/opentsdb2tdengine.json b/job/opentsdb2tdengine.json index cdf2277b..92c0f8e3 100644 --- a/job/opentsdb2tdengine.json +++ b/job/opentsdb2tdengine.json @@ -7,7 +7,7 @@ "parameter": { "endpoint": "http://192.168.1.180:4242", "column": [ - "weather.temperature" + "weather_temperature" ], "beginDateTime": "2021-01-01 00:00:00", "endDateTime": "2021-01-01 01:00:00" diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java index 5512aaf9..a8739c54 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java @@ -6,10 +6,10 @@ public class JniConnection { private static final long JNI_NULL_POINTER = 0L; private static final int JNI_SUCCESSFUL = 0; - private static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir"; - private static final String PROPERTY_KEY_LOCALE = "locale"; - private static final String PROPERTY_KEY_CHARSET = "charset"; - private static final String PROPERTY_KEY_TIME_ZONE = "timezone"; + public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir"; + public static final String PROPERTY_KEY_LOCALE = "locale"; + public static final String PROPERTY_KEY_CHARSET = "charset"; + public static final String PROPERTY_KEY_TIME_ZONE = "timezone"; private long conn; @@ -17,7 +17,7 @@ public class JniConnection { System.loadLibrary("taos"); } - public JniConnection(Properties props) { + public JniConnection(Properties props) throws Exception { if (this.conn != JNI_NULL_POINTER) { close(); this.conn = JNI_NULL_POINTER; @@ -27,19 +27,19 @@ public class JniConnection { String locale = props.getProperty(PROPERTY_KEY_LOCALE); if (setOptions(0, locale) < 0) { - throw new RuntimeException("Failed to set locale: " + locale + ". System default will be used."); + throw new Exception("Failed to set locale: " + locale + ". System default will be used."); } String charset = props.getProperty(PROPERTY_KEY_CHARSET); if (setOptions(1, charset) < 0) { - throw new RuntimeException("Failed to set charset: " + charset + ". System default will be used."); + throw new Exception("Failed to set charset: " + charset + ". System default will be used."); } String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE); if (setOptions(2, timezone) < 0) { - throw new RuntimeException("Failed to set timezone: " + timezone + ". System default will be used."); + throw new Exception("Failed to set timezone: " + timezone + ". System default will be used."); } } - public void open(String host, int port, String dbname, String user, String password) { + public void open(String host, int port, String dbname, String user, String password) throws Exception { if (this.conn != JNI_NULL_POINTER) { close(); this.conn = JNI_NULL_POINTER; @@ -48,25 +48,25 @@ public class JniConnection { this.conn = connectImp(host, port, dbname, user, password); if (this.conn == JNI_NULL_POINTER) { String errMsg = getErrMsgImp(0); - throw new RuntimeException(errMsg); + throw new Exception(errMsg); } } - public void insertOpentsdbJson(String json) { + public void insertOpentsdbJson(String json) throws Exception { if (this.conn == JNI_NULL_POINTER) { - throw new RuntimeException("JNI connection is NULL"); + throw new Exception("JNI connection is NULL"); } long code = insertOpentsdbJson(json, this.conn); if (code != JNI_SUCCESSFUL) { String errMsg = getErrMsgByCode(code); - throw new RuntimeException(errMsg); + throw new Exception(errMsg); } } - public void close() { + public void close() throws Exception { int code = this.closeConnectionImp(this.conn); if (code != 0) { - throw new RuntimeException("JNI closeConnection failed"); + throw new Exception("JNI closeConnection failed"); } this.conn = JNI_NULL_POINTER; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java index ee080f54..28fa4ca5 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java @@ -7,12 +7,12 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.BufferedWriter; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Properties; public class TDengineWriter extends Writer { @@ -22,6 +22,7 @@ public class TDengineWriter extends Writer { private static final String DBNAME = "dbname"; private static final String USER = "user"; private static final String PASSWORD = "password"; + private static final String PEER_PLUGIN_NAME = "peerPluginName"; public static class Job extends Writer.Job { @@ -30,6 +31,7 @@ public class TDengineWriter extends Writer { @Override public void init() { this.originalConfig = super.getPluginJobConf(); + this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName()); } @Override @@ -49,14 +51,14 @@ public class TDengineWriter extends Writer { } public static class Task extends Writer.Task { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); private Configuration writerSliceConfig; - private String peerPluginName; @Override public void init() { this.writerSliceConfig = getPluginJobConf(); - this.peerPluginName = getPeerPluginName(); } @Override @@ -67,22 +69,57 @@ public class TDengineWriter extends Writer { @Override public void startWrite(RecordReceiver lineReceiver) { + String host = this.writerSliceConfig.getString(HOST); int port = this.writerSliceConfig.getInt(PORT); String dbname = this.writerSliceConfig.getString(DBNAME); String user = this.writerSliceConfig.getString(USER); String password = this.writerSliceConfig.getString(PASSWORD); - try { - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)); + Properties properties = new Properties(); + String cfgdir = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CONFIG_DIR); + if (cfgdir != null && !cfgdir.isEmpty()) { + properties.setProperty(JniConnection.PROPERTY_KEY_CONFIG_DIR, cfgdir); + } + String timezone = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_TIME_ZONE); + if (timezone != null && !timezone.isEmpty()) { + properties.setProperty(JniConnection.PROPERTY_KEY_TIME_ZONE, timezone); + } + String locale = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_LOCALE); + if (locale != null && !locale.isEmpty()) { + properties.setProperty(JniConnection.PROPERTY_KEY_LOCALE, locale); + } + String charset = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CHARSET); + if (charset != null && !charset.isEmpty()) { + properties.setProperty(JniConnection.PROPERTY_KEY_CHARSET, charset); + } + String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); + if (peerPluginName.equals("opentsdbreader")) { + try { + JniConnection conn = new JniConnection(properties); + conn.open(host, port, dbname, user, password); + LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user); + writeOpentsdb(lineReceiver, conn); + conn.close(); + LOG.info("TDengine connection closed"); + } catch (Exception e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + } + } + } + + private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn) { + try { Record record; while ((record = lineReceiver.getFromReader()) != null) { - writer.write(recordToString(record)); + String jsonData = recordToString(record); + LOG.debug(">>> " + jsonData); + conn.insertOpentsdbJson(jsonData); } - writer.flush(); - } catch (Exception e) { + LOG.error("TDengineWriter ERROR: " + e.getMessage()); throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e); } } @@ -92,7 +129,6 @@ public class TDengineWriter extends Writer { if (0 == recordLength) { return NEWLINE_FLAG; } - Column column; StringBuilder sb = new StringBuilder(); for (int i = 0; i < recordLength; i++) { @@ -101,7 +137,6 @@ public class TDengineWriter extends Writer { } sb.setLength(sb.length() - 1); sb.append(NEWLINE_FLAG); - return sb.toString(); } } diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java index 603931ef..040cf34c 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java @@ -7,12 +7,12 @@ import java.util.Properties; public class JniConnectionTest { @Test - public void test() { + public void test() throws Exception { JniConnection connection = new JniConnection(new Properties()); connection.open("192.168.56.105", 6030, "test", "root", "taosdata"); - String json = "{\"metric\":\"weather.temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":123}}"; + String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}"; connection.insertOpentsdbJson(json); connection.close();