1. 一张表
1.1. 定义数据库模型
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
email = Column(String, nullable=False)
# 创建数据库引擎
engine = create_engine('sqlite:///example.db', echo=True)
# 创建所有定义的表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()1.2. 定义 Pydantic 模型
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
name: str
email: EmailStr
# python3.10
# class UserUpdate(BaseModel):
# name: str | None = None
# email: EmailStr | None = None
# python3.9以下
from typing import Union
from pydantic import BaseModel, EmailStr
class UserUpdate(BaseModel):
name: Union[str, None] = None
email: Union[EmailStr, None] = None
class UserOut(BaseModel):
id: int
name: str
email: EmailStr
class Config:
from_attributes = True # 设置 from_attributes 为 True1.3. CRUD 操作
1.3.1. 增加数据
def create_user(user_data: UserCreate):
user = User(**user_data.dict())
session.add(user)
session.commit()
session.refresh(user)
return user1.3.2. 查询数据
def get_users():
users = session.query(User).all()
return [UserOut.from_orm(user) for user in users]
def get_user(user_id: int):
user = session.query(User).get(user_id)
if user:
return UserOut.from_orm(user)
return None1.3.3. 更新数据
def update_user(user_id: int, user_data: UserUpdate):
user = session.query(User).get(user_id)
if user:
for key, value in user_data.dict().items():
if value is not None:
setattr(user, key, value)
session.commit()
return UserOut.from_orm(user)
return None1.3.4. 删除数据
def delete_user(user_id: int):
user = session.query(User).get(user_id)
if user:
session.delete(user)
session.commit()
return True
return False1.3.5. 示例用法
# 创建用户
new_user_data = UserCreate(name="John Doe", email="john@example.com")
created_user = create_user(new_user_data)
print(f"Created User: {created_user}")
# 获取所有用户
all_users = get_users()
print("All Users:")
for user in all_users:
print(user)
# 获取单个用户
user = get_user(created_user.id)
print(f"User with ID {created_user.id}: {user}")
# 更新用户
updated_user_data = UserUpdate(name="Jane Doe", email="jane@example.com")
updated_user = update_user(created_user.id, updated_user_data)
print(f"Updated User: {updated_user}")
# 删除用户
deleted = delete_user(created_user.id)
print(f"User deleted: {deleted}")
# 关闭会话
session.close()2. 一对多
2.1. 定义数据模型
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
email = Column(String, nullable=False)
posts = relationship('Post', back_populates='user')
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
content = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship('User', back_populates='posts')
# 创建数据库引擎
engine = create_engine('sqlite:///blog.db', echo=True)
# 创建所有定义的表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()2.2. 定义 pydantic 模型
from pydantic import BaseModel, EmailStr
from typing import List, Optional
class PostCreate(BaseModel):
title: str
content: str
class PostOut(BaseModel):
id: int
title: str
content: str
class Config:
from_attributes = True
class UserCreate(BaseModel):
name: str
email: EmailStr
posts: List[PostCreate] = []
class UserOut(BaseModel):
id: int
name: str
email: EmailStr
posts: List[PostOut] = []
class Config:
from_attributes = True2.3. CRUD 操作
2.3.1. 增加用户和文章
def create_user(user_data: UserCreate):
user = User(name=user_data.name, email=user_data.email)
for post_data in user_data.posts:
post = Post(title=post_data.title, content=post_data.content, user=user)
user.posts.append(post)
session.add(user)
session.commit()
session.refresh(user)
return user2.3.2. 查询用户及其文章
def get_users():
users = session.query(User).all()
return [UserOut.from_orm(user) for user in users]
def get_user(user_id: int):
user = session.query(User).get(user_id)
if user:
return UserOut.from_orm(user)
return None2.3.3. 更新用户及其文章
def update_user(user_id: int, user_data: UserCreate):
user = session.query(User).get(user_id)
if user:
user.name = user_data.name
user.email = user_data.email
# 清除现有文章并添加新的文章
user.posts.clear()
for post_data in user_data.posts:
post = Post(title=post_data.title, content=post_data.content, user=user)
user.posts.append(post)
session.commit()
return UserOut.from_orm(user)
return None2.3.4. 删除用户及其文章
def delete_user(user_id: int):
user = session.query(User).get(user_id)
if user:
session.delete(user)
session.commit()
return True
return False2.3.5. 根据文章ID查找对应的用户
def get_user_by_post_id(post_id: int):
post = session.query(Post).get(post_id)
if post:
user = post.user
return UserOut.from_orm(user)
return None2.3.6. 示例用法
# 创建用户及其文章
new_user_data = UserCreate(
name="John Doe",
email="john@example.com",
posts=[
PostCreate(title="First Post", content="This is my first post."),
PostCreate(title="Second Post", content="This is my second post.")
]
)
created_user = create_user(new_user_data)
print(f"Created User: {created_user}")
# 获取所有用户及其文章
all_users = get_users()
print("All Users:")
for user in all_users:
print(user)
# 获取单个用户及其文章
user = get_user(created_user.id)
print(f"User with ID {created_user.id}: {user}")
# 根据文章内容模糊查找用户ID
search_content = "first"
user_ids = get_user_ids_by_post_content(search_content)
print(f"User IDs for posts containing '{search_content}': {user_ids}")
# 更新用户及其文章
updated_user_data = UserCreate(
name="Jane Doe",
email="jane@example.com",
posts=[
PostCreate(title="Updated First Post", content="This is my updated first post."),
PostCreate(title="New Post", content="This is a new post.")
]
)
updated_user = update_user(created_user.id, updated_user_data)
print(f"Updated User: {updated_user}")
# 删除用户及其文章
deleted = delete_user(created_user.id)
print(f"User deleted: {deleted}")
# 关闭会话
session.close()3. 多对对
3.1. 定义数据模型
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from pydantic import BaseModel, EmailStr, Field, validator
from typing import List, Optional
# 创建数据库引擎
engine = create_engine('sqlite:///mydatabase.db', echo=True)
# 声明基类
Base = declarative_base()
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 定义用户(User)和粉丝(Follower)的ORM模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)
following = relationship("Follower", back_populates="follower", foreign_keys='Follower.follower_id', cascade="all, delete-orphan")
followers = relationship("Follower", back_populates="followed", foreign_keys='Follower.followed_id', cascade="all, delete-orphan")
class Follower(Base):
__tablename__ = 'followers'
follower_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
followed_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
follower = relationship("User", foreign_keys=[follower_id], back_populates="following")
followed = relationship("User", foreign_keys=[followed_id], back_populates="followers")
# 创建所有定义的表
Base.metadata.create_all(engine)
3.2. 定义 Pydantic 模型
# 定义 Pydantic 模型
class UserCreate(BaseModel):
username: str
followed_ids: List[int] = []
class UserOut(BaseModel):
id: int
username: str
following: List['UserOut'] = []
followers: List['UserOut'] = []
@validator('following', 'followers', pre=True, always=True)
def avoid_recursion(cls, v):
if isinstance(v, list):
return [UserOut(id=item.followed.id, username=item.followed.username) for item in v]
return v
class Config:
from_attributes = True
# 使用 Pydantic 的递归引用
UserOut.update_forward_refs()
3.3. CRUE
3.3.1. 创建用户
def create_user(user_data: UserCreate):
user = User(username=user_data.username)
followed_users = session.query(User).filter(User.id.in_(user_data.followed_ids)).all()
for followed_user in followed_users:
session.add(Follower(follower=user, followed=followed_user))
session.add(user)
session.commit()
session.refresh(user)
return user3.3.2. 查询用户
def get_users():
users = session.query(User).all()
return [UserOut.from_orm(user) for user in users]3.3.3. 根据用户 ID 查询
def get_user(user_id: int):
user = session.query(User).get(user_id)
if user:
return UserOut.from_orm(user)
return None3.3.4. 更新用户关注信息
def update_user(user_id: int, user_data: UserCreate):
user = session.query(User).get(user_id)
if user:
user.username = user_data.username
# 清除现有关注关系
session.query(Follower).filter_by(follower_id=user.id).delete()
# 添加新的关注关系
followed_users = session.query(User).filter(User.id.in_(user_data.followed_ids)).all()
for followed_user in followed_users:
session.add(Follower(follower=user, followed=followed_user))
session.commit()
return UserOut.from_orm(user)
return None3.3.5. 删除用户后更新关系
def delete_user(user_id: int):
user = session.query(User).get(user_id)
if user:
# 删除用户及其相关记录
session.query(Follower).filter_by(follower_id=user.id).delete()
session.query(Follower).filter_by(followed_id=user.id).delete()
session.delete(user)
session.commit()
return True
return False3.3.6. 示例
# 创建用户
user1_data = UserCreate(username="Alice", followed_ids=[])
user2_data = UserCreate(username="Bob", followed_ids=[])
user1 = create_user(user1_data)
user2 = create_user(user2_data)
# 添加粉丝关系
update_user(user1.id, UserCreate(username="Alice", followed_ids=[user2.id]))
# 查询用户信息及其粉丝
user_info = get_user(user1.id)
print("用户 ID:", user_info.id, "用户名:", user_info.username)
print("用户 ID 1 关注的用户:")
for follow in user_info.following:
print(follow.id, follow.username)
print("用户 ID 1 的粉丝:")
for follower in user_info.followers:
print(follower.id, follower.username)
# 删除用户及其相关记录
delete_user(user2.id)
session.close()4. ORM 权限
4.1. models
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
Base = declarative_base()
# 多对多关联表
user_role = Table('user_role', Base.metadata,
Column('user_id', Integer, ForeignKey('users.id')),
Column('role_id', Integer, ForeignKey('roles.id'))
)
role_permission = Table('role_permission', Base.metadata,
Column('role_id', Integer, ForeignKey('roles.id')),
Column('permission_id', Integer, ForeignKey('permissions.id'))
)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
roles = relationship("Role", secondary=user_role, back_populates="users")
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
users = relationship("User", secondary=user_role, back_populates="roles")
permissions = relationship("Permission", secondary=role_permission, back_populates="roles")
class Permission(Base):
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
roles = relationship("Role", secondary=role_permission, back_populates="permissions")
# 初始化数据库连接
engine = create_engine('sqlite:///auth.db', echo=True) # 使用SQLite数据库
Base.metadata.create_all(engine) # 创建所有表
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()4.2. main
from models import User,Permission,session,Role,user_role,role_permission
# 添加用户
user1 = User(username='admin')
user2 = User(username='user')
# 添加角色
role_admin = Role(name='admin')
role_user = Role(name='user')
# 添加权限
perm_read = Permission(name='read')
perm_write = Permission(name='write')
# 给角色分配权限
role_admin.permissions.append(perm_read)
role_admin.permissions.append(perm_write)
role_user.permissions.append(perm_read)
# 给用户分配角色
user1.roles.append(role_admin)
user2.roles.append(role_user)
# 提交到数据库
session.add_all([user1, user2, role_admin, role_user, perm_read, perm_write])
session.commit()
# 打印每个用户的所有权限
users = session.query(User).all()
for user in users:
print(f"User: {user.username}")
for role in user.roles:
print(f" Role: {role.name}")
for permission in role.permissions:
print(f" Permission: {permission.name}")
# 查询
print("Users with admin role:")
for user in session.query(User).join(user_role).join(Role).filter(Role.name == 'admin'):
print(user.username)
print("Permissions of user 'admin':")
for permission in session.query(Permission).join(role_permission).join(Role).join(user_role).join(User).filter(User.username == 'admin'):
print(permission.name)5. ORM
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from pydantic import BaseModel, ValidationError, Field
import hashlib
import datetime
import jwt
from functools import wraps
Base = declarative_base()
# 多对多关联表
user_role = Table('user_role', Base.metadata,
Column('user_id', Integer, ForeignKey('users.id')),
Column('role_id', Integer, ForeignKey('roles.id'))
)
role_permission = Table('role_permission', Base.metadata,
Column('role_id', Integer, ForeignKey('roles.id')),
Column('permission_id', Integer, ForeignKey('permissions.id'))
)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
password_hash = Column(String(128), nullable=False)
roles = relationship("Role", secondary=user_role, back_populates="users")
def set_password(self, password):
self.password_hash = hashlib.sha256(password.encode()).hexdigest()
def check_password(self, password):
return self.password_hash == hashlib.sha256(password.encode()).hexdigest()
def generate_auth_token(self, secret_key, expiration=3600):
payload = {
'id': self.id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration)
}
return jwt.encode(payload, secret_key, algorithm='HS256')
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
users = relationship("User", secondary=user_role, back_populates="roles")
permissions = relationship("Permission", secondary=role_permission, back_populates="roles")
class Permission(Base):
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
roles = relationship("Role", secondary=role_permission, back_populates="permissions")
# 初始化数据库连接
engine = create_engine('sqlite:///auth.db', echo=True) # 使用SQLite数据库
Base.metadata.create_all(engine) # 创建所有表
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 配置
SECRET_KEY = 'your_secret_key'
# Pydantic模型
class TokenPayload(BaseModel):
id: int
exp: datetime.datetime
# 装饰器:验证Token
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = kwargs.pop('token', None)
if not token:
return {"message": "Token is missing"}, 401
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
token_payload = TokenPayload(**payload)
user = session.query(User).get(token_payload.id)
if not user:
return {"message": "User not found"}, 401
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, ValidationError):
return {"message": "Token is invalid or expired"}, 401
return f(user, *args, **kwargs)
return decorated
# 注册用户
def register_user(username, password):
if session.query(User).filter_by(username=username).first():
return {"message": "User already exists"}, 400
new_user = User(username=username)
new_user.set_password(password)
session.add(new_user)
session.commit()
return {"message": "User registered successfully"}, 201
# 登录用户
def login_user(username, password):
user = session.query(User).filter_by(username=username).first()
if user and user.check_password(password):
token = user.generate_auth_token(SECRET_KEY)
return {"message": "Login successful", "token": token}, 200
else:
return {"message": "Invalid credentials"}, 401
# 示例受保护的函数
@token_required
def protected_function(user):
return {"message": f"Hello, {user.username}! This is a protected function."}, 200 # 返回一个包含response和status_code的元组
# 示例:注册用户
response, status_code = register_user('admin', 'password123')
print(response, status_code)
# 示例:登录用户
response, status_code = login_user('admin', 'password123')
print(response, status_code)
# 示例:调用受保护的函数
if response.get('token'):
response, status_code = protected_function(token=response['token'])
print(response, status_code)
else:
print("Failed to get token")运行结果如下
CREATE TABLE users (
id INTEGER NOT NULL,
username VARCHAR(50) NOT NULL,
password_hash VARCHAR(128) NOT NULL,
PRIMARY KEY (id),
UNIQUE (username)
)
CREATE TABLE roles (
id INTEGER NOT NULL,
name VARCHAR(50) NOT NULL,
PRIMARY KEY (id),
UNIQUE (name)
)
CREATE TABLE permissions (
id INTEGER NOT NULL,
name VARCHAR(50) NOT NULL,
PRIMARY KEY (id),
UNIQUE (name)
)
CREATE TABLE user_role (
user_id INTEGER,
role_id INTEGER,
FOREIGN KEY(user_id) REFERENCES users (id),
FOREIGN KEY(role_id) REFERENCES roles (id)
)
CREATE TABLE role_permission (
role_id INTEGER,
permission_id INTEGER,
FOREIGN KEY(role_id) REFERENCES roles (id),
FOREIGN KEY(permission_id) REFERENCES permissions (id)
)
FROM users
WHERE users.username = ?
LIMIT ? OFFSET ?
{'message': 'User registered successfully'} 201
FROM users
WHERE users.username = ?
LIMIT ? OFFSET ?
{'message': 'Login successful', 'token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MSwiZXhwIjoxNzI4ODIwMjU5fQ.cajZ-sYxjmzOGyXw6UrlkTjcQYDNTkCWgqUB3gyluI0'} 200
C:\Users\simon\Desktop\webui\app.py:86: LegacyAPIWarning: The Query.get() method is considered legacy as of the 1.x series of SQLAlchemy and becomes a legacy construct in 2.0. The method is now available as Session.get() (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
user = session.query(User).get(token_payload.id)
FROM users
WHERE users.id = ?
{'message': 'Hello, admin! This is a protected function.'} 2006. 根据 表模型创建数据库
from sqlalchemy import Column, Integer, String, ForeignKey, Table, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.inspection import inspect
Base = declarative_base()
def create_models(table1_name, table2_name, relation_type):
# 动态创建模型类
class Table1(Base):
__tablename__ = table1_name
id = Column(Integer, primary_key=True)
name = Column(String)
class Table2(Base):
__tablename__ = table2_name
id = Column(Integer, primary_key=True)
name = Column(String)
if relation_type == 'one-to-many':
# 定义一对多关系
Table1.items = relationship(
lambda: Table2,
back_populates="owner",
foreign_keys=lambda: Table2.owner_id
)
Table2.owner_id = Column(Integer, ForeignKey(f'{table1_name}.id'))
Table2.owner = relationship(
lambda: Table1,
back_populates="items"
)
elif relation_type == 'many-to-many':
# 定义多对多关系
association_table = Table(
f'{table1_name}_{table2_name}_association',
Base.metadata,
Column('left_id', Integer, ForeignKey(f'{table1_name}.id')),
Column('right_id', Integer, ForeignKey(f'{table2_name}.id'))
)
Table1.items = relationship(
lambda: Table2,
secondary=association_table,
back_populates="owners"
)
Table2.owners = relationship(
lambda: Table1,
secondary=association_table,
back_populates="items"
)
return Table1, Table2
def print_table_structure(model):
mapper = inspect(model)
print(f"Table: {model.__tablename__}")
print("Columns:")
for column in mapper.columns:
print(f" - {column.key}: {column.type}")
print("Relationships:")
for rel in mapper.relationships:
print(f" - {rel.key} -> {rel.mapper.class_.__name__}")
# 示例用法
if __name__ == "__main__":
engine = create_engine('sqlite:///database.db', echo=True)
# 创建表模型
Person, Pet = create_models('person', 'pet', 'one-to-many')
Student, Course = create_models('student', 'course', 'many-to-many')
# 确保表已经创建
Base.metadata.create_all(engine)
# 打印表结构
print_table_structure(Person)
print_table_structure(Pet)
print_table_structure(Student)
print_table_structure(Course)
# 使用Session进行数据库操作
Session = sessionmaker(bind=engine)
session = Session()
# 示例添加数据
person = Person(name='John Doe')
pet = Pet(name='Buddy', owner=person)
session.add(person)
session.add(pet)
# 添加多对多关系的数据
student1 = Student(name='Alice')
student2 = Student(name='Bob')
course1 = Course(name='Math')
course2 = Course(name='Science')
# 建立多对多关系
student1.items.append(course1)
student1.items.append(course2)
student2.items.append(course1)
session.add(student1)
session.add(student2)
session.add(course1)
session.add(course2)
session.commit()
# 查询示例
print("Persons and their pets:")
for person in session.query(Person).all():
print(person.name, [pet.name for pet in person.items])
print("Students and their courses:")
for student in session.query(Student).all():
print(student.name, [course.name for course in student.items])
print("Courses and their students:")
for course in session.query(Course).all():
print(course.name, [student.name for student in course.owners])7. 根据字典存取
import sqlite3
import json
# 默认字符串长度
DEFAULT_STRING_LENGTH = 255
def create_table(cursor, table_name, columns, string_lengths=None):
column_definitions = []
for key, value in columns.items():
if isinstance(value, int):
column_type = 'INTEGER'
elif isinstance(value, float):
column_type = 'REAL'
else:
column_type = f'TEXT({string_lengths.get(key, DEFAULT_STRING_LENGTH)})' if string_lengths else f'TEXT({DEFAULT_STRING_LENGTH})'
column_definitions.append(f"{key} {column_type}")
columns_str = ', '.join(column_definitions)
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {columns_str})"
cursor.execute(create_table_sql)
def insert_data(cursor, table_name, data):
keys = data[0].keys()
placeholders = ', '.join(['?'] * len(keys))
columns = ', '.join(keys)
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
cursor.executemany(insert_sql, [tuple(item.values()) for item in data])
def process_json_file(data, table_name, string_lengths=None):
if isinstance(data, dict):
data = [data]
elif isinstance(data, list):
pass
# 获取第一个数据项的键值对类型
columns = {key: type(value) for key, value in data[0].items()}
# 创建表
create_table(cursor, table_name, columns, string_lengths)
# 插入数据
insert_data(cursor, table_name, data)
# 连接到SQLite数据库
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 字段长度配置
string_lengths = {
'name': 100 # 例如,设置name字段的最大长度为100
}
data = {"name": "Alice", "age": 30}
data = [{
"name": "Alice",
"age": 30
}, {
"name": "Bob",
"age": 28
}, {
"name": "Charlie",
"age": 22
}]
# 处理JSON文件
process_json_file(data, 'user_json', string_lengths)
# 提交事务
conn.commit()
# 查询所有数据以验证插入是否成功
cursor.execute('SELECT * FROM user_json')
print("Data from user_json:")
print(cursor.fetchall())
# 关闭Cursor和Connection
cursor.close()
conn.close()8. json 格式处理
8.1. JSON 原声 sql
CREATE TABLE my_table (
id INTEGER PRIMARY KEY,
json_data TEXT -- 这里可以存入JSON格式的字符串
);
-- 插入JSON数据
INSERT INTO my_table (json_data) VALUES ('{"key": "value", "number": 42}');但是,为了更方便地处理 JSON 数据,你可能想要确保启用了 JSON1 模块,这允许你执行诸如提取 JSON 值、检查键的存在性等操作。例如:
-- 提取JSON中的值
SELECT json_extract(json_data, '$.key') FROM my_table;
-- 更新JSON中的值
UPDATE my_table SET json_data = json_set(json_data, '$.key', 'new_value');如果你正在使用 Python 的 sqlite3 库,它也提供了对 JSON 的支持。从 Python 3.9 开始,sqlite3 库默认启用 JSON1 扩展。对于较早版本的 Python,你可能需要手动加载这个扩展:
import sqlite3
# 如果你需要手动加载JSON1扩展(Python 3.8及以下)
conn = sqlite3.connect('example.db')
conn.execute('SELECT load_extension("json1")') # 注意:这行代码可能在某些环境中不被允许
# 插入JSON数据
cur = conn.cursor()
cur.execute("INSERT INTO my_table (json_data) VALUES (?)", ('{"key": "value"}',))
conn.commit()
# 查询JSON数据
cur.execute("SELECT json_extract(json_data, '$.key') FROM my_table")
print(cur.fetchone())8.2. 使用 JSON 字段
from sqlalchemy import create_engine, Column, Integer, String, JSON, func, cast
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
data = Column(JSON) # 使用JSON字段类型
# 创建SQLite数据库引擎
engine = create_engine('sqlite:///example.db')
# 创建所有表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 插入测试数据
new_user = User(name='John Doe', data={'age': 30, 'city': 'New York'})
session.add(new_user)
new_user = User(name='Jane Smith', data={'age': 25, 'city': 'Boston'})
session.add(new_user)
session.commit()
# 查询 age 为 30 的用户,并进行类型转换
query = session.query(User).filter(cast(func.json_extract(User.data, '$.age'), Integer) == 30)
for user in query.all():
print(f"User: {user.name}, Age: {user.data['age']}, City: {user.data['city']}")注意,由于 SQLite 的 json_extract 函数返回的是字符串,因此在比较时需要将 Python 中的数值转换为字符串。如果需要进行数值比较,可以使用 CAST 来转换类型:
{
"name": "John Doe",
"address": {
"city": "New York",
"zipcode": "10001"
}
}$.address.city 将返回 "New York"。
{
"name": "John Doe",
"hobbies": ["reading", "swimming"]
}$.hobbies[0] 将返回 "reading"。
9. 过滤查询
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 定义基类
Base = declarative_base()
# 定义映射类
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
# 配置数据库连接
engine = create_engine('sqlite:///example.db') # 示例使用 SQLite
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 定义查询条件变量
name_condition = 'Alice'
age_condition = 30
# 示例 1: 根据名字过滤
users_with_name = session.query(User).filter(User.name == name_condition).all()
for user in users_with_name:
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")
# 示例 2: 根据年龄过滤
users_over_age = session.query(User).filter(User.age > age_condition).all()
for user in users_over_age:
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")
# 示例 3: 复合条件过滤
# 查询名字为 name_condition 且年龄大于 age_condition 的用户
users_with_name_and_age = session.query(User).filter(User.name == name_condition, User.age > age_condition).all()
for user in users_with_name_and_age:
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")
# 关闭会话
session.close()10. 复合查询
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 定义基类
Base = declarative_base()
# 定义映射类
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
city = Column(String)
# 配置数据库连接
engine = create_engine('sqlite:///example.db') # 示例使用 SQLite
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 定义查询条件变量
name_condition = 'Alice'
age_condition = 30
city_keyword = 'New'
# 示例:复合查询
# 查询名字为 name_condition,年龄大于 age_condition,且城市名称包含 city_keyword 的用户
users = session.query(User).filter(
User.name == name_condition,
User.age > age_condition,
User.city.like(f'%{city_keyword}%')
).all()
for user in users:
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}, City: {user.city}")
# 关闭会话
session.close()