From ab526ca5c4d4c846277224cb122fec581d3f368c Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 19 Oct 2021 15:58:47 +0800 Subject: [PATCH] tdenginewriter --- .../datax/plugin/writer/TDengineWriter.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java index 60c76522..9dc42d9d 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java @@ -45,7 +45,6 @@ public class TDengineWriter extends Writer { public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Job.class); - private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); private Configuration writerSliceConfig; @Override @@ -70,25 +69,30 @@ public class TDengineWriter extends Writer { String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); if (peerPluginName.equals("opentsdbreader")) { // opentsdb json protocol use JNI and schemaless API to write - String host = properties.getProperty(Key.HOST); int port = Integer.parseInt(properties.getProperty(Key.PORT)); String dbname = properties.getProperty(Key.DBNAME); String user = properties.getProperty(Key.USER); String password = properties.getProperty(Key.PASSWORD); + JniConnection conn = null; try { - JniConnection conn = new JniConnection(properties); + conn = new JniConnection(properties); conn.open(host, port, dbname, user, password); LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user); - int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE)); writeOpentsdb(lineReceiver, conn, batchSize); - conn.close(); - LOG.info("TDengine connection closed"); } catch (Exception e) { LOG.error(e.getMessage()); e.printStackTrace(); + } finally { + try { + if (conn != null) + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TDengine connection closed"); } } else { // other @@ -132,7 +136,7 @@ public class TDengineWriter extends Writer { private String recordToString(Record record) { int recordLength = record.getColumnNumber(); if (0 == recordLength) { - return NEWLINE_FLAG; + return ""; } Column column; StringBuilder sb = new StringBuilder(); @@ -141,7 +145,6 @@ public class TDengineWriter extends Writer { sb.append(column.asString()).append("\t"); } sb.setLength(sb.length() - 1); - sb.append(NEWLINE_FLAG); return sb.toString(); } }