source: XIOS3/trunk/src/workflow_graph.cpp @ 2628

Last change on this file since 2628 was 2193, checked in by yushan, 3 years ago

workflow graph : enable unary and binary arithmetic filters

File size: 13.5 KB
Line 
1#include "workflow_graph.hpp"
2#include "cxios.hpp"
3
4namespace xios
5{
6
7  std::vector<graph_node_object> *CWorkflowGraph::vectorOfNodes_ = 0;
8  std::vector<graph_edge_object> *CWorkflowGraph::vectorOfEdges_ = 0;
9  std::vector<StdString> *CWorkflowGraph::vectorOfContexts_ = 0;
10
11  std::vector<graph_node_object> *CWorkflowGraph::vectorOfNodes_srv_ = 0;
12  std::vector<graph_edge_object> *CWorkflowGraph::vectorOfEdges_srv_ = 0;
13  std::vector<StdString> *CWorkflowGraph::vectorOfContexts_srv_ = 0;
14
15  bool CWorkflowGraph::clientGraphBuilt = false;
16  bool CWorkflowGraph::serverGraphBuilt = false;
17  bool CWorkflowGraph::build_begin = false;
18
19  std::unordered_map <size_t, int> *CWorkflowGraph::mapHashFilterID_ = 0;
20  std::unordered_map <size_t, int> *CWorkflowGraph::mapHashFilterID_srv_ = 0;
21
22
23  CWorkflowGraph::CWorkflowGraph()
24  { }
25
26
27//******************************************************
28
29  void CWorkflowGraph::outputWorkflowGraph_client_stdout()
30  {
31    std::cout<<"\n\nbuild workflow graph ..."<<std::endl;
32    for(int i=0; i<vectorOfNodes_->size(); i++)
33    {
34      std::cout<<"Node["<<i<<"] is "<<(*vectorOfNodes_)[i].filter_name<<std::endl;
35      info(100)<<"Node["<<i<<"] is "<<(*vectorOfNodes_)[i].filter_name<<std::endl;
36    }
37 
38    for(int i=0; i<vectorOfEdges_->size(); i++)
39    {
40      std::cout<<"Edge["<<i<<"] from "<<(*vectorOfEdges_)[i].from<<" to "<<(*vectorOfEdges_)[i].to<<std::endl;
41      info(100)<<"Edge["<<i<<"] from "<<(*vectorOfEdges_)[i].from<<" to "<<(*vectorOfEdges_)[i].to<<std::endl;
42    }
43    std::cout<<"\nend workflow graph ...\n\n"<<std::endl;
44  }
45 
46  void CWorkflowGraph::outputWorkflowGraph_server_stdout()
47  {
48    std::cout<<"\n\nServer side : build workflow graph ..."<<std::endl;
49    for(int i=0; i<vectorOfNodes_srv_->size(); i++)
50    {
51      info(100)<<"Node["<<i<<"] is "<<(*vectorOfNodes_srv_)[i].filter_name<<std::endl;
52    }
53 
54    for(int i=0; i<vectorOfEdges_srv_->size(); i++)
55    {
56      info(100)<<"Edge["<<i<<"] from "<<(*vectorOfEdges_srv_)[i].from<<" to "<<(*vectorOfEdges_srv_)[i].to<<std::endl;
57    }
58    std::cout<<"\nend workflow graph ...\n\n"<<std::endl;
59  }
60
61
62  void CWorkflowGraph::drawWorkFlowGraph_client()
63  TRY
64  {
65    if(vectorOfNodes_ && vectorOfEdges_) 
66    {
67      outputWorkflowGraph_client();
68    }
69    else info(100)<<"Client side : no graph information"<<std::endl;
70  }
71  CATCH
72 
73
74  void CWorkflowGraph::drawWorkFlowGraph_server()
75  TRY
76  {
77    if(vectorOfNodes_srv_ && vectorOfEdges_srv_) 
78    {
79      outputWorkflowGraph_server();
80    }
81    else info(100)<<"Server side : no graph information"<<std::endl;
82  }
83  CATCH
84 
85  void CWorkflowGraph::addEdge(int from, int to, CDataPacketPtr packet)
86  TRY
87  {
88    if(from<0) return;
89
90    if(CXios::isClient)
91    {
92      // if(vectorOfEdges_&&vectorOfNodes_) outputWorkflowGraph_client_stdout();
93      // std::cout<<"Trying to add an edge from "<<from<<" to "<<to<<std::endl;
94      if(!vectorOfEdges_) vectorOfEdges_ = new std::vector<graph_edge_object>;
95      std::string currentContextId = CContext::getCurrent()->getId();
96     
97      graph_edge_object edge_obj;   
98      edge_obj.from = from;
99      edge_obj.to = to;
100      edge_obj.date = packet->date;
101      edge_obj.timestamp = packet->timestamp;
102      edge_obj.field = packet->graphPackage->currentField;
103      edge_obj.show = true;
104     
105      if(vectorOfNodes_->at(from).filter_class == 2) // from pass through filter
106      {
107        edge_obj.label_info = vectorOfNodes_->at(from).label_field_id;
108      }
109     
110      if(vectorOfNodes_->at(to).filter_class == 3) // to temporal filter
111      {
112        vectorOfNodes_->at(to).expected_entry_nb++;
113      }
114
115      for(int i=0; i<vectorOfContexts_->size(); i++)
116      {
117        if(vectorOfContexts_->at(i) == currentContextId)
118        {
119          edge_obj.context = i;     
120          edge_obj.context_id = currentContextId;     
121          break;
122        }
123      } 
124      edge_obj.attributes = packet->graphPackage->currentField->recordXiosAttributes();
125     
126      vectorOfEdges_->push_back(edge_obj);
127      //info(100)<<"****************** Add Edge from "<<from<<" to "<<to<<std::endl;
128      vectorOfNodes_->at(from).filter_filled = true;
129    }
130    else
131    {
132      if(!vectorOfEdges_srv_) vectorOfEdges_srv_ = new std::vector<graph_edge_object>;
133      std::string currentContextId = CContext::getCurrent()->getId();
134     
135      graph_edge_object edge_obj;   
136      edge_obj.from = from;
137      edge_obj.to = to;
138      edge_obj.date = packet->date;
139      edge_obj.timestamp = packet->timestamp;
140      edge_obj.field = packet->graphPackage->currentField;
141      edge_obj.show = true;
142      for(int i=0; i<vectorOfContexts_srv_->size(); i++)
143      {
144        if(vectorOfContexts_srv_->at(i) == currentContextId)
145        {
146          edge_obj.context = i;     
147          edge_obj.context_id = currentContextId;     
148          break;
149        }
150      }
151      edge_obj.attributes = packet->graphPackage->currentField->recordXiosAttributes();
152     
153      vectorOfEdges_srv_->push_back(edge_obj);
154      //info(100)<<"****************** Server side : Add Edge from "<<from<<" to "<<to<<std::endl;
155      vectorOfNodes_srv_->at(from).filter_filled = true;
156
157    }
158  }
159  CATCH
160 
161
162  void CWorkflowGraph::addNode(StdString filterName, int filterClass, bool filterFilled, int entryNb, CDataPacketPtr packet)
163  TRY
164  {
165    if(CXios::isClient)
166    {
167      //if(vectorOfEdges_&&vectorOfNodes_) outputWorkflowGraph_client_stdout();
168      // std::cout<<"Trying to add a node naming "<<filterName<<std::endl;
169      if(!vectorOfNodes_) vectorOfNodes_ = new std::vector<graph_node_object>;
170      if(!vectorOfContexts_) vectorOfContexts_ = new std::vector<StdString>;
171      if(!mapHashFilterID_) mapHashFilterID_ = new std::unordered_map <size_t, int>;
172
173      std::string currentContextId = CContext::getCurrent()->getId();
174      if ( std::find(vectorOfContexts_->begin(), vectorOfContexts_->end(), currentContextId) == vectorOfContexts_->end() )
175         vectorOfContexts_->push_back(currentContextId);
176     
177     
178      graph_node_object node_obj;   
179      node_obj.filter_name = filterName;
180      node_obj.filter_class = filterClass;
181      node_obj.filter_filled = filterFilled;
182      node_obj.expected_entry_nb = entryNb;
183      node_obj.date = packet->date;
184      node_obj.timestamp = packet->timestamp;
185     
186      for(int i=0; i<vectorOfContexts_->size(); i++)
187      {
188        if(vectorOfContexts_->at(i) == currentContextId)
189        {
190          node_obj.context = i;     
191          node_obj.context_id = currentContextId;     
192          break;
193        }
194      }   
195     
196      node_obj.attributes = packet->graphPackage->currentField->recordXiosAttributes();
197     
198      vectorOfNodes_->push_back(node_obj);
199    }
200    else
201    { 
202      if(!vectorOfNodes_srv_) vectorOfNodes_srv_ = new std::vector<graph_node_object>;
203      if(!vectorOfContexts_srv_) vectorOfContexts_srv_ = new std::vector<StdString>;
204      if(!mapHashFilterID_srv_) mapHashFilterID_srv_ = new std::unordered_map <size_t, int>;
205
206      std::string currentContextId = CContext::getCurrent()->getId();
207      if ( std::find(vectorOfContexts_srv_->begin(), vectorOfContexts_srv_->end(), currentContextId) == vectorOfContexts_srv_->end() )
208         vectorOfContexts_srv_->push_back(currentContextId);
209     
210      graph_node_object node_obj;   
211      node_obj.filter_name = filterName;
212      node_obj.filter_class = filterClass;
213      node_obj.filter_filled = filterFilled;
214      node_obj.expected_entry_nb = entryNb;
215      node_obj.date = packet->date;
216      node_obj.timestamp = packet->timestamp;
217      for(int i=0; i<vectorOfContexts_srv_->size(); i++)
218      {
219        if(vectorOfContexts_srv_->at(i) == currentContextId)
220        {
221          node_obj.context = i;     
222          node_obj.context_id = currentContextId;     
223          break;
224        }
225      } 
226      node_obj.attributes = packet->graphPackage->currentField->recordXiosAttributes();
227
228      vectorOfNodes_srv_->push_back(node_obj);
229    }
230
231  }
232  CATCH
233
234  int CWorkflowGraph::getNodeSize()
235  TRY
236  {
237    if(CXios::isClient)
238    {
239      return !vectorOfNodes_? 0 : vectorOfNodes_->size();
240    }
241    else
242    {
243      return !vectorOfNodes_srv_? 0 : vectorOfNodes_srv_->size();
244    }
245  }
246  CATCH
247
248
249
250  void CWorkflowGraph::outputWorkflowGraph_client()
251  {
252    int graph_rank;
253    MPI_Comm_rank(MPI_COMM_WORLD, &graph_rank);
254    std::ofstream *outfiles;
255
256    outfiles = new std::ofstream[vectorOfContexts_->size()];
257
258    for(int ctx=0; ctx<vectorOfContexts_->size(); ctx++)
259    {
260      StdString graphFileName="graph_data_"+vectorOfContexts_->at(ctx)+"_client_"+to_string(graph_rank)+".json";
261      outfiles[ctx].open(graphFileName); 
262   
263      outfiles[ctx] << "{\"nodes\":["<<std::endl;
264    }
265    for(int i=0; i<vectorOfNodes_->size(); i++)
266    {
267      int ctx = vectorOfNodes_->at(i).context;
268      if(i!=0) outfiles[ctx] << ",";
269      outfiles[ctx] << "{\"id\":"<<i<<","<<std::endl;
270      outfiles[ctx] << "\"label\":"<<"\""<<vectorOfNodes_->at(i).filter_name<<"\","<<std::endl;
271      outfiles[ctx] << "\"class\":"<<vectorOfNodes_->at(i).filter_class<<","<<std::endl;
272      outfiles[ctx] << "\"filled\":"<<!(vectorOfNodes_->at(i).filter_filled)<<","<<std::endl;
273      outfiles[ctx] << "\"context\":"<<"\""<<vectorOfNodes_->at(i).context_id<<"\","<<std::endl;
274      outfiles[ctx] << "\"entry\":"<<"\""<<vectorOfNodes_->at(i).expected_entry_nb<<"\","<<std::endl;
275      outfiles[ctx] << "\"attributes\":"<<"\""<<vectorOfNodes_->at(i).attributes<<"\","<<std::endl;
276      outfiles[ctx] << "\"type\":"<<"\"\"}"<<std::endl;
277    } 
278    for(int ctx=0; ctx<vectorOfContexts_->size(); ctx++)
279    {
280      outfiles[ctx] << std::endl<<"],"<<std::endl<<"\"edges\" : ["<<std::endl;
281    }
282    for(int i=0; i<vectorOfEdges_->size(); i++)
283    {
284      int ctx = vectorOfEdges_->at(i).context;
285      if(i!=0) outfiles[ctx] << ",";
286      outfiles[ctx] << "{\"id\":"<<i<<","<<std::endl;
287      outfiles[ctx] << "\"from\":"<<vectorOfEdges_->at(i).from<<","<<std::endl;
288      outfiles[ctx] << "\"to\":"<<vectorOfEdges_->at(i).to<<","<<std::endl;
289      if(vectorOfEdges_->at(i).label_info != "none")
290      {
291        if(vectorOfEdges_->at(i).show) outfiles[ctx] << "\"label\":"<<"\""<<vectorOfEdges_->at(i).label_info<<"\\n"<<vectorOfEdges_->at(i).date<<"\","<<std::endl;
292        else outfiles[ctx] << "\"label\":"<<"\"\\n"<<vectorOfEdges_->at(i).date<<"\","<<std::endl;
293      }
294      else
295      {
296        if(vectorOfEdges_->at(i).show) outfiles[ctx] << "\"label\":"<<"\""<<vectorOfEdges_->at(i).field->getId()<<"\\n"<<vectorOfEdges_->at(i).date<<"\","<<std::endl;
297        else outfiles[ctx] << "\"label\":"<<"\"\\n"<<vectorOfEdges_->at(i).date<<"\","<<std::endl;
298      }
299      outfiles[ctx] << "\"context\":"<<"\""<<vectorOfEdges_->at(i).context_id<<"\","<<std::endl;
300      outfiles[ctx] << "\"attributes\":"<<"\""<<vectorOfEdges_->at(i).attributes<<"\","<<std::endl;
301      outfiles[ctx] << "\"date\":"<<"\""<<vectorOfEdges_->at(i).date<<"\"}"<<std::endl;
302    }
303    for(int ctx=0; ctx<vectorOfContexts_->size(); ctx++)
304    {
305      outfiles[ctx] << std::endl<<"]}"<<std::endl;
306      outfiles[ctx].close();
307    }
308  }
309 
310
311  void CWorkflowGraph::outputWorkflowGraph_server()
312  {
313    int graph_rank;
314    MPI_Comm_rank(MPI_COMM_WORLD, &graph_rank);
315    std::ofstream *outfiles;
316
317    outfiles = new std::ofstream[vectorOfContexts_srv_->size()];
318
319    for(int ctx=0; ctx<vectorOfContexts_srv_->size(); ctx++)
320    {
321      StdString graphFileName="graph_data_"+vectorOfContexts_srv_->at(ctx)+"_client_"+to_string(graph_rank)+".json";
322      outfiles[ctx].open(graphFileName); 
323   
324      outfiles[ctx] << "{\"nodes\":["<<std::endl;
325    }
326    for(int i=0; i<vectorOfNodes_srv_->size(); i++)
327    {
328      int ctx = vectorOfNodes_srv_->at(i).context;
329      if(i!=0) outfiles[ctx] << ",";
330      outfiles[ctx] << "{\"id\":"<<i<<","<<std::endl;
331      outfiles[ctx] << "\"label\":"<<"\""<<vectorOfNodes_srv_->at(i).filter_name<<"\","<<std::endl;
332      outfiles[ctx] << "\"class\":"<<vectorOfNodes_srv_->at(i).filter_class<<","<<std::endl;
333      outfiles[ctx] << "\"filled\":"<<!(vectorOfNodes_srv_->at(i).filter_filled)<<","<<std::endl;
334      outfiles[ctx] << "\"context\":"<<"\""<<vectorOfNodes_srv_->at(i).context_id<<"\","<<std::endl;
335      outfiles[ctx] << "\"entry\":"<<"\""<<vectorOfNodes_srv_->at(i).expected_entry_nb<<"\","<<std::endl;
336      outfiles[ctx] << "\"attributes\":"<<"\""<<vectorOfNodes_srv_->at(i).attributes<<"\","<<std::endl;
337      outfiles[ctx] << "\"type\":"<<"\"\"}"<<std::endl;
338    } 
339    for(int ctx=0; ctx<vectorOfContexts_srv_->size(); ctx++)
340    {
341      outfiles[ctx] << std::endl<<"],"<<std::endl<<"\"edges\" : ["<<std::endl;
342    }
343    for(int i=0; i<vectorOfEdges_srv_->size(); i++)
344    {
345      int ctx = vectorOfEdges_srv_->at(i).context;
346      if(i!=0) outfiles[ctx] << ",";
347      outfiles[ctx] << "{\"id\":"<<i<<","<<std::endl;
348      outfiles[ctx] << "\"from\":"<<vectorOfEdges_srv_->at(i).from<<","<<std::endl;
349      outfiles[ctx] << "\"to\":"<<vectorOfEdges_srv_->at(i).to<<","<<std::endl;
350      if(vectorOfEdges_srv_->at(i).show) outfiles[ctx] << "\"label\":"<<"\""<<vectorOfEdges_srv_->at(i).field->getId()<<"\\n"<<vectorOfEdges_srv_->at(i).date<<"\","<<std::endl;
351      else                               outfiles[ctx] << "\"label\":"<<"\"\\n"<<vectorOfEdges_srv_->at(i).date<<"\","<<std::endl;
352      outfiles[ctx] << "\"context\":"<<"\""<<vectorOfEdges_srv_->at(i).context_id<<"\","<<std::endl;
353      outfiles[ctx] << "\"attributes\":"<<"\""<<vectorOfEdges_srv_->at(i).attributes<<"\","<<std::endl;
354      outfiles[ctx] << "\"date\":"<<"\""<<vectorOfEdges_srv_->at(i).date<<"\"}"<<std::endl;
355    }
356    for(int ctx=0; ctx<vectorOfContexts_srv_->size(); ctx++)
357    {
358      outfiles[ctx] << std::endl<<"]}"<<std::endl;
359      outfiles[ctx].close();
360    }
361  }
362}
363
Note: See TracBrowser for help on using the repository browser.