失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > SpringCloud微服务架构 Config 分布式配置中心 Bus 消息总线 Stream 消息驱动 Sleuth+Zipkin 链路追踪

SpringCloud微服务架构 Config 分布式配置中心 Bus 消息总线 Stream 消息驱动 Sleuth+Zipkin 链路追踪

时间:2022-06-25 19:41:30

相关推荐

SpringCloud微服务架构 Config 分布式配置中心 Bus 消息总线  Stream 消息驱动 Sleuth+Zipkin 链路追踪

Config分布式配置中心

Config 概述

概述

• Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护。

• 好处:

• 集中管理配置文件

• 不同环境不同配置,动态化的配置更新

• 配置信息改变时,不需要重启即可更新配置信息到服务

Bus 消息总线

Bus 概述

• Spring Cloud Bus 是用轻量的消息中间件将分布式的节点连接起来,可以用于广播配置文件的更改或者服务的监控管理。关键的思想就是,消息总线可以为微服务做监控,也可以实现应用程序之间相通信。

• Spring Cloud Bus 可选的消息中间件包括 RabbitMQ 和 Kafka 。

使用RabbitMQ做消息中间件

RabbitMQ

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing

路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

Bus 简单项目案例(代码中有详细解释)

步骤:

分别在 config-server 和 config-client中引入 bus依赖:bus-amqp分别在 config-server 和 config-client中配置 RabbitMQ在config-server中设置暴露监控断点:bus-refresh启动测试

搭建一个Springcloud的父项目

父项目 pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.fs</groupId><artifactId>study-springcloud</artifactId><version>1.0-SNAPSHOT</version><modules><module>fs-server-eureka-7001</module><module>fs-config-server</module><module>fs-config-provider-8001</module><module>fs-config-provider-8002</module><module>fs-stream-RabbitMQ-provider-8801</module><module>fs-stream-RabbitMQ-consumer-9901</module></modules><!-- 作为父工程--><packaging>pom</packaging><dependencyManagement><dependencies><!--spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><!-- import 导入父工程的配置--><scope>import</scope></dependency><!--spring cloud --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR6</version><type>pom</type><!-- import 导入父工程的配置--><scope>import</scope></dependency><!-- spring-cloud-alibaba-dependencies 2.2.1.RELEASE --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.1.RELEASE</version><type>pom</type><scope>import</scope></dependency><!-- eureka-server --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId><version>2.2.4.RELEASE</version></dependency><!-- eureka-client --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>2.2.4.RELEASE</version></dependency><!-- 整合MyBatis--><!-- mysql --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.20</version><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.20</version></dependency><!--MyBatisPlus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.2</version></dependency><!-- lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency></dependencies></dependencyManagement><dependencies></dependencies></project>

fs-server-eureka-7001(请参考另一篇博文,服务治理,有详细说明搭建)

fs-config-server 搭建configserver

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><parent><artifactId>study-springcloud</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-config-server</artifactId><dependencies><!--spring-boot-starter-web spring-boot-starter-actuator绑定在一块 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加消息总线RabbitMQ支持--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency><!-- springcloud config--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency><!-- eureka client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency></dependencies></project>

application.yml

server:port: 3344spring:application:name: fs-config-servercloud:# 配置git远程厂库地址config:server:git:uri: /xiaofugitee/springcloud-config.git # 码云仓库地址# 搜索目录#search-paths:# - springcloud-config #仓库名字#username: xiaofuGitee#password: ***********# 跳过ssl验证#skip-ssl-validation: true# 读取分支label: master# RabbitMQ相关配置rabbitmq:host: 192.168.93.132port: 5672username: rootpassword: rootvirtual-host: /#服务注册到eurekaeureka:client:service-url:defaultZone: http://localhost:7001/eureka #,:7002/eureka,:7003/eureka# 暴露bus刷新配置的端点,使用bus,只需要在config server中配置就可以management:endpoints:web:exposure:include: 'bus-refresh' # 使用bus发送刷新的post请求# 由于配置了bus消息,gitee上的配置文件进行更改,只需要发送一个3344post请求 处处生效3355,3366的配置都会进行更新,# curl -X POST "http://localhost:3344/actuator/bus-refresh"

ConfigServer3344 主启动

//开启作为配置中心

@EnableConfigServer

package com.fs;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.config.server.EnableConfigServer;import org.flix.eureka.EnableEurekaClient;@SpringBootApplication@EnableEurekaClient//开启作为配置中心@EnableConfigServerpublic class ConfigServer3344 {public static void main(String[] args) {SpringApplication.run(ConfigServer3344.class,args);}}

fs-config-provider-8001/8002 搭建服务提供,来动态获取配置信息

8001与8002 代码一模一样,只是bootstrap的server.port端口一个是8001,一个是8002

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><parent><artifactId>study-springcloud</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-config-provider-8001</artifactId><dependencies><!--spring-boot-starter-web spring-boot-starter-actuator绑定在一块 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加消息总线RabbitMQ支持--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency><!-- springcloud config--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-config</artifactId></dependency><!-- eureka clint--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency></dependencies></project>

bootstrap.yml

server:port: 8001spring:application:name: fs-config-providercloud:config:label: master # 分支名称name: config # 配置文件前缀名 - 前面的名称profile: dev # 读取后缀名 就是读取码云仓库master分支下的config-dev.yml配置文件#uri: http://localhost:3344 # 配置中心地址# 从eureka中发现configserver的服务的uri 这里写了,就不用写uri了discovery:enabled: true # 是否从注册中心获取信息service-id: fs-config-server #服务名称# RabbitMQ相关配置rabbitmq:host: 192.168.93.132port: 5672username: rootpassword: rootvirtual-host: /eureka:client:service-url:defaultZone: http://localhost:7001/eureka #,:7002/eureka,:7003/eureka# 暴露bus刷新配置的端点 才能动态刷新配置,使用bus,只需要在config server中配置就可以#management:# endpoints:# web:#exposure:# include: 'bus-refresh'# include: '*' #暴露所有

application.yml空 没有任何代码

ConfigProvider8001/8002 主启动

package com.fs;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.flix.eureka.EnableEurekaClient;@SpringBootApplication@EnableEurekaClientpublic class ConfigProvider8001 {public static void main(String[] args) {SpringApplication.run(ConfigProvider8001.class,args);}}

ConfigProviderController

@RefreshScope//配置刷新功能

//cmd手动刷新才能获取到最新的配置信息 curl -X POST “http://localhost:8001/actuator/refresh”

//但是使用了BUS消息总线后,只需要在configserver中发送一次请求,所有绑定的微服务,都会被广播到,

// 就不需要每个微服务都去发送请求,来获取最新的配置信息

package com.fs.controller;import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RefreshScope//配置刷新功能//cmd手动刷新才能获取到最新的配置信息 curl -X POST "http://localhost:8001/actuator/refresh"//但是使用了BUS消息总线后,只需要在configserver中发送一次请求,所有绑定的微服务,都会被广播到,// 就不需要每个微服务都去发送请求,来获取最新的配置信息public class ConfigProviderController {@Value("${server.port}")private String serverPort;//获取云(git) 上的配置文件信息,来确保我们的的服务是否获取到最新的@Value("${config.info}")private String configInfo;//输出一下最新的配置信息@GetMapping("/configInfo")public String getConfigInfo(){return "serverPort:"+serverPort+"~~~configInfo:"+configInfo;}}

. 启动测试

在giree 码云上准备一个厂库(厂库名字与git地址记得在configserver的application.yml文件中修改)

首先启动RabbitMQ

启动服务

依次启动:

1.fs-server-eureka-7001

2.fs-config-server

启动configserver报错,不管,实际上是连接到了的

org.eclipse.jgit.api.errors.TransportException: /xiaofugitee/springcloud-config.git: Secure connection to /xiaofugitee/springcloud-config.git could not be stablished because of SSL problems

3.fs-config-provider-8001

4,fs-config-provider-8002

打开浏览器测试

http://localhost:3344/config-dev.yml

http://localhost:8001/configInfo

http://localhost:8002/configInfo

我们去gitee上修改一下配置文件中的version

修改gitee上的配置文件信息后,再次打开浏览器测试

http://localhost:3344/config-dev.yml

http://localhost:8001/configInfo

http://localhost:8002/configInfo

由于配置了bus消息,gitee上的配置文件进行更改,只需要发送一个3344post请求 处处生效3355,3366的配置都会进行更新,

** curl -X POST http://localhost:3344/actuator/bus-refresh **

发送post请求广播通知后,再次浏览器测试:

http://localhost:3344/config-dev.yml

http://localhost:8001/configInfo

http://localhost:8002/configInfo

Stream 消息驱动

Stream 概述

• Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。

• Stream 解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做

到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多

自己的业务流程。

• Spring Cloud Stream目前支持两种消息中间件RabbitMQ和Kafka

Stream 组件

• Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器 Binder

相关联的。绑定器对于应用程序而言起到了隔离作用, 它使得不同消息中间件

的实现细节对应用程序来说是透明的。

• binding 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起

• output:发送消息 Channel,内置 Source接口

• input:接收消息 Channel,内置 Sink接口

小案列演示 Stream 消息驱动

fs-stream-RabbitMQ-provider-8801 Stream 消息生产者

Stream 消息生产者

创建消息生产者模块,引入依赖 starter-stream-rabbit编写配置,定义 binder,和 bingings定义消息发送业务类。添加 @EnableBinding(Source.class),注入

MessageChannel output ,完成消息发送编写启动类,测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><parent><artifactId>study-springcloud</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-stream-RabbitMQ-provider-8801</artifactId><dependencies><!--spring-boot-starter-web spring-boot-starter-actuator绑定在一块 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- -stream-rabbit--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies></project>

application.yml

server:port: 8801spring:application:name: fs-stream-providercloud:stream:# 定义绑定,绑定到哪个消息中间件上binders: #在此处声明配置要绑定的RabbitMQ的服务信息fs_Rabbit: # 表示定义的名称,用binding整合,随便自定义type: rabbit # 消息组件类型environment: #设置RabbitMQ的相关配置spring:rabbitmq:host: 192.168.93.132port: 5672username: rootpassword: rootvirtual-host: /bindings: #服务的整合处理output: # 这个名字是一个管道的名称,使用默认的outputdefault-binder: fs_Rabbit # 设置要绑定的消息服务的具体设置, 若idea爆红没关系destination: fs_Exchange #表示要使用的Exchange交换机名称 自定义 消息发送的目的地content-type: application/json # 设置消息类型,本次为json 文本设置为test/plain#eureka:# client:# service-url:#defaultZone: :7001/eureka,:7002/eureka,:7003/eureka# instance:# lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认30秒)# lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒间隔(默认是90秒)# instance-id: send- # 在消息列表显示主机名称# prefer-ip-address: true # 访问的路径变ip地址

StreamMain8801 主启动

package com.fs.springcloud;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class StreamMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMain8801.class,args);}}

IMessageProvider 服务提供接口

package com.fs.springcloud.service;public interface IMessageProvider {public String send();}

IMessageProviderImpl 服务提供接口实现类

package com.fs.springcloud.service.impl;import com.fs.springcloud.service.IMessageProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import java.util.UUID;//定义消息的推送管道@Service@EnableBinding(Source.class)//指信道channel和Exchange绑定在一起public class IMessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output;//消息发送管道//重写send方法,发送消息@Overridepublic String send() {String s = UUID.randomUUID().toString()+"我是provider使用StreamRabbitMQ发送的消息~~~";output.send(MessageBuilder.withPayload(s).build());System.out.println("~~~~消息"+s);return "success";}}

SendMessageController 服务提供方,用于调用send方法测试发送消息

package com.fs.springcloud.controller;import com.fs.springcloud.service.IMessageProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*提供方法,发送消息*/@RestControllerpublic class SendMessageController {@Autowiredprivate IMessageProvider iMessageProvider;@RequestMapping("/sendMessage")public String sendMessage(){return iMessageProvider.send();}}

fs-stream-RabbitMQ-consumer-9901 Stream 消息消费者

创建消息消费者模块,引入依赖 starter-stream-rabbit编写配置,定义 binder,和 bingings定义消息接收业务类。添加 @EnableBinding(Sink.class),使用

@StreamListener(Sink.INPUT),完成消息接收。编写启动类,测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><parent><artifactId>study-springcloud</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-stream-RabbitMQ-consumer-9901</artifactId><dependencies><!--spring-boot-starter-web spring-boot-starter-actuator绑定在一块 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- -stream-rabbit--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies></project>

application.yml

server:port: 9901spring:application:name: fs-stream-consumercloud:stream:binders: #在此处声明配置要绑定的RabbitMQ的服务信息fs_Rabbit: # 表示定义的名称,用binding整合type: rabbit # 消息组件类型environment: #设置RabbitMQ的相关配置spring:rabbitmq:host: 192.168.93.132port: 5672username: root #RabbitMQ的账户密码password: rootvirtual-host: /bindings: #服务的整合处理input: # 这个名字是一个通道的名称 默认inputdefault-binder: fs_Rabbit # 设置要绑定的消息服务的具体设置, 爆红没关系destination: fs_Exchange #表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json 文本设置为test/plaingroup: xiaofuA #对消费者进行分组,将2个微服务分成同一个组,轮询消费,解决重复消费#eureka:# client:# service-url:#defaultZone: :7001/eureka,:7002/eureka,:7003/eureka# instance:# lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认30秒)# lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒间隔(默认是90秒)# instance-id: receive- # 在消息列表显示主机名称# prefer-ip-address: true # 访问的路径变ip地址

StreamRabbitMQMain9901 主启动

package com.fs.springcloud;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;/*故障现象,重复消费微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次不同的组是可以重复消费的,同一个租内会发生竞争关系,只有其中一个可以消费*/@SpringBootApplicationpublic class StreamRabbitMQMain9901 {public static void main(String[] args) {SpringApplication.run(StreamRabbitMQMain9901.class,args);}}

ReceiveMessageListenerController 用于消费服务提供方发送的消息

package com.fs.springcloud.controller;import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.messaging.Message;import org.ponent;/*消息接收类*/@Component@EnableBinding(Sink.class)//绑定消息public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;//定义消息接收方法@StreamListener(Sink.INPUT)public void input(Message<String> message){//打印消息数据//只要提供方发生了消息,这里就会被执行System.out.println("消费者,~~~~~收到的消息:"+message.getPayload()+",port:"+serverPort);}}

测试Stream 消息驱动 小案列

首先打开RabbitMQ

启动2个服务:

fs-stream-RabbitMQ-provider-8801

fs-stream-RabbitMQ-consumer-9901

浏览器发送服务提供方法的controller的地址调用send方法,发送消息到RabbitMQ中

然后观看控9901的控制台

Sleuth+Zipkin 链路追踪

概述

• Spring Cloud Sleuth 其实是一个工具,它在整个分布式系统中能跟踪一个用户请求的过程,捕获这些跟踪数据,就能构建微服务的整个调用链的视图,这是调试和监控微服务的关键工具。

• 耗时分析

• 可视化错误

• 链路优化

• Zipkin 是 Twitter 的一个开源项目,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现。

定义:

sleuth:采集程序中的请求等相关数据的工具zipkin:分析并图形化展示调用关系的工具

搭建Sleuth+Zipkin项目

安装启动zipkin。 java –jar zipkin.jar访问zipkin web界面。 http://localhost:9411/在服务提供方和消费方分别引入 sleuth 和 zipkin 依赖分别配置服务提供方和消费方。启动,测试

使用:

服务端(ZipKin)

jar包运行

docker 运行

客户端

导包

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</artifactId></dependency>

配置

zipkin服务器地址信息采样率

spring:zipkin:base-url: http://localhost:9411/ # 设置zipkin的服务端路径sleuth:sampler:probability: 1 # 采集率 默认 0.1 百分之十。

编码

无需任何编码,服务自动被采集数据到zipkin

如果觉得《SpringCloud微服务架构 Config 分布式配置中心 Bus 消息总线 Stream 消息驱动 Sleuth+Zipkin 链路追踪》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。