add tdengine writer plugin

This commit is contained in:
zyyang 2021-10-12 15:44:24 +08:00
parent 3bdabda6a8
commit d2ab612754
14 changed files with 598 additions and 45 deletions

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.core;
public class EngineTest {
public static void main(String[] args) {
System.out.println(System.getProperty("java.library.path"));
// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2stream.json"};
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2tdengine.json"};
System.setProperty("datax.home", "/Users/yangzy/workspace/DataX/target/datax/datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
}
}

31
job/opentsdb2stream.json Normal file
View File

@ -0,0 +1,31 @@
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather.temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,34 @@
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather.temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.56.105",
"port": 6030,
"db": "test",
"user": "root",
"password": "taosdata"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -238,6 +238,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdenginewriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>otswriter/target/datax/</directory>
<includes>

92
pom.xml
View File

@ -47,66 +47,68 @@
<module>transformer</module>
<!-- reader -->
<module>mysqlreader</module>
<module>drdsreader</module>
<module>sqlserverreader</module>
<module>postgresqlreader</module>
<module>kingbaseesreader</module>
<module>oraclereader</module>
<!-- <module>mysqlreader</module>-->
<!-- <module>drdsreader</module>-->
<!-- <module>sqlserverreader</module>-->
<!-- <module>postgresqlreader</module>-->
<!-- <module>kingbaseesreader</module>-->
<!-- <module>oraclereader</module>-->
<module>odpsreader</module>
<module>otsreader</module>
<module>otsstreamreader</module>
<!-- <module>otsreader</module>-->
<!-- <module>otsstreamreader</module>-->
<module>txtfilereader</module>
<module>hdfsreader</module>
<!-- <module>hdfsreader</module>-->
<module>streamreader</module>
<module>ossreader</module>
<module>ftpreader</module>
<module>mongodbreader</module>
<!-- <module>ossreader</module>-->
<!-- <module>ftpreader</module>-->
<!-- <module>mongodbreader</module>-->
<module>rdbmsreader</module>
<module>hbase11xreader</module>
<module>hbase094xreader</module>
<module>tsdbreader</module>
<!-- <module>hbase11xreader</module>-->
<!-- <module>hbase094xreader</module>-->
<!-- <module>tsdbreader</module>-->
<module>opentsdbreader</module>
<module>cassandrareader</module>
<module>gdbreader</module>
<module>oceanbasev10reader</module>
<!-- <module>cassandrareader</module>-->
<!-- <module>gdbreader</module>-->
<!-- <module>oceanbasev10reader</module>-->
<!-- writer -->
<module>mysqlwriter</module>
<module>drdswriter</module>
<!-- <module>mysqlwriter</module>-->
<!-- <module>drdswriter</module>-->
<module>odpswriter</module>
<module>txtfilewriter</module>
<module>ftpwriter</module>
<module>hdfswriter</module>
<!-- <module>ftpwriter</module>-->
<!-- <module>hdfswriter</module>-->
<module>streamwriter</module>
<module>otswriter</module>
<module>oraclewriter</module>
<module>sqlserverwriter</module>
<module>postgresqlwriter</module>
<module>kingbaseeswriter</module>
<module>osswriter</module>
<module>mongodbwriter</module>
<!-- <module>otswriter</module>-->
<!-- <module>oraclewriter</module>-->
<!-- <module>sqlserverwriter</module>-->
<!-- <module>postgresqlwriter</module>-->
<!-- <module>kingbaseeswriter</module>-->
<!-- <module>osswriter</module>-->
<!-- <module>mongodbwriter</module>-->
<module>adswriter</module>
<module>ocswriter</module>
<!-- <module>ocswriter</module>-->
<module>rdbmswriter</module>
<module>hbase11xwriter</module>
<module>hbase094xwriter</module>
<module>hbase11xsqlwriter</module>
<module>hbase11xsqlreader</module>
<module>elasticsearchwriter</module>
<module>tsdbwriter</module>
<module>adbpgwriter</module>
<module>gdbwriter</module>
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<module>oscarwriter</module>
<module>oceanbasev10writer</module>
<!-- <module>hbase11xwriter</module>-->
<!-- <module>hbase094xwriter</module>-->
<!-- <module>hbase11xsqlwriter</module>-->
<!-- <module>hbase11xsqlreader</module>-->
<!-- <module>elasticsearchwriter</module>-->
<!-- <module>tsdbwriter</module>-->
<!-- <module>adbpgwriter</module>-->
<!-- <module>gdbwriter</module>-->
<!-- <module>cassandrawriter</module>-->
<!-- <module>clickhousewriter</module>-->
<!-- <module>oscarwriter</module>-->
<!-- <module>oceanbasev10writer</module>-->
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
<!-- <module>hbase20xsqlreader</module>-->
<!-- <module>hbase20xsqlwriter</module>-->
<!-- <module>kuduwriter</module>-->
<module>tdenginewriter</module>
</modules>
<dependencyManagement>

75
tdenginewriter/pom.xml Normal file
View File

@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdenginewriter-1.0.0.jar</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/tdenginewriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,83 @@
package com.alibaba.datax.plugin.writer;
import java.util.Properties;
public class JniConnection {
private static final long JNI_NULL_POINTER = 0L;
private static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
private static final String PROPERTY_KEY_LOCALE = "locale";
private static final String PROPERTY_KEY_CHARSET = "charset";
private static final String PROPERTY_KEY_TIME_ZONE = "timezone";
private long psql;
static {
System.loadLibrary("taos");
}
public JniConnection(Properties props) {
if (this.psql != JNI_NULL_POINTER) {
close();
this.psql = JNI_NULL_POINTER;
}
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
if (setOptions(0, locale) < 0) {
throw new RuntimeException("Failed to set locale: " + locale + ". System default will be used.");
}
String charset = props.getProperty(PROPERTY_KEY_CHARSET);
if (setOptions(1, charset) < 0) {
throw new RuntimeException("Failed to set charset: " + charset + ". System default will be used.");
}
String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
if (setOptions(2, timezone) < 0) {
throw new RuntimeException("Failed to set timezone: " + timezone + ". System default will be used.");
}
}
public long open(String host, int port, String dbname, String user, String password) {
if (this.psql != JNI_NULL_POINTER) {
close();
this.psql = JNI_NULL_POINTER;
}
this.psql = connectImp(host, port, dbname, user, password);
if (this.psql == JNI_NULL_POINTER) {
String errMsg = getErrMsgImp(0);
throw new RuntimeException(errMsg);
}
return this.psql;
}
public void close() {
int code = this.closeConnectionImp(this.psql);
if (code != 0) {
throw new RuntimeException("JNI closeConnection failed");
}
this.psql = JNI_NULL_POINTER;
}
private static native void initImp(String configDir);
private static native int setOptions(int optionIndex, String optionValue);
private static native String getTsCharset();
private native long connectImp(String host, int port, String dbName, String user, String password);
private native long executeQueryImp(byte[] sqlBytes, long connection);
private native int getErrCodeImp(long connection, long pSql);
private native String getErrMsgImp(long pSql);
private native int getAffectedRowsImp(long connection, long pSql);
private native int closeConnectionImp(long connection);
private native long insertOpentsdbJson(String json, long pSql);
}

View File

@ -0,0 +1,113 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class TDengineWriter extends Writer {
private static final String HOST = "host";
private static final String PORT = "port";
private static final String DBNAME = "dbname";
private static final String USER = "user";
private static final String PASSWORD = "password";
public static class Job extends Writer.Job {
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig);
}
return writerSplitConfigs;
}
}
public static class Task extends Writer.Task {
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Configuration writerSliceConfig;
private String peerPluginName;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
this.peerPluginName = getPeerPluginName();
}
@Override
public void destroy() {
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
String host = this.writerSliceConfig.getString(HOST);
int port = this.writerSliceConfig.getInt(PORT);
String dbname = this.writerSliceConfig.getString(DBNAME);
String user = this.writerSliceConfig.getString(USER);
String password = this.writerSliceConfig.getString(PASSWORD);
JniConnection connection = new JniConnection(new Properties());
long psql = connection.open(host, port, dbname, user, password);
System.out.println("psql: " + psql);
connection.close();
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
writer.write(recordToString(record));
}
writer.flush();
} catch (Exception e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
}
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
}
}

View File

@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常");
private final String code;
private final String description;
private TDengineWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}

View File

@ -0,0 +1,87 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_alibaba_datax_plugin_writer_JniConnection */
#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection
#define _Included_com_alibaba_datax_plugin_writer_JniConnection
#ifdef __cplusplus
extern "C" {
#endif
#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER
#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: initImp
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp
(JNIEnv *, jclass, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: setOptions
* Signature: (ILjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions
(JNIEnv *, jclass, jint, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getTsCharset
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset
(JNIEnv *, jclass);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: connectImp
* Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp
(JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: executeQueryImp
* Signature: ([BJ)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp
(JNIEnv *, jobject, jbyteArray, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrCodeImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrMsgImp
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getAffectedRowsImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: closeConnectionImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp
(JNIEnv *, jobject, jlong);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,9 @@
{
"name": "tdenginewriter",
"class": "com.alibaba.datax.plugin.writer.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
"mechanism": "use JNI to write data to tdengine."
},
"developer": "zyyang-taosdata"
}

View File

@ -0,0 +1,10 @@
{
"name": "tdenginewriter",
"parameter": {
"host": "",
"port": 6030,
"db": "",
"user": "",
"password": ""
}
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer;
import org.junit.Test;
import java.util.Properties;
public class JniConnectionTest {
@Test
public void test() {
JniConnection connection = new JniConnection(new Properties());
long psql = connection.open("192.168.56.107", 6030, "log", "root", "taosdata");
System.out.println("psql: " + psql);
connection.close();
}
}