mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-04 01:32:10 +08:00
data handle use stratgy pattern
This commit is contained in:
parent
ab526ca5c4
commit
7d9543105d
56
core/src/main/job/stream2tdengine.json
Normal file
56
core/src/main/job/stream2tdengine.json
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "streamreader",
|
||||||
|
"parameter": {
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"random": "2021-01-01 00:00:00, 2021-01-01 23:59:59",
|
||||||
|
"type": "date"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"random": "0, 10000",
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"random": "0, 10",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"random": "0, 5",
|
||||||
|
"type": "bool"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"random": "0, 10",
|
||||||
|
"type": "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"random": "0, 10",
|
||||||
|
"type": "bytes"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"sliceRecordCount": 100
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "tdenginewriter",
|
||||||
|
"parameter": {
|
||||||
|
"host": "192.168.56.105",
|
||||||
|
"port": 6030,
|
||||||
|
"dbname": "test",
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"batchSize": 1000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -8,7 +8,8 @@ public class EngineTest {
|
|||||||
@Test
|
@Test
|
||||||
public void test() {
|
public void test() {
|
||||||
System.out.println(System.getProperty("java.library.path"));
|
System.out.println(System.getProperty("java.library.path"));
|
||||||
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
|
// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/stream2tdengine.json"};
|
||||||
System.setProperty("datax.home", "../target/datax/datax");
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
try {
|
try {
|
||||||
Engine.entry(params);
|
Engine.entry(params);
|
||||||
|
@ -0,0 +1,10 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public interface DataHandler {
|
||||||
|
|
||||||
|
long handle(RecordReceiver lineReceiver, Properties properties);
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
public class DataHandlerFactory {
|
||||||
|
|
||||||
|
public static DataHandler build(String peerPluginName) {
|
||||||
|
if (peerPluginName.equals("opentsdbreader"))
|
||||||
|
return new OpentsdbDataHandler();
|
||||||
|
return new DefaultDataHandler();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public class DefaultDataHandler implements DataHandler {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long handle(RecordReceiver lineReceiver, Properties properties) {
|
||||||
|
long count = 0;
|
||||||
|
Record record;
|
||||||
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
|
|
||||||
|
int recordLength = record.getColumnNumber();
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (int i = 0; i < recordLength; i++) {
|
||||||
|
Column column = record.getColumn(i);
|
||||||
|
sb.append(column.asString()).append("\t");
|
||||||
|
}
|
||||||
|
sb.setLength(sb.length() - 1);
|
||||||
|
LOG.debug(sb.toString());
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,98 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public class OpentsdbDataHandler implements DataHandler {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
|
||||||
|
private static final String DEFAULT_BATCH_SIZE = "1";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long handle(RecordReceiver lineReceiver, Properties properties) {
|
||||||
|
// opentsdb json protocol use JNI and schemaless API to write
|
||||||
|
String host = properties.getProperty(Key.HOST);
|
||||||
|
int port = Integer.parseInt(properties.getProperty(Key.PORT));
|
||||||
|
String dbname = properties.getProperty(Key.DBNAME);
|
||||||
|
String user = properties.getProperty(Key.USER);
|
||||||
|
String password = properties.getProperty(Key.PASSWORD);
|
||||||
|
|
||||||
|
JniConnection conn = null;
|
||||||
|
long count = 0;
|
||||||
|
try {
|
||||||
|
conn = new JniConnection(properties);
|
||||||
|
conn.open(host, port, dbname, user, password);
|
||||||
|
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
|
||||||
|
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
|
||||||
|
count = writeOpentsdb(lineReceiver, conn, batchSize);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error(e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (conn != null)
|
||||||
|
conn.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
LOG.info("TDengine connection closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
|
||||||
|
long recordIndex = 1;
|
||||||
|
try {
|
||||||
|
Record record;
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
|
if (batchSize == 1) {
|
||||||
|
String jsonData = recordToString(record);
|
||||||
|
LOG.debug(">>> " + jsonData);
|
||||||
|
conn.insertOpentsdbJson(jsonData);
|
||||||
|
} else if (recordIndex % batchSize == 1) {
|
||||||
|
sb.append("[").append(recordToString(record)).append(",");
|
||||||
|
} else if (recordIndex % batchSize == 0) {
|
||||||
|
sb.append(recordToString(record)).append("]");
|
||||||
|
String jsonData = sb.toString();
|
||||||
|
LOG.debug(">>> " + jsonData);
|
||||||
|
conn.insertOpentsdbJson(jsonData);
|
||||||
|
sb.delete(0, sb.length());
|
||||||
|
} else {
|
||||||
|
sb.append(recordToString(record)).append(",");
|
||||||
|
}
|
||||||
|
recordIndex++;
|
||||||
|
}
|
||||||
|
if (sb.length() != 0 && sb.charAt(0) == '[') {
|
||||||
|
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
|
||||||
|
LOG.debug(">>> " + jsonData);
|
||||||
|
conn.insertOpentsdbJson(jsonData);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("TDengineWriter ERROR: " + e.getMessage());
|
||||||
|
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
|
||||||
|
}
|
||||||
|
return recordIndex - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String recordToString(Record record) {
|
||||||
|
int recordLength = record.getColumnNumber();
|
||||||
|
if (0 == recordLength) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
Column column;
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (int i = 0; i < recordLength; i++) {
|
||||||
|
column = record.getColumn(i);
|
||||||
|
sb.append(column.asString()).append("\t");
|
||||||
|
}
|
||||||
|
sb.setLength(sb.length() - 1);
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
@ -1,21 +1,20 @@
|
|||||||
package com.alibaba.datax.plugin.writer;
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
import com.alibaba.datax.common.spi.Writer;
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class TDengineWriter extends Writer {
|
public class TDengineWriter extends Writer {
|
||||||
|
|
||||||
private static final String PEER_PLUGIN_NAME = "peerPluginName";
|
private static final String PEER_PLUGIN_NAME = "peerPluginName";
|
||||||
private static final String DEFAULT_BATCH_SIZE = "1";
|
|
||||||
|
|
||||||
public static class Job extends Writer.Job {
|
public static class Job extends Writer.Job {
|
||||||
|
|
||||||
@ -67,85 +66,11 @@ public class TDengineWriter extends Writer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
|
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
|
||||||
if (peerPluginName.equals("opentsdbreader")) {
|
LOG.debug("start to handle record from: " + peerPluginName);
|
||||||
// opentsdb json protocol use JNI and schemaless API to write
|
DataHandler handler = DataHandlerFactory.build(peerPluginName);
|
||||||
String host = properties.getProperty(Key.HOST);
|
long records = handler.handle(lineReceiver, properties);
|
||||||
int port = Integer.parseInt(properties.getProperty(Key.PORT));
|
LOG.debug("handle data finished, records: " + records);
|
||||||
String dbname = properties.getProperty(Key.DBNAME);
|
|
||||||
String user = properties.getProperty(Key.USER);
|
|
||||||
String password = properties.getProperty(Key.PASSWORD);
|
|
||||||
|
|
||||||
JniConnection conn = null;
|
|
||||||
try {
|
|
||||||
conn = new JniConnection(properties);
|
|
||||||
conn.open(host, port, dbname, user, password);
|
|
||||||
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
|
|
||||||
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
|
|
||||||
writeOpentsdb(lineReceiver, conn, batchSize);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error(e.getMessage());
|
|
||||||
e.printStackTrace();
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
if (conn != null)
|
|
||||||
conn.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
LOG.info("TDengine connection closed");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// other
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
|
|
||||||
try {
|
|
||||||
Record record;
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
long recordIndex = 1;
|
|
||||||
while ((record = lineReceiver.getFromReader()) != null) {
|
|
||||||
if (batchSize == 1) {
|
|
||||||
String jsonData = recordToString(record);
|
|
||||||
LOG.debug(">>> " + jsonData);
|
|
||||||
conn.insertOpentsdbJson(jsonData);
|
|
||||||
} else if (recordIndex % batchSize == 1) {
|
|
||||||
sb.append("[").append(recordToString(record)).append(",");
|
|
||||||
} else if (recordIndex % batchSize == 0) {
|
|
||||||
sb.append(recordToString(record)).append("]");
|
|
||||||
String jsonData = sb.toString();
|
|
||||||
LOG.debug(">>> " + jsonData);
|
|
||||||
conn.insertOpentsdbJson(jsonData);
|
|
||||||
sb.delete(0, sb.length());
|
|
||||||
} else {
|
|
||||||
sb.append(recordToString(record)).append(",");
|
|
||||||
}
|
|
||||||
recordIndex++;
|
|
||||||
}
|
|
||||||
if (sb.length() != 0 && sb.charAt(0) == '[') {
|
|
||||||
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
|
|
||||||
LOG.debug(">>> " + jsonData);
|
|
||||||
conn.insertOpentsdbJson(jsonData);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("TDengineWriter ERROR: " + e.getMessage());
|
|
||||||
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String recordToString(Record record) {
|
|
||||||
int recordLength = record.getColumnNumber();
|
|
||||||
if (0 == recordLength) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
Column column;
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
for (int i = 0; i < recordLength; i++) {
|
|
||||||
column = record.getColumn(i);
|
|
||||||
sb.append(column.asString()).append("\t");
|
|
||||||
}
|
|
||||||
sb.setLength(sb.length() - 1);
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user