Mastering IoTDB Data Synchronization with SeaTunnel | Talk Recap
2022-11-24 23:07:34 [Apache IoTDB]
At the joint Apache IoTDB & SeaTunnel meetup in October this year, SeaTunnel Committer Wang Hailin gave a talk titled "Mastering IoTDB Data Synchronization with SeaTunnel". The talk explained in detail how IoTDB support was integrated into SeaTunnel, giving users a deeper understanding of how IoTDB data synchronization is operated and how it works. This article is adapted from that talk.
This article first introduces the basic concepts of Apache IoTDB and SeaTunnel. It then focuses on the features of the IoTDB Connector and digs into how the connector's data reading and data writing functions are implemented. Finally, it shows some typical usage scenarios and cases, so that you can see how to take the IoTDB Connector into a production environment.
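Apache IoTDB is a time series database management system for the industrial Internet of Things. It integrates the collection, storage, management, and analysis of IoT time series data, and it meets the industrial IoT's needs for massive data storage, high-speed data reading, and complex data analysis. As shown in the figure below, the IoTDB architecture consists of three parts: time series files, the database engine, and the analysis engine.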
Time series files: TsFile is a file storage format developed in-house by IoTDB and designed specifically for time series data. It supports efficient compression and fast queries, which gives upper-layer applications solid storage and query support. Once time series data is stored in TsFile, the files enter the IoTDB data management engine in TsFile format. At this stage, interactive tools are available for querying and preprocessing, and visualization operations can be performed through a visualization platform. The analysis engine is an optional component. It can provide data import and export, data analysis, anomaly monitoring, and other functions, and it integrates seamlessly with other big data systems.
SeaTunnel is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive amounts of data, both offline and real-time. As the architecture diagram below shows, SeaTunnel defines a set of abstract APIs so that it can read data from, and write data to, a wide variety of data sources. On top of these APIs, a translation layer maps them onto the corresponding execution engine. Data synchronization relies on connectors to read and write data, and each connector consists of a Source and a Sink. The Source reads data from a data source and converts it into the SeaTunnelRow abstraction, which uses the data types defined by SeaTunnel. The Sink pulls data from the abstraction layer, converts it into the storage-specific format, and writes it to a specific data store.
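The Source/Sink split can be illustrated with a minimal sketch. It assumes a toy `Row` class standing in for SeaTunnelRow, and the hypothetical `DictSource`, `ListSink`, and `run_job` are not SeaTunnel's real (Java) API. The point is only that each side converts to or from one shared row format and never sees the other side's native format.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for SeaTunnelRow: an ordered tuple of field values."""
    fields: Tuple[Any, ...]


class DictSource:
    """Hypothetical Source: reads native records (dicts) and converts each into a Row."""

    def __init__(self, records: Iterable[Dict[str, Any]], columns: List[str]):
        self.records = records
        self.columns = columns

    def read(self) -> Iterator[Row]:
        for record in self.records:
            # Keep only the configured columns, in a fixed order.
            yield Row(tuple(record[column] for column in self.columns))


class ListSink:
    """Hypothetical Sink: converts each Row into a store-specific format and keeps it."""

    def __init__(self, to_native: Callable[[Row], Any]):
        self.to_native = to_native
        self.written: List[Any] = []

    def write(self, row: Row) -> None:
        self.written.append(self.to_native(row))


def run_job(source: DictSource, sink: ListSink) -> None:
    # Source and Sink only meet at the shared Row abstraction.
    for row in source.read():
        sink.write(row)


source = DictSource([{"ts": 1, "temperature": 20.5, "note": "x"}], ["ts", "temperature"])
sink = ListSink(lambda row: ",".join(str(v) for v in row.fields))
run_job(source, sink)
```

Here the sink's "native format" is just a comma-separated string. A real IoTDB sink would build device, timestamp, and measurement records instead.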
Apache IoTDB Connector
First, let's look at the functions supported by the Apache IoTDB Connector integrated into SeaTunnel.
The first group covers typical use cases of the IoTDB Source. As shown in the figure above, it supports batch reading of devices, field projection, data type mapping, parallel reading, and more.
IoTDB has an align by device SQL syntax, which works much like grouping by device and lets you read data from multiple devices in one batch. For projection of basic data types: when you query any measurement with IoTDB SQL, the time column is returned by default, and with align by device a device column is returned as well. SeaTunnel supports projecting these columns onto SeaTunnel columns by default.
For parallel reads: IoTDB data carries timestamps, so the timestamps can be used to divide the data into ranges that are read in parallel.
For state recovery: SeaTunnel divides the time range to be read into different splits, so the IoTDB connector can recover from the position information stored in each split.
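The figure above shows typical use cases of the IoTDB Sink. For metadata extraction, the Sink can extract metadata such as measurement and device from a SeaTunnelRow. The timestamp can either be extracted from the SeaTunnelRow or taken from the current processing time. Batch commits and retries on exceptions are also supported.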
Analyzing Apache IoTDB data reading
Next, let's analyze how the IoTDB data reading function is implemented in SeaTunnel.
The first aspect is data type mapping. Reading IoTDB data into SeaTunnel means that IoTDB data types must be converted into SeaTunnel data types.
Each IoTDB data type listed here, such as BOOLEAN, INT32, and INT64, has a corresponding SeaTunnel data type, so the two can be converted into each other. IoTDB's INT32 type is mapped according to the read type configured on the SeaTunnel side. When the value range is relatively small, it can also be mapped to TINYINT, SMALLINT, or INT.
The following figure shows the corresponding sample code, including where the type conversion takes place during mapping.
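Since the figure's code isn't reproduced here, a rough Python sketch of the read-side mapping follows. The default table (for example INT32 to INT, TEXT to STRING) and the range checks for narrowing INT32 into TINYINT or SMALLINT are our own illustration, not the connector's actual code.

```python
from typing import Any

# Illustrative default mapping from IoTDB types to SeaTunnel type names.
IOTDB_TO_SEATUNNEL = {
    "BOOLEAN": "BOOLEAN",
    "INT32": "INT",
    "INT64": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "TEXT": "STRING",
}

# SeaTunnel integer types an INT32 column may be read as, with their value ranges.
INT32_READ_TYPES = {
    "TINYINT": (-128, 127),
    "SMALLINT": (-32768, 32767),
    "INT": (-2**31, 2**31 - 1),
}


def convert_read_value(iotdb_type: str, seatunnel_type: str, value: Any) -> Any:
    """Convert one IoTDB value to the SeaTunnel type declared for its column."""
    if iotdb_type == "INT32" and seatunnel_type in INT32_READ_TYPES:
        low, high = INT32_READ_TYPES[seatunnel_type]
        # Narrowing is only safe when the actual value fits the smaller type.
        if not low <= value <= high:
            raise ValueError(f"{value} does not fit in {seatunnel_type}")
        return int(value)
    if IOTDB_TO_SEATUNNEL.get(iotdb_type) != seatunnel_type:
        raise TypeError(f"cannot map IoTDB {iotdb_type} to {seatunnel_type}")
    return value
```

Failing fast on an out-of-range value seems preferable to silently truncating it when a column is declared narrower than its data.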
Next is field projection on read. When reading IoTDB data, the Time field can be mapped automatically. You can also choose to map only some of the columns to SeaTunnel, with types such as TIMESTAMP or BIGINT.
With an IoTDB SQL query you can extract only the columns you need. In SeaTunnel, the fields setting specifies the name, type, and other properties each column takes after it is mapped to SeaTunnel. The figure below shows the data that is finally read into SeaTunnel.
As you can see above, our SQL does not query the time column, yet the actual result contains it. This means IoTDB's time column can also be projected in SeaTunnel, and it can be projected into different data types that users convert as needed. The following figure shows the implementation logic.
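The next sketch shows projecting the implicit time column into either BIGINT or TIMESTAMP. It assumes IoTDB timestamps are epoch milliseconds and that TIMESTAMP values are interpreted in UTC. The `fields` layout, a list of (name, type) pairs with the time column first, is a hypothetical shape chosen for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def project_time(epoch_ms: int, target_type: str) -> Any:
    """Project IoTDB's implicit time column (assumed epoch milliseconds) to a declared type."""
    if target_type == "BIGINT":
        return epoch_ms
    if target_type == "TIMESTAMP":
        # Exact millisecond arithmetic; UTC is an assumption of this sketch.
        return EPOCH + timedelta(milliseconds=epoch_ms)
    raise TypeError(f"time column cannot be projected as {target_type}")


def project_row(epoch_ms: int, values: List[Any],
                fields: List[Tuple[str, str]]) -> Dict[str, Any]:
    """fields[0] describes the time column; the remaining entries match values by position."""
    (time_name, time_type), value_fields = fields[0], fields[1:]
    if len(value_fields) != len(values):
        raise ValueError("fields and values do not line up")
    row = {time_name: project_time(epoch_ms, time_type)}
    for (name, _type), value in zip(value_fields, values):
        row[name] = value
    return row
```

For example, `project_row(1664064000000, [21.5, 0.6], [("ts", "TIMESTAMP"), ("temperature", "FLOAT"), ("humidity", "FLOAT")])` yields a `ts` of 2022-09-25 00:00 UTC, even though the SQL itself selected only the two measurements.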
Reading also involves batch reading of devices. This is a common requirement, because data synchronization often has to cover, in large batches, many devices that share the same data structure.
IoTDB's align by device syntax is also available in SeaTunnel. It lets the device column be projected onto the SeaTunnelRow as well, as shown in the figure below.
Suppose there is a table in IoTDB. Using this syntax, we turn the device column into data as well and project it onto SeaTunnel. After we configure the device name column and specify its data type, the data read into SeaTunnel has the format shown in the figure below: the time and device columns plus the actual data values. This makes it possible to batch-read data from devices of the same kind.
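A small sketch of the align-by-device flow follows. It builds a query in IoTDB's `select ... from ... align by device` form, then maps result records shaped as (time, device, *values) onto configured field names. The path `root.test_group.*`, the field names, and the helper functions are hypothetical examples, not connector code.

```python
from typing import Any, Dict, List, Sequence, Tuple


def align_by_device_sql(measurements: Sequence[str], path_pattern: str) -> str:
    """Build an IoTDB query whose result carries a Device column next to Time."""
    return f"select {', '.join(measurements)} from {path_pattern} align by device"


def to_rows(records: Sequence[Tuple[Any, ...]],
            fields: List[Tuple[str, str]]) -> List[Dict[str, Any]]:
    """Map (time, device, *values) records onto the configured field names."""
    names = [name for name, _type in fields]
    rows = []
    for record in records:
        if len(record) != len(names):
            raise ValueError(f"expected {len(names)} columns, got {len(record)}")
        rows.append(dict(zip(names, record)))
    return rows


sql = align_by_device_sql(["temperature", "humidity"], "root.test_group.*")
rows = to_rows(
    [(1, "root.test_group.d1", 21.5, 0.6), (1, "root.test_group.d2", 19.0, 0.7)],
    [("ts", "BIGINT"), ("device_name", "STRING"), ("temperature", "FLOAT"), ("humidity", "FLOAT")],
)
```

Because every device under the pattern shares the same measurements, one query and one field list cover the whole batch of devices.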
To read in parallel, we can divide the table's data into ranges by IoTDB's time column, so that each parallel thread or process reads a specific range. Partitioning by time range requires three SeaTunnel settings: lower_bound, upper_bound, and num_partitions. From these three parameters, the original SQL gets additional query conditions appended and is divided into different splits, and each split's SQL is what is actually executed for the read.
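Here is one way the three parameters could turn into per-split SQL, sketched under stated assumptions. Ranges are half-open, [start, end), with upper_bound exclusive. The base SQL is assumed to have no WHERE or trailing clauses of its own. `build_splits` and `TimeSplit` are hypothetical names, and the connector's real boundary handling may differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class TimeSplit:
    split_id: int
    start: int  # inclusive
    end: int    # exclusive
    sql: str


def build_splits(base_sql: str, lower_bound: int, upper_bound: int,
                 num_partitions: int) -> List[TimeSplit]:
    """Divide [lower_bound, upper_bound) into at most num_partitions time ranges."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if upper_bound <= lower_bound:
        raise ValueError("upper_bound must be greater than lower_bound")
    # Integer ceiling division, so the ranges always cover the whole interval.
    step = (upper_bound - lower_bound + num_partitions - 1) // num_partitions
    splits: List[TimeSplit] = []
    start = lower_bound
    while start < upper_bound:
        end = min(start + step, upper_bound)
        # Assumes base_sql has no WHERE clause or trailing clauses of its own.
        sql = f"{base_sql} where time >= {start} and time < {end}"
        splits.append(TimeSplit(len(splits), start, end, sql))
        start = end
    return splits
```

With `build_splits("select s1 from root.sg.d1", 0, 100, 3)` the ranges are [0, 34), [34, 68), and [68, 100). If the interval is narrower than num_partitions, fewer splits are produced rather than empty ones.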
Once the splits are divided, they must be distributed to the parallel readers according to some assignment logic. Here, the split ID is taken modulo the number of readers. That distribution can be fairly arbitrary, and hashing the split ID can spread splits more evenly. The right choice depends on the specific connector.
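Both assignment strategies are sketched below as hypothetical helpers. The modulo version follows the description above. The hash version uses CRC32, an arbitrary choice on our part, for IDs that are strings or follow a pattern modulo would spread unevenly.

```python
import zlib
from typing import Dict, List


def assign_by_modulo(split_ids: List[int], num_readers: int) -> Dict[int, List[int]]:
    """Reader r owns every split whose ID satisfies split_id % num_readers == r."""
    assignment: Dict[int, List[int]] = {reader: [] for reader in range(num_readers)}
    for split_id in split_ids:
        assignment[split_id % num_readers].append(split_id)
    return assignment


def assign_by_hash(split_ids: List[str], num_readers: int) -> Dict[int, List[str]]:
    """Variant that hashes the ID first (CRC32 here), useful for string or patterned IDs."""
    assignment: Dict[int, List[str]] = {reader: [] for reader in range(num_readers)}
    for split_id in split_ids:
        owner = zlib.crc32(split_id.encode("utf-8")) % num_readers
        assignment[owner].append(split_id)
    return assignment
```

A stable hash such as CRC32 is used instead of Python's built-in `hash()` for strings, because the latter is randomized per process and separate readers would disagree about ownership.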
Reading also involves state recovery. A large task takes a long time to read, so if an error or exception occurs, we need a way to restore state from the point of failure and resume reading. SeaTunnel does this mainly by having the reader keep the information of unread splits in its state. While reading, the engine periodically snapshots this state. On recovery, the last snapshot is restored and reading continues from there.
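The following sketch models that idea with a hypothetical `SplitReader` whose only checkpointable state is its list of unread splits. The `fetch` callable stands in for running one split's SQL. Snapshot scheduling and storage, which the engine handles, are reduced to plain method calls.

```python
from typing import Callable, List


class SplitReader:
    """Hypothetical reader whose checkpointable state is the list of unread splits."""

    def __init__(self, pending_splits: List[str]):
        self.pending = list(pending_splits)
        self.output: List[str] = []

    def read_next(self, fetch: Callable[[str], List[str]]) -> bool:
        """Read one whole split; return False when nothing is left."""
        if not self.pending:
            return False
        split = self.pending[0]
        self.output.extend(fetch(split))
        # Drop the split from the state only after it has been read completely.
        self.pending.pop(0)
        return True

    def snapshot_state(self) -> List[str]:
        # Return a copy so later reads do not mutate a stored snapshot.
        return list(self.pending)

    @classmethod
    def restore(cls, state: List[str]) -> "SplitReader":
        return cls(state)


def fetch(split: str) -> List[str]:
    return [f"{split}-row"]


reader = SplitReader(["s0", "s1", "s2"])
reader.read_next(fetch)
state = reader.snapshot_state()  # taken periodically by the engine
# ...suppose the job fails here; a new reader resumes from the snapshot:
resumed = SplitReader.restore(state)
while resumed.read_next(fetch):
    pass
```

One consequence of this design: a split that was partly emitted after the last snapshot is read again from the start after recovery. Duplicate-free output therefore relies on the sink or the engine's checkpoint coordination.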
Analyzing data writing
Next, let's look at how the IoTDB data writing function is implemented in SeaTunnel.
Data writing also involves data type mapping, but in the opposite direction from reading: SeaTunnel data types are converted into IoTDB types. During writing, TINYINT and SMALLINT are promoted to IoTDB's INT32, while the other data types convert one-to-one.
The following figure shows the corresponding code; the implementation follows the specific type mapping.
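The write direction can be sketched the same way. The table below is our illustration of the promotion described above: TINYINT and SMALLINT widen to INT32, and everything else maps one-to-one. `to_iotdb` is a hypothetical helper, not the connector's method.

```python
from typing import Any, Tuple

# Illustrative SeaTunnel -> IoTDB type table.
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN": "BOOLEAN",
    "TINYINT": "INT32",   # promoted: always fits in 32 bits
    "SMALLINT": "INT32",  # promoted: always fits in 32 bits
    "INT": "INT32",
    "BIGINT": "INT64",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "STRING": "TEXT",
}


def to_iotdb(seatunnel_type: str, value: Any) -> Tuple[str, Any]:
    """Return the IoTDB type for a SeaTunnel type together with the (possibly promoted) value."""
    iotdb_type = SEATUNNEL_TO_IOTDB.get(seatunnel_type)
    if iotdb_type is None:
        raise TypeError(f"no IoTDB type for SeaTunnel type {seatunnel_type}")
    if iotdb_type == "INT32":
        value = int(value)
    return iotdb_type, value
```

Unlike reading, no range check is needed here. Promotion only widens, so every TINYINT or SMALLINT value fits in INT32.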
Dynamic metadata injection
Writing also raises the question of metadata injection, and SeaTunnel supports injecting metadata dynamically. When data from heterogeneous sources is written to IoTDB, the device, measurement, and time can be extracted from each row: when the SeaTunnelRow is serialized, the values of fixed columns are extracted according to the configuration. Alternatively, the system time can serve as the time: if no time column is specified, the current system time is filled in. A storage group can also be configured, and it is automatically prepended to the device as a prefix.
For example, suppose the rows read in SeaTunnel have the structure shown in the figure above. They can be configured to sync to IoTDB, with the following result:
We extracted the two columns we need, temperature and humidity, and used ts and the device name as IoTDB metadata.
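The extraction step can be sketched as a function that turns one flat row into a (device path, timestamp, measurements) triple. The parameter names (`device_column`, `measurement_columns`, `time_column`, `storage_group`) are hypothetical, not the connector's actual option names. `now_ms` is injectable so the system-time fallback can be tested.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


def extract_record(row: Dict[str, Any], device_column: str, measurement_columns: List[str],
                   time_column: Optional[str] = None, storage_group: Optional[str] = None,
                   now_ms: Callable[[], int] = lambda: int(time.time() * 1000)
                   ) -> Tuple[str, int, Dict[str, Any]]:
    """Turn one flat row into (device path, timestamp in ms, {measurement: value})."""
    device = str(row[device_column])
    if storage_group:
        # Attach the configured storage group as a prefix of the device path.
        device = f"{storage_group}.{device}"
    # Use the row's own time if a column is configured, otherwise the current system time.
    timestamp = int(row[time_column]) if time_column else now_ms()
    measurements = {name: row[name] for name in measurement_columns}
    return device, timestamp, measurements


row = {"ts": 1664064000000, "device_name": "d1", "temperature": 21.5, "humidity": 0.6, "note": "ignored"}
record = extract_record(row, "device_name", ["temperature", "humidity"],
                        time_column="ts", storage_group="root.test_group")
```

Columns not listed as measurements, such as `note` here, are simply dropped, which matches keeping only temperature and humidity in the example above.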
Batch commits and exception retries
In addition, the Sink has to handle batching and retries when writing. For batching, you can configure the number of records per batch and the commit interval. When data is buffered in memory, a separate thread can commit it periodically. For retries, SeaTunnel supports configuring the number of retries, the wait interval between retries, and the maximum wait interval. Once the retries are exhausted, or when an unrecoverable error is encountered, the task can terminate.
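The batching and retry behavior can be sketched as a single buffer class. Several choices here are assumptions rather than the connector's design. The interval is checked on each `add` instead of by a timer thread, though a timer could call `flush()`. The wait doubles between attempts up to a cap. Only the hypothetical `RecoverableWriteError` is retried, and any other exception is treated as unrecoverable. `clock` and `sleep` are injectable so the logic can run without real waiting.

```python
import time
from typing import Any, Callable, List


class RecoverableWriteError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. a dropped connection)."""


class BatchingWriter:
    """Buffer rows and commit them in batches, retrying recoverable failures."""

    def __init__(self, write_batch: Callable[[List[Any]], None], batch_size: int,
                 batch_interval_s: float, max_retries: int, retry_wait_s: float,
                 max_retry_wait_s: float, clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_s
        self.max_retries = max_retries
        self.retry_wait_s = retry_wait_s
        self.max_retry_wait_s = max_retry_wait_s
        self.clock = clock
        self.sleep = sleep
        self.buffer: List[Any] = []
        self.last_flush = clock()

    def add(self, row: Any) -> None:
        self.buffer.append(row)
        # Commit when the batch is full or the interval has elapsed since the last commit.
        if (len(self.buffer) >= self.batch_size
                or self.clock() - self.last_flush >= self.batch_interval_s):
            self.flush()

    def flush(self) -> None:
        """Write the buffer; non-recoverable exceptions propagate at once and keep the buffer."""
        if self.buffer:
            wait = self.retry_wait_s
            for attempt in range(self.max_retries + 1):
                try:
                    self.write_batch(list(self.buffer))
                    break
                except RecoverableWriteError:
                    if attempt == self.max_retries:
                        raise  # retries exhausted: keep the buffer and fail the task
                    self.sleep(wait)
                    wait = min(wait * 2, self.max_retry_wait_s)  # capped doubling backoff
            self.buffer.clear()
        self.last_flush = self.clock()
```

With a batch size of 1024 and an interval of 1 second, this corresponds to the "every 1024 records or every 1000 ms" behavior used in one of the scenarios below.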
Typical usage scenarios
Having analyzed data reading and data writing, let's look at three typical usage scenarios.
Exporting data from IoTDB
The first scenario is exporting data from IoTDB. The example here reads data from IoTDB in parallel and prints it to the Console, with a parallelism of 2, a batch count of 24, and a time range of 2022-09-25 to 2022-09-26.
Suppose there is a data table in IoTDB whose data we want to export to the Console. The full configuration is shown in the figure below: we map the data columns we want to export and set the time range of the query. This is the simplest possible example. In practice the Sink side is more complex, and you should follow the documentation of the corresponding data source to configure it.
Another typical scenario is batch-writing data from other data sources into IoTDB. Suppose an external database table has ts, temperature, and humidity columns, and we import it into IoTDB. Only the temperature and humidity columns are needed; the others can be dropped. The full configuration is shown in the figure below for reference. In this example, data is committed every 1024 records or every 1000 ms, whichever comes first, and the device, timestamp, and measurement metadata are extracted from each row.
On the Sink side, the main task is to specify the column keys: which column the device is extracted from, which column the time is extracted from, which columns are written to IoTDB, and so on. As you can see, we can also configure the storage group, that is, IoTDB's storage group, through the storage group setting. This example sets the storage group to root.test_group.
The third scenario is synchronizing data between IoTDB instances with batch writes into IoTDB. Suppose a table in one IoTDB needs to be synchronized to another IoTDB, and after synchronization both the storage group and the measurement names of the data columns change. In that case, you can rename the measurements through projection and use SQL to override the storage group.
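The two rewrites in this scenario are shown as plain functions below, with hypothetical names and example paths. The storage-group prefix of each device path is swapped, and measurements are renamed through a mapping while unlisted measurements keep their names. In the actual job these rewrites are expressed through configuration, not code.

```python
from typing import Any, Dict, Tuple


def rewrite_device(device: str, source_group: str, target_group: str) -> str:
    """Swap the storage-group prefix of a device path, e.g. root.a.d1 -> root.b.d1."""
    prefix = source_group + "."
    if not device.startswith(prefix):
        raise ValueError(f"{device} is not under {source_group}")
    return target_group + "." + device[len(prefix):]


def migrate(device: str, timestamp: int, values: Dict[str, Any], source_group: str,
            target_group: str, renames: Dict[str, str]) -> Tuple[str, int, Dict[str, Any]]:
    """Rename measurements per `renames` (others keep their names) and move the device."""
    new_values = {renames.get(name, name): value for name, value in values.items()}
    return rewrite_device(device, source_group, target_group), timestamp, new_values


moved = migrate("root.source_group.d1", 1664064000000, {"s1": 21.5, "s2": 0.6},
                "root.source_group", "root.target_group", {"s1": "temperature"})
```

Matching on the prefix plus a trailing dot avoids accidentally rewriting a sibling group whose name merely starts with the same characters.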
Author: [Apache IoTDB]. Please include a link to the original article when reprinting. Thank you.