失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 12_Apache Pulsar 实战篇 大数据平台架构 基于Canel采集数据到Pulsar 启动 Puls

12_Apache Pulsar 实战篇 大数据平台架构 基于Canel采集数据到Pulsar 启动 Puls

时间:2023-10-04 09:19:17

相关推荐

12_Apache Pulsar 实战篇 大数据平台架构 基于Canel采集数据到Pulsar 启动 Puls

3.Apache Pulsar 实战篇

3.1.大数据平台基本介绍

3.2.大数据平台架构

3.3.实战项目介绍

3.3.1.实战项目介绍

3.4.初始化数据源

3.4.1.初始化数据源: 在Mysql中建库建表

3.4.2.始化数据源: 在Mysql中建库建表

3.5.基于Canel采集数据到Pulsar

3.5.1.Canal基本介绍

3.5.2.Canal安装

3.5.3.基于canal采集MySQL数据到Pulsar

3.5.4.开启MySQL BinLog日志

3.5.5.启动 Pulsar Connectors

3.5.6.启动 Canal开始进行采集数据

3.Apache Pulsar 实战篇

3.1.大数据平台基本介绍

大数据平台是传智教育在初开始构建, 最初始主要是进行离线的数仓平台构建,力争将公司核心数据(访问咨询数据,意向用户,报名数据以及学员考勤数据等)进行整合,对这些过往数据以天为单位进行挖掘分析, 从而能够更加了解学员的相关的指标, 能够更好的为学员服务

在初, 大数据平台开始引入流式的处理, 主要采用Pulsar完成实时数据的传输, 基于Flink进行实时数据预处理以及转换操作, 最终基于CK完成实时指标统计, 构建实时数仓

同时公司高层要求能够快速便捷的查询过去历史数据集, 为了满足此需要, 我们对离线平台做了重新设计,将其纳入流式平台中, 构建基于HBase的实时查询系统以及离线分析平台, 对整个大数据平台进行重构.

3.2.大数据平台架构

3.3.实战项目介绍

传智教育整个大数据平台大致设计了有以下多个主题的流批开发: 访问咨询, 意向用户主题, 线索主题, 报名用户主题, 学生考勤主题, 学员就业主题,标签画像等.

本次课程主要以访问咨询主题为代表, 讲解传智教育整个项目开发流程以及实施方案

数据源说明:

本主题主要涉及到核心表有二个 web_chat_ems 表 和 web_chat_text_ems表

核心字段介绍:

3.3.1.实战项目介绍

3.4.初始化数据源

3.4.1.初始化数据源: 在Mysql中建库建表

1- 创建库:

CREATE DATABASE itcast_ems CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2- 构建表: 构建 web_chat_ems 和 web_chat_text_ems

说明:

将文件中建表语句复制出来, 进行执行, 即可构建相关表

3.4.2.始化数据源: 在Mysql中建库建表

表字段完整说明:

表字段完整说明:

本次两个表各提供了1000条数据,进行后续的测试操作,目前暂时不导入

3.5.基于Canel采集数据到Pulsar

基于MySQL 数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更

3.5.1.Canal基本介绍

从 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括。 数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.xgithub地址:/alibaba/canal

3.5.2.Canal安装

下载地址:/alibaba/canal/releases/tag/canal-1.1.4/

或: 大家可以直接选择资料中提供好的下载包

1- 将下载好的canal安装包 上传到/export/software 目录下2- 在/export/servers/创建文件夹canal, 将canal.deployer-1.1.4.tar.gz 解压到该目录

3.5.3.基于canal采集MySQL数据到Pulsar

1- 修改canal下的conf目录中canal.properties

cd /export/server/canal/conf vim canal.properties内容如下:修改96行: 指定采集目的地的配置文件路径所在的目录名字canal.destinations = itcast_collect注: 可以配置多个, 配置几个, 后续就得按照这个名字配置几个目录, 构建目的地配置信息

2- 在conf目录下, 创建 itcast_collect 目录

cd /export/server/canal/conf mkdir -p itcast_collect

3- 将conf下的example目录中的instance.properties 拷贝到刚刚创建的 itcast_collect 中

cd /export/server/canal/conf cp example/instance.properties itcast_collect/

4- 进入itcast_collect, 编辑instance.properties文件

cd /export/server/canal/conf/itcast_collect vim instance.properties修改如下: # 第9行 canal.instance.master.address=:3306 # 第 33和34行: canal.instance.dbUsername=root canal.instance.dbPassword=123456 # 第 41行: canal.instance.filter.regex=itcast_ems\\..* #50行: 删除topic名称 canal.mq.topic=

5- 在Pulsar配置与canal对接的配置信息

cd /export/server/pulsar-2.8.1/conf/ vim canal-mysql-source-config.yaml内容如下: configs: zkServers: "" batchSize: "1" destination: "itcast_collect" username: "canal" password: "canal" cluster: false singleHostname: "" singlePort: "11111"

6-下载Pulsar与Canal集成的connector IO 依赖包

cd /export/server/pulsar-2.8.1mkdir -p connectors进入connectors目录: cd /export/server/brokers/connectors执行下载wget /dist/pulsar/pulsar-2.8.1/connectors/pulsar-io-canal-2.8.1.nar

或者, 可以选择直接将资料中提供好的io包进行上传即可

3.5.4.开启MySQL BinLog日志

修改mysql的配置文件(本次课程 MySQL安装在node1节点上)

vim /etc/f在[mysqld]标签下, 新增如下代码: lower_case_table_names=1log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式,可选值[STATEMENT(记录SQL) |ROW(记录数据) | MIXED(混合使用) ]server_id=1 # 配置 MySQL 主从复制,需要定义,不要和 canal 的 slaveId 重复

重启mysql服务

systemctl restart mysqld

3.5.5.启动 Pulsar Connectors

1- 在Pulsar中创建Topic

bin/pulsar-admin topics create-partitioned-topic persistent://public/default/itcast_canal_collect --partitions 3

2- 创建并启动connector

./bin/pulsar-admin source create \ --archive ./connectors/pulsar-io-canal-2.8.1.nar \ --classname org.apache.pulsar.io.canal.CanalStringSource \ --tenant public \ --namespace default \ --name canal_collect \ --destination-topic-name itcast_canal_collect \ --source-config-file /export/server/pulsar-2.8.1/conf/canal-mysql-source-config.yaml \ --parallelism 3

3- 查看状态信息

./bin/pulsar-admin source status --name canal_collect

3.5.6.启动 Canal开始进行采集数据

1-查看状态信息

cd /export/servers/canal/bin sh startup.sh

测试:

1- 在Pulsar中启动一个消费者,用于监听Canal是否可以将数据采集到Pulsar中

./bin/pulsar-client consume -s 'itcast_test' -n 0 itcast_canal_collect

2- 将项目中对两个表中添加数据的SQL语句各拿出一条, 添加到mysql中, 观察是否可以采集到

消费者成功收到消息

12_Apache Pulsar 实战篇 大数据平台架构 基于Canel采集数据到Pulsar 启动 Pulsar Connectors 启动 Canal开始进行采集数据

如果觉得《12_Apache Pulsar 实战篇 大数据平台架构 基于Canel采集数据到Pulsar 启动 Puls》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。