Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the transfer protocol using one-sided communication
  • Introduce MPI_Improbe/MPI_Mrecv to listen for incoming requests (see the probe/receive sketch below the message)
  • Introduce latency when looping over managers

YM
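
For reference, a minimal sketch (not XIOS code) of the matched-probe pattern this changeset introduces: MPI_Improbe detects a message without receiving it, and MPI_Mrecv then consumes exactly that message. Tag 20 matches the new listen() in the diff below; listenOnce and the plain char buffer are illustrative assumptions.

    // Minimal sketch (not XIOS code): detect and receive one incoming request
    // with a matched probe, so the probed message cannot be stolen by another
    // probe/receive in between.
    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    void listenOnce(MPI_Comm interComm)              // hypothetical helper
    {
      int flag = 0;
      MPI_Message message;
      MPI_Status  status;

      // Non-blocking matched probe on tag 20 (the tag used by the new listen()).
      MPI_Improbe(MPI_ANY_SOURCE, 20, interComm, &flag, &message, &status);
      if (!flag) return;                             // nothing has arrived yet

      int count = 0;
      MPI_Get_count(&status, MPI_CHAR, &count);      // size of the probed message
      std::vector<char> buffer(count);

      // Receive exactly the message returned by the probe.
      MPI_Mrecv(buffer.data(), count, MPI_CHAR, &message, &status);
      std::printf("received %d bytes from rank %d\n", count, status.MPI_SOURCE);
    }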

File:
1 edited

Legend:

  Unmodified (no marker)
+ Added
- Removed
… unchanged code elided between hunks
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

r2240 → r2246

    else  attachedMode=true ;

-    if (flag) MPI_Comm_remote_size(interComm,&commSize);
-    else  MPI_Comm_size(interComm,&commSize);
+    int clientSize ;
+    if (flag) MPI_Comm_remote_size(interComm,&clientSize);
+    else  MPI_Comm_size(interComm,&clientSize);

…
    if (!isAttachedModeEnabled())
    {
+      CTimer::get("create Windows").resume() ;
+
      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
-// create windows for one sided comm
-      int interCommMergedRank;
+
+      // We create dummy intercommunicators between the clients and the server.
+      // Why? Just because, on OpenMPI, it reduces the window creation time, which otherwise increases quadratically.
+      // We don't know the reason.
+      MPI_Comm commSelf ;
+      MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
+      vector<MPI_Comm> dummyComm(clientSize) ;
+      for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ;
+
+      // create windows for one sided comm
      MPI_Comm winComm ;
-      MPI_Comm_rank(intraComm, &interCommMergedRank);
      windows.resize(2) ;
-      for(int rank=commSize; rank<commSize+intraCommSize; rank++)
-      {
-        if (rank==commSize+interCommMergedRank)
+      for(int rank=clientSize; rank<clientSize+intraCommSize; rank++)
+      {
+        if (rank==clientSize+intraCommRank)
        {
-          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
-          int myRank ;
-          MPI_Comm_rank(winComm,&myRank);
+          MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
        }
-        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
-//      ym : Warning : intelMPI doesn't support the communicator of a window being deallocated before the window itself, crash at MPI_Win_lock
-//           Bug or not ?
-        // MPI_Comm_free(&winComm) ;
-      }
+        else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
+        // ym : Warning : intelMPI doesn't support the communicator of a window being deallocated before the window itself, crash at MPI_Win_lock
+        //      Bug or not ?
+        // MPI_Comm_free(&winComm) ;
+      }
+
+      // free the dummy intercommunicators
+      for(int rank=0; rank<clientSize ; rank++)  MPI_Comm_free(&dummyComm[rank]) ;
+      MPI_Comm_free(&commSelf) ;
+      CTimer::get("create Windows").suspend() ;
    }
    else
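
The windows created above are dynamic RMA windows: the memory is attached after creation and its address has to be shipped to the peer (cf. the winAdress values exchanged in listenPendingRequest further down). A generic sketch of that pattern follows, under the assumption that the attachment happens inside CServerBuffer; createDynamicWindow, the malloc'd buffer and its size are hypothetical, and this is not the actual CServerBuffer code.

    // Minimal sketch (not XIOS code): a dynamic RMA window. Memory is attached
    // after creation, and its address must be sent to the remote side so it can
    // target it with MPI_Put/MPI_Get (cf. winAdress in listenPendingRequest).
    #include <mpi.h>
    #include <cstdlib>

    void createDynamicWindow(MPI_Comm winComm, MPI_Aint size)   // hypothetical helper
    {
      MPI_Win win;
      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &win);     // window with no memory yet

      void* buffer = std::malloc(static_cast<std::size_t>(size));
      MPI_Win_attach(win, buffer, size);                        // expose the buffer for RMA

      MPI_Aint winAddress;
      MPI_Get_address(buffer, &winAddress);                     // ship this value to the peer

      // ... one-sided traffic targets winAddress ...

      MPI_Win_detach(win, buffer);
      MPI_Win_free(&win);        // free the window before its communicator (see the intelMPI note above)
      std::free(buffer);
    }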
     
…

      windows[1]=MPI_WIN_NULL ;
    }
-
-

-    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
    itLastTimeLine=lastTimeLine.begin() ;

…
  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
+    CTimer::get("listen request").resume();
    listen();
+    CTimer::get("listen request").suspend();
+    CTimer::get("check pending request").resume();
    checkPendingRequest();
+    checkPendingProbe() ;
+    CTimer::get("check pending request").suspend();
+    CTimer::get("check event process").resume();
    if (enableEventsProcessing)  processEvents();
+    CTimer::get("check event process").suspend();
    return finished;
  }
-
+/*
  void CContextServer::listen(void)
  {

…
    }
  }
+*/
+
+ void CContextServer::listen(void)
+  {
+    int rank;
+    int flag;
+    int count;
+    char * addr;
+    MPI_Status status;
+    MPI_Message message ;
+    map<int,CServerBuffer*>::iterator it;
+    bool okLoop;
+
+    traceOff();
+    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
+    traceOn();
+    if (flag==true) listenPendingRequest(message, status) ;
+  }
+
+  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
+  {
+    int count;
+    char * addr;
+    map<int,CServerBuffer*>::iterator it;
+    int rank=status.MPI_SOURCE ;
+
+    it=buffers.find(rank);
+    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
+    {
+       MPI_Aint recvBuff[4] ;
+       MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
+       remoteHashId_ = recvBuff[0] ;
+       StdSize buffSize = recvBuff[1];
+       vector<MPI_Aint> winAdress(2) ;
+       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
+       mapBufferSize_.insert(std::make_pair(rank, buffSize));
+       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
+       lastTimeLine[rank]=0 ;
+       itLastTimeLine=lastTimeLine.begin() ;
+       return true;
+    }
+    else
+    {
+        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
+        pendingProbe[rank].push_back(mypair) ;
+        return false;
+    }
+  }
+
+  void CContextServer::checkPendingProbe(void)
+  {
+
+    list<int> recvProbe ;
+    list<int>::iterator itRecv ;
+    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
+
+    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
+    {
+      int rank=itProbe->first ;
+      if (pendingRequest.count(rank)==0)
+      {
+        MPI_Message& message = itProbe->second.front().first ;
+        MPI_Status& status = itProbe->second.front().second ;
+        int count ;
+        MPI_Get_count(&status,MPI_CHAR,&count);
+        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
+        if (it->second->isBufferFree(count))
+        {
+          char * addr;
+          addr=(char*)it->second->getBuffer(count);
+          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
+          bufferRequest[rank]=addr;
+          recvProbe.push_back(rank) ;
+          itProbe->second.pop_front() ;
+        }
+      }
+    }
+
+    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
+  }

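
The new listen path above separates message arrival (matched probe, queued in pendingProbe) from message consumption (MPI_Imrecv once a receive can be posted). A condensed sketch of the consumption step, using a plain std::vector per rank instead of CServerBuffer and omitting the isBufferFree check; consumeProbes and its parameters are hypothetical.

    // Minimal sketch (not XIOS code): post non-blocking receives for messages
    // that were matched earlier by MPI_Improbe, one outstanding receive per rank.
    #include <mpi.h>
    #include <map>
    #include <utility>
    #include <vector>

    // probed      : message/status pairs saved by an earlier probe (cf. pendingProbe)
    // pending     : outstanding receive requests per client rank (cf. pendingRequest)
    // recvBuffers : plain heap buffers standing in for CServerBuffer
    void consumeProbes(std::map<int, std::pair<MPI_Message, MPI_Status> >& probed,
                       std::map<int, MPI_Request>& pending,
                       std::map<int, std::vector<char> >& recvBuffers)   // hypothetical helper
    {
      for (auto it = probed.begin(); it != probed.end(); )
      {
        int rank = it->first;
        if (pending.count(rank)) { ++it; continue; }       // previous receive still in flight
        int count = 0;
        MPI_Get_count(&it->second.second, MPI_CHAR, &count);
        recvBuffers[rank].resize(count);
        MPI_Imrecv(recvBuffers[rank].data(), count, MPI_CHAR,
                   &it->second.first, &pending[rank]);     // non-blocking matched receive
        it = probed.erase(it);                             // the message is now consumed
      }
    }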
     
…

    int count;
    MPI_Status status;
+
+    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
+    else CTimer::get("receiving requests").suspend();

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)

…
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
+    CTimer::get("CContextServer::getBufferFromClient").resume() ;
    if (!isAttachedModeEnabled()) // one sided deactivated in attached mode
    {

…

      {
        rank=itLastTimeLine->first ;
-        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0)
+        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
        {
-          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
-          {
-            processRequest(rank, buffer, count);
-            break ;
-          }
+          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
+          if (count >= 0) break ;
        }
      }
    }
+    CTimer::get("CContextServer::getBufferFromClient").suspend() ;
  }

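
getBufferFromClient pulls data from a client's exposed buffer with one-sided communication; the CServerBuffer implementation is not part of this diff. The sketch below only illustrates the generic passive-target lock/get/unlock sequence on a dynamic window, where the target displacement is the absolute address sent by the client; pullFromClient, remoteAddr and count are hypothetical.

    // Minimal sketch (not XIOS code): passive-target read of count bytes from the
    // buffer a client attached to the window; remoteAddr is the MPI_Aint address
    // the client sent at connection time (cf. winAdress in listenPendingRequest).
    #include <mpi.h>
    #include <vector>

    std::vector<char> pullFromClient(MPI_Win win, int clientRank,
                                     MPI_Aint remoteAddr, int count)   // hypothetical helper
    {
      std::vector<char> local(count);
      MPI_Win_lock(MPI_LOCK_SHARED, clientRank, 0, win);     // passive target: no client-side call needed
      MPI_Get(local.data(), count, MPI_CHAR,
              clientRank, remoteAddr, count, MPI_CHAR, win); // for dynamic windows the displacement is the absolute address
      MPI_Win_unlock(clientRank, win);                       // completes the MPI_Get
      return local;
    }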
     
…

        }
      }
-      else getBufferFromClient(currentTimeLine) ;
+      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
    }
    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line

…
//      releaseBuffers() ;
      notifyClientsFinalize() ;
+      CTimer::get("receiving requests").suspend();
      context->finalize();

…

      MPI_Win_free(&windows[0]) ;
      MPI_Win_free(&windows[1]) ;

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
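
Per the intelMPI warning recorded earlier in the diff, a window's communicator should outlive the window. A minimal sketch of a teardown order consistent with that warning (in the changeset itself winComm is simply never freed); releaseWindows is hypothetical.

    // Minimal sketch (not XIOS code): free the windows first, then the communicator
    // they were created on, to stay clear of the intelMPI issue noted above.
    #include <mpi.h>
    #include <vector>

    void releaseWindows(std::vector<MPI_Win>& windows, MPI_Comm& winComm)   // hypothetical helper
    {
      for (MPI_Win& w : windows)
        if (w != MPI_WIN_NULL) MPI_Win_free(&w);              // windows first ...
      if (winComm != MPI_COMM_NULL) MPI_Comm_free(&winComm);  // ... then their communicator
    }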