source: XIOS/dev/branch_openmp/extern/ep_dev/ep_probe.cpp @ 1522

Last change on this file since 1522 was 1522, checked in by yushan, 6 years ago

Test_client 6*8 clients 2 servers OK. TO DO : intercomm->intracomm, comm_free_intercomm

File size: 4.5 KB
RevLine 
[1381]1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include "ep_mpi.hpp"
5
6namespace ep_lib
7{
[1522]8  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
9  {
10    status->ep_src = src;
11    status->ep_tag = tag;
12    return ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, to_mpi_status_ptr(*status));
13  }
[1381]14
15  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
16  {
[1500]17    if(!comm->is_ep)
[1381]18    {
[1522]19      Debug("MPI_Iprobe with MPI\n");
20      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
[1381]21    }
[1522]22   
23    else
24    {
25      Debug("MPI_Iprobe with EP\n");
26     
27      *flag = false;
28   
29      Message_Check(comm);
[1381]30
[1522]31      #pragma omp flush
[1381]32
[1522]33      #pragma omp critical (_query)
[1500]34      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]35      {
[1500]36        bool src_matched = src<0? true: (*it)->ep_src == src;
37        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]38       
39        if(src_matched && tag_matched)       
40        {
41          Debug("find message\n");
[1522]42         
[1381]43
[1522]44          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
[1500]45          status->ep_src = (*it)->ep_src;
46          status->ep_tag = (*it)->ep_tag;
[1381]47
[1522]48          *flag = true;
[1381]49          break;
50        }
51      }
52    }
53  }
54
55
56
57  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
58  {
[1503]59    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
60    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
[1381]61    *flag = false;
[1500]62    if(!comm->is_ep)
[1381]63    {
64      Debug("calling MPI_Improbe MPI\n");
65
66      ::MPI_Status mpi_status;
67      ::MPI_Message mpi_message;
68
69      #ifdef _openmpi
70      #pragma omp critical (_mpi_call)
71      {
[1500]72        ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
[1381]73        if(*flag)
74        {
[1500]75          ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
[1381]76        }
77      }
78      #elif _intelmpi
[1500]79        ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
[1381]80      #endif
81       
[1500]82      status->mpi_status = &mpi_status;
[1381]83      status->ep_src = src;
84      status->ep_tag = tag;
85
[1500]86      (*message)->mpi_message = &message;
87      (*message)->ep_src = src;
88      (*message)->ep_tag = tag;
89     
90     
[1381]91      return 0;
92    }
93
[1500]94   
[1381]95
96    #pragma omp flush
97
98    #pragma omp critical (_query)
[1500]99    if(! comm->ep_comm_ptr->message_queue->empty())
[1381]100    {
[1500]101      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]102      {
[1500]103                                         
104        bool src_matched = src<0? true: (*it)->ep_src == src;
105        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]106       
107        if(src_matched && tag_matched)
108        {
109          *flag = true;
110
[1503]111          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
112          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
[1500]113          status->ep_src = (*it)->ep_src;
114          status->ep_tag = (*it)->ep_tag;
[1381]115
[1503]116          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
117          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
[1500]118          (*message)->ep_src = (*it)->ep_src;
119          (*message)->ep_tag = (*it)->ep_tag;
120                                     
[1381]121
122          #pragma omp critical (_query2)
123          {             
[1503]124            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
125            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
126            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
[1500]127           
[1503]128           
[1500]129            delete (*it)->mpi_message;     
130            delete (*it)->mpi_status; 
131            delete *it;
132           
[1503]133                       
[1500]134            comm->ep_comm_ptr->message_queue->erase(it);
[1503]135            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
[1381]136            #pragma omp flush
137          }
[1500]138         
[1381]139          break;
140        }
141
142      }
143    }
144  }
145
146}
147
Note: See TracBrowser for help on using the repository browser.