source: XIOS/trunk/src/node/context.cpp @ 1015

Last change on this file since 1015 was 1015, checked in by oabramkina, 4 years ago

Check if output_frequency >= timestep added.
Files with output_frequency < timestep are considered to be disabled.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 40.9 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     // There is nothing client need to send to server
392     if (hasClient)
393     {
394       // After xml is parsed, there are some more works with post processing
395       postProcessing();
396     }
397     setClientServerBuffer();
398
399     if (hasClient && !hasServer)
400     {
401      // Send all attributes of current context to server
402      this->sendAllAttributesToServer();
403
404      // Send all attributes of current calendar
405      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
406
407      // We have enough information to send to server
408      // First of all, send all enabled files
409       sendEnabledFiles();
410
411      // Then, send all enabled fields
412       sendEnabledFields();
413
414      // At last, we have all info of domain and axis, then send them
415       sendRefDomainsAxis();
416
417      // After that, send all grid (if any)
418       sendRefGrid();
419    }
420
421    // We have a xml tree on the server side and now, it should be also processed
422    if (hasClient && !hasServer) sendPostProcessing();
423
424    // There are some processings that should be done after all of above. For example: check mask or index
425    if (hasClient)
426    {
427      this->buildFilterGraphOfEnabledFields();
428      buildFilterGraphOfFieldsWithReadAccess();
429      this->solveAllRefOfEnabledFields(true);
430    }
431
432    // Now tell server that it can process all messages from client
433    if (hasClient && !hasServer) this->sendCloseDefinition();
434
435    // Nettoyage de l'arborescence
436    if (hasClient && !hasServer) CleanTree(); // Only on client side??
437
438    if (hasClient)
439    {
440      sendCreateFileHeader();
441
442      startPrefetchingOfEnabledReadModeFiles();
443    }
444   }
445
446   void CContext::findAllEnabledFields(void)
447   {
448     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
449     (void)this->enabledFiles[i]->getEnabledFields();
450   }
451
452   void CContext::findAllEnabledFieldsInReadModeFiles(void)
453   {
454     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
455     (void)this->enabledReadModeFiles[i]->getEnabledFields();
456   }
457
458   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
459   {
460      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
461        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
462   }
463
464   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
465   {
466     int size = this->enabledFiles.size();
467     for (int i = 0; i < size; ++i)
468     {
469       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
470     }
471
472     for (int i = 0; i < size; ++i)
473     {
474       this->enabledFiles[i]->generateNewTransformationGridDest();
475     }
476   }
477
478   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
479   {
480     int size = this->enabledFiles.size();
481     for (int i = 0; i < size; ++i)
482     {
483       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
484     }
485   }
486
487   void CContext::buildFilterGraphOfEnabledFields()
488   {
489     int size = this->enabledFiles.size();
490     for (int i = 0; i < size; ++i)
491     {
492       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
493     }
494   }
495
496   void CContext::startPrefetchingOfEnabledReadModeFiles()
497   {
498     int size = enabledReadModeFiles.size();
499     for (int i = 0; i < size; ++i)
500     {
501        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
502     }
503   }
504
505   void CContext::checkPrefetchingOfEnabledReadModeFiles()
506   {
507     int size = enabledReadModeFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
511     }
512   }
513
514  void CContext::findFieldsWithReadAccess(void)
515  {
516    fieldsWithReadAccess.clear();
517    const vector<CField*> allFields = CField::getAll();
518    for (size_t i = 0; i < allFields.size(); ++i)
519    {
520      CField* field = allFields[i];
521
522      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
523        field->read_access = true;
524      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
525        fieldsWithReadAccess.push_back(field);
526    }
527  }
528
529  void CContext::solveAllRefOfFieldsWithReadAccess()
530  {
531    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
532      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
533  }
534
535  void CContext::buildFilterGraphOfFieldsWithReadAccess()
536  {
537    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
538      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
539  }
540
541   void CContext::solveAllInheritance(bool apply)
542   {
543     // Résolution des héritages descendants (càd des héritages de groupes)
544     // pour chacun des contextes.
545      solveDescInheritance(apply);
546
547     // Résolution des héritages par référence au niveau des fichiers.
548      const vector<CFile*> allFiles=CFile::getAll();
549      const vector<CGrid*> allGrids= CGrid::getAll();
550
551     //if (hasClient && !hasServer)
552      if (hasClient)
553      {
554        for (unsigned int i = 0; i < allFiles.size(); i++)
555          allFiles[i]->solveFieldRefInheritance(apply);
556      }
557
558      unsigned int vecSize = allGrids.size();
559      unsigned int i = 0;
560      for (i = 0; i < vecSize; ++i)
561        allGrids[i]->solveDomainAxisRefInheritance(apply);
562
563   }
564
565   void CContext::findEnabledFiles(void)
566   {
567      const std::vector<CFile*> allFiles = CFile::getAll();
568
569      for (unsigned int i = 0; i < allFiles.size(); i++)
570         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
571         {
572            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
573            {
574              if ( allFiles[i]->output_freq.getValue() < this->getCalendar()->getTimeStep())
575              {
576                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
577                    << "Output frequency in file \""<<allFiles[i]->getId()
578                    <<"\" is greater than the time step. File will not be written."<<endl;
579              }
580              else
581               enabledFiles.push_back(allFiles[i]);
582            }
583         }
584         else
585         {
586           if ( allFiles[i]->output_freq.getValue() < this->getCalendar()->getTimeStep())
587           {
588             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
589                 << "Output frequency in file \""<<allFiles[i]->getId()
590                 <<"\" is greater than the time step. File will not be written."<<endl;
591           }
592           else
593             enabledFiles.push_back(allFiles[i]); // otherwise true by default
594         }
595
596      if (enabledFiles.size() == 0)
597         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
598               << getId() << "\" !");
599   }
600
601   void CContext::findEnabledReadModeFiles(void)
602   {
603     int size = this->enabledFiles.size();
604     for (int i = 0; i < size; ++i)
605     {
606       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
607        enabledReadModeFiles.push_back(enabledFiles[i]);
608     }
609   }
610
611   void CContext::closeAllFile(void)
612   {
613     std::vector<CFile*>::const_iterator
614            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
615
616     for (; it != end; it++)
617     {
618       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
619       (*it)->close();
620     }
621   }
622
623   /*!
624   \brief Dispatch event received from client
625      Whenever a message is received in buffer of server, it will be processed depending on
626   its event type. A new event type should be added in the switch list to make sure
627   it processed on server side.
628   \param [in] event: Received message
629   */
630   bool CContext::dispatchEvent(CEventServer& event)
631   {
632
633      if (SuperClass::dispatchEvent(event)) return true;
634      else
635      {
636        switch(event.type)
637        {
638           case EVENT_ID_CLOSE_DEFINITION :
639             recvCloseDefinition(event);
640             return true;
641             break;
642           case EVENT_ID_UPDATE_CALENDAR:
643             recvUpdateCalendar(event);
644             return true;
645             break;
646           case EVENT_ID_CREATE_FILE_HEADER :
647             recvCreateFileHeader(event);
648             return true;
649             break;
650           case EVENT_ID_POST_PROCESS:
651             recvPostProcessing(event);
652             return true;
653            case EVENT_ID_SEND_REGISTRY:
654             recvRegistry(event);
655             return true;
656            break;
657
658           default :
659             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
660                    <<"Unknown Event");
661           return false;
662         }
663      }
664   }
665
666   //! Client side: Send a message to server to make it close
667   void CContext::sendCloseDefinition(void)
668   {
669     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
670     if (client->isServerLeader())
671     {
672       CMessage msg;
673       msg<<this->getIdServer();
674       const std::list<int>& ranks = client->getRanksServerLeader();
675       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
676         event.push(*itRank,1,msg);
677       client->sendEvent(event);
678     }
679     else client->sendEvent(event);
680   }
681
682   //! Server side: Receive a message of client announcing a context close
683   void CContext::recvCloseDefinition(CEventServer& event)
684   {
685
686      CBufferIn* buffer=event.subEvents.begin()->buffer;
687      string id;
688      *buffer>>id;
689      get(id)->closeDefinition();
690   }
691
692   //! Client side: Send a message to update calendar in each time step
693   void CContext::sendUpdateCalendar(int step)
694   {
695     if (!hasServer)
696     {
697       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
698       if (client->isServerLeader())
699       {
700         CMessage msg;
701         msg<<this->getIdServer()<<step;
702         const std::list<int>& ranks = client->getRanksServerLeader();
703         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
704           event.push(*itRank,1,msg);
705         client->sendEvent(event);
706       }
707       else client->sendEvent(event);
708     }
709   }
710
711   //! Server side: Receive a message of client annoucing calendar update
712   void CContext::recvUpdateCalendar(CEventServer& event)
713   {
714      CBufferIn* buffer=event.subEvents.begin()->buffer;
715      string id;
716      *buffer>>id;
717      get(id)->recvUpdateCalendar(*buffer);
718   }
719
720   //! Server side: Receive a message of client annoucing calendar update
721   void CContext::recvUpdateCalendar(CBufferIn& buffer)
722   {
723      int step;
724      buffer>>step;
725      updateCalendar(step);
726   }
727
728   //! Client side: Send a message to create header part of netcdf file
729   void CContext::sendCreateFileHeader(void)
730   {
731     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
732     if (client->isServerLeader())
733     {
734       CMessage msg;
735       msg<<this->getIdServer();
736       const std::list<int>& ranks = client->getRanksServerLeader();
737       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
738         event.push(*itRank,1,msg) ;
739       client->sendEvent(event);
740     }
741     else client->sendEvent(event);
742   }
743
744   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
745   void CContext::recvCreateFileHeader(CEventServer& event)
746   {
747      CBufferIn* buffer=event.subEvents.begin()->buffer;
748      string id;
749      *buffer>>id;
750      get(id)->recvCreateFileHeader(*buffer);
751   }
752
753   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
754   void CContext::recvCreateFileHeader(CBufferIn& buffer)
755   {
756      createFileHeader();
757   }
758
759   //! Client side: Send a message to do some post processing on server
760   void CContext::sendPostProcessing()
761   {
762     if (!hasServer)
763     {
764       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
765       if (client->isServerLeader())
766       {
767         CMessage msg;
768         msg<<this->getIdServer();
769         const std::list<int>& ranks = client->getRanksServerLeader();
770         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
771           event.push(*itRank,1,msg);
772         client->sendEvent(event);
773       }
774       else client->sendEvent(event);
775     }
776   }
777
778   //! Server side: Receive a message to do some post processing
779   void CContext::recvPostProcessing(CEventServer& event)
780   {
781      CBufferIn* buffer=event.subEvents.begin()->buffer;
782      string id;
783      *buffer>>id;
784      get(id)->recvPostProcessing(*buffer);
785   }
786
787   //! Server side: Receive a message to do some post processing
788   void CContext::recvPostProcessing(CBufferIn& buffer)
789   {
790      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
791      postProcessing();
792   }
793
794   const StdString& CContext::getIdServer()
795   {
796      if (hasClient)
797      {
798        idServer_ = this->getId();
799        idServer_ += "_server";
800        return idServer_;
801      }
802      if (hasServer) return (this->getId());
803   }
804
805   /*!
806   \brief Do some simple post processings after parsing xml file
807      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
808   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
809   which will be written out into netcdf files, are processed
810   */
811   void CContext::postProcessing()
812   {
813     if (isPostProcessed) return;
814
815      // Make sure the calendar was correctly created
816      if (!calendar)
817        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
818      else if (calendar->getTimeStep() == NoneDu)
819        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
820      // Calendar first update to set the current date equals to the start date
821      calendar->update(0);
822
823      // Find all inheritance in xml structure
824      this->solveAllInheritance();
825
826      // Check if some axis, domains or grids are eligible to for compressed indexed output.
827      // Warning: This must be done after solving the inheritance and before the rest of post-processing
828      checkAxisDomainsGridsEligibilityForCompressedOutput();
829
830      // Check if some automatic time series should be generated
831      // Warning: This must be done after solving the inheritance and before the rest of post-processing
832      prepareTimeseries();
833
834      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
835      this->findEnabledFiles();
836      this->findEnabledReadModeFiles();
837
838      // Find all enabled fields of each file
839      this->findAllEnabledFields();
840      this->findAllEnabledFieldsInReadModeFiles();
841
842     if (hasClient && !hasServer)
843     {
844      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
845      this->readAttributesOfEnabledFieldsInReadModeFiles();
846     }
847
848      // Only search and rebuild all reference objects of enable fields, don't transform
849      this->solveOnlyRefOfEnabledFields(false);
850
851      // Search and rebuild all reference object of enabled fields
852      this->solveAllRefOfEnabledFields(false);
853
854      // Find all fields with read access from the public API
855      findFieldsWithReadAccess();
856      // and solve the all reference for them
857      solveAllRefOfFieldsWithReadAccess();
858
859      isPostProcessed = true;
860   }
861
862   /*!
863    * Compute the required buffer size to send the attributes (mostly those grid related).
864    *
865    * \param maxEventSize [in/out] the size of the bigger event for each connected server
866    */
867   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
868   {
869     std::map<int, StdSize> attributesSize;
870
871     if (hasClient)
872     {
873       size_t numEnabledFiles = this->enabledFiles.size();
874       for (size_t i = 0; i < numEnabledFiles; ++i)
875       {
876         CFile* file = this->enabledFiles[i];
877
878         std::vector<CField*> enabledFields = file->getEnabledFields();
879         size_t numEnabledFields = enabledFields.size();
880         for (size_t j = 0; j < numEnabledFields; ++j)
881         {
882           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
883           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
884           for (; it != itE; ++it)
885           {
886             // If attributesSize[it->first] does not exist, it will be zero-initialized
887             // so we can use it safely without checking for its existance
888             if (attributesSize[it->first] < it->second)
889               attributesSize[it->first] = it->second;
890
891             if (maxEventSize[it->first] < it->second)
892               maxEventSize[it->first] = it->second;
893           }
894         }
895       }
896     }
897
898     return attributesSize;
899   }
900
901   /*!
902    * Compute the required buffer size to send the fields data.
903    *
904    * \param maxEventSize [in/out] the size of the bigger event for each connected server
905    */
906   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
907   {
908     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
909
910     std::map<int, StdSize> dataSize;
911
912     // Find all reference domain and axis of all active fields
913     size_t numEnabledFiles = this->enabledFiles.size();
914     for (size_t i = 0; i < numEnabledFiles; ++i)
915     {
916       CFile* file = this->enabledFiles[i];
917       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
918
919       if (fileMode == mode)
920       {
921         std::vector<CField*> enabledFields = file->getEnabledFields();
922         size_t numEnabledFields = enabledFields.size();
923         for (size_t j = 0; j < numEnabledFields; ++j)
924         {
925           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
926           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
927           for (; it != itE; ++it)
928           {
929             // If dataSize[it->first] does not exist, it will be zero-initialized
930             // so we can use it safely without checking for its existance
931             if (CXios::isOptPerformance)
932               dataSize[it->first] += it->second;
933             else if (dataSize[it->first] < it->second)
934               dataSize[it->first] = it->second;
935
936             if (maxEventSize[it->first] < it->second)
937               maxEventSize[it->first] = it->second;
938           }
939         }
940       }
941     }
942
943     return dataSize;
944   }
945
946   //! Client side: Send infomation of active files (files are enabled to write out)
947   void CContext::sendEnabledFiles()
948   {
949     int size = this->enabledFiles.size();
950
951     // In a context, each type has a root definition, e.g: axis, domain, field.
952     // Every object must be a child of one of these root definition. In this case
953     // all new file objects created on server must be children of the root "file_definition"
954     StdString fileDefRoot("file_definition");
955     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
956
957     for (int i = 0; i < size; ++i)
958     {
959       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
960       this->enabledFiles[i]->sendAllAttributesToServer();
961       this->enabledFiles[i]->sendAddAllVariables();
962     }
963   }
964
965   //! Client side: Send information of active fields (ones are written onto files)
966   void CContext::sendEnabledFields()
967   {
968     int size = this->enabledFiles.size();
969     for (int i = 0; i < size; ++i)
970     {
971       this->enabledFiles[i]->sendEnabledFields();
972     }
973   }
974
975   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
976   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
977   {
978     if (!hasClient) return;
979
980     const vector<CAxis*> allAxis = CAxis::getAll();
981     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
982       (*it)->checkEligibilityForCompressedOutput();
983
984     const vector<CDomain*> allDomains = CDomain::getAll();
985     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
986       (*it)->checkEligibilityForCompressedOutput();
987
988     const vector<CGrid*> allGrids = CGrid::getAll();
989     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
990       (*it)->checkEligibilityForCompressedOutput();
991   }
992
993   //! Client side: Prepare the timeseries by adding the necessary files
994   void CContext::prepareTimeseries()
995   {
996     if (!hasClient) return;
997
998     const std::vector<CFile*> allFiles = CFile::getAll();
999     for (size_t i = 0; i < allFiles.size(); i++)
1000     {
1001       CFile* file = allFiles[i];
1002
1003       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1004       {
1005         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1006
1007         const std::vector<CField*> allFields = file->getAllFields();
1008         for (size_t j = 0; j < allFields.size(); j++)
1009         {
1010           CField* field = allFields[j];
1011
1012           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1013           {
1014             CFile* tsFile = CFile::create();
1015             tsFile->duplicateAttributes(file);
1016             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1017
1018             tsFile->name = tsPrefix + "_";
1019             if (!field->name.isEmpty())
1020               tsFile->name.get() += field->name;
1021             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1022               tsFile->name.get() += field->field_ref;
1023             else
1024               tsFile->name.get() += field->getId();
1025
1026             if (!field->ts_split_freq.isEmpty())
1027               tsFile->split_freq = field->ts_split_freq;
1028
1029             CField* tsField = tsFile->addField();
1030             tsField->field_ref = field->getId();
1031             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1032
1033             tsFile->solveFieldRefInheritance(true);
1034
1035             if (file->timeseries == CFile::timeseries_attr::exclusive)
1036               field->enabled = false;
1037           }
1038         }
1039
1040         // Finally disable the original file is need be
1041         if (file->timeseries == CFile::timeseries_attr::only)
1042          file->enabled = false;
1043       }
1044     }
1045   }
1046
1047   //! Client side: Send information of reference grid of active fields
1048   void CContext::sendRefGrid()
1049   {
1050     std::set<StdString> gridIds;
1051     int sizeFile = this->enabledFiles.size();
1052     CFile* filePtr(NULL);
1053
1054     // Firstly, find all reference grids of all active fields
1055     for (int i = 0; i < sizeFile; ++i)
1056     {
1057       filePtr = this->enabledFiles[i];
1058       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1059       int sizeField = enabledFields.size();
1060       for (int numField = 0; numField < sizeField; ++numField)
1061       {
1062         if (0 != enabledFields[numField]->getRelGrid())
1063           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1064       }
1065     }
1066
1067     // Create all reference grids on server side
1068     StdString gridDefRoot("grid_definition");
1069     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1070     std::set<StdString>::const_iterator it, itE = gridIds.end();
1071     for (it = gridIds.begin(); it != itE; ++it)
1072     {
1073       gridPtr->sendCreateChild(*it);
1074       CGrid::get(*it)->sendAllAttributesToServer();
1075       CGrid::get(*it)->sendAllDomains();
1076       CGrid::get(*it)->sendAllAxis();
1077       CGrid::get(*it)->sendAllScalars();
1078     }
1079   }
1080
1081
1082   //! Client side: Send information of reference domain and axis of active fields
1083   void CContext::sendRefDomainsAxis()
1084   {
1085     std::set<StdString> domainIds, axisIds, scalarIds;
1086
1087     // Find all reference domain and axis of all active fields
1088     int numEnabledFiles = this->enabledFiles.size();
1089     for (int i = 0; i < numEnabledFiles; ++i)
1090     {
1091       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1092       int numEnabledFields = enabledFields.size();
1093       for (int j = 0; j < numEnabledFields; ++j)
1094       {
1095         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1096         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1097         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1098         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1099       }
1100     }
1101
1102     // Create all reference axis on server side
1103     std::set<StdString>::iterator itDom, itAxis, itScalar;
1104     std::set<StdString>::const_iterator itE;
1105
1106     StdString scalarDefRoot("scalar_definition");
1107     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1108     itE = scalarIds.end();
1109     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1110     {
1111       if (!itScalar->empty())
1112       {
1113         scalarPtr->sendCreateChild(*itScalar);
1114         CScalar::get(*itScalar)->sendAllAttributesToServer();
1115       }
1116     }
1117
1118     StdString axiDefRoot("axis_definition");
1119     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1120     itE = axisIds.end();
1121     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1122     {
1123       if (!itAxis->empty())
1124       {
1125         axisPtr->sendCreateChild(*itAxis);
1126         CAxis::get(*itAxis)->sendAllAttributesToServer();
1127       }
1128     }
1129
1130     // Create all reference domains on server side
1131     StdString domDefRoot("domain_definition");
1132     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1133     itE = domainIds.end();
1134     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1135     {
1136       if (!itDom->empty()) {
1137          domPtr->sendCreateChild(*itDom);
1138          CDomain::get(*itDom)->sendAllAttributesToServer();
1139       }
1140     }
1141   }
1142
1143   //! Update calendar in each time step
1144   void CContext::updateCalendar(int step)
1145   {
1146      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1147      calendar->update(step);
1148      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1149
1150      if (hasClient)
1151      {
1152        checkPrefetchingOfEnabledReadModeFiles();
1153        garbageCollector.invalidate(calendar->getCurrentDate());
1154      }
1155   }
1156
1157   //! Server side: Create header of netcdf file
1158   void CContext::createFileHeader(void )
1159   {
1160      vector<CFile*>::const_iterator it;
1161
1162      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1163      {
1164         (*it)->initFile();
1165      }
1166   }
1167
1168   //! Get current context
1169   CContext* CContext::getCurrent(void)
1170   {
1171     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1172   }
1173
1174   /*!
1175   \brief Set context with an id be the current context
1176   \param [in] id identity of context to be set to current
1177   */
1178   void CContext::setCurrent(const string& id)
1179   {
1180     CObjectFactory::SetCurrentContextId(id);
1181     CGroupFactory::SetCurrentContextId(id);
1182   }
1183
1184  /*!
1185  \brief Create a context with specific id
1186  \param [in] id identity of new context
1187  \return pointer to the new context or already-existed one with identity id
1188  */
1189  CContext* CContext::create(const StdString& id)
1190  {
1191    CContext::setCurrent(id);
1192
1193    bool hasctxt = CContext::has(id);
1194    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1195    getRoot();
1196    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1197
1198#define DECLARE_NODE(Name_, name_) \
1199    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1200#define DECLARE_NODE_PAR(Name_, name_)
1201#include "node_type.conf"
1202
1203    return (context);
1204  }
1205
1206
1207
1208     //! Server side: Receive a message to do some post processing
1209  void CContext::recvRegistry(CEventServer& event)
1210  {
1211    CBufferIn* buffer=event.subEvents.begin()->buffer;
1212    string id;
1213    *buffer>>id;
1214    get(id)->recvRegistry(*buffer);
1215  }
1216
1217  void CContext::recvRegistry(CBufferIn& buffer)
1218  {
1219    if (server->intraCommRank==0)
1220    {
1221      CRegistry registry(server->intraComm) ;
1222      registry.fromBuffer(buffer) ;
1223      registryOut->mergeRegistry(registry) ;
1224    }
1225  }
1226
1227  void CContext::sendRegistry(void)
1228  {
1229    registryOut->hierarchicalGatherRegistry() ;
1230
1231    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1232    if (client->isServerLeader())
1233    {
1234       CMessage msg ;
1235       msg<<this->getIdServer();
1236       if (client->clientRank==0) msg<<*registryOut ;
1237       const std::list<int>& ranks = client->getRanksServerLeader();
1238       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1239         event.push(*itRank,1,msg);
1240       client->sendEvent(event);
1241     }
1242     else client->sendEvent(event);
1243  }
1244
1245} // namespace xios
Note: See TracBrowser for help on using the repository browser.