This is how it all started. Last week, somebody asked me to check the timestamp of the last file written to a specific directory on a remote machine.
Sure. I only needed to run the following:
$ ssh remote ls -lrt /mnt/data/ | tail -1
Sure. A database query returned the hostnames contained in the realm, so I had a text file with 87 lines that looked like the following:
remote1
remote2
...
remote87
I opened the text file with vim and joined all lines into one, separated by spaces:
:1,$s/\n/ /g
Then I added some code to turn this into the following bash script:
for s in remote1 remote2 ... remote87
do
    ssh $s ls -lrt /var/lib/jelli/audio-recording/ | tail -1
done
The following day brought another request: how about monitoring the realm for a week, and monitoring the other realms as well?
So I created the following Python script to read the hostnames from a text file, and check the hosts one by one:
#!/usr/bin/env python
import argparse
import subprocess


def check_one_station(station):
    # ssh joins its remaining arguments into one remote command line,
    # so the whole ls | grep | tail pipeline runs on the remote host.
    cmd = 'ssh {0} ls -alrt /var/lib/jelli/audio-recording/ | grep -v ^d | tail -1'.format(station)
    p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    idx = err.find('cannot access')
    if idx >= 0:
        # Some stations keep the directory under /var/lib/jelli/<station>/ instead.
        cmd = 'ssh {0} ls -alrt /var/lib/jelli/{0}/audio-recording/ | grep -v ^d | tail -1'.format(station)
        p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        idx = err.find('cannot access')
        if idx >= 0:
            # Neither directory exists: flag the station.
            print '###', station
        else:
            print station, out[:-1]
    else:
        print station, out[:-1]


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', help='file containing station names')
    args = parser.parse_args()
    f = open(args.filename, 'r')
    line = f.read()
    stations = line.split()
    for station in stations:
        check_one_station(station)
To reduce the execution time, I turned to multithreading. I learned Python multithreading from this tutorial and modified my script to follow its example.
Right away, I found that printing to standard output from the threads results in interleaved text from different threads, making the output unreadable. To avoid this problem, I made all worker threads post their output messages to a queue. The main thread is the only thread that takes the output messages from the queue and prints them to standard output. The output looked good after I introduced the message queue.
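The queue idea can be shown in isolation. The following is a minimal sketch, written for Python 3 rather than the Python 2 used in my scripts; the names worker and collect are made up for illustration, and the worker only builds a string instead of running ssh:

```python
import queue
import threading


def worker(name, out_q):
    # Stand-in for the real ssh check (an assumption for this sketch).
    # The worker never prints; it only posts its message to the queue.
    out_q.put('{0} checked'.format(name))


def collect(names):
    """Start one thread per name and return the messages drained by the caller's thread."""
    out_q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(n, out_q)) for n in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # All workers are done, so the queue now holds exactly one message per name.
    messages = []
    while not out_q.empty():
        messages.append(out_q.get())
    return messages


if __name__ == '__main__':
    # Only the main thread writes to standard output, one whole line at a time.
    for line in collect(['remote1', 'remote2', 'remote3']):
        print(line)
```

The messages arrive in whatever order the workers finish, so the printed order is not guaranteed to match the order of the input names.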
Then I found that joining the worker threads in the order of creation is not ideal. When one worker thread takes a long time to finish its task, worker threads created after the slow one might already have finished, but the main thread is blocked joining the slow worker and can't join the younger but faster threads. So I added a 5 millisecond timeout to the thread join. If a worker thread can't be joined within 5 milliseconds, the main thread moves on to the younger worker threads. When the main thread reaches the end of the list of worker threads, it starts over from the beginning and tries to join the worker threads that haven't been joined yet. The main thread keeps trying until all worker threads are joined.
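Here is a minimal sketch of that polling join on its own, again in Python 3. The names join_all and demo are hypothetical, and time.sleep stands in for the ssh call. Note that the loop iterates over a copy of the pending list, because removing items from a list while iterating over it skips elements:

```python
import threading
import time


def join_all(threads, timeout=0.005):
    """Join threads as they finish; return their names in completion order."""
    pending = list(threads)
    finished = []
    while pending:
        # Iterate over a copy so that removing from `pending` is safe.
        for t in list(pending):
            t.join(timeout)          # wait at most `timeout` seconds
            if not t.is_alive():     # the thread really ended
                pending.remove(t)
                finished.append(t.name)
    return finished


def demo():
    # The slow thread is created first, so a plain in-order join would block on it.
    threads = [
        threading.Thread(target=time.sleep, args=(0.3,), name='slow'),
        threading.Thread(target=time.sleep, args=(0.01,), name='fast'),
    ]
    for t in threads:
        t.start()
    return join_all(threads)


if __name__ == '__main__':
    print(demo())
```

With these delays the fast thread is joined well before the slow one, even though it was created second.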
The following is the script with message queue and multithreading:
#!/usr/bin/env python
import argparse
import subprocess
import threading
import Queue
import time


class CheckOneStation(threading.Thread):
    _err_marker = '###'

    def __init__(self, station, q):
        super(CheckOneStation, self).__init__()
        self._station = station
        self._q = q

    def run(self):
        # Post the result to the queue; only the main thread prints.
        result = self.do_work()
        self._q.put(result)

    def do_work(self):
        cmd = 'ssh {0} ls -alrt /var/lib/jelli/{0}/audio-recording/ | grep -v ^d | tail -1'.format(self._station)
        p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        if self.cmd_failed(err):
            # Fall back to the directory layout without the station name.
            cmd = 'ssh {0} ls -alrt /var/lib/jelli/audio-recording/ | grep -v ^d | tail -1'.format(self._station)
            p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            if self.cmd_failed(err):
                msg = '{0}{1}'.format(self._err_marker, self._station)
            else:
                msg = '{0}{1} {2}'.format('' if check_file_time(out) else self._err_marker, self._station, out[:-1])
        else:
            msg = '{0}{1} {2}'.format('' if check_file_time(out) else self._err_marker, self._station, out[:-1])
        return msg

    def cmd_failed(self, err):
        idx = err.find('cannot access')
        if idx >= 0:
            return True
        idx = err.find('Connection timed out')
        if idx >= 0:
            return True
        return False


now = time.localtime()


def check_file_time(line):
    # Fields 5..7 of the ls -l line are month, day and HH:MM.
    seg = ' '.join(line.split()[5:8])
    try:
        file_time = time.strptime(seg, '%b %d %H:%M')
        # The file is fresh if it was written in the current hour,
        # less than 5 minutes away from the time the script started.
        if (file_time.tm_mon == now.tm_mon and
                file_time.tm_mday == now.tm_mday and
                file_time.tm_hour == now.tm_hour and
                abs(file_time.tm_min - now.tm_min) < 5):
            return True
    except ValueError:
        # The line did not contain a parsable timestamp.
        pass
    return False


def print_msg(q):
    while not q.empty():
        s = q.get()
        print s


if __name__ == '__main__':
    q = Queue.Queue()
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', help='file containing station names')
    cmd_args = parser.parse_args()
    f = open(cmd_args.filename, 'r')
    line = f.read()
    stations = line.split()
    threads = []
    for station in stations:
        t = CheckOneStation(station, q)
        t.start()
        threads.append(t)
    while len(threads) > 0:
        # Iterate over a copy, because finished threads are removed from the list.
        working_set = threads[:]
        for t in working_set:
            t.join(0.005)
            if not t.is_alive():
                threads.remove(t)
        # Print whatever messages have arrived so far.
        print_msg(q)
single-threaded | multi-threaded
95.594 sec. | 4.161 sec.
The speedup is 95.594 / 4.161 ≈ 22.974.