source: XIOS/trunk/src/node/context.cpp @ 729

Last change on this file since 729 was 729, checked in by rlacroix, 9 years ago

Ensure all needed buffers are properly initialized with the minimum buffer size.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 36.6 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , dataSize_(), idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , dataSize_(), idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t bufferSizeMin = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (bufferSizeMin < sizeof(C##Name_##Definition)) bufferSizeMin = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> bufferSize = getDataSize();
277     std::map<int, StdSize>::iterator it  = bufferSize.begin(),
278                                      ite = bufferSize.end();
279     for (; it != ite; ++it)
280       if (it->second < bufferSizeMin) it->second = bufferSizeMin;
281
282     if (client->isServerLeader())
283     {
284       const std::list<int>& ranks = client->getRanksServerLeader();
285       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
286         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = bufferSizeMin;
287     }
288
289     if (!bufferSize.empty())
290       client->setBufferSize(bufferSize);
291   }
292
293   //! Verify whether a context is initialized
294   bool CContext::isInitialized(void)
295   {
296     return hasClient;
297   }
298
299   //! Initialize server
300   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
301   {
302     hasServer=true;
303     server = new CContextServer(this,intraComm,interComm);
304
305     registryIn=new CRegistry(intraComm);
306     registryIn->setPath(getId()) ;
307     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
308     registryIn->bcastRegistry() ;
309     registryOut=new CRegistry(intraComm) ;
310     registryOut->setPath(getId()) ;
311 
312     MPI_Comm intraCommClient, interCommClient;
313     if (cxtClient) // Attached mode
314     {
315       intraCommClient = intraComm;
316       interCommClient = interComm;
317     }
318     else
319     {
320       MPI_Comm_dup(intraComm, &intraCommClient);
321       comms.push_back(intraCommClient);
322       MPI_Comm_dup(interComm, &interCommClient);
323       comms.push_back(interCommClient);
324     }
325     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
326   }
327
328   //! Server side: Put server into a loop in order to listen message from client
329   bool CContext::eventLoop(void)
330   {
331     return server->eventLoop();
332   }
333
334   //! Try to send the buffers and receive possible answers
335   bool CContext::checkBuffersAndListen(void)
336   {
337     client->checkBuffers();
338     return server->eventLoop();
339   }
340
341   //! Terminate a context
342   void CContext::finalize(void)
343   {
344      if (!finalized)
345      {
346        finalized = true;
347        if (hasClient) sendRegistry() ;
348        client->finalize();
349        while (!server->hasFinished())
350        {
351          server->eventLoop();
352        }
353
354        if (hasServer)
355        {
356          closeAllFile();
357          registryOut->hierarchicalGatherRegistry() ;
358          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
359        }
360       
361        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
362          MPI_Comm_free(&(*it));
363        comms.clear();
364      }
365   }
366
367   /*!
368   \brief Close all the context defintion and do processing data
369      After everything is well defined on client side, they will be processed and sent to server
370   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
371   all necessary information to server, from which each server can build its own database.
372   Because the role of server is to write out field data on a specific netcdf file,
373   the only information that it needs is the enabled files
374   and the active fields (fields will be written onto active files)
375   */
376   void CContext::closeDefinition(void)
377   {
378     // There is nothing client need to send to server
379     if (hasClient)
380     {
381       // After xml is parsed, there are some more works with post processing
382       postProcessing();
383     }
384     setClientServerBuffer();
385
386     if (hasClient && !hasServer)
387     {
388      // Send all attributes of current context to server
389      this->sendAllAttributesToServer();
390
391      // Send all attributes of current calendar
392      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
393
394      // We have enough information to send to server
395      // First of all, send all enabled files
396       sendEnabledFiles();
397
398      // Then, send all enabled fields
399       sendEnabledFields();
400
401      // At last, we have all info of domain and axis, then send them
402       sendRefDomainsAxis();
403
404      // After that, send all grid (if any)
405       sendRefGrid();
406    }
407
408    // We have a xml tree on the server side and now, it should be also processed
409    if (hasClient && !hasServer) sendPostProcessing();
410
411    // There are some processings that should be done after all of above. For example: check mask or index
412    if (hasClient)
413    {
414      this->buildFilterGraphOfEnabledFields();
415      buildFilterGraphOfFieldsWithReadAccess();
416      this->solveAllRefOfEnabledFields(true);
417    }
418
419    // Now tell server that it can process all messages from client
420    if (hasClient && !hasServer) this->sendCloseDefinition();
421
422    // Nettoyage de l'arborescence
423    if (hasClient && !hasServer) CleanTree(); // Only on client side??
424
425    if (hasClient)
426    {
427      sendCreateFileHeader();
428
429      startPrefetchingOfEnabledReadModeFiles();
430    }
431   }
432
433   void CContext::findAllEnabledFields(void)
434   {
435     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
436     (void)this->enabledFiles[i]->getEnabledFields();
437   }
438
439   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
440   {
441     int size = this->enabledFiles.size();
442     for (int i = 0; i < size; ++i)
443     {
444       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
445     }
446   }
447
448   void CContext::buildFilterGraphOfEnabledFields()
449   {
450     int size = this->enabledFiles.size();
451     for (int i = 0; i < size; ++i)
452     {
453       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
454     }
455   }
456
457   void CContext::startPrefetchingOfEnabledReadModeFiles()
458   {
459     int size = enabledReadModeFiles.size();
460     for (int i = 0; i < size; ++i)
461     {
462        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
463     }
464   }
465
466   void CContext::checkPrefetchingOfEnabledReadModeFiles()
467   {
468     int size = enabledReadModeFiles.size();
469     for (int i = 0; i < size; ++i)
470     {
471        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
472     }
473   }
474
475  void CContext::findFieldsWithReadAccess(void)
476  {
477    fieldsWithReadAccess.clear();
478    const vector<CField*> allFields = CField::getAll();
479    for (size_t i = 0; i < allFields.size(); ++i)
480    {
481      if (allFields[i]->file && !allFields[i]->file->mode.isEmpty() && allFields[i]->file->mode.getValue() == CFile::mode_attr::read)
482        allFields[i]->read_access = true;
483      if (!allFields[i]->read_access.isEmpty() && allFields[i]->read_access.getValue())
484        fieldsWithReadAccess.push_back(allFields[i]);
485    }
486  }
487
488  void CContext::solveAllRefOfFieldsWithReadAccess()
489  {
490    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
491      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
492  }
493
494  void CContext::buildFilterGraphOfFieldsWithReadAccess()
495  {
496    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
497      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
498  }
499
500   void CContext::solveAllInheritance(bool apply)
501   {
502     // Résolution des héritages descendants (càd des héritages de groupes)
503     // pour chacun des contextes.
504      solveDescInheritance(apply);
505
506     // Résolution des héritages par référence au niveau des fichiers.
507      const vector<CFile*> allFiles=CFile::getAll();
508      const vector<CGrid*> allGrids= CGrid::getAll();
509
510     //if (hasClient && !hasServer)
511      if (hasClient)
512      {
513        for (unsigned int i = 0; i < allFiles.size(); i++)
514          allFiles[i]->solveFieldRefInheritance(apply);
515      }
516
517      unsigned int vecSize = allGrids.size();
518      unsigned int i = 0;
519      for (i = 0; i < vecSize; ++i)
520        allGrids[i]->solveDomainAxisRefInheritance(apply);
521
522   }
523
524   void CContext::findEnabledFiles(void)
525   {
526      const std::vector<CFile*> allFiles = CFile::getAll();
527
528      for (unsigned int i = 0; i < allFiles.size(); i++)
529         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
530         {
531            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
532               enabledFiles.push_back(allFiles[i]);
533         }
534         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
535
536
537      if (enabledFiles.size() == 0)
538         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
539               << getId() << "\" !");
540   }
541
542   void CContext::findEnabledReadModeFiles(void)
543   {
544     int size = this->enabledFiles.size();
545     for (int i = 0; i < size; ++i)
546     {
547       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
548        enabledReadModeFiles.push_back(enabledFiles[i]);
549     }
550   }
551
552   void CContext::closeAllFile(void)
553   {
554     std::vector<CFile*>::const_iterator
555            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
556
557     for (; it != end; it++)
558     {
559       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
560       (*it)->close();
561     }
562   }
563
564   /*!
565   \brief Dispatch event received from client
566      Whenever a message is received in buffer of server, it will be processed depending on
567   its event type. A new event type should be added in the switch list to make sure
568   it processed on server side.
569   \param [in] event: Received message
570   */
571   bool CContext::dispatchEvent(CEventServer& event)
572   {
573
574      if (SuperClass::dispatchEvent(event)) return true;
575      else
576      {
577        switch(event.type)
578        {
579           case EVENT_ID_CLOSE_DEFINITION :
580             recvCloseDefinition(event);
581             return true;
582             break;
583           case EVENT_ID_UPDATE_CALENDAR:
584             recvUpdateCalendar(event);
585             return true;
586             break;
587           case EVENT_ID_CREATE_FILE_HEADER :
588             recvCreateFileHeader(event);
589             return true;
590             break;
591           case EVENT_ID_POST_PROCESS:
592             recvPostProcessing(event);
593             return true;
594            case EVENT_ID_SEND_REGISTRY:
595             recvRegistry(event);
596             return true;
597            break;
598
599           default :
600             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
601                    <<"Unknown Event");
602           return false;
603         }
604      }
605   }
606
607   //! Client side: Send a message to server to make it close
608   void CContext::sendCloseDefinition(void)
609   {
610     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
611     if (client->isServerLeader())
612     {
613       CMessage msg;
614       msg<<this->getIdServer();
615       const std::list<int>& ranks = client->getRanksServerLeader();
616       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
617         event.push(*itRank,1,msg);
618       client->sendEvent(event);
619     }
620     else client->sendEvent(event);
621   }
622
623   //! Server side: Receive a message of client announcing a context close
624   void CContext::recvCloseDefinition(CEventServer& event)
625   {
626
627      CBufferIn* buffer=event.subEvents.begin()->buffer;
628      string id;
629      *buffer>>id;
630      get(id)->closeDefinition();
631   }
632
633   //! Client side: Send a message to update calendar in each time step
634   void CContext::sendUpdateCalendar(int step)
635   {
636     if (!hasServer)
637     {
638       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
639       if (client->isServerLeader())
640       {
641         CMessage msg;
642         msg<<this->getIdServer()<<step;
643         const std::list<int>& ranks = client->getRanksServerLeader();
644         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
645           event.push(*itRank,1,msg);
646         client->sendEvent(event);
647       }
648       else client->sendEvent(event);
649     }
650   }
651
652   //! Server side: Receive a message of client annoucing calendar update
653   void CContext::recvUpdateCalendar(CEventServer& event)
654   {
655      CBufferIn* buffer=event.subEvents.begin()->buffer;
656      string id;
657      *buffer>>id;
658      get(id)->recvUpdateCalendar(*buffer);
659   }
660
661   //! Server side: Receive a message of client annoucing calendar update
662   void CContext::recvUpdateCalendar(CBufferIn& buffer)
663   {
664      int step;
665      buffer>>step;
666      updateCalendar(step);
667   }
668
669   //! Client side: Send a message to create header part of netcdf file
670   void CContext::sendCreateFileHeader(void)
671   {
672     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
673     if (client->isServerLeader())
674     {
675       CMessage msg;
676       msg<<this->getIdServer();
677       const std::list<int>& ranks = client->getRanksServerLeader();
678       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
679         event.push(*itRank,1,msg) ;
680       client->sendEvent(event);
681     }
682     else client->sendEvent(event);
683   }
684
685   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
686   void CContext::recvCreateFileHeader(CEventServer& event)
687   {
688      CBufferIn* buffer=event.subEvents.begin()->buffer;
689      string id;
690      *buffer>>id;
691      get(id)->recvCreateFileHeader(*buffer);
692   }
693
694   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
695   void CContext::recvCreateFileHeader(CBufferIn& buffer)
696   {
697      createFileHeader();
698   }
699
700   //! Client side: Send a message to do some post processing on server
701   void CContext::sendPostProcessing()
702   {
703     if (!hasServer)
704     {
705       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
706       if (client->isServerLeader())
707       {
708         CMessage msg;
709         msg<<this->getIdServer();
710         const std::list<int>& ranks = client->getRanksServerLeader();
711         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
712           event.push(*itRank,1,msg);
713         client->sendEvent(event);
714       }
715       else client->sendEvent(event);
716     }
717   }
718
719   //! Server side: Receive a message to do some post processing
720   void CContext::recvPostProcessing(CEventServer& event)
721   {
722      CBufferIn* buffer=event.subEvents.begin()->buffer;
723      string id;
724      *buffer>>id;
725      get(id)->recvPostProcessing(*buffer);
726   }
727
728   //! Server side: Receive a message to do some post processing
729   void CContext::recvPostProcessing(CBufferIn& buffer)
730   {
731      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
732      postProcessing();
733   }
734
735   const StdString& CContext::getIdServer()
736   {
737      if (hasClient)
738      {
739        idServer_ = this->getId();
740        idServer_ += "_server";
741        return idServer_;
742      }
743      if (hasServer) return (this->getId());
744   }
745
746   /*!
747   \brief Do some simple post processings after parsing xml file
748      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
749   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
750   which will be written out into netcdf files, are processed
751   */
752   void CContext::postProcessing()
753   {
754     if (isPostProcessed) return;
755
756      // Make sure the calendar was correctly created
757      if (!calendar)
758        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
759      else if (calendar->getTimeStep() == NoneDu)
760        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
761      // Calendar first update to set the current date equals to the start date
762      calendar->update(0);
763
764      // Find all inheritance in xml structure
765      this->solveAllInheritance();
766
767      // Check if some axis, domains or grids are eligible to for compressed indexed output.
768      // Warning: This must be done after solving the inheritance and before the rest of post-processing
769      checkAxisDomainsGridsEligibilityForCompressedOutput();
770
771      // Check if some automatic time series should be generated
772      // Warning: This must be done after solving the inheritance and before the rest of post-processing
773      prepareTimeseries();
774
775      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
776      this->findEnabledFiles();
777      this->findEnabledReadModeFiles();
778
779      // Find all enabled fields of each file
780      this->findAllEnabledFields();
781
782      // Search and rebuild all reference object of enabled fields
783      this->solveAllRefOfEnabledFields(false);
784
785      // Find all fields with read access from the public API
786      findFieldsWithReadAccess();
787      // and solve the all reference for them
788      solveAllRefOfFieldsWithReadAccess();
789
790      isPostProcessed = true;
791   }
792
793   std::map<int, StdSize>& CContext::getDataSize()
794   {
795     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
796
797     // Set of grid used by enabled fields
798     std::set<StdString> usedGrid;
799
800     // Find all reference domain and axis of all active fields
801     int numEnabledFiles = this->enabledFiles.size();
802     for (int i = 0; i < numEnabledFiles; ++i)
803     {
804       CFile* file = this->enabledFiles[i];
805       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
806
807       if (fileMode == mode)
808       {
809         std::vector<CField*> enabledFields = file->getEnabledFields();
810         int numEnabledFields = enabledFields.size();
811         for (int j = 0; j < numEnabledFields; ++j)
812         {
813           StdString currentGrid = enabledFields[j]->grid->getId();
814           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataSize();
815           if (dataSize_.empty())
816           {
817             dataSize_ = mapSize;
818             usedGrid.insert(currentGrid);
819           }
820           else
821           {
822             std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
823             if (usedGrid.find(currentGrid) == usedGrid.end())
824             {
825               for (; it != itE; ++it)
826               {
827                 if (0 < dataSize_.count(it->first)) dataSize_[it->first] += it->second;
828                 else dataSize_.insert(make_pair(it->first, it->second));
829               }
830             } else
831             {
832               for (; it != itE; ++it)
833               {
834                 if (0 < dataSize_.count(it->first))
835                  if (CXios::isOptPerformance) dataSize_[it->first] += it->second;
836                  else
837                  {
838                    if (dataSize_[it->first] < it->second) dataSize_[it->first] = it->second;
839                  }
840                 else dataSize_.insert(make_pair(it->first, it->second));
841               }
842             }
843           }
844         }
845       }
846     }
847
848     return dataSize_;
849   }
850
851   //! Client side: Send infomation of active files (files are enabled to write out)
852   void CContext::sendEnabledFiles()
853   {
854     int size = this->enabledFiles.size();
855
856     // In a context, each type has a root definition, e.g: axis, domain, field.
857     // Every object must be a child of one of these root definition. In this case
858     // all new file objects created on server must be children of the root "file_definition"
859     StdString fileDefRoot("file_definition");
860     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
861
862     for (int i = 0; i < size; ++i)
863     {
864       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
865       this->enabledFiles[i]->sendAllAttributesToServer();
866       this->enabledFiles[i]->sendAddAllVariables();
867     }
868   }
869
870   //! Client side: Send information of active fields (ones are written onto files)
871   void CContext::sendEnabledFields()
872   {
873     int size = this->enabledFiles.size();
874     for (int i = 0; i < size; ++i)
875     {
876       this->enabledFiles[i]->sendEnabledFields();
877     }
878   }
879
880   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
881   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
882   {
883     if (!hasClient) return;
884
885     const vector<CAxis*> allAxis = CAxis::getAll();
886     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
887       (*it)->checkEligibilityForCompressedOutput();
888
889     const vector<CDomain*> allDomains = CDomain::getAll();
890     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
891       (*it)->checkEligibilityForCompressedOutput();
892
893     const vector<CGrid*> allGrids = CGrid::getAll();
894     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
895       (*it)->checkEligibilityForCompressedOutput();
896   }
897
898   //! Client side: Prepare the timeseries by adding the necessary files
899   void CContext::prepareTimeseries()
900   {
901     if (!hasClient) return;
902
903     const std::vector<CFile*> allFiles = CFile::getAll();
904     for (size_t i = 0; i < allFiles.size(); i++)
905     {
906       CFile* file = allFiles[i];
907
908       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
909       {
910         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : (!file->name.isEmpty() ? file->name : file->getId());
911
912         const std::vector<CField*> allFields = file->getAllFields();
913         for (size_t j = 0; j < allFields.size(); j++)
914         {
915           CField* field = allFields[j];
916
917           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
918           {
919             CFile* tsFile = CFile::create();
920             tsFile->duplicateAttributes(file);
921
922             tsFile->name = tsPrefix + "_";
923             if (!field->name.isEmpty())
924               tsFile->name.get() += field->name;
925             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
926               tsFile->name.get() += field->field_ref;
927             else
928               tsFile->name.get() += field->getId();
929
930             if (!field->ts_split_freq.isEmpty())
931               tsFile->split_freq = field->ts_split_freq;
932
933             CField* tsField = tsFile->addField();
934             tsField->field_ref = field->getId();
935
936             tsFile->solveFieldRefInheritance(true);
937
938             if (file->timeseries == CFile::timeseries_attr::exclusive)
939               field->enabled = false;
940           }
941         }
942
943         // Finally disable the original file is need be
944         if (file->timeseries == CFile::timeseries_attr::only)
945          file->enabled = false;
946       }
947     }
948   }
949
950   //! Client side: Send information of reference grid of active fields
951   void CContext::sendRefGrid()
952   {
953     std::set<StdString> gridIds;
954     int sizeFile = this->enabledFiles.size();
955     CFile* filePtr(NULL);
956
957     // Firstly, find all reference grids of all active fields
958     for (int i = 0; i < sizeFile; ++i)
959     {
960       filePtr = this->enabledFiles[i];
961       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
962       int sizeField = enabledFields.size();
963       for (int numField = 0; numField < sizeField; ++numField)
964       {
965         if (0 != enabledFields[numField]->getRelGrid())
966           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
967       }
968     }
969
970     // Create all reference grids on server side
971     StdString gridDefRoot("grid_definition");
972     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
973     std::set<StdString>::const_iterator it, itE = gridIds.end();
974     for (it = gridIds.begin(); it != itE; ++it)
975     {
976       gridPtr->sendCreateChild(*it);
977       CGrid::get(*it)->sendAllAttributesToServer();
978       CGrid::get(*it)->sendAllDomains();
979       CGrid::get(*it)->sendAllAxis();
980     }
981   }
982
983
984   //! Client side: Send information of reference domain and axis of active fields
985   void CContext::sendRefDomainsAxis()
986   {
987     std::set<StdString> domainIds;
988     std::set<StdString> axisIds;
989
990     // Find all reference domain and axis of all active fields
991     int numEnabledFiles = this->enabledFiles.size();
992     for (int i = 0; i < numEnabledFiles; ++i)
993     {
994       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
995       int numEnabledFields = enabledFields.size();
996       for (int j = 0; j < numEnabledFields; ++j)
997       {
998         const std::pair<StdString, StdString>& prDomAxisId = enabledFields[j]->getRefDomainAxisIds();
999         if ("" != prDomAxisId.first) domainIds.insert(prDomAxisId.first);
1000         if ("" != prDomAxisId.second) axisIds.insert(prDomAxisId.second);
1001       }
1002     }
1003
1004     // Create all reference axis on server side
1005     std::set<StdString>::iterator itDom, itAxis;
1006     std::set<StdString>::const_iterator itE;
1007
1008     StdString axiDefRoot("axis_definition");
1009     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1010     itE = axisIds.end();
1011     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1012     {
1013       if (!itAxis->empty())
1014       {
1015         axisPtr->sendCreateChild(*itAxis);
1016         CAxis::get(*itAxis)->sendAllAttributesToServer();
1017       }
1018     }
1019
1020     // Create all reference domains on server side
1021     StdString domDefRoot("domain_definition");
1022     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1023     itE = domainIds.end();
1024     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1025     {
1026       if (!itDom->empty()) {
1027          domPtr->sendCreateChild(*itDom);
1028          CDomain::get(*itDom)->sendAllAttributesToServer();
1029       }
1030     }
1031   }
1032
1033   //! Update calendar in each time step
1034   void CContext::updateCalendar(int step)
1035   {
1036      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1037      calendar->update(step);
1038      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1039
1040      if (hasClient)
1041      {
1042        checkPrefetchingOfEnabledReadModeFiles();
1043        garbageCollector.invalidate(calendar->getCurrentDate());
1044      }
1045   }
1046
1047   //! Server side: Create header of netcdf file
1048   void CContext::createFileHeader(void )
1049   {
1050      vector<CFile*>::const_iterator it;
1051
1052      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1053      {
1054         (*it)->initFile();
1055      }
1056   }
1057
1058   //! Get current context
1059   CContext* CContext::getCurrent(void)
1060   {
1061     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1062   }
1063
1064   /*!
1065   \brief Set context with an id be the current context
1066   \param [in] id identity of context to be set to current
1067   */
1068   void CContext::setCurrent(const string& id)
1069   {
1070     CObjectFactory::SetCurrentContextId(id);
1071     CGroupFactory::SetCurrentContextId(id);
1072   }
1073
1074  /*!
1075  \brief Create a context with specific id
1076  \param [in] id identity of new context
1077  \return pointer to the new context or already-existed one with identity id
1078  */
1079  CContext* CContext::create(const StdString& id)
1080  {
1081    CContext::setCurrent(id);
1082
1083    bool hasctxt = CContext::has(id);
1084    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1085    getRoot();
1086    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1087
1088#define DECLARE_NODE(Name_, name_) \
1089    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1090#define DECLARE_NODE_PAR(Name_, name_)
1091#include "node_type.conf"
1092
1093    return (context);
1094  }
1095
1096
1097
1098     //! Server side: Receive a message to do some post processing
1099  void CContext::recvRegistry(CEventServer& event)
1100  {
1101    CBufferIn* buffer=event.subEvents.begin()->buffer;
1102    string id;
1103    *buffer>>id;
1104    get(id)->recvRegistry(*buffer);
1105  }
1106
1107  void CContext::recvRegistry(CBufferIn& buffer)
1108  {
1109    if (server->intraCommRank==0)
1110    {
1111      CRegistry registry(server->intraComm) ;
1112      registry.fromBuffer(buffer) ;
1113      registryOut->mergeRegistry(registry) ;
1114    }
1115  }
1116
1117  void CContext::sendRegistry(void)
1118  {
1119    registryOut->hierarchicalGatherRegistry() ;
1120
1121    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1122    if (client->isServerLeader())
1123    {
1124       CMessage msg ;
1125       msg<<this->getIdServer();
1126       if (client->clientRank==0) msg<<*registryOut ;
1127       const std::list<int>& ranks = client->getRanksServerLeader();
1128       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1129         event.push(*itRank,1,msg);
1130       client->sendEvent(event);
1131     }
1132     else client->sendEvent(event);
1133  }
1134
1135} // namespace xios
Note: See TracBrowser for help on using the repository browser.