Spring Cloud Bus

1 概述

1.1 Spring Cloud Bus是什么?

  • Spring Cloud Bus配置Spring Cloud Config使用可以实现配置的动态刷新。

Spring Cloud Bus是什么

  • Spring Cloud Bus是用来分布式系统的节点和轻量级消息系统连接起来的框架,它整合了Java事件处理机制和消息中间件的功能。
  • Spring Cloud Bus目前支持RabbitMQ和Kafka。

1.2 Spring Cloud Bus能干嘛?

  • Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当做微服务间的通信通道。

Spring Cloud Bus能干嘛

1.3 为什么称为总线?

1.3.1 什么是总线

  • 在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称其为消息总线。
  • 在总线上的各个实例,都可以方便的传播一些需要让其他连接在该主题上的实例都知道的消息。

1.3.2 基本原理

  • Config Client实例都监听MQ中的同一个Topic(默认是SpringCloudBus)。当一个服务刷新数据的时候,其他监听到这个主题的服务就会得到通知,然后去更新自身的配置。

2 Spring Cloud Bus动态刷新全局广播

2.1 设计思想

消息总线

  • 根据上图我们可以看出Spring Cloud Bus做配置更新的步骤:
  • 1️⃣提交代码触发POST请求给bus/refresh。
  • 2️⃣server端接收到请求并发送给Spring Cloud Bus。
  • 3️⃣Spring Cloud Bus接收到消息并通知给其他客户端。
  • 4️⃣其他客户端接收到通知,请求Server端获取最新配置。
  • 5️⃣全部客户端获取到最新的配置。

2.2 服务端修改

2.2.1 导入相关依赖

  • 修改部分:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  • 完整部分:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_cloud_demo</artifactId>
        <groupId>org.sunxiaping</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>config_server9010</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <!--   导入Eureka Client对应的坐标     -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

</project>

2.2.3 修改配置文件

  • application.yml
server:
  port: 9010
spring:
  application:
    name: config-server
  # ----修改部分------
  # 配置rabbitmq
  rabbitmq:
    host: 192.168.1.57
    port: 5672
    username: guest
    password: guest
  # ----修改部分------
  cloud:
    config:
      server:
        git:
          # git服务地址
          uri: https://gitee.com/AncientFairy/config-repo.git
          # 配置git的用户名
          username:
          # 配置git的密码
          password:
          search-paths:
            - config-repo
      # 分支
      label: master


# 配置 eureka
eureka:
  instance:
    # 主机名称:服务名称修改,其实就是向eureka server中注册的实例id
    instance-id: config-server:${server.port}
    # 显示IP信息
    prefer-ip-address: true
  client:
    service-url: # 此处修改为 Eureka Server的集群地址
      defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/

# ----修改部分------
# 配置端点
management:
  endpoints:
    web:
      exposure:
        include: 'bus-refresh'
# ----修改部分------

2.3.3 启动类

package com.sunxiaping.config;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2020-10-09 16:48
 */
@SpringBootApplication
@EnableConfigServer //开启配置中心功能
@EnableEurekaClient
public class ConfigServer9010Application {
    public static void main(String[] args) {
        SpringApplication.run(ConfigServer9010Application.class, args);
    }
}

2.3 客户端修改

2.3.1 导入相关依赖

  • 修改部分:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  • 完整部分:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_cloud_demo</artifactId>
        <groupId>org.sunxiaping</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>config_client9011</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
    </dependencies>


</project>

2.3.2 修改配置文件

  • bootstrap.yml:
spring:
  # ----修改部分------
  # 配置rabbitmq
  rabbitmq:
    host: 192.168.1.57
    port: 5672
    username: guest
    password: guest
  # ----修改部分------
  cloud:
    config:
      name: product # 应用名称,需要对应git中配置文件名称的前半部分
      profile: dev # 开发环境,需要对应git中配置文件名称的后半部分
      label: master # 分支名称
      #      uri: http://localhost:9010 # config-server的请求地址
      discovery: # 服务发现
        service-id: config-server
        enabled: true # 从Eureka中获取配置信息

# 配置 eureka
eureka:
  instance:
    # 主机名称:服务名称修改,其实就是向eureka server中注册的实例id
    instance-id: service-product-dev:${server.port}
    # 显示IP信息
    prefer-ip-address: true
  client:
    service-url: # 此处修改为 Eureka Server的集群地址
      defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/

2.3.3 启动类

package com.sunxiaping.product;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient //开启Eureka Client
public class ProductApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class, args);
    }
}

2.3.4 业务逻辑

  • Product.java
package com.sunxiaping.product.domain;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.persistence.*;
import java.io.Serializable;
import java.math.BigDecimal;

@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "tb_product")
public class Product implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "product_name")
    private String productName;

    @Column(name = "status")
    private Integer status;

    @Column(name = "price")
    private BigDecimal price;

    @Column(name = "product_desc")
    private String productDesc;

    @Column(name = "caption")
    private String caption;

    @Column(name = "inventory")
    private String inventory;

}
  • ProductRepository.java
package com.sunxiaping.product.dao;

import com.sunxiaping.product.domain.Product;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;

@Repository
public interface ProductRepository extends JpaRepository<Product, Long>, JpaSpecificationExecutor<Product> {

}
  • ProductService.java
package com.sunxiaping.product.service;

import com.sunxiaping.product.domain.Product;

public interface ProductService {

    /**
     * 根据id查询
     *
     * @param id
     * @return
     */
    Product findById(Long id);


    /**
     * 保存
     *
     * @param product
     */
    void save(Product product);


    /**
     * 更新
     *
     * @param product
     */
    void update(Product product);


    /**
     * 删除
     *
     * @param id
     */
    void delete(Long id);


}
  • ProductServiceImpl.java
package com.sunxiaping.product.service.impl;

import com.sunxiaping.product.dao.ProductRepository;
import com.sunxiaping.product.domain.Product;
import com.sunxiaping.product.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.transaction.Transactional;

@Service
@Transactional
public class ProductServiceImpl implements ProductService {

    @Autowired
    private ProductRepository productRepository;

    @Override
    public Product findById(Long id) {
        return this.productRepository.findById(id).orElse(new Product());
    }

    @Override
    public void save(Product product) {
        this.productRepository.save(product);
    }

    @Override
    public void update(Product product) {
        this.productRepository.save(product);
    }

    @Override
    public void delete(Long id) {
        this.productRepository.deleteById(id);
    }
}
  • ProductController.java
package com.sunxiaping.product.controller;

import com.sunxiaping.product.domain.Product;
import com.sunxiaping.product.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/product")
@RefreshScope //开启动态刷新
public class ProductController {

    @Value("${server.port}")
    private String port;

    @Value("${spring.cloud.client.ip-address}")
    private String ip;

    @Autowired
    private ProductService productService;

    @PostMapping(value = "/save")
    public String save(@RequestBody Product product) {
        this.productService.save(product);
        return "新增成功";
    }

    @GetMapping(value = "/findById/{id}")
    public Product findById(@PathVariable(value = "id") Long id) {
        Product product = this.productService.findById(id);
        product.setProductName("访问的地址是:" + this.ip + ":" + this.port);
        return product;
    }
}

2.4 测试

  • 通过postman发送POST请求到http://localhost:9010/actuator/bus-refresh或者在控制台执行curl -X POST "http://localhost:9010/actuator/bus-refresh"命令。

3 Spring Cloud Bus动态刷新定点通知

3.1 概述

  • 有的时候,不需要在刷新服务端的时候,将与之对应的所有客户端都动态刷新,而是只需要刷新具体的某个客户端即可。

3.2 方法

  • 发送POST请求到http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}即可。
原文地址:https://www.cnblogs.com/xuweiweiwoaini/p/13858955.html