分享一个复用很多次的异步MySQL连接池封装,非常实用,希望能帮到大家。
其中,初始化连接池中 cursorclass 参数设定 aiomysql.cursors.DictCursor时代表返回的值将会以字典的形式,若默认未设置将返回数据元组。
封装类如下:
import aiomysql
from Config import Config
import logging as logger
config = Config()
class Pmysql:
def __init__(self):
self.__coon = None
self.__pool = None
async def initpool(self): # 初始化连接池
try:
if self.__pool:
return self.__pool
self.__pool = await aiomysql.create_pool(
minsize=3, maxsize=6, host=config.db_host, user=config.db_user,
password=config.db_password, db=config.db_database, port=config.db_port, autocommit=True, cursorclass=aiomysql.cursors.DictCursor)
return self.__pool
except Exception as e:
logger.error(f'数据库连接异常:{e}')
raise Exception("请排除数据库异常后再尝试重启应用!")
async def getCurosr(self): # 取游标
conn = await self.pool.acquire()
cur = await conn.cursor()
return conn, cur
async def query(self, query, param=None): # 查询
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
return await cur.fetchall()
except Exception as e:
logger.error(e)
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def getAmysqlobj(): # 取数据库对象
mysqlobj = Pmysql()
pool = await mysqlobj.initpool()
mysqlobj.pool = pool
return mysqlobj
其中的Config配置类如下:
from pydantic import BaseModel
class Config(BaseModel):
db_host:str = "localhost"
db_user:str = "root"
db_password:str = "root"
db_database:str = "test"
db_port:int = 3306
在其他模块调用:
from aiosql import getAmysqlobj
import asyio
async def main():
conn = await getAmysqlobj()
data = await conn.query("SELECT VERSION()")
print(data)
if __name__ == '__main__':
asyncio.run(main())
发表回复