source: XIOS/dev/branch_yushan_merged/src/registry.cpp @ 1134

Last change on this file since 1134 was 1134, checked in by yushan, 7 years ago

branch merged with trunk r1130

  • Property svn:eol-style set to native
File size: 7.9 KB
Line 
1#include "registry.hpp"
2#include "type.hpp"
3#include <fstream>
4#include <sstream>
5
6namespace xios
7{
8  using namespace std;
9
10  CRegistry::CRegistry(const CRegistry& reg) : communicator(reg.communicator)
11  {
12    for(map<string,pair<size_t,char*> >::const_iterator it=reg.registry.begin() ; it!=reg.registry.end() ; ++it)
13    {
14      char* buffer=new char[it->second.first] ;
15      memcpy(buffer,it->second.second,it->second.first) ;
16      registry.insert(pair<string, pair<size_t,char*> >(it->first, pair<size_t,char*>(it->second.first,buffer))) ; 
17    }
18  }
19
20       
21  void CRegistry::reset()
22  {
23    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
24    {
25      if (it->second.first>0) delete[] it->second.second ;
26    }
27    registry.clear() ;
28  }
29
30  void CRegistry::setKey_(const string& key_, const CBaseType& value)
31  {
32    const string key=path+key_ ;
33    if (!value.isEmpty())
34    {
35      size_t size=value.size();
36     
37      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
38
39      if (it!=registry.end())
40      {
41        delete[] it->second.second ;
42        registry.erase(it) ;
43      }
44
45      char* buffer=new char[size] ;
46      CBufferOut tmpBuff(buffer,size) ;
47      value.toBuffer(tmpBuff) ;
48      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,buffer))) ;
49    }
50  }
51
52  void CRegistry::getKey_(const string& key_, CBaseType& value)
53  {
54    const string key=path+key_ ;
55    size_t size=value.size();
56     
57    map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
58
59    if (it!=registry.end())
60    {
61      CBufferIn tmpBuff(it->second.second,it->second.first) ;
62      value.fromBuffer(tmpBuff) ;
63    }
64    else value.reset() ;
65  }
66
67  bool CRegistry::foundKey(const string& key_) const
68  {
69    const string key=path+key_ ;
70    map<string,pair<size_t,char*> >::const_iterator it=registry.find(key) ;
71    if (it!=registry.end()) return true ;
72    else return false ;
73  }
74 
75  bool CRegistry::toBuffer(CBufferOut& buffer) const
76  {
77    buffer<<registry.size() ;
78    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
79    {
80      buffer<<it->first<<it->second.first ;
81      if (!buffer.put(it->second.second,it->second.first)) ERROR("bool CRegistry::toBuffer(CBufferOut& buffer) const)",
82                                                           << "Not enough free space in buffer to queue the data."); 
83    }
84    return true ;
85  }
86
87  size_t CRegistry::size() const
88  {
89    size_t s=0;
90    size_t size_t_size=CType<size_t>(0).size() ;
91    s+=size_t_size ;
92    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
93      s+=(CType<string>(it->first)).size() + size_t_size + it->second.first ;
94    return s ;
95  }
96
97  void CRegistry::fromString(const string& str)
98  {
99     ERROR("void CRegistry::fromString(const string& str)",<< "This method has not been implemented"); 
100  }
101
102  string CRegistry::toString() const
103  {
104    ostringstream oss;
105   
106    for(map<string,pair<size_t,char*> >::const_iterator it=registry.begin() ; it!=registry.end() ; ++it)
107    {
108      oss<<"Key = "<< it->first  <<" , size : "<<it->second.first<<"  ASCII value : "<<string((char*) it->second.second,it->second.first)<<endl ;
109    }
110    return oss.str() ;
111  }
112
113 
114
115  bool CRegistry::fromBuffer(CBufferIn& buffer)
116  {
117    string key ;
118    size_t size ;
119    char* value ;
120    size_t nKeys ;
121    buffer >> nKeys ;
122    for(size_t i=0;i<nKeys;++i)
123    {
124      buffer>>key>>size ;
125      if (size > 0)
126      {
127        value = new char[size] ;
128        buffer.get(value,size) ;
129      }
130     
131      map<string,pair<size_t,char*> >::iterator it=registry.find(key) ;
132      if (it!=registry.end())
133      {
134        delete[] it->second.second ;
135        registry.erase(it) ;
136      }
137      registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;       
138    }
139    return true ;
140  } 
141
142  void CRegistry::toFile(const string& filename)
143  {
144    if (registry.size()==0) return ;
145   
146    CBufferOut buffer(this->size()) ;
147    this->toBuffer(buffer) ;
148    ofstream file(filename.c_str(), ofstream::out ) ;
149    size_t size=buffer.count() ;
150    file.write((const char*) &size,sizeof(size)) ;
151    file.write((const char*) buffer.start(),size) ;
152    file.close() ;
153  }
154
155  void CRegistry::fromFile(const string& filename)
156  {
157    ifstream file(filename.c_str(), ifstream::in | ifstream::binary) ;
158    if (!file) return ;
159    size_t size;
160    file.read((char*) &size,sizeof(size)) ;
161   
162    CBufferIn buffer(size) ;
163    file.read((char*) buffer.ptr(),size) ;
164    this->fromBuffer(buffer) ;
165
166    file.close() ;
167  }
168   
169  void CRegistry::mergeRegistry(const CRegistry& inRegistry)
170  {
171    size_t size ;
172    char* value;
173   
174    for(map<string,pair<size_t,char*> >::const_iterator it=inRegistry.registry.begin() ; it!=inRegistry.registry.end() ; ++it)
175    {
176      const string& key=it->first ;
177      map<string,pair<size_t,char*> >::iterator it2=registry.find(key) ;
178      if (it2==registry.end())
179      {
180        size=it->second.first ;
181        value=new char[size] ;
182        memcpy(value,it->second.second,size) ;
183        registry.insert(pair<string, pair<size_t,char*> >(key, pair<size_t,char*>(size,value))) ;     
184      }
185    }
186  }
187
188 
189  void CRegistry::bcastRegistry(void)
190  {
191    int rank ;
192    MPI_Comm_rank(communicator,&rank);
193    if (rank==0)
194    {
195      CBufferOut buffer(this->size()) ;
196      this->toBuffer(buffer) ;
197      int size=buffer.count() ;
198      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
199      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
200    }
201    else
202    {
203      int size ;
204      MPI_Bcast(&size,1,MPI_INT,0,communicator) ;
205      CBufferIn buffer(size) ;
206      MPI_Bcast(buffer.start(),size,MPI_CHAR,0,communicator) ;
207      this->fromBuffer(buffer) ;
208    }
209  }
210  void CRegistry::gatherRegistry(void)
211  {
212    gatherRegistry(communicator) ;
213  }
214
215  void CRegistry::gatherRegistry(const MPI_Comm& comm)
216  {
217    int rank,mpiSize ;
218    MPI_Comm_rank(comm,&rank);
219    MPI_Comm_size(comm,&mpiSize);
220
221    int* sizes=new int[mpiSize] ;
222    CBufferOut localBuffer(this->size()) ;
223    this->toBuffer(localBuffer) ;
224    int localSize=localBuffer.count() ;
225    MPI_Gather(&localSize,1,MPI_INT,sizes,1,MPI_INT,0,comm) ;
226
227    char* globalBuffer ;
228    int*   displs ;
229   
230    if (rank==0)
231    {
232      size_t globalBufferSize=0 ;
233      displs=new int[mpiSize] ;
234      for (int i=0;i<mpiSize;++i)
235      {
236        displs[i]=globalBufferSize ;
237        globalBufferSize+=sizes[i] ;
238      }
239
240      globalBuffer=new char[globalBufferSize] ;
241      MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;
242      for(int i=1;i<mpiSize;++i)
243      {
244        CBufferIn buffer(globalBuffer+displs[i],sizes[i]) ;
245        CRegistry reg ;
246        reg.fromBuffer(buffer) ;
247        mergeRegistry(reg) ;
248      }
249      delete[] displs ;
250      delete[] globalBuffer ;
251    }
252    else  MPI_Gatherv(localBuffer.start(),localSize,MPI_CHAR,globalBuffer,sizes,displs,MPI_CHAR,0,comm) ;   
253    delete[] sizes ;
254   
255  }
256
257  void CRegistry::hierarchicalGatherRegistry(void)
258  {
259   // hierarchicalGatherRegistry(communicator) ;
260    gatherRegistry(communicator) ;
261  }
262
263  void CRegistry::hierarchicalGatherRegistry(const MPI_Comm& comm)
264  {
265    int mpiRank,mpiSize ;
266    MPI_Comm_rank(comm,&mpiRank);
267    MPI_Comm_size(comm,&mpiSize);   
268
269    if (mpiSize>2)
270    {
271      int color ;
272      if (mpiRank<mpiSize/2+mpiSize%2) color=0 ;
273      else color=1 ;
274      MPI_Comm commUp ;
275      MPI_Comm_split(comm,color,mpiRank,&commUp) ,
276      hierarchicalGatherRegistry(commUp) ;
277      MPI_Comm_free(&commUp) ;
278    }
279
280    if (mpiSize>1)
281    {
282      MPI_Comm commDown ;
283      int color ;
284     
285      if (mpiRank==0 || mpiRank==mpiSize/2+mpiSize%2) color=0 ;
286      else color=1 ;
287      MPI_Comm_split(comm,color,mpiRank,&commDown) ;
288      if (color==0) gatherRegistry(commDown) ;
289      MPI_Comm_free(&commDown) ;   
290    }
291  }
292
293}
Note: See TracBrowser for help on using the repository browser.