source: codes/icosagcm/devel/Python/dynamico/parallel.py @ 615

Last change on this file since 615 was 615, checked in by dubos, 6 years ago

devel/unstructured : import Python layer from HEAT

File size: 8.4 KB
from itertools import groupby
import numpy as np
from mpi4py import MPI # needed for MPI.MAX in PArrayND.max

def inverse_list(lst): return {j:i for i,j in enumerate(lst)}
def ordered_list(lst):
    lst = sorted(zip(lst, range(len(lst))))
    lst, order = zip(*lst)
    order = inverse_list(order)
    order = [order[i] for i in range(len(lst))] # dict -> list
    return lst, order
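# Example added for illustration (not in the original file): ordered_list returns the sorted
# values together with a permutation mapping each original position to the rank of its value
# in the sorted sequence, e.g.
#   lst, order = ordered_list([30, 10, 20])    # lst == (10, 20, 30), order == [2, 0, 1]
#   [lst[order[i]] for i in range(3)]          # -> [30, 10, 20], the original ordering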

#----------------------- Parallel access to distributed array ------------------------------#

# The classes PDim, Get_Indices and PArrayX are used to read NetCDF arrays in parallel.
# Each MPI process reads a contiguous chunk of the array exactly once.
# The classes CstPArrayX emulate constant arrays that are not stored in a NetCDF file.
# With the class LocPArray1D, local data is copied from a user-provided array rather than from a NetCDF file.
# Using the class Get_Indices, each MPI process can then obtain data for an arbitrary list of indices:
# the data is not re-read from the file but scattered via MPI alltoall.
# An illustrative usage sketch is given at the end of this section.

class PDim:
    def __init__(self, ncdim, comm):
        mpi_rank, mpi_size = comm.Get_rank(), comm.Get_size()
        self.ncdim, n = ncdim, len(ncdim)
        vtxdist = [(n*i)//mpi_size for i in range(mpi_size+1)] # chunk boundaries : process i owns [vtxdist[i], vtxdist[i+1])
        self.n, self.vtxdist = n, np.asarray(vtxdist, dtype=np.int32)
        self.comm, self.start, self.end = comm, vtxdist[mpi_rank], vtxdist[mpi_rank+1]
    def getter(self, index_list): return Get_Indices(self, index_list)
    def get(self, index_list, data):
        getter = self.getter(index_list)
        return getter(data)
class Get_Indices:
    def __init__(self, dim, index_list): # index_list MUST be a list of unique indices
        index_list, self.order = ordered_list(index_list)
        comm, vtxdist = dim.comm, dim.vtxdist
        # for each MPI rank, local indices of the requested entries owned by that rank
        list_local = [ [ j-vtxdist[i] for j in index_list if j>=vtxdist[i] and j<vtxdist[i+1] ]
                       for i in range(len(vtxdist)-1) ]
        list_send = comm.alltoall(list_local) # indices that other ranks request from us
        self.comm, self.n, self.list_n = comm, len(index_list), [len(i) for i in list_local]
        self.list_local, self.list_send, self.dict = list_local, list_send, inverse_list(index_list)
        self.vtxdist = vtxdist
    def get(self, get_data, put_data):
        data_send = [get_data(indices) for indices in self.list_send]
        data_recv = self.comm.alltoall(data_send)
        start = 0
        for data, n in zip(data_recv, self.list_n):
            if n>0:
                end = start + n
                put_data(start, end, data)
                start = end
    def __call__(indices, self): # NB : 'indices' is the Get_Indices instance, 'self' is the PArrayND being read
        shape = list(self.data.shape)
        shape[0] = indices.n
        self.data_out = np.zeros(shape, dtype=self.data.dtype)
        indices.get(self.get_data, self.put_data)
        return self.reorder(indices.order)

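# Note added for clarity (not in the original file): in Get_Indices.__call__ the argument names
# are deliberately swapped, so that the method body reads as if it were a PArrayND method. Typical use:
#   getter = dim.getter([4, 0, 7])   # a Get_Indices instance, built collectively on all ranks
#   rows   = getter(parray)          # rows 4, 0, 7 of the distributed array, in that order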
class PArrayND:
    def init_data(self, dim, data):
        self.dim, self.data = dim, np.asarray(data) # local chunk of data
    def max(self): # global maximum over all MPI processes
        max_loc = self.data.max()
        return self.dim.comm.allreduce(max_loc, op=MPI.MAX)

class PArray1D(PArrayND):
    def __init__(self, dim, data): self.init_data(dim, data[dim.start:dim.end])
    def get_data(self, indices): return np.array(self.data[indices])
    def put_data(self, start, end, data): self.data_out[start:end] = data
    def reorder(self, order): return self.data_out[order]
class LocPArray1D(PArray1D):
    def __init__(self, dim, data): self.init_data(dim, data)
class CstPArray1D(PArray1D):
    def __init__(self, dim, dtype, val):
        self.init_data(dim, np.full((dim.end-dim.start,), val, dtype=dtype))

class PArray2D(PArrayND):
    def __init__(self, dim, data): self.init_data(dim, data[dim.start:dim.end,:])
    def get_data(self, indices): return self.data[indices,:]
    def put_data(self, start, end, data): self.data_out[start:end] = data
    def reorder(self, order): return self.data_out[order,:]

def PArray(dim, data): return {1:PArray1D, 2:PArray2D}[len(data.shape)](dim, data)

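# Illustrative usage sketch (not part of the original file): read a NetCDF variable in parallel
# and gather a few rows on every process. The file, dimension and variable names ('grid.nc',
# 'cell', 'Ai') are hypothetical placeholders; requires mpi4py and netCDF4.
def _example_parallel_read(filename='grid.nc'):
    from netCDF4 import Dataset
    nc  = Dataset(filename, 'r')
    dim = PDim(nc.dimensions['cell'], MPI.COMM_WORLD) # this process owns slice [dim.start, dim.end)
    Ai  = PArray(dim, nc.variables['Ai'])             # each process reads only its own chunk
    # collectively gather rows 0, 5 and 3 of the global array, returned in that order
    return dim.get([0, 5, 3], Ai)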

#-------------------------------------- Halo management -----------------------------------#

# The classes LocalDim, LocalArrayX and Halo_Xchange are used to store local arrays with halos
# and to update those halos.
# A LocalDim instance is created from a list of (global indices of) cells. The instance contains
# a lookup table associating a local index to each global index.
# A Halo_Xchange instance is then created from a PDim, the list of cells and a partitioning table
# giving the MPI process that owns each cell. This instance can then be used to create LocalArrayX instances.
# LocalArray data can be initialized by reading from a PArray or from a NumPy array containing local values.
# The update() method updates the halo using point-to-point MPI communications (send/recv).
# An illustrative usage sketch is given at the end of this file.

class LocalDim:
    def __init__(self, dim, cells):
        # dim : a PDim instance ; cells : a list of (global indices of) cells, in any order
        self.dim, self.loc2glob, self.glob2loc = dim, cells, inverse_list(cells)

def dict2list(size, thedict, default):
    lst = [default]*size
    for rank, val in thedict.items(): lst[rank] = val
    return lst
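# Example added for illustration (not in the original file): turn a per-rank dictionary into a
# dense list indexed by rank, e.g. dict2list(4, {1:'a', 3:'b'}, []) -> [[], 'a', [], 'b']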

class Halo_Xchange:
    def __init__(self, tag, dim, cells, part, reorder_cells=False):
        # cells = global indices, part = MPI rank owning each cell
        # if reorder_cells is True, cells will be reordered so that each halo is contiguous
        # reordering is not recommended if cells is a telescopic sum of increasing sets C0<C1<C2 ...
        comm = dim.comm
        mpi_size, mpi_rank = comm.Get_size(), comm.Get_rank()
        self.comm, self.tag, self.reorder_cell = comm, tag, reorder_cells
        # sort cells by owning rank, keeping track of their global and local indices
        recv = sorted(zip(part, cells, range(len(cells))))
        # group by MPI rank ; lst is a list of (rank,global,local) tuples so zip(*lst) is a tuple of lists (rank,global,local)
        recv = { rank : zip(*lst) for rank, lst in groupby(recv, lambda x:x[0]) }
        # remove mpi_rank from the dict and get the list of own cells
        junk, own_global, own_local = recv.pop(mpi_rank)
        own_len = len(own_local)
        if reorder_cells: own_local = range(own_len)
        self.own_len, self.own_local, self.get_own = own_len, list(own_local), Get_Indices(dim, own_global)
        # figure out which data we want to receive from other CPUs
        recv_rank = sorted(recv.keys())
        recv_glob = { rank : recv[rank][1] for rank in recv_rank } # global indices of cells to receive
        recv_len = { rank : len(recv_glob[rank]) for rank in recv_rank }
        if reorder_cells:
            recv_loc, start = {}, own_len
            for rank in recv_rank:
                end = start + recv_len[rank]
                recv_loc[rank] = range(start, end)
                start = end
            cells = sum([recv_glob[rank] for rank in recv_rank], own_global)
        else:
            recv_loc = { rank : recv[rank][2] for rank in recv_rank }
        self.recv_list = [(rank, list(recv_loc[rank])) for rank in recv_rank]
        self.cells, self.get_all = cells, Get_Indices(dim, cells)
        # now figure out the data we must send
        send = comm.alltoall(dict2list(mpi_size, recv_glob, []))
        send_len = [len(lst) for lst in send]
        send_rank = [rank for rank in range(mpi_size) if send_len[rank]>0] # CPUs asking for data from us
        print 'send_rank %d'%mpi_rank, send_rank # debug output
        # associate a local index to each global index of an owned cell
        own_dict = { glob:loc for glob, loc in zip(own_global, own_local) }
        self.send_list = [(rank, [own_dict[i] for i in send[rank]]) for rank in send_rank]

class LocalArray: # a base class for arrays with halos
    def read_own(self, parray): self.put(self.halo.own_local, self.halo.get_own(parray))
    def read_all(self, parray): self.data = self.halo.get_all(parray)
    def update(self): # fill halo cells with values received from their owning process
        halo = self.halo
        comm, tag = halo.comm, halo.tag
        for rank, loc in halo.send_list:
            comm.send(self.get(loc), dest=rank, tag=tag)
        for rank, loc in halo.recv_list:
            self.put(loc, comm.recv(source=rank, tag=tag))

class LocalArray1(LocalArray): # a 1D array with halo
    def __init__(self, halo):
        self.halo, self.data = halo, np.zeros((len(halo.cells),))
    def get(self, cells): return np.asarray([self.data[i] for i in cells])
    def put(self, cells, data): self.data[cells] = data
class LocalArray2(LocalArray): # a 2D array with halo
    def __init__(self, halo, llm):
        self.halo, self.llm, self.data = halo, llm, np.zeros((len(halo.cells), llm))
    def get(self, cells): return self.data[cells,:]
    def put(self, cells, data): self.data[cells,:] = data
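
# Illustrative usage sketch (not part of the original file): build a halo exchange and update a
# 1D field. Here 'dim' is a PDim, 'cells' lists the global indices needed locally (own cells and
# halo) and 'part' gives the MPI rank owning each of them; in practice these would come from the
# mesh partitioning, so they are left as arguments of this hypothetical helper.
def _example_halo_update(dim, cells, part, tag=42):
    halo = Halo_Xchange(tag, dim, cells, part, reorder_cells=False) # collective call on all ranks
    field = LocalArray1(halo)
    # fill the cells owned by this process with its MPI rank, as dummy data
    field.put(halo.own_local, np.full((halo.own_len,), dim.comm.Get_rank(), dtype=field.data.dtype))
    field.update() # after the exchange, halo cells hold the rank of their owning process
    return field.data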