source: XIOS/dev/branch_yushan/extern/src_ep_dev/ep_probe.cpp @ 1110

Last change on this file since 1110 was 1110, checked in by yushan, 7 years ago

redefinition of mpi_any_source and mpi_any_tag

File size: 4.0 KB
Line 
1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4
5namespace ep_lib
6{
7
8
9
10  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
11  {
12    *flag = false;
13
14    if(!comm.is_ep)
15    {
16      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm >(comm.mpi_comm);
17      ::MPI_Status *mpi_status = static_cast< ::MPI_Status* >(status->mpi_status);
18      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, flag, mpi_status);
19
20      status->mpi_status = mpi_status;
21      status->ep_src = src;
22      status->ep_tag = tag;
23      return 0;
24    }
25
26    Debug("calling MPI_Iprobe EP\n");
27
28    Message_Check(comm);
29
30    #pragma omp flush
31
32   
33    //printf("iprobe, message queue size = %lu, queue = %p\n", comm.ep_comm_ptr->message_queue->size(), comm.ep_comm_ptr->message_queue);
34
35    #pragma omp flush
36
37    #pragma omp critical (_query)
38    if(!comm.ep_comm_ptr->message_queue->empty())
39    {
40      for(Message_list::iterator it = comm.ep_comm_ptr->message_queue->begin(); it!= comm.ep_comm_ptr->message_queue->end(); ++it)
41      {
42        bool src_matched = src<0? true: it->ep_src == src;
43        bool tag_matched = tag<0? true: it->ep_tag == tag;
44       
45        if(src_matched && tag_matched)       
46        {
47          Debug("find message\n");
48          // printf("iprobe, find in local message queue %p, src = %d, tag = %d\n", comm.ep_comm_ptr->message_queue, it->ep_src, it->ep_tag);
49
50          *flag = true;
51
52          ::MPI_Status mpi_status = *(static_cast< ::MPI_Status *>(it->mpi_status));
53
54          status->mpi_status = new ::MPI_Status(mpi_status);
55          status->ep_src = it->ep_src;
56          status->ep_tag = it->ep_tag;
57
58          break;
59        }
60
61      }
62    }
63
64    return 0;
65  }
66
67
68
69  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
70  {
71    *flag = false;
72    if(!comm.is_ep)
73    {
74      Debug("calling MPI_Improbe MPI\n");
75
76        ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm>(comm.mpi_comm);
77        ::MPI_Status mpi_status;
78      ::MPI_Message mpi_message;
79
80        #ifdef _openmpi
81      #pragma omp critical (_mpi_call)
82      {
83        ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, flag, &mpi_status);
84        if(*flag)
85        {
86          ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_message, &mpi_status);
87        }
88      }
89      #elif _intelmpi
90        ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, flag, &mpi_message, &mpi_status);
91      #endif
92       
93      status->mpi_status = new ::MPI_Status(mpi_status);
94      status->ep_src = src;
95      status->ep_tag = tag;
96
97      message->mpi_message = mpi_message;
98      message->ep_src = src;
99      message->ep_tag = tag;
100        return 0;
101    }
102
103   
104    Message_Check(comm);
105
106    #pragma omp flush
107
108    #pragma omp critical (_query)
109    if(! comm.ep_comm_ptr->message_queue->empty())
110    {
111      for(Message_list::iterator it = comm.ep_comm_ptr->message_queue->begin(); it!= comm.ep_comm_ptr->message_queue->end(); ++it)
112      {
113        bool src_matched = src<0? true: it->ep_src == src;
114        bool tag_matched = tag<0? true: it->ep_tag == tag;
115       
116        if(src_matched && tag_matched)
117        {
118          *flag = true;
119
120          ::MPI_Status mpi_status;
121          mpi_status = *(static_cast< ::MPI_Status *>(it->mpi_status));
122
123          status->mpi_status = new ::MPI_Status(mpi_status);
124          status->ep_src = it->ep_src;
125          status->ep_tag = it->ep_tag;
126
127          message->mpi_message = it->mpi_message;
128          message->ep_tag = it->ep_tag;
129          message->ep_src = it->ep_src;
130
131          #pragma omp critical (_query2)
132          {             
133            //printf("local message erased. src = %d, dest = %d, tag = %d\n", it->ep_src, it->ep_dest, it->ep_tag);     
134            delete it->mpi_status;
135            comm.ep_comm_ptr->message_queue->erase(it);
136            #pragma omp flush
137          }
138
139          break;
140        }
141
142      }
143    }
144  }
145
146}
147
Note: See TracBrowser for help on using the repository browser.