source: XIOS/dev/branch_openmp/src/registry.cpp @ 1482

Last change on this file since 1482 was 1464, checked in by yushan, 6 years ago

bug fix for server to correctly finalize. LMDZ OK.

  • Property svn:eol-style set to native
File size: 7.9 KB
Line 
1#include "registry.hpp"
2#include "type.hpp"
3#include <mpi.hpp>
4#include <fstream>
5#include <sstream>
6using namespace ep_lib;
7
8namespace xios
9{
10  using namespace std;
11
12  CRegistry::CRegistry(const CRegistry& reg) : communicator(reg.communicator)
13  {
14    for(map<string,pair<size_t,char*> >::const_iterator it=reg.registry.begin() ; it!=reg.registry.end() ; ++it)
15    {
16      char* buffer=new char[it->second.first] ;
17      memcpy(buffer,it->second.second,it->second.first) ;
18      registry.insert(pair<string, pair<size_t,char*> >(it->first, pair<size_t,char*>(it->second.first,buffer))) ; 
19    }
20  }
21
22       
23  void CRegistry::reset()
24  {
25    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
26    {
27      if (it->second.first>0) delete[] it->second.second ;
28    }
29    registry.clear() ;
30  }
31
32  void CRegistry::setKey_(const string& key_, const CBaseType& value)
33  {
34    const string key=path+key_ ;
35    if (!value.isEmpty())
36    {
37      size_t size=value.size();
38     
39      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
40
41      if (it!=registry.end())
42      {
43        delete[] it->second.second ;
44        registry.erase(it) ;
45      }
46
47      char* buffer=new char[size] ;
48      CBufferOut tmpBuff(buffer,size) ;
49      value.toBuffer(tmpBuff) ;
50      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,buffer))) ;
51    }
52  }
53
54  void CRegistry::getKey_(const string& key_, CBaseType& value)
55  {
56    const string key=path+key_ ;
57    size_t size=value.size();
58     
59    map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
60
61    if (it!=registry.end())
62    {
63      CBufferIn tmpBuff(it->second.second,it->second.first) ;
64      value.fromBuffer(tmpBuff) ;
65    }
66    else value.reset() ;
67  }
68
69  bool CRegistry::foundKey(const string& key_) const
70  {
71    const string key=path+key_ ;
72    map<string,pair<size_t,char*> >::const_iterator it=registry.find(key) ;
73    if (it!=registry.end()) return true ;
74    else return false ;
75  }
76 
77  bool CRegistry::toBuffer(CBufferOut& buffer) const
78  {
79    buffer<<registry.size() ;
80    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
81    {
82      buffer<<it->first<<it->second.first ;
83      if (!buffer.put(it->second.second,it->second.first)) ERROR("bool CRegistry::toBuffer(CBufferOut& buffer) const)",
84                                                           << "Not enough free space in buffer to queue the data."); 
85    }
86    return true ;
87  }
88
89  size_t CRegistry::size() const
90  {
91    size_t s=0;
92    size_t size_t_size=CType<size_t>(0).size() ;
93    s+=size_t_size ;
94    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
95      s+=(CType<string>(it->first)).size() + size_t_size + it->second.first ;
96    return s ;
97  }
98
99  void CRegistry::fromString(const string& str)
100  {
101     ERROR("void CRegistry::fromString(const string& str)",<< "This method has not been implemented"); 
102  }
103
104  string CRegistry::toString() const
105  {
106    ostringstream oss;
107   
108    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
109    {
110      oss<<"Key = "<< it->first  <<" , size : "<<it->second.first<<"  ASCII value : "<<string((char*) it->second.second,it->second.first)<<endl ;
111    }
112    return oss.str() ;
113  }
114
115 
116
117  bool CRegistry::fromBuffer(CBufferIn& buffer)
118  {
119    string key ;
120    size_t size ;
121    char* value ;
122    size_t nKeys ;
123    buffer >> nKeys ;
124    for(size_t i=0;i<nKeys;++i)
125    {
126      buffer>>key>>size ;
127      if (size > 0)
128      {
129        value = new char[size] ;
130        buffer.get(value,size) ;
131      }
132     
133      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
134      if (it!=registry.end())
135      {
136        delete[] it->second.second ;
137        registry.erase(it) ;
138      }
139      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;       
140    }
141    return true ;
142  } 
143
144  void CRegistry::toFile(const string& filename)
145  {
146    if (registry.size()==0) return ;
147   
148    CBufferOut buffer(this->size()) ;
149    this->toBuffer(buffer) ;
150    ofstream file(filename.c_str(), ofstream::out ) ;
151    size_t size=buffer.count() ;
152    file.write((const char*) &size,sizeof(size)) ;
153    file.write((const char*) buffer.start(),size) ;
154    file.close() ;
155  }
156
157  void CRegistry::fromFile(const string& filename)
158  {
159    ifstream file(filename.c_str(), ifstream::in | ifstream::binary) ;
160    if (!file) return ;
161    size_t size;
162    file.read((char*) &size,sizeof(size)) ;
163   
164    CBufferIn buffer(size) ;
165    file.read((char*) buffer.ptr(),size) ;
166    this->fromBuffer(buffer) ;
167
168    file.close() ;
169  }
170   
171  void CRegistry::mergeRegistry(const CRegistry& inRegistry)
172  {
173    size_t size ;
174    char* value;
175   
176    for(map<string,pair<size_t,char*> >::const_iterator it=inRegistry.registry.begin() ; it!=inRegistry.registry.end() ; ++it)
177    {
178      const string& key=it->first ;
179      map<string,pair<size_t,char*> >::iterator it2=registry.find(key) ;
180      if (it2==registry.end())
181      {
182        size=it->second.first ;
183        value=new char[size] ;
184        memcpy(value,it->second.second,size) ;
185        registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;     
186      }
187    }
188  }
189
190 
191  void CRegistry::bcastRegistry(void)
192  {
193    int rank ;
194    MPI_Comm_rank(communicator,&rank);
195    if (rank==0)
196    {
197      CBufferOut buffer(this->size()) ;
198      this->toBuffer(buffer) ;
199      int size=buffer.count() ;
200      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
201      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
202    }
203    else
204    {
205      int size ;
206      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
207      CBufferIn buffer(size) ;
208      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
209      this->fromBuffer(buffer) ;
210    }
211  }
212  void CRegistry::gatherRegistry(void)
213  {
214    gatherRegistry(communicator) ;
215  }
216
217  void CRegistry::gatherRegistry(const MPI_Comm& comm)
218  {
219    int rank,mpiSize ;
220    MPI_Comm_rank(comm,&rank);
221    MPI_Comm_size(comm,&mpiSize);
222
223    int* sizes=new int[mpiSize] ;
224    CBufferOut localBuffer(this->size()) ;
225    this->toBuffer(localBuffer) ;
226    int localSize=localBuffer.count() ;
227    MPI_Gather(&localSize,1,MPI_INT,sizes,1,MPI_INT,0,comm) ;
228
229    char* globalBuffer ;
230    int*   displs ;
231   
232    if (rank==0)
233    {
234      size_t globalBufferSize=0 ;
235      displs=new int[mpiSize] ;
236      for (int i=0;i<mpiSize;++i)
237      {
238        displs[i]=globalBufferSize ;
239        globalBufferSize+=sizes[i] ;
240      }
241
242      globalBuffer=new char[globalBufferSize] ;
243      MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;
244      for(int i=1;i<mpiSize;++i)
245      {
246        CBufferIn buffer(globalBuffer+displs[i],sizes[i]) ;
247        CRegistry reg ;
248        reg.fromBuffer(buffer) ;
249        mergeRegistry(reg) ;
250      }
251      delete[] displs ;
252      delete[] globalBuffer ;
253    }
254    else  MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;   
255    delete[] sizes ;
256   
257  }
258
259  void CRegistry::hierarchicalGatherRegistry(void)
260  {
261    //hierarchicalGatherRegistry(communicator) ;
262    gatherRegistry(communicator) ;
263  }
264
265  void CRegistry::hierarchicalGatherRegistry(const MPI_Comm& comm)
266  {
267    int mpiRank,mpiSize ;
268    MPI_Comm_rank(comm,&mpiRank);
269    MPI_Comm_size(comm,&mpiSize);   
270
271    if (mpiSize>2)
272    {
273      int color ;
274      if (mpiRank<mpiSize/2+mpiSize%2) color=0 ;
275      else color=1 ;
276      MPI_Comm commUp ;
277      MPI_Comm_split(comm,color,mpiRank,&commUp) ,
278      hierarchicalGatherRegistry(commUp) ;
279      MPI_Comm_free(&commUp) ;
280    }
281
282    if (mpiSize>1)
283    {
284      MPI_Comm commDown ;
285      int color ;
286     
287      if (mpiRank==0 || mpiRank==mpiSize/2+mpiSize%2) color=0 ;
288      else color=1 ;
289      MPI_Comm_split(comm,color,mpiRank,&commDown) ;
290      if (color==0) gatherRegistry(commDown) ;
291      MPI_Comm_free(&commDown) ;   
292    }
293  }
294
295}
Note: See TracBrowser for help on using the repository browser.