This article is a mirror article of machine translation, please click here to jump to the original article.

View: 15907|Reply: 2

[Source] python3 implements concurrent access horizontal partitioning tables

[Copy link]
Posted on 11/29/2017 9:15:55 PM | | |
Scenario description

Suppose there is a mysql table that is sliced horizontally, spread across multiple hosts, each with n sharded tables.
What should I do if I need to access these tables concurrently and get the query results quickly?
Here is a solution to implement this requirement using the asyncio asyncio library and the aiomysql asynchronous library of python3.

Code demo

import logging
import random
import asyncio
from aiomysql import create_pool

# Suppose the mysql table is spread across 8 hosts, each with 16 sub-tables
TBLES = {
    "192.168.1.01": "table_000-015", #000-015 indicates that the indication under this IP is continuous from table_000 to table_015
    "192.168.1.02": "table_016-031",
    "192.168.1.03": "table_032-047",
    "192.168.1.04": "table_048-063",
    "192.168.1.05": "table_064-079",
    "192.168.1.06": "table_080-095",
    "192.168.1.07": "table_096-0111",
    "192.168.1.08": "table_112-0127",
}
USER = "xxx"
PASSWD = "xxxx"

# wrapper function to catch exceptions
def query_wrapper(func):
    async def wrapper(*args, **kwargs):
        try:
            await func(*args, **kwargs)
        except Exception as e:
            print(e)
    return wrapper


# The actual SQL access handling function implements asynchronous non-blocking requests through AIOMYSQL
@query_wrapper
async def query_do_something(ip, db, table):
    async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
        async with pool.get() as conn:
            async with conn.cursor() as cur:
                sql = ("select xxx from {} where xxxx")
                await cur.execute(sql.format(table))
                res = await cur.fetchall()
                # then do something...


# Generate a SQL access queue, each element of the queue contains functions and parameters to access a table
def gen_tasks():
    tasks = []
    for ip, tbls in TBLES.items():
        cols = re.split('_|-', tbls)
        tblpre = "_".join(cols[:-2])
        min_num = int(cols[-2])
        max_num = int(cols[-1])
        for num in range(min_num, max_num+1):
            tasks.append(
               (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num))
            )

    random.shuffle(tasks)
    return tasks

# Run the SQL access request queue in batches
def run_tasks(tasks, batch_len):
    try:
        for idx in range(0, len(tasks), batch_len):
            batch_tasks = tasks[idx:idx+batch_len]
            logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks)))
            for i in range(0, len(batch_tasks)):
                l = batch_tasks
                batch_tasks= asyncio.ensure_future(
                    l[0](*l[1:])
                )
            loop.run_until_complete(asyncio.gather(*batch_tasks))
    except Exception as e:
        logging.warn(e)

# main method, which implements asynchronous call of functions through asyncio
def main():
    loop = asyncio.get_event_loop()

    tasks = gen_tasks()
    batch_len = len(TBLES.keys()) * 5   # all up to you
    run_tasks(tasks, batch_len)

    loop.close()





Previous:How to recharge.
Next:WebAPI customizes the return data format
Posted on 11/30/2017 9:03:52 AM |
Good spooling and table optimization database
Posted on 4/4/2018 3:20:12 PM |
Like
Disclaimer:
All software, programming materials or articles published by Code Farmer Network are only for learning and research purposes; The above content shall not be used for commercial or illegal purposes, otherwise, users shall bear all consequences. The information on this site comes from the Internet, and copyright disputes have nothing to do with this site. You must completely delete the above content from your computer within 24 hours of downloading. If you like the program, please support genuine software, purchase registration, and get better genuine services. If there is any infringement, please contact us by email.

Mail To:help@itsvse.com