首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

在 SQLAlchemy 中实现数据处理的时候,实现表自引用、多对多、联合查询,有序id等常见的一些经验总结

编程知识
2024年08月26日 10:07

有时候,我们在使用SQLAlchemy操作某些表的时候,需要使用外键关系来实现一对多或者多对多的关系引用,以及对多表的联合查询,有序列的uuid值或者自增id值,字符串的分拆等常见处理操作。

1、在 SQLAlchemy 中定义具有嵌套 children 关系的表

要在 SQLAlchemy 中定义具有嵌套 children 关系的表,如表中包含 idpid 字段,可以使用 relationshipForeignKey 来建立父子关系。

首先,你需要定义一个模型类,其中包含 idpid 字段。id 是主键,pid 是指向父记录的外键。然后,你使用 relationship 来建立父子关系。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class DictTypeInfo(Base):
    __tablename__ = 'dict_type_info'
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    code = Column(String)
    remark = Column(String)
    seq = Column(Integer)
    pid = Column(Integer, ForeignKey('dict_type_info.id'))  # 外键指向父节点的 id

    # 定义 parent 关系
    parent = relationship("DictTypeInfo", remote_side=[id], back_populates="children")

    # 定义 children 关系
    children = relationship("DictTypeInfo", back_populates="parent")

例子使用代码如下所示。

# 创建异步引擎和会话
DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# 示例:如何插入数据并进行查询
async def example_usage():
    async with AsyncSessionLocal() as session:
        async with session.begin():
            # 插入数据
            parent_node = DictTypeInfo(name="Parent", code="P001", remark="Parent Node", seq=1)
            child_node1 = DictTypeInfo(name="Child1", code="C001", remark="First Child", seq=1, parent=parent_node)
            child_node2 = DictTypeInfo(name="Child2", code="C002", remark="Second Child", seq=2, parent=parent_node)
            session.add(parent_node)
            session.add(child_node1)
            session.add(child_node2)
            
        # 查询数据
        async with session.begin():
            result = await session.execute(
                "SELECT * FROM dict_type_info WHERE pid IS NULL"
            )
            parent_nodes = result.scalars().all()
            for node in parent_nodes:
                print(f"Parent Node: {node.name}, Children: {[child.name for child in node.children]}")

代码说明

  1. 定义模型类 (DictTypeInfo):

    • id: 主键。
    • pid: 外键,指向同一个表的 id,表示父节点。
    • parent: 父关系,通过 remote_side 设定本模型的外键指向自身的主键。
    • children: 子关系,back_populates 用于双向关系的映射。
  2. 创建异步引擎和会话:

    • 使用 create_async_engineAsyncSession 创建数据库引擎和会话,以支持异步操作。
  3. 插入和查询数据:

    • 插入数据示例展示了如何创建父节点和子节点,并将子节点关联到父节点。
    • 查询数据示例展示了如何查询所有父节点以及它们的子节点。

注意事项

  • remote_side: 在 relationship 中,remote_side 是指定哪些字段是远程的一方(即子节点关系的目标)。
  • 确保在模型中定义了正确的外键约束。在你提供的模型中,pid 列需要指向同一表中的 id 列。确保 ForeignKey 设置正确。
  • 异步操作: 使用 AsyncSessionasyncio 进行异步数据库操作。
  • 创建表: 在初始化数据库时,确保表结构是正确的。

要使用 selectinload 加载某个 pid 下的对象及其子列表,可以通过 SQLAlchemy 的 selectinload 来优化加载子关系。selectinload 可以减少 SQL 查询的数量,特别是在加载具有层次结构的数据时。

async def get_tree(pid: int):
    async with AsyncSessionLocal() as session:
        # 通过 selectinload 加载所有子节点
        stmt = select(DictTypeInfo).filter(DictTypeInfo.pid == pid).options(selectinload(DictTypeInfo.children))
        result = await session.execute(stmt)
        nodes = result.scalars().all()
        
        return nodes

这样,调用 get_tree 函数获取指定 pid 的节点及其子节点,代码如下。

async def example_usage():
    nodes = await get_tree(pid=1)
    for node in nodes:
        print(f"Node: {node.name}, Children: {[child.name for child in node.children]}")

selectinload: selectinload 可以减少 N+1 查询问题,它通过一条额外的查询来加载相关对象。这适合用于层次结构数据的加载。通过这种方式,你可以使用 SQLAlchemy 的 selectinload 来高效地加载具有父子关系的对象,并优化数据库查询性能。

 

同样,我们在 SQLAlchemy 中实现多对多关系也是类似的处理方式。

 在 SQLAlchemy 中,实现多对多关系通常需要创建一个关联表(association table),该表将存储两个相关联表的外键,从而实现多对多关系。以下是一个实现多对多关系的详细步骤。

1) 定义多对多关系的关联表

首先,需要定义一个关联表,该表包含两个外键,分别指向两端的主表。这通常使用 Table 对象来实现。

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

association_table = Table('association', Base.metadata,
    Column('left_id', Integer, ForeignKey('left_table.id')),
    Column('right_id', Integer, ForeignKey('right_table.id'))
)

在这个例子中,association_table 是一个包含两个外键的中间表:left_idright_id 分别指向 left_tableright_table 的主键。

2)定义两端的模型并添加关系

在两端的模型中,使用 relationship 来定义多对多关系,并指定 secondary 参数为关联表。

from sqlalchemy.orm import relationship

class LeftModel(Base):
    __tablename__ = 'left_table'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    rights = relationship("RightModel", secondary=association_table, back_populates="lefts")

class RightModel(Base):
    __tablename__ = 'right_table'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    lefts = relationship("LeftModel", secondary=association_table, back_populates="rights")
  • rightsLeftModel 中定义的关系属性,它将连接到 RightModel
  • leftsRightModel 中定义的关系属性,它将连接到 LeftModel
  • secondary=association_table 告诉 SQLAlchemy 使用 association_table 作为连接表。
  • back_populates 用于双向关系的对称引用。

3)创建数据库并插入数据

下面的代码展示了如何创建数据库、插入数据并查询多对多关系。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建数据库引擎
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# 创建模型实例
left1 = LeftModel(name="Left 1")
right1 = RightModel(name="Right 1")
right2 = RightModel(name="Right 2")

# 设置多对多关系
left1.rights = [right1, right2]

# 添加到会话并提交
session.add(left1)
session.commit()

# 查询并打印关系
for right in left1.rights:
    print(right.name)  # 输出: Right 1, Right 2

for left in right1.lefts:
    print(left.name)  # 输出: Left 1

你可以像操作普通列表一样来处理这些关系,例如添加、删除关联等:

# 添加关系
left1.rights.append(RightModel(name="Right 3"))
session.commit()

# 删除关系
left1.rights.remove(right2)
session.commit()

 通过这些步骤,你可以在 SQLAlchemy 中实现和操作多对多关系。

 

2、在 SQLAlchemy 中联合多个表进行记录关联查询

例如,在我的框架中,字典大类和字典项目是不同的表进管理的,因此如果需要根据大类名称进行字典项目的查询,那么就需要联合两个表进行处理。

具体操作如下:创建一个查询,将 DictDataInfo 表与 DictTypeInfo 表联接(通过 DictType_IDId 列)

from sqlalchemy.future import select
from sqlalchemy.orm import aliased
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

# 假设你的数据库模型是 DictDataInfo 和 DictTypeInfo
# 需要提前定义好这两个模型类

DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def get_dict_data(dict_type_name: str):
    async with AsyncSessionLocal() as session:
        # 创建别名
        DictData = aliased(DictDataInfo)
        DictType = aliased(DictTypeInfo)

        # 联合查询并根据条件过滤
        stmt = (
            select(DictData)
            .join(DictType, DictData.DictType_ID == DictType.id)
            .filter(DictType.name == dict_type_name)
        )

        result = await session.execute(stmt)
        dict_data = result.scalars().all()

        return dict_data

# 示例用法
import asyncio

async def example_usage():
    dict_type_name = "some_type_name"
    dict_data = await get_dict_data(dict_type_name)
    for data in dict_data:
        print(data)

代码说明

  1. aliased: 使用 aliased 创建表的别名,这样可以方便地在查询中引用这些表。

  2. join: 使用 join 进行表连接。这里 DictDataInfo 表的 DictType_ID 列与 DictTypeInfo 表的 id 列连接。

  3. filter: 使用 filter 来添加条件筛选,筛选出 DictTypeInfo 表中 name 列等于 dict_type_name 的记录。

  4. select: 使用 select 语句来选择 DictDataInfo 表中的记录,这对应于 Select(d => d)

  5. 异步操作: 由于使用的是 SQLAlchemy 的异步模式,所有数据库操作都在 async withawait 语句中进行,以确保异步执行。

如果我们需要将获得的数据进行对象转换,我们可以使用下面的处理代码实现。

# 定义 CListItem 类
class CListItem:
    def __init__(self, name, value):
        self.name = name
        self.value = value

# 定义示例列表和转换操作
def convert_list_items(list_items):
    dict_list = []
    if list_items:  # 确保 list_items 不是 None
        for info in list_items.Items:
            dict_list.append(CListItem(info.Name, info.Value))
    return dict_list

 

3、使用sqlalchemy插入数据的时候,如何判断为非自增类型的时候,id赋值一个有序列的uuid值

有时候,我们的数据表主键是用字符串的,这种适用于很广的用途,比较容易在插入的时候就确定好id键的值,从而可以处理相关的内容。

但是,有时候我们可以让后端进行确定一个有序的ID值,那么使用SQLAlchemy 我们应该如何实现?

首先,确保你已经导入了 uuid 库,这是用于生成 UUID 的 Python 标准库。

有序 UUID 通常是基于时间的 UUID。你可以使用 uuid.uuid1() 来生成基于时间的 UUID。

def generate_sequential_uuid():
    return uuid.uuid1()  # 基于时间生成有序UUID

在定义 SQLAlchemy 模型时,可以将 id 字段设置为使用该函数生成的 UUID。通常在模型中通过 default 参数设置默认值。

from sqlalchemy import Column, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(String(36), primary_key=True, default=generate_sequential_uuid, nullable=False)
    # 其他字段...

在插入新数据时,如果 id 字段为空,它将自动使用 generate_sequential_uuid 函数生成一个基于时间的 UUID。

这样就可以确保在插入数据时,非自增类型的 id 字段会被赋值为一个有序列的 UUID 值。

对于自增的整型 id,SQLAlchemy 提供了自动处理机制。你只需要在模型中将 id 字段定义为 Integer 类型,并设置 primary_key=True,SQLAlchemy 就会自动为该字段设置自增属性。

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))
    # 其他字段..

默认情况下,SQLAlchemy 会使用数据库的原生自增机制(如 MySQL 的 AUTO_INCREMENT 或 PostgreSQL 的 SERIAL)。如果你需要使用自定义的自增策略,可以通过设置 Sequence 来实现(适用于支持 Sequence 的数据库,如 PostgreSQL)。

from sqlalchemy import Sequence

class MyModel(Base):
    __tablename__ = 'my_table'

    id = Column(Integer, Sequence('my_sequence'), primary_key=True)
    name = Column(String(50))

在上述代码中,Sequence('my_sequence') 定义了一个序列,SQLAlchemy 将使用该序列生成自增的 id 值。

通过这些步骤,你可以轻松处理整型自增 id 字段,SQLAlchemy 会自动为每个新记录分配唯一的自增 id

 

4、在插入记录的时候,对字符串的数据处理

在批量插入数据字典的时候,我希望根据用户输入内容(多行数据)进行转化,把每行的数据分拆进行判断,如果符合条件的进行处理插入。

在 Python 中,可以使用字符串的 splitlines() 方法来实现相同的功能。

# 假设 Data 和 input.Seq 是从输入中获取的
Data = "example\nline1\nline2\n"  # 示例数据
input_seq = "123"  # 示例序列字符串

# 将 Data 按行拆分,并移除空行
array_items = [line for line in Data.splitlines() if line]

# 初始化变量
int_seq = -1
seq_length = 3
str_seq = input_seq

# 尝试将 str_seq 转换为整数
if str_seq.isdigit():
    int_seq = int(str_seq)
    seq_length = len(str_seq)

# 打印结果
print(f"Array Items: {array_items}")
print(f"int_seq: {int_seq}")
print(f"seq_length: {seq_length}")
  • Python 的 splitlines() 方法将字符串按行分割,同时自动处理各种换行符(包括 \n\r\n)。
  • 列表推导式 [line for line in Data.splitlines() if line] 移除了空行,类似于 C# 中的 StringSplitOptions.RemoveEmptyEntries
  • 使用 str_seq.isdigit() 检查 str_seq 是否全部由数字组成,这类似于 C# 的 int.TryParse

在 Python 中,可以使用 re.split() 函数来按照正则表达式分割字符串。以下是对应的 Python 代码:

import re

# 假设 info 是一个包含 Name 和 Value 属性的对象
class Info:
    def __init__(self):
        self.Name = ""
        self.Value = ""

info = Info()

# dictData 是输入的字符串
dict_data = "example_name example_value"

# 使用正则表达式按照空白字符分割字符串
array = re.split(r'\s+', dict_data)

# 赋值给 info 对象的属性
info.Name = array[0]
info.Value = array[1] if len(array) > 1 else array[0]

# 打印结果
print(f"Name: {info.Name}")
print(f"Value: {info.Value}")

使用 re.split() 函数根据空白字符(包括空格、制表符等)分割字符串 dict_datar'\s+' 是一个正则表达式,表示一个或多个空白字符。

如果你需要根据多个分隔符来分割字符串,同样可以使用正则表达式(re 模块)的 re.split() 方法。

str_item = " 1,2,3;4;5/6/7、8、9;10 "

import re

result = re.split(r"[;,|/,;、]+", str_item.strip())
print(result)

结果输出:['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

解释:

  • re.split(r'[;,|/,;、]', text) 中的 r'[;,|/,;、]' 是一个正则表达式模式:
    [] 表示字符类,表示匹配字符类中的任意一个字符。
    ;,|/,;、 分别表示分号、逗号,竖线,中文逗号,中文分号,和空格,这些字符都将作为分隔符。

使用正则表达式可以灵活处理多个分隔符,适用于更复杂的分割需求。

 

From:https://www.cnblogs.com/wuhuacong/p/18380616
本文地址: http://www.shuzixingkong.net/article/1453
0评论
提交 加载更多评论
其他文章 Swahili-text:华中大推出非洲语言场景文本检测和识别数据集 | ICDAR 2024
论文提出了一个专门针对斯瓦希里语自然场景文本检测和识别的数据集,这在当前研究中是一个未充分开发的语言领域。数据集包括976张带标注的场景图像,可用于文本检测,以及8284张裁剪后的图像用于识别。 来源:晓飞的算法工程笔记 公众号 论文: The First Swahili Language Scen
Swahili-text:华中大推出非洲语言场景文本检测和识别数据集 | ICDAR 2024 Swahili-text:华中大推出非洲语言场景文本检测和识别数据集 | ICDAR 2024 Swahili-text:华中大推出非洲语言场景文本检测和识别数据集 | ICDAR 2024
OpenCV开发笔记(七十九):基于Stitcher类实现全景图片拼接
前言 一个摄像头视野不大的时候,我们希望进行两个视野合并,这样让正视的视野增大,从而可以看到更广阔的标准视野。拼接的方法分为两条路,第一条路是stitcher类,第二条思路是特征点匹配。 本篇使用stitcher匹配,进行两张图来视野合并拼接。 Demo 两张图拼接过程 步骤一:打开图片 cv::M
OpenCV开发笔记(七十九):基于Stitcher类实现全景图片拼接 OpenCV开发笔记(七十九):基于Stitcher类实现全景图片拼接 OpenCV开发笔记(七十九):基于Stitcher类实现全景图片拼接
组合逻辑环(Combinational Logic Loop)
组合逻辑电路 组合逻辑电路是数字电子学中一类基本的电路类型,它由一系列逻辑门组成,用于实现特定的逻辑功能。与时序逻辑电路不同,组合逻辑电路的输出完全取决于当前的输入信号,而不受之前输入的影响。换句话说,组合逻辑电路没有记忆功能,输出仅由当前时刻的输入决定。 组合逻辑电路的基本特点: 无记忆性:输出只
equals与hashCode关系梳理
目录equals用法hashCode用法总结为什么一个类中需要两个比较方法为什么重写 equals 方法时必须同时重写 hashCode 方法?Reference 这个并不是一个通用性编程问题,只属于在Java领域内专有问题。 要做好心理准备,这是一个复杂类的问题,要解答这个问题,需要梳理清楚两个函
使用 SpanMetrics Connector 将 OpenTelemetry 跟踪转换为指标
原文:https://last9.io/blog/convert-opentelemetry-traces-to-metrics-using-spanconnector/ 如果您已经实施了跟踪但缺乏强大的指标功能怎么办? SpanConnector 是一个通过将跟踪数据转换为可操作指标来弥补这一差距
使用 SpanMetrics Connector 将 OpenTelemetry 跟踪转换为指标 使用 SpanMetrics Connector 将 OpenTelemetry 跟踪转换为指标
《数据资产管理核心技术与应用》读书笔记-第五章:数据服务(二)
《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,全书共分10章,第1章主要让读者认识数据资产,了解数据资产相关的基础概念,以及数据资产的发展情况。第2~8章主要介绍大数据时代数据资产管理所涉及的核心技术,内容包括元数据的采集与存储、数据血缘、数据质量、数据监控与告警、数据服务、数据权限
《数据资产管理核心技术与应用》读书笔记-第五章:数据服务(二) 《数据资产管理核心技术与应用》读书笔记-第五章:数据服务(二) 《数据资产管理核心技术与应用》读书笔记-第五章:数据服务(二)
【故障公告】博客站点遭遇大规模疑似 CC 攻击
在上周五 12:24-14:05 遭遇大规模 DDoS 攻击后,今天11:40左右开始遭遇疑似大规模 CC 攻击,明显的特征是有很多来自海外各个国家IP的异常高频次访问请求。 虽然我们针对海外访问临时采取了躲避措施,但攻击请求不仅限于海外,我们躲不过去。 非常抱歉,这次攻击给大家带来了麻烦,请大家谅
TwinCAT3 - 实现CiA402
目录1,起缘2,想办法3,开搞3.1,CANOpen通信3.1.1 对象字典3.1.2 通信建立3.2,CiA402伺服状态机3.3,伺服运行3.3.1 操作模式3.3.2 轮廓位置模式3.3.3 轮廓速度模式3.3.4 其他4,用起来 1,起缘 在TwinCAT3项目中涉及到轴运动时,通常做法都是
TwinCAT3 - 实现CiA402 TwinCAT3 - 实现CiA402 TwinCAT3 - 实现CiA402