We needed to delete a large number of rows from a MySQL table without interrupting the service. Without going into the details of how the approach was determined, I was asked to repeatedly delete a small number of rows from a MySQL table, followed by sleeping some time.
I searched the web and found the following useful links:
- python-mysql connector at http://dev.mysql.com/downloads/connector/python/. I used this connector to access MySQL.
- The Python database api at http://legacy.python.org/dev/peps/pep-0249/. This api defines how to interact with the python-mysql connector.
So I wrapped the database operations in the MySQL class:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MySQL: | |
def __init__(self): | |
self.connector = mysql.connector.connect( | |
user='usr', | |
password='pwd', | |
host='127.0.0.1', | |
port='20033', # tunnel to server | |
database='db') | |
self.cursor = self.connector.cursor() | |
def delete(self, num_rows): | |
t1 = datetime.now() | |
query = ('SELECT id FROM events WHERE source_id=3651 LIMIT {}'. | |
format(num_rows)) | |
self.cursor.execute(query) | |
ids = tuple(chain.from_iterable(self.cursor.fetchall())) | |
actual_rows = len(ids) | |
if actual_rows > 0: | |
query = 'DELETE FROM events WHERE id in {}'.format( | |
ids if actual_rows > 1 else '({})'.format(ids[0])) | |
self.cursor.execute(query) | |
self.connector.commit() | |
t2 = datetime.now() | |
print 'delete {} rows took {} seconds'.format( | |
actual_rows, (t2-t1).total_seconds()) | |
return actual_rows == rows | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, traceback): | |
self.cursor.close() | |
self.connector.close() |
The database connection is closed in the __exit__() method. And I instantiate the class by Python's "with" statement to ensure the database connection is closed at the end. Detailed discussion about the with statement can be found in this article.
The database deletion is done in the delete() method. Database operation is performed by passing a string containing the SQL statement to Cursor.execute(). The result of the database operation can be accessed through the Cursor. My first database operation is query for a set of ids. I fetched all results and store them in a Python tuple using the following command:
ids = tuple(chain.from_iterable(self.cursor.fetchall()))
The second database operation deletes rows using the ids from the previous query. I tried the following SQL statement:
query = 'DELETE FROM relay_event WHERE id in {}'.format(ids)This works except when ids contains only one element. A tuple with one element has a training comma to ensure it's not mistaken as the element enclosed by parenthesis. But SQL sees the trailing comma as syntax error. So I remove the trailing comma from a single element tuple before passing it to SQL:
actual_rows = len(ids) ... ids if actual_rows > 1 else '({})'.format(ids[0])
MySQL.delete() returns a boolean indicating if some rows were deleted.
The main program instantiates MySQL, and repeatedly call MySQL.delete() and sleep(), until MySQL.delete() can't find any row to delete:
with MySQL() as mySQL: more_to_delete = mySQL.delete(rows) time.sleep(sleep_seconds) while more_to_delete: more_to_delete = mySQL.delete(rows) time.sleep(sleep_seconds)
That's all coding needed. The following is everything put together:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import datetime | |
import mysql.connector | |
from itertools import chain | |
import time | |
class MySQL: | |
def __init__(self): | |
self.connector = mysql.connector.connect( | |
user='usr', | |
password='pwd', | |
host='127.0.0.1', | |
port='20033', # tunnel to server | |
database='db') | |
self.cursor = self.connector.cursor() | |
def delete(self, num_rows): | |
t1 = datetime.now() | |
query = ('SELECT id FROM events WHERE source_id=3651 LIMIT {}'. | |
format(num_rows)) | |
self.cursor.execute(query) | |
ids = tuple(chain.from_iterable(self.cursor.fetchall())) | |
actual_rows = len(ids) | |
if actual_rows > 0: | |
query = 'DELETE FROM events WHERE id in {}'.format( | |
ids if actual_rows > 1 else '({})'.format(ids[0])) | |
self.cursor.execute(query) | |
self.connector.commit() | |
t2 = datetime.now() | |
print 'delete {} rows took {} seconds'.format( | |
actual_rows, (t2-t1).total_seconds()) | |
return actual_rows == rows | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, traceback): | |
self.cursor.close() | |
self.connector.close() | |
if __name__ == '__main__': | |
rows = 1000 | |
sleep_seconds = 5 | |
with MySQL() as mySQL: | |
more_to_delete = mySQL.delete(rows) | |
time.sleep(sleep_seconds) | |
while more_to_delete: | |
more_to_delete = mySQL.delete(rows) | |
time.sleep(sleep_seconds) |
No comments:
Post a Comment