source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp @ 2246

Last change on this file since 2246 was 2246, checked in by ymipsl, 3 years ago
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

  • Property svn:executable set to *
File size: 7.3 KB
Line 
1#ifndef __WINDOW_MANAGER_HPP__
2#define __WINDOW_MANAGER_HPP__
3
4#include <map>
5#include "mpi.hpp"
6#include "buffer_in.hpp"
7#include "buffer_out.hpp"
8#include "message.hpp"
9
10namespace xios
11{
12
13
14  class CWindowManager
15  {
16
17    private :
18    const MPI_Aint OFFSET_LOCK=0 ;
19    const int SIZE_LOCK=sizeof(int) ;
20    const MPI_Aint OFFSET_BUFFER_SIZE=OFFSET_LOCK+SIZE_LOCK ;
21    const int SIZE_BUFFER_SIZE=sizeof(size_t) ;
22    const MPI_Aint OFFSET_BUFFER=OFFSET_BUFFER_SIZE+SIZE_BUFFER_SIZE ;
23    const int WINDOWS_LOCKED=-1 ;
24
25    MPI_Win window_ ;
26    void * winBuffer_ ;
27    map<int,double> lastTimeLock_ ;
28    const double latency_=0e-2 ; 
29
30    public :
31
32    CWindowManager(MPI_Comm winComm, size_t bufferSize)
33    {
34      const MPI_Aint windowSize=bufferSize+OFFSET_BUFFER ;
35      MPI_Win_allocate(windowSize, 1, MPI_INFO_NULL, winComm, &winBuffer_, &window_) ;
36      int lock=0 ;
37      size_t size=0 ;
38      int commRank ;
39      MPI_Comm_rank(winComm, &commRank) ;
40      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, commRank, 0, window_) ;
41      MPI_Put(&lock, SIZE_LOCK, MPI_CHAR, commRank, OFFSET_LOCK, SIZE_LOCK, MPI_CHAR, window_) ;
42      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, commRank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
43      MPI_Win_unlock(commRank, window_) ;
44      MPI_Barrier(winComm) ;
45    }
46   
47    void lockWindow(int rank, int state )
48    {
49      int lock=state ;
50      double time ;
51      auto it=lastTimeLock_.find(rank) ;
52      if (it == lastTimeLock_.end()) 
53      { 
54        lastTimeLock_[rank] = 0. ; 
55        it=lastTimeLock_.find(rank) ;
56      }
57      double& lastTime = it->second ;
58
59      do 
60      {
61        time=MPI_Wtime() ;
62        while(time-lastTime < latency_) time=MPI_Wtime() ;
63        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
64        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
65        MPI_Win_unlock(rank, window_) ;
66        lastTime=MPI_Wtime() ;
67      } while (lock!=state) ;
68     
69     
70    }
71
72    void lockWindowExclusive(int rank, int state )
73    {
74      int lock=state ;
75      double time ;
76      auto it=lastTimeLock_.find(rank) ;
77      if (it == lastTimeLock_.end()) 
78      { 
79        lastTimeLock_[rank] = 0. ; 
80        it=lastTimeLock_.find(rank) ;
81      }
82      double& lastTime = it->second ;
83
84      do 
85      {
86        time=MPI_Wtime() ;
87        while(time-lastTime < latency_) time=MPI_Wtime() ;
88        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
89        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
90        MPI_Win_unlock(rank, window_) ;
91        lastTime=MPI_Wtime() ;
92      } while (lock!=state) ;
93    }
94
95    void lockWindowExclusive(int rank)
96    {
97      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
98    }
99
100    void lockWindowShared(int rank)
101    {
102      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
103    }
104
105    void unlockWindow(int rank)
106    {
107      MPI_Win_unlock(rank, window_) ;
108    }
109
110    void flushWindow(int rank)
111    {
112      MPI_Win_flush(rank, window_) ;
113    }
114
115    void unlockWindow(int rank, int state )
116    {
117      int lock ;
118      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
119      MPI_Compare_and_swap(&state, &WINDOWS_LOCKED, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
120      MPI_Win_unlock(rank, window_) ;
121    }
122   
123    template< class T >
124    void updateToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
125    {
126      CBufferOut buffer ;
127      (object->*dumpOut)(buffer) ;
128      size_t size=buffer.count() ;
129      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
130      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
131      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
132      MPI_Win_unlock(rank, window_) ;
133    }
134
135    template< class T >
136    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
137    {
138      CBufferOut buffer ;
139      (object->*dumpOut)(buffer) ;
140      size_t size=buffer.count() ;
141//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
142      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
143      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
144//      MPI_Win_unlock(rank, window_) ;
145    }
146
147    template< typename T >
148    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
149    {
150      size_t size ;
151      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
152      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
153      MPI_Win_flush(rank,window_) ;
154      CBufferIn buffer(size) ;
155      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
156      MPI_Win_unlock(rank, window_) ;
157      (object->*dumpIn)(buffer) ;
158    }
159
160    template< typename T >
161    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
162    {
163      size_t size ;
164//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
165      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
166      MPI_Win_flush(rank,window_) ;
167      CBufferIn buffer(size) ;
168      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
169//      MPI_Win_unlock(rank, window_) ;
170      MPI_Win_flush(rank, window_) ;
171      (object->*dumpIn)(buffer) ;
172    }
173
174
175    template< class T >
176    void pushToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
177    {
178      size_t size ;
179      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
180      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
181      MPI_Win_flush(rank,window_) ;
182      CBufferOut buffer ;
183      (object->*dumpOut)(buffer) ;
184      size_t bufferSize=buffer.count() ;
185      size_t newSize = size + bufferSize;
186      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
187      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
188      MPI_Win_unlock(rank, window_) ;
189    }
190
191    template< typename T >
192    void popFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
193    {
194      size_t size ;
195      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
196      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
197      MPI_Win_flush(rank,window_) ;
198      CBufferIn buffer(size) ;
199      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
200      MPI_Win_flush(rank,window_) ;
201      (object->*dumpIn)(buffer) ;
202     
203      size=buffer.remain() ;
204      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
205      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
206      MPI_Win_unlock(rank, window_) ;
207     
208    }
209
210    ~CWindowManager()
211    {
212      MPI_Win_free(&window_) ;
213    }
214  } ;
215}
216
217
218
219#endif
Note: See TracBrowser for help on using the repository browser.