Changeset 1328 for XIOS/dev/branch_openmp/src/context_server.cpp
- Timestamp:
- 11/15/17 12:14:34 (6 years ago)
- File:
- XIOS/dev/branch_openmp/src/context_server.cpp (1 edited)
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/src/context_server.cpp
r1179 r1328 10 10 #include "file.hpp" 11 11 #include "grid.hpp" 12 #include "mpi _std.hpp"12 #include "mpi.hpp" 13 13 #include "tracer.hpp" 14 14 #include "timer.hpp" … … 18 18 #include <boost/functional/hash.hpp> 19 19 20 20 using namespace ep_lib; 21 21 22 22 namespace xios 23 23 { 24 24 25 CContextServer::CContextServer(CContext* parent, ep_lib::MPI_Comm intraComm_, ep_lib::MPI_Comm interComm_)25 CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 26 26 { 27 27 context=parent; … … 72 72 int count; 73 73 char * addr; 74 ep_lib::MPI_Status status;74 MPI_Status status; 75 75 map<int,CServerBuffer*>::iterator it; 76 77 for(rank=0;rank<commSize;rank++) 78 { 76 bool okLoop; 77 78 traceOff(); 79 MPI_Iprobe(-2, 20,interComm,&flag,&status); 80 traceOn(); 81 82 if (flag==true) 83 { 84 #ifdef _usingMPI 85 rank=status.MPI_SOURCE ; 86 #elif _usingEP 87 rank=status.ep_src ; 88 #endif 89 okLoop = true; 79 90 if (pendingRequest.find(rank)==pendingRequest.end()) 80 { 81 traceOff(); 82 ep_lib::MPI_Iprobe(rank,20,interComm,&flag,&status); 83 traceOn(); 84 if (flag) 91 okLoop = !listenPendingRequest(status) ; 92 if (okLoop) 93 { 94 for(rank=0;rank<commSize;rank++) 85 95 { 86 it=buffers.find(rank); 87 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 96 if (pendingRequest.find(rank)==pendingRequest.end()) 88 97 { 89 StdSize buffSize = 0; 90 ep_lib::MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 91 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 92 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 93 } 94 else 95 { 96 97 ep_lib::MPI_Get_count(&status,MPI_CHAR,&count); 98 if (it->second->isBufferFree(count)) 99 { 100 addr=(char*)it->second->getBuffer(count); 101 ep_lib::MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 102 bufferRequest[rank]=addr; 103 } 98 99 traceOff(); 100 MPI_Iprobe(rank, 20,interComm,&flag,&status); 101 traceOn(); 102 if 
(flag==true) listenPendingRequest(status) ; 104 103 } 105 104 } … … 108 107 } 109 108 109 bool CContextServer::listenPendingRequest(MPI_Status& status) 110 { 111 int count; 112 char * addr; 113 map<int,CServerBuffer*>::iterator it; 114 #ifdef _usingMPI 115 int rank=status.MPI_SOURCE ; 116 #elif _usingEP 117 int rank=status.ep_src; 118 #endif 119 120 it=buffers.find(rank); 121 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 122 { 123 StdSize buffSize = 0; 124 MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 125 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 126 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 127 return true; 128 } 129 else 130 { 131 MPI_Get_count(&status,MPI_CHAR,&count); 132 if (it->second->isBufferFree(count)) 133 { 134 addr=(char*)it->second->getBuffer(count); 135 MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 136 bufferRequest[rank]=addr; 137 return true; 138 } 139 else 140 return false; 141 } 142 } 143 110 144 void CContextServer::checkPendingRequest(void) 111 145 { 112 map<int, ep_lib::MPI_Request>::iterator it;146 map<int,MPI_Request>::iterator it; 113 147 list<int> recvRequest; 114 148 list<int>::iterator itRecv; … … 116 150 int flag; 117 151 int count; 118 ep_lib::MPI_Status status;119 120 for(it=pendingRequest.begin();it!=pendingRequest.end(); ++it)152 MPI_Status status; 153 154 for(it=pendingRequest.begin();it!=pendingRequest.end();it++) 121 155 { 122 156 rank=it->first; 123 157 traceOff(); 124 ep_lib::MPI_Test(& it->second, &flag, &status);158 MPI_Test(& it->second, &flag, &status); 125 159 traceOn(); 126 160 if (flag==true) 127 161 { 128 162 recvRequest.push_back(rank); 129 ep_lib::MPI_Get_count(&status,MPI_CHAR,&count);163 MPI_Get_count(&status,MPI_CHAR,&count); 130 164 processRequest(rank,bufferRequest[rank],count); 131 165 } … … 220 254 { 221 255 finished=true; 222 #pragma omp critical (_output)223 256 info(20)<<"Server 
Side context <"<<context->getId()<<"> finalized"<<endl; 224 257 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), … … 227 260 for (itMap = itbMap; itMap != iteMap; ++itMap) 228 261 { 229 //report(10)<< " Memory report : Context <"<<context->getId()<<"> : server side : memory used for buffer of each connection to client" << endl230 //<< " +) With client of rank " << itMap->first << " : " << itMap->second << " bytes " << endl;262 report(10)<< " Memory report : Context <"<<context->getId()<<"> : server side : memory used for buffer of each connection to client" << endl 263 << " +) With client of rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 231 264 totalBuf += itMap->second; 232 265 } 233 266 context->finalize(); 234 //report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;267 report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; 235 268 } 236 269 else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
Note: See TracChangeset for help on using the changeset viewer.