0
点赞
收藏
分享

微信扫一扫

MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题

MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题_mysql

上一期的读者这个话题的读者浏览量不是太多,有点可惜了, 实际上这就是

传统企业在使用MYSQL时的问题. 解决方案很多,作为上一期的续集,我想从

几点来阐述一下传统企业使用MYSQL的一些问题.


1 不少传统企业的软件开发是外包性质的,外包企业都是有一些成熟的架构的,

大部分企业支持的数据库的列表都包含MYSQL ,并且MYSQL也是大部分企业使用

的开源数据库之一. 那问题在哪里


1 传统企业并未有互联网的企业的技术水平,包含运维的水平,MYSQL的维护水平差,

对MYSQL的认知水平也差,例如如果你问 MYSQL 是否适合所有业务的场景,大部分的

回答可能是YES.

2 部分软件外包企业的人员流动大,技术本身积累的一般,当然大的软件外包商还是

可以的,小的软件外包,就不好说了,问什么都支持,其实都是话术,真正能会使用MYSQL

的软件人员就更少了,并且为了和涨春笋形式的软件开发速度一致,部分软件外包将ORACLE

的表结构直接在MYSQL中实现,是部分企业的软件运行不畅和频频出问题的一个原因.


所以呢,真心希望某些软件外包上,能请一个资深的数据库专家,给你们普及一下表怎么设计,

怎么能符合数据库原理的使用数据库


2 另外在MYSQL 中火热的分表,尤其是多个物理主机形式的分表方式 ,逻辑分表

或者 DBLE 方式的分表,在不少传统企业做起来比较困难,维护MYSQL的难度也提高了.所以

软件外包上的分库分表,就变成了在一个MYSQL实例上的 分库分表, 通过逻辑关系将一个表

打散变为N 张表. 这样解决很好,可使用的人员,尤其是需要通过SQL 来查询业务问题的

一批人,就感到困惑了.


所以就有了下面的这个程序,(如果不清楚这个程序的产生的原因,和在MYSQL的之前通过SQL

来查询产生的问题可以翻翻上一篇前传)



这个程序主要的想法是充分利用MYSQL的高并发,将数据查询打散,通过一个SESSION 处理

一个逻辑的查询,将几十万与几千万的两个表进行程序方式的JOIN ,最终获得需要的数据

这里我们开了200个并发,并且计算了120万次,在6分钟交付了数据的分析结果,下面是

相关的程序.


import configparser

import pymysql

import time

import asyncio #标准库异步线程的库,协程,与多并发不同的是这个是单线程的

from aiomysql import create_pool #第三方库,为MYSQL 来进行的异步线程操作库





class Solution: #定义类

def __init__(self, sql1, sql2): #初始化参数

config = configparser.ConfigParser()

config.read('config.ini') #读取配置文件

self.host = config.get('config', 'host')

self.user = config.get('config', 'user')

self.password = config.get('config', 'password')

self.database = config.get('config', 'database')

self.save_database = config.get('config', 'save_database')

self.save_user = config.get('config', 'save_user')

self.save_password = config.get('config', 'save_password')


self.conn = pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database,

charset='utf8mb4') #

self.sql1 = sql1 #定义两个SQL

self.sql2 = sql2


self.task_num = 300 #异步并发数量, 一次可以干300个事务 self.pool_size = 100 #连接池size读写各100 def get_ids(self, ):

cursor = self.conn.cursor()

cursor.execute(self.sql1)

self.data_a_full = cursor.fetchall()

self.data_a_full_len = len(self.data_a_full) # 获取查询的COUNT


def run(self, ):


self.get_ids()

cur_loop = asyncio.get_event_loop() #获取一个事件的循环

cur_loop.run_until_complete(self.start(cur_loop))


async def start(self, loop):

pool = await create_pool(host=self.host, port=3306,

user=self.user, password=self.password,

db=self.database, loop=loop, maxsize=self.pool_size)

save_pool = await create_pool(host=self.host, port=3306,

user=self.save_user, password=self.save_password,

db=self.save_database, loop=loop, maxsize=self.pool_size)

i = 0 while True:

start = time.time()

m = self.data_a_full[i:i + self.task_num]

i += self.task_num

tasks = []

if len(m) == 0: #每次给300个事务,直到这个池里面的没有数据取出

break for s in m:

# print(s) sql = f"{sql2}'{s[0]}'" #PYTHON3.X新特性简写,拼接SQL c1 = self.select(loop=loop, sql=sql, pool=pool, save_pool=save_pool, data=s)

tasks.append(c1) #执行SQL 语句

await asyncio.gather(*tasks)

print(time.time() - start,'----------',i,'/',self.data_a_full_len)


async def select(self, loop, sql, pool, save_pool, data): #处理的业务逻辑

async with pool.acquire() as conn:

async with conn.cursor() as cur:

await cur.execute(sql)

r = await cur.fetchall()

table_name_dict = {

'F': 'tb_f',

'T': 'tb_t',

'D': 'tb_d',

}

sum_dict = {

'F': 0, 'T': 0, 'D': 0 }

for item in r:

if item[1] is not None:

sum_dict[item[1]] = sum_dict.get(item[1]) + item[0]

data = list(data)

sql_list = []

for key,value in sum_dict.items():

table_name = table_name_dict.get(key)

new_data = data+[value]

new_data[2] = str(new_data[2])

if new_data[4] is not None:

new_data[4] = str(float(new_data[4]))

else:

new_data[4] = 0 new_data[3] = str(new_data[3])

if table_name: #结果插入到MYSQL数据库中

insert_sql = "insert into {} (CONTRACTNO,APPLYNO,ACTIVEDATE,term,AMORTIZEAMT) values ('{}','{}','{}',{},{})".format(

table_name, *new_data)

sql_list.append(insert_sql)

sql_do = ';'.join(sql_list)

# print(sql_do) async with save_pool.acquire() as save_conn:

async with save_conn.cursor() as save_cur:

await save_cur.execute(sql_do)

await save_conn.commit()




if __name__ == '__main__':

s = time.time()

sql1 = "select CONTRACTNO,APPLYNO,ACTIVEDATE,term from tb_sync_contract where ACTIVEDATE>='2020-07-01'" sql2 = "select AMORTIZEAMT,SAP_POSTING_IND from tb_amortize where CAMAINID=" solution = Solution(sql1, sql2)

solution.run()

print(time.time() - s)



最终我们在一个16G 4CORE 核心的MYSQL 5.7.23 的数据库中,成功的产生200并发,模拟

了75万与2千600百万的数据的JOIN的计算,产生结果 时间在6分钟.


感谢程序的提供者,我们的TEAM的 PYTHON专家兼 REDIS DBA 闫树爽.

另外随着我的TEAM的人员增多, 有PYTHON专家,有POSTGRESQL, MYSQL 的专家,估计

以后能SHARE的文字会越来越多.


MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题_mysql_02






举报

相关推荐

0 条评论