source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1287

Last change on this file since 1287 was 1287, checked in by yushan, 7 years ago

EP updated

File size: 6.8 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
20
21  int Message_Check(MPI_Comm comm)
22  {
23    if(!comm.is_ep) return 0;
24
25    if(comm.is_intercomm)
26    {
27      return  Message_Check_intercomm(comm);
28    }
29
30    int flag = true;
31    ::MPI_Message message;
32    ::MPI_Status status;
33    int mpi_source;
34
35    while(flag) // loop until the end of global queue
36    {
37      Debug("Message probing for intracomm\n");
38     
39
40      #ifdef _openmpi
41      #pragma omp critical (_mpi_call)
42      {
43        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
44        if(flag)
45        {
46          Debug("find message in mpi comm \n");
47          mpi_source = status.MPI_SOURCE;
48          int tag = status.MPI_TAG;
49          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
50
51        }
52      }
53      #elif _intelmpi
54      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status); 
55      #endif
56     
57      if(flag)
58      {
59
60        MPI_Message *msg_block = new MPI_Message; 
61        msg_block->mpi_message = message; 
62        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
63        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
64        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
65        int src_mpi       = status.MPI_SOURCE;
66             
67        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
68        msg_block->mpi_status = new ::MPI_Status(status);
69
70        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
71        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
72
73
74        #pragma omp critical (_query)
75        {
76          #pragma omp flush
77          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);     
78          #pragma omp flush
79        }
80       
81        delete msg_block;       
82      }
83
84    }
85
86    return MPI_SUCCESS;
87  }
88
89
90
91  int Message_Check_intercomm(MPI_Comm comm)
92  {
93    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
94
95    Debug("Message probing for intercomm\n");
96
97    int flag = true;
98    ::MPI_Message message;
99    ::MPI_Status status;
100    int mpi_source;
101    int current_ep_rank;
102    MPI_Comm_rank(comm, &current_ep_rank);
103
104    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
105    {
106      Debug("Message probing for intracomm\n");
107
108      #ifdef _openmpi
109      #pragma omp critical (_mpi_call)
110      {
111        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
112        if(flag)
113        {
114          Debug("find message in mpi comm \n");
115          mpi_source = status.MPI_SOURCE;
116          int tag = status.MPI_TAG;
117          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
118
119        }
120      }
121      #elif _intelmpi
122      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &message, &status);       
123      #endif
124
125      if(flag)
126      {
127
128        MPI_Message *msg_block = new MPI_Message;
129
130        msg_block->mpi_message = message;
131        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
132        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
133        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
134        int src_mpi       = status.MPI_SOURCE;
135        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
136             
137        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
138        msg_block->mpi_status = new ::MPI_Status(status);
139
140
141        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
142        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
143
144
145        #pragma omp critical (_query)
146        {
147          #pragma omp flush
148          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
149          #pragma omp flush
150        }
151       
152        delete msg_block;
153       
154      }
155
156    }
157
158    flag = true;
159    while(flag) // loop until the end of global queue "comm.mpi_comm"
160    {
161      Debug("Message probing for intracomm\n");
162     
163      #ifdef _openmpi
164      #pragma omp critical (_mpi_call)
165      {
166        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
167        if(flag)
168        {
169          Debug("find message in mpi comm \n");
170          mpi_source = status.MPI_SOURCE;
171          int tag = status.MPI_TAG;
172          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
173
174        }
175      }
176      #elif _intelmpi
177      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status);       
178      #endif
179
180      if(flag)
181      {
182
183        MPI_Message *msg_block = new MPI_Message;
184       
185        msg_block->mpi_message = message;
186        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
187        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
188        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
189        int src_mpi       = status.MPI_SOURCE;
190       
191        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
192        msg_block->mpi_status = new ::MPI_Status(status);
193       
194
195        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
196        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
197
198
199        #pragma omp critical (_query)
200        {
201          #pragma omp flush
202          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
203          #pragma omp flush
204        }
205       
206        delete msg_block;
207       
208      }
209
210    }
211
212    return MPI_SUCCESS;
213  }
214
215  int Request_Check()
216  {
217    MPI_Status status;
218    MPI_Message message;
219    int probed = false;
220    int recv_count = 0;
221    std::list<MPI_Request* >::iterator it;
222   
223    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
224    { 
225      Message_Check((*it)->comm);
226    }
227
228
229    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
230    {
231      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
232      if(probed)
233      {
234        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
235        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
236        (*it)->type = 3;
237        EP_PendingRequests->erase(it);
238        it = EP_PendingRequests->begin();
239        continue;
240      }
241      it++;
242    }
243  }
244
245}
Note: See TracBrowser for help on using the repository browser.