Sqlit操作合集

simon
3
2025-10-15

1. 一张表

1.1. 定义数据库模型
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    email = Column(String, nullable=False)

# 创建数据库引擎
engine = create_engine('sqlite:///example.db', echo=True)

# 创建所有定义的表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
1.2. 定义 Pydantic 模型
from pydantic import BaseModel, EmailStr


class UserCreate(BaseModel):
    name: str
    email: EmailStr

# python3.10
# class UserUpdate(BaseModel):
#     name: str | None = None
#     email: EmailStr | None = None

# python3.9以下
from typing import Union
from pydantic import BaseModel, EmailStr

class UserUpdate(BaseModel):
    name: Union[str, None] = None
    email: Union[EmailStr, None] = None
        
        
class UserOut(BaseModel):
    id: int
    name: str
    email: EmailStr

    class Config:
        from_attributes = True  # 设置 from_attributes 为 True
1.3. CRUD 操作
1.3.1. 增加数据
def create_user(user_data: UserCreate):
    user = User(**user_data.dict())
    session.add(user)
    session.commit()
    session.refresh(user)
    return user
1.3.2. 查询数据
def get_users():
    users = session.query(User).all()
    return [UserOut.from_orm(user) for user in users]

def get_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        return UserOut.from_orm(user)
    return None
1.3.3. 更新数据
def update_user(user_id: int, user_data: UserUpdate):
    user = session.query(User).get(user_id)
    if user:
        for key, value in user_data.dict().items():
            if value is not None:
                setattr(user, key, value)
        session.commit()
        return UserOut.from_orm(user)
    return None
1.3.4. 删除数据
def delete_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        session.delete(user)
        session.commit()
        return True
    return False
1.3.5. 示例用法
# 创建用户
new_user_data = UserCreate(name="John Doe", email="john@example.com")
created_user = create_user(new_user_data)
print(f"Created User: {created_user}")

# 获取所有用户
all_users = get_users()
print("All Users:")
for user in all_users:
    print(user)

# 获取单个用户
user = get_user(created_user.id)
print(f"User with ID {created_user.id}: {user}")

# 更新用户
updated_user_data = UserUpdate(name="Jane Doe", email="jane@example.com")
updated_user = update_user(created_user.id, updated_user_data)
print(f"Updated User: {updated_user}")

# 删除用户
deleted = delete_user(created_user.id)
print(f"User deleted: {deleted}")

# 关闭会话
session.close()

2. 一对多

2.1. 定义数据模型
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    email = Column(String, nullable=False)
    
    posts = relationship('Post', back_populates='user')

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    content = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    
    user = relationship('User', back_populates='posts')

# 创建数据库引擎
engine = create_engine('sqlite:///blog.db', echo=True)

# 创建所有定义的表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
2.2. 定义 pydantic 模型
from pydantic import BaseModel, EmailStr
from typing import List, Optional

class PostCreate(BaseModel):
    title: str
    content: str

class PostOut(BaseModel):
    id: int
    title: str
    content: str

    class Config:
        from_attributes = True

class UserCreate(BaseModel):
    name: str
    email: EmailStr
    posts: List[PostCreate] = []

class UserOut(BaseModel):
    id: int
    name: str
    email: EmailStr
    posts: List[PostOut] = []

    class Config:
        from_attributes = True
2.3. CRUD 操作
2.3.1. 增加用户和文章
def create_user(user_data: UserCreate):
    user = User(name=user_data.name, email=user_data.email)
    for post_data in user_data.posts:
        post = Post(title=post_data.title, content=post_data.content, user=user)
        user.posts.append(post)
    session.add(user)
    session.commit()
    session.refresh(user)
    return user
2.3.2. 查询用户及其文章
def get_users():
    users = session.query(User).all()
    return [UserOut.from_orm(user) for user in users]

def get_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        return UserOut.from_orm(user)
    return None
2.3.3. 更新用户及其文章
def update_user(user_id: int, user_data: UserCreate):
    user = session.query(User).get(user_id)
    if user:
        user.name = user_data.name
        user.email = user_data.email
        # 清除现有文章并添加新的文章
        user.posts.clear()
        for post_data in user_data.posts:
            post = Post(title=post_data.title, content=post_data.content, user=user)
            user.posts.append(post)
        session.commit()
        return UserOut.from_orm(user)
    return None
2.3.4. 删除用户及其文章
def delete_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        session.delete(user)
        session.commit()
        return True
    return False

2.3.5. 根据文章ID查找对应的用户
def get_user_by_post_id(post_id: int):
    post = session.query(Post).get(post_id)
    if post:
        user = post.user
        return UserOut.from_orm(user)
    return None
2.3.6. 示例用法
# 创建用户及其文章
new_user_data = UserCreate(
    name="John Doe",
    email="john@example.com",
  posts=[
        PostCreate(title="First Post", content="This is my first post."),
        PostCreate(title="Second Post", content="This is my second post.")
    ]
)
created_user = create_user(new_user_data)
print(f"Created User: {created_user}")

# 获取所有用户及其文章
all_users = get_users()
print("All Users:")
for user in all_users:
    print(user)

# 获取单个用户及其文章
user = get_user(created_user.id)
print(f"User with ID {created_user.id}: {user}")

# 根据文章内容模糊查找用户ID
search_content = "first"
user_ids = get_user_ids_by_post_content(search_content)
print(f"User IDs for posts containing '{search_content}': {user_ids}")

# 更新用户及其文章
updated_user_data = UserCreate(
    name="Jane Doe",
    email="jane@example.com",
    posts=[
        PostCreate(title="Updated First Post", content="This is my updated first post."),
        PostCreate(title="New Post", content="This is a new post.")
    ]
)
updated_user = update_user(created_user.id, updated_user_data)
print(f"Updated User: {updated_user}")

# 删除用户及其文章
deleted = delete_user(created_user.id)
print(f"User deleted: {deleted}")

# 关闭会话
session.close()

3. 多对对

3.1. 定义数据模型
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from pydantic import BaseModel, EmailStr, Field, validator
from typing import List, Optional

# 创建数据库引擎
engine = create_engine('sqlite:///mydatabase.db', echo=True)

# 声明基类
Base = declarative_base()

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 定义用户(User)和粉丝(Follower)的ORM模型
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String)

    following = relationship("Follower", back_populates="follower", foreign_keys='Follower.follower_id', cascade="all, delete-orphan")
    followers = relationship("Follower", back_populates="followed", foreign_keys='Follower.followed_id', cascade="all, delete-orphan")

class Follower(Base):
    __tablename__ = 'followers'

    follower_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    followed_id = Column(Integer, ForeignKey('users.id'), primary_key=True)

    follower = relationship("User", foreign_keys=[follower_id], back_populates="following")
    followed = relationship("User", foreign_keys=[followed_id], back_populates="followers")

# 创建所有定义的表
Base.metadata.create_all(engine)
3.2. 定义 Pydantic 模型
# 定义 Pydantic 模型
class UserCreate(BaseModel):
    username: str
    followed_ids: List[int] = []

class UserOut(BaseModel):
    id: int
    username: str
    following: List['UserOut'] = []
    followers: List['UserOut'] = []

    @validator('following', 'followers', pre=True, always=True)
    def avoid_recursion(cls, v):
        if isinstance(v, list):
            return [UserOut(id=item.followed.id, username=item.followed.username) for item in v]
        return v

    class Config:
        from_attributes = True

# 使用 Pydantic 的递归引用
UserOut.update_forward_refs()
3.3. CRUE
3.3.1. 创建用户
def create_user(user_data: UserCreate):
    user = User(username=user_data.username)
    followed_users = session.query(User).filter(User.id.in_(user_data.followed_ids)).all()
    for followed_user in followed_users:
        session.add(Follower(follower=user, followed=followed_user))
    session.add(user)
    session.commit()
    session.refresh(user)
    return user
3.3.2. 查询用户
def get_users():
    users = session.query(User).all()
    return [UserOut.from_orm(user) for user in users]
3.3.3. 根据用户 ID 查询
def get_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        return UserOut.from_orm(user)
    return None
3.3.4. 更新用户关注信息

def update_user(user_id: int, user_data: UserCreate):
    user = session.query(User).get(user_id)
    if user:
        user.username = user_data.username
        
        # 清除现有关注关系
        session.query(Follower).filter_by(follower_id=user.id).delete()
        
        # 添加新的关注关系
        followed_users = session.query(User).filter(User.id.in_(user_data.followed_ids)).all()
        for followed_user in followed_users:
            session.add(Follower(follower=user, followed=followed_user))
        
        session.commit()
        return UserOut.from_orm(user)
    return None
3.3.5. 删除用户后更新关系
def delete_user(user_id: int):
    user = session.query(User).get(user_id)
    if user:
        # 删除用户及其相关记录
        session.query(Follower).filter_by(follower_id=user.id).delete()
        session.query(Follower).filter_by(followed_id=user.id).delete()
        session.delete(user)
        session.commit()
        return True
    return False
3.3.6. 示例
# 创建用户
user1_data = UserCreate(username="Alice", followed_ids=[])
user2_data = UserCreate(username="Bob", followed_ids=[])
user1 = create_user(user1_data)
user2 = create_user(user2_data)

# 添加粉丝关系
update_user(user1.id, UserCreate(username="Alice", followed_ids=[user2.id]))

# 查询用户信息及其粉丝
user_info = get_user(user1.id)
print("用户 ID:", user_info.id, "用户名:", user_info.username)

print("用户 ID 1 关注的用户:")
for follow in user_info.following:
    print(follow.id, follow.username)

print("用户 ID 1 的粉丝:")
for follower in user_info.followers:
    print(follower.id, follower.username)

# 删除用户及其相关记录
delete_user(user2.id)

session.close()

4. ORM 权限

4.1. models
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

Base = declarative_base()

# 多对多关联表
user_role = Table('user_role', Base.metadata,
                  Column('user_id', Integer, ForeignKey('users.id')),
                  Column('role_id', Integer, ForeignKey('roles.id'))
                 )

role_permission = Table('role_permission', Base.metadata,
                        Column('role_id', Integer, ForeignKey('roles.id')),
                        Column('permission_id', Integer, ForeignKey('permissions.id'))
                       )

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    roles = relationship("Role", secondary=user_role, back_populates="users")

class Role(Base):
    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    users = relationship("User", secondary=user_role, back_populates="roles")
    permissions = relationship("Permission", secondary=role_permission, back_populates="roles")

class Permission(Base):
    __tablename__ = 'permissions'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    roles = relationship("Role", secondary=role_permission, back_populates="permissions")

# 初始化数据库连接
engine = create_engine('sqlite:///auth.db', echo=True)  # 使用SQLite数据库
Base.metadata.create_all(engine)  # 创建所有表

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
4.2. main
from models import User,Permission,session,Role,user_role,role_permission
# 添加用户
user1 = User(username='admin')
user2 = User(username='user')

# 添加角色
role_admin = Role(name='admin')
role_user = Role(name='user')

# 添加权限
perm_read = Permission(name='read')
perm_write = Permission(name='write')

# 给角色分配权限
role_admin.permissions.append(perm_read)
role_admin.permissions.append(perm_write)
role_user.permissions.append(perm_read)

# 给用户分配角色
user1.roles.append(role_admin)
user2.roles.append(role_user)

# 提交到数据库
session.add_all([user1, user2, role_admin, role_user, perm_read, perm_write])
session.commit()

# 打印每个用户的所有权限
users = session.query(User).all()
for user in users:
    print(f"User: {user.username}")
    for role in user.roles:
        print(f"  Role: {role.name}")
        for permission in role.permissions:
            print(f"    Permission: {permission.name}")

# 查询
print("Users with admin role:")
for user in session.query(User).join(user_role).join(Role).filter(Role.name == 'admin'):
    print(user.username)

print("Permissions of user 'admin':")
for permission in session.query(Permission).join(role_permission).join(Role).join(user_role).join(User).filter(User.username == 'admin'):
    print(permission.name)

5. ORM

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from pydantic import BaseModel, ValidationError, Field
import hashlib
import datetime
import jwt
from functools import wraps

Base = declarative_base()

# 多对多关联表
user_role = Table('user_role', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('role_id', Integer, ForeignKey('roles.id'))
)

role_permission = Table('role_permission', Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id')),
    Column('permission_id', Integer, ForeignKey('permissions.id'))
)

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    roles = relationship("Role", secondary=user_role, back_populates="users")

    def set_password(self, password):
        self.password_hash = hashlib.sha256(password.encode()).hexdigest()

    def check_password(self, password):
        return self.password_hash == hashlib.sha256(password.encode()).hexdigest()

    def generate_auth_token(self, secret_key, expiration=3600):
        payload = {
            'id': self.id,
            'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration)
        }
        return jwt.encode(payload, secret_key, algorithm='HS256')

class Role(Base):
    __tablename__ = 'roles'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    users = relationship("User", secondary=user_role, back_populates="roles")
    permissions = relationship("Permission", secondary=role_permission, back_populates="roles")

class Permission(Base):
    __tablename__ = 'permissions'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    roles = relationship("Role", secondary=role_permission, back_populates="permissions")

# 初始化数据库连接
engine = create_engine('sqlite:///auth.db', echo=True)  # 使用SQLite数据库
Base.metadata.create_all(engine)  # 创建所有表

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 配置
SECRET_KEY = 'your_secret_key'

# Pydantic模型
class TokenPayload(BaseModel):
    id: int
    exp: datetime.datetime

# 装饰器:验证Token
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = kwargs.pop('token', None)
        if not token:
            return {"message": "Token is missing"}, 401
        
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
            token_payload = TokenPayload(**payload)
            user = session.query(User).get(token_payload.id)
            if not user:
                return {"message": "User not found"}, 401
        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, ValidationError):
            return {"message": "Token is invalid or expired"}, 401
        
        return f(user, *args, **kwargs)
    
    return decorated

# 注册用户
def register_user(username, password):
    if session.query(User).filter_by(username=username).first():
        return {"message": "User already exists"}, 400
    
    new_user = User(username=username)
    new_user.set_password(password)
    session.add(new_user)
    session.commit()
    return {"message": "User registered successfully"}, 201

# 登录用户
def login_user(username, password):
    user = session.query(User).filter_by(username=username).first()
    if user and user.check_password(password):
        token = user.generate_auth_token(SECRET_KEY)
        return {"message": "Login successful", "token": token}, 200
    else:
        return {"message": "Invalid credentials"}, 401

# 示例受保护的函数
@token_required
def protected_function(user):
    return {"message": f"Hello, {user.username}! This is a protected function."}, 200  # 返回一个包含response和status_code的元组

# 示例:注册用户
response, status_code = register_user('admin', 'password123')
print(response, status_code)

# 示例:登录用户
response, status_code = login_user('admin', 'password123')
print(response, status_code)

# 示例:调用受保护的函数
if response.get('token'):
    response, status_code = protected_function(token=response['token'])
    print(response, status_code)
else:
    print("Failed to get token")

运行结果如下

CREATE TABLE users (
        id INTEGER NOT NULL,
        username VARCHAR(50) NOT NULL,
        password_hash VARCHAR(128) NOT NULL,
        PRIMARY KEY (id),
        UNIQUE (username)
)



CREATE TABLE roles (
        id INTEGER NOT NULL,
        name VARCHAR(50) NOT NULL,
        PRIMARY KEY (id),
        UNIQUE (name)
)


 
CREATE TABLE permissions (
        id INTEGER NOT NULL,
        name VARCHAR(50) NOT NULL,
        PRIMARY KEY (id),
        UNIQUE (name)
)



CREATE TABLE user_role (
        user_id INTEGER,
        role_id INTEGER,
        FOREIGN KEY(user_id) REFERENCES users (id),
        FOREIGN KEY(role_id) REFERENCES roles (id)
)



CREATE TABLE role_permission (
        role_id INTEGER,
        permission_id INTEGER,
        FOREIGN KEY(role_id) REFERENCES roles (id),
        FOREIGN KEY(permission_id) REFERENCES permissions (id)
)


FROM users
WHERE users.username = ?
 LIMIT ? OFFSET ?

{'message': 'User registered successfully'} 201

FROM users
WHERE users.username = ?
 LIMIT ? OFFSET ?

{'message': 'Login successful', 'token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6MSwiZXhwIjoxNzI4ODIwMjU5fQ.cajZ-sYxjmzOGyXw6UrlkTjcQYDNTkCWgqUB3gyluI0'} 200
C:\Users\simon\Desktop\webui\app.py:86: LegacyAPIWarning: The Query.get() method is considered legacy as of the 1.x series of SQLAlchemy and becomes a legacy construct in 2.0. The method is now available as Session.get() (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  user = session.query(User).get(token_payload.id)

FROM users
WHERE users.id = ?
       
{'message': 'Hello, admin! This is a protected function.'} 200

6. 根据 表模型创建数据库

from sqlalchemy import Column, Integer, String, ForeignKey, Table, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.inspection import inspect

Base = declarative_base()

def create_models(table1_name, table2_name, relation_type):
    # 动态创建模型类
    class Table1(Base):
        __tablename__ = table1_name
        id = Column(Integer, primary_key=True)
        name = Column(String)

    class Table2(Base):
        __tablename__ = table2_name
        id = Column(Integer, primary_key=True)
        name = Column(String)

    if relation_type == 'one-to-many':
        # 定义一对多关系
        Table1.items = relationship(
            lambda: Table2,
            back_populates="owner",
            foreign_keys=lambda: Table2.owner_id
        )
        Table2.owner_id = Column(Integer, ForeignKey(f'{table1_name}.id'))
        Table2.owner = relationship(
            lambda: Table1,
            back_populates="items"
        )
    elif relation_type == 'many-to-many':
        # 定义多对多关系
        association_table = Table(
            f'{table1_name}_{table2_name}_association',
            Base.metadata,
            Column('left_id', Integer, ForeignKey(f'{table1_name}.id')),
            Column('right_id', Integer, ForeignKey(f'{table2_name}.id'))
        )
        Table1.items = relationship(
            lambda: Table2,
            secondary=association_table,
            back_populates="owners"
        )
        Table2.owners = relationship(
            lambda: Table1,
            secondary=association_table,
            back_populates="items"
        )

    return Table1, Table2

def print_table_structure(model):
    mapper = inspect(model)
    print(f"Table: {model.__tablename__}")
    print("Columns:")
    for column in mapper.columns:
        print(f"  - {column.key}: {column.type}")
    print("Relationships:")
    for rel in mapper.relationships:
        print(f"  - {rel.key} -> {rel.mapper.class_.__name__}")

# 示例用法
if __name__ == "__main__":
    engine = create_engine('sqlite:///database.db', echo=True)

    # 创建表模型
    Person, Pet = create_models('person', 'pet', 'one-to-many')
    Student, Course = create_models('student', 'course', 'many-to-many')

    # 确保表已经创建
    Base.metadata.create_all(engine)

    # 打印表结构
    print_table_structure(Person)
    print_table_structure(Pet)
    print_table_structure(Student)
    print_table_structure(Course)

    # 使用Session进行数据库操作
    Session = sessionmaker(bind=engine)
    session = Session()

    # 示例添加数据
    person = Person(name='John Doe')
    pet = Pet(name='Buddy', owner=person)
    session.add(person)
    session.add(pet)

    # 添加多对多关系的数据
    student1 = Student(name='Alice')
    student2 = Student(name='Bob')
    course1 = Course(name='Math')
    course2 = Course(name='Science')

    # 建立多对多关系
    student1.items.append(course1)
    student1.items.append(course2)
    student2.items.append(course1)

    session.add(student1)
    session.add(student2)
    session.add(course1)
    session.add(course2)

    session.commit()

    # 查询示例
    print("Persons and their pets:")
    for person in session.query(Person).all():
        print(person.name, [pet.name for pet in person.items])

    print("Students and their courses:")
    for student in session.query(Student).all():
        print(student.name, [course.name for course in student.items])

    print("Courses and their students:")
    for course in session.query(Course).all():
        print(course.name, [student.name for student in course.owners])

7. 根据字典存取

import sqlite3
import json

# 默认字符串长度
DEFAULT_STRING_LENGTH = 255


def create_table(cursor, table_name, columns, string_lengths=None):
    column_definitions = []
    for key, value in columns.items():
        if isinstance(value, int):
            column_type = 'INTEGER'
        elif isinstance(value, float):
            column_type = 'REAL'
        else:
            column_type = f'TEXT({string_lengths.get(key, DEFAULT_STRING_LENGTH)})' if string_lengths else f'TEXT({DEFAULT_STRING_LENGTH})'
        column_definitions.append(f"{key} {column_type}")

    columns_str = ', '.join(column_definitions)
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {columns_str})"
    cursor.execute(create_table_sql)


def insert_data(cursor, table_name, data):
    keys = data[0].keys()
    placeholders = ', '.join(['?'] * len(keys))
    columns = ', '.join(keys)
    insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    cursor.executemany(insert_sql, [tuple(item.values()) for item in data])


def process_json_file(data, table_name, string_lengths=None):
    if isinstance(data, dict):
        data = [data]
    elif isinstance(data, list):
        pass

    # 获取第一个数据项的键值对类型
    columns = {key: type(value) for key, value in data[0].items()}

    # 创建表
    create_table(cursor, table_name, columns, string_lengths)

    # 插入数据
    insert_data(cursor, table_name, data)


# 连接到SQLite数据库
conn = sqlite3.connect('test.db')
cursor = conn.cursor()

# 字段长度配置
string_lengths = {
    'name': 100  # 例如,设置name字段的最大长度为100
}

data = {"name": "Alice", "age": 30}
data = [{
    "name": "Alice",
    "age": 30
}, {
    "name": "Bob",
    "age": 28
}, {
    "name": "Charlie",
    "age": 22
}]
# 处理JSON文件
process_json_file(data, 'user_json', string_lengths)

# 提交事务
conn.commit()

# 查询所有数据以验证插入是否成功
cursor.execute('SELECT * FROM user_json')
print("Data from user_json:")
print(cursor.fetchall())

# 关闭Cursor和Connection
cursor.close()
conn.close()

8. json 格式处理

8.1. JSON 原声 sql
CREATE TABLE my_table (
    id INTEGER PRIMARY KEY,
    json_data TEXT -- 这里可以存入JSON格式的字符串
);

-- 插入JSON数据
INSERT INTO my_table (json_data) VALUES ('{"key": "value", "number": 42}');

但是,为了更方便地处理 JSON 数据,你可能想要确保启用了 JSON1 模块,这允许你执行诸如提取 JSON 值、检查键的存在性等操作。例如:

-- 提取JSON中的值
SELECT json_extract(json_data, '$.key') FROM my_table;

-- 更新JSON中的值
UPDATE my_table SET json_data = json_set(json_data, '$.key', 'new_value');

如果你正在使用 Python 的 sqlite3 库,它也提供了对 JSON 的支持。从 Python 3.9 开始,sqlite3 库默认启用 JSON1 扩展。对于较早版本的 Python,你可能需要手动加载这个扩展:

import sqlite3

# 如果你需要手动加载JSON1扩展(Python 3.8及以下)
conn = sqlite3.connect('example.db')
conn.execute('SELECT load_extension("json1")')  # 注意:这行代码可能在某些环境中不被允许

# 插入JSON数据
cur = conn.cursor()
cur.execute("INSERT INTO my_table (json_data) VALUES (?)", ('{"key": "value"}',))
conn.commit()

# 查询JSON数据
cur.execute("SELECT json_extract(json_data, '$.key') FROM my_table")
print(cur.fetchone())
8.2. 使用 JSON 字段
from sqlalchemy import create_engine, Column, Integer, String, JSON, func, cast
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    data = Column(JSON)  # 使用JSON字段类型

# 创建SQLite数据库引擎
engine = create_engine('sqlite:///example.db')
# 创建所有表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 插入测试数据
new_user = User(name='John Doe', data={'age': 30, 'city': 'New York'})
session.add(new_user)
new_user = User(name='Jane Smith', data={'age': 25, 'city': 'Boston'})
session.add(new_user)
session.commit()

# 查询 age 为 30 的用户,并进行类型转换
query = session.query(User).filter(cast(func.json_extract(User.data, '$.age'), Integer) == 30)

for user in query.all():
    print(f"User: {user.name}, Age: {user.data['age']}, City: {user.data['city']}")

注意,由于 SQLite 的 json_extract 函数返回的是字符串,因此在比较时需要将 Python 中的数值转换为字符串。如果需要进行数值比较,可以使用 CAST 来转换类型:

{
    "name": "John Doe",
    "address": {
        "city": "New York",
        "zipcode": "10001"
    }
}

$.address.city 将返回 "New York"

{
    "name": "John Doe",
    "hobbies": ["reading", "swimming"]
}

$.hobbies[0] 将返回 "reading"

9. 过滤查询

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 定义基类
Base = declarative_base()

# 定义映射类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# 配置数据库连接
engine = create_engine('sqlite:///example.db')  # 示例使用 SQLite

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 定义查询条件变量
name_condition = 'Alice'
age_condition = 30

# 示例 1: 根据名字过滤
users_with_name = session.query(User).filter(User.name == name_condition).all()
for user in users_with_name:
    print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 示例 2: 根据年龄过滤
users_over_age = session.query(User).filter(User.age > age_condition).all()
for user in users_over_age:
    print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 示例 3: 复合条件过滤
# 查询名字为 name_condition 且年龄大于 age_condition 的用户
users_with_name_and_age = session.query(User).filter(User.name == name_condition, User.age > age_condition).all()
for user in users_with_name_and_age:
    print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 关闭会话
session.close()

10. 复合查询

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 定义基类
Base = declarative_base()

# 定义映射类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)
    city = Column(String)

# 配置数据库连接
engine = create_engine('sqlite:///example.db')  # 示例使用 SQLite

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 定义查询条件变量
name_condition = 'Alice'
age_condition = 30
city_keyword = 'New'

# 示例:复合查询
# 查询名字为 name_condition,年龄大于 age_condition,且城市名称包含 city_keyword 的用户
users = session.query(User).filter(
    User.name == name_condition,
    User.age > age_condition,
    User.city.like(f'%{city_keyword}%')
).all()

for user in users:
    print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}, City: {user.city}")

# 关闭会话
session.close()

动物装饰