Ignore:
Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp

    r1764 r2246  
    2525    MPI_Win window_ ; 
    2626    void * winBuffer_ ; 
     27    map<int,double> lastTimeLock_ ; 
     28    const double latency_=0e-2 ;  
    2729 
    2830    public : 
     
    4648    { 
    4749      int lock=state ; 
    48            
     50      double time ; 
     51      auto it=lastTimeLock_.find(rank) ; 
     52      if (it == lastTimeLock_.end())  
     53      {  
     54        lastTimeLock_[rank] = 0. ;  
     55        it=lastTimeLock_.find(rank) ; 
     56      } 
     57      double& lastTime = it->second ; 
     58 
    4959      do  
    5060      { 
     61        time=MPI_Wtime() ; 
     62        while(time-lastTime < latency_) time=MPI_Wtime() ; 
    5163        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
    5264        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
    5365        MPI_Win_unlock(rank, window_) ; 
     66        lastTime=MPI_Wtime() ; 
    5467      } while (lock!=state) ; 
    55  
    56        
    57     } 
    58  
     68       
     69       
     70    } 
     71 
     72    void lockWindowExclusive(int rank, int state ) 
     73    { 
     74      int lock=state ; 
     75      double time ; 
     76      auto it=lastTimeLock_.find(rank) ; 
     77      if (it == lastTimeLock_.end())  
     78      {  
     79        lastTimeLock_[rank] = 0. ;  
     80        it=lastTimeLock_.find(rank) ; 
     81      } 
     82      double& lastTime = it->second ; 
     83 
     84      do  
     85      { 
     86        time=MPI_Wtime() ; 
     87        while(time-lastTime < latency_) time=MPI_Wtime() ; 
     88        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     89        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
     90        MPI_Win_unlock(rank, window_) ; 
     91        lastTime=MPI_Wtime() ; 
     92      } while (lock!=state) ; 
     93    } 
     94 
     95    void lockWindowExclusive(int rank) 
     96    { 
     97      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     98    } 
     99 
     100    void lockWindowShared(int rank) 
     101    { 
     102      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     103    } 
     104 
     105    void unlockWindow(int rank) 
     106    { 
     107      MPI_Win_unlock(rank, window_) ; 
     108    } 
     109 
     110    void flushWindow(int rank) 
     111    { 
     112      MPI_Win_flush(rank, window_) ; 
     113    } 
    59114 
    60115    void unlockWindow(int rank, int state ) 
     
    77132      MPI_Win_unlock(rank, window_) ; 
    78133    } 
    79      
     134 
     135    template< class T > 
     136    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) ) 
     137    { 
     138      CBufferOut buffer ; 
     139      (object->*dumpOut)(buffer) ; 
     140      size_t size=buffer.count() ; 
     141//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     142      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     143      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     144//      MPI_Win_unlock(rank, window_) ; 
     145    } 
     146 
    80147    template< typename T > 
    81148    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     
    90157      (object->*dumpIn)(buffer) ; 
    91158    } 
     159 
     160    template< typename T > 
     161    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     162    { 
     163      size_t size ; 
     164//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     165      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     166      MPI_Win_flush(rank,window_) ; 
     167      CBufferIn buffer(size) ; 
     168      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     169//      MPI_Win_unlock(rank, window_) ; 
     170      MPI_Win_flush(rank, window_) ; 
     171      (object->*dumpIn)(buffer) ; 
     172    } 
     173 
    92174 
    93175    template< class T > 
Note: See TracChangeset for help on using the changeset viewer.