source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1354

Last change on this file since 1354 was 1347, checked in by yushan, 6 years ago

dev omp from Ada

File size: 8.0 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
20
21  int Message_Check(MPI_Comm comm)
22  {
23    if(!comm.is_ep) return 0;
24
25    if(comm.is_intercomm)
26    {
27      return  Message_Check_intercomm(comm);
28    }
29
30    int flag = true;
31    ::MPI_Message message;
32    ::MPI_Status status;
33    int mpi_source;
34
35    while(flag) // loop until the end of global queue
36    {
37      Debug("Message probing for intracomm\n");
38     
39
40      #ifdef _openmpi
41      #pragma omp critical (_mpi_call)
42      {
43        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
44        if(flag)
45        {
46          Debug("find message in mpi comm \n");
47          mpi_source = status.MPI_SOURCE;
48          int tag = status.MPI_TAG;
49          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
50
51        }
52      }
53      #elif _intelmpi
54      #pragma omp critical (_mpi_call)
55      {
56        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
57        if(flag)
58        {
59          Debug("find message in mpi comm \n");
60          mpi_source = status.MPI_SOURCE;
61          int tag = status.MPI_TAG;
62          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
63
64        }
65      }
66      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status);
67      #endif
68     
69      if(flag)
70      {
71
72        MPI_Message *msg_block = new MPI_Message; 
73        msg_block->mpi_message = message; 
74        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
75        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
76        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
77        int src_mpi       = status.MPI_SOURCE;
78             
79        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
80        msg_block->mpi_status = new ::MPI_Status(status);
81
82        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
83        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
84
85
86        #pragma omp critical (_query)
87        {
88          #pragma omp flush
89          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);     
90          #pragma omp flush
91        }
92       
93        delete msg_block;       
94      }
95
96    }
97
98    return MPI_SUCCESS;
99  }
100
101
102
103  int Message_Check_intercomm(MPI_Comm comm)
104  {
105    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
106
107    Debug("Message probing for intercomm\n");
108
109    int flag = true;
110    ::MPI_Message message;
111    ::MPI_Status status;
112    int mpi_source;
113    int current_ep_rank;
114    MPI_Comm_rank(comm, &current_ep_rank);
115
116    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
117    {
118      Debug("Message probing for intracomm\n");
119
120      #ifdef _openmpi
121      #pragma omp critical (_mpi_call)
122      {
123        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
124        if(flag)
125        {
126          Debug("find message in mpi comm \n");
127          mpi_source = status.MPI_SOURCE;
128          int tag = status.MPI_TAG;
129          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
130
131        }
132      }
133      #elif _intelmpi
134      #pragma omp critical (_mpi_call)
135      {
136        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
137        if(flag)
138        {
139          Debug("find message in mpi comm \n");
140          mpi_source = status.MPI_SOURCE;
141          int tag = status.MPI_TAG;
142          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
143
144        }
145      }
146      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &message, &status);       
147      #endif
148
149      if(flag)
150      {
151
152        MPI_Message *msg_block = new MPI_Message;
153
154        msg_block->mpi_message = message;
155        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
156        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
157        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
158        int src_mpi       = status.MPI_SOURCE;
159        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
160             
161        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
162        msg_block->mpi_status = new ::MPI_Status(status);
163
164
165        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
166        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
167
168
169        #pragma omp critical (_query)
170        {
171          #pragma omp flush
172          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
173          #pragma omp flush
174        }
175       
176        delete msg_block;
177       
178      }
179
180    }
181
182    flag = true;
183    while(flag) // loop until the end of global queue "comm.mpi_comm"
184    {
185      Debug("Message probing for intracomm\n");
186     
187      #ifdef _openmpi
188      #pragma omp critical (_mpi_call)
189      {
190        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
191        if(flag)
192        {
193          Debug("find message in mpi comm \n");
194          mpi_source = status.MPI_SOURCE;
195          int tag = status.MPI_TAG;
196          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
197
198        }
199      }
200      #elif _intelmpi
201      #pragma omp critical (_mpi_call)
202      {
203        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
204        if(flag)
205        {
206          Debug("find message in mpi comm \n");
207          mpi_source = status.MPI_SOURCE;
208          int tag = status.MPI_TAG;
209          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
210
211        }
212      }
213      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status);       
214      #endif
215
216      if(flag)
217      {
218
219        MPI_Message *msg_block = new MPI_Message;
220       
221        msg_block->mpi_message = message;
222        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
223        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
224        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
225        int src_mpi       = status.MPI_SOURCE;
226       
227        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
228        msg_block->mpi_status = new ::MPI_Status(status);
229       
230
231        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
232        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
233
234
235        #pragma omp critical (_query)
236        {
237          #pragma omp flush
238          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
239          #pragma omp flush
240        }
241       
242        delete msg_block;
243       
244      }
245
246    }
247
248    return MPI_SUCCESS;
249  }
250
251  int Request_Check()
252  {
253    MPI_Status status;
254    MPI_Message message;
255    int probed = false;
256    int recv_count = 0;
257    std::list<MPI_Request* >::iterator it;
258   
259    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
260    { 
261      Message_Check((*it)->comm);
262    }
263
264
265    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
266    {
267      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
268      if(probed)
269      {
270        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
271        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
272        (*it)->type = 3;
273        EP_PendingRequests->erase(it);
274        it = EP_PendingRequests->begin();
275        continue;
276      }
277      it++;
278    }
279  }
280
281}
Note: See TracBrowser for help on using the repository browser.