source: XIOS/trunk/src/node/context.cpp @ 1357

Last change on this file since 1357 was 1357, checked in by rlacroix, 6 years ago

Add some extra checks when switching to the next timestep.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 44.3 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18
19
20namespace xios {
21
22  shared_ptr<CContextGroup> CContext::root;
23
24   /// ////////////////////// Définitions ////////////////////// ///
25
26   CContext::CContext(void)
27      : CObjectTemplate<CContext>(), CContextAttributes()
28      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
29      , idServer_(), client(0), server(0)
30   { /* Ne rien faire de plus */ }
31
32   CContext::CContext(const StdString & id)
33      : CObjectTemplate<CContext>(id), CContextAttributes()
34      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
35      , idServer_(), client(0), server(0)
36   { /* Ne rien faire de plus */ }
37
38   CContext::~CContext(void)
39   {
40     delete client;
41     delete server;
42   }
43
44   //----------------------------------------------------------------
45   //! Get name of context
46   StdString CContext::GetName(void)   { return (StdString("context")); }
47   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
48   ENodeType CContext::GetType(void)   { return (eContext); }
49
50   //----------------------------------------------------------------
51
52   /*!
53   \brief Get context group (context root)
54   \return Context root
55   */
56   CContextGroup* CContext::getRoot(void)
57   {
58      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
59      return root.get();
60   }
61
62   //----------------------------------------------------------------
63
64   /*!
65   \brief Get calendar of a context
66   \return Calendar
67   */
68   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
69   {
70      return (this->calendar);
71   }
72
73   //----------------------------------------------------------------
74
75   /*!
76   \brief Set a context with a calendar
77   \param[in] newCalendar new calendar
78   */
79   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
80   {
81      this->calendar = newCalendar;
82   }
83
84   //----------------------------------------------------------------
85   /*!
86   \brief Parse xml file and write information into context object
87   \param [in] node xmld node corresponding in xml file
88   */
89   void CContext::parse(xml::CXMLNode & node)
90   {
91      CContext::SuperClass::parse(node);
92
93      // PARSING POUR GESTION DES ENFANTS
94      xml::THashAttributes attributes = node.getAttributes();
95
96      if (attributes.end() != attributes.find("src"))
97      {
98         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
99         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
100            ERROR("void CContext::parse(xml::CXMLNode & node)",
101                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
102         if (!ifs.good())
103            ERROR("CContext::parse(xml::CXMLNode & node)",
104                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
105         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
106      }
107
108      if (node.getElementName().compare(CContext::GetName()))
109         DEBUG("Le noeud is wrong defined but will be considered as a context !");
110
111      if (!(node.goToChildElement()))
112      {
113         DEBUG("Le context ne contient pas d'enfant !");
114      }
115      else
116      {
117         do { // Parcours des contextes pour traitement.
118
119            StdString name = node.getElementName();
120            attributes.clear();
121            attributes = node.getAttributes();
122
123            if (attributes.end() != attributes.find("id"))
124            {
125              DEBUG(<< "Definition node has an id,"
126                    << "it will not be taking account !");
127            }
128
129#define DECLARE_NODE(Name_, name_)    \
130   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
131   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
132#define DECLARE_NODE_PAR(Name_, name_)
133#include "node_type.conf"
134
135            DEBUG(<< "The element \'"     << name
136                  << "\' in the context \'" << CContext::getCurrent()->getId()
137                  << "\' is not a definition !");
138
139         } while (node.goToNextElement());
140
141         node.goToParentElement(); // Retour au parent
142      }
143   }
144
145   //----------------------------------------------------------------
146   //! Show tree structure of context
147   void CContext::ShowTree(StdOStream & out)
148   {
149      StdString currentContextId = CContext::getCurrent() -> getId();
150      std::vector<CContext*> def_vector =
151         CContext::getRoot()->getChildList();
152      std::vector<CContext*>::iterator
153         it = def_vector.begin(), end = def_vector.end();
154
155      out << "<? xml version=\"1.0\" ?>" << std::endl;
156      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
157
158      for (; it != end; it++)
159      {
160         CContext* context = *it;
161         CContext::setCurrent(context->getId());
162         out << *context << std::endl;
163      }
164
165      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
166      CContext::setCurrent(currentContextId);
167   }
168
169
170   //----------------------------------------------------------------
171
172   //! Convert context object into string (to print)
173   StdString CContext::toString(void) const
174   {
175      StdOStringStream oss;
176      oss << "<" << CContext::GetName()
177          << " id=\"" << this->getId() << "\" "
178          << SuperClassAttribute::toString() << ">" << std::endl;
179      if (!this->hasChild())
180      {
181         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
182      }
183      else
184      {
185
186#define DECLARE_NODE(Name_, name_)    \
187   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
188   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
189#define DECLARE_NODE_PAR(Name_, name_)
190#include "node_type.conf"
191
192      }
193
194      oss << "</" << CContext::GetName() << " >";
195
196      return (oss.str());
197   }
198
199   //----------------------------------------------------------------
200
201   /*!
202   \brief Find all inheritace among objects in a context.
203   \param [in] apply (true) write attributes of parent into ones of child if they are empty
204                     (false) write attributes of parent into a new container of child
205   \param [in] parent unused
206   */
207   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
208   {
209#define DECLARE_NODE(Name_, name_)    \
210   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
211     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
212#define DECLARE_NODE_PAR(Name_, name_)
213#include "node_type.conf"
214   }
215
216   //----------------------------------------------------------------
217
218   //! Verify if all root definition in the context have child.
219   bool CContext::hasChild(void) const
220   {
221      return (
222#define DECLARE_NODE(Name_, name_)    \
223   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
224#define DECLARE_NODE_PAR(Name_, name_)
225#include "node_type.conf"
226      false);
227}
228
229   //----------------------------------------------------------------
230
231   void CContext::CleanTree(void)
232   {
233#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
234#define DECLARE_NODE_PAR(Name_, name_)
235#include "node_type.conf"
236   }
237   ///---------------------------------------------------------------
238
239   //! Initialize client side
240   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
241   {
242     hasClient=true;
243     client = new CContextClient(this,intraComm, interComm, cxtServer);
244     registryIn=new CRegistry(intraComm);
245     registryIn->setPath(getId()) ;
246     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
247     registryIn->bcastRegistry() ;
248
249     registryOut=new CRegistry(intraComm) ;
250     registryOut->setPath(getId()) ;
251
252     MPI_Comm intraCommServer, interCommServer;
253     if (cxtServer) // Attached mode
254     {
255       intraCommServer = intraComm;
256       interCommServer = interComm;
257     }
258     else
259     {
260       MPI_Comm_dup(intraComm, &intraCommServer);
261       comms.push_back(intraCommServer);
262       MPI_Comm_dup(interComm, &interCommServer);
263       comms.push_back(interCommServer);
264     }
265     server = new CContextServer(this,intraCommServer,interCommServer);
266   }
267
268   void CContext::setClientServerBuffer()
269   {
270     // Estimated minimum event size for small events (10 is an arbitrary constant just for safety)
271     const size_t minEventSize = CEventClient::headerSize + getIdServer().size() + 10 * sizeof(int);
272     // Ensure there is at least some room for 20 of such events in the buffers
273     size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize);
274#define DECLARE_NODE(Name_, name_)    \
275     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
276#define DECLARE_NODE_PAR(Name_, name_)
277#include "node_type.conf"
278#undef DECLARE_NODE
279#undef DECLARE_NODE_PAR
280
281     // Compute the buffer sizes needed to send the attributes and data corresponding to fields
282     std::map<int, StdSize> maxEventSize;
283     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
284     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
285
286     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
287     for (it = dataBufferSize.begin(); it != ite; ++it)
288       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
289
290     // Apply the buffer size factor and check that we are above the minimum buffer size
291     ite = bufferSize.end();
292     for (it = bufferSize.begin(); it != ite; ++it)
293     {
294       it->second *= CXios::bufferSizeFactor;
295       if (it->second < minBufferSize) it->second = minBufferSize;
296     }
297
298     // Leaders will have to send some control events so ensure there is some room for those in the buffers
299     if (client->isServerLeader())
300     {
301       const std::list<int>& ranks = client->getRanksServerLeader();
302       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
303       {
304         if (!bufferSize.count(*itRank))
305         {
306           bufferSize[*itRank] = minBufferSize;
307           maxEventSize[*itRank] = minEventSize;
308         }
309       }
310     }
311
312     client->setBufferSize(bufferSize, maxEventSize);
313   }
314
315   //! Verify whether a context is initialized
316   bool CContext::isInitialized(void)
317   {
318     return hasClient;
319   }
320
321   //! Initialize server
322   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
323   {
324     hasServer=true;
325     server = new CContextServer(this,intraComm,interComm);
326
327     registryIn=new CRegistry(intraComm);
328     registryIn->setPath(getId()) ;
329     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
330     registryIn->bcastRegistry() ;
331     registryOut=new CRegistry(intraComm) ;
332     registryOut->setPath(getId()) ;
333
334     MPI_Comm intraCommClient, interCommClient;
335     if (cxtClient) // Attached mode
336     {
337       intraCommClient = intraComm;
338       interCommClient = interComm;
339     }
340     else
341     {
342       MPI_Comm_dup(intraComm, &intraCommClient);
343       comms.push_back(intraCommClient);
344       MPI_Comm_dup(interComm, &interCommClient);
345       comms.push_back(interCommClient);
346     }
347     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
348   }
349
350   //! Try to send the buffers and receive possible answers
351   bool CContext::checkBuffersAndListen(void)
352   {
353     client->checkBuffers();
354
355     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
356     if (hasTmpBufferedEvent)
357       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
358
359     // Don't process events if there is a temporarily buffered event
360     return server->eventLoop(!hasTmpBufferedEvent);
361   }
362
363   //! Terminate a context
364   void CContext::finalize(void)
365   {
366      if (!finalized)
367      {
368        finalized = true;
369        if (hasClient) sendRegistry() ;
370        client->finalize();
371        while (!server->hasFinished())
372        {
373          server->eventLoop();
374        }
375
376        if (hasServer)
377        {
378          closeAllFile();
379          registryOut->hierarchicalGatherRegistry() ;
380          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
381        }
382
383        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
384          MPI_Comm_free(&(*it));
385        comms.clear();
386      }
387   }
388
389   /*!
390   \brief Close all the context defintion and do processing data
391      After everything is well defined on client side, they will be processed and sent to server
392   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
393   all necessary information to server, from which each server can build its own database.
394   Because the role of server is to write out field data on a specific netcdf file,
395   the only information that it needs is the enabled files
396   and the active fields (fields will be written onto active files)
397   */
398   void CContext::closeDefinition(void)
399   {
400     CTimer::get("Context : close definition").resume() ;
401     // There is nothing client need to send to server
402     if (hasClient)
403     {
404       // After xml is parsed, there are some more works with post processing
405       postProcessing();
406     }
407     setClientServerBuffer();
408
409     if (hasClient && !hasServer)
410     {
411      // Send all attributes of current context to server
412      this->sendAllAttributesToServer();
413
414      // Send all attributes of current calendar
415      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
416
417      // We have enough information to send to server
418      // First of all, send all enabled files
419       sendEnabledFiles();
420
421      // Then, send all enabled fields
422       sendEnabledFields();
423
424      // At last, we have all info of domain and axis, then send them
425       sendRefDomainsAxis();
426
427      // After that, send all grid (if any)
428       sendRefGrid();
429    }
430
431    // We have a xml tree on the server side and now, it should be also processed
432    if (hasClient && !hasServer) sendPostProcessing();
433
434    // There are some processings that should be done after all of above. For example: check mask or index
435    if (hasClient)
436    {
437      this->buildFilterGraphOfEnabledFields();
438      buildFilterGraphOfFieldsWithReadAccess();
439      this->solveAllRefOfEnabledFields(true);
440    }
441
442    // Now tell server that it can process all messages from client
443    if (hasClient && !hasServer) this->sendCloseDefinition();
444
445    // Nettoyage de l'arborescence
446    if (hasClient && !hasServer) CleanTree(); // Only on client side??
447
448    if (hasClient)
449    {
450      sendCreateFileHeader();
451
452      startPrefetchingOfEnabledReadModeFiles();
453    }
454    CTimer::get("Context : close definition").suspend() ;
455   }
456
457   void CContext::findAllEnabledFields(void)
458   {
459     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
460     (void)this->enabledFiles[i]->getEnabledFields();
461   }
462
463   void CContext::findAllEnabledFieldsInReadModeFiles(void)
464   {
465     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
466     (void)this->enabledReadModeFiles[i]->getEnabledFields();
467   }
468
469   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
470   {
471      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
472        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
473   }
474
475   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
476   {
477     int size = this->enabledFiles.size();
478     for (int i = 0; i < size; ++i)
479     {
480       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
481     }
482
483     for (int i = 0; i < size; ++i)
484     {
485       this->enabledFiles[i]->generateNewTransformationGridDest();
486     }
487   }
488
489   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
490   {
491     int size = this->enabledFiles.size();
492     for (int i = 0; i < size; ++i)
493     {
494       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
495     }
496   }
497
498   void CContext::buildFilterGraphOfEnabledFields()
499   {
500     int size = this->enabledFiles.size();
501     for (int i = 0; i < size; ++i)
502     {
503       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
504     }
505   }
506
507   void CContext::startPrefetchingOfEnabledReadModeFiles()
508   {
509     int size = enabledReadModeFiles.size();
510     for (int i = 0; i < size; ++i)
511     {
512        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
513     }
514   }
515
516   void CContext::doPostTimestepOperationsForEnabledReadModeFiles()
517   {
518     int size = enabledReadModeFiles.size();
519     for (int i = 0; i < size; ++i)
520     {
521        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields();
522     }
523   }
524
525  void CContext::findFieldsWithReadAccess(void)
526  {
527    fieldsWithReadAccess.clear();
528    const vector<CField*> allFields = CField::getAll();
529    for (size_t i = 0; i < allFields.size(); ++i)
530    {
531      CField* field = allFields[i];
532
533      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
534        field->read_access = true;
535      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
536        fieldsWithReadAccess.push_back(field);
537    }
538  }
539
540  void CContext::solveAllRefOfFieldsWithReadAccess()
541  {
542    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
543      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
544  }
545
546  void CContext::buildFilterGraphOfFieldsWithReadAccess()
547  {
548    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
549      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
550  }
551
552   void CContext::solveAllInheritance(bool apply)
553   {
554     // Résolution des héritages descendants (càd des héritages de groupes)
555     // pour chacun des contextes.
556      solveDescInheritance(apply);
557
558     // Résolution des héritages par référence au niveau des fichiers.
559      const vector<CFile*> allFiles=CFile::getAll();
560      const vector<CGrid*> allGrids= CGrid::getAll();
561
562     //if (hasClient && !hasServer)
563      if (hasClient)
564      {
565        for (unsigned int i = 0; i < allFiles.size(); i++)
566          allFiles[i]->solveFieldRefInheritance(apply);
567      }
568
569      unsigned int vecSize = allGrids.size();
570      unsigned int i = 0;
571      for (i = 0; i < vecSize; ++i)
572        allGrids[i]->solveDomainAxisRefInheritance(apply);
573
574   }
575
576   void CContext::findEnabledFiles(void)
577   {
578      const std::vector<CFile*> allFiles = CFile::getAll();
579      const CDate& initDate = calendar->getInitDate();
580
581      for (unsigned int i = 0; i < allFiles.size(); i++)
582         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
583         {
584            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
585            {
586              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
587              {
588                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
589                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
590                    <<"\" is less than the time step. File will not be written."<<endl;
591              }
592              else
593               enabledFiles.push_back(allFiles[i]);
594            }
595         }
596         else
597         {
598           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
599           {
600             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
601                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
602                 <<"\" is less than the time step. File will not be written."<<endl;
603           }
604           else
605             enabledFiles.push_back(allFiles[i]); // otherwise true by default
606         }
607
608      if (enabledFiles.size() == 0)
609         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
610               << getId() << "\" !");
611   }
612
613   void CContext::findEnabledReadModeFiles(void)
614   {
615     int size = this->enabledFiles.size();
616     for (int i = 0; i < size; ++i)
617     {
618       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
619        enabledReadModeFiles.push_back(enabledFiles[i]);
620     }
621   }
622
623   void CContext::closeAllFile(void)
624   {
625     std::vector<CFile*>::const_iterator
626            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
627
628     for (; it != end; it++)
629     {
630       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
631       (*it)->close();
632     }
633   }
634
635   /*!
636   \brief Dispatch event received from client
637      Whenever a message is received in buffer of server, it will be processed depending on
638   its event type. A new event type should be added in the switch list to make sure
639   it processed on server side.
640   \param [in] event: Received message
641   */
642   bool CContext::dispatchEvent(CEventServer& event)
643   {
644
645      if (SuperClass::dispatchEvent(event)) return true;
646      else
647      {
648        switch(event.type)
649        {
650           case EVENT_ID_CLOSE_DEFINITION :
651             recvCloseDefinition(event);
652             return true;
653             break;
654           case EVENT_ID_UPDATE_CALENDAR:
655             recvUpdateCalendar(event);
656             return true;
657             break;
658           case EVENT_ID_CREATE_FILE_HEADER :
659             recvCreateFileHeader(event);
660             return true;
661             break;
662           case EVENT_ID_POST_PROCESS:
663             recvPostProcessing(event);
664             return true;
665            case EVENT_ID_SEND_REGISTRY:
666             recvRegistry(event);
667             return true;
668            break;
669
670           default :
671             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
672                    <<"Unknown Event");
673           return false;
674         }
675      }
676   }
677
678   //! Client side: Send a message to server to make it close
679   void CContext::sendCloseDefinition(void)
680   {
681     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
682     if (client->isServerLeader())
683     {
684       CMessage msg;
685       msg<<this->getIdServer();
686       const std::list<int>& ranks = client->getRanksServerLeader();
687       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
688         event.push(*itRank,1,msg);
689       client->sendEvent(event);
690     }
691     else client->sendEvent(event);
692   }
693
694   //! Server side: Receive a message of client announcing a context close
695   void CContext::recvCloseDefinition(CEventServer& event)
696   {
697
698      CBufferIn* buffer=event.subEvents.begin()->buffer;
699      string id;
700      *buffer>>id;
701      get(id)->closeDefinition();
702   }
703
704   //! Client side: Send a message to update calendar in each time step
705   void CContext::sendUpdateCalendar(int step)
706   {
707     if (!hasServer)
708     {
709       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
710       if (client->isServerLeader())
711       {
712         CMessage msg;
713         msg<<this->getIdServer()<<step;
714         const std::list<int>& ranks = client->getRanksServerLeader();
715         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
716           event.push(*itRank,1,msg);
717         client->sendEvent(event);
718       }
719       else client->sendEvent(event);
720     }
721   }
722
723   //! Server side: Receive a message of client annoucing calendar update
724   void CContext::recvUpdateCalendar(CEventServer& event)
725   {
726      CBufferIn* buffer=event.subEvents.begin()->buffer;
727      string id;
728      *buffer>>id;
729      get(id)->recvUpdateCalendar(*buffer);
730   }
731
732   //! Server side: Receive a message of client annoucing calendar update
733   void CContext::recvUpdateCalendar(CBufferIn& buffer)
734   {
735      int step;
736      buffer>>step;
737      updateCalendar(step);
738   }
739
740   //! Client side: Send a message to create header part of netcdf file
741   void CContext::sendCreateFileHeader(void)
742   {
743     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
744     if (client->isServerLeader())
745     {
746       CMessage msg;
747       msg<<this->getIdServer();
748       const std::list<int>& ranks = client->getRanksServerLeader();
749       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
750         event.push(*itRank,1,msg) ;
751       client->sendEvent(event);
752     }
753     else client->sendEvent(event);
754   }
755
756   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
757   void CContext::recvCreateFileHeader(CEventServer& event)
758   {
759      CBufferIn* buffer=event.subEvents.begin()->buffer;
760      string id;
761      *buffer>>id;
762      get(id)->recvCreateFileHeader(*buffer);
763   }
764
765   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
766   void CContext::recvCreateFileHeader(CBufferIn& buffer)
767   {
768      createFileHeader();
769   }
770
771   //! Client side: Send a message to do some post processing on server
772   void CContext::sendPostProcessing()
773   {
774     if (!hasServer)
775     {
776       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
777       if (client->isServerLeader())
778       {
779         CMessage msg;
780         msg<<this->getIdServer();
781         const std::list<int>& ranks = client->getRanksServerLeader();
782         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
783           event.push(*itRank,1,msg);
784         client->sendEvent(event);
785       }
786       else client->sendEvent(event);
787     }
788   }
789
790   //! Server side: Receive a message to do some post processing
791   void CContext::recvPostProcessing(CEventServer& event)
792   {
793      CBufferIn* buffer=event.subEvents.begin()->buffer;
794      string id;
795      *buffer>>id;
796      get(id)->recvPostProcessing(*buffer);
797   }
798
799   //! Server side: Receive a message to do some post processing
800   void CContext::recvPostProcessing(CBufferIn& buffer)
801   {
802      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
803      postProcessing();
804   }
805
806   const StdString& CContext::getIdServer()
807   {
808      if (hasClient)
809      {
810        idServer_ = this->getId();
811        idServer_ += "_server";
812        return idServer_;
813      }
814      if (hasServer) return (this->getId());
815   }
816
817   /*!
818   \brief Do some simple post processings after parsing xml file
819      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
820   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
821   which will be written out into netcdf files, are processed
822   */
823   void CContext::postProcessing()
824   {
825     if (isPostProcessed) return;
826
827      // Make sure the calendar was correctly created
828      if (!calendar)
829        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
830      else if (calendar->getTimeStep() == NoneDu)
831        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
832      // Calendar first update to set the current date equals to the start date
833      calendar->update(0);
834
835      // Find all inheritance in xml structure
836      this->solveAllInheritance();
837
838      // Check if some axis, domains or grids are eligible to for compressed indexed output.
839      // Warning: This must be done after solving the inheritance and before the rest of post-processing
840      checkAxisDomainsGridsEligibilityForCompressedOutput();
841
842      // Check if some automatic time series should be generated
843      // Warning: This must be done after solving the inheritance and before the rest of post-processing
844      prepareTimeseries();
845
846      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
847      this->findEnabledFiles();
848      this->findEnabledReadModeFiles();
849
850      // Find all enabled fields of each file
851      this->findAllEnabledFields();
852      this->findAllEnabledFieldsInReadModeFiles();
853
854     if (hasClient && !hasServer)
855     {
856      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
857      this->readAttributesOfEnabledFieldsInReadModeFiles();
858     }
859
860      // Only search and rebuild all reference objects of enable fields, don't transform
861      this->solveOnlyRefOfEnabledFields(false);
862
863      // Search and rebuild all reference object of enabled fields
864      this->solveAllRefOfEnabledFields(false);
865
866      // Find all fields with read access from the public API
867      findFieldsWithReadAccess();
868      // and solve the all reference for them
869      solveAllRefOfFieldsWithReadAccess();
870
871      isPostProcessed = true;
872   }
873
874   /*!
875    * Compute the required buffer size to send the attributes (mostly those grid related).
876    *
877    * \param maxEventSize [in/out] the size of the bigger event for each connected server
878    */
879   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
880   {
881     std::map<int, StdSize> attributesSize;
882
883     if (hasClient)
884     {
885       size_t numEnabledFiles = this->enabledFiles.size();
886       for (size_t i = 0; i < numEnabledFiles; ++i)
887       {
888         CFile* file = this->enabledFiles[i];
889
890         std::vector<CField*> enabledFields = file->getEnabledFields();
891         size_t numEnabledFields = enabledFields.size();
892         for (size_t j = 0; j < numEnabledFields; ++j)
893         {
894           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
895           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
896           for (; it != itE; ++it)
897           {
898             // If attributesSize[it->first] does not exist, it will be zero-initialized
899             // so we can use it safely without checking for its existance
900             if (attributesSize[it->first] < it->second)
901               attributesSize[it->first] = it->second;
902
903             if (maxEventSize[it->first] < it->second)
904               maxEventSize[it->first] = it->second;
905           }
906         }
907       }
908     }
909
910     return attributesSize;
911   }
912
913   /*!
914    * Compute the required buffer size to send the fields data.
915    *
916    * \param maxEventSize [in/out] the size of the bigger event for each connected server
917    */
918   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
919   {
920     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
921
922     std::map<int, StdSize> dataSize;
923
924     // Find all reference domain and axis of all active fields
925     size_t numEnabledFiles = this->enabledFiles.size();
926     for (size_t i = 0; i < numEnabledFiles; ++i)
927     {
928       CFile* file = this->enabledFiles[i];
929       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
930
931       if (fileMode == mode)
932       {
933         std::vector<CField*> enabledFields = file->getEnabledFields();
934         size_t numEnabledFields = enabledFields.size();
935         for (size_t j = 0; j < numEnabledFields; ++j)
936         {
937           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
938           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
939           for (; it != itE; ++it)
940           {
941             // If dataSize[it->first] does not exist, it will be zero-initialized
942             // so we can use it safely without checking for its existance
943             if (CXios::isOptPerformance)
944               dataSize[it->first] += it->second;
945             else if (dataSize[it->first] < it->second)
946               dataSize[it->first] = it->second;
947
948             if (maxEventSize[it->first] < it->second)
949               maxEventSize[it->first] = it->second;
950           }
951         }
952       }
953     }
954
955     return dataSize;
956   }
957
958   //! Client side: Send infomation of active files (files are enabled to write out)
959   void CContext::sendEnabledFiles()
960   {
961     int size = this->enabledFiles.size();
962
963     // In a context, each type has a root definition, e.g: axis, domain, field.
964     // Every object must be a child of one of these root definition. In this case
965     // all new file objects created on server must be children of the root "file_definition"
966     StdString fileDefRoot("file_definition");
967     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
968
969     for (int i = 0; i < size; ++i)
970     {
971       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
972       this->enabledFiles[i]->sendAllAttributesToServer();
973       this->enabledFiles[i]->sendAddAllVariables();
974     }
975   }
976
977   //! Client side: Send information of active fields (ones are written onto files)
978   void CContext::sendEnabledFields()
979   {
980     int size = this->enabledFiles.size();
981     for (int i = 0; i < size; ++i)
982     {
983       this->enabledFiles[i]->sendEnabledFields();
984     }
985   }
986
987   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
988   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
989   {
990     if (!hasClient) return;
991
992     const vector<CAxis*> allAxis = CAxis::getAll();
993     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
994       (*it)->checkEligibilityForCompressedOutput();
995
996     const vector<CDomain*> allDomains = CDomain::getAll();
997     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
998       (*it)->checkEligibilityForCompressedOutput();
999
1000     const vector<CGrid*> allGrids = CGrid::getAll();
1001     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1002       (*it)->checkEligibilityForCompressedOutput();
1003   }
1004
1005   //! Client side: Prepare the timeseries by adding the necessary files
1006   void CContext::prepareTimeseries()
1007   {
1008     if (!hasClient) return;
1009
1010     const std::vector<CFile*> allFiles = CFile::getAll();
1011     for (size_t i = 0; i < allFiles.size(); i++)
1012     {
1013       CFile* file = allFiles[i];
1014
1015       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1016       for (size_t k = 0; k < vars.size(); k++)
1017       {
1018         CVariable* var = vars[k];
1019
1020         if (var->ts_target.isEmpty()
1021              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1022           fileVars.push_back(var);
1023
1024         if (!var->ts_target.isEmpty()
1025              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1026           fieldVars.push_back(var);
1027       }
1028
1029       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1030       {
1031         StdString fileNameStr("%file_name%") ;
1032         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1033         
1034         StdString fileName=file->getFileOutputName();
1035         size_t pos=tsPrefix.find(fileNameStr) ;
1036         while (pos!=std::string::npos)
1037         {
1038           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1039           pos=tsPrefix.find(fileNameStr) ;
1040         }
1041       
1042         const std::vector<CField*> allFields = file->getAllFields();
1043         for (size_t j = 0; j < allFields.size(); j++)
1044         {
1045           CField* field = allFields[j];
1046
1047           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1048           {
1049             CFile* tsFile = CFile::create();
1050             tsFile->duplicateAttributes(file);
1051
1052             // Add variables originating from file and targeted to timeserie file
1053             for (size_t k = 0; k < fileVars.size(); k++)
1054               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1055
1056           
1057             tsFile->name = tsPrefix + "_";
1058             if (!field->name.isEmpty())
1059               tsFile->name.get() += field->name;
1060             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1061               tsFile->name.get() += field->field_ref;
1062             else
1063               tsFile->name.get() += field->getId();
1064
1065             if (!field->ts_split_freq.isEmpty())
1066               tsFile->split_freq = field->ts_split_freq;
1067
1068             CField* tsField = tsFile->addField();
1069             tsField->field_ref = field->getId();
1070
1071             // Add variables originating from file and targeted to timeserie field
1072             for (size_t k = 0; k < fieldVars.size(); k++)
1073               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1074
1075             vars = field->getAllVariables();
1076             for (size_t k = 0; k < vars.size(); k++)
1077             {
1078               CVariable* var = vars[k];
1079
1080               // Add variables originating from field and targeted to timeserie field
1081               if (var->ts_target.isEmpty()
1082                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1083                 tsField->getVirtualVariableGroup()->addChild(var);
1084
1085               // Add variables originating from field and targeted to timeserie file
1086               if (!var->ts_target.isEmpty()
1087                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1088                 tsFile->getVirtualVariableGroup()->addChild(var);
1089             }
1090
1091             tsFile->solveFieldRefInheritance(true);
1092
1093             if (file->timeseries == CFile::timeseries_attr::exclusive)
1094               field->enabled = false;
1095           }
1096         }
1097
1098         // Finally disable the original file is need be
1099         if (file->timeseries == CFile::timeseries_attr::only)
1100          file->enabled = false;
1101       }
1102     }
1103   }
1104
1105   //! Client side: Send information of reference grid of active fields
1106   void CContext::sendRefGrid()
1107   {
1108     std::set<StdString> gridIds;
1109     int sizeFile = this->enabledFiles.size();
1110     CFile* filePtr(NULL);
1111
1112     // Firstly, find all reference grids of all active fields
1113     for (int i = 0; i < sizeFile; ++i)
1114     {
1115       filePtr = this->enabledFiles[i];
1116       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1117       int sizeField = enabledFields.size();
1118       for (int numField = 0; numField < sizeField; ++numField)
1119       {
1120         if (0 != enabledFields[numField]->getRelGrid())
1121           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1122       }
1123     }
1124
1125     // Create all reference grids on server side
1126     StdString gridDefRoot("grid_definition");
1127     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1128     std::set<StdString>::const_iterator it, itE = gridIds.end();
1129     for (it = gridIds.begin(); it != itE; ++it)
1130     {
1131       gridPtr->sendCreateChild(*it);
1132       CGrid::get(*it)->sendAllAttributesToServer();
1133       CGrid::get(*it)->sendAllDomains();
1134       CGrid::get(*it)->sendAllAxis();
1135       CGrid::get(*it)->sendAllScalars();
1136     }
1137   }
1138
1139
1140   //! Client side: Send information of reference domain and axis of active fields
1141   void CContext::sendRefDomainsAxis()
1142   {
1143     std::set<StdString> domainIds, axisIds, scalarIds;
1144
1145     // Find all reference domain and axis of all active fields
1146     int numEnabledFiles = this->enabledFiles.size();
1147     for (int i = 0; i < numEnabledFiles; ++i)
1148     {
1149       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1150       int numEnabledFields = enabledFields.size();
1151       for (int j = 0; j < numEnabledFields; ++j)
1152       {
1153         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1154         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1155         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1156         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1157       }
1158     }
1159
1160     // Create all reference axis on server side
1161     std::set<StdString>::iterator itDom, itAxis, itScalar;
1162     std::set<StdString>::const_iterator itE;
1163
1164     StdString scalarDefRoot("scalar_definition");
1165     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1166     itE = scalarIds.end();
1167     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1168     {
1169       if (!itScalar->empty())
1170       {
1171         scalarPtr->sendCreateChild(*itScalar);
1172         CScalar::get(*itScalar)->sendAllAttributesToServer();
1173       }
1174     }
1175
1176     StdString axiDefRoot("axis_definition");
1177     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1178     itE = axisIds.end();
1179     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1180     {
1181       if (!itAxis->empty())
1182       {
1183         axisPtr->sendCreateChild(*itAxis);
1184         CAxis::get(*itAxis)->sendAllAttributesToServer();
1185       }
1186     }
1187
1188     // Create all reference domains on server side
1189     StdString domDefRoot("domain_definition");
1190     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1191     itE = domainIds.end();
1192     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1193     {
1194       if (!itDom->empty()) {
1195          domPtr->sendCreateChild(*itDom);
1196          CDomain::get(*itDom)->sendAllAttributesToServer();
1197       }
1198     }
1199   }
1200
1201   //! Update calendar in each time step
1202   void CContext::updateCalendar(int step)
1203   {
1204      int prevStep = calendar->getStep();
1205
1206      if (prevStep < step)
1207      {
1208        info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1209        calendar->update(step);
1210        info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1211  #ifdef XIOS_MEMTRACK_LIGHT
1212        info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
1213  #endif
1214
1215        if (hasClient)
1216        {
1217          doPostTimestepOperationsForEnabledReadModeFiles();
1218          garbageCollector.invalidate(calendar->getCurrentDate());
1219        }
1220      }
1221      else if (prevStep == step)
1222        info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl;
1223      else // if (prevStep > step)
1224        ERROR("void CContext::updateCalendar(int step)",
1225              << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!")
1226   }
1227
1228   //! Server side: Create header of netcdf file
1229   void CContext::createFileHeader(void )
1230   {
1231      vector<CFile*>::const_iterator it;
1232
1233      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1234      {
1235         (*it)->initFile();
1236      }
1237   }
1238
1239   //! Get current context
1240   CContext* CContext::getCurrent(void)
1241   {
1242     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1243   }
1244
1245   /*!
1246   \brief Set context with an id be the current context
1247   \param [in] id identity of context to be set to current
1248   */
1249   void CContext::setCurrent(const string& id)
1250   {
1251     CObjectFactory::SetCurrentContextId(id);
1252     CGroupFactory::SetCurrentContextId(id);
1253   }
1254
1255  /*!
1256  \brief Create a context with specific id
1257  \param [in] id identity of new context
1258  \return pointer to the new context or already-existed one with identity id
1259  */
1260  CContext* CContext::create(const StdString& id)
1261  {
1262    CContext::setCurrent(id);
1263
1264    bool hasctxt = CContext::has(id);
1265    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1266    getRoot();
1267    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1268
1269#define DECLARE_NODE(Name_, name_) \
1270    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1271#define DECLARE_NODE_PAR(Name_, name_)
1272#include "node_type.conf"
1273
1274    return (context);
1275  }
1276
1277
1278
1279     //! Server side: Receive a message to do some post processing
1280  void CContext::recvRegistry(CEventServer& event)
1281  {
1282    CBufferIn* buffer=event.subEvents.begin()->buffer;
1283    string id;
1284    *buffer>>id;
1285    get(id)->recvRegistry(*buffer);
1286  }
1287
1288  void CContext::recvRegistry(CBufferIn& buffer)
1289  {
1290    if (server->intraCommRank==0)
1291    {
1292      CRegistry registry(server->intraComm) ;
1293      registry.fromBuffer(buffer) ;
1294      registryOut->mergeRegistry(registry) ;
1295    }
1296  }
1297
1298  void CContext::sendRegistry(void)
1299  {
1300    registryOut->hierarchicalGatherRegistry() ;
1301
1302    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1303    if (client->isServerLeader())
1304    {
1305       CMessage msg ;
1306       msg<<this->getIdServer();
1307       if (client->clientRank==0) msg<<*registryOut ;
1308       const std::list<int>& ranks = client->getRanksServerLeader();
1309       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1310         event.push(*itRank,1,msg);
1311       client->sendEvent(event);
1312     }
1313     else client->sendEvent(event);
1314  }
1315
1316} // namespace xios
Note: See TracBrowser for help on using the repository browser.