DataX/otsstreamreader
2022-01-13 19:36:22 +08:00
..
src/main v0.0 2018-01-31 16:24:49 +08:00
tools v0.0 2018-01-31 16:24:49 +08:00
pom.xml fix log4j 2022-01-13 19:36:22 +08:00
README.md v0.0 2018-01-31 16:24:49 +08:00

TableStore增量数据导出通道TableStoreStreamReader

快速介绍

TableStoreStreamReader插件主要用于TableStore的增量数据导出增量数据可以看作操作日志除了数据本身外还附有操作信息。

与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。

使用插件前必须确保表上已经开启Stream功能可以在建表的时候指定开启或者使用SDK的UpdateTable接口开启。

开启Stream的方法
SyncClient client = new SyncClient("", "", "", "");
1. 建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时
client.createTable(createTableRequest);

2. 如果建表时未开启可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); 
client.updateTable(updateTableRequest);

实现原理

首先用户使用SDK的UpdateTable功能指定开启Stream并设置过期时间即开启了增量功能。

开启后TableStore服务端就会将用户的操作日志额外保存起来 每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。

TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来增量插件也是通过TableStore SDK的接口获取到增量数据的并将 增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。

Reader的配置模版

"reader": {
    "name" : "otsstreamreader",
    "parameter" : {
        "endpoint" : "",
        "accessId" : "",
        "accessKey" : "",
        "instanceName" : "",
        //dataTable即需要导出数据的表。
        "dataTable" : "",  
        //statusTable是Reader用于保存状态的表若该表不存在Reader会自动创建该表。
        //一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。
        "statusTable" : "TableStoreStreamReaderStatusTable",  
        //增量数据的时间范围(左闭右开)的左边界。
        "startTimestampMillis" : "",
        //增量数据的时间范围(左闭右开)的右边界。
        "endTimestampMillis" : "",
        //采云间调度只支持天级别所以提供该配置作用与startTimestampMillis和endTimestampMillis类似。
        "date": "",
        //是否导出时序信息。
        "isExportSequenceInfo": true,
        //从TableStore中读增量数据时每次请求的最大重试次数默认为30。
        "maxRetries" : 30
    }
}

参数说明

名称 说明 类型 必选
endpoint TableStoreServer的Endpoint地址。 String
accessId 用于访问TableStore服务的accessId。 String
accessKey 用于访问TableStore服务的accessKey。 String
instanceName TableStore的实例名称。 String
dataTable 需要导出增量数据的表的名称。该表需要开启Stream可以在建表时开启或者使用UpdateTable接口开启。 String
statusTable Reader插件用于记录状态的表的名称这些状态可用于减少对非目标范围内的数据的扫描从而加快导出速度。
1. 用户不需要创建该表只需要给出一个表名。Reader插件会尝试在用户的instance下创建该表若该表不存在即创建新表若该表已存在会判断该表的Meta是否与期望一致若不一致会抛出异常。
2. 在一次导出完成之后,用户不应删除该表,该表的状态可用于下次导出任务。
3. 该表会开启TTL数据自动过期因此可认为其数据量很小。
4. 针对同一个instance下的多个不同的dataTable的Reader配置可以使用同一个statusTable记录的状态信息互不影响。
综上用户配置一个类似TableStoreStreamReaderStatusTable之类的名称即可注意不要与业务相关的表重名。
String
startTimestampMillis 增量数据的时间范围(左闭右开)的左边界,单位毫秒。
1. Reader插件会从statusTable中找对应startTimestampMillis的位点从该点开始读取开始导出数据。
2. 若statusTable中找不到对应的位点则从系统保留的增量数据的第一条开始读取并跳过写入时间小于startTimestampMillis的数据。
Long
endTimestampMillis 增量数据的时间范围(左闭右开)的右边界,单位毫秒。
1. Reader插件从startTimestampMillis位置开始导出数据后当遇到第一条时间戳大于等于endTimestampMillis的数据时结束导出数据导出完成。
2. 当读取完当前全部的增量数据时结束读取即使未达到endTimestampMillis。
Long
date 日期格式为yyyyMMdd如20151111表示导出该日的数据。
若没有指定date则必须指定startTimestampMillis和endTimestampMillis反之也成立。
String
isExportSequenceInfo 是否导出时序信息时序信息包含了数据的写入时间等。默认该值为false即不导出。 Boolean
maxRetries 从TableStore中读增量数据时每次请求的最大重试次数默认为30重试之间有间隔30次重试总时间约为5分钟一般无需更改。 Int

导出的数据格式

首先在TableStore多版本模型下表中的数据组织为“行版本”三级的模式 一行可以有任意列,列名也并非固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。

用户可以通过TableStore的API进行一系列读写操作 TableStore通过记录用户最近对表的一系列写操作或称为数据更改操作来实现记录增量数据的目的 所以也可以把增量数据看作一批操作记录。

TableStore有三类数据更改操作PutRow、UpdateRow、DeleteRow。

  • PutRow的语义是写入一行若该行已存在即覆盖该行。

  • UpdateRow的语义是更新一行对原行其他数据不做更改 更新可能包括新增或覆盖(若对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。

  • DeleteRow的语义是删除一行。

TableStore会根据每种操作生成对应的增量数据记录Reader插件会读出这些记录并导出成Datax的数据格式。

同时由于TableStore具有动态列、多版本的特性所以Reader插件导出的一行不对应TableStore中的一行而是对应TableStore中的一列的一个版本。 即TableStore中的一行可能会导出很多行每行包含主键值、该列的列名、该列下该版本的时间戳版本号、该版本的值、操作类型。若设置isExportSequenceInfo为true还会包括时序信息。

转换为Datax的数据格式后我们定义了四种操作类型分别为:

  • UUPDATE: 写入一列的一个版本

  • DODELETE_ONE_VERSION: 删除某一列的某个版本

  • DADELETE_ALL_VERSION: 删除某一列的全部版本,此时需要根据主键和列名,将对应列的全部版本删除

  • DRDELETE_ROW: 删除某一行,此时需要根据主键,将该行数据全部删除

举例如下假设该表有两个主键列主键列名分别为pkName1, pkName2

pkName1 pkName2 columnName timestamp columnValue opType
pk1_V1 pk2_V1 col_a 1441803688001 col_val1 U
pk1_V1 pk2_V1 col_a 1441803688002 col_val2 U
pk1_V1 pk2_V1 col_b 1441803688003 col_val3 U
pk1_V2 pk2_V2 col_a 1441803688000 DO
pk1_V2 pk2_V2 col_b DA
pk1_V3 pk2_V3 DR
pk1_V3 pk2_V3 col_a 1441803688005 col_val1 U

假设导出的数据如上共7行对应TableStore表内的3行主键分别是(pk1_V1,pk2_V1), (pk1_V2, pk2_V2), (pk1_V3, pk2_V3)。

对于主键为(pk1_V1, pk2_V1)的一行包含三个操作分别是写入col_a列的两个版本和col_b列的一个版本。

对于主键为(pk1_V2, pk2_V2)的一行包含两个操作分别是删除col_a列的一个版本、删除col_b列的全部版本。

对于主键为(pk1_V3, pk2_V3)的一行包含两个操作分别是删除整行、写入col_a列的一个版本。