数字档案管理系统
首页
操作手册
部署手册
微服务框架
首页
操作手册
部署手册
微服务框架
  • 简介
  • 快速上手
  • 项目介绍
  • 架构设计
    • 设计思想
      • 先进性
      • 简易性
      • 专注性
      • 低代码性
      • 稳定性
      • 自主性
    • 架构图
    • 核心模块
      • UPMS
      • 概念
      • UPMS-ER图
      • 组织架构
      • 用户
      • 菜单
      • API资源
      • 权限
      • 角色
      • 数据创建流程
      • 安全认证
      • 概念
      • 授权方式
      • 授权方式控制
      • so-fast授权特点
      • 字典管理
      • 内容管理
      • 公告通知类
      • 文章博客类
      • 日志
      • xss
      • 动态切换数据源
      • 统一异常处理
      • 统一返回体
      • 文件上传
      • 文件上传分类
      • 上传路径配置
      • 上传文件大小配置
      • 文件上传超时问题
      • 分布式锁
      • 使用
      • 1.注解方式
      • 2.自定义方式
      • Rabbitmq
      • 使用
      • 简单模式
      • 工作队列
      • 发布订阅
      • 路由模式
      • 主题模式
      • 手动Ack与自动Ack
      • Ack的种类:
      • 场景
      • 使用方式
      • 附件:sf-rabbit.yml
      • Kafka
      • 使用示例
      • 简易示例
      • 生产者-callback使用示例
      • 生产者-事务使用示例
      • 消费者-指定参数消费使用示例
      • 消费者-批量消费使用示例
      • 消费者-异常处理器
      • 消费者-消息过滤器
      • 消费者-消息转发
      • 附件:sf-kafka.properties
      • 多租户
      • 组件介绍:
      • 实现原理:
      • 使用方式:
      • 页面功能介绍:
      • 列表功能页
      • 模态事件
      • 工作流
      • 流程设计
      • 后台使用方式
  • 前端开发

    • 开发规范
    • 全局配置
  • 后端开发
  • 生产部署
  • 帮助文档与常见问题
  • 测试手册
数字档案管理系统
2025-04-17
目录

架构设计

# 设计思想

本框架初衷是为了解决部门内各项目开发框架混乱、不易沉淀。不易持续优化升级而设计,其根本目标是为了简化开发,统一规范,降低成本。

本框架设计遵循以下设计思想:

# 先进性

框架技术选型符合当前行业内大多数客户的技术栈要求,符合市场的流行性。框架的设计模式具备先进性和可迭代性。

# 简易性

框架应具备思路清晰,上手简单,准入门槛低特性,易于 BP 人员快速掌握。

# 专注性

框架应实现大部分系统常用功能,避免重复开发,使开发人员可专注于业务逻辑编码。

# 低代码性

框架应具备代码生成器,对于大量重复代码提供模板,可一键生成。

# 稳定性

系统经过多轮测试和一定周期试运行,保证系统稳定性和健壮性。

# 自主性

框架每行代码应掌握在本公司手里,借助开源框架的同时,也应深入理解其原理。达到知其然亦知其所以然。

# 架构图

so-fast-diagram

# 核心模块

# UPMS

# 概念

UPMS:User Permissions Management System,通用用户权限管理系统,其最大的价值在于解决重复开发的时间和技术问题,有非常良好的应用性和扩展性,通用用户权限系统基本能解决在用户系统中面临的各种问题。

so-fast 基础框架内置 UPMS 模块,由于不同的项目有不同的定制需求,so-fast 的 UMPS 模块不能做到完全兼容,因此该模块被设计为最基础的 UMPS 模块。在需要业务扩展的情况下,不推荐更改 so-fast 内置数据结构,推荐创建信息扩展表并结合内置的数据表进行业务的扩展。国内技术卡开发委员会也会在后续迭代过程中不断完善本模块,以适应更多的场景,为业务开发提供更大的便利。

# UPMS-ER 图

img

# 组织架构

so-fast 提供简单的分级组织信息维护,sys_organization 表中定义了最基础的组织信息,其中 id 为数据库自增主键,通过 parent_id、parent_ids 维护组织的从属关系,并且可以通过字典操作定义组织类型。so-fast 提供简单的组织信息维护接口,详见 swagger 文档【2. 系统内置模块 API-- 组织机构操作 (opens new window)】。

# 用户

so-fast 提供简单的用户信息维护,sys_user 定义了最基础的用户信息,其中 id 为数据库自增主键,login_name 为用户登录名不可重复。系统内置超级管理员(admin 字段值为 “1”),此种用户为超级用户,禁止对其进行自改密码之外的任何操作。so-fast 提供简单的用户信息维护接口,详见 swagger 文档【2. 系统内置模块 API-- 用户操作 (opens new window)】。

# 菜单

so-fast 提供的系统菜单维护功能,sys_menu_info 定义了系统菜单信息,其中 id 为数据库自增主键。菜单类型有页面和按钮两种,并且在数据中通过 component_path、parent_id 体现了菜单按钮的层级关系。so-fast 提供菜单信息维护接口,详见 swagger 文档【2. 系统内置模块 API-- 菜单操作 (opens new window)】。

# API 资源

so-fast 提供的系统 API 维护功能,sys_api_info 定义了系统 API 信息,其中 id 为数据库自增主键。对于 URI 字段设置切记切实合理,以免造成不必要的权限疏漏。(例:用户操作 API 集合:/user/**)。so-fast 提供菜 API 信息维护接口,详见 swagger 文档【2. 系统内置模块 API--API 操作 (opens new window)】。

# 权限

so-fast 提供的系统权限维护功能,sys_permission 定义了权限信息,其中 id 为数据库自增主键。权限类型分为 API、页面、按钮三类,由于页面按钮均属于前端权限,因此,也可理解为权限分为后台权限与前台权限。so-fast 提供菜权限信息维护接口,详见 swagger 文档【2. 系统内置模块 API-- 权限操作 (opens new window)】。

# 角色

so-fast 提供的系统角色维护功能,sys_role 定义了角色信息,其中 id 为数据库自增主键。role 字段在系统控制时会被用到,因此不可重复。系统内置 admin 角色,此角色禁止删除。so-fast 提供菜权限角色维护接口,详见 swagger 文档【2. 系统内置模块 API-- 角色操作 (opens new window)】。

# 数据创建流程
  • 根据系统需求创建基础 API 信息(如需要精心细粒度的后台 API 权限控制)、菜单信息。
  • 根基系统需求创建基础权限,如系统管理权限、业务操作权限、数据查看权限等妥当的权限划分。
  • 基于系统基础权限创建角色,如管理员、业务员、游客等。
  • 为用户分配角色。
  • 开始使用。

# 安全认证

# 概念

Json web token (JWT), 是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准((RFC 7519 (opens new window)). 该 token 被设计为紧凑且安全的,特别适用于分布式站点的单点登录(SSO)场景。JWT 的声明一般被用来在身份提供者和服务提供者间传递被认证的用户身份信息,以便于从资源服务器获取资源,也可以增加一些额外的其它业务逻辑所必须的声明信息,该 token 也可直接被用于认证,也可被加密。

RBAC:Role-BasedAccess Control,基于角色的访问控制。RBAC 认为权限授权实际上是 Who、What、How 的问题。在 RBAC 模型中,who、what、how 构成了访问权限三元组,也就是 “Who 对 What (Which) 进行 How 的操作,也就是 “主体” 对 “客体” 的操作,其中 who—— 是权限的拥有者或主体(如:User、Role) what—— 是资源或对象(Resource、Class) how—— 是操作方式(CRUD)

so-fast 微服务架构遵循 RBAC 设计理念,集成 spring-security-oauth2、spring-security 实现微服务集群的安全控制。

# 授权方式

so-fast 微服务架构实现了基于 Oauth2.0 标准的四种授权:授权码模式(authorization code)、简化模式(implicit)、密码模式(resource owner password credentials)、客户端模式(client credentials),并规划在后续迭代中提供诸如微信认证、AppleID 认证、微博认证等一些列的第三方授权功能。

# 授权方式控制

so-fast 内置 sys_oauth_client_details 表,以灵活配置不同的授权场景。其可配置:授权方式、token 的有效期、作用域、跳转 url 等授权信息。

当前版本只能手动修改数据库进行配置,后续迭代会提供超级管理页面来实现授权方式的可视化控制。

# so-fast 授权特点

基于 UPMS 设计,so-fast 的安全认证被设计为客户端和服务器端同时持有 Token 令牌,在认证时需要检查 Token 的合法性以及时效性。

so-fast 提供的安全认证主要涉及页面控制、API 控制两个大类。通过上诉 UPMS 模块的设计可以看出,系统的所包含的各个页面、以及 API 接口均可在 UPMS 中进行维护,通过创建权限并指定权限范围就可以进行细粒度的页面与 API 的控制。so-fast 的授权被设计为可应对多种情况的灵活配置

# 核心功能

请求授权

以最常见的密码模式为例。应用终端在向服务器申请授权时,需要向服务器提供三种信息:用于 Basic 认证客户端口令(sys_oauth_client_details 表中配置的客户端以及客户端秘钥)、授权方式、用户名密码。其中客户端口令的设置需要进行 base 转码。

例如在 sys_oauth_client_details 表中配置了一各 ID 为 web、秘钥为 000000 的客户端,并且配置该客户端支持密码模式,客户端口令的获取步骤如下:

1、字符串拼接得到(客户端 ID: 客户端秘钥):web:000000

2、上述字符串 Base64 转码得到:d2ViOjAwMDAwMA==

基于上诉准备工作,请求授权的接口信息如下:

请求方式:post

  • URL:http://XXXXX/so-fast-uaa/login/web (opens new window)

  • 请求方式:post

  • 参数:header

    Authorization:Basic d2ViOjAwMDAwMA==
    Content-Type:application/json
    
    1
    2
    1
    2

    参数:jsonbody

{
"grant_type":"password", //授权方式
"username":"admin2", //用户名
"password":"000000" //密码
}
1
2
3
4
5
1
2
3
4
5

返回内容

{
    "status": 0,
    "code": null,
    "message": null,
    "data": {
        "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJleHAiOjE2MTQ5MTU0OTgsInVzZXJOYW1lIjoiYWRtaW4yIiwidXNlcklkIjozLCJhdXRob3JpdGllcyI6WyJhZG1pbiJdLCJqdGkiOiI4MWNmYWRiZS1jZjQzLTRiYjAtYWZiYi0zYWIyM2M2M2E4OWMiLCJjbGllbnRfaWQiOiJ3ZWIifQ.h_4yFBsqzriN4fZYHk9c5GO2UVI3HaMAlGnvLKqWbwM",
        "jti": "81cfadbe-cf43-4bb0-afbb-3ab23c63a89c",
        "license": "so-fast",
        "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJhdGkiOiI4MWNmYWRiZS1jZjQzLTRiYjAtYWZiYi0zYWIyM2M2M2E4OWMiLCJ1c2VyTmFtZSI6ImFkbWluMiIsInVzZXJJZCI6MywiYXV0aG9yaXRpZXMiOlsiYWRtaW4iXSwianRpIjoiMGU1NTBmMzQtNTVmZC00MTFmLTlkMjYtNTA3ZTE0Mjg2ZjBkIiwiY2xpZW50X2lkIjoid2ViIn0.-ppOrp3oJE_97ABERPrzkbKmS0INbl-IW4cT-eBR8hw",
        "scope": [
            "read",
            "write"
        ],
        "token_type": "bearer",
        "expires_in": 4999,
        "userId": "3",
        "userName": "admin2"
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

token 刷新

在 token 的刷新场景同样需要提供用于 Basic 认证客户端口令(sys_oauth_client_details 表中配置的客户端以及客户端秘钥),授权方式指定为 refresh_token,并将应用端持有的 refresh_token 提交给服务器,详细如下:

  • URL:http://XXXXX/so-fast-uaa/login/refreshToken (opens new window)

  • 请求方式:post

  • 参数:header

    Authorization:Basic d2ViOjAwMDAwMA==
    Content-Type:application/json
    
    1
    2
    1
    2

    参数:jsonbody

{
    "grant_type":"refresh_token",
"refreshToken":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJhdGkiOiI4MWNmYWRiZS1jZjQzLTRiYjAtYWZiYi0zYWIyM2M2M2E4OWMiLCJ1c2VyTmFtZSI6ImFkbWluMiIsInVzZXJJZCI6MywiYXV0aG9yaXRpZXMiOlsiYWRtaW4iXSwianRpIjoiMGU1NTBmMzQtNTVmZC00MTFmLTlkMjYtNTA3ZTE0Mjg2ZjBkIiwiY2xpZW50X2lkIjoid2ViIn0.-ppOrp3oJE_97ABERPrzkbKmS0INbl-IW4cT-eBR8hw"
}
1
2
3
4
1
2
3
4

返回内容

{
    "status": 0,
    "code": null,
    "message": null,
    "data": {
        "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJleHAiOjE2MTQ5MTU3NjAsInVzZXJOYW1lIjoiYWRtaW4yIiwidXNlcklkIjozLCJhdXRob3JpdGllcyI6WyJhZG1pbiJdLCJqdGkiOiI0MGYzNDc4MC1iMzIxLTQ5YWEtOWU3My1mNTk0Y2YxZGViNWYiLCJjbGllbnRfaWQiOiJ3ZWIifQ.6qaF_zi3113cEYhkjzA6XzvSJcgYEnLXo3mgUtOzRPw",
        "jti": "40f34780-b321-49aa-9e73-f594cf1deb5f",
        "license": "so-fast",
        "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJhdGkiOiI0MGYzNDc4MC1iMzIxLTQ5YWEtOWU3My1mNTk0Y2YxZGViNWYiLCJ1c2VyTmFtZSI6ImFkbWluMiIsInVzZXJJZCI6MywiYXV0aG9yaXRpZXMiOlsiYWRtaW4iXSwianRpIjoiMGU1NTBmMzQtNTVmZC00MTFmLTlkMjYtNTA3ZTE0Mjg2ZjBkIiwiY2xpZW50X2lkIjoid2ViIn0.iZ78-f1YSH8H_hu04en7qfLVMly409PfuiP5BHt3t5w",
        "scope": [
            "read",
            "write"
        ],
        "token_type": "bearer",
        "expires_in": 4999,
        "userId": "3",
        "userName": "admin2"
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

业务请求

在普通业务场景下,应用端只需向服务器提供有效的 token 即可,详细如下:

  • URL:http://XXXXX/xxxxx/xxxxx/page (opens new window)

  • 请求方式:post、get 等

  • 参数:header

    Authorization:Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJsaWNlbnNlIjoic28tZmFzdCIsInVzZXJfbmFtZSI6ImFkbWluMiIsInNjb3BlIjpbInJlYWQiLCJ3cml0ZSJdLCJleHAiOjE2MTQ3NTk5OTksInVzZXJOYW1lIjoiYWRtaW4yIiwidXNlcklkIjozLCJhdXRob3JpdGllcyI6WyJhZG1pbiJdLCJqdGkiOiI5MzYxYTk5OS02NWZkLTQyZGYtOWIxMS02MDk1MTgxNjkyZGQiLCJjbGllbnRfaWQiOiJ3ZWIifQ._V-g8_OZQYyy_qfpBpnFx_z5ithPRBukNG8x-sz2j38
    Content-Type:application/json
    
    1
    2
    1
    2

    参数:jsonbody

    略
    
    1
    1

    返回内容

    {
        略
    }
    
    1
    2
    3
    1
    2
    3
# 异常返回
  • 如登录时用户名口令错误,请求状态为 200,返回结果如下:
{
    "code": "Err.E001",
    "message": "登陆失败,请确认用户名口令.",
    "status": 1
}
1
2
3
4
5
1
2
3
4
5
  • 如验证码错误,请求状态为 200,返回结果如下
{
    "code": "Err.E010",
    "message": "请输入正确的验证码.",
    "status": 1
}
{
    "code": "Err.E011",
    "message": "验证码已过期.",
    "status": 1
}
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
  • 如访问白名单以外的 API 时不携带 token,则会返回:401 错误,详细内容为:
{
    "code": "Err.E003",
    "message": "TOKEN非法或已过期.",
    "status": 1
}
1
2
3
4
5
1
2
3
4
5
  • 如客户端持有的 token 过期,则会返回:401 错误,详细内容为:
{
    "code": "Err.E003",
    "message": "TOKEN非法或已过期.",
    "status": 1
}
1
2
3
4
5
1
2
3
4
5
  • 如使用非本系统授权的 token,则会返回:401 错误,详细内容为:
{
    "code": "Err.E003",
    "message": "TOKEN非法或已过期.",
    "status": 1
}
1
2
3
4
5
1
2
3
4
5
  • 如访问受限的 API,则会返回:403 错误,详细内容为:
{
    "code": "Err.E002",
    "message": "访问受限,请确认权限.",
    "status": 1
}
1
2
3
4
5
1
2
3
4
5
# 服务保护

微服务集群中的所有服务理论上都需要被认证服务器保护,so-fast 被设计为基于 so-fast 自有注解 EnableSolResourceServer 的方式开启服务的安全防护。详细如下:

@EnableSwagger2
@EnableSolFeign
@EnableDiscoveryClient
@EnableSolResourceServer
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
public class SoFastUpmsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SoFastUpmsApplication.class, args);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
1
2
3
4
5
6
7
8
9
10
11
12
# 安全配置

与传统 Oauth2.0 不同的是,so-fast 提供用户踢出和禁止登录功能,以灵活地应对客户需求。so-fast 内置安全相关的默认配置,各项目可依据自身需求酌情修改,配置文件如下:

#安全配置
sofast:
  security:
    jwt:
      #token版权声明
      license: so-fast
      #token加密
      sing-key: 09152205192rytt103080514ShuCCCCC---henLLLLVVVVdedoo(rtertter)dds
    need:
      #token中知否植入用户信息
      user-info: true
      #是否对API进行细粒度的判定
      details-authority: true
    #token校验配置
    limit:
      #是否禁止同一账号多终端登录
      multiple-login: false
      #当检测到同时登录时,是禁止登录还是踢出已登录。此项设置为true时,踢出前一个登录终端
      kicked-out: true
    #token校验配置
    token-check:
      #客户端ID
      client-id: web
      #客户端秘钥
      client-secret: "000000"
      #是否使用服务名指定认证服务器,此项设置为true是,check-token-endpoint-url请设置为集群中的认证服务名称
      check-token-by-service-name: true
      #认证服务地址
      check-token-endpoint-url: http://so-fast-uaa/oauth/check_token
    #服务白名单
    white-list:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

sing-key 为 token 的签名秘钥,切记妥善保存。

token 信息的控制部分在 UAA 工程内覆盖配置,比如当系统不需要在 token 中织入用户信息并且禁止多终端登录时(禁止登录)时,可在 UAA 工程的配置文件中添加以下配置:

#安全配置
sofast:
  security:
    need:
      #token中知否植入用户信息
      user-info: false
    limit:
      #是否禁止同一账号多终端登录
      multiple-login: true
      #当检测到同时登录时,是禁止登录还是踢出已登录。此项设置为true时,踢出前一个登录终端
      kicked-out: false
1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
11

集中鉴权配置在业务服务中可进行配置覆盖,示例如下:

#安全配置
sofast:
  security:
    token-check:
      #客户端ID
      client-id: web
      #客户端秘钥
      client-secret: "000000"
      #是否使用服务名指定认证服务器,此项设置为true是,check-token-endpoint-url请设置为集群中的认证服务名称
      check-token-by-service-name: true
      #认证服务地址
      check-token-endpoint-url: http://so-fast-uaa/oauth/check_token
    #服务白名单
    white-list:/aaaa/**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 字典管理

so-fast 提供了通用的字典管理功能。ER 图如下:

img

为了提高字典的适用性,so-fast 同时提供字典类型以及字典数据的双重管理。在需要添加新的字典数据时,首先添加字典类型,然后基于创建的字典类型创建字典数据。

为了提升字典数据的使用效率,以及减少数据库的开销,so-fast 在启动时会加载字典数据至 redis,并在字典维护的模块提供内存重置的功能以保证数据的时效性。

# 内容管理

# 公告通知类

so-fast 基础框架内置公告基础模块,公告支持简单的富文本编辑,设置初衷是简单、轻量,承载公告类的信息发布。

# 文章博客类

so-fast 基础框架内置文基础模块,包括文章、新闻、博客等富文本内容的编辑、发布、管理等基础功能,审核功能暂不提供,请依据项目实际需求扩展追加

国内开发技术委员会会在后续更新迭代中逐步丰富 CMS 的功能,如系统通知、轮播图管理、文章推荐、内容审核、全文检索等。

# 日志

so-fast 基于 AOP 提供基础的操作日志记录功能,并用过异步线程写入数据库,尽量规避对被织入请求的性能影响。日志记录可以在配置文件中配置日志记录种类,参见如下。日志记录内容主要从注解输入、请求信息以及 token 中获取。

log:
  operator:
    types: insert,update,delete
1
2
3
1
2
3

登录日志如下所示:

img

操作日志如下所示:

img

# 日志分析

略

# xss

so-fast 通过内置的 XssFilter 实现了跨站脚本攻击(xss)的防范。通过在配置文件中配置 XSS 的防范功能开启以及排除连接等信息,可以控制系统 XSS 防范功能。

框架默认不开启 xss,如果使用该功能,请现在 pom.xml 中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-xss-starter</artifactId>
</dependency>
1
2
3
4
1
2
3
4

并在配置文件中进行开启和配置:

# 防止XSS攻击
xss:
  # 过滤开关
  enabled: true
  # 排除链接(多个用逗号分隔)
  excludes:
  # 匹配链接
  urlPatterns: /*
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

XssFilter 中,对于非排除对象的所有 request 请求进行拦截过滤和请求的重写,其主要基于 HTMLFilter 进行非法内容而过滤拦截。详见工程中的 HTMLFilter 文件。

# 动态切换数据源

so-fast 框架支持基于注解的动态数据源切换功能。默认只支持主从库。

使用方式如下:

  • 配置文件配置主从 DB

     # 数据源配置
     spring:
       datasource:
         type: com.alibaba.druid.pool.DruidDataSource
         driverClassName: com.mysql.cj.jdbc.Driver
         druid:
           # 主库数据源
           master:
             url: jdbc:mysql://xxxx:3306/so-fast?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
             username: username
             password: password
           # 从库数据源
           slave:
             # 从数据源开关/默认关闭
             enabled: false
             url:
             username:
             password:
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
  • 通过注解切换 DB

    在 Controller 或 Service 的类或方法上添加注解。例如:读写分离场景

     @DynamicDS(value = DataSourceType.SLAVE)
     @ApiOperation(value = "查询操作日志")
     @LogOperator(title = "查询操作日志", type = Constants.SELECT_OPERATOR)
     @GetMapping(value = "/")
     public R<Page<OperatorLog>> list(OperatorLogQueryVo operatorLogQueryVo) {
    
     @DynamicDS(value = DataSourceType.MASTER)
     @ApiOperation(value = "根据ID删除日志")
     @LogOperator(title = "日志删除", type = Constants.DELETE_OPERATOR)
     @PostMapping(value = "/delete/{ids}")
     public R delete(@PathVariable Long[] ids) {
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

# 统一异常处理

在开发过程中往往存在异常捕捉遗漏、异常处理方式不统一等情况。因此 so-fast 提供了统一的异常处理。

so-fast 基于全局的异常拦截拦截各种系统异常、业务异常,统一处理异常信息。支持以下异常种类:其中 FileException、CommonException、SystemException、BizException 为 so-fast 的内置异常。异常被截获时,通过 logback 写入系统日志之后,封装成为统一的格式返回调用方。

Exception
BindException
IllegalArgumentException
DuplicateKeyException
SQLIntegrityConstraintViolationException
IOException
FileException
CommonException
SystemException
BizException
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
# 自定义异常

略

# 统一返回体

so-fast 框架规定必须使用统一返回体,保证接口的统一性。

so-fast 内置 R 作为统一返回体,R 类中已经包含了业务开发中的大多数场景返回值需求。

成功响应时:

R.ok()  // 成功时返回信息,只返回成功状态码。

R.data(data) // 成功时返回信息,只返回状态码和结果体。

R.code(code, codeArgs[]) // 成功时返回信息,只返回自定义提示信息。
1
2
3
4
5
1
2
3
4
5

失败响应时:

R.ng() // 错误时返回信息,只返回状态码

R.ng(code, codeArgs) // 错误时返回信息,返回状态码和错误信息

R.ng(data, code, codeArgs) // 错误时返回信息,返回数据体、状态码和错误信息
1
2
3
4
5
1
2
3
4
5

# 文件上传

文件上传功能是 so-fast 内置功能,推荐用于较小文件上传,对于大文件并没有做切片和多线程的优化。

文件上传功能仅提供本地文件存储或 NFS 文件存储,目前不支持对象存储。

so-fast 框架内置了 BaseFileController 用于所有文件上传的操作。

# 文件上传分类

so-fast 框架推荐将上传的文件进行分类,并内置了对应的 api 进行处理:

头像上传:/file/upload/avatar: 头像上传专用于个人中心的头像处理,仅支持单文件上传。

图片上传:/file/upload/image: 图片上传用于所有图片类型的文件上传,有图片类型的检查,方便专用于图片的上传业务。

文件上传:/file/upload: 文件上传是通用的上传,支持大多数类型文件(可配置)。

ps:当然,也可以不进行分类,统一使用某一种或某两种 api 进行上传。视业务而定

    @Value("${sofast.file.avatarPath}")
    private String avatarPath;

    @Value("${sofast.file.imagePath}")
    private String imagePath;

    @Value("${sofast.file.filePath}")
    private String filePath;

    @ApiOperation(value = "头像上传")
    @PostMapping("/file/upload/avatar")
    public R uploadAvatar(@RequestParam("file") MultipartFile file) throws IOException {
        // 获取文件存储的根目录
        return R.data(SolFileUtils.uploadImage(file, avatarPath));
    }

    @ApiOperation(value = "图片上传")
    @PostMapping(value = "/file/upload/image", headers = "Content-Type=multipart/form-data")
    public R uploadPicture(@RequestParam("files") MultipartFile[] files) throws IOException         {
        // 获取文件存储的根目录
        return R.data(SolFileUtils.uploadImages(files, imagePath));
    }

    @ApiOperation(value = "文件上传,支持所有的文件格式")
    @PostMapping(value = "/file/upload", headers = "Content-Type=multipart/form-data")
    public R upload(@RequestParam("files") MultipartFile[] files) throws IOException {
        // 获取文件存储的根目录
        return R.data(SolFileUtils.uploadFiles(files, filePath));
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 上传路径配置

文件上传需要配置上传的目录,根据 api 分类,so-fast 也提供了 3 个文件目录配置项,可分类使用,也可使用其中某一个。

sofast:
  file:
    # 头像存储路径
    avatarPath: /Users/yyq/Downloads/sofast/avatar/
    # 图片文件上传路径
    imagePath: /Users/yyq/Downloads/sofast/images/
    # 文件上传路径
    filePath: /Users/yyq/Downloads/sofast/files/
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

# 上传文件大小配置

文件上传时,一般前后端都会有文件大小的限制与检查。在后台可通过配置文件灵活调整文件 size 限制:

spring:
  # 文件上传
  servlet:
    multipart:
      # 单个文件大小
      max-file-size:  10MB
      # 设置总上传的文件大小
      max-request-size:  200MB
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

# 文件上传超时问题

文件上传超时问题可能是常见问题,需要根据业务中文件的最大 size 进行计算后,配置一个相对合理的超时时间。

涉及超时时间的有:

前台 vue 部分,发送上传请求时有 timeout 参数

后台服务部分,接口的 timeout 参数

spring:
  mvc:
    async:
      # 超时时间
      request-timeout: 25000
1
2
3
4
5
1
2
3
4
5

如果有代理服务器,例如 nginx,那么针对文件上传也要配置一些 nginx 参数。

# 分布式锁

在一些特定业务场景,比如秒杀、提现等情况,需要保证特定业务的执行顺序才能正确的处理业务,而不会出现茅台多卖,薅羊毛等问题出现。在单机部署的系统中,我们可以通过 JDK 提供的 synchronized 关键字、ReentranLock,或者基于 AQS 定制化锁,单机部署的情况下,锁是在多线程之间共享的,但是分布式部署的场景下,却无法提供多进程间的共享。因此需要使用分布式锁来进行多进程间共享。

分布式锁的实现方式有很多种,SoFast 框架内置提供基于 Redis 的分布式锁实现,如果使用该功能,请现在 pom.xml 中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-lock-starter</artifactId>
</dependency>
1
2
3
4
1
2
3
4

在配置文件中正常进行 Redis 配置即可:

spring:
    redis:
    # 地址
    host: xxxx
    port: xxxx
    # 密码
    password: xxxxxxx
    # 连接超时时间
    timeout: 15s
    jedis:
      pool:
        # 连接池中的最小空闲连接
        min-idle: 3
        # 连接池中的最大空闲连接
        max-idle: 10
        # 连接池的最大数据库连接数
        max-active: 50
        # #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1ms
    database: 4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 使用

在业务开发中,如果要使用分布式锁,请参考以下两种方式:

# 1. 注解方式

SOFAST 框架中封装了分布式锁注解,通过注解可以非常方便的开启分布式锁

例如:@RedisLock (redisKey = "test-redis-lock-1", waitTime = 5)

使用分布式锁注解,必须提供两个属性:

redisKey:指明在 redis 中存储的 key,请保证唯一性

waitTime:获取锁时的最大等待时间,单位秒

注意点:waitTime 需要根据业务情况设置等待时间,目前 redis 锁的默认最大锁定时间为 60 秒,因此该值必须在 60 秒以内,实际上再业务处理中也很难达到 60 秒的等待时间。

该分布式锁可用于解决 Schedule 定时任务多节点部署的问题,可通过使用分布式锁,防止任务重复执行:

1. 添加 RedisLock 注解,并配置参数,waitTime 需要根据业务时间处理,例如配置 3 秒,那么在实际的业务执行中,要保证 3 秒钟不释放锁,即可保证锁的成功。实现方式很多,例如可显示的强制线程锁定 6 秒:Thread.Sleep (6000)

# 2. 自定义方式

自定义方式在使用上更加灵活,能满足各种业务场景,但是在处理上要麻烦一些,锁的获取、释放等需要业务编码进行控制。

# 注入 Bean
@Autowired
RedisLockRegistry lockRegistry;
1
2
1
2
# 定义一个锁
Lock lock = lockRegistry.obtain("test-key");
1
1

注意:这里的 key 是在 redis 中存储的 key,要保证业务唯一性。key 的粒度要尽量小,比如:「xxx 业务 - xxx 用户 - xxx 方法」等组成的 key,可以减少锁冲突。

# 持有锁,并进行业务处理
if (lock.tryLock(3, TimeUnit.SECONDS)) {
  // 持有锁成功,进行业务处理
}
1
2
3
1
2
3

注意:tryLock 时,可以指定时间参数,该参数表示等待获取锁的最大时间。

# 业务处理完成后,释放锁
lock.unlock();
1
1

注意:unlock 一定要在 finally 中执行,保证可以正确的释放锁。

# Rabbitmq

so-fast 提供对消息队列 rabbitmq 的支持。

如果使用该功能,请现在 pom.xml 中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-rabbitmq-starter</artifactId>
</dependency>
1
2
3
4
1
2
3
4

并在配置文件中进行配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
1
2
3
4
5
6
1
2
3
4
5
6

这里只是一个配置示例,更多配置可以参考文末 sf-rabbit.yml 文件和官方配置 https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties。

# 使用

RabbitMQ 官网提供了七种度列模型,分别是简单队列、工作队列、发布订阅、路由模式、主题模式、RPC 模式、发布者确认模式。前五种是常用模型。在 SpringBoot 中已经对消息队列提供了完善的封装,因此我们直接使用 SpringBoot 的 amqp 进行开发即可。

在 sofast 中对这五种模型进行了默认封装,可直接使用,也可自定义。

# 简单模式

声明队列(sofast 已经内置)

/**
 * 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
 * 该模式只允许一个生产者和一个消费者,因此只能在单节点使用
 */
String SIMPLE_MODE_QUEUE_DEFAULT = "queue.simple.default";
/**
 * 简单队列
 * @return
 */
@Bean
public Queue simpleQueue() {
    return new Queue(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message = "简单队列-Message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT,message);
}
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

消费者

@Component
@RabbitListener(queues = RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT)
public class Consumer {
    /**
     * @RabbitListener 和 @RabbitHandler 搭配使用
     * @RabbitListener可以标注在类上面,需配合 @RabbitHandler 注解一起使用
     * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,
     * 具体使用哪个方法处理,根据接收到的 message 参数类型
     **/
    @RabbitHandler
    public void process(String message) {
        System.out.println("消费成功  : " + message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 工作队列

声明队列(sofast 已经内置)

/**
 * 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
 * 该模式允许一个生产者和多个消费者,因此消费者可允许多节点
 * 该模式下消息分发可分为轮训分发和公平分发,轮训分发为机械性轮流分发,不管理消费者处理的能力是否不同;
 * 公平分发需要设置channel.basicQos(1);每次只发一条,处理完后再发下一条.
 * <p>
 * 默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
 * 需要进行以下持久化配置:
 * <p>
 * 参数配置一:生产者创建队列声明时,修改第二个参数为 true
 * channel.queueDeclare(QUEUE_NAME, true, false, false, null);
 * <p>
 * 参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
 * channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
 */
String WORK_MODE_QUEUE_DEFAULT = "query.work.default";
/**
 * 工作队列
 * @return
 */
@Bean
public Queue workQueue() {
    return new Queue(RabbitConstant.WORK_MODE_QUEUE_DEFAULT);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

配置

工作队列和简单队列在代码上是完全一样的,不同的是:简单队列只有一个消费者,而工作队列有多个消费者。

当存在多个消费者时,队列需要根据策略对不同消费者进行消息分发,rabbit 模式采用轮训方式,即不管消费者能力,机械性轮训分发。我们可以通过配置文件修改分发策略

spring:
  rabbitmq:
    virtual-host: /
    listener:
      simple:
        # 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
        prefetch: 1
1
2
3
4
5
6
7
1
2
3
4
5
6
7
# 发布订阅

声明队列(sofast 已经内置)

/**
 * 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。称为发布/订阅模式
 * 发布订阅模式引入了交换机(EXCHANGE)概念,生产者不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列.
 * <p>
 * =====发布端=====
 * 绑定的交换机 参数1交互机名称 参数2 exchange类型
 * channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
 * <p>
 * 发送消息
 * channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
 * <p>
 * =====订阅端=====
 * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
 * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
 */
String FANOUT_MODE_QUEUE_DEFAULT = "queue.fanout.default";
String FANOUT_MODE_QUEUE_EXTEND = "queue.fanout.extend";
String FANOUT_MODE_EXCHANGE_DEFAULT = "exchange.fanout.default";
String EXCHANGE_TYPE_FANOUT = "fanout";
// 发布/订阅模式:一个交换机对应多个队列  ======= start

/**
 * 发布/订阅队列(默认队列)
 * @return
 */
@Bean
public Queue fanoutQueueDefault() {
    return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT);
}
/**
 * 发布/订阅队列(扩展队列)
 * @return
 */
@Bean
public Queue fanoutQueueExtend() {
    return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_EXTEND);
}
/**
 * 交换机
 * @return
 */
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT);
}

/**
 * 发布/订阅默认绑定队列和交换机
 * @param fanoutQueueDefault 绑定默认队列
 * @param fanoutExchange 绑定交换机
 * @return
 */
@Bean
public Binding fanoutBindingDefault(Queue fanoutQueueDefault, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueDefault).to(fanoutExchange);
}
/**
 * 发布/订阅默认绑定队列和交换机
 * @param fanoutQueueExtend 绑定扩展队列
 * @param fanoutExchange 绑定交换机
 * @return
 */
@Bean
public Binding fanoutBindingExtend(Queue fanoutQueueExtend, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueExtend).to(fanoutExchange);
}

// 发布/订阅模式:一个交换机对应多个队列  ======= end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message = "发布订阅模式-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT,message);
}
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

消费者

@Component
public class Consumer {
    @RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processDefault(String message) {
        System.out.println("默认队列消费成功  : " + message);
    }
    @RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_EXTEND)
    @RabbitHandler
    public void processExtend(String message) {
        System.out.println("扩展队列消费成功  : " + message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
# 路由模式

路由模式是在发布订阅模式的基础上,有选择的接收消息,即通过设置 routing 进行条件匹配接收消息

声明队列(sofast 已经内置)

    /**
     * 路由模式:在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
     * 路由模式在交换机的基础上又引入了路由(routing)概念,决定消息向队列推送的主要取决于路由,而不是交换机了.
     * <p>
     * =====发布端=====
     * 绑定的交换机 参数1交互机名称 参数2 exchange类型
     * channel.exchangeDeclare(EXCHANGE_NAME, "direct");
     * 发送消息
     * channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes("utf-8"));
     * <p>
     * =====订阅端=====
     * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
     * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingKey");
     */
    String DIRECT_MODE_EXCHANGE_DEFAULT = "exchange.direct.default";
    String EXCHANGE_TYPE_DIRECT = "direct";
    /** TODO 路由模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义,这里只是示例 */
    String DIRECT_MODE_QUEUE_DEFAULT = "queue.direct.default";
    String DIRECT_MODE_QUEUE_EXTEND = "queue.direct.extend";

    String DIRECT_MODE_ROUTING_KEY_DEFAULT = "routing.key.default";
    String DIRECT_MODE_ROUTING_KEY_EXTEND = "routing.key.extend";
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

因为路由模式,需要根据业务需求提供 routingKey,因此在 sofast 中并没有对此做配置,在业务服务中进行配置使用较好。使用方式同上述发布订阅模式类型,只是增加了 routingKey,交换机类型换成了 Direct。

// 路由模式:通过routing key进行消息选择  ======= start
@Bean
public Queue directQueue() {
    return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue directQueueExtend() {
    return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_EXTEND);
}

/**
 * 路由模式交换机
 * 交换机和队列之间通过binding key进行关联,只有当消息的routing key和binding key相同时才会被消费
 * @return
 */
@Bean
public DirectExchange directExchange() {
    return new DirectExchange(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding directBindingDefault(Queue directQueue, DirectExchange directExchange) {
    return BindingBuilder.bind(directExchange).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding directBindingExtend(Queue directQueueExtend, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueExtend).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND);
}

// 路由模式:通过routing key进行消息选择  ======= end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

/**
* 生产者将消息发送给交换机,并绑定default key
**/
public void sendMessageDefault() {
    String message = "路由模式-defaultKey-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT,RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT,message);
}

/**
* 生产者将消息发送给交换机,并绑定extend key
**/
public void sendMessageExtend() {
    String message = "路由模式-extendKey-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_EXTEND,RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND,message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

消费者

@Component
public class Consumer {

    @RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processA(String message) {
        System.out.println("消息路由到了队列Default: " + message);
    }

    @RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_EXTEND)
    @RabbitHandler
    public void processB1(String message) {
        System.out.println("消息路由到了队列Extend: " + message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 主题模式

路由模式只能设置固定的 routingkey,主题模式是在路由模式的基础上增加了通配符来选择信息。

声明队列(sofast 已经内置)

/**
 * 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比路由模式更灵活。
 * direct 不支持匹配 routingKey,一但绑定了就是绑定了,而 topic 主题模式支持规则匹配,只要符合 routingKey 就能发送到绑定的队列上。
 * <p>
 * topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,
 * 必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)
 * 比如 "lazy.orange.fox"。topics routingKey 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
 * <p>
 * "*" 表示任何一个词
 * "#" 表示0或1个词
 * <p>
 * =====发布端=====
 * 绑定的交换机 参数1交互机名称 参数2 exchange类型
 * channel.exchangeDeclare(EXCHANGE_NAME, "topic");
 * 发送消息
 * channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
 * <p>
 * =====订阅端=====
 * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
 * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
 */
String TOPIC_MODE_EXCHANGE_DEFAULT = "exchange.topic.default";
String EXCHANGE_TYPE_TOPIC = "topic";
/**
 * topic模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义
 */
String TOPIC_MODE_QUEUE_DEFAULT = "queue.topic.default";
String TOPIC_MODE_QUEUE_ERROR = "queue.topic.error";
/**
 * 示例
 */
String TOPIC_MODE_ROUTING_KEY_DEFAULT = "sofast.info.#";
String TOPIC_MODE_ROUTING_KEY_ERROR = "sofast.error.#";
// 主题模式:  ======= // TODO: 2021/3/11
@Bean
public Queue topicQueue() {
    return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue topicQueueError() {
    return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_ERROR);
}
/**
 * 主题模式交换机
 * <li>路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email</li>
 * <li>通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了</li>
 * <li>通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配</li>
 */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding topicBindingDefault(Queue topicQueue, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding topicBindingError(Queue topicQueueError, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueueError).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_ERROR);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

发布者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message1 = "主题模式-message-routingKey-sofast.info.#";
    String message2 = "主题模式-message-routingKey-sofast.error.#";

    System.out.println("发送message1 : " + message1);
    rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.info.system",message1);

    System.out.println("发送message2 : " + message2);
    rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.eror.user",message2);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13

消费者

@Component
public class Consumer {

    @RabbitListener(queues = RabbitConstant.1_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processA(String message) {
        System.out.println("消息路由到了队列A: " + message);
    }

    @RabbitListener(queues = RabbitConstant.TOPIC_MODE_QUEUE_ERROR)
    @RabbitHandler
    public void processB1(String message) {
        System.out.println("消息路由到了队列B: " + message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 手动 Ack 与自动 Ack

当消息被消费者接收并处理后,队列中的消息就会被删除。那么 rabbitmq 如何能准确知道消费者是否接收到消息呢?这就需要通过消息确认机制。当消费者获取消息后,需要向 rabbitmq 发送回执 ack,告知消息已经被接收到。

# Ack 的种类:
  • 自动 Ack:消息一旦被接收,消费者自动发送 ack 到服务器
  • 手动 Ack:消费者接收消息后,需要显式调用回执方法来发送 ack 到服务器
# 场景
  • 自动 Ack:非事务性消息或消息重要度较低,偶尔丢失 1 条对业务并不会产生影响,建议使用自动 ack
  • 手动 Ack:针对不容丢失的重要业务消息,必须使用手动 Ack 方式,考虑处理成功和失败场景,考虑服务宕机或网络异常等场景。保证消息的有且仅有一次的消息处理。
# 使用方式
  • 自动 Ack:自动 ack 由框架自动完成,因此在自动 Ack 场景下,不需要任何编码来实现 Ack。

    但是在自动模式下,消费端可能会因为某些原因导致消费失败(比如引网络波动导致的短暂性网络不通)等情况,那么可以通过 retry 来进行重试。

    需要注意的是,这里的失败场景一定不是因为自身代码原因导致的,因为这种情况重试多少次也是无法成功的,需要修复代码的 bug 才可以;当因为网络或第三方接口暂时不通等导致的情况,可以通过如下配置示例进行自动重试

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 自动ack
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
11

重点来了:这里的重试完全是客户端的行为,与 rabbitmq 是没有任何关系的(并不是 rabbitmq 会重新推送消息过来),而重试的依据是通过客户端抛出异常被 AOP 拦截后,间隔指定的时间后再重新执行该方法来进行的。因此这里最好不要进行全局的 try {} catch (Exception e),可能会导致异常无法抛出而重试失效。

重试次数满了之后,消息应该如何处理,也有多种方式:在自动 Ack 模式下,当消息重试结束后默认自动 ack 本消息并且不会将消息重新发送队列(即:RejectAndDontRequeueRecoverer)。

MessageRecoverer 除了 RejectAndDontRequeueRecoverer 的默认实现外,还有另外两种实现 RepublishMessageRecoverer(重新发布消息)和 ImmediateRequeueMessageRecoverer(立即重新返回队列)。RepublishMessageRecoverer 会以新的 routingKey 重新发送到 rabbitmq,然后通过专门的消费者来进行处理;ImmediateRequeueMessageRecoverer 是重新返回原队列,周而复始直到不抛出异常才会停止,因此该种方式一般不推荐使用,可能会产生死循环。

另外还有一种方式是通过死信队列来处理重试失败的消息,这里就不展开。。

  • 手动 Ack:手动 Ack 需要调用代码来显示指定 ack。

channel.basicAck (deliveryTag, multiple):消费成功的 Ack 应答;

channel.basicNack (deliveryTag, multiple, requeue):消费失败的 Ack 应答。

在消费失败的场景下,有两个选择:即 requeue 为 true 还是 false。当 requeue=true 时,消息会重新放入消息队列进行重新消费,直到发送消费成功的 Ack 应答为止;当 requeue=false 时,消息会直接进入死信队列。

那么这里仍然有一个问题:在 requeue=true 的情况下,可能会出现无限制重试,这里可通过配合 redis 计数器进行重试次数指定。

//消费失败重试3次,3次失败后放入死信队列
            int retryCount = (int) redisUtil.get(msgId);

            if (retryCount >= 3) {
                //requeue = false 放入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else {
                //requeue = true 放入消费队列重试消费
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                redisUtil.set(msgId, retryCount + 1);
            }
1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
11

懵逼点来了:在手动 Ack 模式下,其实也可以使用 spring 的自动 retry 配置,但是因为在手动模式下需要显式的发送 Ack 应答,当重试次数达到后,仍然消费失败,那么就无法发送成功的 Ack 应答,因此不推荐在手动 Ack 模式下进行自动 retry。

# 附件:sf-rabbit.yml

# 该配置文件是示例参考文件,需要在自己的微服务中根据需要进行配置,本配置并未启用
# 更多详细配置可参考官方:https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

    ssl:
      enable: false
      # 指定持有SSL certificate的key store的路径
      key-store:
      # 指定访问key store的密码
      key-store-password:
      # 指定持有SSL certificates的Trust store
      trust-store:
      # 指定访问trust store的密码
      trust-store-password:
      # ssl使用的算法,例如,TLSv1.1
      algorithm:

    virtual-host: /
    # 必须配置这个才会确认回调
    publisher-confirm-type: correlated
    publisher-returns: true
    # 手动提交消息
    listener:
      simple:
        #(手动Ack或自动Ack,这里只能选择一个)
        acknowledge-mode: manual/auto 
        # 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
        prefetch: 1
        # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
        transaction-size: 1
        retry:
          # 是否支持重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试有无状态
          stateless: false
          # 时间策略乘数因子
          multiplier: 1.0
          # 第一次和第二次尝试发布或传递消息之间的间隔
          initial-interval: 1000ms
          # 最大重试时间间隔
          max-interval: 10000ms
      direct:
        acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# Kafka

so-fast 提供对分布式消息队列 kafka 的支持。

如果使用该功能,请现在 pom.xml 中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-kafka-starter</artifactId>
</dependency>
1
2
3
4
1
2
3
4

并在配置文件中进行配置:

spring:
  kafka:
      bootstrap-servers: 127.0.0.1:9092
1
2
3
1
2
3

这里只是一个配置示例,具体针对 kafka server,消费者,生产者的消息配置可以参考文末 sf-kafka.properties 文件。

# 使用示例

# 简易示例
下面示例创建了一个生产者,default_topic。消费者用@KafkaListener注解进行加监听,topics表示监听的topic,支持同时监听多个,用英文逗号分隔

/**
 * 生产者
 */
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @PostMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("default_topic", normalMessage);
    }
}

/**
 * 消费者
 */
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"default_topic"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 生产者 - callback 使用示例
下面示例创建了一个带callback的生产者,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

/**
 * 第一种写法
 */
@PostMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("default_topic", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}

/**
 * 第二种写法
 */
@PostMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("default_topic", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送消息失败:"+ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 生产者 - 事务使用示例
/**
 * 如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
 */
@PostMapping("/kafka/transaction")
public void sendMessage4(){
    // 声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("default_topic","test executeInTransaction");
        throw new RuntimeException("fail");
    });

    // 不声明事务:后面报错但前面消息已经发送成功了
   kafkaTemplate.send("default_topic","test executeInTransaction");
   throw new RuntimeException("fail");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 消费者 - 指定参数消费使用示例
```
/**
 *  ① id:消费者ID;
 *  ② groupId:消费组ID;
 *  ③ topics:监听的topic,可监听多个;
 *  ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
 *
 *  监听default_topic的0号分区,同时监听other_topic的0号分区和other_topic的1号分区里面offset从8开始的消息。
 */
@KafkaListener(id = "consumer1",groupId = "sofast-group",topicPartitions = {
        @TopicPartition(topic = "default_topic", partitions = { "0" }),
        @TopicPartition(topic = "other_topic", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 消费者 - 批量消费使用示例
application.yml加入配置:

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
/**
 * 接收消息时用List来接收
 */
@KafkaListener(id = "consumer2",groupId = "sofast-group", topics = "default_topic")
public void onMessage2(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消费一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 消费者 - 异常处理器
/**
 * 通过异常处理器,我们可以处理consumer在消费时发生的异常。
 * 新建一个异常处理器,用@Bean注入
 */
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("消费异常:"+message.getPayload());
        return null;
    };
}

/**
 * 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
 */
@KafkaListener(topics = {"default_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage3(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("简单消费-模拟异常");
}

/**
 * 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
 */
@KafkaListener(topics = "default_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage4(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消费一次...");
    throw new Exception("批量消费-模拟异常");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 消费者 - 消息过滤器
/**
 * 消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
 * 配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 */
@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;

    /**
     * 消息过滤器
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略(过滤奇数和偶数)
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }

    /**
     * 消息过滤监听
     */
    @KafkaListener(topics = {"default_topic"},containerFactory = "filterContainerFactory")
    public void onMessage5(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 消费者 - 消息转发
/**
 * 在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
 */
@KafkaListener(topics = {"default_topic"})
@SendTo("other_topic")
public String onMessage6(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8

# 附件:sf-kafka.properties

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=ip:port,ip:port
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.security.sasl.mechanism=GSSAPI
spring.kafka.security.sasl.kerberos.service.name=kafka

###########【初始化Topic配置】###########
# topic名称
spring.kafka.topic=test
# topic分区数量
spring.kafka.topic.numPartitions=2
# topic副本数量
spring.kafka.topic.replicationFactor=1

###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=xxxxxx.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
# 消费者线程并发数
spring.kafka.consumer.currency=2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 多租户

so-fast 框架支持平台级的多租户功能(schema 隔离或数据库隔离)

# 组件介绍:

框架基于 dynamic-datasource-spring-boot-starter 组件来实现的多租户的动态数据源切换功能。目前支持 schema 级别和数据库级别的数据隔离。

dynamic-datasource-spring-boot-starter 是一个基于 springboot 的快速集成多数据源的启动器。

源码地址 - github: https://github.com/baomidou/dynamic-datasource-spring-boot-starter

# 实现原理:

在租户创建时根据模板生成对应的数据库,表和初始数据,并以 SoFast_tenant_DS: 租户 ID: 服务名为 KEY 将相应的数据源信息缓存在 redis 中。

当租户请求对应的服务时,根据请求头里的租户 ID 获取对应的数据源信息,创建或获取数据源,并将其设定其为当前的数据源。

# 使用方式:

  • 自己工程的 POM 文件里导入以下配置
<dependency>
  <groupId>com.sofast.cloud</groupId>
  <artifactId>so-fast-ds-tenant-starter</artifactId>
</dependency>
1
2
3
4
1
2
3
4
  • so-fast-tenant 是租户管理工程,其中 bootstrap.yml 里需要按照如下图的格式来配置多租户的服务

    ![img](file:///C:/work/Project/so-fast/book_so-fast-cloud/assets/backend-dev/tenant1.png)

    其中 serviceName 是服务名称,schemaName 是默认的该服务建库名称,目前多租户服务只支持一个服务一个库。

    template 下存放涉及租户服务对应的建表 SQL 文,文件名以服务名称来命名。

  • so-fast-tenant 工程只提供了基础的租户管理内容,需要业务和开发人员根据实际需求将企业和租户绑定。

实现原理是实现一个 fliter 来拦截 web 请求,根据请求头里的 tenantId 来切换租户的数据源,保证同一个服务支持多数据库的租户模式。所以需要进行租户管理的业务请求头都需要一个 tenantId 的参数。

# 页面功能介绍:

租户管理页面,首先添加租户,成功后在对租户配置数据源

# 列表功能页

img

  1. 模糊搜索租户数据

  2. 添加租户

  3. 租户状态修改

  4. 列表中操作事情

    • 查看

    • 配置数据源

    • 修改租户

    • 删除租户

      # 模态事件

  5. 新增租户

    • 状态默认为禁用
    • 必填项为租户 ID 和租户名称
    • 规则
      • 租户 ID: 不能包含汉字,只能是字母、数字或 - 组成
  6. 配置数据源 img

    • 测试
      • 单独测试 当数据为默认数据源时则可以测试按钮测试, 测试成功后不能再次进行修改
      • 批量测试 选择每个服务名称前 checkbox 后,测试选中数据是否成功!成功后不能再此进行修改
    • 删除 根据租户 ID 删除当前数据源数据,删除后重新获取数据源列表数据,只有当是租户 ID 下的数据源数据才会显示此功能按钮
    • 建库 当测试完成后的数据源数才可以建库,没有完成测试,先处理完成所选择的数据源完成测试后再进行建库事件处理
  7. 删除租户 img 删除租户: 根据当前行 ID

# 注意事项:
  1. 在创建租户的数据源时数据源 schema 的内容开默认填充的,可以修改。如果不修改会创建默认的库名:默认 schema+“_”+ 租户 ID。
  2. 如果建库的用户名和密码不填写,会默认使用业务的用户名密码来建库,请注意给予相应的数据库权限,以防止建库失败。
  3. 如果有不需要进行租户数据源切换的业务,请收集对应的请求路径。在自己的工程里实现 SecurityConstants.java 接口重新定义 STATIC_RESOURCE 的路径内容。
  4. 如果提示获取 "当前租户的数据源情报无法获取,请联系管理员!" 的消息,可以试一下数据源页面的刷新按钮,其作用是将该租户的数据源信息重新导入到 redis 缓存中。

# 工作流

工作流是指一类能够完全自动执行的经营过程,根据一系列过程规则,将文档、信息或任务在不同的执行者之间进行传递与执行。说的直白一点工作流就是封装好的一种框架,我们利用这种框架来解决需要多个人或者多个部门协同完成的某项工作。so-fast 基于 Flowable 6.6 实现协同流程,支持在线实时 modeler 设计流程图,流程节点发起 处理,及其流程节点的图形化查看功能。

# 流程设计
# 流程基本信息

img

  • 流程编码:全局唯一编码
  • 流程完成回调:流程正常结束时回调地址
    • 服务名称:Nacos 注册中心中的服务名称
  • 流程终止回调:流程取消或流程终止时回调地址
# 节点基本信息

img

  • 通知方式为邮件通知,暂时仅支持待办通知和流程完成通知,抄送人通知暂不支持

    • 待办通知:发送邮件至办理人,提醒处理待办

    • 完成通知:发送邮件至申请人,提醒申请已结束

    • 流程通知开关配置:true 为开启通知,false 为关闭通知,默认为关闭通知

      sofast:
       #工作流通知
       flowable:
         notice-enable: true
      
      1
      2
      3
      4
      1
      2
      3
      4
  • 表单组件地址:创建业务信息的地址

  • 详情组件地址:显示业务信息的地址

# 节点事件监听

img

事件监听仅支持执行事件,在任务开始前或完成后调用配置的请求地址

流程设计完成后,点击发布按钮发布流程。

# 后台使用方式

自己工程的 POM 文件里导入以下配置

<!-- 工作流模块依赖 -->
<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-flowable-sdk</artifactId>
</dependency>
1
2
3
4
5
1
2
3
4
5

工作流模块依赖提供如下功能:

  • 流程实例开始接口
  • 流程任务驳回后再申请接口
  • 流程任务审批同意接口
  • 流程任务审批拒绝接口
上次更新: 2025/04/23, 09:03:53

← 项目介绍 开发规范→

Theme by | Copyright © -2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式