Περιγραφή σεναρίου
Ας υποθέσουμε ότι υπάρχει ένας πίνακας mysql που τεμαχίζεται οριζόντια, κατανεμημένος σε πολλούς κεντρικούς υπολογιστές, ο καθένας με n διαμοιρασμένους πίνακες. Τι πρέπει να κάνω εάν πρέπει να αποκτήσω πρόσβαση σε αυτούς τους πίνακες ταυτόχρονα και να λάβω γρήγορα τα αποτελέσματα του ερωτήματος; Ακολουθεί μια λύση για την υλοποίηση αυτής της απαίτησης χρησιμοποιώντας τη βιβλιοθήκη asyncio asyncio και την ασύγχρονη βιβλιοθήκη aiomysql της python3.
Επίδειξη κώδικα
Εισαγωγή καταγραφής εισαγωγή τυχαία εισαγωγή asyncio Από το AOMYSQL Import create_pool
# Ας υποθέσουμε ότι ο πίνακας mysql κατανέμεται σε 8 κεντρικούς υπολογιστές, ο καθένας με 16 υποπίνακες TBLES = { "192.168.1.01": "table_000-015", #000-015 υποδεικνύει ότι η ένδειξη κάτω από αυτήν την IP είναι συνεχής από table_000 έως table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} ΧΡΗΣΤΗΣ = "xxx" PASSWD = "xxxx"
# Λειτουργία περιτυλίγματος για να πιάσετε εξαιρέσεις def query_wrapper(func): Async def wrapper(*args, **kwargs): Δοκιμάστε: Wait func(*args, **kwargs) εκτός από την εξαίρεση ως ε: εκτύπωση(ε) περιτύλιγμα επιστροφής
# Η πραγματική λειτουργία χειρισμού πρόσβασης SQL υλοποιεί ασύγχρονα αιτήματα μη αποκλεισμού μέσω AIOMYSQL @query_wrapper Async def query_do_something(IP, DB, πίνακας): async με create_pool(host=ip, db=db, user=USER, password=PASSWD) ως χώρο συγκέντρωσης: async με το pool.get() ως conn: async με conn.cursor() ως cur: sql = ("επιλέξτε xxx από {} όπου xxxx") Αναμονή cur.execute(sql.format(πίνακας)) res = αναμονή cur.fetchall() # τότε κάνε κάτι...
# Δημιουργήστε μια ουρά πρόσβασης SQL, κάθε στοιχείο της ουράς περιέχει συναρτήσεις και παραμέτρους για πρόσβαση σε έναν πίνακα def gen_tasks(): εργασίες = [] για ip, tbls στο TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(στήλες[-2]) max_num = int(στήλες[-1]) Για το NUM στο εύρος (min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(εργασίες) εργασίες επιστροφής
# Εκτελέστε την ουρά αιτημάτων πρόσβασης SQL σε παρτίδες def run_tasks(tasks, batch_len): Δοκιμάστε: Για IDX στο εύρος(0, len(εργασίες), batch_len): batch_tasks = εργασίες[idx:idx+batch_len] logging.info("τρέχουσα παρτίδα, start_idx:%s len:%s" % (idx, len(batch_tasks))) για i στο εύρος(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) εκτός από την εξαίρεση ως ε: logging.warn(e)
# κύρια μέθοδος, η οποία υλοποιεί ασύγχρονη κλήση συναρτήσεων μέσω ασυγχρονισμού def main(): βρόχος = asyncio.get_event_loop()
εργασίες = gen_tasks() batch_len = len(TBLES.keys()) * 5 # όλα εξαρτώνται από εσάς run_tasks(καθήκοντα, batch_len)
βρόχος.κλείσιμο()
|