source: codes/icosagcm/devel/Python/dynamico/parallel.py

Last change on this file was 825, checked in by dubos, 5 years ago

devel/Python : moved Fortran bindings and *.pyx to dynamico/dev module + necessary changes to test/py/*.py

File size: 9.7 KB
Line 
1from itertools import groupby
2import numpy as np
3
def inverse_list(lst):
    """Return a dict mapping each item of lst to its position (later duplicates win)."""
    return dict((item, position) for position, item in enumerate(lst))
def ordered_list(lst):
    """Sort lst and return the permutation that restores the original order.

    Returns (sorted_values, order), where sorted_values is a tuple and order is a
    list such that sorted_values[order[i]] == lst[i] for every i. Equal values
    keep their original relative order. An empty input returns ((), []) instead
    of raising.
    """
    values = list(lst)
    # Input positions listed in increasing order of value. sorted() is stable,
    # so ties stay ordered by position.
    perm = sorted(range(len(values)), key=values.__getitem__)
    order = [0] * len(values)
    for rank, position in enumerate(perm):
        order[position] = rank  # lst[position] lands at index `rank` once sorted
    return tuple(values[position] for position in perm), order
11
12from dynamico import getargs
log_master, log_world = getargs.getLogger(__name__)
# Shorthands: INFO/DEBUG/ERROR as before (master logger for INFO/DEBUG, world
# logger for ERROR); the *_ALL variants go through the world logger.
INFO = log_master.info
DEBUG = log_master.debug
ERROR = log_world.error
INFO_ALL = log_world.info
DEBUG_ALL = log_world.debug
16
#----------------------- Parallel access to distributed array ------------------------------#

# The classes PDim, Get_Indices and PArrayX are used to read NetCDF arrays in parallel.
# Each MPI process reads a contiguous chunk of the array once.
# Classes CstPArrayX emulate constant arrays that are not stored in a NetCDF file.
# With class LocPArray1D, local data is taken from a user-provided array rather than read from a NetCDF file.
# Using the class Get_Indices, each MPI process can obtain data for an arbitrary list of indices.
# Data is not re-read from the file but scattered via MPI alltoall.
25
class PDim:
    """A dimension of length n = len(ncdim), block-distributed over the processes of comm.

    Process p owns the contiguous global index range [vtxdist[p], vtxdist[p+1]);
    this process's range is [start, end).
    """
    def __init__(self, ncdim, comm):
        mpi_rank, mpi_size = comm.Get_rank(), comm.Get_size()
        self.ncdim, n = ncdim, len(ncdim)
        # Floor division: '/' yields floats under Python 3, and start/end are
        # used as slice bounds by the PArray classes.
        vtxdist = [(n*i)//mpi_size for i in range(mpi_size+1)]
        self.n, self.vtxdist = n, np.asarray(vtxdist, dtype=np.int32)
        self.comm, self.start, self.end = comm, vtxdist[mpi_rank], vtxdist[mpi_rank+1]

    def getter(self, index_list):
        """Return a Get_Indices object that fetches the global indices in index_list."""
        return Get_Indices(self, index_list)

    def get(self, index_list, data):
        """Fetch data at the global indices index_list with a one-shot getter."""
        getter = self.getter(index_list)
        return getter(data)
class Get_Indices:
    """Collective lookup of an arbitrary list of global indices in a PDim-distributed array.

    Built once from a PDim and a list of unique global indices. Calling the instance
    with a distributed array (a PArrayND subclass) returns the requested entries in
    the order of the original index list, exchanged with MPI alltoall.
    """
    def __init__(self, dim, index_list): # list MUST be a list of unique indices
        # sort the requested indices; self.order maps original position -> sorted position
        index_list, self.order = ordered_list(index_list)
        comm, vtxdist = dim.comm, dim.vtxdist
        self.comm, self.n, self.vtxdist = comm, len(index_list), vtxdist
        # for each process i: the requested indices it owns, relative to its chunk start
        list_local = [[j - vtxdist[i] for j in index_list if vtxdist[i] <= j < vtxdist[i+1]]
                      for i in range(len(vtxdist)-1)]
        # Counts are computed eagerly, as a real list, before the alltoall below.
        # A map() would be a one-shot, lazily evaluated iterator under Python 3.
        self.list_local, self.list_n = list_local, [len(lst) for lst in list_local]
        list_send = comm.alltoall(list_local) # may modify list_local !!
        self.list_send, self.dict = list_send, inverse_list(index_list)

    def get(self, get_data, put_data):
        """Serve other processes' requests and collect the answers to ours.

        get_data(local_indices) returns the local values requested by one process.
        put_data(start, end, data) stores the block received from one process into
        rows start:end of the output, which is in sorted index order.
        """
        data_send = [get_data(indices) for indices in self.list_send]
        data_recv = self.comm.alltoall(data_send)
        start = 0
        for data, n in zip(data_recv, self.list_n):
            if n > 0:
                end = start + n
                put_data(start, end, data)
                start = end

    def __call__(self, parray): # parray is a PArrayND
        """Return parray's values at the requested indices, in the original request order.

        Side effect: (re)allocates parray.data_out, the buffer filled by parray.put_data.
        """
        shape = list(parray.data.shape)
        shape[0] = self.n
        parray.data_out = np.zeros(shape, dtype=parray.data.dtype)
        self.get(parray.get_data, parray.put_data)
        return parray.reorder(self.order)
63
class PArrayND:
    """Base class for arrays distributed along their first axis according to a PDim."""
    def init_data(self, dim, data):
        self.dim, self.data = dim, np.asarray(data) # local chunk of data

    def max(self):
        """Return the maximum over all processes of dim.comm.

        Collective: every process must call it. Processes holding an empty local
        chunk do not contribute. Raises ValueError if every chunk is empty.
        """
        local_max = self.data.max() if self.data.size > 0 else None
        maxima = [m for m in self.dim.comm.allgather(local_max) if m is not None]
        return max(maxima)
70
class PArray1D(PArrayND):
    """1D distributed array: this process keeps data[dim.start:dim.end]."""
    def __init__(self, dim, data):
        chunk = data[dim.start:dim.end]
        self.init_data(dim, chunk)

    def get_data(self, indices):
        # values requested by another process, as a fresh NumPy array
        picked = self.data[indices]
        return np.array(picked)

    def put_data(self, start, end, data):
        self.data_out[start:end] = data

    def reorder(self, order):
        return self.data_out[order]
class LocPArray1D(PArray1D):
    """1D distributed array whose local chunk is supplied directly (no slicing)."""
    def __init__(self, dim, data):
        # data is expected to already be this process's local chunk
        self.init_data(dim=dim, data=data)
class CstPArray1D(PArray1D):
    """1D distributed array whose local chunk is filled with the constant val."""
    def __init__(self, dim, dtype, val):
        local_size = dim.end - dim.start
        chunk = np.full((local_size,), val, dtype=dtype)
        self.init_data(dim, chunk)
81
class PArray2D(PArrayND):
    """2D distributed array: this process keeps rows dim.start:dim.end (all columns)."""
    def __init__(self, dim, data):
        local_rows = data[dim.start:dim.end, :]
        self.init_data(dim, local_rows)

    def get_data(self, indices):
        rows = self.data[indices, :]
        return rows

    def put_data(self, start, end, data):
        self.data_out[start:end] = data

    def reorder(self, order):
        reordered = self.data_out[order, :]
        return reordered
87
def PArray(dim, data):
    """Wrap data in the distributed-array class matching its rank (1 or 2); KeyError otherwise."""
    classes_by_rank = {1: PArray1D, 2: PArray2D}
    cls = classes_by_rank[len(data.shape)]
    return cls(dim, data)
89
#-------------------------------------- Halo management -----------------------------------#

# Classes LocalDim, LocalArrayX and Halo_Xchange are used to
# store local arrays with halos and to update halos.
# A LocalDim instance is created using a list of (global indices of) cells. The instance contains
# a lookup table associating a local index to a global index.
# A Halo_Xchange instance is created first, from a PDim, the list of local cells and a partitioning table giving the MPI process owning each cell.
# This instance can then be used to create LocalArray instances.
# LocalArray data can be initialized by reading from a PArray or a NumPy array containing local values.
# The update() method updates the halo using point-to-point MPI communications.
100
class LocalDim:
    """Lookup tables between local and global indices for a list of cells."""
    def __init__(self, dim, cells):
        # dim : a PDim instance ; cells : a list of (global indices of) cells, in any order
        # loc2glob[i] is the global index of local cell i ; glob2loc is the inverse mapping
        self.dim, self.loc2glob, self.glob2loc = dim, cells, inverse_list(cells)
105       
def dict2list(size, thedict, default):
    """Expand a {position: value} dict into a list of length size.

    Missing positions hold default; the same default object is shared by every
    missing entry.
    """
    result = [default for _ in range(size)]
    for position in thedict:
        result[position] = thedict[position]
    return result
110
class Halo_Xchange:
    """Communication pattern used to fill the halo cells of LocalArray instances.

    Each process holds a list of cells (global indices): cells it owns plus halo
    cells owned by other processes, as given by the partition table `part`. The
    constructor works out, collectively over dim.comm, which local cells must be
    received from and sent to every other process.

    Attributes set by __init__:
      comm, tag           communicator (dim.comm) and message tag
      reorder_cell        value of the reorder_cells flag
      own_len, own_local  number and local positions of the cells owned by this process
      get_own             Get_Indices for the global indices of the owned cells
      recv_list           [(rank, local positions)] of halo cells received from each rank
      cells, get_all      global indices of all local cells (reordered if requested)
                          and the matching Get_Indices
      send_list           [(rank, local positions)] of owned cells requested by each rank
    """
    def __init__(self, tag, dim, cells, part, reorder_cells=False):
        # cells = global indices, part = rank owning each cell
        # if reorder_cells is True, cells will be reordered so that each halo is contiguous
        # reordering is not recommended if cells is a telescopic sum of increasing sets C0<C1<C2 ...
        comm = dim.comm
        mpi_size, mpi_rank = comm.Get_size(), comm.Get_rank()
        self.comm, self.tag, self.reorder_cell = comm, tag, reorder_cells
        # sort cells by rank, keep track of their global and local index
        recv = sorted(zip(part, cells, range(len(cells))))
        # Group by MPI rank. Each group of (rank, global, local) tuples is transposed
        # into a list of three tuples (ranks, globals, locals). list() is needed under
        # Python 3, where zip returns an iterator that cannot be subscripted below.
        recv = {rank: list(zip(*group)) for rank, group in groupby(recv, lambda x: x[0])}
        # remove mpi_rank from dict and get list of own cells
        # (raises KeyError if this process owns none of its cells)
        junk, own_global, own_local = recv.pop(mpi_rank)
        own_len = len(own_local)
        if reorder_cells:
            own_local = range(own_len)
        self.own_len, self.own_local, self.get_own = own_len, list(own_local), Get_Indices(dim, own_global)
        # figure out data we want to receive from other CPUs
        recv_rank = sorted(recv.keys())
        recv_glob = {rank: recv[rank][1] for rank in recv_rank} # global indices of cells to receive
        recv_len = {rank: len(recv_glob[rank]) for rank in recv_rank}
        if reorder_cells:
            # halo cells from each rank get a contiguous range placed after the own cells
            recv_loc, start = {}, own_len
            for rank in recv_rank:
                end = start + recv_len[rank]
                recv_loc[rank] = range(start, end)
                start = end
            cells = sum([recv_glob[rank] for rank in recv_rank], own_global)
        else:
            recv_loc = {rank: recv[rank][2] for rank in recv_rank}
        self.recv_list = [(rank, list(recv_loc[rank])) for rank in recv_rank]
        self.cells, self.get_all = cells, Get_Indices(dim, cells)
        # now figure out the data we must send: each process tells every other one
        # which of its cells (global indices) it needs
        send = comm.alltoall(dict2list(mpi_size, recv_glob, []))
        send_len = [len(lst) for lst in send]
        send_rank = [rank for rank in range(mpi_size) if send_len[rank] > 0] # CPUs asking for data
        DEBUG('send_rank %d %s'%(mpi_rank, send_rank))
        # associate local index to global index
        own_dict = {glob: loc for glob, loc in zip(own_global, own_local)}
        self.send_list = [(rank, [own_dict[i] for i in send[rank]]) for rank in send_rank]

    def set_dynamico_transfer(self, name):
        """Hand the send/recv lists to dynamico_init_transfer (overridden in dev.parallel).

        name selects the transfer index: 'primal' -> 1, 'edge' -> 2, 'dual' -> 3.
        The lists are flattened into int32 arrays, and local indices are shifted to
        the 1-based numbering that Fortran expects.
        """
        def list_to_args(lst):
            # [(rank, indices), ...] => (count, ranks, lengths, total length, concatenated indices)
            # Plain lists are used here: under Python 3, a map() of lengths would be
            # consumed by sum() and returned empty.
            ranks = [rank for rank, _ in lst]
            lens = [len(indices) for _, indices in lst]
            flat = [i for _, indices in lst for i in indices]
            return len(ranks), ranks, lens, sum(lens), flat
        index = {'primal': 1, 'edge': 2, 'dual': 3}[name]
        self.index = index
        send_num, send_rank, send_len, send_size, send_list = list_to_args(self.send_list)
        recv_num, recv_rank, recv_len, recv_size, recv_list = list_to_args(self.recv_list)
        # parenthesized tuple: the bare 'for x in a, b, ...' form is a SyntaxError in Python 3
        send_rank, send_len, send_list, recv_rank, recv_len, recv_list = [
            np.asarray(x, dtype=np.int32)
            for x in (send_rank, send_len, send_list, recv_rank, recv_len, recv_list)]
        send_list, recv_list = send_list+1, recv_list+1 # Fortran expects that indices start at 1
        self.dynamico_init_transfer(index, send_num, send_size, send_rank, send_len, send_list,
                                    recv_num, recv_size, recv_rank, recv_len, recv_list)

    def dynamico_init_transfer(*args): pass # overridden in dev.parallel
166
class LocalArray: # a base class for arrays with halos
    """Base class for local arrays with halos.

    Subclasses provide self.halo (a Halo_Xchange), self.data, get(cells) and
    put(cells, data).
    """
    def read_own(self, parray):
        """Fill the owned cells with values fetched from the distributed array parray."""
        self.put(self.halo.own_local, self.halo.get_own(parray))

    def read_all(self, parray):
        """Replace data by the values of all local cells (owned and halo) fetched from parray."""
        self.data = self.halo.get_all(parray)

    def update(self):
        """Refresh the halo cells with the values held by their owning processes."""
        halo = self.halo
        comm, tag = halo.comm, halo.tag
        # Post every send without blocking before receiving. With blocking sends,
        # all processes would send first, and a standard-mode send of a large
        # message may wait for a matching receive that is never posted (deadlock).
        requests = [comm.isend(self.get(loc), dest=rank, tag=tag)
                    for rank, loc in halo.send_list]
        for rank, loc in halo.recv_list:
            self.put(loc, comm.recv(source=rank, tag=tag))
        for request in requests:
            request.wait()
177       
class LocalArray1(LocalArray): # a 1D array with halo
    """1D array with one float entry per local cell (owned + halo) of halo.cells."""
    def __init__(self, halo):
        self.halo, self.data = halo, np.zeros((len(halo.cells),))

    def get(self, cells):
        # Vectorised gather (returns a copy), consistent with put().
        # dtype=intp keeps an empty selection a valid integer index.
        return self.data[np.asarray(cells, dtype=np.intp)]

    def put(self, cells, data):
        self.data[cells] = data
class LocalArray2(LocalArray): # a 2D array with halo
    """2D float array of shape (number of local cells, llm).

    The first axis is indexed like halo.cells. llm is the size of the second axis
    (presumably the number of vertical levels; confirm with callers).
    """
    def __init__(self, halo, llm):
        self.halo, self.llm = halo, llm
        # zero-initialised, one row per local cell, like LocalArray1
        self.data = np.zeros((len(halo.cells), llm))

    def get(self, cells):
        return self.data[cells, :]

    def put(self, cells, data):
        self.data[cells, :] = data
Note: See TracBrowser for help on using the repository browser.