In the interests of being frugal (ok, being a complete cheapskate) I wanted to build a full text index of around 115,000 documents. Sure, I could just punt them all into something like Amazon Elasticsearch but that costs money - and seeing as I'm running this in my personal account and there will be (maybe) one or two searches a day it didn't seem like the way to go. So let's do this the hard way, then!
First job is to figure out how to build the index. That's not so hard. Some pseudocode for what I'm doing is below. In reality, I'm using the Python magic module (python-magic) to identify the type of each document (PDF, Word, text, HTML) and then other Python modules to extract the raw text - PyPDF2, olefile, docx and Beautiful Soup.
```
for each file in the directory:
    determine file type (using python-magic)
    if known file type:
        ExtractedText := get text from file using appropriate module
        for each word in ExtractedText:
            add/update word in hash/dictionary with reference to the filename
dump hash/dictionary to text file
```
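The pseudocode above can be sketched in a few lines of Python. This is a minimal sketch, not the real script: the `documents` parameter assumes the python-magic/PyPDF2/docx extraction has already happened and hands us plain text per file. The index is just a dict mapping each word to the list of files it appears in.

```python
import json

def build_index(documents):
    """Build an inverted index: word -> list of filenames containing it.

    `documents` maps a filename to its already-extracted text; in the real
    script that text comes from python-magic plus a per-format extractor.
    """
    index = {}
    for filename, text in documents.items():
        # A set de-duplicates words within a single file
        for word in set(text.lower().split()):
            index.setdefault(word, []).append(filename)
    return index

def dump_index(index, path):
    # The pseudocode's final step: dump the hash/dictionary to a text file
    with open(path, "w") as f:
        json.dump(index, f)

docs = {"a.txt": "the quick brown fox", "b.txt": "the lazy dog"}
index = build_index(docs)
# index["the"] == ["a.txt", "b.txt"]
```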
Pretty simple - turns out that on a low-powered machine with one vCPU this takes around 4 hours (actually closer to four-and-a-half hours) to complete. But it works! So a good start. And yes, it is important for the purposes of the exercise to completely index the files every so often (rather than just add to the existing index) so the adventure now begins - how to bring this time down.
First step: Get a faster machine. Turns out this reduces the time by about 20% as the Python code is single-threaded. On a "good" machine we get around 600 files per minute - the process isn't I/O bound, it's all about the CPU - and that CPU is maxed out.
Second step is therefore multithreading. Let's spawn off a bunch of threads and get this thing running in parallel. Specifically, we'll take the text extraction piece and run each file in a separate thread. Through some experimentation it turns out the best way to do this is to set up a thread pool with the maximum number of threads to run (rather than spawning a thread for each file) and to drop each file into a queue so that the threads can read the next job out when they're ready to go. Some actual Python this time - but with lots of detail removed because it isn't really that important.
```python
import os
import queue
import threading

WordList = {}
MaxThreads = 20
ExitFlag = False
QueueLock = threading.Lock()
WorkQueue = queue.Queue(150000)  # Maximum number of items that can be queued
Directory = "/somewhere/"
Threads = []

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting thread: " + self.name)
        StartThread(self.name, self.q)
        print("Exiting thread: " + self.name)

def StartThread(ThreadName, QueueItem):
    while not ExitFlag:
        QueueLock.acquire()
        if not WorkQueue.empty():
            Filename = QueueItem.get()
            QueueLock.release()
            ProcessFile(Filename)
        else:
            QueueLock.release()

def ProcessFile(Filename):
    Text = GetTextFromFile(os.path.join(Directory, Filename))
    for Word in Text:
        if Word in WordList:
            WordList[Word].append(Filename)
        else:
            WordList[Word] = [Filename]

#
# Create the thread pool
#
for ThreadId in range(1, MaxThreads + 1):
    Thread = myThread(ThreadId, "Thread" + str(ThreadId), WorkQueue)
    Thread.start()
    Threads.append(Thread)

#
# Add filenames to the queue to be processed
#
QueueLock.acquire()
for Filename in os.listdir(Directory):
    WorkQueue.put(Filename)
QueueLock.release()

while not WorkQueue.empty():   # Wait until the queue is empty
    pass
ExitFlag = True
for T in Threads:              # Wait for all threads to complete
    T.join()
```
Yes, the output from this will be messy but it works and all is good. Except that it is much slower. Much, much slower - taking around 15 hours to process the files. Clearly not an acceptable amount of time. Yes - we're using more CPU cores (with 20 threads there are 20 active cores) but the average utilisation drops as you add a core. With four threads each core is running at around 20-25% utilisation. With 20 threads it is far lower - in fact, if you add up the percentage utilisation of all the cores you get around 100%. Not a good solution.
The "ProcessFile()" function does what the pseudocode above says - it processes each file and then inserts the filename (and the words) into the "WordList" dictionary. The challenge there is Python's Global Interpreter Lock: only one thread can be executing Python bytecode at any moment, so the inserts into the dictionary effectively run single-threaded, operating sequentially on the dictionary for each word. And because there are around 14 million unique words in the files, each thread blocks until it can get access to the dictionary.
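This is easy to reproduce with a toy CPU-bound job - a hypothetical word-tally loop (not the real extractor) split across four threads. It produces exactly the same answer as the serial run, but because of the GIL the threads take turns rather than running in parallel, so the wall-clock time is no better:

```python
import threading

def count_words(chunks, results, idx):
    # CPU-bound: tally words in a list of text chunks. This is pure Python,
    # so the GIL ensures only one thread executes it at any instant.
    counts = {}
    for text in chunks:
        for word in text.split():
            counts[word] = counts.get(word, 0) + 1
    results[idx] = counts

texts = ["alpha beta alpha", "beta gamma"] * 1000

# Serial version
serial = [None]
count_words(texts, serial, 0)

# Threaded version: four threads, one interleaved slice of the input each
results = [None] * 4
threads = []
for i in range(4):
    t = threading.Thread(target=count_words, args=(texts[i::4], results, i))
    t.start()
    threads.append(t)
for t in threads:
    t.join()

# Merge the per-thread tallies back together
merged = {}
for partial in results:
    for word, n in partial.items():
        merged[word] = merged.get(word, 0) + n

assert merged == serial[0]  # same answer; the threads just took turns
```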
All right then. How can we decouple the threads from having to write to the dictionary? Easy! We'll just get them to put the words they've extracted into a message queue and send it to another thread that can insert them into the dictionary. Sure, that single thread will be really busy and the file processors will probably run ahead but it's better than them blocking, right?
```python
import os
import queue
import threading
import multiprocessing

WordList = {}
MaxThreads = 20
ExitFlag = False
QueueLock = threading.Lock()
WorkQueue = queue.Queue(150000)  # Maximum number of items that can be queued
Directory = "/somewhere/"
Threads = []
MessageQ = multiprocessing.Queue()

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting thread: " + self.name)
        StartThread(self.name, self.q)
        print("Exiting thread: " + self.name)

def StartThread(ThreadName, QueueItem):
    while not ExitFlag:
        QueueLock.acquire()
        if not WorkQueue.empty():
            Filename = QueueItem.get()
            QueueLock.release()
            ProcessFile(Filename)
        else:
            QueueLock.release()

def ProcessFile(Filename):
    Text = GetTextFromFile(os.path.join(Directory, Filename))
    for Word in Text:
        MessageQ.put([Word, Filename])

def ReadMessageQueue():
    if not MessageQ.empty():
        (Word, Filename) = MessageQ.get()
        if Word in WordList:
            WordList[Word].append(Filename)
        else:
            WordList[Word] = [Filename]

#
# Create the thread pool
#
for ThreadId in range(1, MaxThreads + 1):
    Thread = myThread(ThreadId, "Thread" + str(ThreadId), WorkQueue)
    Thread.start()
    Threads.append(Thread)

#
# Add filenames to the queue to be processed
#
QueueLock.acquire()
for Filename in os.listdir(Directory):
    WorkQueue.put(Filename)
QueueLock.release()

while not WorkQueue.empty():   # Drain the message queue while files are processed
    ReadMessageQueue()
ExitFlag = True
for T in Threads:              # Wait for all threads to complete
    T.join()
while not MessageQ.empty():    # Process any remaining messages
    ReadMessageQueue()
```
The biggest difference here is that the main thread is processing the message queue that contains all of the words while the other threads are free to run ahead and read files. How much of a difference does it make? Not much. Ok, that's a lie - it makes a big difference and brings the time down to around 8 hours - but that's still a far cry from the original 4 on a single vCPU machine. Not really helpful. Again, there are lots of active CPU cores but they're just not very active - which is totally different to the original single threaded program which maxed out a single CPU.
If we assume that there's something awry in the Python threading code that disagrees with how I'm trying to do things - can we use multiple processes instead of multiple threads? I'm going to stick with the message queue as a mechanism for storing the words into the dictionary because I already know that's a single-threaded action and it works best that way. So let's have each process do the same as the threads above and write the words back into the message queue as before.
So that we don't block the word insertion process, I'm running the file scanning part (which is launching each worker process) in a separate thread.
```python
import os
import threading
import multiprocessing

def ScanDirectory():
    for Filename in os.listdir(Directory):
        WorkerPool.map(ProcessFile, [Filename])

WorkerPool = multiprocessing.Pool(NumberOfWorkers)
Thread = threading.Thread(target=ScanDirectory)
Thread.start()
while True:
    #
    # Yes, there's a mechanism here to stop this loop but you get the idea
    #
    ReadMessageQueue()
Thread.join()
```
Otherwise the code stays the same. Result? Not good. Lots of very active processes but the time is still in the 7 hour range. And the process that is ingesting all of the words and putting them into the dictionary is (maybe) running at 10-15% utilisation. Why?
The issue here is that I'm dispatching a separate piece of work for each of 115,000 files and each one runs for way less than a tenth of a second - perhaps even down to one hundredth of a second. On top of that, each map() call with a single file blocks until that file has been processed, so the scanning thread is handing out work serially. This is amazingly inefficient due to the overheads of all that per-file dispatch. So this sucks.
But - what if I could have a set of worker processes just like I had worker threads above. That is, spawn off a process and then tell it what to do? So that's what this next attempt does - given the maximum number of workers it creates a list of lists (that's an array of arrays to non-Python people in the audience) and then evenly distributes the files within those lists. I could have just done the thread spawning thing as per earlier but scanning a directory is really quick so it's no issue doing it this way. Then, we launch a bunch of processes and give them each a list of files to run with. Again, they're going to push the messages back to the main process so that it can process the words as they are extracted from each file.
```python
import os
import multiprocessing

FileList = []
for i in range(0, NumberOfWorkers):
    FileList.append([])
for FileCount, Filename in enumerate(os.listdir(Directory)):
    FileList[FileCount % NumberOfWorkers].append(Filename)

Processes = []
for i in range(0, NumberOfWorkers):
    # ProcessFile here takes a list of files, pushing words onto MessageQ
    Process = multiprocessing.Process(target=ProcessFile, args=(FileList[i],))
    Process.start()
    Processes.append(Process)

while True:
    #
    # Again, we'll terminate this magically elsewhere...
    #
    ReadMessageQueue()
for Process in Processes:
    Process.join()
```
This is a winner. Each process spawns and then runs through the list of files that it has been given - and it maxes out the CPU that it is assigned to. On top of that, the main process is then maxing out the CPU it's running on just inserting words into the dictionary. On a machine with 16 vCPUs (so there is one main process and fifteen file/word processing processes) the total time taken is now 63 minutes. And all CPUs are maxed out for most of that time - at the end, once the file processing is done the dictionary collation takes a few more minutes to complete. Cool!
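For what it's worth, the standard library can do the round-robin distribution for you: multiprocessing.Pool's chunksize parameter batches files per worker, so each process is created once and then churns through its share. This is a sketch under the assumption that each worker returns a per-file word set which the parent merges (rather than streaming words through a message queue); `extract_words` is a hypothetical stand-in for the real python-magic/extractor plumbing.

```python
import multiprocessing
import os
import tempfile

def extract_words(filename):
    # Hypothetical stand-in for python-magic plus a per-format extractor:
    # here every file is assumed to be plain text.
    with open(filename) as f:
        return os.path.basename(filename), set(f.read().split())

def merge_indexes(partials):
    # Combine per-file word sets into one inverted index
    index = {}
    for filename, words in sorted(partials):
        for word in words:
            index.setdefault(word, []).append(filename)
    return index

def build_index(files, workers=4, chunksize=10):
    # chunksize hands each worker a batch of files, so we pay the
    # process start-up cost once per worker, not once per file
    with multiprocessing.Pool(workers) as pool:
        partials = pool.map(extract_words, files, chunksize=chunksize)
    return merge_indexes(partials)

if __name__ == "__main__":
    tmp = tempfile.mkdtemp()
    for name, text in [("a.txt", "quick fox"), ("b.txt", "lazy fox")]:
        with open(os.path.join(tmp, name), "w") as f:
            f.write(text)
    index = build_index([os.path.join(tmp, n) for n in ("a.txt", "b.txt")])
    print(index["fox"])  # ['a.txt', 'b.txt']
```

The merge happens once at the end instead of word-by-word through a queue, which trades memory (each worker holds its partial results) for far less inter-process chatter.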
My 16 vCPU test was run on a m4.4xlarge - I also tried it on a c4.4xlarge (same number of vCPUs but less memory - still more than I need) and that only took 56 minutes and worked out to be slightly cheaper overall. However, the real winner is the m4.10xlarge with 40 vCPUs. Total runtime was just over 22 minutes. On the m4.16xlarge (64 vCPUs) it ran a little slower - overheads were exceeding the raw processing power.
Instance Type | Runtime | Cost per hour | Overall cost
---|---|---|---
m4.4xlarge (16 vCPUs) | 3,791 seconds / 63.2 minutes | $1.00 | $1.053
c4.4xlarge (16 vCPUs) | 3,365 seconds / 56.1 minutes | $1.042 | $0.974
m4.10xlarge (40 vCPUs) | 1,343 seconds / 22.4 minutes | $2.50 | $0.933
m4.16xlarge (64 vCPUs) | 1,793 seconds / 29.8 minutes | $4.00 | $1.992
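The overall cost column is just the runtime converted to hours multiplied by the hourly rate (assuming per-second billing; if the instance is billed by the hour the real spend rounds up). A quick check of the table's arithmetic:

```python
def overall_cost(runtime_seconds, dollars_per_hour):
    # Overall cost = runtime in hours x hourly rate, rounded to tenths of a cent
    return round(runtime_seconds / 3600 * dollars_per_hour, 3)

runs = [
    ("m4.4xlarge",  3791, 1.000),
    ("c4.4xlarge",  3365, 1.042),
    ("m4.10xlarge", 1343, 2.500),
    ("m4.16xlarge", 1793, 4.000),
]
for name, secs, rate in runs:
    print(name, overall_cost(secs, rate))
# m4.4xlarge 1.053, c4.4xlarge 0.974, m4.10xlarge 0.933, m4.16xlarge 1.992
```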
Phew. Finally. A lot of work to get here but a good result for what I need - full text indexing for a few dollars per week. I could have made my life a bit easier at the end by using AWS Batch to run through a bunch of different instance sizes for me but alas I didn't do that. I have also not tried the C5 and M5 instance types because they aren't available in the region I need.
There endeth the lesson.