# Source code for bumps.mapper

"""
Parallel and serial mapper implementations.

The API is a bit crufty since interprocess communication has evolved from
the original implementation. And the names are misleading.

Available mappers:
- SerialMapper: Single-threaded execution
- MPMapper: Multi-process execution using multiprocessing
- ThreadPoolMapper: Multi-threaded execution using ThreadPoolExecutor
- MPIMapper: MPI-based distributed execution across cluster nodes

Usage::

    Mapper.start_worker(problem)
    mapper = Mapper.start_mapper(problem, None, cpus)
    result = mapper(points)
    ...
    mapper = Mapper.start_mapper(problem, None, cpus)
    result = mapper(points)
    Mapper.stop_mapper()
"""

import time
import sys
import os
import signal
import threading
import copy
from concurrent.futures import ThreadPoolExecutor

from cloudpickle import dumps, loads
# {{{ http://code.activestate.com/recipes/496767/ (r1)
# Converted to use ctypes by Paul Kienzle


# Access mask requested from OpenProcess by setpriority().
PROCESS_ALL_ACCESS = 0x001F0FFF


def can_pickle(problem, check=False):
    """
    Return True if *problem* survives a cloudpickle round trip.

    A False result means MPMapper cannot ship the problem to its worker
    processes, and SerialMapper should be used instead.

    When *check* is True the copy's *nllf()* is also called as a smoke test
    that the duplicated object still runs. This is not foolproof: access to
    a database, for example, may still work in the copy because the
    connection is open in the current process, yet fail when running on a
    remote machine.
    """
    try:
        duplicate = loads(dumps(problem))
        if check:
            duplicate.nllf()
    except Exception:
        # Any failure while pickling, unpickling or smoke-testing means the
        # problem cannot be distributed; report that instead of raising.
        return False
    return True
def setpriority(pid=None, priority=1):
    """
    Set the priority class of a Windows process.

    *priority* is an index from 0 to 5: 0 is idle, 2 is normal, and 5 is
    realtime (maximum). The default of 1 selects below-normal priority.
    *pid* defaults to the current python process but can be any valid
    process ID.

    Raises ValueError if *priority* is outside 0-5. Negative values used to
    wrap around silently to the realtime class. Failure to open the process
    is ignored (best effort), as before.
    """
    # import win32api,win32process,win32con
    priorityclasses = [
        0x40,  # IDLE_PRIORITY_CLASS,
        0x4000,  # BELOW_NORMAL_PRIORITY_CLASS,
        0x20,  # NORMAL_PRIORITY_CLASS,
        0x8000,  # ABOVE_NORMAL_PRIORITY_CLASS,
        0x80,  # HIGH_PRIORITY_CLASS,
        0x100,  # REALTIME_PRIORITY_CLASS
    ]
    # Validate before touching windll so bad input is reported on any OS.
    if not 0 <= priority < len(priorityclasses):
        raise ValueError(
            f"priority must be between 0 and {len(priorityclasses) - 1}, got {priority!r}"
        )

    from ctypes import windll

    kernel32 = windll.kernel32
    if pid is None:
        pid = kernel32.GetCurrentProcessId()
    # bInheritHandle=False: the handle is only used locally, so child
    # processes have no reason to inherit it.
    handle = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
    if not handle:
        # OpenProcess returns NULL on failure; keep the best-effort semantics.
        return
    try:
        kernel32.SetPriorityClass(handle, priorityclasses[priority])
    finally:
        # Release the process handle; it used to leak on every call.
        kernel32.CloseHandle(handle)
# end of http://code.activestate.com/recipes/496767/ }}}
def nice():
    """
    Lower the scheduling priority of the current process.

    On Windows this selects priority index 1 (below normal) through
    setpriority(); on other systems it adds 5 to the niceness via os.nice.
    """
    if os.name != "nt":
        os.nice(5)
        return
    setpriority(priority=1)
def pool_size(cpus=0):
    """
    Get the number of cpus available for processing, or use the number provided.

    A positive *cpus* is returned unchanged. Zero, a negative value or None
    means "auto-detect".

    On linux, use os.sched_getaffinity to count the number of cpus allocated
    to the process rather than multiprocessing.cpu_count to return all
    processors on the system. This allows us to restrict the amount of
    parallelism to the number of cpus allocated by slurm when running on a
    compute cluster with a partial node.
    """
    # Accept None as "auto" instead of failing on `None > 0`.
    if cpus is not None and cpus > 0:
        return cpus
    # Use sched_getaffinity if available (only on linux)
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    import multiprocessing

    return multiprocessing.cpu_count()
# For debugging parallelism it is handy to know which core the process is using
def cpu_id(num_sockets=2):
    """
    Return the id of the processor the current process is running on.

    Requires psutil. *num_sockets* is currently unused.
    """
    import multiprocessing
    import psutil

    current_pid = multiprocessing.current_process().pid
    return psutil.Process(current_pid).cpu_num()
# Set BUMPS_SHOW_PERFORMANCE to 1/true/on (any case) to print mapper timing
# summaries via show_performance().
SHOW_PERFORMANCE = os.environ.get("BUMPS_SHOW_PERFORMANCE", "0").upper() in ("1", "TRUE", "ON")


def show_performance(timestamps):
    """
    Print a timing summary for a series of synchronous map calls.

    *timestamps* is a series of pairs (tstart, tstop) taken before and after
    each map call, with times in nanoseconds from time.perf_counter_ns().
    Display the median time within calls *(tstop[k] - tstart[k])* and between
    calls *(tstart[k+1] - tstop[k])*. Nothing is printed unless
    SHOW_PERFORMANCE is set and *timestamps* is non-empty.
    """
    if not timestamps or not SHOW_PERFORMANCE:
        return
    import numpy as np

    tstart, tstop = [np.array(v) for v in zip(*timestamps)]
    map_ms = np.median(tstop - tstart) / 1e6
    if len(tstart) > 1:
        step = f"{np.median(tstart[1:] - tstop[:-1]) / 1e6:.2f} ms"
    else:
        # A single map call has no gap between calls to measure; the median
        # of an empty array would print "nan" and emit a RuntimeWarning.
        step = "n/a"
    print(f"median step time {step} (serial) and map time {map_ms:.2f} ms (parallel)")
# Noise so that the type checker is happy
class BaseMapper:
    """
    Common interface for all mappers.

    Subclasses provide static start_worker/start_mapper/stop_mapper hooks;
    the base versions raise NotImplementedError.
    """

    # Whether the worker side already holds the fit problem (MPIMapper
    # overrides this); False for the base interface.
    has_problem = False

    @staticmethod
    def start_worker(problem):
        """Initialize the worker side with *problem*."""
        raise NotImplementedError()

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """Return a ``mapper(points)`` callable for a new fit of *problem*."""
        raise NotImplementedError()

    # TODO: deprecate mapper parameter
    @staticmethod
    def stop_mapper(mapper=None):
        """Release any resources held by the mapper."""
        raise NotImplementedError()
class SerialMapper(BaseMapper):
    """
    Evaluate *problem.nllf* for each point in the calling process, one at a time.
    """

    # (tstart, tstop) pairs in nanoseconds, one per mapper call.
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to initialize: evaluation happens in the calling process."""
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """Return a mapper that evaluates ``problem.nllf`` on each point in turn."""
        # Note: map is an iterator in python 3.x
        # return lambda points: list(map(problem.nllf, points))
        SerialMapper.timestamps = []

        def mapper(points):
            tstart = time.perf_counter_ns()
            result = list(map(problem.nllf, points))
            tstop = time.perf_counter_ns()
            SerialMapper.timestamps.append((tstart, tstop))
            return result

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report timings for this mapper; there are no resources to release."""
        # Report SerialMapper's own timings (previously MPMapper's were shown).
        show_performance(SerialMapper.timestamps)
def _MP_setup():
    """Initializer run once in each multiprocessing pool worker process."""
    # Workers ignore Ctrl-C; the controlling process catches KeyboardInterrupt
    # in MPMapper's mapper and stops the pool itself.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    nice()
    # print(f"starting pool worker on {cpu_id()}")


def _MP_run_problem(problem_point_tuple):
    """
    Evaluate ``nllf(point)`` inside a pool worker.

    *problem_point_tuple* is ``(problem_id, point, shared_pickled_problem)``.
    The worker caches the last unpickled problem and re-reads the pickled
    bytes from the shared array only when *problem_id* changes.
    """
    # Using MPMapper class variables to store worker globals.
    # It doesn't matter if they conflict with the controller values since
    # they are in a different process.
    problem_id, point, shared_pickled_problem = problem_point_tuple
    # problem_id, point, shared_pickled_problem, cpu_usage, lock = problem_point_tuple
    # with lock: cpu_usage[cpu_id()] += 1
    is_cached = MPMapper.problem_id == problem_id
    if not is_cached:
        # print(f"Fetching problem {problem_id} from namespace")
        payload = shared_pickled_problem[:].tobytes()
        MPMapper.problem = loads(payload)
        MPMapper.problem_id = problem_id
    return MPMapper.problem.nllf(point)
class MPMapper(BaseMapper):
    """
    Multi-process mapper built on multiprocessing.Pool.

    The problem is cloudpickled into a manager-hosted byte array. Each
    worker unpickles it once per problem_id (see _MP_run_problem).
    """

    # Note: subprocesses are using the same variables
    pool = None
    manager = None
    problem_id = 0
    shared_pickled_problem = None
    pickled_problem = None
    problem = None
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to do: pool workers fetch the problem lazily."""
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Publish *problem* to the pool and return a ``mapper(points)`` callable.

        The pool and manager are created on the first call and reused by
        later fits until stop_mapper() is called.
        """
        import multiprocessing

        # Set up the process pool on the first call.
        if MPMapper.pool is None:
            # Create a sync namespace to distribute the problem description.
            MPMapper.manager = multiprocessing.Manager()
            # Start the process pool, sending the namespace handle
            MPMapper.pool = multiprocessing.Pool(pool_size(cpus), _MP_setup)
            # For verifying that the execution threads can migrate between cpus, accumulate
            # a histogram of the processor id for each function evaluation.
            # MPMapper.num_cpus = multiprocessing.cpu_count() # may be more than pool_size
            # MPMapper.lock = MPMapper.manager.Lock()
            # MPMapper.cpu_usage = MPMapper.manager.Array('i', [0]*MPMapper.num_cpus)
            # print("pool created")

        # Increment the problem number and store the problem in the namespace.
        # The store action uses pickle to transfer python objects to the
        # manager process. Since this may fail for lambdas and for functions
        # defined within the model file, instead use cloudpickle
        # to pickle the problem before storing.
        MPMapper.problem_id += 1
        MPMapper.pickled_problem = dumps(problem)
        MPMapper.shared_pickled_problem = MPMapper.manager.Array("B", MPMapper.pickled_problem)

        # Bind this fit's id and payload now. A mapper left over from an
        # earlier fit then keeps evaluating its own problem rather than
        # silently picking up whatever was published last.
        problem_id = MPMapper.problem_id
        shared_pickled_problem = MPMapper.shared_pickled_problem

        # Set the mapper to send problem_id/point/shared_pickled_problem value triples
        MPMapper.timestamps = []

        def mapper(points):
            try:
                tstart = time.perf_counter_ns()
                # args = ((problem_id, p, shared_pickled_problem, MPMapper.cpu_usage, MPMapper.lock) for p in points)
                args = ((problem_id, p, shared_pickled_problem) for p in points)
                result = MPMapper.pool.map(_MP_run_problem, args)
                tstop = time.perf_counter_ns()
                # print(f"map time {tstart} => {tstop}")
                MPMapper.timestamps.append((tstart, tstop))
                return result
            except KeyboardInterrupt:
                MPMapper.stop_mapper()
                # Propagate the interrupt (as ThreadPoolMapper does) instead of
                # handing the caller a None result.
                raise

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Terminate the pool, report timings and shut down the manager.

        Safe to call more than once.
        """
        # print("stopping mapper")
        # reset pool and manager
        if MPMapper.pool is not None:
            MPMapper.pool.terminate()
            # Reap the terminated worker processes.
            MPMapper.pool.join()
            MPMapper.pool = None
        # Show cpu histogram
        # print("== evaluation count per cpu ==")
        # for k in range(MPMapper.num_cpus):
        #    print(MPMapper.cpu_usage[k], end=" ")
        # print()
        show_performance(MPMapper.timestamps)
        # Guard against a second stop (or stop without start): the manager
        # may already be gone.
        if MPMapper.manager is not None:
            MPMapper.manager.shutdown()
            MPMapper.manager = None
        # Don't reset problem id; it keeps count even when mapper is restarted.
        ##MPMapper.problem_id = 0


def _TP_run_problem(problem_point_tuple):
    """Thread pool worker function with a per-thread problem copy.

    *problem_point_tuple* is ``(problem_id, point, original_problem)``. Each
    worker thread deep-copies *original_problem* the first time it sees a
    new *problem_id*, so threads never share mutable problem state.
    """
    problem_id, point, original_problem = problem_point_tuple
    # The copy is stored as attributes on the current Thread object, which
    # acts as per-thread storage.
    thread_local = threading.current_thread()
    if getattr(thread_local, "problem_id", None) != problem_id:
        thread_local.problem_id = problem_id
        thread_local.problem_copy = copy.deepcopy(original_problem)
    return thread_local.problem_copy.nllf(point)
class ThreadPoolMapper(BaseMapper):
    """
    Thread-based parallel mapper using concurrent.futures.ThreadPoolExecutor.

    Each thread maintains its own copy of the problem object for independent
    calculations of nllf.

    This mapper will only be efficient when using a free-threaded python
    interpreter (otherwise the GIL will prevent true parallelism).
    """

    pool = None
    problem_id = 0
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to do: threads copy the problem lazily."""
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """Return a ``mapper(points)`` that evaluates *problem* on the thread pool."""
        # Set up the thread pool on the first call.
        if ThreadPoolMapper.pool is None:
            ThreadPoolMapper.pool = ThreadPoolExecutor(max_workers=pool_size(cpus))
        ThreadPoolMapper.problem_id += 1
        # Bind the id for this fit. If the mapper read the class attribute at
        # call time, a stale mapper would send the NEW id with its OLD problem.
        # The worker threads would then cache the old problem under the new id
        # and reuse it for the new fit.
        problem_id = ThreadPoolMapper.problem_id

        # Create mapper function that submits tasks to thread pool
        ThreadPoolMapper.timestamps = []

        def mapper(points):
            futures = []
            try:
                tstart = time.perf_counter_ns()
                # Built incrementally so a partial list can be cancelled if
                # the interrupt arrives while submitting.
                for p in points:
                    futures.append(ThreadPoolMapper.pool.submit(_TP_run_problem, (problem_id, p, problem)))
                # Collect results in order
                result = [future.result() for future in futures]
                tstop = time.perf_counter_ns()
                ThreadPoolMapper.timestamps.append((tstart, tstop))
                return result
            except KeyboardInterrupt:
                # Drop evaluations that have not started yet, so shutdown(wait=True)
                # waits only for the ones already running.
                for future in futures:
                    future.cancel()
                ThreadPoolMapper.stop_mapper()
                raise

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report timings and shut down the thread pool (waiting for running tasks)."""
        if ThreadPoolMapper.pool is not None:
            show_performance(ThreadPoolMapper.timestamps)
            ThreadPoolMapper.pool.shutdown(wait=True)
            ThreadPoolMapper.pool = None
        # Thread-local copies will be automatically garbage collected
        # when threads are destroyed


def _MPI_set_problem(problem, comm, root=0):
    """
    Broadcast *problem* from *root* to all ranks of *comm*.

    The root cloudpickles *problem* and returns the original object. The
    other ranks return the unpickled copy (None when root sent None).
    """
    pickled_problem = dumps(problem) if comm.rank == root else None
    pickled_problem = comm.bcast(pickled_problem, root=root)
    return problem if comm.rank == root else loads(pickled_problem)


def _MPI_map(problem, points, comm, root=0):
    """
    Evaluate ``problem.nllf`` over *points*, split across all ranks of *comm*.

    On root, *points* is the (npoints, nvars) batch; other ranks pass None.
    Returns None on every rank when the batch is empty (the "new problem"
    signal). Otherwise root returns the float64 array of npoints results
    and the other ranks return True.
    """
    import numpy as np
    from mpi4py import MPI

    # print(f"{comm.rank}: mapping points")
    if comm.rank == root:
        # Scatterv below describes the buffer as raw MPI.DOUBLE, so the root
        # must hand it a C-contiguous float64 array (no copy if it already is).
        points = np.ascontiguousarray(points, dtype="d")

    # Send number of points and number of variables per point.
    npoints, nvars = comm.bcast(points.shape if comm.rank == root else None, root=root)
    if npoints == 0:
        return None

    # Divvy points equally across all processes
    whole = points if comm.rank == root else None
    idx = np.arange(comm.size)
    size = np.ones(comm.size, idx.dtype) * (npoints // comm.size) + (idx < npoints % comm.size)
    offset = np.cumsum(np.hstack((0, size[:-1])))
    part = np.empty((size[comm.rank], nvars), dtype="d")
    comm.Scatterv((whole, (size * nvars, offset * nvars), MPI.DOUBLE), (part, MPI.DOUBLE), root=root)

    # Evaluate models assigned to each processor
    partial_result = np.array([problem.nllf(pk) for pk in part], dtype="d")

    # Collect results
    result = np.empty(npoints, dtype="d") if comm.rank == root else True
    comm.Barrier()
    comm.Gatherv((partial_result, MPI.DOUBLE), (result, (size, offset), MPI.DOUBLE), root=root)
    comm.Barrier()
    return result
def using_mpi():
    """
    Return True when launcher environment variables report more than one MPI rank.

    PMI_SIZE is consulted first. OMPI_COMM_WORLD_SIZE is consulted only when
    PMI_SIZE is not set.
    """
    # Environment variables defined by common mpirun implementations:
    #   mpich: PMI_SIZE, PMI_*, MPI_*
    #   openmpi: OMPI_COMM_WORLD_SIZE, OMPI_*, PMIX_*
    #   impi_rt (intel): PMI_SIZE, I_MPI_*, HYDRA_*
    #   msmpi (microsoft): PMI_SIZE, PMI_*, MSMPI_*
    for variable in ("PMI_SIZE", "OMPI_COMM_WORLD_SIZE"):
        value = os.environ.get(variable, None)
        if value is not None:
            return int(value) > 1
    return False
class MPIMapper(BaseMapper):
    """
    MPI mapper that spreads each batch of points across all ranks of MPI.COMM_WORLD.

    Worker ranks never return from start_worker(). Only the root rank runs
    the fit and calls start_mapper()/stop_mapper().
    """

    # For MPIMapper only the worker is initialized with the fit problem.
    # On root, start_worker() sets this to whether a problem was given.
    # start_mapper() clears it, so later fits broadcast their problem.
    has_problem = True
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """
        Start the worker process.

        For the root process this only records whether *problem* was
        provided and returns immediately.

        The worker processes never return. Each worker sits in a loop waiting
        for the next batch of points for the problem, or for the next problem.
        If the problem is set to None, the worker finalizes MPI and exits the
        process.
        """
        from mpi4py import MPI

        comm, root = MPI.COMM_WORLD, 0
        MPIMapper.rank = comm.rank
        rank = comm.rank
        # print(f"MPI {rank} of {comm.size} initializing")

        # If worker, sit in a loop waiting for the next point.
        # If the point is empty, then wait for a new problem.
        # If the problem is None then we are done, otherwise wait for next point.
        if rank != root:
            # print(f"{rank}: looping")
            while True:
                result = _MPI_map(problem, None, comm, root)
                if result is None:
                    problem = _MPI_set_problem(None, comm, root)
                    if problem is None:
                        break
                    # print(f"{rank}: changing problem")

            # print(f"{rank}: finalizing")
            MPI.Finalize()

            # Exit the program after the worker is done. Don't return
            # to the caller since that is continuing on with the main
            # thread, and in particular, attempting to rerun the fit on
            # each worker.
            # print(f"{rank}: Worker exiting")
            sys.exit(0)
            # print(f"{rank}: Worker exited")
        else:
            # Root initialization:
            MPIMapper.has_problem = problem is not None
            # print("mapper has problem", MPIMapper.has_problem)

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Return a ``mapper(points)`` for *problem*, sending it to the workers if needed.

        Passing ``problem=None`` tells the workers to exit and finalizes MPI
        on the root.
        """
        # Only root can get here---worker is stuck in start_worker
        from mpi4py import MPI
        import numpy as np

        comm, root = MPI.COMM_WORLD, 0

        # Signal new problem then send it, but not on the first fit. We do this
        # so that we can still run MPI fits even if the problem itself cannot
        # be pickled, but only the first one. (You can still fit a series even
        # if the problem can't be pickled, but you will need to restart the
        # MPI job separately for each fit.)
        # Note: setting problem to None stops the program, so call finalize().

        MPIMapper.timestamps = []

        def mapper(points):
            tstart = time.perf_counter_ns()
            result = _MPI_map(problem, points, comm, root)
            tstop = time.perf_counter_ns()
            MPIMapper.timestamps.append((tstart, tstop))
            return result

        # has_problem is True only for the first fit after start_worker(problem).
        # The shutdown request (problem=None) is always sent. Otherwise,
        # stopping before any fit would leave the workers blocked in their
        # loop while the root finalizes.
        if problem is None or not MPIMapper.has_problem:
            # print(f"*** {comm.rank}: replacing problem")
            # Send an empty set of points to signal a new problem is coming.
            # Call _MPI_map directly so the signal isn't recorded in timestamps.
            _MPI_map(problem, np.empty((0, 0), "d"), comm, root)
            _MPI_set_problem(problem, comm, root)
        if problem is None:
            # print(f"{comm.rank}: finalizing root")
            MPI.Finalize()
        MPIMapper.has_problem = False
        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report timings, then send problem=None so the workers exit and MPI is finalized."""
        # print("stopping mapper")
        # Set problem=None to stop the program.
        show_performance(MPIMapper.timestamps)
        MPIMapper.start_mapper(None, None)