Changeset 2015


Ignore:
Timestamp:
01/13/21 19:09:23 (8 months ago)
Author:
ymipsl
Message:

ASSIM2K Branch :
Improve transfer protocol using mpi matching Probe / matching receive
YM

Location:
XIOS/dev/dev_ym/XIOS_ASSIM2K/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ASSIM2K/src/context_server.cpp

    r1230 r2015  
    6464  { 
    6565    listen(); 
     66    checkPendingProbe(); 
    6667    checkPendingRequest(); 
    6768    if (enableEventsProcessing) 
     
    6970    return finished; 
    7071  } 
    71  
     72/* 
    7273  void CContextServer::listen(void) 
    7374  { 
     
    138139  } 
    139140 
     141*/ 
     142 
     143  void CContextServer::listen(void) 
     144  { 
     145    int rank; 
     146    int flag; 
     147    int count; 
     148    char * addr; 
     149    MPI_Status status; 
     150    MPI_Message message ; 
     151    map<int,CServerBuffer*>::iterator it; 
     152    bool okLoop; 
     153 
     154    traceOff(); 
     155    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 
     156    traceOn(); 
     157    if (flag==true) listenPendingRequest(message, status) ; 
     158  } 
     159 
     160  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status) 
     161  { 
     162    int count; 
     163    char * addr; 
     164    map<int,CServerBuffer*>::iterator it; 
     165    int rank=status.MPI_SOURCE ; 
     166 
     167    it=buffers.find(rank); 
     168    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     169    { 
     170       StdSize buffSize = 0; 
     171       MPI_Mrecv(&buffSize, 1, MPI_LONG, &message, &status); 
     172       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
     173       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     174       return true; 
     175    } 
     176    else 
     177    { 
     178//      MPI_Get_count(&status,MPI_CHAR,&count); 
     179//      if (it->second->isBufferFree(count)) 
     180//      { 
     181//        addr=(char*)it->second->getBuffer(count); 
     182//         MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]); 
     183//         bufferRequest[rank]=addr; 
     184//         return true; 
     185//       } 
     186//      else 
     187//      { 
     188        pendingProbe[rank].push_back(make_pair<MPI_Message,MPI_Status>(message,status)) ; 
     189        return false; 
     190//      } 
     191    } 
     192  } 
     193 
     194  void CContextServer::checkPendingProbe(void) 
     195  { 
     196     
     197    list<int> recvProbe ; 
     198    list<int>::iterator itRecv ; 
     199    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe; 
     200 
     201    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++) 
     202    { 
     203      int rank=itProbe->first ; 
     204      if (pendingRequest.count(rank)==0) 
     205      { 
     206        MPI_Message& message = itProbe->second.front().first ; 
     207        MPI_Status& status = itProbe->second.front().second ; 
     208        int count ; 
     209        MPI_Get_count(&status,MPI_CHAR,&count); 
     210        map<int,CServerBuffer*>::iterator it = buffers.find(rank); 
     211        if (it->second->isBufferFree(count)) 
     212        { 
     213          char * addr; 
     214          addr=(char*)it->second->getBuffer(count); 
     215          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]); 
     216          bufferRequest[rank]=addr; 
     217          recvProbe.push_back(rank) ; 
     218          itProbe->second.pop_front() ; 
     219        } 
     220      } 
     221    } 
     222 
     223    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ; 
     224  } 
    140225 
    141226  void CContextServer::checkPendingRequest(void) 
  • XIOS/dev/dev_ym/XIOS_ASSIM2K/src/context_server.hpp

    r1228 r2015  
    1717    bool eventLoop(bool enableEventsProcessing = true); 
    1818    void listen(void) ; 
    19     bool listenPendingRequest(MPI_Status& status) ; 
     19    bool listenPendingRequest(MPI_Message &message, MPI_Status& status) ; 
     20    void checkPendingProbe(void) ; 
    2021    void checkPendingRequest(void) ; 
    2122    void processRequest(int rank, char* buff,int count) ; 
     
    3435 
    3536    map<int,CServerBuffer*> buffers ; 
     37    map<int, list<std::pair<MPI_Message,MPI_Status> > > pendingProbe; 
    3638    map<int,MPI_Request> pendingRequest ; 
    3739    map<int,char*> bufferRequest ; 
Note: See TracChangeset for help on using the changeset viewer.