diff --git a/core/src/test/java/com/alibaba/datax/core/EngineTest.java b/core/src/test/java/com/alibaba/datax/core/EngineTest.java new file mode 100644 index 00000000..3f36263b --- /dev/null +++ b/core/src/test/java/com/alibaba/datax/core/EngineTest.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.core; + + +public class EngineTest { + + public static void main(String[] args) { + System.out.println(System.getProperty("java.library.path")); +// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2stream.json"}; + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2tdengine.json"}; + System.setProperty("datax.home", "/Users/yangzy/workspace/DataX/target/datax/datax"); + try { + Engine.entry(params); + } catch (Throwable e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/job/opentsdb2stream.json b/job/opentsdb2stream.json new file mode 100644 index 00000000..1ea43204 --- /dev/null +++ b/job/opentsdb2stream.json @@ -0,0 +1,31 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "opentsdbreader", + "parameter": { + "endpoint": "http://192.168.1.180:4242", + "column": [ + "weather.temperature" + ], + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-01 01:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/job/opentsdb2tdengine.json b/job/opentsdb2tdengine.json new file mode 100644 index 00000000..cdf2277b --- /dev/null +++ b/job/opentsdb2tdengine.json @@ -0,0 +1,34 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "opentsdbreader", + "parameter": { + "endpoint": "http://192.168.1.180:4242", + "column": [ + "weather.temperature" + ], + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-01 01:00:00" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "192.168.56.105", + "port": 6030, + "db": "test", + "user": "root", + "password": "taosdata" + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 882dd23b..a93e945c 100755 --- a/package.xml +++ b/package.xml @@ -238,6 +238,13 @@ datax + + tdenginewriter/target/datax/ + + **/*.* + + datax + otswriter/target/datax/ diff --git a/pom.xml b/pom.xml index 3bd75a31..cb635ad3 100644 --- a/pom.xml +++ b/pom.xml @@ -47,66 +47,68 @@ transformer - mysqlreader - drdsreader - sqlserverreader - postgresqlreader - kingbaseesreader - oraclereader + + + + + + odpsreader - otsreader - otsstreamreader + + txtfilereader - hdfsreader + streamreader - ossreader - ftpreader - mongodbreader + + + rdbmsreader - hbase11xreader - hbase094xreader - tsdbreader + + + opentsdbreader - cassandrareader - gdbreader - oceanbasev10reader + + + - mysqlwriter - drdswriter + + odpswriter txtfilewriter - ftpwriter - hdfswriter + + streamwriter - otswriter - oraclewriter - sqlserverwriter - postgresqlwriter - kingbaseeswriter - osswriter - mongodbwriter + + + + + + + adswriter - ocswriter + rdbmswriter - hbase11xwriter - hbase094xwriter - hbase11xsqlwriter - hbase11xsqlreader - elasticsearchwriter - tsdbwriter - adbpgwriter - gdbwriter - cassandrawriter - clickhousewriter - oscarwriter - oceanbasev10writer + + + + + + + + + + + + + plugin-rdbms-util plugin-unstructured-storage-util - hbase20xsqlreader - hbase20xsqlwriter - kuduwriter + + + + tdenginewriter diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml new file mode 100644 index 00000000..7cef9ac2 --- /dev/null +++ b/tdenginewriter/pom.xml @@ -0,0 +1,75 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + com.alibaba.datax.tdenginewriter + tdenginewriter + 1.0.0 + + + 8 + 8 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + junit + junit + ${junit-version} + test + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/tdenginewriter/src/main/assembly/package.xml b/tdenginewriter/src/main/assembly/package.xml new file mode 100755 index 00000000..f7a7d0bf --- /dev/null +++ b/tdenginewriter/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/tdenginewriter + + + target/ + + tdenginewriter-1.0.0.jar + + plugin/writer/tdenginewriter + + + + + + false + plugin/writer/tdenginewriter/libs + runtime + + + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java new file mode 100644 index 00000000..5cbbf2ae --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java @@ -0,0 +1,83 @@ +package com.alibaba.datax.plugin.writer; + +import java.util.Properties; + +public class JniConnection { + + private static final long JNI_NULL_POINTER = 0L; + private static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir"; + private static final String PROPERTY_KEY_LOCALE = "locale"; + private static final String PROPERTY_KEY_CHARSET = "charset"; + private static final String PROPERTY_KEY_TIME_ZONE = "timezone"; + + private long psql; + + static { + System.loadLibrary("taos"); + } + + public JniConnection(Properties props) { + if (this.psql != JNI_NULL_POINTER) { + close(); + this.psql = JNI_NULL_POINTER; + } + + initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null)); + + String locale = props.getProperty(PROPERTY_KEY_LOCALE); + if (setOptions(0, locale) < 0) { + throw new RuntimeException("Failed to set locale: " + locale + ". System default will be used."); + } + String charset = props.getProperty(PROPERTY_KEY_CHARSET); + if (setOptions(1, charset) < 0) { + throw new RuntimeException("Failed to set charset: " + charset + ". System default will be used."); + } + String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE); + if (setOptions(2, timezone) < 0) { + throw new RuntimeException("Failed to set timezone: " + timezone + ". System default will be used."); + } + } + + public long open(String host, int port, String dbname, String user, String password) { + if (this.psql != JNI_NULL_POINTER) { + close(); + this.psql = JNI_NULL_POINTER; + } + + this.psql = connectImp(host, port, dbname, user, password); + if (this.psql == JNI_NULL_POINTER) { + String errMsg = getErrMsgImp(0); + throw new RuntimeException(errMsg); + } + return this.psql; + } + + public void close() { + int code = this.closeConnectionImp(this.psql); + if (code != 0) { + throw new RuntimeException("JNI closeConnection failed"); + } + this.psql = JNI_NULL_POINTER; + } + + private static native void initImp(String configDir); + + private static native int setOptions(int optionIndex, String optionValue); + + private static native String getTsCharset(); + + private native long connectImp(String host, int port, String dbName, String user, String password); + + private native long executeQueryImp(byte[] sqlBytes, long connection); + + private native int getErrCodeImp(long connection, long pSql); + + private native String getErrMsgImp(long pSql); + + private native int getAffectedRowsImp(long connection, long pSql); + + private native int closeConnectionImp(long connection); + + private native long insertOpentsdbJson(String json, long pSql); + +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java new file mode 100644 index 00000000..184279e4 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java @@ -0,0 +1,113 @@ +package com.alibaba.datax.plugin.writer; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +public class TDengineWriter extends Writer { + + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String DBNAME = "dbname"; + private static final String USER = "user"; + private static final String PASSWORD = "password"; + + public static class Job extends Writer.Job { + + private Configuration originalConfig; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + } + + @Override + public void destroy() { + + } + + @Override + public List split(int mandatoryNumber) { + List writerSplitConfigs = new ArrayList(); + for (int i = 0; i < mandatoryNumber; i++) { + writerSplitConfigs.add(this.originalConfig); + } + + return writerSplitConfigs; + } + } + + public static class Task extends Writer.Task { + private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); + private Configuration writerSliceConfig; + private String peerPluginName; + + @Override + public void init() { + this.writerSliceConfig = getPluginJobConf(); + this.peerPluginName = getPeerPluginName(); + } + + @Override + public void destroy() { + + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + + String host = this.writerSliceConfig.getString(HOST); + int port = this.writerSliceConfig.getInt(PORT); + String dbname = this.writerSliceConfig.getString(DBNAME); + String user = this.writerSliceConfig.getString(USER); + String password = this.writerSliceConfig.getString(PASSWORD); + + JniConnection connection = new JniConnection(new Properties()); + long psql = connection.open(host, port, dbname, user, password); + System.out.println("psql: " + psql); + connection.close(); + + try { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)); + + Record record; + while ((record = lineReceiver.getFromReader()) != null) { + writer.write(recordToString(record)); + } + writer.flush(); + + } catch (Exception e) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e); + } + } + + private String recordToString(Record record) { + int recordLength = record.getColumnNumber(); + if (0 == recordLength) { + return NEWLINE_FLAG; + } + + Column column; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < recordLength; i++) { + column = record.getColumn(i); + sb.append(column.asString()).append("\t"); + } + sb.setLength(sb.length() - 1); + sb.append(NEWLINE_FLAG); + + return sb.toString(); + } + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java new file mode 100644 index 00000000..02e87079 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.writer; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum TDengineWriterErrorCode implements ErrorCode { + RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"); + + private final String code; + private final String description; + + private TDengineWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, + this.description); + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h new file mode 100644 index 00000000..0a161b92 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h @@ -0,0 +1,87 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class com_alibaba_datax_plugin_writer_JniConnection */ + +#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection +#define _Included_com_alibaba_datax_plugin_writer_JniConnection +#ifdef __cplusplus +extern "C" { +#endif +#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER +#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: initImp + * Signature: (Ljava/lang/String;)V + */ +JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp + (JNIEnv *, jclass, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: setOptions + * Signature: (ILjava/lang/String;)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions + (JNIEnv *, jclass, jint, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getTsCharset + * Signature: ()Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset + (JNIEnv *, jclass); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: connectImp + * Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp + (JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: executeQueryImp + * Signature: ([BJ)J + */ +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp + (JNIEnv *, jobject, jbyteArray, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getErrCodeImp + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp + (JNIEnv *, jobject, jlong, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getErrMsgImp + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp + (JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getAffectedRowsImp + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp + (JNIEnv *, jobject, jlong, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: closeConnectionImp + * Signature: (J)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp + (JNIEnv *, jobject, jlong); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json new file mode 100755 index 00000000..6c900a15 --- /dev/null +++ b/tdenginewriter/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "tdenginewriter", + "class": "com.alibaba.datax.plugin.writer.TDengineWriter", + "description": { + "useScene": "data migration to tdengine", + "mechanism": "use JNI to write data to tdengine." + }, + "developer": "zyyang-taosdata" +} \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/plugin_job_template.json b/tdenginewriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..5482b26e --- /dev/null +++ b/tdenginewriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,10 @@ +{ + "name": "tdenginewriter", + "parameter": { + "host": "", + "port": 6030, + "db": "", + "user": "", + "password": "" + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java new file mode 100644 index 00000000..1c9f426f --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer; + +import org.junit.Test; + +import java.util.Properties; + +public class JniConnectionTest { + + @Test + public void test() { + JniConnection connection = new JniConnection(new Properties()); + + long psql = connection.open("192.168.56.107", 6030, "log", "root", "taosdata"); + System.out.println("psql: " + psql); + + connection.close(); + } + +} \ No newline at end of file