current position:Home>Using SeaTunnel to Play IoTDB Data Sync | Lecture Review

Using SeaTunnel to Play IoTDB Data Sync | Lecture Review

2022-11-24 23:07:34Apache IoTDB


在今年 10 月份 Apache IoTDB & SeaTunnel 联合 meetup 上,SeaTunnel Committer Wang Hailin brought the title《使用 SeaTunnel 玩转 IoTDB 数据同步》的主题演讲,详细阐述了 IoTDB 支持接入 SeaTunnel 的实现过程,Let users understand more deeply IoTDB The operation method and principle of data synchronization.This article has been modified from this speech.

本篇首先介绍 Apache IoTDB 和 SeaTunnel 的基本概念,And on this basis to introduce emphatically IoTDB Connector 的功能特性,I'll dig into it later IoTDB Connector The data reading and data writing functions and the analysis of the implementation,Finally, some typical usage scenarios and cases will be shown,让大家了解如何使用 IoTDB Connector Landing to the production environment.


Apache IoTDB 

& SeaTunnel 


Apache IoTDB It is an industrial Internet of Things time series database management system,Can be collected in one piece、存储、Manage and analyze IoT time series data,满足工业物联网领域的海量数据存储、高速数据读取和复杂数据分析需求.IoTDB 的架构如下图所示,分为三个部分:时序文件、数据库引擎和分析引擎.

time series file,TsFile 是 IoTDB 自研的一种专门为时序数据设计的文件存储格式,支持高效的压缩和查询性能,能够为上层应用提供出色的数据存储和查询支持.Store time series data in TsFile 后,即使用TsFile format entry IoTDB 数据管理引擎,Interactive tools are available at this stage for both query and preprocessing,Visualization operations can also be performed through the visualization platform.Analysis engine as an optional analysis component,Can further provide data import and export、数据分析、Abnormal monitoring and other functions,Can also be seamlessly integrated with other big data systems.


SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台.The structure diagram below can be seen,In order to read the data from various data sources and written to the variety of data sources,SeaTunnel Do a set of abstract API.基于这些API,SeaTunnel 会有一个 translation Layer these API Translate to the corresponding execution engine.数据同步依赖 Connector Connectors read and write data,连接器由 Source 和 Sink 构成.Source Responsible for reading data from various data sources,将其转化成 SeaTunnelRow 抽象层(匹配 SeaTunnel 定义的数据类型),Sink Responsible for pulling data from the abstraction layer,Write to a specific data store,Convert to storage specific format.




Apache IoTDB Connector 


First introduce the integration SeaTunnel 的 Apache IoTDB Connector What functions are supported,为大家提供参考.


Source 功能特性


首先是 IoTDB 支持 Source 的典型的使用场景,如上图所示,IoTDBSupport such as bulk load device、字段投影、数据类型映射、parallel read, etc..

IoTDB 有一个类似于 group by device 的 SQL 语法,Data from multiple devices can be read in batches.Basic data type when the projection,在IoTDB 的 SQL When querying any indicator, it will be brought by default time,或 group by device 会带上device 列,SeaTunnel Also by default supports projecting to SeaTunnel 的列上.

About parallel reads,IoTDB The data comes with a timestamp,Use timestamps to divide ranges for parallel reads.

状态的恢复,因为 SeaTunnel The time range for reading is divided into different split,所以 IoTDB 可以根据 split Location information for recovery.


Sink 功能特性


IoTDB 支持 Sink 的典型的使用场景,如上图所示.About metadata extraction,IoTDB 支持从 SeaTunnelRow 中提取 measurement、device 等元数据,支持从  SeaTunnelRow Extract from or use the current processing time.批量提交、Exception retry is also supported.


Apache IoTDB Data reading and analysis

接下来解析一下 IoTDB 在 SeaTunnel Analysis of the data reading function implemented on.



The first is the data type mapping,实际上是把 IoTDB The data type reads SeaTunnel 上,所以要转化成 SeaTunnel 的数据类型.


这里列出来的 IoTDB 的数据类型,BOOLEAN、INT32、INT64 等都有对应的 SeaTunnel 数据类型,So they can be converted into each other.其中 IoTDB 的 INT32 type is available according to SeaTunnel The read type on the mapped,When the value range is relatively small,也可以映射成TINYINT、SMALLINT或 INT.

The following figure shows the corresponding sample code,Show how to do the mapping where the type conversion is done.




Field projection on read,我们在读 IoTDB 数据时,可以自动映射 Time 字段,You can also choose to map part of the data to SeaTunnel 上,比如TIMESTAMP、BIGINT.

通过 IoTDB 的 SQLextract column code,You can extract only some of the required columns,在 SeaTunnel 上使用时,可以通过 feilds to specify that the column maps to SeaTunnel 后的名字、类型等.最后读取到SeaTunnelThe above data results are shown in the figure below.


Everyone saw it above,我们 SQL there is no check time 这个列,But the actual detection result has this column,所以 IoTDB 的 time Column fields can also be in SeaTunnel 上进行投影,time Columns can actually be projected into different datatypes,Users can convert according to their needs.The following figure shows the implementation logic.



批量读取 device

Reads also involve bulk reads device.这是一个常见的需求,Because when we synchronize data, we may do data synchronization with the same data structure in large batches..

IoTDB 中的 align by device 语法 SeaTunnel 也可用,这样就可以把 deviceColumns are also projected to SeaTunnelRow 上,如下图所示.


假设  IoTDB 中有一张表,Grammarly we put  device Columns are also made into data,投影到 SeaTunnel 上,配置了device name column and after specifying the data type,we end up reading SeaTunnel The data on the format as shown in the figure below,包含time、device列,and the actual data value.This allows batch reads to do the samedevice的数据.




If you want to read in parallel,We may have to pass the data range of this table IoTDB 中的 time Columns are ranged,Parallel threads/A process reads a specific range of data.按照 time 范围划分,SeaTunnel Three configurations need to be set,lower_bound、upper_bound 和 num_partitions.By configuring three parameters,The final result will be transformed into a query based on this SQL,原始的 SQL Will add the query conditions,划分成不同的 split to achieve the actual read SQL.

split divided,The next step is to distribute to each parallel reader, need to follow a distribution logic.This logic is based on split 中的 ID 向 reader 取模,This may be more random,如果 split 的 ID If you compare the hash, it will be more uniform,要根据 Connector specific circumstances to achieve.






When reading will also involve state recovery,Because if the task is large,It will take longer to read,If there is an error or exception,It is necessary to consider how to restore the state from the point of error,read again after recovery.SeaTunnel Mainly through the status of the resume reader 把 未读取的 split 信息存到 state里,And then the engine when read on a regular basis for state do snapshot retention,When restoring, you can restore the last snapshot,Continue reading after recovery.





IoTDB Connector 

Data write analysis

接下来是关于 IoTDB 在 SeaTunnel The implementation of analytical data to function.



Data writing also involves data type mapping,But here instead of data read,是把 SeaTunnel 的数据类型转换为IoTDB 类型.IoTDB 的 INT32 During the writing process will involve TINYINT 和 SMALLINT Data type promotion for,Other data types can be one-to-one transformation.


The following figure shows the corresponding code,To implement the logic, we need to look at our specific mapping.



Metadata dynamic injection

There is also the problem of original data injection when writing this block,SeaTunnel Support dynamic injection of metadata.When heterogeneous data sources write IoTDB 时,Support extracting from each row of data device、measurement、time,Methods through the serialization SeaTunnelRow When extracting fixed column values ​​by configuration.or use the system time as time,如果未指定 time Columns are filled the current system time;同时,还支持配置 storage group,automatically attached to device 前缀.


举例来说,假设在 SeaTunnel Read the data format shown in the figure above row 的结构,Can be configured to sync to IoTDB 中,获得的结果如下:


Extracted the temperature we need、The two columns of humidity,并提取了 ts 和device name来作为 IoTDB 的原数据.


Batch submission and exception retry

另外,Sink Need to do processing batching and retries when writing.对于批量,You can configure the corresponding batch configuration,Including support for configuring the number and interval of batch submissions;If the data is cached in memory,Can open independent thread submitted regularly.对于重试,SeaTunnel 支持配置重试次数,Waiting interval and maximum number of retries,and when the retries are complete,It can also end if it encounters an unrecoverable error.



IoTDB Connector 

Data write analysis

After the previous parsing of read data and write data,Let's look at three typical usage scenario examples.


从 IoTDB 导出数据

The first scene is from IoTDB 导出数据,The example given here is from IoTDB read data in parallel to Console 上.并行度为2,批次数为24,时间范围为 2022-09-25 ~ 2022-09-26.

我们假设在 IoTDB There is a data sheet,We're going to import the data into Console 上,The entire configuration is shown in the figure below,We need to map the data columns we want to export and the time range of the query.这是一个最简单的示例,In practical use Sink 端更为复杂,You need to refer to the documentation of the corresponding data source to make the corresponding configuration.



导入数据到 IoTDB

Another typical usage scenario is to batch write data from other data sources to IoTDB.Suppose you have an external database tables,有 ts、温度、Humidity column,我们将其导入到 IoTDB 中,Two columns of temperature and humidity are required,其他的可以不要.The entire configuration is shown in the figure below,大家可以参考.In this example, the batch write frequency is every 1024 bar or per 1000 ms 提交一次,To extract metadata device、timestamp,measurement.


在 Sink terminal is mainly to specify device 列的 key,For example, what data is extracted from device、Which class is the time extracted from、what to write to IoTDB 等.可以看到,我们可以配置  storage group,也就是 IoTDB 的存储组,可以通过storage group specify storage group.This example specifies the storage group as root.test_group.


IoTDB 之间同步数据

The third usage scenario is when IoTDB 与 IoTDB 之间同步数据,批量写入到 IoTDB.假设 IoTDB There is a table in that needs to be synchronized to another IoTDB,The storage group has changed since the sync passed,The name of the index of the data column has also changed,Then you can use the projection index name,并使用 SQL overwrite storage group.



copyright notice
author[Apache IoTDB],Please bring the original link to reprint, thank you.

Random recommended