1、表关系
一对一
一对一其实就是一对多的特殊情况。
from sqlalchemy import create_engine, Column, Float, String, DATETIME, TEXT, Integer, String, Float, TEXT, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
# 127.0.0.1
HOSTNAME = "localhost"
DATABASE = "demo0425"
PORT = 3306
USERNAME = "root"
PASSWORD = "root"
DB_URL = 'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
# 创建引擎并生成Base类
engine = create_engine(DB_URL)
Base = declarative_base(engine)
# 一对一
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50))
user_extend = relationship("User_Extend", uselist=False) #
def __str__(self):
return "User{username:%s}" % self.username
class User_Extend(Base):
__tablename__ = "user_extend"
id = Column(Integer, primary_key=True, autoincrement=True)
school = Column(String(50))
# 外键
uid = Column(Integer, ForeignKey("user.id"))
user = relationship("User") # "User" 注意是类
Base.metadata.drop_all()
Base.metadata.create_all()
session = sessionmaker(bind=engine)()
user = User(username="x1")
user_extend1 = User_Extend(school="y1")
user_extend2 = User_Extend(school="y2")
# 用下面这种添加记录模式不会报错
# user_extend1.user = user
# user_extend2.user = user
# session.add(user_extend1)
# session.add(user_extend2)
# session.commit()
# 用下面这种方式添加一对一的数据,会被禁止
# AttributeError: 'NoneType' object has no attribute 'append'
user.user_extend.append(user_extend1)
user.user_extend.append(user_extend2)
session.commit()
class User(Base):
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(50))
fullname = Column(String(50))
password = Column(String(100))
addresses = relationship("Address",backref='addresses',uselist=False)
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer,primary_key=True)
email_address = Column(String(50))
user_id = Column(Integer,ForeignKey('users.id')
user = relationship('Address',backref='user')
从以上例子可以看到,只要在User表中的addresses字段上添加uselist=False就可以达到一对一的效果
多对多
多对多需要一个中间表来作为连接,同理在sqlalchemy中的orm也需要一个中间表。假如现在有一个Teacher和一个Classes表,即老师和班级,一个老师可以教多个班级,一个班级有多个老师,是一种典型的多对多的关系,那么通过sqlalchemy的ORM的实现方式。
from sqlalchemy import create_engine, Column, Float, String, DATETIME, TEXT, Integer, String, Float, TEXT, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table # 多对多需要利用Table创建中间表
# 127.0.0.1
HOSTNAME = "localhost"
DATABASE = "demo0425"
PORT = 3306
USERNAME = "root"
PASSWORD = "root"
DB_URL = 'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
# 创建引擎并生成Base类
engine = create_engine(DB_URL)
Base = declarative_base(engine)
# 中间表的定义
'''
my_table = Table('mytable', metadata,
... Column('id', Integer, primary_key=True),
... Column('version_id', Integer, primary_key=True),
... Column('data', String(50))
... )
'''
teacher_classes = Table(
"teacher_classes", Base.metadata,
Column('teacher_id', Integer, ForeignKey('teacher.id')),
Column('classes_id', Integer, ForeignKey('classes.id')), # ForeignKey('classes.id'外键的选取
)
class Teacher(Base):
__tablename__ = "teacher"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
classes = relationship('Classes', backref='teacher', secondary='teacher_classes') # 关联中间表secondary=teacher_classes
def __str__(self):
return "Teacher(name:%s)"% self.name
class Classes(Base):
__tablename__ = "classes"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
# Teacher = relationship("Teacher")
def __str__(self):
return "Classes(name:%s)"% self.name
Base.metadata.drop_all()
Base.metadata.create_all()
session = sessionmaker(bind=engine)()
teacher1 = Teacher(name="x1老师")
teacher2 = Teacher(name="x2老师")
classes1 = Classes(name="y1班")
classes2 = Classes(name="y2班")
teacher1.classes.append(classes1) # teacher1.classes指Teacher中定义的字段classes
teacher1.classes.append(classes2) # 指Teacher中定义的字段classes
teacher2.classes.append(classes1)
teacher2.classes.append(classes2)
session.add(teacher1)
session.add(teacher2)
session.commit()
# 查询老师对应的班级 先查询老师,再定班级
teacher = session.query(Teacher).first()
print(teacher)
for teacher.classe in teacher.classes: # teacher.classes利用了反向查询属性
print(teacher.classe) # 输出数据需要__str__魔法方法
print("^"*30)
# 班级对应的老师
classes = session.query(Classes).first()
print(classes)
for data in classes.teacher: # 依赖backref='teacher'这个反向查询属性
print(data)
要创建一个多对多的关系表,首先需要一个中间表,通过Table来创建一个中间表。上例中第一个参数teacher_classes代表的是中间表的表名,第二个参数是Base的元类,第三个和第四个参数就是要连接的两个表,其中Column第一个参数是表示的是连接表的外键名,第二个参数表示这个外键的类型,第三个参数表示要外键的表名和字段。中间表用于存放两个表的外键。
# 中间表创建模板,用于存放两个表的外键
my_table = Table('mytable', metadata,
... Column('id', Integer, primary_key=True),
... Column('version_id', Integer, primary_key=True),
... Column('data', String(50))
... )
2、排序和查询高级
排序
1.order_by:可以指定根据这个表中的某个字段进行排序,如果在前面加了一个-,代表的是降序排序。
2.在模型定义的时候指定默认排序:有些时候,不想每次在查询的时候都指定排序的方式,可以在定义模型的时候就指定排序的方式。
from sqlalchemy import create_engine, Column, Float, String, DATETIME, TEXT, Integer, String, Float, TEXT, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table # 多对多需要利用Table创建中间表
# 127.0.0.1
HOSTNAME = "localhost"
DATABASE = "demo0425"
PORT = 3306
USERNAME = "root"
PASSWORD = "root"
DB_URL = 'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
# 创建引擎并生成Base类
engine = create_engine(DB_URL)
Base = declarative_base(engine)
class Article(Base):
__tablename__ = "article"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(50))
# __repr__方法类似于__str__
def __str__(self):
return "Article(title:%s)" % self.title
# 默认排序设置方法,
__mapper_args__= {
# 'order_by': id, # 默认升叙
# 'order_by': id.desc() # 默认倒叙
'order_by': -id # 默认倒叙
}
# Base.metadata.drop_all()
# Base.metadata.create_all()
session = sessionmaker(bind=engine)()
# for i in range(10):
# article = Article(title='title_%s'% i)
# session.add(article)
# session.commit()
# 查询数据进行排序 默认升序排序
articles = session.query(Article).order_by(Article.id).all()
print(articles) # 列表数据类型
for data in articles:
print(data)
print("*"*30)
# 倒叙排序 两种倒叙写法
# articles = session.query(Article).order_by(Article.id.desc()).all()
articles = session.query(Article).order_by(-Article.id).all()
for data in articles:
print(data)
print("#"*30)
# 默认排序方式,按照__mapper_args__方法 可以实现在查询时候就默认有排序方式
articles = session.query(Article).all()
for data in articles:
print(data)
在模型定义中,添加以下代码
__mapper_args__ = {
"order_by": title
}
即可让文章使用标题来进行排序。
3.正向排序和反向排序:默认情况是从小到大,从前到后排序的,如果想要反向排序,可以调用排序的字段的desc方法。
limit、offset和切片
- limit:可以限制每次查询的时候只查询几条数据。
- offset:可以限制查找数据的时候过滤掉前面多少条。
- 切片:可以对Query对象使用切片操作,来获取想要的数据。
from sqlalchemy import create_engine, Column, Float, String, DATETIME, TEXT, Integer, String, Float, TEXT, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
# 127.0.0.1
HOSTNAME = "localhost"
DATABASE = "demo0425"
PORT = 3306
USERNAME = "root"
PASSWORD = "root"
DB_URL = 'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
# 创建引擎并生成Base类
engine = create_engine(DB_URL)
Base = declarative_base(engine)
class Article(Base):
__tablename__ = "article"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(50))
# creat_time= Column()
# __repr__方法类似于__str__
def __str__(self):
return "Article(title:%s)" % self.title
# 默认排序设置方法,
__mapper_args__= {
# 'order_by': id, # 默认升叙
# 'order_by': id.desc() # 默认倒叙
'order_by': -id # 默认倒叙
}
# Base.metadata.drop_all()
# Base.metadata.create_all()
session = sessionmaker(bind=engine)()
# for i in range(10):
# article = Article(title='title_%s'% i)
# session.add(article)
# session.commit()
# 查询数据进行排序 默认升序排序
articles = session.query(Article).order_by(Article.id).all()
print(articles) # 列表数据类型
for data in articles:
print(data)
print("*"*30)
# 倒叙排序 两种倒叙写法
# articles = session.query(Article).order_by(Article.id.desc()).all()
articles = session.query(Article).order_by(-Article.id).all()
for data in articles:
print(data)
print("#"*30)
# 默认排序方式,按照__mapper_args__方法 可以实现在查询时候就默认有排序方式
articles = session.query(Article).all()
for data in articles:
print(data)
print('查询多条数据,翻页功能')
# 查询前3条数据 目前设置的默认排序是倒叙
articles = session.query(Article).limit(3).all()
for data in articles:
print(data)
print('查询3-5条数据')
# 查询3-5条数据 .offset(2)代表从第2条数据开始(从0开始计算),.limit(3)代表调取3条数据
# articles = session.query(Article).offset(2).limit(3).all() # offset 偏移量
articles = session.query(Article).order_by(Article.id).offset(2).limit(3).all() # 先设置排序,再选择调取几条数据
for data in articles:
print(data)
# 查询原生sql语句
articles = session.query(Article).limit(3).offset(2)
print(articles)
print('!'*40)
# 切片数据 对查询结果的列表进行切片操作
articles = session.query(Article).all()[2:5] # 读取第2-5条数据 即2,3,4三条数据,第5条数据查不到
for data in articles:
print(data)
查询高级
group_by
根据某个字段进行分组。比如想要根据性别进行分组,来统计每个分组分别有多少人。
class User(Base):
__tablename__ = "user1"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50))
gender = Column(Enum('男', '女'), default='男')
age = Column(Integer)
def __str__(self):
return "User{username:%s}" % self.username
# Base.metadata.drop_all()
# Base.metadata.create_all()
session = sessionmaker(bind=engine)()
# import random
# for i in range(10):
# users = User(username='title_%s'%i, age=random.randint(10, 25))
# session.add(users)
# session.commit()
# 按照性别区分 求男女的人数
# 聚合函数 一般和分组一起使用
from sqlalchemy import func
result = session.query(User.gender).group_by(User.gender).all()
print(result)
# 按照func.count(User.id)来求和,gender等字段无法求和
result = session.query(User.gender, func.count(User.id)).group_by(User.gender).all()
print(result)
having
having是对查找结果进一步过滤。比如只想要看未成年人的数量,那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤。
# having
# 在原生数据库中操作的时候,having是用于where语句分组之后的数据进行判定
result = session.query(User.age, func.count(User.id)).group_by(User.age).all() # 先根据年龄分组数据
# [(18, 3), (10, 1), (17, 1), (20, 1), (21, 2), (24, 1), (15, 1)] 代表是18岁3个,10岁1个,17岁1个
print(result)
result = session.query(User.age, func.count(User.id)).group_by(User.age).having(User.age >= 18).all() # 年龄分组数据之后再having分类
print(result)
join方法
join查询分为两种,一种是inner join,另一种是outer join。默认的是inner join,如果指定left join或者是right join则为outer join。如果想要查询User及其对应的Address,则可以通过以下方式来实现。
join查询知识点在我之前的博客中有介绍:https://blog.csdn.net/weixin_42118531/article/details/103552544
for u,a in session.query(User,Address).filter(User.id==Address.user_id).all():
print(u)
print(a)
这是通过普通方式的实现,也可以通过join的方式实现,更加简单。
for u,a in session.query(User,Address).join(Address).all():
print(u)
print(a)
当然,如果采用outerjoin,可以获取所有user,而不用在乎这个user是否有address对象,并且outerjoin默认为左外查询:
for instance in session.query(User,Address).outerjoin(Address).all():
print(instance)
别名
当多表查询的时候,有时候同一个表要用到多次,这时候用别名就可以方便的解决命名冲突的问题了:
from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username,email1,email2 in session.query(User.name,adalias1.email_address,adalias2.email_address).join(adalias1).join(adalias2).all()
print(username,email1,email2)
子查询
sqlalchemy也支持子查询,比如现在要查找一个用户的用户名以及该用户的邮箱地址数量。要满足这个需求,可以在子查询中找到所有用户的邮箱数(通过group by合并同一用户),然后再将结果放在父查询中进行使用:
from sqlalchemy.sql import func
# 构造子查询
stmt = session.query(Address.user_id.label('user_id'),func.count(*).label('address_count')).group_by(Address.user_id).subquery()
# 将子查询放到父查询中
for u,count in session.query(User,stmt.c.address_count).outerjoin(stmt,User.id==stmt.c.user_id).order_by(User.id):
print u,count
实例如下:
# 查询 username == 'x' 查询city,age相同的user 常用两句进行子查询,数据量不大时候,查询效率差不多,而且查询逻辑清楚
user = session.query(User).filter(User.username == 'x').first()
print(user)
print(user.age)
print(user.city)
results = session.query(User).filter(User.age == user.city, User.city == user.age).all()
print(results)
for result in results:
print(result)
# 子查询就是两个查询的嵌套 .label('city')别名为city,防止重名
sub = session.query(User.city.label('city'), User.age.label('age')).filter(User.username == 'x').subquery()
# 外层查询 sub.c.city相当于sub.colum.city sub.c.age相当于sub.column.age
result = session.query(User).filter(User.city == sub.c.city, User.age == sub.c.age)
print(result)
从上面我们可以看到,一个查询如果想要变为子查询,则是通过subquery()方法实现,变成子查询后,通过子查询.c属性来访问查询出来的列。以上方式只能查询某个对象的具体字段,如果要查找整个实体,则需要通过aliased方法。
stmt = session.query(Address)
adalias = aliased(Address,stmt)
for user,address in session.query(User,stmt).join(stmt,User.addresses):
print user,address
转载:https://blog.csdn.net/weixin_42118531/article/details/105752408