1. 安装
1.1. 安装扩展包-Python
pip install sqlalchemy pymysql mysql-connector-python2. 安装-Docker
docker run --name simon-mysql -itd --restart=always \
-p 53306:3306 \
-v $PWD/mysql/data:/var/lib/mysql \
-v $PWD/mysql/conf:/etc/mysql/conf.d \
-e MYSQL_ROOT_PASSWORD="BWOYO0R7oDt40y0shH6t" \
-e TZ=Asia/Shanghai \
-d mysql:5.7
docker run --name simon-mysql910 -itd --restart=always \
-p 53306:3306 \
-v $PWD/mysql/data:/var/lib/mysql \
-v $PWD/mysql/conf:/etc/mysql/conf.d \
-e MYSQL_ROOT_PASSWORD="BWOYO0R7oDt40y0shH6t" \
-e TZ=Asia/Shanghai \
-d mysql:9.1.03. 数据库
3.1. 创建数据库
from sqlalchemy import create_engine, text
def create_database(
database_name,
user="root",
password="",
host="127.0.0.1",
port=3306,
charset="utf8mb4",
):
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")
# 使用引擎连接到 MySQL 服务器
with engine.connect() as connection:
# 构建创建数据库的 SQL 语句,并使用 text 函数包装
query = text(
f"CREATE DATABASE IF NOT EXISTS {database_name} CHARACTER SET {charset} COLLATE {charset}_unicode_ci;"
)
# 执行 SQL 语句
connection.execute(query)
# 指定要创建的数据库名
db_name = "testdb"
# 调用函数创建数据库
create_database(
database_name=db_name,
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087,
)
print(f"Database '{db_name}' created or already exists.")3.2. 查询所有数据库
from sqlalchemy import create_engine, text
def list_databases(user="root", password="", host="127.0.0.1", port=3306):
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")
# 使用引擎连接到 MySQL 服务器
with engine.connect() as connection:
# 构建查询数据库的 SQL 语句,并使用 text 函数包装
query = text("SHOW DATABASES;")
# 执行 SQL 语句
result = connection.execute(query)
# 获取所有数据库名称
databases = [row[0] for row in result]
return databases
if __name__ == "__main__":
# 调用函数查询所有数据库
databases = list_databases(
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087,
)
# 打印所有数据库名称
print("Databases:")
for db in databases:
print(db)3.3. 清空指定数据库
from sqlalchemy import create_engine, text
def clear_database(
database_name, user="root", password="", host="127.0.0.1", port=3306
):
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine(
f"mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}"
)
# 使用引擎连接到指定的数据库
with engine.connect() as connection:
# 获取数据库中的所有表名
query = text("SHOW TABLES;")
result = connection.execute(query)
tables = [row[0] for row in result]
# 逐个删除表
for table in tables:
drop_table_query = text(f"DROP TABLE IF EXISTS {table};")
connection.execute(drop_table_query)
if __name__ == "__main__":
# 指定要清空的数据库名
db_name = "testdb"
# 调用函数清空数据库
clear_database(
database_name=db_name,
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087,
)
print(f"Database '{db_name}' has been cleared.")3.4. 删除指定数据库
from sqlalchemy import create_engine, text
def delete_database(
database_name, user="root", password="", host="127.0.0.1", port=3306
):
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/")
# 使用引擎连接到 MySQL 服务器
with engine.connect() as connection:
# 构建删除数据库的 SQL 语句,并使用 text 函数包装
query = text(f"DROP DATABASE IF EXISTS {database_name};")
# 执行 SQL 语句
connection.execute(query)
if __name__ == "__main__":
# 指定要删除的数据库名
db_name = "testdb"
# 调用函数删除数据库
delete_database(
database_name=db_name,
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087,
)
print(f"Database '{db_name}' has been deleted.")3.5. 查询特定数据库有哪些表
from sqlalchemy import create_engine, text
def list_tables_in_database(
database_name, user="root", password="", host="127.0.0.1", port=3306
):
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine(
f"mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}"
)
# 使用引擎连接到指定的数据库
with engine.connect() as connection:
# 构建查询表的 SQL 语句,并使用 text 函数包装
query = text("SHOW TABLES;")
# 执行 SQL 语句
result = connection.execute(query)
# 获取所有表名
tables = [row[0] for row in result]
return tables
if __name__ == "__main__":
# 指定要查询的数据库名
db_name = "bdchat"
# 调用函数查询所有表
tables = list_tables_in_database(
database_name=db_name,
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087,
)
# 打印所有表名
print("Tables in database '{}':".format(db_name))
for table in tables:
print(table)4. 表
4.1. 创建 三张 表
Users 表:
id(主键,自增)username(用户名)email(电子邮件)
Posts 表:
id(主键,自增)title(标题)content(内容)user_id(外键,关联 Users 表)
Comments 表:
id(主键,自增)content(评论内容)post_id(外键,关联 Posts 表)user_id(外键,关联 Users 表)
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import random
import string
database_name="bdchat",
user="root",
password="BWOYO0R7oDt40y0shH6t",
host="192.168.0.91",
port=10087
# 创建一个引擎,这里使用的是 pymysql 作为驱动
engine = create_engine('mysql+pymysql://root:BWOYO0R7oDt40y0shH6t@192.168.0.91:10087/bdchat', echo=True)
# 创建基类
Base = declarative_base()
# 定义 Users 表
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), nullable=False)
email = Column(String(100), nullable=False)
posts = relationship('Post', back_populates='user')
comments = relationship('Comment', back_populates='user')
# 定义 Posts 表
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(100), nullable=False)
content = Column(Text, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
user = relationship('User', back_populates='posts')
comments = relationship('Comment', back_populates='post')
# 定义 Comments 表
class Comment(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True, autoincrement=True)
content = Column(Text, nullable=False)
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
post = relationship('Post', back_populates='comments')
user = relationship('User', back_populates='comments')
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 生成随机字符串
def generate_random_string(length=10):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for _ in range(length))
# 插入 100 组模拟数据
def insert_data():
# 插入 100 个用户
users = [User(username=generate_random_string(), email=f"{generate_random_string()}@example.com") for _ in range(100)]
session.add_all(users)
session.commit()
# 插入 100 个帖子
posts = [Post(title=generate_random_string(20), content=generate_random_string(100), user_id=random.randint(1, 100)) for _ in range(100)]
session.add_all(posts)
session.commit()
# 插入 100 个评论
comments = [Comment(content=generate_random_string(50), post_id=random.randint(1, 100), user_id=random.randint(1, 100)) for _ in range(100)]
session.add_all(comments)
session.commit()
if __name__ == "__main__":
# 插入数据
insert_data()
print("Data insertion completed.")5. 事物回滚
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
# 创建引擎实例
engine = create_engine('sqlite:///example.db')
# 创建Session类
Session = sessionmaker(bind=engine)
# 开始会话
session = Session()
try:
# 手动开始事务
session.begin()
# 在这里执行你的数据库操作
user = User(name='John Doe', age=30)
session.add(user)
# 更多操作...
# 提交事务
session.commit()
except:
# 发生异常时回滚事务
session.rollback()
raise
finally:
# 确保会话关闭
session.close()6. 命令行操作
6.1. 依据 json 自动创建
pip install mysql-connector-python
data = {
"name": "张三",
"age": 30,
"is_student": False,
"courses": "['数学', '物理']",
"address": "{'street': '和平路1号', 'city': '北京'}",
"data": "{'name': '张三', 'age': 30, 'is_student': false, 'courses': ['数学', '物理'], 'address': {'street': '和平路1号', 'city': '北京'}}"
}
import json
from mysql import connector
from mysql.connector import Error
def connect_to_database(host, user, password, database, port):
try:
connection = connector.connect(host=host,
user=user,
password=password,
database=database,
port=port)
if connection.is_connected():
print('Connected to MySQL database')
return connection
except Error as e:
print(f"Error: {e}")
return None
def create_table(connection, table_name, columns):
cursor = connection.cursor()
columns_def = ", ".join([f"{col} VARCHAR(255)" for col in columns])
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
{columns_def}
);
"""
cursor.execute(create_table_query)
connection.commit()
def truncate_table(connection, table_name):
cursor = connection.cursor()
truncate_query = f"TRUNCATE TABLE {table_name};"
cursor.execute(truncate_query)
connection.commit()
def insert_data(connection, table_name, data):
cursor = connection.cursor()
placeholders = ', '.join(['%s'] * len(data))
columns = ', '.join(data.keys())
sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
cursor.execute(sql, tuple(data.values()))
connection.commit()
def process_json(json_data, table_name, connection, truncate=False):
# 如果是单个JSON对象
if isinstance(json_data, dict):
if truncate:
truncate_table(connection, table_name)
else:
create_table(connection, table_name, json_data.keys())
insert_data(connection, table_name, json_data)
# 如果是JSON Lines
elif isinstance(json_data, list):
if len(json_data) > 0:
if truncate:
truncate_table(connection, table_name)
else:
create_table(connection, table_name, json_data[0].keys())
for item in json_data:
insert_data(connection, table_name, item)
def search_data(connection, table_name, column_name=None, search_string=None):
cursor = connection.cursor()
if not column_name or column_name == '':
# 对所有字段进行模糊搜索
columns = get_columns(connection, table_name)
conditions = " OR ".join([f"{col} LIKE %s" for col in columns])
query = f"SELECT * FROM {table_name} WHERE {conditions}"
params = [f"%{search_string}%" for _ in columns]
else:
# 对指定字段进行模糊搜索
query = f"SELECT * FROM {table_name} WHERE {column_name} LIKE %s"
params = [f"%{search_string}%"]
cursor.execute(query, params)
results = cursor.fetchall()
return results
def get_columns(connection, table_name):
cursor = connection.cursor()
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
columns = [column[0] for column in cursor.fetchall()]
return columns
def search_by_id(connection, table_name, id_value):
cursor = connection.cursor()
query = f"SELECT * FROM {table_name} WHERE id = %s"
cursor.execute(query, (id_value, ))
result = cursor.fetchone()
if result:
return dict(zip([col[0] for col in cursor.description], result))
else:
return None
# 假设这是从文件或API获取的JSON数据
json_data = [data]
# 数据库连接信息
db_config = {
'host': '192.168.63.128',
'user': 'root',
'password': 'BWOYO0R7oDt40y0shH6t',
'database': 'play',
'port': 3306
}
# 连接到数据库
connection = connect_to_database(**db_config)
table_name = "demo"
# 处理JSON数据,可以选择是否清空表
process_json(json_data, 'demo', connection, truncate=False)
# 搜索数据
search_results = search_data(connection,
table_name,
column_name='',
search_string='数学')
for result in search_results:
print(result)
# 根据ID搜索数据
id_search_result = search_by_id(connection, table_name, 1)
if id_search_result:
print(json.dumps(id_search_result, indent=4, ensure_ascii=False))
else:
print("No result found for the given ID.")
# 关闭数据库连接
connection.close()6.2. 依据 json 创建含有修改时间和添加时间的
存在时间在 json 序列化的问题
data = {
"name":
"张三",
"age":
30,
"is_student":
False,
"courses":
"['数学', '物理']",
"address":
"{'street': '和平路1号', 'city': '北京'}",
"data":
"{'name': '张三', 'age': 30, 'is_student': false, 'courses': ['数学', '物理'], 'address': {'street': '和平路1号', 'city': '北京'}}"
}
import json
from mysql import connector
from mysql.connector import Error
from datetime import datetime
def connect_to_database(host, user, password, database, port):
try:
connection = connector.connect(host=host,
user=user,
password=password,
database=database,
port=port)
if connection.is_connected():
print('Connected to MySQL database')
return connection
except Error as e:
print(f"Error: {e}")
return None
def create_table(connection, table_name, columns):
cursor = connection.cursor()
columns_def = ", ".join([f"{col} VARCHAR(255)" for col in columns])
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
{columns_def},
"""
create_table_query += "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
create_table_query += "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
create_table_query += ") CHARACTER SET = utf8 COLLATE = utf8_general_ci;"
cursor.execute(create_table_query)
connection.commit()
def truncate_table(connection, table_name):
cursor = connection.cursor()
truncate_query = f"TRUNCATE TABLE {table_name};"
cursor.execute(truncate_query)
connection.commit()
def insert_data(connection, table_name, data):
cursor = connection.cursor()
placeholders = ', '.join(['%s'] * len(data))
columns = ', '.join(data.keys())
sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
cursor.execute(sql, tuple(data.values()))
connection.commit()
def process_json(json_data, table_name, connection, truncate=False):
# 如果是单个JSON对象
if isinstance(json_data, dict):
if truncate:
truncate_table(connection, table_name)
else:
create_table(connection, table_name, json_data.keys())
insert_data(connection, table_name, json_data)
# 如果是JSON Lines
elif isinstance(json_data, list):
if len(json_data) > 0:
if truncate:
truncate_table(connection, table_name)
else:
create_table(connection, table_name, json_data[0].keys())
for item in json_data:
insert_data(connection, table_name, item)
def search_data(connection, table_name, column_name=None, search_string=None):
cursor = connection.cursor()
if not column_name or column_name == '':
# 对所有字段进行模糊搜索
columns = get_columns(connection, table_name)
conditions = " OR ".join([f"{col} LIKE %s" for col in columns])
query = f"SELECT * FROM {table_name} WHERE {conditions}"
params = [f"%{search_string}%" for _ in columns]
else:
# 对指定字段进行模糊搜索
query = f"SELECT * FROM {table_name} WHERE {column_name} LIKE %s"
params = [f"%{search_string}%"]
cursor.execute(query, params)
results = cursor.fetchall()
return results
def get_columns(connection, table_name):
cursor = connection.cursor()
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
columns = [column[0] for column in cursor.fetchall()]
return columns
def search_by_id(connection, table_name, id_value):
cursor = connection.cursor()
query = f"SELECT * FROM {table_name} WHERE id = %s"
cursor.execute(query, (id_value, ))
result = cursor.fetchone()
if result:
result_dict = dict(zip([col[0] for col in cursor.description], result))
return {
k:
v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v
for k, v in result_dict.items()
}
else:
return None
# 假设这是从文件或API获取的JSON数据
json_data = [data]
# 数据库连接信息
db_config = {
'host': '192.168.0.91',
'user': 'root',
'password': 'BWOYO0R7oDt40y0shH6t',
'database': 'BuckyDrop',
'port': 13306
}
# 连接到数据库
connection = connect_to_database(**db_config)
table_name = "demo"
# 处理JSON数据,可以选择是否清空表
# process_json(json_data, 'demo', connection, truncate=False)
# 搜索数据
search_results = search_data(connection,
table_name,
column_name='',
search_string='数学')
for result in search_results:
print(result)
# 根据ID搜索数据
id_search_result = search_by_id(connection, table_name, 1)
print(id_search_result)
if id_search_result:
print(json.dumps(id_search_result, indent=4, ensure_ascii=False))
else:
print("No result found for the given ID.")
# 关闭数据库连接
connection.close()7. 事物回滚
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
# 创建引擎实例
engine = create_engine('sqlite:///example.db')
# 创建Session类
Session = sessionmaker(bind=engine)
# 开始会话
session = Session()
try:
# 手动开始事务
session.begin()
# 在这里执行你的数据库操作
user = User(name='John Doe', age=30)
session.add(user)
# 更多操作...
# 提交事务
session.commit()
except:
# 发生异常时回滚事务
session.rollback()
raise
finally:
# 确保会话关闭
session.close()8. Mysql 常用 sql 语句
8.1. 原生语句
-- 创建示例表
CREATE TABLE employees (
id INT PRIMARY KEY,
name VARCHAR(100),
age INT,
address VARCHAR(255),
salary DECIMAL(10, 2)
);
-- 插入数据
INSERT INTO employees (id, name, age, address, salary)
VALUES (1, '张三', 28, '北京', 6000);
-- 查询数据
SELECT * FROM employees;
-- 更新数据
UPDATE employees
SET salary = 7000
WHERE id = 1;
-- 再次查询数据以查看更新结果
SELECT * FROM employees;
-- 删除数据
DELETE FROM employees WHERE id = 1;
-- 最后查询数据以确认删除结果
SELECT * FROM employees;8.2. python 语句
import mysql.connector
from mysql.connector import Error
def create_connection(host_name, user_name, user_password, db_name):
connection = None
try:
connection = mysql.connector.connect(
host=host_name,
user=user_name,
passwd=user_password,
database=db_name
)
print("Connection to MySQL DB successful")
except Error as e:
print(f"The error '{e}' occurred")
return connection
def execute_query(connection, query, params=None):
cursor = connection.cursor()
try:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
connection.commit()
print("Query executed successfully")
except Error as e:
print(f"The error '{e}' occurred")
def execute_read_query(connection, query, params=None):
cursor = connection.cursor()
result = None
try:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
result = cursor.fetchall()
return result
except Error as e:
print(f"The error '{e}' occurred")
# 数据库连接信息
host = "localhost"
user = "your_username"
password = "your_password"
database = "your_database"
# 创建数据库连接
connection = create_connection(host, user, password, database)
# 创建表
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
id INT PRIMARY KEY,
name VARCHAR(100),
age INT,
address VARCHAR(255),
salary DECIMAL(10, 2)
);
"""
execute_query(connection, create_table_query)
# 插入数据
insert_query = """
INSERT INTO employees (id, name, age, address, salary)
VALUES (%s, %s, %s, %s, %s);
"""
insert_params = (1, '张三', 28, '北京', 6000)
execute_query(connection, insert_query, insert_params)
# 查询数据
select_query = "SELECT * FROM employees;"
results = execute_read_query(connection, select_query)
for row in results:
print(row)
table_name = "employees"
# 更新数据
update_query = f"""
UPDATE {table_name}
SET salary = %s
WHERE id = %s;
"""
update_params = ( 7000, 1)
execute_query(connection, update_query, update_params)
# 再次查询数据以查看更新结果
results = execute_read_query(connection, select_query)
for row in results:
print(row)
# 删除数据
delete_query = f"""DELETE FROM {table_name} WHERE id = %s;"""
delete_params = (1)
execute_query(connection, delete_query, delete_params)
# 最后查询数据以确认删除结果
results = execute_read_query(connection, select_query)
for row in results:
print(row)
# 关闭连接
if connection.is_connected():
connection.close()
print("MySQL connection is closed")9. 存 JSON
MySQL从5.7版本开始支持原生的JSON数据类型,这使得存储、查询JSON数据变得更加方便。以下是使用Python和MySQL进行JSON数据的增删改查的示例代码。这里假设你已经有一个MySQL数据库,并且其中有一个表用于存储JSON数据。
首先,确保你的MySQL用户有权限操作数据库,并且已经安装了mysql-connector-python库,如果没有安装,可以通过pip install mysql-connector-python来安装。
9.1. 连接数据库
import mysql.connector
from mysql.connector import Error
def create_connection():
try:
connection = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
print("Connection to MySQL DB successful")
return connection
except Error as e:
print(f"The error '{e}' occurred")
# 创建连接
conn = create_connection()
cursor = conn.cursor()9.2. 插入JSON数据
def insert_json_data(table_name, json_data):
query = f"INSERT INTO {table_name} (json_column) VALUES (%s)"
cursor.execute(query, (json_data,))
conn.commit()
print("JSON data inserted successfully")
# 插入JSON数据
json_data = '{"name": "John", "age": 30}'
insert_json_data('your_table', json_data)9.3. 查询JSON数据
def query_json_data(table_name, column, condition):
query = f"SELECT * FROM {table_name} WHERE JSON_EXTRACT(json_column, '{column}') = %s"
cursor.execute(query, (condition,))
result = cursor.fetchall()
return result
# 查询JSON数据
results = query_json_data('your_table', '$.name', 'John')
for row in results:
print(row)9.4. 更新JSON数据
def update_json_data(table_name, set_clause, condition):
query = f"UPDATE {table_name} SET json_column = JSON_SET(json_column, '{set_clause}') WHERE JSON_EXTRACT(json_column, '{condition}') = %s"
cursor.execute(query, ('value_to_match',))
conn.commit()
print("JSON data updated successfully")
# 更新JSON数据
update_json_data('your_table', '$.age', '$.name')9.5. 删除JSON数据
def delete_json_data(table_name, condition):
query = f"DELETE FROM {table_name} WHERE JSON_EXTRACT(json_column, '{condition}') = %s"
cursor.execute(query, ('value_to_match',))
conn.commit()
print("JSON data deleted successfully")
# 删除JSON数据
delete_json_data('your_table', '$.name')9.6. 模糊查询JSON数据
对于模糊查询,可以使用LIKE运算符结合JSON_EXTRACT函数来实现。例如,如果你想查询JSON字段中某个属性包含特定关键词的记录,可以这样做:
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
cursor = conn.cursor()
# 模糊查询JSON字段中的name属性包含'John'的记录
query = """
SELECT * FROM your_table
WHERE JSON_EXTRACT(json_column, '$.name') LIKE '%John%'
"""
cursor.execute(query)
results = cursor.fetchall()
for result in results:
print(result)
# 关闭连接
cursor.close()
conn.close()9.7. 多条件查询JSON数据
对于多条件查询,可以使用JSON_CONTAINS函数或者结合多个JSON_EXTRACT函数的条件。例如,如果你想查询JSON字段中同时包含特定爱好的用户,可以这样做:
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
cursor = conn.cursor()
# 多条件查询,查找同时拥有爱好'sport'和'music'的用户
query = """
SELECT name
FROM users
WHERE JSON_CONTAINS(json_column, '"sport"')
AND JSON_CONTAINS(json_column, '"music"')
"""
cursor.execute(query)
results = cursor.fetchall()
for result in results:
print(result)
# 关闭连接
cursor.close()
conn.close()9.8. 含参查询
在这些示例中,your_table和users是你的表名,json_column是存储JSON数据的列名。你需要根据你的实际数据库结构来替换这些名称。这些查询利用了MySQL的JSON函数来实现对JSON数据的模糊查询和多条件查询。
如果John需要以变量的形式传入查询,你可以在Python中定义一个变量,然后将该变量插入到SQL查询中。以下是如何实现这一点的示例代码:
import mysql.connector
# 定义变量
name_to_search = "John"
# 连接数据库
conn = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
cursor = conn.cursor()
# 模糊查询JSON字段中的name属性包含变量值的记录
query = """
SELECT * FROM your_table
WHERE JSON_EXTRACT(json_column, '$.name') LIKE %s
"""
cursor.execute(query, ('%' + name_to_search + '%',)) # 使用参数化查询防止SQL注入
results = cursor.fetchall()
for result in results:
print(result)
# 关闭连接
cursor.close()
conn.close()9.9. 含参查询
在这个例子中,name_to_search变量包含了要搜索的值,并且在执行查询时,我们使用参数化查询(cursor.execute(query, ('%' + name_to_search + '%',)))来防止SQL注入攻击。这是处理外部输入数据时的一个安全实践。
同样地,对于多条件查询,如果条件也是以变量形式传入,可以这样做:
import mysql.connector
# 定义变量
hobby1 = "sport"
hobby2 = "music"
# 连接数据库
conn = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
cursor = conn.cursor()
# 多条件查询,查找同时拥有爱好变量值的用户
query = """
SELECT name
FROM users
WHERE JSON_CONTAINS(json_column, %s)
AND JSON_CONTAINS(json_column, %s)
"""
cursor.execute(query, (hobby1, hobby2)) # 使用参数化查询
results = cursor.fetchall()
for result in results:
print(result)
# 关闭连接
cursor.close()
conn.close()在这个例子中,hobby1和hobby2变量包含了要搜索的爱好,我们在执行查询时将这些变量作为参数传入,这样可以动态地根据需要搜索不同的条件。同样,使用参数化查询是一种安全的做法,可以防止SQL注入攻击。
10. 常用命令
-- 增 (Create): 插入新记录到表中
INSERT INTO table_name (column1, column2, column3)
VALUES (value1, value2, value3);
-- 删 (Delete): 从表中删除记录
DELETE FROM table_name WHERE condition;
-- 改 (Update): 更新表中已有的记录
UPDATE table_name
SET column1 = value1, column2 = value2
WHERE condition;
-- 查 (Read/Select): 查询表中的数据
SELECT column1, column2
FROM table_name
WHERE condition;
-- 添加列 (Add Column): 向现有表中添加一个新列
ALTER TABLE table_name
ADD column4 datatype;
-- 删除列 (Drop Column): 从表中删除列
ALTER TABLE table_name
DROP COLUMN column4;
-- 修改列属性 (Modify Column): 改变现有列的数据类型
ALTER TABLE table_name
MODIFY COLUMN column1 new_datatype;
-- 修改列名称 (Change Column): 重命名列名并可同时改变数据类型
ALTER TABLE table_name
CHANGE old_column_name new_column_name new_datatype;
-- 修改表名 (Rename Table): 更改表的名字
RENAME TABLE old_table_name TO new_table_name;
CREATE TABLE example_table (
id INT AUTO_INCREMENT, -- 自动递增的整数ID
name VARCHAR(100) NOT NULL, -- 非空的可变长度字符串,最大长度为100
description TEXT, -- 较长的文本字段
age TINYINT UNSIGNED, -- 无符号的小整数,适用于年龄
salary DECIMAL(10,2), -- 十进制数字,总共10位,其中2位小数
birth_date DATE, -- 日期格式,只包含年月日
hire_datetime DATETIME, -- 日期时间格式,包含年月日和时分秒
is_active BOOLEAN DEFAULT TRUE, -- 布尔值,默认为TRUE
email VARCHAR(255) UNIQUE, -- 独一无二的电子邮件地址
image BLOB, -- 用于存储二进制数据,比如图片
PRIMARY KEY (id) -- 定义主键
);在这个例子中,example_table 表包含了不同种类的数据类型,以适应不同的数据存储需求:
INT:用于存储整数值。VARCHAR:用于存储可变长度的字符串。TEXT:用于存储大段文本。TINYINT UNSIGNED:用于存储较小的非负整数。DECIMAL:用于存储精确的小数。DATE和DATETIME:分别用于存储日期和日期时间信息。BOOLEAN:用于存储布尔值。BLOB:用于存储二进制大对象。
此外,还设置了几个约束条件:
NOT NULL:确保特定列不允许有NULL值。AUTO_INCREMENT:自动递增列,通常与主键一起使用。UNIQUE:确保某一列中的所有值都是唯一的。DEFAULT:设置默认值,当没有提供显式值时使用。PRIMARY KEY:定义表的主键,用于唯一标识表中的每一行记录。
11. 数据迁移
11.1. 表迁移
import mysql.connector
from mysql.connector import errorcode
def copy_tables(source_db, target_db, tables, user, password, host='localhost', port="13306"):
try:
# 连接到MySQL服务器
connection = mysql.connector.connect(
user=user,
password=password,
host=host,
port=port
)
cursor = connection.cursor()
# 创建目标数据库(如果不存在)
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {target_db};")
cursor.execute(f"USE {target_db};")
# 切换到源数据库以获取表信息
cursor.execute(f"USE {source_db};")
for table in tables:
print(f"Processing table: {table}")
# 检查并删除目标数据库中的现有表
cursor.execute(f"DROP TABLE IF EXISTS {target_db}.{table};")
# 复制表结构
cursor.execute(f"CREATE TABLE {target_db}.{table} LIKE {source_db}.{table};")
print(f"Table structure copied: {table}")
# 复制数据
cursor.execute(f"INSERT INTO {target_db}.{table} SELECT * FROM {source_db}.{table};")
print(f"Data copied into table: {table}")
connection.commit()
print("Tables copied successfully.")
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
finally:
if connection.is_connected():
cursor.close()
connection.close()
if __name__ == '__main__':
source_database = 'starit'
target_database = 'starit_dev'
db_user = 'root'
db_password = 'BWOYO0R7oDt40y0shH6t'
host = "192.168.0.91"
port = "13306"
# 表名列表,这里列出你想复制的表
tables_to_copy = ['super_buy_qa', 'super_buy_bot_setting']
copy_tables(source_database, target_database, tables_to_copy, db_user, db_password, host, port)
11.2. 数据库迁移
import mysql.connector
from mysql.connector import errorcode
def copy_database_directly(source_db, target_db, user, password, host='localhost', port="13306"):
try:
# 连接到MySQL服务器
connection = mysql.connector.connect(
user=user,
password=password,
host=host,
port=port
)
cursor = connection.cursor()
# 创建目标数据库
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {target_db};")
cursor.execute(f"USE {target_db};")
# 切换到源数据库以获取表信息
cursor.execute(f"USE {source_db};")
cursor.execute("SHOW TABLES;")
tables = cursor.fetchall()
for (table,) in tables:
# 复制表结构
cursor.execute(f"CREATE TABLE {target_db}.{table} LIKE {source_db}.{table};")
# 复制数据
cursor.execute(f"INSERT INTO {target_db}.{table} SELECT * FROM {source_db}.{table};")
connection.commit()
print("Database copied successfully.")
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
finally:
if connection.is_connected():
cursor.close()
connection.close()
if __name__ == '__main__':
source_database = 'starit'
target_database = 'starit_dev'
db_user = 'root'
db_password = 'BWOYO0R7oDt40y0shH6t'
host = "192.168.0.91"
port = "13306"
copy_database_directly(source_database, target_database, db_user, db_password, host, port)