Descrição do cenário
Suponha que exista uma tabela mysql fatiada horizontalmente, distribuída por múltiplos hosts, cada um com n tabelas fragmentadas. O que devo fazer se precisar acessar essas tabelas simultaneamente e obter os resultados da consulta rapidamente? Aqui está uma solução para implementar esse requisito usando a biblioteca asyncio asyncio e a biblioteca aiomysql assíncrona do python3.
Demonstração de código
Registro de importação importar aleatório importar assínpio Do Import create_pool AioMySQL
# Suponha que a tabela mysql esteja distribuída por 8 hosts, cada um com 16 sub-tabelas TBLES = { "192.168.1.01": "table_000-015", #000-015 indica que a indicação sob este IP é contínua de table_000 a table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} USUÁRIO = "xxx" PASSWD = "xxxx"
# função de wrapper para capturar exceções def query_wrapper(func): Envoltório de defesa assíncrona (*args, **kwargs): Tente: await func(*args, **kwargs) exceto a exceção como e: Impresso(e) Embalagem de retorno
# A função real de gerenciamento de acesso SQL implementa requisições assíncronas não bloqueantes via AIOMYSQL @query_wrapper Defesa assíncrona query_do_something(IP, DB, Tabela): async com create_pool(host=ip, db=db, user=USER, password=PASSWD) como pool: ASSÍNCRONO com pool.get() como conn: async com conn.cursor() como cur: SQL = ("selecione xxx de {} onde xxxx") await cur.execute(sql.format(table)) res = aguardar cur.fetchall() # Então faça alguma coisa...
# Gerar uma fila de acesso SQL, cada elemento da fila contém funções e parâmetros para acessar uma tabela def gen_tasks(): Tarefas = [] para IP, tbls em TBLES.items(): cols = re.split('_|-', TBLs) tblpre = "_".join(cols[:-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) para num no intervalo(min_num, max_num+1): tasks.append( (query_do_something, IP, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tarefas) Tarefas de retorno
# Executar a fila de requisição de acesso SQL em lotes Def run_tasks(tarefas, batch_len): Tente: Para IDX em Range(0, Len(Tarefas), batch_len): batch_tasks = tarefas[idx:idx+batch_len] logging.info("atual lote, start_idx:%s len:%s" % (idx, len(batch_tasks))) para i no alcance(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) exceto a exceção como e: Registro.aviso(e)
# método principal, que implementa chamada assíncrona de funções através de assíncro def main(): loop = asyncio.get_event_loop()
Tarefas = gen_tasks() batch_len = len(TBLES.keys()) * 5 # tudo a seu critério run_tasks(tarefas, batch_len)
loop.close()
|