Scenariobeschrijving
Stel dat er een mysql-tabel is die horizontaal wordt gesneden, verspreid over meerdere hosts, elk met n gesharded tabellen. Wat moet ik doen als ik deze tabellen gelijktijdig moet openen en snel de zoekresultaten moet krijgen? Hier is een oplossing om deze eis te implementeren met behulp van de asyncio asyncio-bibliotheek en de aiomysql asynchrone bibliotheek van python3.
Codedemo
importlogging Import willekeurig import asyncio From AIOMYSQL Import create_pool
# Stel dat de mysql-tabel verspreid is over 8 hosts, elk met 16 subtabellen TBLES = { "192.168.1.01": "table_000-015", #000-015 geeft aan dat de aanduiding onder deze IP continu is van table_000 tot table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} GEBRUIKER = "xxx" PASSWD = "xxxx"
# wrapperfunctie om uitzonderingen te vangen def query_wrapper(func): Async def wrapper(*args, **kwargs): Probeer: await func(*args, **kwargs) behalve uitzondering als e: print(e) Retouromhulsel
# De daadwerkelijke SQL-toegangsafhandelingsfunctie implementeert asynchrone niet-blokkerende verzoeken via AIOMYSQL @query_wrapper Async Def query_do_something(IP, DB, Table): asynchroon met create_pool(host=ip, db=db, user=USER, password=PASSWD) als pool: Asynchroon met pool.get() als conn: Asynchroon met conn.cursor() als cur: SQL = ("Selecteer xxx uit {} waar xxxx") await cur.execute(sql.format(table)) res = wacht cur.fetchall() # Doe dan iets...
# Genereer een SQL-toegangswachtrij, elk element van de wachtrij bevat functies en parameters om toegang te krijgen tot een tabel Def gen_tasks(): taken = [] voor ip, tbls in TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(kols[-2]) max_num = int(kols[-1]) voor num in bereik(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(taken) Retourtaken
# Voer de SQL-toegangsaanvraagwachtrij in batches uit Def run_tasks(taken, batch_len): Probeer: voor idx in range(0, len(taken), batch_len): batch_tasks = taken[idx:idx+batch_len] logging.info("huidige batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) voor i in range(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) behalve uitzondering als e: logging.warn(e)
# hoofdmethode, die asynchrone oproep van functies via asyncio implementeert def main(): lus = asyncio.get_event_loop()
taken = gen_tasks() batch_len = len(TBLES.keys()) * 5 # alles aan jou run_tasks(taken, batch_len)
loop.close()
|