Merge pull request #3 from taosdata/feature/TD-10698

data handling uses the strategy pattern
huolibo 2021-10-20 14:28:51 +08:00 committed by GitHub
commit a1e48c53e3
7 changed files with 218 additions and 84 deletions
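
The change replaces the reader-specific branching inside TDengineWriter with a strategy pattern: a DataHandler interface, a DataHandlerFactory that picks an implementation from the upstream reader's plugin name, and two concrete strategies (OpentsdbDataHandler and DefaultDataHandler). A minimal, self-contained sketch of that shape, using hypothetical names rather than the DataX types in the diff below:

import java.util.Arrays;
import java.util.List;

// Illustrative only: Handler/HandlerFactory/LoggingHandler are hypothetical stand-ins for
// the DataHandler/DataHandlerFactory/DefaultDataHandler classes introduced in this commit.
interface Handler {
    long handle(List<String> records);
}

class LoggingHandler implements Handler {
    @Override
    public long handle(List<String> records) {
        records.forEach(System.out::println);   // the default strategy just logs what it receives
        return records.size();
    }
}

class HandlerFactory {
    static Handler build(String peerPluginName) {
        // the real factory returns OpentsdbDataHandler for "opentsdbreader", else DefaultDataHandler
        return new LoggingHandler();
    }
}

public class StrategyPatternSketch {
    public static void main(String[] args) {
        Handler handler = HandlerFactory.build("streamreader");
        long handled = handler.handle(Arrays.asList("row 1", "row 2"));
        System.out.println("handled " + handled + " records");
    }
}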

@@ -0,0 +1,56 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random": "2021-01-01 00:00:00, 2021-01-01 23:59:59",
                "type": "date"
              },
              {
                "random": "0, 10000",
                "type": "long"
              },
              {
                "random": "0, 10",
                "type": "string"
              },
              {
                "random": "0, 5",
                "type": "bool"
              },
              {
                "random": "0, 10",
                "type": "double"
              },
              {
                "random": "0, 10",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "host": "192.168.56.105",
            "port": 6030,
            "dbname": "test",
            "user": "root",
            "password": "taosdata",
            "batchSize": 1000
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
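
This sample job pairs DataX's built-in streamreader (100 random rows per channel across the six column types above) with tdenginewriter; it is presumably the src/main/job/stream2tdengine.json that the updated EngineTest below now launches through Engine.entry.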

@@ -8,7 +8,8 @@ public class EngineTest {
     @Test
     public void test() {
         System.out.println(System.getProperty("java.library.path"));
-        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
+//        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
+        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/stream2tdengine.json"};
         System.setProperty("datax.home", "../target/datax/datax");
         try {
             Engine.entry(params);

@@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.plugin.RecordReceiver;

import java.util.Properties;

public interface DataHandler {

    long handle(RecordReceiver lineReceiver, Properties properties);
}

@@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer;

public class DataHandlerFactory {

    public static DataHandler build(String peerPluginName) {
        if (peerPluginName.equals("opentsdbreader"))
            return new OpentsdbDataHandler();
        return new DefaultDataHandler();
    }
}
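
The factory dispatches purely on the upstream reader's plugin name, so supporting another reader is a local change: one new DataHandler implementation plus one branch here, with TDengineWriter itself untouched. A hypothetical sketch (CsvDataHandler and the "txtfilereader" branch are illustrative, not part of this commit; only DataX types already used in the diff are referenced):

package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;

import java.util.Properties;

// Hypothetical additional strategy, shown only to illustrate how the factory would grow.
public class CsvDataHandler implements DataHandler {
    @Override
    public long handle(RecordReceiver lineReceiver, Properties properties) {
        long count = 0;
        Record record;
        while ((record = lineReceiver.getFromReader()) != null) {
            // a real implementation would map the record to an INSERT statement here
            count++;
        }
        return count;
    }
}

// and DataHandlerFactory.build would gain:
//     if (peerPluginName.equals("txtfilereader"))
//         return new CsvDataHandler();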

@@ -0,0 +1,34 @@
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class DefaultDataHandler implements DataHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);

    @Override
    public long handle(RecordReceiver lineReceiver, Properties properties) {
        long count = 0;
        Record record;
        while ((record = lineReceiver.getFromReader()) != null) {
            int recordLength = record.getColumnNumber();
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                Column column = record.getColumn(i);
                sb.append(column.asString()).append("\t");
            }
            sb.setLength(sb.length() - 1);
            LOG.debug(sb.toString());
            count++;
        }
        return count;
    }
}
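
Note that, as of this commit, DefaultDataHandler only joins each record's columns with tabs, logs the line at DEBUG level and counts it; nothing is written to TDengine for generic readers such as the streamreader job above, so that path currently exercises the new dispatch plumbing rather than actual inserts.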

@@ -0,0 +1,98 @@
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class OpentsdbDataHandler implements DataHandler {
    private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
    private static final String DEFAULT_BATCH_SIZE = "1";

    @Override
    public long handle(RecordReceiver lineReceiver, Properties properties) {
        // opentsdb json protocol use JNI and schemaless API to write
        String host = properties.getProperty(Key.HOST);
        int port = Integer.parseInt(properties.getProperty(Key.PORT));
        String dbname = properties.getProperty(Key.DBNAME);
        String user = properties.getProperty(Key.USER);
        String password = properties.getProperty(Key.PASSWORD);

        JniConnection conn = null;
        long count = 0;
        try {
            conn = new JniConnection(properties);
            conn.open(host, port, dbname, user, password);
            LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
            int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
            count = writeOpentsdb(lineReceiver, conn, batchSize);
        } catch (Exception e) {
            LOG.error(e.getMessage());
            e.printStackTrace();
        } finally {
            try {
                if (conn != null)
                    conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOG.info("TDengine connection closed");
        }

        return count;
    }

    private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
        long recordIndex = 1;
        try {
            Record record;
            StringBuilder sb = new StringBuilder();
            while ((record = lineReceiver.getFromReader()) != null) {
                if (batchSize == 1) {
                    String jsonData = recordToString(record);
                    LOG.debug(">>> " + jsonData);
                    conn.insertOpentsdbJson(jsonData);
                } else if (recordIndex % batchSize == 1) {
                    sb.append("[").append(recordToString(record)).append(",");
                } else if (recordIndex % batchSize == 0) {
                    sb.append(recordToString(record)).append("]");
                    String jsonData = sb.toString();
                    LOG.debug(">>> " + jsonData);
                    conn.insertOpentsdbJson(jsonData);
                    sb.delete(0, sb.length());
                } else {
                    sb.append(recordToString(record)).append(",");
                }
                recordIndex++;
            }
            if (sb.length() != 0 && sb.charAt(0) == '[') {
                String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
                LOG.debug(">>> " + jsonData);
                conn.insertOpentsdbJson(jsonData);
            }
        } catch (Exception e) {
            LOG.error("TDengineWriter ERROR: " + e.getMessage());
            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
        }
        return recordIndex - 1;
    }

    private String recordToString(Record record) {
        int recordLength = record.getColumnNumber();
        if (0 == recordLength) {
            return "";
        }
        Column column;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < recordLength; i++) {
            column = record.getColumn(i);
            sb.append(column.asString()).append("\t");
        }
        sb.setLength(sb.length() - 1);
        return sb.toString();
    }
}
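
The batching in writeOpentsdb groups payloads into JSON arrays of batchSize elements, using recordIndex modulo batchSize to open, extend and flush a batch, and flushes any trailing partial batch after the loop (batchSize == 1, the default, sends each payload on its own). A small, self-contained illustration of that arithmetic, with plain strings standing in for the OpenTSDB JSON payloads and a list standing in for insertOpentsdbJson:

import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    public static void main(String[] args) {
        int batchSize = 3;
        List<String> flushed = new ArrayList<>();   // stands in for conn.insertOpentsdbJson(...)
        StringBuilder sb = new StringBuilder();
        long recordIndex = 1;
        for (int i = 1; i <= 7; i++) {              // 7 records -> two full batches and one partial
            String payload = "{\"metric\":\"m" + i + "\"}";
            if (batchSize == 1) {
                flushed.add(payload);
            } else if (recordIndex % batchSize == 1) {
                sb.append("[").append(payload).append(",");          // open a new batch
            } else if (recordIndex % batchSize == 0) {
                sb.append(payload).append("]");                      // close and flush a full batch
                flushed.add(sb.toString());
                sb.delete(0, sb.length());
            } else {
                sb.append(payload).append(",");                      // grow the current batch
            }
            recordIndex++;
        }
        if (sb.length() != 0 && sb.charAt(0) == '[') {               // flush the trailing partial batch
            flushed.add(sb.deleteCharAt(sb.length() - 1).append("]").toString());
        }
        flushed.forEach(System.out::println);        // prints arrays of 3, 3 and 1 payloads
    }
}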

@@ -1,21 +1,20 @@
 package com.alibaba.datax.plugin.writer;
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.exception.DataXException;
 import com.alibaba.datax.common.plugin.RecordReceiver;
 import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
 public class TDengineWriter extends Writer {
     private static final String PEER_PLUGIN_NAME = "peerPluginName";
-    private static final String DEFAULT_BATCH_SIZE = "1";
     public static class Job extends Writer.Job {
@@ -67,85 +66,11 @@ public class TDengineWriter {
             }
             String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
-            if (peerPluginName.equals("opentsdbreader")) {
-                // opentsdb json protocol use JNI and schemaless API to write
-                String host = properties.getProperty(Key.HOST);
-                int port = Integer.parseInt(properties.getProperty(Key.PORT));
-                String dbname = properties.getProperty(Key.DBNAME);
-                String user = properties.getProperty(Key.USER);
-                String password = properties.getProperty(Key.PASSWORD);
-                JniConnection conn = null;
-                try {
-                    conn = new JniConnection(properties);
-                    conn.open(host, port, dbname, user, password);
-                    LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
-                    int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
-                    writeOpentsdb(lineReceiver, conn, batchSize);
-                } catch (Exception e) {
-                    LOG.error(e.getMessage());
-                    e.printStackTrace();
-                } finally {
-                    try {
-                        if (conn != null)
-                            conn.close();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                    LOG.info("TDengine connection closed");
-                }
-            } else {
-                // other
-            }
+            LOG.debug("start to handle record from: " + peerPluginName);
+            DataHandler handler = DataHandlerFactory.build(peerPluginName);
+            long records = handler.handle(lineReceiver, properties);
+            LOG.debug("handle data finished, records: " + records);
         }
-        private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
-            try {
-                Record record;
-                StringBuilder sb = new StringBuilder();
-                long recordIndex = 1;
-                while ((record = lineReceiver.getFromReader()) != null) {
-                    if (batchSize == 1) {
-                        String jsonData = recordToString(record);
-                        LOG.debug(">>> " + jsonData);
-                        conn.insertOpentsdbJson(jsonData);
-                    } else if (recordIndex % batchSize == 1) {
-                        sb.append("[").append(recordToString(record)).append(",");
-                    } else if (recordIndex % batchSize == 0) {
-                        sb.append(recordToString(record)).append("]");
-                        String jsonData = sb.toString();
-                        LOG.debug(">>> " + jsonData);
-                        conn.insertOpentsdbJson(jsonData);
-                        sb.delete(0, sb.length());
-                    } else {
-                        sb.append(recordToString(record)).append(",");
-                    }
-                    recordIndex++;
-                }
-                if (sb.length() != 0 && sb.charAt(0) == '[') {
-                    String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
-                    LOG.debug(">>> " + jsonData);
-                    conn.insertOpentsdbJson(jsonData);
-                }
-            } catch (Exception e) {
-                LOG.error("TDengineWriter ERROR: " + e.getMessage());
-                throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
-            }
-        }
-        private String recordToString(Record record) {
-            int recordLength = record.getColumnNumber();
-            if (0 == recordLength) {
-                return "";
-            }
-            Column column;
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < recordLength; i++) {
-                column = record.getColumn(i);
-                sb.append(column.asString()).append("\t");
-            }
-            sb.setLength(sb.length() - 1);
-            return sb.toString();
-        }
     }
 }