Description du scénario
Supposons qu’il existe une table mysql tranchée horizontalement, répartie sur plusieurs hôtes, chacun avec n tables fragmentées. Que dois-je faire si je dois accéder simultanément à ces tables et obtenir rapidement les résultats des requêtes ? Voici une solution pour implémenter cette exigence en utilisant la bibliothèque asyncio asyncio et la bibliothèque asynchrone aiomysql de python3.
Démo de code
Journalisation des importations importer aléatoire importation asyncio D’après l’importation d’AIoMySQL create_pool
# Supposons que la table mysql soit répartie sur 8 hôtes, chacun avec 16 sous-tables TBLES = { « 192.168.1.01 » : « table_000-015 », #000-015 indique que l’indication sous cette IP est continue de table_000 à table_015 « 192.168.1.02 » : « table_016-031 », « 192.168.1.03 » : « table_032-047 », « 192.168.1.04 » : « table_048-063 », « 192.168.1.05 » : « table_064-079 », « 192.168.1.06 » : « table_080-095 », « 192.168.1.07 » : « table_096-0111 », « 192.168.1.08 » : « table_112-0127 »,
} UTILISATEUR = « xxx » PASSWD = « xxxx »
# fonction wrapper pour détecter les exceptions def query_wrapper(func) : Enveloppe de défense asynchrone (*args, **kwargs) : Essayez : await func(*args, **kwargs) sauf l’exception comme e : Imprimé(e) Enveloppe de retour
# La fonction réelle de gestion de l’accès SQL implémente des requêtes asynchrones non bloquantes via AIOMYSQL @query_wrapper Défense asynchrone query_do_something(IP, DB, table) : asynchrone avec create_pool(host=IP, db=db, user=USER, password=PASSWD) comme pool : asynchrone avec pool.get() en tant que conn : asynchrone avec conn.cursor() en tant que cur : SQL = (« sélectionner xxx depuis {} où xxxx ») await cur.execute(sql.format(table)) res = await cur.fetchall() # Alors fais quelque chose...
# Générez une file d’accès SQL, chaque élément de la file contient des fonctions et des paramètres pour accéder à une table def gen_tasks() : Tâches = [] pour IP, tbls dans TBLES.items() : Cols = re.split('_|-', à tbls) tblpre = « _ ».join(cols[ :-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) pour un num dans la plage (min_num, max_num+1) : tasks.append( (query_do_something, IP, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tâches) Retour des tâches
# Exécuter la file d’attente de requêtes d’accès SQL par lots Définitivement run_tasks(Tâches, batch_len) : Essayez : Pour IDX en portée (0, LEN(Tâches), batch_len) : batch_tasks = tâches[idx :idx+batch_len] logging.info(« current batch, start_idx :%s len :%s » % (idx, len(batch_tasks))) Pour I dans la plage(0, Len(batch_tasks)) : l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1 :]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) sauf l’exception comme e : Journalisation.avertissement(e)
# méthode principale, qui implémente l’appel asynchrone des fonctions via asyncio def main() : boucle = asyncio.get_event_loop()
Tâches = gen_tasks() batch_len = len(TBLES.keys()) * 5 # tout dépend de vous run_tasks(tâches, batch_len)
loop.close()
|