source: XIOS/dev/XIOS_DEV_CMIP6/src/registry.cpp @ 1260

Last change on this file since 1260 was 696, checked in by ymipsl, 9 years ago

Implement CRegistry class to manage restart parameters
YM

  • Property svn:eol-style set to native
File size: 7.8 KB
Line 
1#include "registry.hpp"
2#include "type.hpp"
3#include <mpi.hpp>
4#include <fstream>
5#include <sstream>
6
7namespace xios
8{
9  using namespace std;
10
11  CRegistry::CRegistry(const CRegistry& reg) : communicator(reg.communicator)
12  {
13    for(map<string,pair<size_t,char*> >::const_iterator it=reg.registry.begin() ; it!=reg.registry.end() ; ++it)
14    {
15      char* buffer=new char[it->second.first] ;
16      memcpy(buffer,it->second.second,it->second.first) ;
17      registry.insert(pair<string, pair<size_t,char*> >(it->first, pair<size_t,char*>(it->second.first,buffer))) ; 
18    }
19  }
20
21       
22  void CRegistry::reset()
23  {
24    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
25    {
26      if (it->second.first>0) delete[] it->second.second ;
27    }
28    registry.clear() ;
29  }
30
31  void CRegistry::setKey_(const string& key_, const CBaseType& value)
32  {
33    const string key=path+key_ ;
34    if (!value.isEmpty())
35    {
36      size_t size=value.size();
37     
38      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
39
40      if (it!=registry.end())
41      {
42        delete[] it->second.second ;
43        registry.erase(it) ;
44      }
45
46      char* buffer=new char[size] ;
47      CBufferOut tmpBuff(buffer,size) ;
48      value.toBuffer(tmpBuff) ;
49      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,buffer))) ;
50    }
51  }
52
53  void CRegistry::getKey_(const string& key_, CBaseType& value)
54  {
55    const string key=path+key_ ;
56    size_t size=value.size();
57     
58    map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
59
60    if (it!=registry.end())
61    {
62      CBufferIn tmpBuff(it->second.second,it->second.first) ;
63      value.fromBuffer(tmpBuff) ;
64    }
65    else value.reset() ;
66  }
67
68  bool CRegistry::foundKey(const string& key_) const
69  {
70    const string key=path+key_ ;
71    map<string,pair<size_t,char*> >::const_iterator it=registry.find(key) ;
72    if (it!=registry.end()) return true ;
73    else return false ;
74  }
75 
76  bool CRegistry::toBuffer(CBufferOut& buffer) const
77  {
78    buffer<<registry.size() ;
79    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
80    {
81      buffer<<it->first<<it->second.first ;
82      if (!buffer.put(it->second.second,it->second.first)) ERROR("bool CRegistry::toBuffer(CBufferOut& buffer) const)",
83                                                           << "Not enough free space in buffer to queue the data."); 
84    }
85    return true ;
86  }
87
88  size_t CRegistry::size() const
89  {
90    size_t s=0;
91    size_t size_t_size=CType<size_t>(0).size() ;
92    s+=size_t_size ;
93    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
94      s+=(CType<string>(it->first)).size() + size_t_size + it->second.first ;
95    return s ;
96  }
97
98  void CRegistry::fromString(const string& str)
99  {
100     ERROR("void CRegistry::fromString(const string& str)",<< "This method has not been implemented"); 
101  }
102
103  string CRegistry::toString() const
104  {
105    ostringstream oss;
106   
107    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
108    {
109      oss<<"Key = "<< it->first  <<" , size : "<<it->second.first<<"  ASCII value : "<<string((char*) it->second.second,it->second.first)<<endl ;
110    }
111    return oss.str() ;
112  }
113
114 
115
116  bool CRegistry::fromBuffer(CBufferIn& buffer)
117  {
118    string key ;
119    size_t size ;
120    char* value ;
121    size_t nKeys ;
122    buffer >> nKeys ;
123    for(size_t i=0;i<nKeys;++i)
124    {
125      buffer>>key>>size ;
126      if (size > 0)
127      {
128        value = new char[size] ;
129        buffer.get(value,size) ;
130      }
131     
132      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
133      if (it!=registry.end())
134      {
135        delete[] it->second.second ;
136        registry.erase(it) ;
137      }
138      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;       
139    }
140    return true ;
141  } 
142
143  void CRegistry::toFile(const string& filename)
144  {
145    if (registry.size()==0) return ;
146   
147    CBufferOut buffer(this->size()) ;
148    this->toBuffer(buffer) ;
149    ofstream file(filename.c_str(), ofstream::out ) ;
150    size_t size=buffer.count() ;
151    file.write((const char*) &size,sizeof(size)) ;
152    file.write((const char*) buffer.start(),size) ;
153    file.close() ;
154  }
155
156  void CRegistry::fromFile(const string& filename)
157  {
158    ifstream file(filename.c_str(), ifstream::in | ifstream::binary) ;
159    if (!file) return ;
160    size_t size;
161    file.read((char*) &size,sizeof(size)) ;
162   
163    CBufferIn buffer(size) ;
164    file.read((char*) buffer.ptr(),size) ;
165    this->fromBuffer(buffer) ;
166
167    file.close() ;
168  }
169   
170  void CRegistry::mergeRegistry(const CRegistry& inRegistry)
171  {
172    size_t size ;
173    char* value;
174   
175    for(map<string,pair<size_t,char*> >::const_iterator it=inRegistry.registry.begin() ; it!=inRegistry.registry.end() ; ++it)
176    {
177      const string& key=it->first ;
178      map<string,pair<size_t,char*> >::iterator it2=registry.find(key) ;
179      if (it2==registry.end())
180      {
181        size=it->second.first ;
182        value=new char[size] ;
183        memcpy(value,it->second.second,size) ;
184        registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;     
185      }
186    }
187  }
188
189 
190  void CRegistry::bcastRegistry(void)
191  {
192    int rank ;
193    MPI_Comm_rank(communicator,&rank);
194    if (rank==0)
195    {
196      CBufferOut buffer(this->size()) ;
197      this->toBuffer(buffer) ;
198      int size=buffer.count() ;
199      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
200      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
201    }
202    else
203    {
204      int size ;
205      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
206      CBufferIn buffer(size) ;
207      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
208      this->fromBuffer(buffer) ;
209    }
210  }
211  void CRegistry::gatherRegistry(void)
212  {
213    gatherRegistry(communicator) ;
214  }
215
216  void CRegistry::gatherRegistry(const MPI_Comm& comm)
217  {
218    int rank,mpiSize ;
219    MPI_Comm_rank(comm,&rank);
220    MPI_Comm_size(comm,&mpiSize);
221
222    int* sizes=new int[mpiSize] ;
223    CBufferOut localBuffer(this->size()) ;
224    this->toBuffer(localBuffer) ;
225    int localSize=localBuffer.count() ;
226    MPI_Gather(&localSize,1,MPI_INT,sizes,1,MPI_INT,0,comm) ;
227
228    char* globalBuffer ;
229    int*   displs ;
230   
231    if (rank==0)
232    {
233      size_t globalBufferSize=0 ;
234      displs=new int[mpiSize] ;
235      for (int i=0;i<mpiSize;++i)
236      {
237        displs[i]=globalBufferSize ;
238        globalBufferSize+=sizes[i] ;
239      }
240
241      globalBuffer=new char[globalBufferSize] ;
242      MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;
243      for(int i=1;i<mpiSize;++i)
244      {
245        CBufferIn buffer(globalBuffer+displs[i],sizes[i]) ;
246        CRegistry reg ;
247        reg.fromBuffer(buffer) ;
248        mergeRegistry(reg) ;
249      }
250      delete[] displs ;
251      delete[] globalBuffer ;
252    }
253    else  MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;   
254    delete[] sizes ;
255   
256  }
257
258  void CRegistry::hierarchicalGatherRegistry(void)
259  {
260    hierarchicalGatherRegistry(communicator) ;
261  }
262
263  void CRegistry::hierarchicalGatherRegistry(const MPI_Comm& comm)
264  {
265    int mpiRank,mpiSize ;
266    MPI_Comm_rank(comm,&mpiRank);
267    MPI_Comm_size(comm,&mpiSize);   
268
269    if (mpiSize>2)
270    {
271      int color ;
272      if (mpiRank<mpiSize/2+mpiSize%2) color=0 ;
273      else color=1 ;
274      MPI_Comm commUp ;
275      MPI_Comm_split(comm,color,mpiRank,&commUp) ,
276      hierarchicalGatherRegistry(commUp) ;
277      MPI_Comm_free(&commUp) ;
278    }
279
280    if (mpiSize>1)
281    {
282      MPI_Comm commDown ;
283      int color ;
284     
285      if (mpiRank==0 || mpiRank==mpiSize/2+mpiSize%2) color=0 ;
286      else color=1 ;
287      MPI_Comm_split(comm,color,mpiRank,&commDown) ;
288      if (color==0) gatherRegistry(commDown) ;
289      MPI_Comm_free(&commDown) ;   
290    }
291  }
292
293}
Note: See TracBrowser for help on using the repository browser.