source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1289

Last change on this file since 1289 was 1289, checked in by yushan, 7 years ago

EP update part 2

File size: 7.1 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11
12using namespace std;
13
14extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
15#pragma omp threadprivate(EP_PendingRequests)
16
17namespace ep_lib
18{
19
20  int Message_Check(MPI_Comm comm)
21  {
22    int myRank;
23    MPI_Comm_rank(comm, &myRank);
24
25    if(!comm.is_ep) return 0;
26
27    if(comm.is_intercomm)
28    {
29      return  Message_Check_intercomm(comm);
30    }
31
32    int flag = true;
33    ::MPI_Message message;
34    ::MPI_Status status;
35    int mpi_source;
36
37    while(flag) // loop until the end of global queue
38    {
39      Debug("Message probing for intracomm\n");
40      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm);
41      #ifdef _openmpi
42      #pragma omp critical (_mpi_call)
43      {
44        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
45        if(flag)
46        {
47          Debug("find message in mpi comm \n");
48          mpi_source = status.MPI_SOURCE;
49          int tag = status.MPI_TAG;
50          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
51
52        }
53      }
54      #elif _intelmpi
55      #pragma omp critical (_mpi_call)
56      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status); 
57      #endif
58     
59      if(flag)
60      {
61
62        MPI_Message *msg_block = new MPI_Message; 
63        msg_block->mpi_message = message; 
64        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
65        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
66        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
67        int src_mpi       = status.MPI_SOURCE;
68             
69        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
70        int dest_mpi = comm.ep_comm_ptr->size_rank_info[2].first;
71        int ep_dest = get_ep_rank(comm, dest_loc, dest_mpi);
72        msg_block->mpi_status = new ::MPI_Status(status);
73
74        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
75        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
76
77
78        #pragma omp critical (_query)
79        {
80          #pragma omp flush
81          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 
82   
83          #pragma omp flush
84        }
85       
86        delete msg_block;       
87      }
88
89    }
90
91    return MPI_SUCCESS;
92  }
93
94
95
96  int Message_Check_intercomm(MPI_Comm comm)
97  {
98    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
99
100    Debug("Message probing for intercomm\n");
101
102    int flag = true;
103    ::MPI_Message message;
104    ::MPI_Status status;
105    int mpi_source;
106    int current_ep_rank;
107    MPI_Comm_rank(comm, &current_ep_rank);
108
109    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
110    {
111      Debug("Message probing for intracomm\n");
112      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.ep_comm_ptr->intercomm->mpi_inter_comm);  // => mpi_intercomm
113     
114      #ifdef _openmpi
115      #pragma omp critical (_mpi_call)
116      {
117        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
118        if(flag)
119        {
120          Debug("find message in mpi comm \n");
121          mpi_source = status.MPI_SOURCE;
122          int tag = status.MPI_TAG;
123          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
124
125        }
126      }
127      #elif _intelmpi
128      #pragma omp critical (_mpi_call)
129      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status);       
130      #endif
131
132      if(flag)
133      {
134
135        MPI_Message *msg_block = new MPI_Message;
136
137        msg_block->mpi_message = message;
138        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
139        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
140        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
141        int src_mpi       = status.MPI_SOURCE;
142        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
143             
144        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
145        msg_block->mpi_status = new ::MPI_Status(status);
146
147
148        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
149        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
150
151
152        #pragma omp critical (_query)
153        {
154          #pragma omp flush
155          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);
156          #pragma omp flush
157        }
158       
159        delete msg_block;
160       
161      }
162
163    }
164
165    flag = true;
166    while(flag) // loop until the end of global queue "comm.mpi_comm"
167    {
168      Debug("Message probing for intracomm\n");
169      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm);
170      #ifdef _openmpi
171      #pragma omp critical (_mpi_call)
172      {
173        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
174        if(flag)
175        {
176          Debug("find message in mpi comm \n");
177          mpi_source = status.MPI_SOURCE;
178          int tag = status.MPI_TAG;
179          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
180
181        }
182      }
183      #elif _intelmpi
184      #pragma omp critical (_mpi_call)
185      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status);       
186      #endif
187
188      if(flag)
189      {
190
191        MPI_Message *msg_block = new MPI_Message;
192       
193        msg_block->mpi_message = message;
194        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
195        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
196        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
197        int src_mpi       = status.MPI_SOURCE;
198        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
199       
200        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
201        msg_block->mpi_status = new ::MPI_Status(status);
202       
203
204        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
205        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
206
207
208        #pragma omp critical (_query)
209        {
210          #pragma omp flush
211          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);
212          #pragma omp flush
213        }
214       
215        delete msg_block;
216       
217      }
218
219    }
220
221    return MPI_SUCCESS;
222  }
223
224  int Request_Check()
225  {
226    MPI_Status status;
227    MPI_Message message;
228    int probed = false;
229    int recv_count = 0;
230    std::list<MPI_Request* >::iterator it;
231   
232    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
233    { 
234      Message_Check((*it)->comm);
235    }
236
237
238    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
239    {
240      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
241      if(probed)
242      {
243        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
244        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
245        (*it)->type = 3;
246        EP_PendingRequests->erase(it);
247        it = EP_PendingRequests->begin();
248        continue;
249      }
250      it++;
251    }
252  }
253
254}
Note: See TracBrowser for help on using the repository browser.