Szenariobeschreibung
Angenommen, es gibt eine MySQL-Tabelle, die horizontal geschnitten und über mehrere Hosts verteilt ist, jeweils mit n gesplitterten Tabellen. Was soll ich tun, wenn ich gleichzeitig auf diese Tabellen zugreifen und die Abfrageergebnisse schnell erhalten muss? Hier ist eine Lösung zur Umsetzung dieser Anforderung mit der asyncio asyncio Library und der aiomysql asynchronen Bibliothek von Python3.
Code-Demo
Importprotokollierung Zufälliger Import import asyncio Von AIOMYSQL Import create_pool
# Angenommen, die MySQL-Tabelle verteilt sich auf 8 Hosts, von denen jeder 16 Untertabellen enthält TBLES = { "192.168.1.01": "table_000-015", #000-015 zeigt an, dass die Anzeige unter dieser IP von table_000 bis table_015 kontinuierlich ist "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} NUTZER = "xxx" PASSWD = "xxxx"
# Wrapper-Funktion zum Fangen von Ausnahmen def query_wrapper(func): Async Def Wrapper(*args, **kwargs): Versuchen Sie: await func(*args, **kwargs) außer Ausnahme als e: druck(e) Rückgabeumschlag
# Die eigentliche SQL-Zugriffsbehandlungsfunktion implementiert asynchrone, nicht blockierende Anfragen über AIOMYSQL @query_wrapper Async Def query_do_something(IP, DB, Table): asynchron mit create_pool(host=ip, db=db, user=USER, password=PASSWD) als Pool: asynchron mit pool.get() als conn: asynchron mit conn.cursor() als cur: sql = ("select xxx from {} where xxxx") await cur.execute(sql.format(table)) res = await cur.fetchall() # Dann tu etwas...
# Generiere eine SQL-Zugriffswarteschlange, jedes Element der Warteschlange enthält Funktionen und Parameter zum Zugriff auf eine Tabelle def gen_tasks(): Aufgaben = [] für ip, tbls in TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(kols[-2]) max_num = int(kols[-1]) für Num im Bereich(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tasks) Rückkehraufgaben
# Führe die SQL-Zugriffsanfrage-Warteschlange in Chargen aus def run_tasks(Aufgaben, batch_len): Versuchen Sie: für idx in Range(0, len(tasks), batch_len): batch_tasks = tasks[idx:idx+batch_len] logging.info("aktueller Batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) für i im Bereich(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) außer Ausnahme als e: logging.warn(e)
# Hauptmethode, die asynchronen Aufruf von Funktionen über Asyncio implementiert def main(): Schleife = asyncio.get_event_loop()
tasks = gen_tasks() batch_len = len(TBLES.keys()) * 5 # alles liegt bei dir run_tasks(Aufgaben, batch_len)
loop.close()
|