tdenginewriter

This commit is contained in:
zyyang 2021-10-19 15:58:47 +08:00
parent 5e54aee590
commit ab526ca5c4

View File

@ -45,7 +45,6 @@ public class TDengineWriter extends Writer {
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Configuration writerSliceConfig;
@Override
@ -70,25 +69,30 @@ public class TDengineWriter extends Writer {
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
if (peerPluginName.equals("opentsdbreader")) {
// opentsdb json protocol use JNI and schemaless API to write
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
JniConnection conn = null;
try {
JniConnection conn = new JniConnection(properties);
conn = new JniConnection(properties);
conn.open(host, port, dbname, user, password);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
writeOpentsdb(lineReceiver, conn, batchSize);
conn.close();
LOG.info("TDengine connection closed");
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
} finally {
try {
if (conn != null)
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("TDengine connection closed");
}
} else {
// other
@ -132,7 +136,7 @@ public class TDengineWriter extends Writer {
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
return "";
}
Column column;
StringBuilder sb = new StringBuilder();
@ -141,7 +145,6 @@ public class TDengineWriter extends Writer {
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
}