|
|
Yayınlandı 29.11.2017 21:15:55
|
|
|

Senaryo açıklaması
Yataydan dilimlenmiş ve birden fazla ana konak arasında yayılmış bir mysql tablosu olduğunu varsayalım, her birinde n parçalanmış tablo var. Bu tablolara eşzamanlı erişmem ve sorgu sonuçlarını hızlıca almam gerekirse ne yapmalıyım? İşte bu gereksinimi uygulamak için asyncio asyncio kütüphanesi ve python3'ün aiomysql asenkron kütüphanesi kullanılarak sunulmuştur.
Kod demosu
Kaydı içe aktarma Rastgele içe aktarma import asyncio AIOMYSQL import create_pool
# Mysql tablosunun 8 konakta yayılmış olduğunu varsayalım, her biri 16 alt tabloya sahip TBLES = { "192.168.1.01": "table_000-015", #000-015, bu IP altındaki göstergenin table_000'den table_015'ye kadar sürekli olduğunu gösterir "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} KULLANICI = "xxx" PASSWD = "xxxx"
# İstisnaları yakalamak için wrapper fonksiyonu def query_wrapper(func): async def wrapper(*args, **kwargs): Deneyin: Bekle Func(*args, **kwargs) İstisna dışında e: print(e) Dönüş ambalajı
# Gerçek SQL erişim işleme fonksiyonu, AIOMYSQL üzerinden asenkron engellemesiz istekleri uygular @query_wrapper Async def query_do_something(IP, DB, tablo): create_pool(host=ip, db=db, user=USER, password=PASSWD) havuz olarak asenkronize edilir: pool.get() ile conn olarak asenkronize et: conn.cursor() ile assenkronize olarak cur: SQL = ("{}'den xxx'i seç burada xxxx") await cur.execute(sql.format(table)) res = bekle cur.fetchall() # O zaman bir şey yap...
# Bir SQL erişim kuyruğu oluşturun, kuyruğun her elemanı bir tabloya erişmek için fonksiyonlar ve parametreler içerir def gen_tasks(): görevler = [] IP için, TBLES.items() içinde tbls: cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) Aralıktaki num için(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(görevler) Görevleri geri getir
# SQL erişim isteği kuyruğunu toplu olarak çalıştır def run_tasks(görevler, batch_len): Deneyin: IDX için range(0, len(tasks), batch_len): batch_tasks = görevler[idx:idx+batch_len] logging.info("güncel parti, start_idx:%s len:%s" % (idx, len(batch_tasks))) Aralık(0, len(batch_tasks))'de i için): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) İstisna dışında e: loging.warn(e)
# ana yöntem, asenkron fonksiyon çağrısını asenkron olarak gerçekleştirir def main(): döngü = asyncio.get_event_loop()
görevler = gen_tasks() batch_len = len(TBLES.keys()) * 5 # tamamen sana kalmış run_tasks(görevler, batch_len)
loop.close()
|
Önceki:Nasıl şarj edilir.Önümüzdeki:WebAPI, dönüş veri formatını özelleştirir
|