首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

EntityFramework Core并发迁移解决方案

编程知识
2024年09月19日 14:31

场景

目前一个项目中数据持久化采用EF Core + MySQL,使用CodeFirst模式开发,并且对数据进行了分库,按照目前颗粒度分完之后,大概有一两百个库,每个库的数据都是相互隔离的。
借鉴了Github上一个开源的仓库 arch/UnitOfWork 实现UnitOfWork,核心操作就是每个api请求的时候带上库名,在执行CRUD之前先将DbContext切换到目标数据库,我们在切换数据库的时候加了一些操作,如检查数据库是否已创建、检查连接是否可用、判断是否需要 表结构迁移

/// <summary>
/// 切换数据库 这要求数据库在同一台机器上 注意:这只适用于MySQL。
/// </summary>
/// <param name="database">目标数据库</param>
public void ChangeDatabase(string database)
{
    // 检查连接
    ......

    // 检查数据库是否创建
    ......

    var connection = _context.Database.GetDbConnection();
    if (connection.State.HasFlag(ConnectionState.Open))
    {
        connection.ChangeDatabase(database);
    }
    else
    {
        var connectionString = Regex.Replace(connection.ConnectionString.Replace(" ", ""), @"(?<=[Dd]atabase=)\w+(?=;)", database, RegexOptions.Singleline);
        connection.ConnectionString = connectionString;
    }

    // 判断是否需要执行表结构迁移
    if(_context..Database.GetPendingMigrations().Any())
    {
        //自定义的迁移的一些逻辑
        _context.Database.Migrate(_context);
    }
}       

但是当多个操作同时对一个库进行Migrate的时候,就会出现问题,比如“新增一张表”的操作已经被第一个迁移执行过了,第二个执行的迁移并不知道已经执行过了Migrate,就会报错表已存在。
于是考虑在执行Migrate的时候,加入一个锁的机制,对当前数据库执行Migrate之前先获取锁,然后再来决定接下来的操作。由于这边有的服务无法访问Redis,这里使用数据库来实现锁的机制,当然用Redis来实现更好,加入锁的机制只是一种解决问题的思路。

利用数据库实现迁移锁

1. 新增 MigrationLocks 表来实现迁移锁

  • 锁的操作不依赖DbContext实例
  • 在执行Migrate之前,尝试获取一个锁,在获取锁之前,如果表不存在则创建
    CREATE TABLE IF NOT EXISTS MigrationLocks (
        LockName VARCHAR(255) PRIMARY KEY,
        LockedAt DATETIME NOT NULL
    );
    
  • 成功往表中插入一条记录,视为获取锁成功,主键为需要迁移的库的名称
    INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());
    
  • 迁移完成后,删除这条记录,视为释放锁成功;
    DELETE FROM MigrationLocks WHERE LockName = @database;
    
  • 为防止 “死锁” 发生,每次尝试获取锁之前,会对锁的状态进行检查,释放超过5分钟的锁(正常来说,上一个迁移的执行时间不会超过5分钟)。
    SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;
    

2. 封装一下MigrateLock的实现

/// <summary>
/// 迁移锁
/// </summary>
public interface IMigrateLock
{
    /// <summary>
    /// 尝试获取锁
    /// </summary>
    /// <param name="connection"></param>
    /// <returns></returns>
    bool TryAcquireLock(IDbConnection connection);

    /// <summary>
    /// 尝试获取锁
    /// </summary>
    /// <param name="connection"></param>
    /// <returns></returns>
    Task<bool> TryAcquireLockAsync(IDbConnection connection);

    /// <summary>
    /// 释放锁
    /// </summary>
    void ReleaseLock(IDbConnection connection);

    /// <summary>
    /// 释放锁
    /// </summary>
    /// <returns></returns>
    Task ReleaseLockAsync(IDbConnection connection);
}

/// <summary>
/// 迁移锁
/// </summary>
public class MigrateLock : IMigrateLock
{
    private readonly ILogger<MigrateLock> _logger;

    public MigrateLock(ILogger<MigrateLock> logger)
    {
        _logger = logger;
    }

    private const string CreateTableSql = @"
        CREATE TABLE IF NOT EXISTS MigrationLocks (
            LockName VARCHAR(255) PRIMARY KEY,
            LockedAt DATETIME NOT NULL
        );";

    private const string CheckLockedSql = "SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;";

    private const string AcquireLockSql = "INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());";

    private const string ReleaseLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database;";

    /// <summary>
    /// 尝试获取锁
    /// </summary>
    /// <param name="connection"></param>
    /// <returns></returns>
    public bool TryAcquireLock(IDbConnection connection)
    {
        try  
        {
            CheckLocked(connection);

            var result = connection.Execute(AcquireLockSql, new { database = connection.Database });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", connection.Database);

                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);

            return false;
        }
        catch (Exception ex)
        {
            if (ex.Message.StartsWith("Duplicate"))
            {
                _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database);
            }
            else
            {
                _logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database);
            }

            return false;
        }
    }

    /// <summary>
    /// 尝试获取锁
    /// </summary>
    /// <param name="connection"></param>
    /// <returns></returns>
    public async Task<bool> TryAcquireLockAsync(IDbConnection connection)
    {
        try
        {
            await CheckLockedAsync(connection);

            var result = await connection.ExecuteAsync(AcquireLockSql, new { database = connection.Database });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", connection.Database);

                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);

            return false;
        }
        catch (Exception ex)
        {
            if (ex.Message.StartsWith("Duplicate"))
            {
                _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database);
            }
            else
            {
                _logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database);
            }

            return false;
        }
    }

    /// <summary>
    /// 释放锁
    /// </summary>
    public void ReleaseLock(IDbConnection connection)
    {
        try
        {
            connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database });
            _logger.LogInformation("Lock released: {LockName}", connection.Database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
        }
    }

    /// <summary>
    /// 释放锁
    /// </summary>
    public async Task ReleaseLockAsync(IDbConnection connection)
    {
        try
        {
            await connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database });
            _logger.LogInformation("Lock released: {LockName}", connection.Database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
        }
    }

    /// <summary>
    /// 检查锁
    /// </summary>
    private void CheckLocked(IDbConnection connection)
    {
        connection.Execute(CreateTableSql); 

        var databaseParam = new
        {
            database = connection.Database
        };

        var lockExists = connection.QueryFirstOrDefault<int>(CheckLockedSql, databaseParam);
        if (lockExists <= 0)
        {
            return;
        }

        _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        connection.Execute(ReleaseLockSql, databaseParam);
    }

    /// <summary>
    /// 检查锁
    /// </summary>
    private async Task CheckLockedAsync(IDbConnection connection)
    {
        await connection.ExecuteAsync(CreateTableSql);
         
        var databaseParam = new
        {
            database = connection.Database
        };

        var lockExists = await connection.QueryFirstOrDefaultAsync<int>(CheckLockedSql, databaseParam);
        if (lockExists <= 0)
        {
            return;
        }

        _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        await connection.ExecuteAsync(ReleaseLockSql, databaseParam);
    }
}

3. 封装一下MigrateExecutor的实现

/// <summary>
/// 数据库迁移执行器
/// </summary>
public interface IMigrateExcutor
{
    /// <summary>
    /// 执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    void Migrate(DbContext dbContext);

    /// <summary>
    /// 执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <returns></returns>
    Task MigrateAsync(DbContext dbContext);

    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    void ConcurrentMigrate(DbContext dbContext, bool wait = true);
     
    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    /// <returns></returns>
    Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true);

    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="connection"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true);

    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="connection"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true);
}

/// <summary>
/// 数据库迁移执行器
/// </summary>
public class MigrateExcutor : IMigrateExcutor
{
    private readonly IMigrateLock _migrateLock;
    private readonly ILogger<MigrateExcutor> _logger;

    public MigrateExcutor(
        IMigrateLock migrateLock,
        ILogger<MigrateExcutor> logger)
    {
        _migrateLock = migrateLock;
        _logger = logger;
    }

    /// <summary>
    /// 执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <returns></returns>
    public void Migrate(DbContext dbContext)
    {
        try
        {
            if (dbContext.Database.GetPendingMigrations().Any())
            {
                dbContext.Database.Migrate();
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Migration failed");

            HandleError(dbContext, e);
        }
    }

    /// <summary>
    /// 执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <returns></returns>
    public async Task MigrateAsync(DbContext dbContext)
    {
        try
        {
            if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
            {
                await dbContext.Database.MigrateAsync();
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Migration failed");

            await HandleErrorAsync(dbContext, e);
        }
    }

    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    /// <returns></returns>
    public void ConcurrentMigrate(DbContext dbContext, bool wait = true)
    {
        if (!dbContext.Database.GetPendingMigrations().Any())
        {
            return;
        }

        using var connection = MySqlConnectionHelper.CreateConnection(dbContext.Database.GetDbConnection().Database);

        ConcurrentMigrate(dbContext, connection, wait);
    }

    /// <summary>
    /// 并发场景执行迁移
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    /// <returns></returns>
    public async Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true)
    {
        if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
        {
            return;
        }

        await using var connection = await MySqlConnectionHelper.CreateConnectionAsync(dbContext.Database.GetDbConnection().Database);

        await ConcurrentMigrateAsync(dbContext, connection, wait);
    }

    /// <summary>
    /// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成)
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="connection"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    public void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true)
    {
        if (!dbContext.Database.GetPendingMigrations().Any())
        {
            return;
        }

        while (true)
        {
            if (_migrateLock.TryAcquireLock(connection))
            {
                try
                {
                    Migrate(dbContext);

                    break;
                }
                finally
                {
                    _migrateLock.ReleaseLock(connection);
                }
            }

            if (wait)
            {
                _logger.LogWarning("Migration is locked, wait for 2 seconds");
                Thread.Sleep(20000);

                continue;
            }

            _logger.LogInformation("Migration is locked, skip");
        }
    }

    /// <summary>
    /// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成)
    /// </summary>
    /// <param name="dbContext"></param>
    /// <param name="connection"></param>
    /// <param name="wait">是否等待至正在进行中的迁移完成</param>
    public async Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true)
    {
        if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
        {
            return;
        }

        while (true)
        {
            if (await _migrateLock.TryAcquireLockAsync(connection))
            {
                try
                {
                    await MigrateAsync(dbContext);
                    break;
                }
                finally
                {
                    await _migrateLock.ReleaseLockAsync(connection);
                }
            }

            if (wait)
            {
                _logger.LogWarning("Migration is locked, wait for 2 seconds");
                Thread.Sleep(20000);

                continue;
            }

            _logger.LogInformation("Migration is locked, skip");

            break;
        }
    }

    private void HandleError(DbContext dbContext, Exception e)
    {
        var needChangeList = dbContext.Database.GetPendingMigrations().ToList();
        var allChangeList = dbContext.Database.GetMigrations().ToList();
        var hasChangeList = dbContext.Database.GetAppliedMigrations().ToList();

        if (needChangeList.Count + hasChangeList.Count > allChangeList.Count)
        {
            int errIndex = allChangeList.Count - needChangeList.Count;

            if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
            {
                int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal);
                string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
                if (hasChangeList[errIndex].EndsWith(errSuffix))
                {
                    dbContext.Database.ExecuteSqlRaw($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
                    dbContext.Database.Migrate();
                }
                else
                {
                    throw e;
                }
            }
            else
            {
                throw e;
            }
        }
        else
        {
            throw e;
        }

        _logger.LogInformation("Migration failed, but success on second try.");
    }

    private async Task HandleErrorAsync(DbContext dbContext, Exception e)
    {
        var needChangeList = (await dbContext.Database.GetPendingMigrationsAsync()).ToList();
        var allChangeList = dbContext.Database.GetMigrations().ToList();
        var hasChangeList = (await dbContext.Database.GetAppliedMigrationsAsync()).ToList();

        if (needChangeList.Count + hasChangeList.Count > allChangeList.Count)
        {
            int errIndex = allChangeList.Count - needChangeList.Count;

            if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
            {
                int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal);
                string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
                if (hasChangeList[errIndex].EndsWith(errSuffix))
                {
                    await dbContext.Database.ExecuteSqlRawAsync($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
                    await dbContext.Database.MigrateAsync();
                }
                else
                {
                    throw e;
                }
            }
            else
            {
                throw e;
            }
        }
        else
        {
            throw e;
        }

        _logger.LogInformation("Migration failed, but success on second try.");
    }
}
From:https://www.cnblogs.com/Tangtang1997/p/18420640
本文地址: http://shuzixingkong.net/article/2119
0评论
提交 加载更多评论
其他文章 Log4j2—漏洞分析(CVE-2021-44228)
目录Log4j2漏洞原理漏洞根因调用链源码分析调用链总结漏洞复现dnsrmi Log4j2漏洞原理 前排提醒:本篇文章基于我另外一篇总结的JNDI注入后写的,建议先看该文章进行简单了解JNDI注入: https://blog.csdn.net/weixin_60521036/article/deta
Log4j2—漏洞分析(CVE-2021-44228) Log4j2—漏洞分析(CVE-2021-44228) Log4j2—漏洞分析(CVE-2021-44228)
代码整洁之道--读书笔记(13)
代码整洁之道 简介: 本书是编程大师“Bob 大叔”40余年编程生涯的心得体会的总结,讲解要成为真正专业的程序员需要具备什么样的态度,需要遵循什么样的原则,需要采取什么样的行动。作者以自己以及身边的同事走过的弯路、犯过的错误为例,意在为后来者引路,助其职业生涯迈上更高台阶。 本书适合所有程序员阅读,
代码整洁之道--读书笔记(13) 代码整洁之道--读书笔记(13)
DLA:动态层级注意力架构,实现特征图的持续动态刷新与交互 | IJCAI'24
论文深入探讨了层级注意力与一般注意力机制之间的区别,并指出现有的层级注意力方法是在静态特征图上实现层间交互的。这些静态层级注意力方法限制了层间上下文特征提取的能力。为了恢复注意力机制的动态上下文表示能力,提出了一种动态层级注意力(DLA)架构。DLA包括双路径,其中前向路径利用一种改进的递归神经网络
DLA:动态层级注意力架构,实现特征图的持续动态刷新与交互 | IJCAI'24 DLA:动态层级注意力架构,实现特征图的持续动态刷新与交互 | IJCAI'24 DLA:动态层级注意力架构,实现特征图的持续动态刷新与交互 | IJCAI'24
云上分布式SQL Server,你值得拥有
云上分布式SQL Server,你值得拥有 介绍Microsoft SQL Azure 是微软的云关系型数据库,后端存储又称为云 SQL Server(Cloud SQL Server)。它构建在 SQL Server 之上,通过分布式技术提升传统关系型数据库的可扩展性和容错能力。 数据模型 (1)
云上分布式SQL Server,你值得拥有 云上分布式SQL Server,你值得拥有 云上分布式SQL Server,你值得拥有
【Abyss】Android 平台应用级系统调用拦截框架
Android 平台从上到下,无需 ROOT/解锁/刷机,应用级拦截框架的最后一环 —— SVC系统调用拦截。
【Abyss】Android 平台应用级系统调用拦截框架 【Abyss】Android 平台应用级系统调用拦截框架 【Abyss】Android 平台应用级系统调用拦截框架
使用梯度下降法实现多项式回归
使用梯度下降法实现多项式回归 实验目的 本实验旨在通过梯度下降法实现多项式回归,探究不同阶数的多项式模型对同一组数据的拟合效果,并分析样本数量对模型拟合结果的影响。 实验材料与方法 数据准备 生成训练样本:我们首先生成了20个训练样本,其中自变量X服从均值为0,方差为1的标准正态分布。因变量Y由下述
使用梯度下降法实现多项式回归 使用梯度下降法实现多项式回归 使用梯度下降法实现多项式回归
.NET全局静态可访问IServiceProvider(支持Blazor)
DependencyInjection.StaticAccessor 前言 如何在静态方法中访问DI容器长期以来一直都是一个令人苦恼的问题,特别是对于热爱编写扩展方法的朋友。之所以会为这个问题苦恼,是因为一个特殊的服务生存期——范围内(Scoped),所谓的Scoped就是范围内单例,最常见的Web
Java SE 23 新增特性
Java SE 23 新增特性 作者:Grey 原文地址: 博客园:Java SE 23 新增特性 CSDN:Java SE 23 新增特性 源码 源仓库: Github:java_new_features Primitive Types in Patterns, instanceof, and s