Wednesday, August 13, 2014

Multiprocessing Python Template.

Hi there.
Lately I've been using the multiprocessing module a lot, and I needed a template to write scripts a little faster.

Hope it will be useful to you.
#!/usr/bin/env python
#Multiprocessing template

from multiprocessing import Pool, cpu_count, active_children, Lock, Manager
import signal
from time import sleep
import sys


class TimeoutException(Exception):
    """ Simple Exception to be called on timeouts. """
    pass

def _timeout(signum, frame):
    """ Raise a TimeoutException.

    This is intended for use as a signal handler.
    The signum and frame arguments passed to this are ignored.

    """
    # Raise TimeoutException with system default timeout message
    raise TimeoutException()

# Set the handler for the SIGALRM signal:
signal.signal(signal.SIGALRM, _timeout)
# The alarm itself is armed inside the worker function with signal.alarm(5).

# Multiprocessing with KeyboardInterrupt
def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
        
        
def $$somefunc$$($$args$$):

        try:
                signal.alarm(5)                 # raise TimeoutException after 5 seconds
                
                $$do something$$
                
                return $$some data$$
        except TimeoutException:        # must come first: it is a subclass of Exception
                print('It timed out!')
        except Exception:
                pass
        finally:
                signal.alarm(0) 


def main():
        try:
                def cb(data):
                        if data:
                                $$do something with returned data$$
                  
                pool = Pool(processes=cpu_count()*10, initializer=init_worker) #processes = CPU*10 = 40 (if 4 CPUs)
                
                for i in $$some_list$$:
                        pool.apply_async($$somefunc$$, ($$args$$,), callback=cb) #callback function defined above
                pool.close()

                # wait for the worker processes to finish their jobs
                while len(active_children()) > 1:
                        sleep(0.5)
                pool.join()
                return
                
        # ^C terminates all worker processes and returns from main()
        except KeyboardInterrupt:
                print(" Terminating...")
                pool.close()
                pool.terminate()
                return
        # any other error: print it and return
        except Exception:
                print(sys.exc_info())
                return


if __name__ == '__main__':
        main()
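
Here is a minimal sketch of the template with the placeholders filled in, so you can see the pieces working together. The names slow_square and run_all are made up for this example, the time limit is shortened to 1 second, and I assume a Unix system (SIGALRM and the "fork" start method are not available on Windows). Note that the TimeoutException handler sits before the generic one, because TimeoutException is a subclass of Exception.

```python
import multiprocessing as mp
import signal
import time


class TimeoutException(Exception):
    """Raised when a task runs longer than its time limit."""


def _timeout(signum, frame):
    # Signal handler: turn SIGALRM into a Python exception.
    raise TimeoutException()


def init_worker():
    # Workers ignore ^C so that only the parent handles KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGALRM, _timeout)


def slow_square(x, delay):
    """Hypothetical task: sleep for `delay` seconds, then return x squared."""
    try:
        signal.alarm(1)           # 1-second limit, shortened for the demo
        time.sleep(delay)
        return x * x
    except TimeoutException:      # must precede the broader handler
        return None
    except Exception:
        return None
    finally:
        signal.alarm(0)           # cancel any pending alarm


def run_all(jobs):
    """Run (x, delay) jobs in a pool and return the sorted results."""
    results = []

    def cb(data):
        # Runs in the parent process for every finished task.
        if data is not None:
            results.append(data)

    ctx = mp.get_context("fork")  # assumption: Unix only
    pool = ctx.Pool(processes=2, initializer=init_worker)
    try:
        for x, delay in jobs:
            pool.apply_async(slow_square, (x, delay), callback=cb)
        pool.close()
        pool.join()               # returns once all callbacks have run
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
    return sorted(results)
```

Calling run_all([(2, 0.0), (3, 0.0), (4, 3.0)]) should give back the two fast results, while the third job hits the alarm and is dropped.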


By the way, instead of a callback function, shared state can be managed with a Manager() object from the multiprocessing module.

A Manager supports these types: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.


Set as global variables:
manager = Manager().list()   # Manager() must be called first; it returns the manager object
lock = Lock()   # pass this lock to the worker processes

Call in running process:
with lock:   
   manager.append($$something$$)

# The lock is necessary because two processes can read the same value before either of them updates it.
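
To make the point about the lock concrete, here is a small sketch where several processes do a read-modify-write on a Manager list. The function names add_many and count_with_lock are made up for this example, and I assume a Unix system so that the "fork" start method is available.

```python
import multiprocessing as mp


def add_many(shared, lock, n):
    """Worker: increment the counter stored at shared[0] n times."""
    for _ in range(n):
        with lock:
            # Read, then write. Without the lock two processes could
            # read the same value and one increment would be lost.
            shared[0] = shared[0] + 1


def count_with_lock(workers, n):
    """Start `workers` processes that each add n; return the final count."""
    ctx = mp.get_context("fork")  # assumption: Unix only
    with ctx.Manager() as manager:
        shared = manager.list([0])
        lock = ctx.Lock()
        procs = [ctx.Process(target=add_many, args=(shared, lock, n))
                 for _ in range(workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return shared[0]
```

With the lock in place, count_with_lock(4, 25) adds up to exactly workers * n. One thing to watch if you combine this with Pool.apply_async: a plain multiprocessing Lock cannot be sent as a task argument, so there you would use manager.Lock() or hand the lock over through the pool initializer.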


It may look a bit messy, but maybe it will help.
