失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Streamsets Data Collector 3.12

Streamsets Data Collector 3.12

时间:2022-04-25 05:17:29

相关推荐

Streamsets Data Collector 3.12

Streamsets Data Collector 3.12

官方文档:/portal/datacollector/3.9.x/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq

目录

Streamsets Data Collector 3.121、简介1.2、特点2、原理组件2.2、什么是Pipeline?2.3、工作原理单线程和多线程管道交货保证2.4、操作 processors2.5、目的地(Destinations)2.5.1、ElasticSearch4、实时任务4.1、Mysql Binary Log 组件4.2、利用StreamSet抓取MySql数据在ElasticSearch中建立索引5.全量任务5.1,JDBC Query Consumer5.2,JDBC Multitable6.增量任务6.1,JDBC Query Consumer6.2、JDBC Multitable7.操作 Processors7.1,定时任务 Cron Scheduler7.2、Pipeline Finisher Executor 自动停止任务7.3、Stream selector 分流操作7.4、JavaScript Evaluator 自定义处理数据7.5、Field Type Converter 类型转化7.6、Field Replacer 替换值7.7、Field Renameer7.8、Field Remover8、源数据库8.1、mysql binlog8.2、JDBC Multitable8.3、kafka consumer8.4、Hadoop FS Standalone9、目标库9.1、KUDU9.2 hive Metadata hive9.3、Hadoop FS9.4、ElasticSearch9.5、JDBC producer

1、简介

StreamSets 是一款图形化的数据集成工具。

由前 Cloudera 首席工程师和前 Informatica 首席产品在 年创建。 年,同 Apache Spark、Tensorflow、ElasticSearch 等一同获得了当年 InfoWorld BOSSIE 最佳开源大数据 工具 奖。

其核心产品为 StreamSets Data Collector。 GitHub 地址

StreamSets 数据连接器,分为源(Origin)和目标(Destination)两类。数据从源进入,经过转换,传输到目标,从而构建出一条数据管道(Pipeline)。 连接器地址

StreamSets Data Collector是一个轻量级、强大的设计和执行引擎,可以实时流式传输数据。使用 Data Collector 路由和处理数据流中的数据。

1.2、特点

StreamSets Data Collector是一款大数据实时采集和ETL工具,可以实现不写一行代码完成数据的采集和流转。通过拖拽式的可视化界面,实现数据管道(Pipelines)的设计和定时任务调度。最大的特点有:

1、可视化界面操作,可以直观排查错误;

2、 内置监控,可是实时查看数据流传输的基本信息和数据的质量;

3、强大的整合力,对现有常用组件全力支持。

对于Streamsets来说,最重要的概念就是数据源(Origins)、操作(Processors)、目的地(Destinations)、执行器(Executor)。

2、原理组件

架构层面,StreamSets将每个数据集成任务抽象成pipeline,数据记录在pipeline中以batch-record的形式流动,而pipeline则由代表数据来源的Origin,代表接收端的Destination,以及包含具体数据转换/映射/过滤等业务逻辑的Processor共同组合实现,具体如下图:

(2)record:

pipeline中的数据是以record形式在上下游之间流动。record可以简单看做是一条条记录,每个record都有自己的schema。record在StreamSets中以Map形式存在,其中key为字段名(field-name),value为字段实际值;record除了包含业务数据外,还自带header属性,保存元数据信息。

2.2、什么是Pipeline?

Pipeline描述了从源系统到目标系统的数据流,并定义了如何在此过程中转换数据。

您可以使用单个源阶段来表示源系统,使用多个处理器阶段来转换数据,并使用多个目标 阶段来表示目标系统。

当您开发Pipeline时,您可以使用开发阶段来提供示例数据并生成错误以测试错误处理。您可以使用数据预览来确定阶段如何通过Pipeline更改数据。

您可以使用执行器阶段来执行事件触发的任务执行或保存事件信息。要处理大量数据,可以使用多线程Pipeline或集群模式Pipeline。

在写入Hive 或 parquet或PostgreSQL的Pipeline中,您可以实现数据漂移解决方案,以检测传入数据中的漂移并更新目标系统中的表。

启动Pipeline时,Data Collector会运行Pipeline,直到您停止Pipeline或关闭Data Collector。您可以使用Data Collector运行多个Pipeline。

在Pipeline运行时,您可以监控Pipeline以验证Pipeline是否按预期执行。您还可以定义指标和数据 规则以及警报 ,以便在达到某些阈值时通知您。

2.3、工作原理

数据分批通过管道。这是它的工作原理:

源在从源系统读取数据或从源系统到达数据时创建一个批次,并注意偏移量。偏移量是原点停止读取的位置。

当批次已满或超过批次等待时间限制时,源会发送该批次。批处理通过管道从一个处理器移动到另一个处理器,直到它到达管道目的地。

目标将批处理写入目标系统,Data Collector在内部提交偏移量。根据管道交付保证,Data Collector要么在写入任何目标系统时立即提交偏移量,要么在收到来自所有目标系统的写入确认后提交偏移量。在偏移提交之后,原始阶段会创建一个新批次。

请注意,这描述了一般管道行为。行为可能因特定的管道配置而异。例如,对于 Kafka Consumer,偏移量存储在 Kafka 或 ZooKeeper 中。对于不存储数据的源系统,例如 Omniture 和 HTTP 客户端,偏移量不会被存储,因为它们不相关。

单线程和多线程管道

上面的信息描述了一个标准的单线程管道——源端创建一个批次并通过管道传递它,只有在处理了前一个批次之后才创建一个新批次。

一些来源可以生成多个线程以启用 多线程管道中的并行处理。在多线程管道中,您可以配置源以创建要使用的线程数或并发量。 并且Data Collector会根据pipeline Max Runners属性创建多个pipeline runners来进行pipeline处理。每个线程都连接到源系统,创建一批数据,并将该批数据传递给可用的管道运行器。

每个管道运行器一次处理一批,就像在单个线程上运行的管道一样。当数据流变慢时,管道运行器会闲置直到需要它们,并定期生成一个空批次。您可以配置 Runner Idle Time 管道属性以指定间隔或选择不生成空批次。

交货保证

配置管道时,您定义了如何处理数据:您要防止数据丢失或数据重复吗?

Delivery Guarantee 管道属性提供以下选择:

至少一次

确保管道处理所有数据。

​ 如果在处理一批数据时发生故障导致Data Collector停止,则在重新启动时,它会重新处理该批。此选项可确保不会丢失任何数据。

​ 使用此选项,Data Collector 在收到来自目标系统的写入确认后提交偏移量。如果在Data Collector将数据传递到目标系统之后但在收到确认并提交偏移量之前发生故障 ,则最多可能会在目标系统中复制一批数据。

最多一次

确保不会多次处理数据。

​ 如果在处理一批数据时发生故障导致Data Collector停止,则在启动时,它将开始处理下一批数据。此选项可避免由于重新处理而导致目标中的数据重复。

​ 使用此选项,Data Collector 在写入后提交偏移量,而无需等待来自目标系统的确认。如果在Data Collector将数据传递到目标并提交偏移量后发生故障 ,则最多有一批数据可能不会写入目标系统。

2.4、操作 processors

/portal/datacollector/3.12.x/help/index.html 技术文档

Data Generator // 数据序列化组件,将Avro、json、protobuf、text、xml等格式的数据序列成bytearray或stringData Parser // 数据反序列化组件,将bytearray或string数据反序列成Avro、json、protobuf、text、xml等格式的数据Delay // 延迟处理组件,用于数据延时处理Encrypt and Decrypt Fields // 加解密组件,支持多种加解密算法Expression Evaluator // 表达式组件,可用该组件添加或修改记录标题属性和字段属性Field Flattener // 数据平铺组件,可以展平整个记录以生成没有嵌套字段的记录Field Hasher // 哈希组件,可用于计算数据的哈希值,支持多种哈希算法Field Mapper // 数据映射组件,可用于将表达式映射到一组字段,以更改字段路径,字段名称或字段值Field Masker // 数据打码组件,可用于将敏感的数据进行打码Field Merger // 数据合并组件,将List或Map类型的记录中的一个或多个字段合并到记录中的其他路径Field Order // 数据排序组件,将List或Map类型的记录中的字段进行排序Field Pivoter // 数据移位组件Field Remover // 字段删除组件,用于保留或删除记录中的某些字段Field Renamer // 重命名组件,用于重命名记录中字段的keyField Replacer // 数据替换组件,用于填充或替换记录中的缺失值Field Splitter // 字段切割组件,用于将数据按某一分隔符进行切割Field Type Converter // 类型转化组件,用于数据的类型转化Field Zip // 拉锁组件,用于将两个数组进行关联Geo IP // Ip解析组件,用于将ip解析成对应的经纬度、地理信息等数据信息Groovy Evaluator // Groovy脚本组件,用于自定义Groovy脚本,根据需求编写一些代码实现一个数据处理任务,功能强大HBase Lookup // HBase 数据查询组件,用于从HBASE查询数据Hive Metadata // Hive 元数据组件,与Hive Metastore目标以及Hadoop FS或MapR FS目标配合使用,作为Hive漂移同步解决方案的一部分HTTP Client // Http 客户端组件,用于从http服务中获取获取数据HTTP Router // Http 路由组件,根据http 请求方式(post put get)和请求路径进行分支路由JavaScript Evaluator // JavaScript脚本组件,用于自定义JavaScript脚本,根据需求编写一些代码实现一个数据处理任务,功能强大JDBC Lookup // JDBC 数据查询组件,用于JDBC从数据库中查询数据,适用于通过JDBC方式连接的数据库(Mysql等)的查询JDBC Tee // JDBC Tee 组件,使用JDBC连接将数据写入MySQL或PostgreSQL数据库表,然后将生成的数据库列值传递给字段。使用JDBC Tee处理器将部分或全部记录字段写入数据库表,然后用其他数据丰富记录JSON Generator // JSON 序列化组件,用于将数据记录序列化成JSON字符串JSON Parser // JSON 反序列化组件,用于将JSON字符串数据反序列化成Java对象数据Jython Evaluator // Jython脚本组件,用于自定义Jython脚本,根据需求编写一些代码实现一个数据处理任务,功能强大Kudu Lookup // Kudu 查询组件,用于从Kudu 系统中读取数据Log Parser // 日志解析组件,支持多种日志格式的的解析,用于将具有一定格式的日志数据,解析成系统平台可处理的结构化格式数据MLeap Evaluator // MLeap 数据分析组件,使用存储在MLeap捆绑软件中的机器学习模型来生成评估,评分或数据分类MongoDB Lookup // MongoDB 数据查询组件,用于从MongoDB中查询数据PostgreSQL Metadata //PostgreSQL元数据组件,确定其中每个记录应写入PostgreSQL的表,记录结构对表结构进行比较,然后根据需要创建或改变的表Record Deduplicator // 记录重复数据删除组件,评估记录中是否有重复数据,并将数据路由到两个流中-一个流用于唯一记录,一个流用于重复记录。使用记录重复数据删除器丢弃重复数据或通过不同的处理逻辑路由重复数据Redis Lookup // Redis数据查询组件,用于从Redis中查询数据Salesforce Lookup // Salesforce数据查询组件,用于从Salesforce中查询数据Schema Generator // Schema 生成组件,基于记录的结构生成模式,并将该模式写入记录头属性。用于生成AvroSchema Spark Evaluator // spark 数据处理组件,用于将平台与spark关联实现数据处理的分布式处理SQL Parser // SQL 解析组件Start Job // 作业启动组件,需要与Controler Hub 配合使用Start Pipeline // 数据流启动组件,用于启动指定的数据流Static Lookup // 静态数据查询组件,执行存储在本地内存中的键/值对的查找,并将查找值传递给字段。使用静态查找将字符串值存储在内存中,Pipeline可以在运行时查找这些值,以用其他数据丰富记录Stream Selector // 数据分选组件,用于通过设置条件,来将数据分选不同分支进行处理TensorFlow Evaluator // TensorFlow 数据分析组件,通过TensorFlow训练的数据模型,并模型配置到指定目录下,在系统平台上使用,实现数据分析功能Whole File Transformer // 全文件转换组件,用于全文件目录或文件的快速拷贝或转换

2.5、目的地(Destinations)

2.5.1、ElasticSearch

ElasticSearch /portal/datacollector/3.9.x/help/datacollector/UserGuide/Destinations/Elasticsearch.html#concept_u5t_vpv_4r

利用StreamSet抓取MySql数据在ElasticSearch中建立索引

/post/6945014553356795935

4、实时任务

4.1、Mysql Binary Log 组件

Credentials:数据库账号密码;Advanced: Include Tables ——添加要同步的表;Ignore Tables——黑名单,不需要同步的数据。

MySQL binlog底层主从同步原理:

主从复制就是依靠binlog

Slave 端,里面有两个线程,一个是IO线程,另一个是SQL线程;IO线程负责从Master上读取信息然后返回,(slave什么时候读取,master会有一个事件通知slave )

slave收到通知后使用IO Thread主动去master读取binlog日志,然后异步写入relay日志(中转日志),然后使 SQL Thread完成对relay日志 的解析然后入库操作,完成同步。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gHmiSITB-1659665744740)(E:\cache\TyporaChe\605714-213746607-1354459428.png)]

Binlog模式分三种Row、Statement、Mixed。

Row模式存储的是数据修改后的结果,binlog中可以不记录执行的sql语句的上下文相关的信息,仅仅只需要记录那一条记录被修改了,修改成什么样了。对于update mytable set col1=’abc’ where col2=’c’在row模式下可能产生大量的数据,因为语句虽然是一条,但实际影响的数据记录却可能很多。而对于alter table、drop table、create table等信息在Row模式下则不会产生大量的log条目,因为它还是记录的语句,而不是单行数据的变化情况。

优点:在row level模式下,bin-log中可以不记录执行的sql语句的上下文相关的信息,仅仅只需要记录那一条被修改。所以rowlevel的日志内容会非常清楚的记录下每一行数据修改的细节。不会出现某些特定的情况下的存储过程或function,以及trigger的调用和触发无法被正确复制的问题

缺点:row level,所有的执行的语句当记录到日志中的时候,都将以每行记录的修改来记录,会产生大量的日志内容。

Statemnet模式每一条会修改数据的sql都会记录到 master的binlog中。slave在复制的时候sql进程会解析成和原来master端执行过的相同的sql来再次执行。由于他是记录的执行语句,所以,为了让这些语句在slave端也能正确执行,那么他还必须记录每条语句在执行的时候的一些相关信息,也就是上下文信息,以保证所有语句在slave端杯执行的时候能够得到和在master端执行时候相同的结果。

优点:statement level下的优点首先就是解决了row level下的缺点,不需要记录每一行数据的变化,减少bin-log日志量,节约IO,提高性能,因为它只需要在Master上锁执行的语句的细节,以及执行语句的上下文的信息。

缺点:由于只记录语句,所以,在statement level下 已经发现了有不少情况会造成MySQL的复制出现问题,主要是修改数据的时候使用了某些定的函数或者功能的时候会出现。

Mixed模式则是前两种的混合,MySQL会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种。选择性的使用面向行数据变化的Row方式记录,主要是面对一些未决语句(nondeterministic),考虑到安全问题,避免主从库之间数据出现不一致,比如语句面向多行插入,其中又有auto-increment的字段,数据库存储引擎不同,可能带来插入顺序

4.2、利用StreamSet抓取MySql数据在ElasticSearch中建立索引

/post/6945014553356795935

5.全量任务

5.1,JDBC Query Consumer

5.2,JDBC Multitable

6.增量任务

6.1,JDBC Query Consumer

当您为增量模式定义 SQL 查询时,JDBC Query Consumer 需要在查询中包含 WHERE 和 ORDER BY 子句。

使用 OFFSET 常量来表示偏移值

在 WHERE 子句中,使用 ${OFFSET} 表示偏移值。

例如,当您启动Pipeline时,以下查询将返回表中偏移列中的数据大于初始偏移值的所有数据:SELECT * FROM <tablename> WHERE <offset column> > ${OFFSET}**提示:**当偏移值是字符串时,将 ${OFFSET} 括在单引号中。

在 ORDER BY 子句中,包括偏移列作为第一列

为避免返回重复数据,请将偏移列用作 ORDER BY 子句中的第一列。

**注意:**在 ORDER BY 子句中使用不是主键或索引列的列会降低性能。

例如,以下增量模式查询从 ID 列是偏移列的 Invoice 表返回数据。查询返回 ID 大于偏移量的所有数据,并按 ID 对数据进行排序:

SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id

6.2、JDBC Multitable

7.操作 Processors

7.1,定时任务 Cron Scheduler

7.2、Pipeline Finisher Executor 自动停止任务

以下源端产生了no-more-data的事件:

Amazon S3源Azure Data Lake Storage Gen1源Azure Data Lake Storage Gen2源Directory源Google Cloud Storage源Hadoop FS Standalone源JDBC Multitable Consumer源JDBC Query Consumer源MongoDB源Salesforce源SFTP/FTP/FTPS Client源SQL Server BDC Multitable Consumer源SQL Server CDC Client源SQL Server Change Tracking源Teradata Consumer源

1)点击origin的目录插件,配置该插件产生事件

-- 此语句 数据接受完停止管道${record:eventType() == 'no-more-data'}

7.3、Stream selector 分流操作

${record:value("/Type") == "DELETE"}通过类型来判断

也可以通过字段值来进行分流${record:value("/字段名称")==0}

${record:attribute('sdc.operation.type') == 5 }${record:attribute('jdbc.cdc.source_name') == '表名' }

7.4、JavaScript Evaluator 自定义处理数据

初始化脚本-可选的初始化脚本,用于设置任何必需的资源或连接。管道启动时,初始化脚本将运行一次。主处理脚本-处理数据的主脚本。根据配置的处理模式,为每个记录或每批数据运行主脚本。销毁脚本-可选的销毁脚本,用于关闭处理器打开的任何资源或连接。当管道停止时,销毁脚本将运行一次。

Record Processing Mode:参数

Record by Record

处理器为每个记录调用脚本。处理器将记录作为映射传递到脚本,并分别处理每个记录。

Batch by Batch

处理器为每个批次调用脚本。处理器将批次作为列表传递到脚本,并一次处理该批次。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k2Nrpyi3-1659665744744)(E:\cache\TyporaChe\image-0718162935669.png)]

获取年月日传递给下一个操作

var records = sdc.records;var d = new Date()for(var i = 0; i < records.length; i++) {try {var year = d.getFullYear()+'';var month = d.getMonth() + 1 +'';if(month.length==1){month = '0'+month;}var day = d.getDate() -1 +'';if(day.length==1){day = '0'+day;}var str = '/user/hive/warehouse/ads_hubei.db/ads_hotel_isprice_dd/day_id='+ year + month + day;records[i].value.tableName = strsdc.output.write(records[i]);} catch (e) {// Send record to errorsdc.error.write(records[i], e);}}

获取HDFS文件自定义schema

var records = sdc.records;for(var i = 0; i < records.length; i++) {try {if(null!=records[i].value['text']){var strs = records[i].value['text'].split(',')if(strs.length == 6){records[i].value['rateplanId']=strs[0];//records[i].value['hotelId']=strs[1];// records[i].value['valid']=strs[2];if(strs[3]=='false'){records[i].value['isPrice'] = 0;}else{records[i].value['isPrice'] = 1;} records[i].value['cityId']=strs[4];}}sdc.output.write(records[i]);} catch (e) {// Send record to errorsdc.error.write(records[i], e);}}

7.5、Field Type Converter 类型转化

7.6、Field Replacer 替换值

${f:value()+28800000}支持一下类型

ByteDoubleFloatIntegerLongShortString

7.7、Field Renameer

7.8、Field Remover

8、源数据库

8.1、mysql binlog

同 4.1 章节

8.2、JDBC Multitable

同 5.2 章节

8.3、kafka consumer

text 格式 写入到kudu中

8.4、Hadoop FS Standalone

files data format

9、目标库

9.1、KUDU

将数据写入到kudu表里

多表模式Table Name 修改为impala::hubei.${record:value("/Table")}通过JS来获取name;impala::hubei.${record:value("/Database")}_${record:value("/Table")}获取库名表名

9.2 hive Metadata hive

-- Decimal Scale Expression${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.scale'))}-- Decimal Precision Expression ${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.precision'))}

data format 数据格式 Avro/parquet

hadoop FShive metastore

9.3、Hadoop FS

9.4、ElasticSearch

9.5、JDBC producer

如果觉得《Streamsets Data Collector 3.12》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。