python数据库操作 | 臭大佬
简介
python数据库操作
介绍
在 Python 中操作数据库,一般来说有两种方式:
- 使用 ORM(对象关系映射)模型,例如 SQLALchemy;
- 使用 Python 的特定数据库软件接口,例如 pymysql;
准备工作
准备一张数据表
CREATE TABLE `articles` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
`img_url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
pymysql操作数据
安装
pip install pymysql
爬取数据并存储
# coding:utf-8
import requests
from bs4 import BeautifulSoup
import pymysql
def getData():
'''
获取数据
:return:
'''
url = "https://www.choudalao.com/"
wbdata = requests.get(url).text
soup = BeautifulSoup(wbdata, 'lxml')
news = soup.select("#body_app > article > div.blogs > ul > li > h3 > a")
return news
def pymysqlAction():
'''
pymysql操作
:return:
'''
# 建立一个连接
conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', db='python_test', charset='utf8')
# 创建一个游标
cursor = conn.cursor()
news = getData()
for d in news:
# 提取出标题和连接信息
title = d.get_text()
link = d.get("href")
sql = "INSERT INTO articles(title,url)VALUES('{0}','{1}');".format(title, link)
# 执行 SQL 语句
cursor.execute(sql)
# 提交
conn.commit()
# 关闭游标
cursor.close()
# 关闭数据库连接
conn.close()
if __name__ == '__main__':
pymysqlAction()
读取数据
# 建立一个连接
conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', db='python_test', charset='utf8')
# 创建一个游标
cursor = conn.cursor()
sql = "select * from articles"
cursor.execute(sql)
# 执行sql语句,获取所有结果
result = cursor.fetchall()
# 执行sql语句,得到一条结果
# result = cursor.fetchone()
print(result)
# 断开连接
cursor.close()
conn.close()
SQLALchemy操作数据
安装
pip install sqlalchemy -i https://pypi.douban.com/simple
直接使用会报Warning: (1366, "Incorrect string value: '\\xD6\\xD0\\xB9\\xFA\\xB1\\xEA...' for column 'VARIABLE_VALUE' at row 518") result = self._query(query)
的错误,建议安装mysql-connector-python驱动
pip install mysql-connector-python -i https://pypi.douban.com/simple
详情点击查看
执行原生sql
# coding:utf-8
from sqlalchemy import create_engine
def selectDb():
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from articles"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__':
engine = create_engine(
"mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
selectDb()
orm使用
表操作
# coding:utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# Base是declarative_base的实例化对象
Base = declarative_base()
# 定义一个常量
ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")
# 每个类都要继承Base
class Articls(Base):
__tablename__ = 'articles' # 数据库表名称
# Column是列的意思,固定写法 Column(字段类型, 参数)
# primary_key主键、index索引、nullable是否可以为空
id = Column(Integer, primary_key=True) # id 主键
title = Column(String(256), nullable=False) # title列,不可为空
url = Column(String(256), nullable=False) # url列,不可为空
# extra = Column(Text, nullable=True)
# 相当于Django的ORM的class Meta,是一些元信息
__table_args__ = (
)
if __name__ == '__main__':
# 根据类创建数据库表
Base.metadata.create_all(ENGINE)
# 根据类删除数据库表
#Base.metadata.drop_all(ENGINE)
pass
增伤改查
数据表
CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL,
PRIMARY KEY (`id`),
KEY `ix_users_name` (`name`)
) ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
目录结构
run.py
'''
-*- coding: utf-8 -*-
'''
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models.Users import Users
ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")
# 每次执行数据库操作的时候,都需要创建一个session,相当于管理器(相当于Django的ORM的objects)
Session = sessionmaker(bind=ENGINE)
# 线程安全,基于本地线程实现每个线程用同一个session
session = scoped_session(Session)
# =======执行ORM操作==========
def select():
# res = session.query(Users).all()
# res = session.query(Users).get(1) # 查询id=1的记录
# res = session.query(Users).filter(Users.name == "wjf").all()
# res = session.query(Users).filter_by(name="wjf").all()
res = session.query(Users).filter_by(name="wjf").first()
return res
def add(name=''):
if name == '':
return False
obj = Users(name=name)
session.add(obj)
# # 批量
# session.add_all([
# Users(name="aaa"),
# Users(name="bbb"),
# Hosts(name="ccc"),
# ])
# 提交
session.commit()
# 关闭session
session.close()
def update(name=''):
'''
更新
:return:
'''
if name == '':
return False
session.query(Users).filter(Users.id > 0).update({"name" :name})
#session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.commit()
session.close()
def delete():
'''
删除
:return:
'''
session.query(Users).filter(Users.id > 2).delete()
session.commit()
session.close()
if __name__ == '__main__':
add('德玛西亚')
Users.py
# coding:utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# Base是declarative_base的实例化对象
Base = declarative_base()
# 定义一个常量
ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")
# 每个类都要继承Base
class Users(Base):
__tablename__ = 'users'
# Column是列的意思,固定写法 Column(字段类型, 参数)
# primary_key主键、index索引、nullable是否可以为空
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
# 相当于Django的ORM的class Meta,是一些元信息
__table_args__ = (
)
常规操作
以下操作并没有全部验证,使用前请自行验证
# 条件查询
ret1 = session.query(Users).filter_by(id=1).first()
res2 = session.query(Users).filter(Users.id > 1, Users.name == "wjf").all()
res3 = session.query(Users).filter(Users.id.between(1, 10)).all()
res4 = session.query(Users).filter(~Users.id.in_([1, 10])).first()
from sqlalchemy import and_, or_
res5 = session.query(Users).filter(and_(Users.id > 1, Users.name == "wjf")).first()
res6 = session.query(Users).filter(or_(Users.id > 1, Users.name == "wjf")).first()
res7 = session.query(Users).filter(or_(
Users.id > 1,
and_(Users.id > 3, Users.name == "wjf")
)).all()
# 通配符
res8 = session.query(Users).filter(Users.name.like("L%")).all()
res9 = session.query(Users).filter(~Users.name.like("L%")).all()
# 限制
res10 = session.query(Users).filter(~Users.name.like("L%")).all()[1:2]
# 排序
res11 = session.query(Users).order_by(Users.id.desc()).all() # 倒序
res12 = session.query(Users).order_by(Users.id.asc()).all() # 正序
# 分组
res13 = session.query(Users.name).group_by(Users.name).all()
# 聚合函数
from sqlalchemy.sql import func
res14 = session.query(
func.max(Users.id),
func.sum(Users.name),
func.min(Users.id)
).group_by(Users.name).having(func.max(Users.id > 22)).all()
# 连表
# print(res15) 得到一个列表套元组 元组里是两个对象
# [(user_obj1, hobby_obj1), (user_obj2, hobby_obj2), ]
res15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()
# print(res16) 得到列表里面是前一个对象,join相当于inner join
# [user_obj1, user_obj2, ]
res16 = session.query(UserInfo).join(Hobby).all()
# 相当于inner join
# for i in res16:
# # print(i[0].name, i[1].name)
# print(i.hobby.name)
# 指定isouter=True相当于left join
res17 = session.query(Hobby).join(UserInfo, isouter=True).all()
res17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()
# 或者直接用outerjoin也是相当于left join
res18 = session.query(Hobby).outerjoin(UserInfo).all()
res18_1 = session.query(UserInfo).outerjoin(Hobby).all()
print(res17)
print(res17_1)
print(res18)
print(res18_1)