Mysql常用操作

simon
0
2025-10-15

1. 安装

1.1. 安装扩展包-Python
pip install sqlalchemy pymysql mysql-connector-python

2. 安装-Docker

docker run --name simon-mysql -itd --restart=always \
    -p 53306:3306 \
    -v $PWD/mysql/data:/var/lib/mysql \
    -v $PWD/mysql/conf:/etc/mysql/conf.d \
    -e MYSQL_ROOT_PASSWORD="BWOYO0R7oDt40y0shH6t" \
    -e TZ=Asia/Shanghai \
    -d mysql:5.7


docker run --name simon-mysql910 -itd --restart=always \
    -p 53306:3306 \
    -v $PWD/mysql/data:/var/lib/mysql \
    -v $PWD/mysql/conf:/etc/mysql/conf.d \
    -e MYSQL_ROOT_PASSWORD="BWOYO0R7oDt40y0shH6t" \
    -e TZ=Asia/Shanghai \
    -d mysql:9.1.0

3. 数据库

3.1. 创建数据库
from sqlalchemy import create_engine, text


def create_database(
    database_name,
    user="root",
    password="",
    host="127.0.0.1",
    port=3306,
    charset="utf8mb4",
):
    # 创建一个引擎,这里使用的是 pymysql 作为驱动
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")

    # 使用引擎连接到 MySQL 服务器
    with engine.connect() as connection:
        # 构建创建数据库的 SQL 语句,并使用 text 函数包装
        query = text(
            f"CREATE DATABASE IF NOT EXISTS {database_name} CHARACTER SET {charset} COLLATE {charset}_unicode_ci;"
        )

        # 执行 SQL 语句
        connection.execute(query)

# 指定要创建的数据库名
db_name = "testdb"

# 调用函数创建数据库
create_database(
    database_name=db_name,
    user="root",
    password="BWOYO0R7oDt40y0shH6t",
    host="192.168.0.91",
    port=10087,
)

print(f"Database '{db_name}' created or already exists.")
3.2. 查询所有数据库
from sqlalchemy import create_engine, text


def list_databases(user="root", password="", host="127.0.0.1", port=3306):
    # 创建一个引擎,这里使用的是 pymysql 作为驱动
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")

    # 使用引擎连接到 MySQL 服务器
    with engine.connect() as connection:
        # 构建查询数据库的 SQL 语句,并使用 text 函数包装
        query = text("SHOW DATABASES;")

        # 执行 SQL 语句
        result = connection.execute(query)

        # 获取所有数据库名称
        databases = [row[0] for row in result]

        return databases


if __name__ == "__main__":
    # 调用函数查询所有数据库
    databases = list_databases(
        user="root",
        password="BWOYO0R7oDt40y0shH6t",
        host="192.168.0.91",
        port=10087,
    )

    # 打印所有数据库名称
    print("Databases:")
    for db in databases:
        print(db)
3.3. 清空指定数据库
from sqlalchemy import create_engine, text


def clear_database(
    database_name, user="root", password="", host="127.0.0.1", port=3306
):
    # 创建一个引擎,这里使用的是 pymysql 作为驱动
    engine = create_engine(
        f"mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}"
    )

    # 使用引擎连接到指定的数据库
    with engine.connect() as connection:
        # 获取数据库中的所有表名
        query = text("SHOW TABLES;")
        result = connection.execute(query)
        tables = [row[0] for row in result]

        # 逐个删除表
        for table in tables:
            drop_table_query = text(f"DROP TABLE IF EXISTS {table};")
            connection.execute(drop_table_query)


if __name__ == "__main__":
    # 指定要清空的数据库名
    db_name = "testdb"

    # 调用函数清空数据库
    clear_database(
        database_name=db_name,
        user="root",
        password="BWOYO0R7oDt40y0shH6t",
        host="192.168.0.91",
        port=10087,
    )

    print(f"Database '{db_name}' has been cleared.")
3.4. 删除指定数据库
from sqlalchemy import create_engine, text


def delete_database(
    database_name, user="root", password="", host="127.0.0.1", port=3306
):
    # 创建一个引擎,这里使用的是 pymysql 作为驱动
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")

    # 使用引擎连接到 MySQL 服务器
    with engine.connect() as connection:
        # 构建删除数据库的 SQL 语句,并使用 text 函数包装
        query = text(f"DROP DATABASE IF EXISTS {database_name};")

        # 执行 SQL 语句
        connection.execute(query)


if __name__ == "__main__":
    # 指定要删除的数据库名
    db_name = "testdb"

    # 调用函数删除数据库
    delete_database(
        database_name=db_name,
        user="root",
        password="BWOYO0R7oDt40y0shH6t",
        host="192.168.0.91",
        port=10087,
    )

    print(f"Database '{db_name}' has been deleted.")
3.5. 查询特定数据库有哪些表
from sqlalchemy import create_engine, text


def list_tables_in_database(
    database_name, user="root", password="", host="127.0.0.1", port=3306
):
    # 创建一个引擎,这里使用的是 pymysql 作为驱动
    engine = create_engine(
        f"mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}"
    )

    # 使用引擎连接到指定的数据库
    with engine.connect() as connection:
        # 构建查询表的 SQL 语句,并使用 text 函数包装
        query = text("SHOW TABLES;")

        # 执行 SQL 语句
        result = connection.execute(query)

        # 获取所有表名
        tables = [row[0] for row in result]

        return tables


if __name__ == "__main__":
    # 指定要查询的数据库名
    db_name = "bdchat"

    # 调用函数查询所有表
    tables = list_tables_in_database(
        database_name=db_name,
        user="root",
        password="BWOYO0R7oDt40y0shH6t",
        host="192.168.0.91",
        port=10087,
    )

    # 打印所有表名
    print("Tables in database '{}':".format(db_name))
    for table in tables:
        print(table)

4. 表

4.1. 创建 三张 表

Users 表

  • id (主键,自增)

  • username (用户名)

  • email (电子邮件)

Posts 表

  • id (主键,自增)

  • title (标题)

  • content (内容)

  • user_id (外键,关联 Users 表)

Comments 表

  • id (主键,自增)

  • content (评论内容)

  • post_id (外键,关联 Posts 表)

  • user_id (外键,关联 Users 表)

from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import random
import string

database_name="bdchat",
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087

# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine('mysql+pymysql://root:BWOYO0R7oDt40y0shH6t@192.168.0.91:10087/bdchat', echo=True)


# 创建基类
Base = declarative_base()

# 定义 Users 表
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), nullable=False)
    email = Column(String(100), nullable=False)

    posts = relationship('Post', back_populates='user')
    comments = relationship('Comment', back_populates='user')

# 定义 Posts 表
class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(100), nullable=False)
    content = Column(Text, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)

    user = relationship('User', back_populates='posts')
    comments = relationship('Comment', back_populates='post')

# 定义 Comments 表
class Comment(Base):
    __tablename__ = 'comments'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    content = Column(Text, nullable=False)
    post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)

    post = relationship('Post', back_populates='comments')
    user = relationship('User', back_populates='comments')

# 创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 生成随机字符串
def generate_random_string(length=10):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))

# 插入 100 组模拟数据
def insert_data():
    # 插入 100 个用户
    users = [User(username=generate_random_string(), email=f"{generate_random_string()}@example.com") for _ in range(100)]
    session.add_all(users)
    session.commit()

    # 插入 100 个帖子
    posts = [Post(title=generate_random_string(20), content=generate_random_string(100), user_id=random.randint(1, 100)) for _ in range(100)]
    session.add_all(posts)
    session.commit()

    # 插入 100 个评论
    comments = [Comment(content=generate_random_string(50), post_id=random.randint(1, 100), user_id=random.randint(1, 100)) for _ in range(100)]
    session.add_all(comments)
    session.commit()

if __name__ == "__main__":
    # 插入数据
    insert_data()
    print("Data insertion completed.")

5. 事物回滚

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

# 创建引擎实例
engine = create_engine('sqlite:///example.db')

# 创建Session类
Session = sessionmaker(bind=engine)

# 开始会话
session = Session()

try:
    # 手动开始事务
    session.begin()
    
    # 在这里执行你的数据库操作
    user = User(name='John Doe', age=30)
    session.add(user)
    # 更多操作...

    # 提交事务
    session.commit()
except:
    # 发生异常时回滚事务
    session.rollback()
    raise
finally:
    # 确保会话关闭
    session.close()

6. 命令行操作

6.1. 依据 json 自动创建

pip install mysql-connector-python

data = {
    "name": "张三",
    "age": 30,
    "is_student": False,
    "courses": "['数学', '物理']",
    "address": "{'street': '和平路1号', 'city': '北京'}",
    "data": "{'name': '张三', 'age': 30, 'is_student': false, 'courses': ['数学', '物理'], 'address': {'street': '和平路1号', 'city': '北京'}}"
}

import json
from mysql import connector
from mysql.connector import Error


def connect_to_database(host, user, password, database, port):
    try:
        connection = connector.connect(host=host,
                                       user=user,
                                       password=password,
                                       database=database,
                                       port=port)
        if connection.is_connected():
            print('Connected to MySQL database')
            return connection
    except Error as e:
        print(f"Error: {e}")
        return None


def create_table(connection, table_name, columns):
    cursor = connection.cursor()
    columns_def = ", ".join([f"{col} VARCHAR(255)" for col in columns])
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INT AUTO_INCREMENT PRIMARY KEY,
        {columns_def}
    );
    """
    cursor.execute(create_table_query)
    connection.commit()


def truncate_table(connection, table_name):
    cursor = connection.cursor()
    truncate_query = f"TRUNCATE TABLE {table_name};"
    cursor.execute(truncate_query)
    connection.commit()


def insert_data(connection, table_name, data):
    cursor = connection.cursor()
    placeholders = ', '.join(['%s'] * len(data))
    columns = ', '.join(data.keys())
    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    cursor.execute(sql, tuple(data.values()))
    connection.commit()


def process_json(json_data, table_name, connection, truncate=False):
    # 如果是单个JSON对象
    if isinstance(json_data, dict):
        if truncate:
            truncate_table(connection, table_name)
        else:
            create_table(connection, table_name, json_data.keys())
        insert_data(connection, table_name, json_data)
    # 如果是JSON Lines
    elif isinstance(json_data, list):
        if len(json_data) > 0:
            if truncate:
                truncate_table(connection, table_name)
            else:
                create_table(connection, table_name, json_data[0].keys())
            for item in json_data:
                insert_data(connection, table_name, item)


def search_data(connection, table_name, column_name=None, search_string=None):
    cursor = connection.cursor()
    if not column_name or column_name == '':
        # 对所有字段进行模糊搜索
        columns = get_columns(connection, table_name)
        conditions = " OR ".join([f"{col} LIKE %s" for col in columns])
        query = f"SELECT * FROM {table_name} WHERE {conditions}"
        params = [f"%{search_string}%" for _ in columns]
    else:
        # 对指定字段进行模糊搜索
        query = f"SELECT * FROM {table_name} WHERE {column_name} LIKE %s"
        params = [f"%{search_string}%"]

    cursor.execute(query, params)
    results = cursor.fetchall()
    return results


def get_columns(connection, table_name):
    cursor = connection.cursor()
    cursor.execute(f"SHOW COLUMNS FROM {table_name}")
    columns = [column[0] for column in cursor.fetchall()]
    return columns


def search_by_id(connection, table_name, id_value):
    cursor = connection.cursor()
    query = f"SELECT * FROM {table_name} WHERE id = %s"
    cursor.execute(query, (id_value, ))
    result = cursor.fetchone()
    if result:
        return dict(zip([col[0] for col in cursor.description], result))
    else:
        return None


# 假设这是从文件或API获取的JSON数据
json_data = [data]

# 数据库连接信息
db_config = {
    'host': '192.168.63.128',
    'user': 'root',
    'password': 'BWOYO0R7oDt40y0shH6t',
    'database': 'play',
    'port': 3306
}

# 连接到数据库
connection = connect_to_database(**db_config)

table_name = "demo"
# 处理JSON数据,可以选择是否清空表
process_json(json_data, 'demo', connection, truncate=False)

# 搜索数据
search_results = search_data(connection,
                             table_name,
                             column_name='',
                             search_string='数学')
for result in search_results:
    print(result)

# 根据ID搜索数据
id_search_result = search_by_id(connection, table_name, 1)
if id_search_result:
    print(json.dumps(id_search_result, indent=4, ensure_ascii=False))
else:
    print("No result found for the given ID.")

# 关闭数据库连接
connection.close()
6.2. 依据 json 创建含有修改时间和添加时间的

存在时间在 json 序列化的问题

data = {
    "name":
    "张三",
    "age":
    30,
    "is_student":
    False,
    "courses":
    "['数学', '物理']",
    "address":
    "{'street': '和平路1号', 'city': '北京'}",
    "data":
    "{'name': '张三', 'age': 30, 'is_student': false, 'courses': ['数学', '物理'], 'address': {'street': '和平路1号', 'city': '北京'}}"
}

import json
from mysql import connector
from mysql.connector import Error
from datetime import datetime


def connect_to_database(host, user, password, database, port):
    try:
        connection = connector.connect(host=host,
                                       user=user,
                                       password=password,
                                       database=database,
                                       port=port)
        if connection.is_connected():
            print('Connected to MySQL database')
            return connection
    except Error as e:
        print(f"Error: {e}")
        return None


def create_table(connection, table_name, columns):
    cursor = connection.cursor()
    columns_def = ", ".join([f"{col} VARCHAR(255)" for col in columns])
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INT AUTO_INCREMENT PRIMARY KEY,
        {columns_def},
    """
    create_table_query += "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
    create_table_query += "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
    create_table_query += ") CHARACTER SET = utf8 COLLATE = utf8_general_ci;"
    cursor.execute(create_table_query)
    connection.commit()


def truncate_table(connection, table_name):
    cursor = connection.cursor()
    truncate_query = f"TRUNCATE TABLE {table_name};"
    cursor.execute(truncate_query)
    connection.commit()


def insert_data(connection, table_name, data):
    cursor = connection.cursor()
    placeholders = ', '.join(['%s'] * len(data))
    columns = ', '.join(data.keys())
    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    cursor.execute(sql, tuple(data.values()))
    connection.commit()


def process_json(json_data, table_name, connection, truncate=False):
    # 如果是单个JSON对象
    if isinstance(json_data, dict):
        if truncate:
            truncate_table(connection, table_name)
        else:
            create_table(connection, table_name, json_data.keys())
        insert_data(connection, table_name, json_data)
    # 如果是JSON Lines
    elif isinstance(json_data, list):
        if len(json_data) > 0:
            if truncate:
                truncate_table(connection, table_name)
            else:
                create_table(connection, table_name, json_data[0].keys())
            for item in json_data:
                insert_data(connection, table_name, item)


def search_data(connection, table_name, column_name=None, search_string=None):
    cursor = connection.cursor()
    if not column_name or column_name == '':
        # 对所有字段进行模糊搜索
        columns = get_columns(connection, table_name)
        conditions = " OR ".join([f"{col} LIKE %s" for col in columns])
        query = f"SELECT * FROM {table_name} WHERE {conditions}"
        params = [f"%{search_string}%" for _ in columns]
    else:
        # 对指定字段进行模糊搜索
        query = f"SELECT * FROM {table_name} WHERE {column_name} LIKE %s"
        params = [f"%{search_string}%"]

    cursor.execute(query, params)
    results = cursor.fetchall()
    return results


def get_columns(connection, table_name):
    cursor = connection.cursor()
    cursor.execute(f"SHOW COLUMNS FROM {table_name}")
    columns = [column[0] for column in cursor.fetchall()]
    return columns


def search_by_id(connection, table_name, id_value):
    cursor = connection.cursor()
    query = f"SELECT * FROM {table_name} WHERE id = %s"
    cursor.execute(query, (id_value, ))
    result = cursor.fetchone()
    if result:
        result_dict = dict(zip([col[0] for col in cursor.description], result))
        return {
            k:
            v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v
            for k, v in result_dict.items()
        }

    else:
        return None


# 假设这是从文件或API获取的JSON数据
json_data = [data]

# 数据库连接信息
db_config = {
    'host': '192.168.0.91',
    'user': 'root',
    'password': 'BWOYO0R7oDt40y0shH6t',
    'database': 'BuckyDrop',
    'port': 13306
}

# 连接到数据库
connection = connect_to_database(**db_config)

table_name = "demo"
# 处理JSON数据,可以选择是否清空表
# process_json(json_data, 'demo', connection, truncate=False)

# 搜索数据
search_results = search_data(connection,
                             table_name,
                             column_name='',
                             search_string='数学')
for result in search_results:
    print(result)

# 根据ID搜索数据
id_search_result = search_by_id(connection, table_name, 1)
print(id_search_result)
if id_search_result:
    print(json.dumps(id_search_result, indent=4, ensure_ascii=False))
else:
    print("No result found for the given ID.")

# 关闭数据库连接
connection.close()

7. 事物回滚

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

# 创建引擎实例
engine = create_engine('sqlite:///example.db')

# 创建Session类
Session = sessionmaker(bind=engine)

# 开始会话
session = Session()

try:
    # 手动开始事务
    session.begin()
    
    # 在这里执行你的数据库操作
    user = User(name='John Doe', age=30)
    session.add(user)
    # 更多操作...

    # 提交事务
    session.commit()
except:
    # 发生异常时回滚事务
    session.rollback()
    raise
finally:
    # 确保会话关闭
    session.close()

8. Mysql 常用 sql 语句

8.1. 原生语句
-- 创建示例表
CREATE TABLE employees (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    age INT,
    address VARCHAR(255),
    salary DECIMAL(10, 2)
);

-- 插入数据
INSERT INTO employees (id, name, age, address, salary)
VALUES (1, '张三', 28, '北京', 6000);

-- 查询数据
SELECT * FROM employees;

-- 更新数据
UPDATE employees
SET salary = 7000
WHERE id = 1;

-- 再次查询数据以查看更新结果
SELECT * FROM employees;

-- 删除数据
DELETE FROM employees WHERE id = 1;

-- 最后查询数据以确认删除结果
SELECT * FROM employees;
8.2. python 语句
import mysql.connector
from mysql.connector import Error

def create_connection(host_name, user_name, user_password, db_name):
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            database=db_name
        )
        print("Connection to MySQL DB successful")
    except Error as e:
        print(f"The error '{e}' occurred")
    return connection

def execute_query(connection, query, params=None):
    cursor = connection.cursor()
    try:
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        connection.commit()
        print("Query executed successfully")
    except Error as e:
        print(f"The error '{e}' occurred")

def execute_read_query(connection, query, params=None):
    cursor = connection.cursor()
    result = None
    try:
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        result = cursor.fetchall()
        return result
    except Error as e:
        print(f"The error '{e}' occurred")

# 数据库连接信息
host = "localhost"
user = "your_username"
password = "your_password"
database = "your_database"

# 创建数据库连接
connection = create_connection(host, user, password, database)

# 创建表
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    age INT,
    address VARCHAR(255),
    salary DECIMAL(10, 2)
);
"""
execute_query(connection, create_table_query)

# 插入数据
insert_query = """
INSERT INTO employees (id, name, age, address, salary)
VALUES (%s, %s, %s, %s, %s);
"""
insert_params = (1, '张三', 28, '北京', 6000)
execute_query(connection, insert_query, insert_params)

# 查询数据
select_query = "SELECT * FROM employees;"
results = execute_read_query(connection, select_query)
for row in results:
    print(row)

table_name = "employees"
# 更新数据
update_query = f"""
UPDATE {table_name}
SET salary = %s
WHERE id = %s;
"""
update_params = ( 7000, 1)
execute_query(connection, update_query, update_params)

# 再次查询数据以查看更新结果
results = execute_read_query(connection, select_query)
for row in results:
    print(row)

# 删除数据
delete_query = f"""DELETE FROM {table_name} WHERE id = %s;"""
delete_params = (1)
execute_query(connection, delete_query, delete_params)

# 最后查询数据以确认删除结果
results = execute_read_query(connection, select_query)
for row in results:
    print(row)

# 关闭连接
if connection.is_connected():
    connection.close()
    print("MySQL connection is closed")

9. 存 JSON

MySQL从5.7版本开始支持原生的JSON数据类型,这使得存储、查询JSON数据变得更加方便。以下是使用Python和MySQL进行JSON数据的增删改查的示例代码。这里假设你已经有一个MySQL数据库,并且其中有一个表用于存储JSON数据。

首先,确保你的MySQL用户有权限操作数据库,并且已经安装了mysql-connector-python库,如果没有安装,可以通过pip install mysql-connector-python来安装。

9.1. 连接数据库
import mysql.connector
from mysql.connector import Error

def create_connection():
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='your_database',
            user='your_username',
            password='your_password'
        )
        print("Connection to MySQL DB successful")
        return connection
    except Error as e:
        print(f"The error '{e}' occurred")

# 创建连接
conn = create_connection()
cursor = conn.cursor()
9.2. 插入JSON数据
def insert_json_data(table_name, json_data):
    query = f"INSERT INTO {table_name} (json_column) VALUES (%s)"
    cursor.execute(query, (json_data,))
    conn.commit()
    print("JSON data inserted successfully")

# 插入JSON数据
json_data = '{"name": "John", "age": 30}'
insert_json_data('your_table', json_data)
9.3. 查询JSON数据
def query_json_data(table_name, column, condition):
    query = f"SELECT * FROM {table_name} WHERE JSON_EXTRACT(json_column, '{column}') = %s"
    cursor.execute(query, (condition,))
    result = cursor.fetchall()
    return result

# 查询JSON数据
results = query_json_data('your_table', '$.name', 'John')
for row in results:
    print(row)
9.4. 更新JSON数据
def update_json_data(table_name, set_clause, condition):
    query = f"UPDATE {table_name} SET json_column = JSON_SET(json_column, '{set_clause}') WHERE JSON_EXTRACT(json_column, '{condition}') = %s"
    cursor.execute(query, ('value_to_match',))
    conn.commit()
    print("JSON data updated successfully")

# 更新JSON数据
update_json_data('your_table', '$.age', '$.name')
9.5. 删除JSON数据
def delete_json_data(table_name, condition):
    query = f"DELETE FROM {table_name} WHERE JSON_EXTRACT(json_column, '{condition}') = %s"
    cursor.execute(query, ('value_to_match',))
    conn.commit()
    print("JSON data deleted successfully")

# 删除JSON数据
delete_json_data('your_table', '$.name')
9.6. 模糊查询JSON数据

对于模糊查询,可以使用LIKE运算符结合JSON_EXTRACT函数来实现。例如,如果你想查询JSON字段中某个属性包含特定关键词的记录,可以这样做:

import mysql.connector

# 连接数据库
conn = mysql.connector.connect(
    host='localhost',
    database='your_database',
    user='your_username',
    password='your_password'
)
cursor = conn.cursor()

# 模糊查询JSON字段中的name属性包含'John'的记录
query = """
SELECT * FROM your_table
WHERE JSON_EXTRACT(json_column, '$.name') LIKE '%John%'
"""
cursor.execute(query)
results = cursor.fetchall()

for result in results:
    print(result)

# 关闭连接
cursor.close()
conn.close()
9.7. 多条件查询JSON数据

对于多条件查询,可以使用JSON_CONTAINS函数或者结合多个JSON_EXTRACT函数的条件。例如,如果你想查询JSON字段中同时包含特定爱好的用户,可以这样做:

import mysql.connector

# 连接数据库
conn = mysql.connector.connect(
    host='localhost',
    database='your_database',
    user='your_username',
    password='your_password'
)
cursor = conn.cursor()

# 多条件查询,查找同时拥有爱好'sport'和'music'的用户
query = """
SELECT name
FROM users
WHERE JSON_CONTAINS(json_column, '"sport"')
AND JSON_CONTAINS(json_column, '"music"')
"""
cursor.execute(query)
results = cursor.fetchall()

for result in results:
    print(result)

# 关闭连接
cursor.close()
conn.close()
9.8. 含参查询

在这些示例中,your_tableusers是你的表名,json_column是存储JSON数据的列名。你需要根据你的实际数据库结构来替换这些名称。这些查询利用了MySQL的JSON函数来实现对JSON数据的模糊查询和多条件查询。

如果John需要以变量的形式传入查询,你可以在Python中定义一个变量,然后将该变量插入到SQL查询中。以下是如何实现这一点的示例代码:

import mysql.connector

# 定义变量
name_to_search = "John"

# 连接数据库
conn = mysql.connector.connect(
    host='localhost',
    database='your_database',
    user='your_username',
    password='your_password'
)
cursor = conn.cursor()

# 模糊查询JSON字段中的name属性包含变量值的记录
query = """
SELECT * FROM your_table
WHERE JSON_EXTRACT(json_column, '$.name') LIKE %s
"""
cursor.execute(query, ('%' + name_to_search + '%',))  # 使用参数化查询防止SQL注入

results = cursor.fetchall()

for result in results:
    print(result)

# 关闭连接
cursor.close()
conn.close()

9.9. 含参查询

在这个例子中,name_to_search变量包含了要搜索的值,并且在执行查询时,我们使用参数化查询(cursor.execute(query, ('%' + name_to_search + '%',)))来防止SQL注入攻击。这是处理外部输入数据时的一个安全实践。

同样地,对于多条件查询,如果条件也是以变量形式传入,可以这样做:

import mysql.connector

# 定义变量
hobby1 = "sport"
hobby2 = "music"

# 连接数据库
conn = mysql.connector.connect(
    host='localhost',
    database='your_database',
    user='your_username',
    password='your_password'
)
cursor = conn.cursor()

# 多条件查询,查找同时拥有爱好变量值的用户
query = """
SELECT name
FROM users
WHERE JSON_CONTAINS(json_column, %s)
AND JSON_CONTAINS(json_column, %s)
"""
cursor.execute(query, (hobby1, hobby2))  # 使用参数化查询

results = cursor.fetchall()

for result in results:
    print(result)

# 关闭连接
cursor.close()
conn.close()

在这个例子中,hobby1hobby2变量包含了要搜索的爱好,我们在执行查询时将这些变量作为参数传入,这样可以动态地根据需要搜索不同的条件。同样,使用参数化查询是一种安全的做法,可以防止SQL注入攻击。

10. 常用命令

-- 增 (Create): 插入新记录到表中
INSERT INTO table_name (column1, column2, column3)
VALUES (value1, value2, value3);

-- 删 (Delete): 从表中删除记录
DELETE FROM table_name WHERE condition;

-- 改 (Update): 更新表中已有的记录
UPDATE table_name
SET column1 = value1, column2 = value2
WHERE condition;

-- 查 (Read/Select): 查询表中的数据
SELECT column1, column2
FROM table_name
WHERE condition;

-- 添加列 (Add Column): 向现有表中添加一个新列
ALTER TABLE table_name
ADD column4 datatype;

-- 删除列 (Drop Column): 从表中删除列
ALTER TABLE table_name
DROP COLUMN column4;

-- 修改列属性 (Modify Column): 改变现有列的数据类型
ALTER TABLE table_name
MODIFY COLUMN column1 new_datatype;

-- 修改列名称 (Change Column): 重命名列名并可同时改变数据类型
ALTER TABLE table_name
CHANGE old_column_name new_column_name new_datatype;

-- 修改表名 (Rename Table): 更改表的名字
RENAME TABLE old_table_name TO new_table_name;


CREATE TABLE example_table (
    id INT AUTO_INCREMENT,          -- 自动递增的整数ID
    name VARCHAR(100) NOT NULL,     -- 非空的可变长度字符串,最大长度为100
    description TEXT,               -- 较长的文本字段
    age TINYINT UNSIGNED,           -- 无符号的小整数,适用于年龄
    salary DECIMAL(10,2),           -- 十进制数字,总共10位,其中2位小数
    birth_date DATE,                -- 日期格式,只包含年月日
    hire_datetime DATETIME,         -- 日期时间格式,包含年月日和时分秒
    is_active BOOLEAN DEFAULT TRUE, -- 布尔值,默认为TRUE
    email VARCHAR(255) UNIQUE,      -- 独一无二的电子邮件地址
    image BLOB,                     -- 用于存储二进制数据,比如图片
    PRIMARY KEY (id)                -- 定义主键
);

在这个例子中,example_table 表包含了不同种类的数据类型,以适应不同的数据存储需求:

  • INT:用于存储整数值。

  • VARCHAR:用于存储可变长度的字符串。

  • TEXT:用于存储大段文本。

  • TINYINT UNSIGNED:用于存储较小的非负整数。

  • DECIMAL:用于存储精确的小数。

  • DATEDATETIME:分别用于存储日期和日期时间信息。

  • BOOLEAN:用于存储布尔值。

  • BLOB:用于存储二进制大对象。

此外,还设置了几个约束条件:

  • NOT NULL:确保特定列不允许有NULL值。

  • AUTO_INCREMENT:自动递增列,通常与主键一起使用。

  • UNIQUE:确保某一列中的所有值都是唯一的。

  • DEFAULT:设置默认值,当没有提供显式值时使用。

  • PRIMARY KEY:定义表的主键,用于唯一标识表中的每一行记录。

11. 数据迁移

11.1. 表迁移
import mysql.connector
from mysql.connector import errorcode


def copy_tables(source_db, target_db, tables, user, password, host='localhost', port="13306"):
    try:
        # 连接到MySQL服务器
        connection = mysql.connector.connect(
            user=user,
            password=password,
            host=host,
            port=port
        )

        cursor = connection.cursor()

        # 创建目标数据库(如果不存在)
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {target_db};")
        cursor.execute(f"USE {target_db};")

        # 切换到源数据库以获取表信息
        cursor.execute(f"USE {source_db};")

        for table in tables:
            print(f"Processing table: {table}")

            # 检查并删除目标数据库中的现有表
            cursor.execute(f"DROP TABLE IF EXISTS {target_db}.{table};")

            # 复制表结构
            cursor.execute(f"CREATE TABLE {target_db}.{table} LIKE {source_db}.{table};")
            print(f"Table structure copied: {table}")

            # 复制数据
            cursor.execute(f"INSERT INTO {target_db}.{table} SELECT * FROM {source_db}.{table};")
            print(f"Data copied into table: {table}")

        connection.commit()
        print("Tables copied successfully.")

    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()


if __name__ == '__main__':
    source_database = 'starit'
    target_database = 'starit_dev'
    db_user = 'root'
    db_password = 'BWOYO0R7oDt40y0shH6t'
    host = "192.168.0.91"
    port = "13306"

    # 表名列表,这里列出你想复制的表
    tables_to_copy = ['super_buy_qa', 'super_buy_bot_setting']

    copy_tables(source_database, target_database, tables_to_copy, db_user, db_password, host, port)
11.2. 数据库迁移
import mysql.connector
from mysql.connector import errorcode


def copy_database_directly(source_db, target_db, user, password, host='localhost', port="13306"):
    try:
        # 连接到MySQL服务器
        connection = mysql.connector.connect(
            user=user,
            password=password,
            host=host,
            port=port
        )

        cursor = connection.cursor()

        # 创建目标数据库
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {target_db};")
        cursor.execute(f"USE {target_db};")

        # 切换到源数据库以获取表信息
        cursor.execute(f"USE {source_db};")
        cursor.execute("SHOW TABLES;")
        tables = cursor.fetchall()

        for (table,) in tables:
            # 复制表结构
            cursor.execute(f"CREATE TABLE {target_db}.{table} LIKE {source_db}.{table};")
            # 复制数据
            cursor.execute(f"INSERT INTO {target_db}.{table} SELECT * FROM {source_db}.{table};")

        connection.commit()
        print("Database copied successfully.")

    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()


if __name__ == '__main__':
    source_database = 'starit'
    target_database = 'starit_dev'
    db_user = 'root'
    db_password = 'BWOYO0R7oDt40y0shH6t'
    host = "192.168.0.91"
    port = "13306"

    copy_database_directly(source_database, target_database, db_user, db_password, host, port)

动物装饰