source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1362

Last change on this file since 1362 was 1362, checked in by yushan, 6 years ago

unify type : MPI_Message MPI_Info

File size: 7.0 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
20
21  int Message_Check(MPI_Comm comm)
22  {
23    if(!comm.is_ep) return 0;
24
25    if(comm.is_intercomm)
26    {
27      return  Message_Check_intercomm(comm);
28    }
29
30    int flag = true;
31    ::MPI_Message message;
32    ::MPI_Status status;
33    int mpi_source;
34
35    while(flag) // loop until the end of global queue
36    {
37      Debug("Message probing for intracomm\n");
38     
39
40      //#ifdef _openmpi
41      #pragma omp critical (_mpi_call)
42      {
43        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
44        if(flag)
45        {
46          Debug("find message in mpi comm \n");
47          mpi_source = status.MPI_SOURCE;
48          int tag = status.MPI_TAG;
49          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
50
51        }
52      }
53      //#elif _intelmpi
54      //#pragma omp critical (_mpi_call)
55      //{
56      //  ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
57      //  if(flag)
58      //  {
59      //    Debug("find message in mpi comm \n");
60      //    mpi_source = status.MPI_SOURCE;
61      //    int tag = status.MPI_TAG;
62      //    ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
63      //  }
64      //}
65      //#endif
66     
67      if(flag)
68      {
69
70        MPI_Message *msg_block = new MPI_Message; 
71        msg_block->mpi_message = new ::MPI_Message;
72        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 
73        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
74        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
75        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
76        int src_mpi       = status.MPI_SOURCE;
77             
78        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
79        msg_block->mpi_status = new ::MPI_Status(status);
80
81        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
82        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
83
84
85        #pragma omp critical (_query)
86        {
87          #pragma omp flush
88          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);     
89          #pragma omp flush
90        }
91       
92        delete msg_block;       
93      }
94
95    }
96
97    return MPI_SUCCESS;
98  }
99
100
101
102  int Message_Check_intercomm(MPI_Comm comm)
103  {
104    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
105
106    Debug("Message probing for intercomm\n");
107
108    int flag = true;
109    ::MPI_Message message;
110    ::MPI_Status status;
111    int mpi_source;
112    int current_ep_rank;
113    MPI_Comm_rank(comm, &current_ep_rank);
114
115    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
116    {
117      Debug("Message probing for intracomm\n");
118
119      //#ifdef _openmpi
120      #pragma omp critical (_mpi_call)
121      {
122        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
123        if(flag)
124        {
125          Debug("find message in mpi comm \n");
126          mpi_source = status.MPI_SOURCE;
127          int tag = status.MPI_TAG;
128          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
129
130        }
131      }
132     
133
134      if(flag)
135      {
136
137        MPI_Message *msg_block = new MPI_Message;
138        msg_block->mpi_message = new ::MPI_Message;
139        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;
140        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
141        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
142        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
143        int src_mpi       = status.MPI_SOURCE;
144        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
145             
146        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
147        msg_block->mpi_status = new ::MPI_Status(status);
148
149
150        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
151        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
152
153
154        #pragma omp critical (_query)
155        {
156          #pragma omp flush
157          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
158          #pragma omp flush
159        }
160       
161        delete msg_block;
162       
163      }
164
165    }
166
167    flag = true;
168    while(flag) // loop until the end of global queue "comm.mpi_comm"
169    {
170      Debug("Message probing for intracomm\n");
171     
172      //#ifdef _openmpi
173      #pragma omp critical (_mpi_call)
174      {
175        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
176        if(flag)
177        {
178          Debug("find message in mpi comm \n");
179          mpi_source = status.MPI_SOURCE;
180          int tag = status.MPI_TAG;
181          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
182
183        }
184      }
185     
186
187      if(flag)
188      {
189
190        MPI_Message *msg_block = new MPI_Message;
191        msg_block->mpi_message = new ::MPI_Message;
192        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;
193        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
194        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
195        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
196        int src_mpi       = status.MPI_SOURCE;
197       
198        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
199        msg_block->mpi_status = new ::MPI_Status(status);
200       
201
202        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
203        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
204
205
206        #pragma omp critical (_query)
207        {
208          #pragma omp flush
209          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
210          #pragma omp flush
211        }
212       
213        delete msg_block;
214       
215      }
216
217    }
218
219    return MPI_SUCCESS;
220  }
221
222  int Request_Check()
223  {
224    MPI_Status status;
225    MPI_Message message;
226    int probed = false;
227    int recv_count = 0;
228    std::list<MPI_Request* >::iterator it;
229   
230    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
231    { 
232      Message_Check((*it)->comm);
233    }
234
235
236    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
237    {
238      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
239      if(probed)
240      {
241        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
242        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
243        (*it)->type = 3;
244        EP_PendingRequests->erase(it);
245        it = EP_PendingRequests->begin();
246        continue;
247      }
248      it++;
249    }
250  }
251
252}
Note: See TracBrowser for help on using the repository browser.