source: XIOS/trunk/src/node/context.cpp @ 1190

Last change on this file since 1190 was 1190, checked in by rlacroix, 7 years ago

Remove an old failsafe mechanism that could hide some buffers mispredictions.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 43.1 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18
19
20namespace xios {
21
22  shared_ptr<CContextGroup> CContext::root;
23
24   /// ////////////////////// Définitions ////////////////////// ///
25
26   CContext::CContext(void)
27      : CObjectTemplate<CContext>(), CContextAttributes()
28      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
29      , idServer_(), client(0), server(0)
30   { /* Ne rien faire de plus */ }
31
32   CContext::CContext(const StdString & id)
33      : CObjectTemplate<CContext>(id), CContextAttributes()
34      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
35      , idServer_(), client(0), server(0)
36   { /* Ne rien faire de plus */ }
37
38   CContext::~CContext(void)
39   {
40     delete client;
41     delete server;
42   }
43
44   //----------------------------------------------------------------
45   //! Get name of context
46   StdString CContext::GetName(void)   { return (StdString("context")); }
47   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
48   ENodeType CContext::GetType(void)   { return (eContext); }
49
50   //----------------------------------------------------------------
51
52   /*!
53   \brief Get context group (context root)
54   \return Context root
55   */
56   CContextGroup* CContext::getRoot(void)
57   {
58      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
59      return root.get();
60   }
61
62   //----------------------------------------------------------------
63
64   /*!
65   \brief Get calendar of a context
66   \return Calendar
67   */
68   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
69   {
70      return (this->calendar);
71   }
72
73   //----------------------------------------------------------------
74
75   /*!
76   \brief Set a context with a calendar
77   \param[in] newCalendar new calendar
78   */
79   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
80   {
81      this->calendar = newCalendar;
82   }
83
84   //----------------------------------------------------------------
85   /*!
86   \brief Parse xml file and write information into context object
87   \param [in] node xmld node corresponding in xml file
88   */
89   void CContext::parse(xml::CXMLNode & node)
90   {
91      CContext::SuperClass::parse(node);
92
93      // PARSING POUR GESTION DES ENFANTS
94      xml::THashAttributes attributes = node.getAttributes();
95
96      if (attributes.end() != attributes.find("src"))
97      {
98         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
99         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
100            ERROR("void CContext::parse(xml::CXMLNode & node)",
101                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
102         if (!ifs.good())
103            ERROR("CContext::parse(xml::CXMLNode & node)",
104                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
105         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
106      }
107
108      if (node.getElementName().compare(CContext::GetName()))
109         DEBUG("Le noeud is wrong defined but will be considered as a context !");
110
111      if (!(node.goToChildElement()))
112      {
113         DEBUG("Le context ne contient pas d'enfant !");
114      }
115      else
116      {
117         do { // Parcours des contextes pour traitement.
118
119            StdString name = node.getElementName();
120            attributes.clear();
121            attributes = node.getAttributes();
122
123            if (attributes.end() != attributes.find("id"))
124            {
125              DEBUG(<< "Definition node has an id,"
126                    << "it will not be taking account !");
127            }
128
129#define DECLARE_NODE(Name_, name_)    \
130   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
131   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
132#define DECLARE_NODE_PAR(Name_, name_)
133#include "node_type.conf"
134
135            DEBUG(<< "The element \'"     << name
136                  << "\' in the context \'" << CContext::getCurrent()->getId()
137                  << "\' is not a definition !");
138
139         } while (node.goToNextElement());
140
141         node.goToParentElement(); // Retour au parent
142      }
143   }
144
145   //----------------------------------------------------------------
146   //! Show tree structure of context
147   void CContext::ShowTree(StdOStream & out)
148   {
149      StdString currentContextId = CContext::getCurrent() -> getId();
150      std::vector<CContext*> def_vector =
151         CContext::getRoot()->getChildList();
152      std::vector<CContext*>::iterator
153         it = def_vector.begin(), end = def_vector.end();
154
155      out << "<? xml version=\"1.0\" ?>" << std::endl;
156      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
157
158      for (; it != end; it++)
159      {
160         CContext* context = *it;
161         CContext::setCurrent(context->getId());
162         out << *context << std::endl;
163      }
164
165      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
166      CContext::setCurrent(currentContextId);
167   }
168
169
170   //----------------------------------------------------------------
171
172   //! Convert context object into string (to print)
173   StdString CContext::toString(void) const
174   {
175      StdOStringStream oss;
176      oss << "<" << CContext::GetName()
177          << " id=\"" << this->getId() << "\" "
178          << SuperClassAttribute::toString() << ">" << std::endl;
179      if (!this->hasChild())
180      {
181         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
182      }
183      else
184      {
185
186#define DECLARE_NODE(Name_, name_)    \
187   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
188   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
189#define DECLARE_NODE_PAR(Name_, name_)
190#include "node_type.conf"
191
192      }
193
194      oss << "</" << CContext::GetName() << " >";
195
196      return (oss.str());
197   }
198
199   //----------------------------------------------------------------
200
201   /*!
202   \brief Find all inheritace among objects in a context.
203   \param [in] apply (true) write attributes of parent into ones of child if they are empty
204                     (false) write attributes of parent into a new container of child
205   \param [in] parent unused
206   */
207   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
208   {
209#define DECLARE_NODE(Name_, name_)    \
210   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
211     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
212#define DECLARE_NODE_PAR(Name_, name_)
213#include "node_type.conf"
214   }
215
216   //----------------------------------------------------------------
217
218   //! Verify if all root definition in the context have child.
219   bool CContext::hasChild(void) const
220   {
221      return (
222#define DECLARE_NODE(Name_, name_)    \
223   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
224#define DECLARE_NODE_PAR(Name_, name_)
225#include "node_type.conf"
226      false);
227}
228
229   //----------------------------------------------------------------
230
231   void CContext::CleanTree(void)
232   {
233#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
234#define DECLARE_NODE_PAR(Name_, name_)
235#include "node_type.conf"
236   }
237   ///---------------------------------------------------------------
238
239   //! Initialize client side
240   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
241   {
242     hasClient=true;
243     client = new CContextClient(this,intraComm, interComm, cxtServer);
244     registryIn=new CRegistry(intraComm);
245     registryIn->setPath(getId()) ;
246     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
247     registryIn->bcastRegistry() ;
248
249     registryOut=new CRegistry(intraComm) ;
250     registryOut->setPath(getId()) ;
251
252     MPI_Comm intraCommServer, interCommServer;
253     if (cxtServer) // Attached mode
254     {
255       intraCommServer = intraComm;
256       interCommServer = interComm;
257     }
258     else
259     {
260       MPI_Comm_dup(intraComm, &intraCommServer);
261       comms.push_back(intraCommServer);
262       MPI_Comm_dup(interComm, &interCommServer);
263       comms.push_back(interCommServer);
264     }
265     server = new CContextServer(this,intraCommServer,interCommServer);
266   }
267
268   void CContext::setClientServerBuffer()
269   {
270     size_t minBufferSize = CXios::minBufferSize;
271#define DECLARE_NODE(Name_, name_)    \
272     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
273#define DECLARE_NODE_PAR(Name_, name_)
274#include "node_type.conf"
275#undef DECLARE_NODE
276#undef DECLARE_NODE_PAR
277
278     std::map<int, StdSize> maxEventSize;
279     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
280     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
281
282     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
283     for (it = dataBufferSize.begin(); it != ite; ++it)
284       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
285
286     ite = bufferSize.end();
287     for (it = bufferSize.begin(); it != ite; ++it)
288     {
289       it->second *= CXios::bufferSizeFactor;
290       if (it->second < minBufferSize) it->second = minBufferSize;
291     }
292
293     // We consider that the minimum buffer size is also the minimum event size
294     ite = maxEventSize.end();
295     for (it = maxEventSize.begin(); it != ite; ++it)
296       if (it->second < minBufferSize) it->second = minBufferSize;
297
298     client->setBufferSize(bufferSize, maxEventSize);
299   }
300
301   //! Verify whether a context is initialized
302   bool CContext::isInitialized(void)
303   {
304     return hasClient;
305   }
306
307   //! Initialize server
308   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
309   {
310     hasServer=true;
311     server = new CContextServer(this,intraComm,interComm);
312
313     registryIn=new CRegistry(intraComm);
314     registryIn->setPath(getId()) ;
315     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
316     registryIn->bcastRegistry() ;
317     registryOut=new CRegistry(intraComm) ;
318     registryOut->setPath(getId()) ;
319
320     MPI_Comm intraCommClient, interCommClient;
321     if (cxtClient) // Attached mode
322     {
323       intraCommClient = intraComm;
324       interCommClient = interComm;
325     }
326     else
327     {
328       MPI_Comm_dup(intraComm, &intraCommClient);
329       comms.push_back(intraCommClient);
330       MPI_Comm_dup(interComm, &interCommClient);
331       comms.push_back(interCommClient);
332     }
333     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
334   }
335
336   //! Try to send the buffers and receive possible answers
337   bool CContext::checkBuffersAndListen(void)
338   {
339     client->checkBuffers();
340
341     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
342     if (hasTmpBufferedEvent)
343       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
344
345     // Don't process events if there is a temporarily buffered event
346     return server->eventLoop(!hasTmpBufferedEvent);
347   }
348
349   //! Terminate a context
350   void CContext::finalize(void)
351   {
352      if (!finalized)
353      {
354        finalized = true;
355        if (hasClient) sendRegistry() ;
356        client->finalize();
357        while (!server->hasFinished())
358        {
359          server->eventLoop();
360        }
361
362        if (hasServer)
363        {
364          closeAllFile();
365          registryOut->hierarchicalGatherRegistry() ;
366          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
367        }
368
369        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
370          MPI_Comm_free(&(*it));
371        comms.clear();
372      }
373   }
374
375   /*!
376   \brief Close all the context defintion and do processing data
377      After everything is well defined on client side, they will be processed and sent to server
378   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
379   all necessary information to server, from which each server can build its own database.
380   Because the role of server is to write out field data on a specific netcdf file,
381   the only information that it needs is the enabled files
382   and the active fields (fields will be written onto active files)
383   */
384   void CContext::closeDefinition(void)
385   {
386     CTimer::get("Context : close definition").resume() ;
387     // There is nothing client need to send to server
388     if (hasClient)
389     {
390       // After xml is parsed, there are some more works with post processing
391       postProcessing();
392     }
393     setClientServerBuffer();
394
395     if (hasClient && !hasServer)
396     {
397      // Send all attributes of current context to server
398      this->sendAllAttributesToServer();
399
400      // Send all attributes of current calendar
401      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
402
403      // We have enough information to send to server
404      // First of all, send all enabled files
405       sendEnabledFiles();
406
407      // Then, send all enabled fields
408       sendEnabledFields();
409
410      // At last, we have all info of domain and axis, then send them
411       sendRefDomainsAxis();
412
413      // After that, send all grid (if any)
414       sendRefGrid();
415    }
416
417    // We have a xml tree on the server side and now, it should be also processed
418    if (hasClient && !hasServer) sendPostProcessing();
419
420    // There are some processings that should be done after all of above. For example: check mask or index
421    if (hasClient)
422    {
423      this->buildFilterGraphOfEnabledFields();
424      buildFilterGraphOfFieldsWithReadAccess();
425      this->solveAllRefOfEnabledFields(true);
426    }
427
428    // Now tell server that it can process all messages from client
429    if (hasClient && !hasServer) this->sendCloseDefinition();
430
431    // Nettoyage de l'arborescence
432    if (hasClient && !hasServer) CleanTree(); // Only on client side??
433
434    if (hasClient)
435    {
436      sendCreateFileHeader();
437
438      startPrefetchingOfEnabledReadModeFiles();
439    }
440    CTimer::get("Context : close definition").suspend() ;
441   }
442
443   void CContext::findAllEnabledFields(void)
444   {
445     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
446     (void)this->enabledFiles[i]->getEnabledFields();
447   }
448
449   void CContext::findAllEnabledFieldsInReadModeFiles(void)
450   {
451     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
452     (void)this->enabledReadModeFiles[i]->getEnabledFields();
453   }
454
455   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
456   {
457      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
458        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
459   }
460
461   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
462   {
463     int size = this->enabledFiles.size();
464     for (int i = 0; i < size; ++i)
465     {
466       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
467     }
468
469     for (int i = 0; i < size; ++i)
470     {
471       this->enabledFiles[i]->generateNewTransformationGridDest();
472     }
473   }
474
475   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
476   {
477     int size = this->enabledFiles.size();
478     for (int i = 0; i < size; ++i)
479     {
480       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
481     }
482   }
483
484   void CContext::buildFilterGraphOfEnabledFields()
485   {
486     int size = this->enabledFiles.size();
487     for (int i = 0; i < size; ++i)
488     {
489       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
490     }
491   }
492
493   void CContext::startPrefetchingOfEnabledReadModeFiles()
494   {
495     int size = enabledReadModeFiles.size();
496     for (int i = 0; i < size; ++i)
497     {
498        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
499     }
500   }
501
502   void CContext::checkPrefetchingOfEnabledReadModeFiles()
503   {
504     int size = enabledReadModeFiles.size();
505     for (int i = 0; i < size; ++i)
506     {
507        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
508     }
509   }
510
511  void CContext::findFieldsWithReadAccess(void)
512  {
513    fieldsWithReadAccess.clear();
514    const vector<CField*> allFields = CField::getAll();
515    for (size_t i = 0; i < allFields.size(); ++i)
516    {
517      CField* field = allFields[i];
518
519      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
520        field->read_access = true;
521      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
522        fieldsWithReadAccess.push_back(field);
523    }
524  }
525
526  void CContext::solveAllRefOfFieldsWithReadAccess()
527  {
528    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
529      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
530  }
531
532  void CContext::buildFilterGraphOfFieldsWithReadAccess()
533  {
534    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
535      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
536  }
537
538   void CContext::solveAllInheritance(bool apply)
539   {
540     // Résolution des héritages descendants (càd des héritages de groupes)
541     // pour chacun des contextes.
542      solveDescInheritance(apply);
543
544     // Résolution des héritages par référence au niveau des fichiers.
545      const vector<CFile*> allFiles=CFile::getAll();
546      const vector<CGrid*> allGrids= CGrid::getAll();
547
548     //if (hasClient && !hasServer)
549      if (hasClient)
550      {
551        for (unsigned int i = 0; i < allFiles.size(); i++)
552          allFiles[i]->solveFieldRefInheritance(apply);
553      }
554
555      unsigned int vecSize = allGrids.size();
556      unsigned int i = 0;
557      for (i = 0; i < vecSize; ++i)
558        allGrids[i]->solveDomainAxisRefInheritance(apply);
559
560   }
561
562   void CContext::findEnabledFiles(void)
563   {
564      const std::vector<CFile*> allFiles = CFile::getAll();
565      const CDate& initDate = calendar->getInitDate();
566
567      for (unsigned int i = 0; i < allFiles.size(); i++)
568         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
569         {
570            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
571            {
572              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
573              {
574                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
575                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
576                    <<"\" is less than the time step. File will not be written."<<endl;
577              }
578              else
579               enabledFiles.push_back(allFiles[i]);
580            }
581         }
582         else
583         {
584           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
585           {
586             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
587                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
588                 <<"\" is less than the time step. File will not be written."<<endl;
589           }
590           else
591             enabledFiles.push_back(allFiles[i]); // otherwise true by default
592         }
593
594      if (enabledFiles.size() == 0)
595         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
596               << getId() << "\" !");
597   }
598
599   void CContext::findEnabledReadModeFiles(void)
600   {
601     int size = this->enabledFiles.size();
602     for (int i = 0; i < size; ++i)
603     {
604       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
605        enabledReadModeFiles.push_back(enabledFiles[i]);
606     }
607   }
608
609   void CContext::closeAllFile(void)
610   {
611     std::vector<CFile*>::const_iterator
612            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
613
614     for (; it != end; it++)
615     {
616       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
617       (*it)->close();
618     }
619   }
620
621   /*!
622   \brief Dispatch event received from client
623      Whenever a message is received in buffer of server, it will be processed depending on
624   its event type. A new event type should be added in the switch list to make sure
625   it processed on server side.
626   \param [in] event: Received message
627   */
628   bool CContext::dispatchEvent(CEventServer& event)
629   {
630
631      if (SuperClass::dispatchEvent(event)) return true;
632      else
633      {
634        switch(event.type)
635        {
636           case EVENT_ID_CLOSE_DEFINITION :
637             recvCloseDefinition(event);
638             return true;
639             break;
640           case EVENT_ID_UPDATE_CALENDAR:
641             recvUpdateCalendar(event);
642             return true;
643             break;
644           case EVENT_ID_CREATE_FILE_HEADER :
645             recvCreateFileHeader(event);
646             return true;
647             break;
648           case EVENT_ID_POST_PROCESS:
649             recvPostProcessing(event);
650             return true;
651            case EVENT_ID_SEND_REGISTRY:
652             recvRegistry(event);
653             return true;
654            break;
655
656           default :
657             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
658                    <<"Unknown Event");
659           return false;
660         }
661      }
662   }
663
664   //! Client side: Send a message to server to make it close
665   void CContext::sendCloseDefinition(void)
666   {
667     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
668     if (client->isServerLeader())
669     {
670       CMessage msg;
671       msg<<this->getIdServer();
672       const std::list<int>& ranks = client->getRanksServerLeader();
673       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
674         event.push(*itRank,1,msg);
675       client->sendEvent(event);
676     }
677     else client->sendEvent(event);
678   }
679
680   //! Server side: Receive a message of client announcing a context close
681   void CContext::recvCloseDefinition(CEventServer& event)
682   {
683
684      CBufferIn* buffer=event.subEvents.begin()->buffer;
685      string id;
686      *buffer>>id;
687      get(id)->closeDefinition();
688   }
689
690   //! Client side: Send a message to update calendar in each time step
691   void CContext::sendUpdateCalendar(int step)
692   {
693     if (!hasServer)
694     {
695       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
696       if (client->isServerLeader())
697       {
698         CMessage msg;
699         msg<<this->getIdServer()<<step;
700         const std::list<int>& ranks = client->getRanksServerLeader();
701         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
702           event.push(*itRank,1,msg);
703         client->sendEvent(event);
704       }
705       else client->sendEvent(event);
706     }
707   }
708
709   //! Server side: Receive a message of client annoucing calendar update
710   void CContext::recvUpdateCalendar(CEventServer& event)
711   {
712      CBufferIn* buffer=event.subEvents.begin()->buffer;
713      string id;
714      *buffer>>id;
715      get(id)->recvUpdateCalendar(*buffer);
716   }
717
718   //! Server side: Receive a message of client annoucing calendar update
719   void CContext::recvUpdateCalendar(CBufferIn& buffer)
720   {
721      int step;
722      buffer>>step;
723      updateCalendar(step);
724   }
725
726   //! Client side: Send a message to create header part of netcdf file
727   void CContext::sendCreateFileHeader(void)
728   {
729     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
730     if (client->isServerLeader())
731     {
732       CMessage msg;
733       msg<<this->getIdServer();
734       const std::list<int>& ranks = client->getRanksServerLeader();
735       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
736         event.push(*itRank,1,msg) ;
737       client->sendEvent(event);
738     }
739     else client->sendEvent(event);
740   }
741
742   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
743   void CContext::recvCreateFileHeader(CEventServer& event)
744   {
745      CBufferIn* buffer=event.subEvents.begin()->buffer;
746      string id;
747      *buffer>>id;
748      get(id)->recvCreateFileHeader(*buffer);
749   }
750
751   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
752   void CContext::recvCreateFileHeader(CBufferIn& buffer)
753   {
754      createFileHeader();
755   }
756
757   //! Client side: Send a message to do some post processing on server
758   void CContext::sendPostProcessing()
759   {
760     if (!hasServer)
761     {
762       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
763       if (client->isServerLeader())
764       {
765         CMessage msg;
766         msg<<this->getIdServer();
767         const std::list<int>& ranks = client->getRanksServerLeader();
768         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
769           event.push(*itRank,1,msg);
770         client->sendEvent(event);
771       }
772       else client->sendEvent(event);
773     }
774   }
775
776   //! Server side: Receive a message to do some post processing
777   void CContext::recvPostProcessing(CEventServer& event)
778   {
779      CBufferIn* buffer=event.subEvents.begin()->buffer;
780      string id;
781      *buffer>>id;
782      get(id)->recvPostProcessing(*buffer);
783   }
784
785   //! Server side: Receive a message to do some post processing
786   void CContext::recvPostProcessing(CBufferIn& buffer)
787   {
788      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
789      postProcessing();
790   }
791
792   const StdString& CContext::getIdServer()
793   {
794      if (hasClient)
795      {
796        idServer_ = this->getId();
797        idServer_ += "_server";
798        return idServer_;
799      }
800      if (hasServer) return (this->getId());
801   }
802
803   /*!
804   \brief Do some simple post processings after parsing xml file
805      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
806   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
807   which will be written out into netcdf files, are processed
808   */
809   void CContext::postProcessing()
810   {
811     if (isPostProcessed) return;
812
813      // Make sure the calendar was correctly created
814      if (!calendar)
815        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
816      else if (calendar->getTimeStep() == NoneDu)
817        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
818      // Calendar first update to set the current date equals to the start date
819      calendar->update(0);
820
821      // Find all inheritance in xml structure
822      this->solveAllInheritance();
823
824      // Check if some axis, domains or grids are eligible to for compressed indexed output.
825      // Warning: This must be done after solving the inheritance and before the rest of post-processing
826      checkAxisDomainsGridsEligibilityForCompressedOutput();
827
828      // Check if some automatic time series should be generated
829      // Warning: This must be done after solving the inheritance and before the rest of post-processing
830      prepareTimeseries();
831
832      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
833      this->findEnabledFiles();
834      this->findEnabledReadModeFiles();
835
836      // Find all enabled fields of each file
837      this->findAllEnabledFields();
838      this->findAllEnabledFieldsInReadModeFiles();
839
840     if (hasClient && !hasServer)
841     {
842      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
843      this->readAttributesOfEnabledFieldsInReadModeFiles();
844     }
845
846      // Only search and rebuild all reference objects of enable fields, don't transform
847      this->solveOnlyRefOfEnabledFields(false);
848
849      // Search and rebuild all reference object of enabled fields
850      this->solveAllRefOfEnabledFields(false);
851
852      // Find all fields with read access from the public API
853      findFieldsWithReadAccess();
854      // and solve the all reference for them
855      solveAllRefOfFieldsWithReadAccess();
856
857      isPostProcessed = true;
858   }
859
860   /*!
861    * Compute the required buffer size to send the attributes (mostly those grid related).
862    *
863    * \param maxEventSize [in/out] the size of the bigger event for each connected server
864    */
865   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
866   {
867     std::map<int, StdSize> attributesSize;
868
869     if (hasClient)
870     {
871       size_t numEnabledFiles = this->enabledFiles.size();
872       for (size_t i = 0; i < numEnabledFiles; ++i)
873       {
874         CFile* file = this->enabledFiles[i];
875
876         std::vector<CField*> enabledFields = file->getEnabledFields();
877         size_t numEnabledFields = enabledFields.size();
878         for (size_t j = 0; j < numEnabledFields; ++j)
879         {
880           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
881           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
882           for (; it != itE; ++it)
883           {
884             // If attributesSize[it->first] does not exist, it will be zero-initialized
885             // so we can use it safely without checking for its existance
886             if (attributesSize[it->first] < it->second)
887               attributesSize[it->first] = it->second;
888
889             if (maxEventSize[it->first] < it->second)
890               maxEventSize[it->first] = it->second;
891           }
892         }
893       }
894     }
895
896     return attributesSize;
897   }
898
899   /*!
900    * Compute the required buffer size to send the fields data.
901    *
902    * \param maxEventSize [in/out] the size of the bigger event for each connected server
903    */
904   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
905   {
906     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
907
908     std::map<int, StdSize> dataSize;
909
910     // Find all reference domain and axis of all active fields
911     size_t numEnabledFiles = this->enabledFiles.size();
912     for (size_t i = 0; i < numEnabledFiles; ++i)
913     {
914       CFile* file = this->enabledFiles[i];
915       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
916
917       if (fileMode == mode)
918       {
919         std::vector<CField*> enabledFields = file->getEnabledFields();
920         size_t numEnabledFields = enabledFields.size();
921         for (size_t j = 0; j < numEnabledFields; ++j)
922         {
923           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
924           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
925           for (; it != itE; ++it)
926           {
927             // If dataSize[it->first] does not exist, it will be zero-initialized
928             // so we can use it safely without checking for its existance
929             if (CXios::isOptPerformance)
930               dataSize[it->first] += it->second;
931             else if (dataSize[it->first] < it->second)
932               dataSize[it->first] = it->second;
933
934             if (maxEventSize[it->first] < it->second)
935               maxEventSize[it->first] = it->second;
936           }
937         }
938       }
939     }
940
941     return dataSize;
942   }
943
944   //! Client side: Send infomation of active files (files are enabled to write out)
945   void CContext::sendEnabledFiles()
946   {
947     int size = this->enabledFiles.size();
948
949     // In a context, each type has a root definition, e.g: axis, domain, field.
950     // Every object must be a child of one of these root definition. In this case
951     // all new file objects created on server must be children of the root "file_definition"
952     StdString fileDefRoot("file_definition");
953     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
954
955     for (int i = 0; i < size; ++i)
956     {
957       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
958       this->enabledFiles[i]->sendAllAttributesToServer();
959       this->enabledFiles[i]->sendAddAllVariables();
960     }
961   }
962
963   //! Client side: Send information of active fields (ones are written onto files)
964   void CContext::sendEnabledFields()
965   {
966     int size = this->enabledFiles.size();
967     for (int i = 0; i < size; ++i)
968     {
969       this->enabledFiles[i]->sendEnabledFields();
970     }
971   }
972
973   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
974   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
975   {
976     if (!hasClient) return;
977
978     const vector<CAxis*> allAxis = CAxis::getAll();
979     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
980       (*it)->checkEligibilityForCompressedOutput();
981
982     const vector<CDomain*> allDomains = CDomain::getAll();
983     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
984       (*it)->checkEligibilityForCompressedOutput();
985
986     const vector<CGrid*> allGrids = CGrid::getAll();
987     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
988       (*it)->checkEligibilityForCompressedOutput();
989   }
990
991   //! Client side: Prepare the timeseries by adding the necessary files
992   void CContext::prepareTimeseries()
993   {
994     if (!hasClient) return;
995
996     const std::vector<CFile*> allFiles = CFile::getAll();
997     for (size_t i = 0; i < allFiles.size(); i++)
998     {
999       CFile* file = allFiles[i];
1000
1001       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1002       for (size_t k = 0; k < vars.size(); k++)
1003       {
1004         CVariable* var = vars[k];
1005
1006         if (var->ts_target.isEmpty()
1007              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1008           fileVars.push_back(var);
1009
1010         if (!var->ts_target.isEmpty()
1011              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1012           fieldVars.push_back(var);
1013       }
1014
1015       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1016       {
1017         StdString fileNameStr("%file_name%") ;
1018         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1019         
1020         StdString fileName=file->getFileOutputName();
1021         size_t pos=tsPrefix.find(fileNameStr) ;
1022         while (pos!=std::string::npos)
1023         {
1024           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1025           pos=tsPrefix.find(fileNameStr) ;
1026         }
1027       
1028         const std::vector<CField*> allFields = file->getAllFields();
1029         for (size_t j = 0; j < allFields.size(); j++)
1030         {
1031           CField* field = allFields[j];
1032
1033           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1034           {
1035             CFile* tsFile = CFile::create();
1036             tsFile->duplicateAttributes(file);
1037
1038             // Add variables originating from file and targeted to timeserie file
1039             for (size_t k = 0; k < fileVars.size(); k++)
1040               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1041
1042           
1043             tsFile->name = tsPrefix + "_";
1044             if (!field->name.isEmpty())
1045               tsFile->name.get() += field->name;
1046             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1047               tsFile->name.get() += field->field_ref;
1048             else
1049               tsFile->name.get() += field->getId();
1050
1051             if (!field->ts_split_freq.isEmpty())
1052               tsFile->split_freq = field->ts_split_freq;
1053
1054             CField* tsField = tsFile->addField();
1055             tsField->field_ref = field->getId();
1056
1057             // Add variables originating from file and targeted to timeserie field
1058             for (size_t k = 0; k < fieldVars.size(); k++)
1059               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1060
1061             vars = field->getAllVariables();
1062             for (size_t k = 0; k < vars.size(); k++)
1063             {
1064               CVariable* var = vars[k];
1065
1066               // Add variables originating from field and targeted to timeserie field
1067               if (var->ts_target.isEmpty()
1068                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1069                 tsField->getVirtualVariableGroup()->addChild(var);
1070
1071               // Add variables originating from field and targeted to timeserie file
1072               if (!var->ts_target.isEmpty()
1073                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1074                 tsFile->getVirtualVariableGroup()->addChild(var);
1075             }
1076
1077             tsFile->solveFieldRefInheritance(true);
1078
1079             if (file->timeseries == CFile::timeseries_attr::exclusive)
1080               field->enabled = false;
1081           }
1082         }
1083
1084         // Finally disable the original file is need be
1085         if (file->timeseries == CFile::timeseries_attr::only)
1086          file->enabled = false;
1087       }
1088     }
1089   }
1090
1091   //! Client side: Send information of reference grid of active fields
1092   void CContext::sendRefGrid()
1093   {
1094     std::set<StdString> gridIds;
1095     int sizeFile = this->enabledFiles.size();
1096     CFile* filePtr(NULL);
1097
1098     // Firstly, find all reference grids of all active fields
1099     for (int i = 0; i < sizeFile; ++i)
1100     {
1101       filePtr = this->enabledFiles[i];
1102       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1103       int sizeField = enabledFields.size();
1104       for (int numField = 0; numField < sizeField; ++numField)
1105       {
1106         if (0 != enabledFields[numField]->getRelGrid())
1107           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1108       }
1109     }
1110
1111     // Create all reference grids on server side
1112     StdString gridDefRoot("grid_definition");
1113     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1114     std::set<StdString>::const_iterator it, itE = gridIds.end();
1115     for (it = gridIds.begin(); it != itE; ++it)
1116     {
1117       gridPtr->sendCreateChild(*it);
1118       CGrid::get(*it)->sendAllAttributesToServer();
1119       CGrid::get(*it)->sendAllDomains();
1120       CGrid::get(*it)->sendAllAxis();
1121       CGrid::get(*it)->sendAllScalars();
1122     }
1123   }
1124
1125
1126   //! Client side: Send information of reference domain and axis of active fields
1127   void CContext::sendRefDomainsAxis()
1128   {
1129     std::set<StdString> domainIds, axisIds, scalarIds;
1130
1131     // Find all reference domain and axis of all active fields
1132     int numEnabledFiles = this->enabledFiles.size();
1133     for (int i = 0; i < numEnabledFiles; ++i)
1134     {
1135       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1136       int numEnabledFields = enabledFields.size();
1137       for (int j = 0; j < numEnabledFields; ++j)
1138       {
1139         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1140         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1141         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1142         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1143       }
1144     }
1145
1146     // Create all reference axis on server side
1147     std::set<StdString>::iterator itDom, itAxis, itScalar;
1148     std::set<StdString>::const_iterator itE;
1149
1150     StdString scalarDefRoot("scalar_definition");
1151     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1152     itE = scalarIds.end();
1153     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1154     {
1155       if (!itScalar->empty())
1156       {
1157         scalarPtr->sendCreateChild(*itScalar);
1158         CScalar::get(*itScalar)->sendAllAttributesToServer();
1159       }
1160     }
1161
1162     StdString axiDefRoot("axis_definition");
1163     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1164     itE = axisIds.end();
1165     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1166     {
1167       if (!itAxis->empty())
1168       {
1169         axisPtr->sendCreateChild(*itAxis);
1170         CAxis::get(*itAxis)->sendAllAttributesToServer();
1171       }
1172     }
1173
1174     // Create all reference domains on server side
1175     StdString domDefRoot("domain_definition");
1176     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1177     itE = domainIds.end();
1178     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1179     {
1180       if (!itDom->empty()) {
1181          domPtr->sendCreateChild(*itDom);
1182          CDomain::get(*itDom)->sendAllAttributesToServer();
1183       }
1184     }
1185   }
1186
1187   //! Update calendar in each time step
1188   void CContext::updateCalendar(int step)
1189   {
1190      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1191      calendar->update(step);
1192      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1193#ifdef XIOS_MEMTRACK_LIGHT
1194      info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
1195#endif
1196      if (hasClient)
1197      {
1198        checkPrefetchingOfEnabledReadModeFiles();
1199        garbageCollector.invalidate(calendar->getCurrentDate());
1200      }
1201   }
1202
1203   //! Server side: Create header of netcdf file
1204   void CContext::createFileHeader(void )
1205   {
1206      vector<CFile*>::const_iterator it;
1207
1208      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1209      {
1210         (*it)->initFile();
1211      }
1212   }
1213
1214   //! Get current context
1215   CContext* CContext::getCurrent(void)
1216   {
1217     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1218   }
1219
1220   /*!
1221   \brief Set context with an id be the current context
1222   \param [in] id identity of context to be set to current
1223   */
1224   void CContext::setCurrent(const string& id)
1225   {
1226     CObjectFactory::SetCurrentContextId(id);
1227     CGroupFactory::SetCurrentContextId(id);
1228   }
1229
1230  /*!
1231  \brief Create a context with specific id
1232  \param [in] id identity of new context
1233  \return pointer to the new context or already-existed one with identity id
1234  */
1235  CContext* CContext::create(const StdString& id)
1236  {
1237    CContext::setCurrent(id);
1238
1239    bool hasctxt = CContext::has(id);
1240    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1241    getRoot();
1242    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1243
1244#define DECLARE_NODE(Name_, name_) \
1245    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1246#define DECLARE_NODE_PAR(Name_, name_)
1247#include "node_type.conf"
1248
1249    return (context);
1250  }
1251
1252
1253
1254     //! Server side: Receive a message to do some post processing
1255  void CContext::recvRegistry(CEventServer& event)
1256  {
1257    CBufferIn* buffer=event.subEvents.begin()->buffer;
1258    string id;
1259    *buffer>>id;
1260    get(id)->recvRegistry(*buffer);
1261  }
1262
1263  void CContext::recvRegistry(CBufferIn& buffer)
1264  {
1265    if (server->intraCommRank==0)
1266    {
1267      CRegistry registry(server->intraComm) ;
1268      registry.fromBuffer(buffer) ;
1269      registryOut->mergeRegistry(registry) ;
1270    }
1271  }
1272
1273  void CContext::sendRegistry(void)
1274  {
1275    registryOut->hierarchicalGatherRegistry() ;
1276
1277    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1278    if (client->isServerLeader())
1279    {
1280       CMessage msg ;
1281       msg<<this->getIdServer();
1282       if (client->clientRank==0) msg<<*registryOut ;
1283       const std::list<int>& ranks = client->getRanksServerLeader();
1284       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1285         event.push(*itRank,1,msg);
1286       client->sendEvent(event);
1287     }
1288     else client->sendEvent(event);
1289  }
1290
1291} // namespace xios
Note: See TracBrowser for help on using the repository browser.