[TD-10787]<feature>: migrate mysql data to tdengine

This commit is contained in:
zyyang 2021-11-12 15:48:24 +08:00
parent bb0d715eae
commit 906cc24ba1
11 changed files with 125 additions and 55 deletions

View File

@ -6,19 +6,18 @@
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"password": "123456",
"column": [
"id",
"name"
"*"
],
"splitPk": "db_id",
"splitPk": "f1",
"connection": [
{
"table": [
"test"
"weather"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
"jdbc:mysql://192.168.56.105:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
@ -32,6 +31,7 @@
"dbname": "test",
"user": "root",
"password": "taosdata",
"table": "weather",
"batchSize": 1000
}
}

View File

@ -1,31 +0,0 @@
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather.temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -9,7 +9,7 @@ public class EngineTest {
public void test() {
System.out.println(System.getProperty("java.library.path"));
// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/stream2tdengine.json"};
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/mysql2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
try {
Engine.entry(params);

View File

@ -31,6 +31,12 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.util.Configuration;
import java.util.Properties;
import java.util.Set;
public class CommonUtil {
public static Properties toProperties(Configuration configuration) {
Set<String> keys = configuration.getKeys();
Properties properties = new Properties();
for (String key : keys) {
String value = configuration.getString(key);
if (value != null)
properties.setProperty(key, value);
}
return properties;
}
}

View File

@ -1,10 +1,9 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.plugin.RecordReceiver;
import java.util.Properties;
import com.alibaba.datax.common.util.Configuration;
public interface DataHandler {
long handle(RecordReceiver lineReceiver, Properties properties);
long handle(RecordReceiver lineReceiver, Configuration properties);
}

View File

@ -5,6 +5,8 @@ public class DataHandlerFactory {
public static DataHandler build(String peerPluginName) {
if (peerPluginName.equals("opentsdbreader"))
return new OpentsdbDataHandler();
if (peerPluginName.equals("mysqlreader"))
return new MysqlDataHandler();
return new DefaultDataHandler();
}
}

View File

@ -3,16 +3,15 @@ package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
public long handle(RecordReceiver lineReceiver, Configuration configuration) {
long count = 0;
Record record;
while ((record = lineReceiver.getFromReader()) != null) {

View File

@ -0,0 +1,73 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
public class MysqlDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(MysqlDataHandler.class);
Connection conn;
@Override
public long handle(RecordReceiver lineReceiver, Configuration configuration) {
Properties properties = CommonUtil.toProperties(configuration);
long count = 0;
try {
conn = getConnection(properties);
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
int recordLength = record.getColumnNumber();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
Column column = record.getColumn(i);
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
LOG.debug(sb.toString());
count++;
}
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return count;
}
private Connection getConnection(Properties properties) {
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
String url = "jdbc:TAOS://" + host + ":" + port + "/" + dbname + "?user=" + user + "&password=" + password;
Connection connection = null;
try {
connection = DriverManager.getConnection(url, properties);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
}
return connection;
}
}

View File

@ -4,6 +4,7 @@ import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,8 +15,9 @@ public class OpentsdbDataHandler implements DataHandler {
private static final String DEFAULT_BATCH_SIZE = "1";
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
public long handle(RecordReceiver lineReceiver, Configuration configuration) {
// opentsdb json protocol use JNI and schemaless API to write
Properties properties = CommonUtil.toProperties(configuration);
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
String dbname = properties.getProperty(Key.DBNAME);

View File

@ -9,13 +9,19 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static class Job extends Writer.Job {
private Configuration originalConfig;
@ -49,6 +55,7 @@ public class TDengineWriter extends Writer {
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
}
@Override
@ -58,17 +65,10 @@ public class TDengineWriter extends Writer {
@Override
public void startWrite(RecordReceiver lineReceiver) {
Set<String> keys = this.writerSliceConfig.getKeys();
Properties properties = new Properties();
for (String key : keys) {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
long records = handler.handle(lineReceiver, properties);
long records = handler.handle(lineReceiver, writerSliceConfig);
LOG.debug("handle data finished, records: " + records);
}