This commit is contained in:
zyyang 2021-10-13 17:02:43 +08:00
parent f6520cf06b
commit 975096b676
5 changed files with 69 additions and 31 deletions

View File

@ -1,9 +1,12 @@
package com.alibaba.datax.core; package com.alibaba.datax.core;
import org.junit.Test;
public class EngineTest { public class EngineTest {
public static void main(String[] args) { @Test
public void test() {
System.out.println(System.getProperty("java.library.path")); System.out.println(System.getProperty("java.library.path"));
// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2stream.json"}; // String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2stream.json"};
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2tdengine.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "/Users/yangzy/workspace/DataX/job/opentsdb2tdengine.json"};

View File

@ -7,7 +7,7 @@
"parameter": { "parameter": {
"endpoint": "http://192.168.1.180:4242", "endpoint": "http://192.168.1.180:4242",
"column": [ "column": [
"weather.temperature" "weather_temperature"
], ],
"beginDateTime": "2021-01-01 00:00:00", "beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00" "endDateTime": "2021-01-01 01:00:00"

View File

@ -6,10 +6,10 @@ public class JniConnection {
private static final long JNI_NULL_POINTER = 0L; private static final long JNI_NULL_POINTER = 0L;
private static final int JNI_SUCCESSFUL = 0; private static final int JNI_SUCCESSFUL = 0;
private static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir"; public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
private static final String PROPERTY_KEY_LOCALE = "locale"; public static final String PROPERTY_KEY_LOCALE = "locale";
private static final String PROPERTY_KEY_CHARSET = "charset"; public static final String PROPERTY_KEY_CHARSET = "charset";
private static final String PROPERTY_KEY_TIME_ZONE = "timezone"; public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
private long conn; private long conn;
@ -17,7 +17,7 @@ public class JniConnection {
System.loadLibrary("taos"); System.loadLibrary("taos");
} }
public JniConnection(Properties props) { public JniConnection(Properties props) throws Exception {
if (this.conn != JNI_NULL_POINTER) { if (this.conn != JNI_NULL_POINTER) {
close(); close();
this.conn = JNI_NULL_POINTER; this.conn = JNI_NULL_POINTER;
@ -27,19 +27,19 @@ public class JniConnection {
String locale = props.getProperty(PROPERTY_KEY_LOCALE); String locale = props.getProperty(PROPERTY_KEY_LOCALE);
if (setOptions(0, locale) < 0) { if (setOptions(0, locale) < 0) {
throw new RuntimeException("Failed to set locale: " + locale + ". System default will be used."); throw new Exception("Failed to set locale: " + locale + ". System default will be used.");
} }
String charset = props.getProperty(PROPERTY_KEY_CHARSET); String charset = props.getProperty(PROPERTY_KEY_CHARSET);
if (setOptions(1, charset) < 0) { if (setOptions(1, charset) < 0) {
throw new RuntimeException("Failed to set charset: " + charset + ". System default will be used."); throw new Exception("Failed to set charset: " + charset + ". System default will be used.");
} }
String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE); String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
if (setOptions(2, timezone) < 0) { if (setOptions(2, timezone) < 0) {
throw new RuntimeException("Failed to set timezone: " + timezone + ". System default will be used."); throw new Exception("Failed to set timezone: " + timezone + ". System default will be used.");
} }
} }
public void open(String host, int port, String dbname, String user, String password) { public void open(String host, int port, String dbname, String user, String password) throws Exception {
if (this.conn != JNI_NULL_POINTER) { if (this.conn != JNI_NULL_POINTER) {
close(); close();
this.conn = JNI_NULL_POINTER; this.conn = JNI_NULL_POINTER;
@ -48,25 +48,25 @@ public class JniConnection {
this.conn = connectImp(host, port, dbname, user, password); this.conn = connectImp(host, port, dbname, user, password);
if (this.conn == JNI_NULL_POINTER) { if (this.conn == JNI_NULL_POINTER) {
String errMsg = getErrMsgImp(0); String errMsg = getErrMsgImp(0);
throw new RuntimeException(errMsg); throw new Exception(errMsg);
} }
} }
public void insertOpentsdbJson(String json) { public void insertOpentsdbJson(String json) throws Exception {
if (this.conn == JNI_NULL_POINTER) { if (this.conn == JNI_NULL_POINTER) {
throw new RuntimeException("JNI connection is NULL"); throw new Exception("JNI connection is NULL");
} }
long code = insertOpentsdbJson(json, this.conn); long code = insertOpentsdbJson(json, this.conn);
if (code != JNI_SUCCESSFUL) { if (code != JNI_SUCCESSFUL) {
String errMsg = getErrMsgByCode(code); String errMsg = getErrMsgByCode(code);
throw new RuntimeException(errMsg); throw new Exception(errMsg);
} }
} }
public void close() { public void close() throws Exception {
int code = this.closeConnectionImp(this.conn); int code = this.closeConnectionImp(this.conn);
if (code != 0) { if (code != 0) {
throw new RuntimeException("JNI closeConnection failed"); throw new Exception("JNI closeConnection failed");
} }
this.conn = JNI_NULL_POINTER; this.conn = JNI_NULL_POINTER;
} }

View File

@ -7,12 +7,12 @@ import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
public class TDengineWriter extends Writer { public class TDengineWriter extends Writer {
@ -22,6 +22,7 @@ public class TDengineWriter extends Writer {
private static final String DBNAME = "dbname"; private static final String DBNAME = "dbname";
private static final String USER = "user"; private static final String USER = "user";
private static final String PASSWORD = "password"; private static final String PASSWORD = "password";
private static final String PEER_PLUGIN_NAME = "peerPluginName";
public static class Job extends Writer.Job { public static class Job extends Writer.Job {
@ -30,6 +31,7 @@ public class TDengineWriter extends Writer {
@Override @Override
public void init() { public void init() {
this.originalConfig = super.getPluginJobConf(); this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
} }
@Override @Override
@ -49,14 +51,14 @@ public class TDengineWriter extends Writer {
} }
public static class Task extends Writer.Task { public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Configuration writerSliceConfig; private Configuration writerSliceConfig;
private String peerPluginName;
@Override @Override
public void init() { public void init() {
this.writerSliceConfig = getPluginJobConf(); this.writerSliceConfig = getPluginJobConf();
this.peerPluginName = getPeerPluginName();
} }
@Override @Override
@ -67,22 +69,57 @@ public class TDengineWriter extends Writer {
@Override @Override
public void startWrite(RecordReceiver lineReceiver) { public void startWrite(RecordReceiver lineReceiver) {
String host = this.writerSliceConfig.getString(HOST); String host = this.writerSliceConfig.getString(HOST);
int port = this.writerSliceConfig.getInt(PORT); int port = this.writerSliceConfig.getInt(PORT);
String dbname = this.writerSliceConfig.getString(DBNAME); String dbname = this.writerSliceConfig.getString(DBNAME);
String user = this.writerSliceConfig.getString(USER); String user = this.writerSliceConfig.getString(USER);
String password = this.writerSliceConfig.getString(PASSWORD); String password = this.writerSliceConfig.getString(PASSWORD);
try { Properties properties = new Properties();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)); String cfgdir = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CONFIG_DIR);
if (cfgdir != null && !cfgdir.isEmpty()) {
properties.setProperty(JniConnection.PROPERTY_KEY_CONFIG_DIR, cfgdir);
}
String timezone = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_TIME_ZONE);
if (timezone != null && !timezone.isEmpty()) {
properties.setProperty(JniConnection.PROPERTY_KEY_TIME_ZONE, timezone);
}
String locale = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_LOCALE);
if (locale != null && !locale.isEmpty()) {
properties.setProperty(JniConnection.PROPERTY_KEY_LOCALE, locale);
}
String charset = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CHARSET);
if (charset != null && !charset.isEmpty()) {
properties.setProperty(JniConnection.PROPERTY_KEY_CHARSET, charset);
}
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
if (peerPluginName.equals("opentsdbreader")) {
try {
JniConnection conn = new JniConnection(properties);
conn.open(host, port, dbname, user, password);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
writeOpentsdb(lineReceiver, conn);
conn.close();
LOG.info("TDengine connection closed");
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
}
}
}
private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn) {
try {
Record record; Record record;
while ((record = lineReceiver.getFromReader()) != null) { while ((record = lineReceiver.getFromReader()) != null) {
writer.write(recordToString(record)); String jsonData = recordToString(record);
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
} }
writer.flush();
} catch (Exception e) { } catch (Exception e) {
LOG.error("TDengineWriter ERROR: " + e.getMessage());
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e); throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
} }
} }
@ -92,7 +129,6 @@ public class TDengineWriter extends Writer {
if (0 == recordLength) { if (0 == recordLength) {
return NEWLINE_FLAG; return NEWLINE_FLAG;
} }
Column column; Column column;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) { for (int i = 0; i < recordLength; i++) {
@ -101,7 +137,6 @@ public class TDengineWriter extends Writer {
} }
sb.setLength(sb.length() - 1); sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG); sb.append(NEWLINE_FLAG);
return sb.toString(); return sb.toString();
} }
} }

View File

@ -7,12 +7,12 @@ import java.util.Properties;
public class JniConnectionTest { public class JniConnectionTest {
@Test @Test
public void test() { public void test() throws Exception {
JniConnection connection = new JniConnection(new Properties()); JniConnection connection = new JniConnection(new Properties());
connection.open("192.168.56.105", 6030, "test", "root", "taosdata"); connection.open("192.168.56.105", 6030, "test", "root", "taosdata");
String json = "{\"metric\":\"weather.temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":123}}"; String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}";
connection.insertOpentsdbJson(json); connection.insertOpentsdbJson(json);
connection.close(); connection.close();