Saturday, July 26, 2014

Python threading application

This is the story of how Python threading cut a script's execution time by a factor of more than 20.

This is how it all started. Last week, somebody asked me to check the timestamp of the last file written to a specific directory on a remote machine.

Sure. I only needed to run the following:
$ ssh remote ls -lrt /mnt/data/ | tail -1
Then came the follow-up question: how about checking the whole realm?

Sure. A database query returned the hostnames in the realm, so I had a text file with 87 lines that looked like the following:
remote1
remote2
...
remote87
I opened the text file with vim and joined all the lines into one, replacing each newline with a space:
:1,$s/\n/ /g
And added some code to turn this into the following bash script:
for s in remote1 remote2 ... remote87
do
    ssh $s ls -lrt /var/lib/jelli/audio-recording/ | tail -1
done
We ran the script, found some problematic hosts, fixed them, and everybody went home relieved.

The following day brought another request: how about monitoring the realm for a week? And monitoring other realms as well?

So I created the following Python script to read the hostnames from a text file, and check the hosts one by one:
#!/usr/bin/env python
import argparse
import subprocess


def check_one_station(station):
    # because the command is passed to Popen as a list, the '|', grep, and
    # tail become arguments to ssh, so the pipeline runs on the remote shell
    cmd = 'ssh {0} ls -alrt /var/lib/jelli/audio-recording/ | grep -v ^d | tail -1'.format(station)
    p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    if 'cannot access' in err:
        # retry with the alternate directory layout used by some stations
        cmd = 'ssh {0} ls -alrt /var/lib/jelli/{0}/audio-recording/ | grep -v ^d | tail -1'.format(station)
        p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        if 'cannot access' in err:
            print '###', station
        else:
            print station, out[:-1]
    else:
        print station, out[:-1]


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', help='file containing station names')
    args = parser.parse_args()
    f = open(args.filename, 'r')
    line = f.read()
    stations = line.split()
    for station in stations:
        check_one_station(station)
This script gets the job done, but it takes quite some time to check 87 stations one after another. There had to be a faster way.

To reduce the execution time, I tried multithreading. I learned Python multithreading from this tutorial and modified my script to follow its example.
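The tutorial's pattern boils down to subclassing threading.Thread, putting the per-host work in run(), and calling start() and join() from the main thread. Roughly like this (a minimal sketch only; the Worker class and the hostnames are placeholders, not part of the real script):
import threading

class Worker(threading.Thread):
    def __init__(self, station):
        super(Worker, self).__init__()
        self._station = station

    def run(self):
        # run() is what executes in the new thread once start() is called
        print 'checking', self._station

threads = [Worker(s) for s in ['remote1', 'remote2', 'remote3']]
for t in threads:
    t.start()   # spawn the thread; run() executes concurrently
for t in threads:
    t.join()    # block until the thread finishes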

Right away, I found that printing to standard output from the worker threads produces interleaved text from different threads, making the output unreadable. To avoid this, I made every worker thread post its output message to a queue; the main thread is the only one that takes messages from the queue and prints them to standard output. The output looked clean after introducing the message queue.
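In miniature, the queue idea looks like this (a simplified sketch with a made-up worker body; in the real script the message is the formatted ls output):
import threading
import Queue

q = Queue.Queue()

def worker(station):
    # workers never print; they only put a finished message on the queue
    q.put('{0} checked'.format(station))

threads = [threading.Thread(target=worker, args=(s,))
           for s in ['remote1', 'remote2', 'remote3']]
for t in threads:
    t.start()
for t in threads:
    t.join()

# only the main thread reads the queue and prints, so lines never interleave
while not q.empty():
    print q.get()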

Then I found that joining the worker threads in their order of creation is not ideal. When one worker thread takes a long time to finish, worker threads created after the slow one may have already finished, but the main thread is stuck joining the slow thread and can't join the younger, faster ones. So I added a 5 millisecond timeout to each join: if a worker thread can't be joined within 5 milliseconds, the main thread moves on to the next one. When the main thread reaches the end of the list of worker threads, it starts over from the beginning and tries again to join the threads that haven't finished. It keeps sweeping the list until every worker thread has been joined.
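Stripped down to just the join logic, the sweep looks roughly like this (print_msg is the queue-draining helper defined in the full script below):
# keep sweeping the list of live threads until all of them are joined
while len(threads) > 0:
    for t in threads[:]:        # iterate over a copy so remove() is safe
        t.join(0.005)           # wait at most 5 ms for this thread
        if not t.isAlive():
            threads.remove(t)   # it finished; stop tracking it
    print_msg(q)                # print whatever the workers have queued so far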

The following is the script with message queue and multithreading:
#!/usr/bin/env python
import argparse
import subprocess
import threading
import Queue
import string
import time


class CheckOneStation(threading.Thread):
    _err_marker = '###'

    def __init__(self, station, q):
        super(CheckOneStation, self).__init__()
        self._station = station
        self._q = q

    def run(self):
        result = self.do_work()
        self._q.put(result)     # post the result; only the main thread prints

    def do_work(self):
        cmd = 'ssh {0} ls -alrt /var/lib/jelli/{0}/audio-recording/ | grep -v ^d | tail -1'.format(self._station)
        p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        if self.cmd_failed(err):
            # retry with the alternate directory layout
            cmd = 'ssh {0} ls -alrt /var/lib/jelli/audio-recording/ | grep -v ^d | tail -1'.format(self._station)
            p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            if self.cmd_failed(err):
                msg = '{0}{1}'.format(self._err_marker, self._station)
            else:
                msg = '{0}{1} {2}'.format('' if check_file_time(out) else self._err_marker, self._station, out[:-1])
        else:
            msg = '{0}{1} {2}'.format('' if check_file_time(out) else self._err_marker, self._station, out[:-1])
        return msg

    def cmd_failed(self, err):
        # ls reports 'cannot access' for a missing directory; ssh reports
        # 'Connection timed out' for an unreachable host
        if 'cannot access' in err:
            return True
        if 'Connection timed out' in err:
            return True
        return False


now = time.localtime()


def check_file_time(line):
    # ls -l prints the timestamp of a recent file as 'Mon DD HH:MM' in fields 6-8
    seg = string.join(line.split()[5:8])
    try:
        file_time = time.strptime(seg, '%b %d %H:%M')
        if (file_time.tm_mon == now.tm_mon and
                file_time.tm_mday == now.tm_mday and
                file_time.tm_hour == now.tm_hour and
                abs(file_time.tm_min - now.tm_min) < 5):
            return True
    except ValueError:
        # the line did not contain a parsable timestamp
        pass
    return False


def print_msg(q):
    while not q.empty():
        s = q.get()
        print s


if __name__ == '__main__':
    q = Queue.Queue()
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', help='file containing station names')
    cmd_args = parser.parse_args()
    f = open(cmd_args.filename, 'r')
    line = f.read()
    stations = line.split()
    threads = []
    for station in stations:
        t = CheckOneStation(station, q)
        t.start()
        threads.append(t)
    while len(threads) > 0:
        # iterate over a copy so remove() doesn't disturb the loop
        for t in threads[:]:
            t.join(0.005)           # wait at most 5 ms for this thread
            if not t.isAlive():
                threads.remove(t)
        print_msg(q)                # print whatever the workers have queued so far
How much faster does the multithreaded script run? The following shows the elapsed time of the single-threaded and multithreaded versions:

single-threaded    multithreaded
95.594 sec.        4.161 sec.

The speedup is 95.594 / 4.161 = 22.97, so more than 20 times faster.