相关推荐recommended
Springboot MyBatis实现多数据源切换和主从复制(读写分离)
作者:mmseoamin日期:2023-12-25

简介

  本文主要通过文字和代码的方式讲解Springboot MyBatis如何实现多数据源切换和主从复制(读写分离),这里是通过从数据库数据源配置表中配置获取动态数据源配置信息,并实现数据源生成,切换,主从复制操作的。

技术实现主要原理是:AbstractRoutingDataSource+ThreadLocal+AOP的方式实现的。

AbstractRoutingDataSource:

AbstractRoutingDataSource是Spring  JDBC提供的一个数据源路由类,用于根据不同的数据源选择对应的目标数据源。

在一些复杂场景中,我们需要连接多个数据库,而这些数据库都有各自的数据源配置。而且还需要在运行时动态地根据某些条件选择具体的数据源,比如根据请求参数、用户权限等动态决定应该连接哪个数据库。

这时候就可以使用AbstractRoutingDataSource类来管理这些数据源,它提供了一种机制,可以根据某些条件选择合适的数据源,从而实现动态数据源的配置和管理。

具体来说,我们可以继承AbstractRoutingDataSource类,重写determineCurrentLookupKey方法,该方法根据当前请求或用户权限等信息动态地返回目标数据源路由的key,然后就可以根据这个key来选择对应的数据源进行连接操作。

ThreadLocal:

ThreadLocal是Java中一个非常实用的类,用于解决多线程情况下共享变量产生的问题。它的作用是提供线程内的局部变量,在多线程环境下每个线程互不影响,可以在任何需要的地方随时取出,而不需要额外的同步操作。其主要功能是为每个线程提供了一个独立的变量副本,每个线程都可以随意修改自己的变量副本,而不会影响到其他线程的变量副本。

ThreadLocal主要应用于多线程之间的信息隔离,例如:

1.  在多线程环境中,每个线程都有自己的数据域,例如线程池中,每个线程处理任务时需要有自己的数据域,ThreadLocal就非常适用于这种情况。

2.  线程处理任务时需要使用一些共享变量,但是这些变量是不可变的,使用ThreadLocal可以避免多线程中的竞争条件,提高程序的性能。

总而言之,ThreadLocal是Java中提供的一种线程内部的数据共享方式,它是一种线程安全的方式,在多线程环境下,使用ThreadLocal可以有效地保护共享数据的独立性。

AOP:

Java  AOP(Aspect-Oriented  Programming,面向切面编程)是一种编程范式,它强调横向抽取逻辑单元(cross-cutting  concerns)。常见的  cross-cutting  concerns  包括日志、事务、安全、缓存等。在传统的  OOP(Object-Oriented  Programming,面向对象编程)中,这些逻辑单元常常散落在各个类和方法中,使得代码变得复杂难懂,难以维护。

Java  AOP  提供了一种解决方案,它允许开发者将  cross-cutting  concerns  抽象为一些切面(aspect),并将这些切面应用到代码中的特定点上,如方法调用、对象创建、属性访问等。通过使用  Java  AOP,开发者可以将关注点分离出来,使得代码更加清晰和模块化。

Java  AOP  的实现方式有很多种,包括基于动态代理的  JDK  AOP,基于字节码增强的  AspectJ,基于注解的  Spring  AOP  等。这些框架和工具都提供了方便易用的API,使得开发者可以方便地实现  AOP。

代码

首先现在maven项目中引入mybatis的依赖,并配置好相关配置!!!

声明一个动态数据源的配置类,代码如下:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
/**
 * file:DynamicDataSourceConfig
 *
 * @author tarzan 
 */
@Configuration
public class DynamicDataSourceConfig {
    @Value("${spring.datasource.url}")
    private String defaultUrl;
    @Value("${spring.datasource.driverClassName:org.postgresql.Driver}")
    private String driverClassName;
    @Value("${spring.datasource.username}")
    private String defaultUsername;
    @Value("${spring.datasource.password}")
    private String defaultPassword;
    @Primary
    @Bean
    public DynamicRoutingDataSource dynamicDataSource() {
        DataSource defaultDataSource=DataSourceBuilder.create().url(defaultUrl).driverClassName(driverClassName).username(defaultUsername).password(defaultPassword).build();
        return new DynamicRoutingDataSource(defaultDataSource);
    }
}

这里读取spring数据库的默认配置:

spring: 
 datasource:
    url: jdbc:postgresql://${POSTGRES_HOST:119.167.159.211}:${POSTGRES_PORT:5432}/${POSTGRES_DATABASE:dzbz2_lgyz}
    username: ${POSTGRES_USERNAME:postgres}
    password: ${POSTGRES_PASSWORD:#5Rd!TC2CBA}

创建一个动态路由数据源类,代码如下:

import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.springblade.dynamic.data.source.config.DataSourceContextHolder.DEFAULT_DATA_SOURCE;
/**
 * @author tarzan
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    public final ConcurrentHashMap dataSourcesMap = new ConcurrentHashMap<>();
    public DynamicRoutingDataSource(DataSource defaultDataSource) {
        super.setDefaultTargetDataSource(defaultDataSource);
        Map targetDataSources=new HashMap<>(1);
        targetDataSources.put(DEFAULT_DATA_SOURCE,defaultDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
        this.dataSourcesMap.put(DEFAULT_DATA_SOURCE, defaultDataSource);
    }
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSource();
    }
    public void addDataSource(DataSourceInfo dataSourceInfo,boolean join) {
        DataSource dataSource = createDataSource(dataSourceInfo);
        Map targetDataSources=new HashMap<>(1);
        if(join){
            targetDataSources.put(DEFAULT_DATA_SOURCE,dataSourcesMap.get(DEFAULT_DATA_SOURCE));
        }
        targetDataSources.put(dataSourceInfo.getName(),dataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
        dataSourcesMap.put(dataSourceInfo.getName(),dataSource);
    }
    public DataSource createDataSource(DataSourceInfo dataSourceInfo) {
        if(dataSourcesMap.containsKey(dataSourceInfo.getName())){
            return dataSourcesMap.get(dataSourceInfo.getName());
        }
        return DataSourceBuilder.create().url(dataSourceInfo.getUrl()).driverClassName(dataSourceInfo.getDriverClassName()).username(dataSourceInfo.getUserName()).password(dataSourceInfo.getPassword()).build();
    }
}

DataSourceInfo 数据库信息类,代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class DataSourceInfo {
    private String name;
    private String driverClassName;
    private String url;
    private String userName;
    private String password;
}

创建一个数据源工具类DataSourceContextHolder,用数据源线程的切换,代码如下: 

public class DataSourceContextHolder {
    public static final String DEFAULT_DATA_SOURCE = "defaultDataSource";
    /**
     * 线程级别的私有变量
     */
    private static final ThreadLocal CURRENT_DATASOURCE_NAME = new ThreadLocal<>();
    public static String getDataSource() {
        return CURRENT_DATASOURCE_NAME.get();
    }
    /**
     * 设置数据源
     */
    public static void setDataSource(String datasourceId) {
        CURRENT_DATASOURCE_NAME.set(datasourceId);
    }
    /**
     * 删除数据源
     */
    public static void removeDataSource() {
        CURRENT_DATASOURCE_NAME.remove();
    }
    /**
     * 切换默认数据源
     */
    public static void switchDefaultDataSource() {
        CURRENT_DATASOURCE_NAME.set(DEFAULT_DATA_SOURCE);
    }
}

数据库数据源管理服务类,用于从数据库数据源配置表中读取相关数据源的配置信息,生成数据源,进行数据源切换,主从复制等操作。

import org.springblade.core.tool.utils.Func;
import org.springblade.dynamic.data.source.database.entity.Dbs;
import org.springblade.dynamic.data.source.database.service.IDbsService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.MessageFormat;
import java.util.Objects;
/**
 * @author tarzan
 * @since JDK1.8
 */
@Service
public class DataSourceService {
    @Resource
    private IDbsService dbsService;
    @Resource
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    public  void switchDataSource(Long dbsId) {
        if (dbsId != null && dbsId != 0) {
            DataSourceContextHolder.switchDefaultDataSource();
            Dbs dbs = dbsService.getById(Func.toLong(dbsId));
            switchDataSource(dbs);
        }
    }
    public void switchDataSource(Dbs dbs) {
            if (Func.isNull(dbs)) {
                throw new RuntimeException("未找到dbsId对应的数据库");
            }
            //创建数据源
            String url = MessageFormat.format("jdbc:postgresql://{0}:{1}/{2}", dbs.getIp(), dbs.getPort(), dbs.getDatabasename());
            DataSourceInfo dataSourceInfo = new DataSourceInfo(String.valueOf(dbs.getId()), "org.postgresql.Driver", url, dbs.getUsername(), dbs.getPassword());
            //加入多数据源列表
            dynamicRoutingDataSource.addDataSource(dataSourceInfo,false);
            DataSourceContextHolder.setDataSource(dataSourceInfo.getName());
    }
    public  void addTargetDataSource(Dbs dbs) {
        if (Func.isNull(dbs)) {
            throw new RuntimeException("未找到dbsId对应的数据库");
        }
        //创建数据源
        String url = MessageFormat.format("jdbc:postgresql://{0}:{1}/{2}", dbs.getIp(), dbs.getPort(), dbs.getDatabasename());
        DataSourceInfo dataSourceInfo = new DataSourceInfo("ms_"+ dbs.getId(), "org.postgresql.Driver", url, dbs.getUsername(), dbs.getPassword());
        dynamicRoutingDataSource.addDataSource(dataSourceInfo,true);
        DataSourceContextHolder.setDataSource(dataSourceInfo.getName());
    }
    public  void switchDataSource(String code) {
        if (Func.isNotBlank(code)) {
            DataSourceContextHolder.switchDefaultDataSource();
            Dbs dbs = getDbs(code);
            if (Objects.nonNull(dbs)) {
                switchDataSource(dbs);
            }else {
                throw new RuntimeException("未找到dbsCode对应的数据库");
            }
        }
    }
    public  void addTargetDataSource(String code) {
        if (Func.isNotBlank(code)) {
            DataSourceContextHolder.switchDefaultDataSource();
            Dbs dbs = getDbs(code);
            if (Objects.nonNull(dbs)) {
                addTargetDataSource(dbs);
            }else {
                throw new RuntimeException("未找到dbsCode对应的数据库");
            }
        }
    }
    private Dbs getDbs(String code) {
        if (Func.isNotBlank(code)) {
            return  dbsService.lambdaQuery().eq(Dbs::getIsDeleted, 0).eq(Dbs::getCode, code).last("limit 1").one();
        }else {
            return null;
        }
    }
    public synchronized void switchDefaultDataSource() {
        DataSourceContextHolder.switchDefaultDataSource();
    }

}

注: 该类中用的IDbsService 类 是数据库 数据源配置表的服务接口类,相关代码,由于文章长度限制,且实现比较简单,这里直接省略了。这里附加上Dbs类的数据库实体代码,方便大家设计及数据库表。

Dbs数据源配置表实体类,代码如下:

package org.springblade.dynamic.data.source.database.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springblade.core.mp.base.BaseEntity;
@Data
@TableName("gis_standard_dbs")
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Dbs对象", description = "Dbs对象")
public class Dbs extends BaseEntity {
    private static final long serialVersionUID = 1L;
    /**
     * 连接展示名称
     */
    @ApiModelProperty(value = "连接展示名称")
    private String nickname;
    /**
     * 数据库ip
     */
    @ApiModelProperty(value = "数据库ip")
    private String ip;
    /**
     * 用户名
     */
    @ApiModelProperty(value = "用户名")
    private String username;
    /**
     * 密码
     */
    @ApiModelProperty(value = "密码")
    private String password;
    /**
     * 数据库端口
     */
    @ApiModelProperty(value = "数据库端口")
    private String port;
    /**
     * 数据库名
     */
    @ApiModelProperty(value = "数据库名")
    private String databaseName;
    @ApiModelProperty(value = "唯一标识")
    private String code;
}

创建自定义注解,用于在相关的mapper或者service数据库操作事务中,指定连接的数据源。代码如下:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DbsAnnotation {
    String value() default "";
    boolean join() default false;
}

 自定义注解aop切面 DbsAspect,代码如下:

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.annotation.*;
import org.springblade.core.tool.utils.Func;
import org.springblade.dynamic.data.source.annotations.DbsAnnotation;
import org.springblade.dynamic.data.source.config.DataSourceContextHolder;
import org.springblade.dynamic.data.source.config.DataSourceService;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * 业务
 *
 * @author gaoqiang
 * @version 1.0
 * @company 北斗天地股份有限公司
 * @copyright (c) China Bdtd Co.LTD.All rights reserved.
 * @date 2022/9/15 16:11
 * @since JDK1.8
 */
@Order(1)
@Aspect
@Component
@Slf4j
public class DbsAspect {
    @Resource
    private DataSourceService dataSourceService;
    @Pointcut("@annotation(org.springblade.dynamic.data.source.annotations.DbsAnnotation)")
    private void dbsAnnotation() {
    }
    @Before("@annotation(dbs)")
    public void record(DbsAnnotation dbs) {
            if (Func.isNotBlank(dbs.value())) {
                if(dbs.join()){
                    log.info("添加目标数据源 "+dbs.value());
                    dataSourceService.addTargetDataSource(dbs.value());
                }else {
                    log.info("切换数据源 "+dbs.value());
                    dataSourceService.switchDataSource(dbs.value());
                }
            }
    }
    @After("dbsAnnotation()")
    public void after() {
        log.info("切换为默认数据源");
        DataSourceContextHolder.switchDefaultDataSource();
    }
}

好了,到这里基本就大功告成了,下面就让我来讲解下如何使用吧!!

    @DbsAnnotation("master")
    public List listAll() {
       return this.list();
    }

@DbsAnnotation注解中的值master要和数据源配置表中的code字段的值对应,就可以在执行这个方法的时候,切换到master数据源对数据库进行增删改查的操作了,方法执行完毕,会自动切换为默认数据源。

    @DbsAnnotation(value = "data_center",join = true)
    public void dataSync(List tunnelAddList,Long orgId) {
        if(CollectionUtils.isNotEmpty(tunnelAddList)){
            baseMapper.deleteByOrgId(orgId);
            super.saveBatch(tunnelAddList);
        }
    }

 @DbsAnnotation(value = "data_center",join = true) 比上面的多个一个join=true,意思是在默认数据源上添加一个data_center的目标数据源,这样数据在进行增删改的时候,能把数据同时同步到目标数据源和data_center数据源的表中,前提这两个数据源的表要一摸一样!!,这样就实现主从复制了!!!