source: XIOS/trunk/src/node/context.cpp @ 1028

Last change on this file since 1028 was 1028, checked in by oabramkina, 7 years ago

Correcting a bug introduced in r1015 with a check if output_frequency >= timestep.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 41.0 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     // There is nothing client need to send to server
392     if (hasClient)
393     {
394       // After xml is parsed, there are some more works with post processing
395       postProcessing();
396     }
397     setClientServerBuffer();
398
399     if (hasClient && !hasServer)
400     {
401      // Send all attributes of current context to server
402      this->sendAllAttributesToServer();
403
404      // Send all attributes of current calendar
405      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
406
407      // We have enough information to send to server
408      // First of all, send all enabled files
409       sendEnabledFiles();
410
411      // Then, send all enabled fields
412       sendEnabledFields();
413
414      // At last, we have all info of domain and axis, then send them
415       sendRefDomainsAxis();
416
417      // After that, send all grid (if any)
418       sendRefGrid();
419    }
420
421    // We have a xml tree on the server side and now, it should be also processed
422    if (hasClient && !hasServer) sendPostProcessing();
423
424    // There are some processings that should be done after all of above. For example: check mask or index
425    if (hasClient)
426    {
427      this->buildFilterGraphOfEnabledFields();
428      buildFilterGraphOfFieldsWithReadAccess();
429      this->solveAllRefOfEnabledFields(true);
430    }
431
432    // Now tell server that it can process all messages from client
433    if (hasClient && !hasServer) this->sendCloseDefinition();
434
435    // Nettoyage de l'arborescence
436    if (hasClient && !hasServer) CleanTree(); // Only on client side??
437
438    if (hasClient)
439    {
440      sendCreateFileHeader();
441
442      startPrefetchingOfEnabledReadModeFiles();
443    }
444   }
445
446   void CContext::findAllEnabledFields(void)
447   {
448     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
449     (void)this->enabledFiles[i]->getEnabledFields();
450   }
451
452   void CContext::findAllEnabledFieldsInReadModeFiles(void)
453   {
454     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
455     (void)this->enabledReadModeFiles[i]->getEnabledFields();
456   }
457
458   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
459   {
460      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
461        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
462   }
463
464   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
465   {
466     int size = this->enabledFiles.size();
467     for (int i = 0; i < size; ++i)
468     {
469       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
470     }
471
472     for (int i = 0; i < size; ++i)
473     {
474       this->enabledFiles[i]->generateNewTransformationGridDest();
475     }
476   }
477
478   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
479   {
480     int size = this->enabledFiles.size();
481     for (int i = 0; i < size; ++i)
482     {
483       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
484     }
485   }
486
487   void CContext::buildFilterGraphOfEnabledFields()
488   {
489     int size = this->enabledFiles.size();
490     for (int i = 0; i < size; ++i)
491     {
492       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
493     }
494   }
495
496   void CContext::startPrefetchingOfEnabledReadModeFiles()
497   {
498     int size = enabledReadModeFiles.size();
499     for (int i = 0; i < size; ++i)
500     {
501        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
502     }
503   }
504
505   void CContext::checkPrefetchingOfEnabledReadModeFiles()
506   {
507     int size = enabledReadModeFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
511     }
512   }
513
514  void CContext::findFieldsWithReadAccess(void)
515  {
516    fieldsWithReadAccess.clear();
517    const vector<CField*> allFields = CField::getAll();
518    for (size_t i = 0; i < allFields.size(); ++i)
519    {
520      CField* field = allFields[i];
521
522      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
523        field->read_access = true;
524      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
525        fieldsWithReadAccess.push_back(field);
526    }
527  }
528
529  void CContext::solveAllRefOfFieldsWithReadAccess()
530  {
531    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
532      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
533  }
534
535  void CContext::buildFilterGraphOfFieldsWithReadAccess()
536  {
537    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
538      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
539  }
540
541   void CContext::solveAllInheritance(bool apply)
542   {
543     // Résolution des héritages descendants (càd des héritages de groupes)
544     // pour chacun des contextes.
545      solveDescInheritance(apply);
546
547     // Résolution des héritages par référence au niveau des fichiers.
548      const vector<CFile*> allFiles=CFile::getAll();
549      const vector<CGrid*> allGrids= CGrid::getAll();
550
551     //if (hasClient && !hasServer)
552      if (hasClient)
553      {
554        for (unsigned int i = 0; i < allFiles.size(); i++)
555          allFiles[i]->solveFieldRefInheritance(apply);
556      }
557
558      unsigned int vecSize = allGrids.size();
559      unsigned int i = 0;
560      for (i = 0; i < vecSize; ++i)
561        allGrids[i]->solveDomainAxisRefInheritance(apply);
562
563   }
564
565   void CContext::findEnabledFiles(void)
566   {
567      const std::vector<CFile*> allFiles = CFile::getAll();
568      const CDate& initDate = calendar->getInitDate();
569
570      for (unsigned int i = 0; i < allFiles.size(); i++)
571         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
572         {
573            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
574            {
575              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
576              {
577                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
578                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
579                    <<"\" is less than the time step. File will not be written."<<endl;
580              }
581              else
582               enabledFiles.push_back(allFiles[i]);
583            }
584         }
585         else
586         {
587           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
588           {
589             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
590                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
591                 <<"\" is less than the time step. File will not be written."<<endl;
592           }
593           else
594             enabledFiles.push_back(allFiles[i]); // otherwise true by default
595         }
596
597      if (enabledFiles.size() == 0)
598         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
599               << getId() << "\" !");
600   }
601
602   void CContext::findEnabledReadModeFiles(void)
603   {
604     int size = this->enabledFiles.size();
605     for (int i = 0; i < size; ++i)
606     {
607       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
608        enabledReadModeFiles.push_back(enabledFiles[i]);
609     }
610   }
611
612   void CContext::closeAllFile(void)
613   {
614     std::vector<CFile*>::const_iterator
615            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
616
617     for (; it != end; it++)
618     {
619       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
620       (*it)->close();
621     }
622   }
623
624   /*!
625   \brief Dispatch event received from client
626      Whenever a message is received in buffer of server, it will be processed depending on
627   its event type. A new event type should be added in the switch list to make sure
628   it processed on server side.
629   \param [in] event: Received message
630   */
631   bool CContext::dispatchEvent(CEventServer& event)
632   {
633
634      if (SuperClass::dispatchEvent(event)) return true;
635      else
636      {
637        switch(event.type)
638        {
639           case EVENT_ID_CLOSE_DEFINITION :
640             recvCloseDefinition(event);
641             return true;
642             break;
643           case EVENT_ID_UPDATE_CALENDAR:
644             recvUpdateCalendar(event);
645             return true;
646             break;
647           case EVENT_ID_CREATE_FILE_HEADER :
648             recvCreateFileHeader(event);
649             return true;
650             break;
651           case EVENT_ID_POST_PROCESS:
652             recvPostProcessing(event);
653             return true;
654            case EVENT_ID_SEND_REGISTRY:
655             recvRegistry(event);
656             return true;
657            break;
658
659           default :
660             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
661                    <<"Unknown Event");
662           return false;
663         }
664      }
665   }
666
667   //! Client side: Send a message to server to make it close
668   void CContext::sendCloseDefinition(void)
669   {
670     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
671     if (client->isServerLeader())
672     {
673       CMessage msg;
674       msg<<this->getIdServer();
675       const std::list<int>& ranks = client->getRanksServerLeader();
676       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
677         event.push(*itRank,1,msg);
678       client->sendEvent(event);
679     }
680     else client->sendEvent(event);
681   }
682
683   //! Server side: Receive a message of client announcing a context close
684   void CContext::recvCloseDefinition(CEventServer& event)
685   {
686
687      CBufferIn* buffer=event.subEvents.begin()->buffer;
688      string id;
689      *buffer>>id;
690      get(id)->closeDefinition();
691   }
692
693   //! Client side: Send a message to update calendar in each time step
694   void CContext::sendUpdateCalendar(int step)
695   {
696     if (!hasServer)
697     {
698       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
699       if (client->isServerLeader())
700       {
701         CMessage msg;
702         msg<<this->getIdServer()<<step;
703         const std::list<int>& ranks = client->getRanksServerLeader();
704         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
705           event.push(*itRank,1,msg);
706         client->sendEvent(event);
707       }
708       else client->sendEvent(event);
709     }
710   }
711
712   //! Server side: Receive a message of client annoucing calendar update
713   void CContext::recvUpdateCalendar(CEventServer& event)
714   {
715      CBufferIn* buffer=event.subEvents.begin()->buffer;
716      string id;
717      *buffer>>id;
718      get(id)->recvUpdateCalendar(*buffer);
719   }
720
721   //! Server side: Receive a message of client annoucing calendar update
722   void CContext::recvUpdateCalendar(CBufferIn& buffer)
723   {
724      int step;
725      buffer>>step;
726      updateCalendar(step);
727   }
728
729   //! Client side: Send a message to create header part of netcdf file
730   void CContext::sendCreateFileHeader(void)
731   {
732     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
733     if (client->isServerLeader())
734     {
735       CMessage msg;
736       msg<<this->getIdServer();
737       const std::list<int>& ranks = client->getRanksServerLeader();
738       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
739         event.push(*itRank,1,msg) ;
740       client->sendEvent(event);
741     }
742     else client->sendEvent(event);
743   }
744
745   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
746   void CContext::recvCreateFileHeader(CEventServer& event)
747   {
748      CBufferIn* buffer=event.subEvents.begin()->buffer;
749      string id;
750      *buffer>>id;
751      get(id)->recvCreateFileHeader(*buffer);
752   }
753
754   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
755   void CContext::recvCreateFileHeader(CBufferIn& buffer)
756   {
757      createFileHeader();
758   }
759
760   //! Client side: Send a message to do some post processing on server
761   void CContext::sendPostProcessing()
762   {
763     if (!hasServer)
764     {
765       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
766       if (client->isServerLeader())
767       {
768         CMessage msg;
769         msg<<this->getIdServer();
770         const std::list<int>& ranks = client->getRanksServerLeader();
771         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
772           event.push(*itRank,1,msg);
773         client->sendEvent(event);
774       }
775       else client->sendEvent(event);
776     }
777   }
778
779   //! Server side: Receive a message to do some post processing
780   void CContext::recvPostProcessing(CEventServer& event)
781   {
782      CBufferIn* buffer=event.subEvents.begin()->buffer;
783      string id;
784      *buffer>>id;
785      get(id)->recvPostProcessing(*buffer);
786   }
787
788   //! Server side: Receive a message to do some post processing
789   void CContext::recvPostProcessing(CBufferIn& buffer)
790   {
791      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
792      postProcessing();
793   }
794
795   const StdString& CContext::getIdServer()
796   {
797      if (hasClient)
798      {
799        idServer_ = this->getId();
800        idServer_ += "_server";
801        return idServer_;
802      }
803      if (hasServer) return (this->getId());
804   }
805
806   /*!
807   \brief Do some simple post processings after parsing xml file
808      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
809   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
810   which will be written out into netcdf files, are processed
811   */
812   void CContext::postProcessing()
813   {
814     if (isPostProcessed) return;
815
816      // Make sure the calendar was correctly created
817      if (!calendar)
818        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
819      else if (calendar->getTimeStep() == NoneDu)
820        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
821      // Calendar first update to set the current date equals to the start date
822      calendar->update(0);
823
824      // Find all inheritance in xml structure
825      this->solveAllInheritance();
826
827      // Check if some axis, domains or grids are eligible to for compressed indexed output.
828      // Warning: This must be done after solving the inheritance and before the rest of post-processing
829      checkAxisDomainsGridsEligibilityForCompressedOutput();
830
831      // Check if some automatic time series should be generated
832      // Warning: This must be done after solving the inheritance and before the rest of post-processing
833      prepareTimeseries();
834
835      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
836      this->findEnabledFiles();
837      this->findEnabledReadModeFiles();
838
839      // Find all enabled fields of each file
840      this->findAllEnabledFields();
841      this->findAllEnabledFieldsInReadModeFiles();
842
843     if (hasClient && !hasServer)
844     {
845      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
846      this->readAttributesOfEnabledFieldsInReadModeFiles();
847     }
848
849      // Only search and rebuild all reference objects of enable fields, don't transform
850      this->solveOnlyRefOfEnabledFields(false);
851
852      // Search and rebuild all reference object of enabled fields
853      this->solveAllRefOfEnabledFields(false);
854
855      // Find all fields with read access from the public API
856      findFieldsWithReadAccess();
857      // and solve the all reference for them
858      solveAllRefOfFieldsWithReadAccess();
859
860      isPostProcessed = true;
861   }
862
863   /*!
864    * Compute the required buffer size to send the attributes (mostly those grid related).
865    *
866    * \param maxEventSize [in/out] the size of the bigger event for each connected server
867    */
868   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
869   {
870     std::map<int, StdSize> attributesSize;
871
872     if (hasClient)
873     {
874       size_t numEnabledFiles = this->enabledFiles.size();
875       for (size_t i = 0; i < numEnabledFiles; ++i)
876       {
877         CFile* file = this->enabledFiles[i];
878
879         std::vector<CField*> enabledFields = file->getEnabledFields();
880         size_t numEnabledFields = enabledFields.size();
881         for (size_t j = 0; j < numEnabledFields; ++j)
882         {
883           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
884           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
885           for (; it != itE; ++it)
886           {
887             // If attributesSize[it->first] does not exist, it will be zero-initialized
888             // so we can use it safely without checking for its existance
889             if (attributesSize[it->first] < it->second)
890               attributesSize[it->first] = it->second;
891
892             if (maxEventSize[it->first] < it->second)
893               maxEventSize[it->first] = it->second;
894           }
895         }
896       }
897     }
898
899     return attributesSize;
900   }
901
902   /*!
903    * Compute the required buffer size to send the fields data.
904    *
905    * \param maxEventSize [in/out] the size of the bigger event for each connected server
906    */
907   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
908   {
909     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
910
911     std::map<int, StdSize> dataSize;
912
913     // Find all reference domain and axis of all active fields
914     size_t numEnabledFiles = this->enabledFiles.size();
915     for (size_t i = 0; i < numEnabledFiles; ++i)
916     {
917       CFile* file = this->enabledFiles[i];
918       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
919
920       if (fileMode == mode)
921       {
922         std::vector<CField*> enabledFields = file->getEnabledFields();
923         size_t numEnabledFields = enabledFields.size();
924         for (size_t j = 0; j < numEnabledFields; ++j)
925         {
926           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
927           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
928           for (; it != itE; ++it)
929           {
930             // If dataSize[it->first] does not exist, it will be zero-initialized
931             // so we can use it safely without checking for its existance
932             if (CXios::isOptPerformance)
933               dataSize[it->first] += it->second;
934             else if (dataSize[it->first] < it->second)
935               dataSize[it->first] = it->second;
936
937             if (maxEventSize[it->first] < it->second)
938               maxEventSize[it->first] = it->second;
939           }
940         }
941       }
942     }
943
944     return dataSize;
945   }
946
947   //! Client side: Send infomation of active files (files are enabled to write out)
948   void CContext::sendEnabledFiles()
949   {
950     int size = this->enabledFiles.size();
951
952     // In a context, each type has a root definition, e.g: axis, domain, field.
953     // Every object must be a child of one of these root definition. In this case
954     // all new file objects created on server must be children of the root "file_definition"
955     StdString fileDefRoot("file_definition");
956     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
957
958     for (int i = 0; i < size; ++i)
959     {
960       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
961       this->enabledFiles[i]->sendAllAttributesToServer();
962       this->enabledFiles[i]->sendAddAllVariables();
963     }
964   }
965
966   //! Client side: Send information of active fields (ones are written onto files)
967   void CContext::sendEnabledFields()
968   {
969     int size = this->enabledFiles.size();
970     for (int i = 0; i < size; ++i)
971     {
972       this->enabledFiles[i]->sendEnabledFields();
973     }
974   }
975
976   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
977   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
978   {
979     if (!hasClient) return;
980
981     const vector<CAxis*> allAxis = CAxis::getAll();
982     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
983       (*it)->checkEligibilityForCompressedOutput();
984
985     const vector<CDomain*> allDomains = CDomain::getAll();
986     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
987       (*it)->checkEligibilityForCompressedOutput();
988
989     const vector<CGrid*> allGrids = CGrid::getAll();
990     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
991       (*it)->checkEligibilityForCompressedOutput();
992   }
993
994   //! Client side: Prepare the timeseries by adding the necessary files
995   void CContext::prepareTimeseries()
996   {
997     if (!hasClient) return;
998
999     const std::vector<CFile*> allFiles = CFile::getAll();
1000     for (size_t i = 0; i < allFiles.size(); i++)
1001     {
1002       CFile* file = allFiles[i];
1003
1004       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1005       {
1006         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1007
1008         const std::vector<CField*> allFields = file->getAllFields();
1009         for (size_t j = 0; j < allFields.size(); j++)
1010         {
1011           CField* field = allFields[j];
1012
1013           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1014           {
1015             CFile* tsFile = CFile::create();
1016             tsFile->duplicateAttributes(file);
1017             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1018
1019             tsFile->name = tsPrefix + "_";
1020             if (!field->name.isEmpty())
1021               tsFile->name.get() += field->name;
1022             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1023               tsFile->name.get() += field->field_ref;
1024             else
1025               tsFile->name.get() += field->getId();
1026
1027             if (!field->ts_split_freq.isEmpty())
1028               tsFile->split_freq = field->ts_split_freq;
1029
1030             CField* tsField = tsFile->addField();
1031             tsField->field_ref = field->getId();
1032             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1033
1034             tsFile->solveFieldRefInheritance(true);
1035
1036             if (file->timeseries == CFile::timeseries_attr::exclusive)
1037               field->enabled = false;
1038           }
1039         }
1040
1041         // Finally disable the original file is need be
1042         if (file->timeseries == CFile::timeseries_attr::only)
1043          file->enabled = false;
1044       }
1045     }
1046   }
1047
1048   //! Client side: Send information of reference grid of active fields
1049   void CContext::sendRefGrid()
1050   {
1051     std::set<StdString> gridIds;
1052     int sizeFile = this->enabledFiles.size();
1053     CFile* filePtr(NULL);
1054
1055     // Firstly, find all reference grids of all active fields
1056     for (int i = 0; i < sizeFile; ++i)
1057     {
1058       filePtr = this->enabledFiles[i];
1059       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1060       int sizeField = enabledFields.size();
1061       for (int numField = 0; numField < sizeField; ++numField)
1062       {
1063         if (0 != enabledFields[numField]->getRelGrid())
1064           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1065       }
1066     }
1067
1068     // Create all reference grids on server side
1069     StdString gridDefRoot("grid_definition");
1070     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1071     std::set<StdString>::const_iterator it, itE = gridIds.end();
1072     for (it = gridIds.begin(); it != itE; ++it)
1073     {
1074       gridPtr->sendCreateChild(*it);
1075       CGrid::get(*it)->sendAllAttributesToServer();
1076       CGrid::get(*it)->sendAllDomains();
1077       CGrid::get(*it)->sendAllAxis();
1078       CGrid::get(*it)->sendAllScalars();
1079     }
1080   }
1081
1082
1083   //! Client side: Send information of reference domain and axis of active fields
1084   void CContext::sendRefDomainsAxis()
1085   {
1086     std::set<StdString> domainIds, axisIds, scalarIds;
1087
1088     // Find all reference domain and axis of all active fields
1089     int numEnabledFiles = this->enabledFiles.size();
1090     for (int i = 0; i < numEnabledFiles; ++i)
1091     {
1092       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1093       int numEnabledFields = enabledFields.size();
1094       for (int j = 0; j < numEnabledFields; ++j)
1095       {
1096         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1097         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1098         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1099         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1100       }
1101     }
1102
1103     // Create all reference axis on server side
1104     std::set<StdString>::iterator itDom, itAxis, itScalar;
1105     std::set<StdString>::const_iterator itE;
1106
1107     StdString scalarDefRoot("scalar_definition");
1108     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1109     itE = scalarIds.end();
1110     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1111     {
1112       if (!itScalar->empty())
1113       {
1114         scalarPtr->sendCreateChild(*itScalar);
1115         CScalar::get(*itScalar)->sendAllAttributesToServer();
1116       }
1117     }
1118
1119     StdString axiDefRoot("axis_definition");
1120     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1121     itE = axisIds.end();
1122     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1123     {
1124       if (!itAxis->empty())
1125       {
1126         axisPtr->sendCreateChild(*itAxis);
1127         CAxis::get(*itAxis)->sendAllAttributesToServer();
1128       }
1129     }
1130
1131     // Create all reference domains on server side
1132     StdString domDefRoot("domain_definition");
1133     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1134     itE = domainIds.end();
1135     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1136     {
1137       if (!itDom->empty()) {
1138          domPtr->sendCreateChild(*itDom);
1139          CDomain::get(*itDom)->sendAllAttributesToServer();
1140       }
1141     }
1142   }
1143
1144   //! Update calendar in each time step
1145   void CContext::updateCalendar(int step)
1146   {
1147      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1148      calendar->update(step);
1149      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1150
1151      if (hasClient)
1152      {
1153        checkPrefetchingOfEnabledReadModeFiles();
1154        garbageCollector.invalidate(calendar->getCurrentDate());
1155      }
1156   }
1157
1158   //! Server side: Create header of netcdf file
1159   void CContext::createFileHeader(void )
1160   {
1161      vector<CFile*>::const_iterator it;
1162
1163      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1164      {
1165         (*it)->initFile();
1166      }
1167   }
1168
1169   //! Get current context
1170   CContext* CContext::getCurrent(void)
1171   {
1172     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1173   }
1174
1175   /*!
1176   \brief Set context with an id be the current context
1177   \param [in] id identity of context to be set to current
1178   */
1179   void CContext::setCurrent(const string& id)
1180   {
1181     CObjectFactory::SetCurrentContextId(id);
1182     CGroupFactory::SetCurrentContextId(id);
1183   }
1184
1185  /*!
1186  \brief Create a context with specific id
1187  \param [in] id identity of new context
1188  \return pointer to the new context or already-existed one with identity id
1189  */
1190  CContext* CContext::create(const StdString& id)
1191  {
1192    CContext::setCurrent(id);
1193
1194    bool hasctxt = CContext::has(id);
1195    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1196    getRoot();
1197    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1198
1199#define DECLARE_NODE(Name_, name_) \
1200    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1201#define DECLARE_NODE_PAR(Name_, name_)
1202#include "node_type.conf"
1203
1204    return (context);
1205  }
1206
1207
1208
1209     //! Server side: Receive a message to do some post processing
1210  void CContext::recvRegistry(CEventServer& event)
1211  {
1212    CBufferIn* buffer=event.subEvents.begin()->buffer;
1213    string id;
1214    *buffer>>id;
1215    get(id)->recvRegistry(*buffer);
1216  }
1217
1218  void CContext::recvRegistry(CBufferIn& buffer)
1219  {
1220    if (server->intraCommRank==0)
1221    {
1222      CRegistry registry(server->intraComm) ;
1223      registry.fromBuffer(buffer) ;
1224      registryOut->mergeRegistry(registry) ;
1225    }
1226  }
1227
1228  void CContext::sendRegistry(void)
1229  {
1230    registryOut->hierarchicalGatherRegistry() ;
1231
1232    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1233    if (client->isServerLeader())
1234    {
1235       CMessage msg ;
1236       msg<<this->getIdServer();
1237       if (client->clientRank==0) msg<<*registryOut ;
1238       const std::list<int>& ranks = client->getRanksServerLeader();
1239       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1240         event.push(*itRank,1,msg);
1241       client->sendEvent(event);
1242     }
1243     else client->sendEvent(event);
1244  }
1245
1246} // namespace xios
Note: See TracBrowser for help on using the repository browser.