source: XIOS/trunk/src/node/context.cpp @ 1091

Last change on this file since 1091 was 1091, checked in by ymipsl, 5 years ago

Enhancement : in file attribut ts_prefix, the substring "%file_name%", if found, is replaced by the file name.
(untested for now)

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 43.1 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Try to send the buffers and receive possible answers
342   bool CContext::checkBuffersAndListen(void)
343   {
344     client->checkBuffers();
345
346     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
347     if (hasTmpBufferedEvent)
348       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
349
350     // Don't process events if there is a temporarily buffered event
351     return server->eventLoop(!hasTmpBufferedEvent);
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     // There is nothing client need to send to server
392     if (hasClient)
393     {
394       // After xml is parsed, there are some more works with post processing
395       postProcessing();
396     }
397     setClientServerBuffer();
398
399     if (hasClient && !hasServer)
400     {
401      // Send all attributes of current context to server
402      this->sendAllAttributesToServer();
403
404      // Send all attributes of current calendar
405      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
406
407      // We have enough information to send to server
408      // First of all, send all enabled files
409       sendEnabledFiles();
410
411      // Then, send all enabled fields
412       sendEnabledFields();
413
414      // At last, we have all info of domain and axis, then send them
415       sendRefDomainsAxis();
416
417      // After that, send all grid (if any)
418       sendRefGrid();
419    }
420
421    // We have a xml tree on the server side and now, it should be also processed
422    if (hasClient && !hasServer) sendPostProcessing();
423
424    // There are some processings that should be done after all of above. For example: check mask or index
425    if (hasClient)
426    {
427      this->buildFilterGraphOfEnabledFields();
428      buildFilterGraphOfFieldsWithReadAccess();
429      this->solveAllRefOfEnabledFields(true);
430    }
431
432    // Now tell server that it can process all messages from client
433    if (hasClient && !hasServer) this->sendCloseDefinition();
434
435    // Nettoyage de l'arborescence
436    if (hasClient && !hasServer) CleanTree(); // Only on client side??
437
438    if (hasClient)
439    {
440      sendCreateFileHeader();
441
442      startPrefetchingOfEnabledReadModeFiles();
443    }
444   }
445
446   void CContext::findAllEnabledFields(void)
447   {
448     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
449     (void)this->enabledFiles[i]->getEnabledFields();
450   }
451
452   void CContext::findAllEnabledFieldsInReadModeFiles(void)
453   {
454     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
455     (void)this->enabledReadModeFiles[i]->getEnabledFields();
456   }
457
458   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
459   {
460      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
461        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
462   }
463
464   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
465   {
466     int size = this->enabledFiles.size();
467     for (int i = 0; i < size; ++i)
468     {
469       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
470     }
471
472     for (int i = 0; i < size; ++i)
473     {
474       this->enabledFiles[i]->generateNewTransformationGridDest();
475     }
476   }
477
478   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
479   {
480     int size = this->enabledFiles.size();
481     for (int i = 0; i < size; ++i)
482     {
483       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
484     }
485   }
486
487   void CContext::buildFilterGraphOfEnabledFields()
488   {
489     int size = this->enabledFiles.size();
490     for (int i = 0; i < size; ++i)
491     {
492       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
493     }
494   }
495
496   void CContext::startPrefetchingOfEnabledReadModeFiles()
497   {
498     int size = enabledReadModeFiles.size();
499     for (int i = 0; i < size; ++i)
500     {
501        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
502     }
503   }
504
505   void CContext::checkPrefetchingOfEnabledReadModeFiles()
506   {
507     int size = enabledReadModeFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
511     }
512   }
513
514  void CContext::findFieldsWithReadAccess(void)
515  {
516    fieldsWithReadAccess.clear();
517    const vector<CField*> allFields = CField::getAll();
518    for (size_t i = 0; i < allFields.size(); ++i)
519    {
520      CField* field = allFields[i];
521
522      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
523        field->read_access = true;
524      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
525        fieldsWithReadAccess.push_back(field);
526    }
527  }
528
529  void CContext::solveAllRefOfFieldsWithReadAccess()
530  {
531    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
532      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
533  }
534
535  void CContext::buildFilterGraphOfFieldsWithReadAccess()
536  {
537    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
538      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
539  }
540
541   void CContext::solveAllInheritance(bool apply)
542   {
543     // Résolution des héritages descendants (càd des héritages de groupes)
544     // pour chacun des contextes.
545      solveDescInheritance(apply);
546
547     // Résolution des héritages par référence au niveau des fichiers.
548      const vector<CFile*> allFiles=CFile::getAll();
549      const vector<CGrid*> allGrids= CGrid::getAll();
550
551     //if (hasClient && !hasServer)
552      if (hasClient)
553      {
554        for (unsigned int i = 0; i < allFiles.size(); i++)
555          allFiles[i]->solveFieldRefInheritance(apply);
556      }
557
558      unsigned int vecSize = allGrids.size();
559      unsigned int i = 0;
560      for (i = 0; i < vecSize; ++i)
561        allGrids[i]->solveDomainAxisRefInheritance(apply);
562
563   }
564
565   void CContext::findEnabledFiles(void)
566   {
567      const std::vector<CFile*> allFiles = CFile::getAll();
568      const CDate& initDate = calendar->getInitDate();
569
570      for (unsigned int i = 0; i < allFiles.size(); i++)
571         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
572         {
573            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
574            {
575              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
576              {
577                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
578                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
579                    <<"\" is less than the time step. File will not be written."<<endl;
580              }
581              else
582               enabledFiles.push_back(allFiles[i]);
583            }
584         }
585         else
586         {
587           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
588           {
589             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
590                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
591                 <<"\" is less than the time step. File will not be written."<<endl;
592           }
593           else
594             enabledFiles.push_back(allFiles[i]); // otherwise true by default
595         }
596
597      if (enabledFiles.size() == 0)
598         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
599               << getId() << "\" !");
600   }
601
602   void CContext::findEnabledReadModeFiles(void)
603   {
604     int size = this->enabledFiles.size();
605     for (int i = 0; i < size; ++i)
606     {
607       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
608        enabledReadModeFiles.push_back(enabledFiles[i]);
609     }
610   }
611
612   void CContext::closeAllFile(void)
613   {
614     std::vector<CFile*>::const_iterator
615            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
616
617     for (; it != end; it++)
618     {
619       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
620       (*it)->close();
621     }
622   }
623
624   /*!
625   \brief Dispatch event received from client
626      Whenever a message is received in buffer of server, it will be processed depending on
627   its event type. A new event type should be added in the switch list to make sure
628   it processed on server side.
629   \param [in] event: Received message
630   */
631   bool CContext::dispatchEvent(CEventServer& event)
632   {
633
634      if (SuperClass::dispatchEvent(event)) return true;
635      else
636      {
637        switch(event.type)
638        {
639           case EVENT_ID_CLOSE_DEFINITION :
640             recvCloseDefinition(event);
641             return true;
642             break;
643           case EVENT_ID_UPDATE_CALENDAR:
644             recvUpdateCalendar(event);
645             return true;
646             break;
647           case EVENT_ID_CREATE_FILE_HEADER :
648             recvCreateFileHeader(event);
649             return true;
650             break;
651           case EVENT_ID_POST_PROCESS:
652             recvPostProcessing(event);
653             return true;
654            case EVENT_ID_SEND_REGISTRY:
655             recvRegistry(event);
656             return true;
657            break;
658
659           default :
660             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
661                    <<"Unknown Event");
662           return false;
663         }
664      }
665   }
666
667   //! Client side: Send a message to server to make it close
668   void CContext::sendCloseDefinition(void)
669   {
670     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
671     if (client->isServerLeader())
672     {
673       CMessage msg;
674       msg<<this->getIdServer();
675       const std::list<int>& ranks = client->getRanksServerLeader();
676       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
677         event.push(*itRank,1,msg);
678       client->sendEvent(event);
679     }
680     else client->sendEvent(event);
681   }
682
683   //! Server side: Receive a message of client announcing a context close
684   void CContext::recvCloseDefinition(CEventServer& event)
685   {
686
687      CBufferIn* buffer=event.subEvents.begin()->buffer;
688      string id;
689      *buffer>>id;
690      get(id)->closeDefinition();
691   }
692
693   //! Client side: Send a message to update calendar in each time step
694   void CContext::sendUpdateCalendar(int step)
695   {
696     if (!hasServer)
697     {
698       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
699       if (client->isServerLeader())
700       {
701         CMessage msg;
702         msg<<this->getIdServer()<<step;
703         const std::list<int>& ranks = client->getRanksServerLeader();
704         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
705           event.push(*itRank,1,msg);
706         client->sendEvent(event);
707       }
708       else client->sendEvent(event);
709     }
710   }
711
712   //! Server side: Receive a message of client annoucing calendar update
713   void CContext::recvUpdateCalendar(CEventServer& event)
714   {
715      CBufferIn* buffer=event.subEvents.begin()->buffer;
716      string id;
717      *buffer>>id;
718      get(id)->recvUpdateCalendar(*buffer);
719   }
720
721   //! Server side: Receive a message of client annoucing calendar update
722   void CContext::recvUpdateCalendar(CBufferIn& buffer)
723   {
724      int step;
725      buffer>>step;
726      updateCalendar(step);
727   }
728
729   //! Client side: Send a message to create header part of netcdf file
730   void CContext::sendCreateFileHeader(void)
731   {
732     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
733     if (client->isServerLeader())
734     {
735       CMessage msg;
736       msg<<this->getIdServer();
737       const std::list<int>& ranks = client->getRanksServerLeader();
738       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
739         event.push(*itRank,1,msg) ;
740       client->sendEvent(event);
741     }
742     else client->sendEvent(event);
743   }
744
745   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
746   void CContext::recvCreateFileHeader(CEventServer& event)
747   {
748      CBufferIn* buffer=event.subEvents.begin()->buffer;
749      string id;
750      *buffer>>id;
751      get(id)->recvCreateFileHeader(*buffer);
752   }
753
754   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
755   void CContext::recvCreateFileHeader(CBufferIn& buffer)
756   {
757      createFileHeader();
758   }
759
760   //! Client side: Send a message to do some post processing on server
761   void CContext::sendPostProcessing()
762   {
763     if (!hasServer)
764     {
765       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
766       if (client->isServerLeader())
767       {
768         CMessage msg;
769         msg<<this->getIdServer();
770         const std::list<int>& ranks = client->getRanksServerLeader();
771         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
772           event.push(*itRank,1,msg);
773         client->sendEvent(event);
774       }
775       else client->sendEvent(event);
776     }
777   }
778
779   //! Server side: Receive a message to do some post processing
780   void CContext::recvPostProcessing(CEventServer& event)
781   {
782      CBufferIn* buffer=event.subEvents.begin()->buffer;
783      string id;
784      *buffer>>id;
785      get(id)->recvPostProcessing(*buffer);
786   }
787
788   //! Server side: Receive a message to do some post processing
789   void CContext::recvPostProcessing(CBufferIn& buffer)
790   {
791      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
792      postProcessing();
793   }
794
795   const StdString& CContext::getIdServer()
796   {
797      if (hasClient)
798      {
799        idServer_ = this->getId();
800        idServer_ += "_server";
801        return idServer_;
802      }
803      if (hasServer) return (this->getId());
804   }
805
806   /*!
807   \brief Do some simple post processings after parsing xml file
808      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
809   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
810   which will be written out into netcdf files, are processed
811   */
812   void CContext::postProcessing()
813   {
814     if (isPostProcessed) return;
815
816      // Make sure the calendar was correctly created
817      if (!calendar)
818        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
819      else if (calendar->getTimeStep() == NoneDu)
820        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
821      // Calendar first update to set the current date equals to the start date
822      calendar->update(0);
823
824      // Find all inheritance in xml structure
825      this->solveAllInheritance();
826
827      // Check if some axis, domains or grids are eligible to for compressed indexed output.
828      // Warning: This must be done after solving the inheritance and before the rest of post-processing
829      checkAxisDomainsGridsEligibilityForCompressedOutput();
830
831      // Check if some automatic time series should be generated
832      // Warning: This must be done after solving the inheritance and before the rest of post-processing
833      prepareTimeseries();
834
835      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
836      this->findEnabledFiles();
837      this->findEnabledReadModeFiles();
838
839      // Find all enabled fields of each file
840      this->findAllEnabledFields();
841      this->findAllEnabledFieldsInReadModeFiles();
842
843     if (hasClient && !hasServer)
844     {
845      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
846      this->readAttributesOfEnabledFieldsInReadModeFiles();
847     }
848
849      // Only search and rebuild all reference objects of enable fields, don't transform
850      this->solveOnlyRefOfEnabledFields(false);
851
852      // Search and rebuild all reference object of enabled fields
853      this->solveAllRefOfEnabledFields(false);
854
855      // Find all fields with read access from the public API
856      findFieldsWithReadAccess();
857      // and solve the all reference for them
858      solveAllRefOfFieldsWithReadAccess();
859
860      isPostProcessed = true;
861   }
862
863   /*!
864    * Compute the required buffer size to send the attributes (mostly those grid related).
865    *
866    * \param maxEventSize [in/out] the size of the bigger event for each connected server
867    */
868   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
869   {
870     std::map<int, StdSize> attributesSize;
871
872     if (hasClient)
873     {
874       size_t numEnabledFiles = this->enabledFiles.size();
875       for (size_t i = 0; i < numEnabledFiles; ++i)
876       {
877         CFile* file = this->enabledFiles[i];
878
879         std::vector<CField*> enabledFields = file->getEnabledFields();
880         size_t numEnabledFields = enabledFields.size();
881         for (size_t j = 0; j < numEnabledFields; ++j)
882         {
883           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
884           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
885           for (; it != itE; ++it)
886           {
887             // If attributesSize[it->first] does not exist, it will be zero-initialized
888             // so we can use it safely without checking for its existance
889             if (attributesSize[it->first] < it->second)
890               attributesSize[it->first] = it->second;
891
892             if (maxEventSize[it->first] < it->second)
893               maxEventSize[it->first] = it->second;
894           }
895         }
896       }
897     }
898
899     return attributesSize;
900   }
901
902   /*!
903    * Compute the required buffer size to send the fields data.
904    *
905    * \param maxEventSize [in/out] the size of the bigger event for each connected server
906    */
907   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
908   {
909     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
910
911     std::map<int, StdSize> dataSize;
912
913     // Find all reference domain and axis of all active fields
914     size_t numEnabledFiles = this->enabledFiles.size();
915     for (size_t i = 0; i < numEnabledFiles; ++i)
916     {
917       CFile* file = this->enabledFiles[i];
918       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
919
920       if (fileMode == mode)
921       {
922         std::vector<CField*> enabledFields = file->getEnabledFields();
923         size_t numEnabledFields = enabledFields.size();
924         for (size_t j = 0; j < numEnabledFields; ++j)
925         {
926           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
927           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
928           for (; it != itE; ++it)
929           {
930             // If dataSize[it->first] does not exist, it will be zero-initialized
931             // so we can use it safely without checking for its existance
932             if (CXios::isOptPerformance)
933               dataSize[it->first] += it->second;
934             else if (dataSize[it->first] < it->second)
935               dataSize[it->first] = it->second;
936
937             if (maxEventSize[it->first] < it->second)
938               maxEventSize[it->first] = it->second;
939           }
940         }
941       }
942     }
943
944     return dataSize;
945   }
946
947   //! Client side: Send infomation of active files (files are enabled to write out)
948   void CContext::sendEnabledFiles()
949   {
950     int size = this->enabledFiles.size();
951
952     // In a context, each type has a root definition, e.g: axis, domain, field.
953     // Every object must be a child of one of these root definition. In this case
954     // all new file objects created on server must be children of the root "file_definition"
955     StdString fileDefRoot("file_definition");
956     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
957
958     for (int i = 0; i < size; ++i)
959     {
960       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
961       this->enabledFiles[i]->sendAllAttributesToServer();
962       this->enabledFiles[i]->sendAddAllVariables();
963     }
964   }
965
966   //! Client side: Send information of active fields (ones are written onto files)
967   void CContext::sendEnabledFields()
968   {
969     int size = this->enabledFiles.size();
970     for (int i = 0; i < size; ++i)
971     {
972       this->enabledFiles[i]->sendEnabledFields();
973     }
974   }
975
976   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
977   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
978   {
979     if (!hasClient) return;
980
981     const vector<CAxis*> allAxis = CAxis::getAll();
982     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
983       (*it)->checkEligibilityForCompressedOutput();
984
985     const vector<CDomain*> allDomains = CDomain::getAll();
986     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
987       (*it)->checkEligibilityForCompressedOutput();
988
989     const vector<CGrid*> allGrids = CGrid::getAll();
990     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
991       (*it)->checkEligibilityForCompressedOutput();
992   }
993
994   //! Client side: Prepare the timeseries by adding the necessary files
995   void CContext::prepareTimeseries()
996   {
997     if (!hasClient) return;
998
999     const std::vector<CFile*> allFiles = CFile::getAll();
1000     for (size_t i = 0; i < allFiles.size(); i++)
1001     {
1002       CFile* file = allFiles[i];
1003
1004       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1005       for (size_t k = 0; k < vars.size(); k++)
1006       {
1007         CVariable* var = vars[k];
1008
1009         if (var->ts_target.isEmpty()
1010              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1011           fileVars.push_back(var);
1012
1013         if (!var->ts_target.isEmpty()
1014              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1015           fieldVars.push_back(var);
1016       }
1017
1018       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1019       {
1020         StdString fileNameStr("%file_name%") ;
1021         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1022         
1023         StdString fileName=file->getFileOutputName();
1024         size_t pos=tsPrefix.find(fileNameStr) ;
1025         while (pos!=std::string::npos)
1026         {
1027           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1028           pos=tsPrefix.find(fileNameStr) ;
1029         }
1030       
1031         const std::vector<CField*> allFields = file->getAllFields();
1032         for (size_t j = 0; j < allFields.size(); j++)
1033         {
1034           CField* field = allFields[j];
1035
1036           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1037           {
1038             CFile* tsFile = CFile::create();
1039             tsFile->duplicateAttributes(file);
1040
1041             // Add variables originating from file and targeted to timeserie file
1042             for (size_t k = 0; k < fileVars.size(); k++)
1043               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1044
1045           
1046             tsFile->name = tsPrefix + "_";
1047             if (!field->name.isEmpty())
1048               tsFile->name.get() += field->name;
1049             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1050               tsFile->name.get() += field->field_ref;
1051             else
1052               tsFile->name.get() += field->getId();
1053
1054             if (!field->ts_split_freq.isEmpty())
1055               tsFile->split_freq = field->ts_split_freq;
1056
1057             CField* tsField = tsFile->addField();
1058             tsField->field_ref = field->getId();
1059
1060             // Add variables originating from file and targeted to timeserie field
1061             for (size_t k = 0; k < fieldVars.size(); k++)
1062               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1063
1064             vars = field->getAllVariables();
1065             for (size_t k = 0; k < vars.size(); k++)
1066             {
1067               CVariable* var = vars[k];
1068
1069               // Add variables originating from field and targeted to timeserie field
1070               if (var->ts_target.isEmpty()
1071                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1072                 tsField->getVirtualVariableGroup()->addChild(var);
1073
1074               // Add variables originating from field and targeted to timeserie file
1075               if (!var->ts_target.isEmpty()
1076                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1077                 tsFile->getVirtualVariableGroup()->addChild(var);
1078             }
1079
1080             tsFile->solveFieldRefInheritance(true);
1081
1082             if (file->timeseries == CFile::timeseries_attr::exclusive)
1083               field->enabled = false;
1084           }
1085         }
1086
1087         // Finally disable the original file is need be
1088         if (file->timeseries == CFile::timeseries_attr::only)
1089          file->enabled = false;
1090       }
1091     }
1092   }
1093
1094   //! Client side: Send information of reference grid of active fields
1095   void CContext::sendRefGrid()
1096   {
1097     std::set<StdString> gridIds;
1098     int sizeFile = this->enabledFiles.size();
1099     CFile* filePtr(NULL);
1100
1101     // Firstly, find all reference grids of all active fields
1102     for (int i = 0; i < sizeFile; ++i)
1103     {
1104       filePtr = this->enabledFiles[i];
1105       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1106       int sizeField = enabledFields.size();
1107       for (int numField = 0; numField < sizeField; ++numField)
1108       {
1109         if (0 != enabledFields[numField]->getRelGrid())
1110           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1111       }
1112     }
1113
1114     // Create all reference grids on server side
1115     StdString gridDefRoot("grid_definition");
1116     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1117     std::set<StdString>::const_iterator it, itE = gridIds.end();
1118     for (it = gridIds.begin(); it != itE; ++it)
1119     {
1120       gridPtr->sendCreateChild(*it);
1121       CGrid::get(*it)->sendAllAttributesToServer();
1122       CGrid::get(*it)->sendAllDomains();
1123       CGrid::get(*it)->sendAllAxis();
1124       CGrid::get(*it)->sendAllScalars();
1125     }
1126   }
1127
1128
1129   //! Client side: Send information of reference domain and axis of active fields
1130   void CContext::sendRefDomainsAxis()
1131   {
1132     std::set<StdString> domainIds, axisIds, scalarIds;
1133
1134     // Find all reference domain and axis of all active fields
1135     int numEnabledFiles = this->enabledFiles.size();
1136     for (int i = 0; i < numEnabledFiles; ++i)
1137     {
1138       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1139       int numEnabledFields = enabledFields.size();
1140       for (int j = 0; j < numEnabledFields; ++j)
1141       {
1142         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1143         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1144         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1145         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1146       }
1147     }
1148
1149     // Create all reference axis on server side
1150     std::set<StdString>::iterator itDom, itAxis, itScalar;
1151     std::set<StdString>::const_iterator itE;
1152
1153     StdString scalarDefRoot("scalar_definition");
1154     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1155     itE = scalarIds.end();
1156     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1157     {
1158       if (!itScalar->empty())
1159       {
1160         scalarPtr->sendCreateChild(*itScalar);
1161         CScalar::get(*itScalar)->sendAllAttributesToServer();
1162       }
1163     }
1164
1165     StdString axiDefRoot("axis_definition");
1166     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1167     itE = axisIds.end();
1168     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1169     {
1170       if (!itAxis->empty())
1171       {
1172         axisPtr->sendCreateChild(*itAxis);
1173         CAxis::get(*itAxis)->sendAllAttributesToServer();
1174       }
1175     }
1176
1177     // Create all reference domains on server side
1178     StdString domDefRoot("domain_definition");
1179     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1180     itE = domainIds.end();
1181     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1182     {
1183       if (!itDom->empty()) {
1184          domPtr->sendCreateChild(*itDom);
1185          CDomain::get(*itDom)->sendAllAttributesToServer();
1186       }
1187     }
1188   }
1189
1190   //! Update calendar in each time step
1191   void CContext::updateCalendar(int step)
1192   {
1193      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1194      calendar->update(step);
1195      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1196
1197      if (hasClient)
1198      {
1199        checkPrefetchingOfEnabledReadModeFiles();
1200        garbageCollector.invalidate(calendar->getCurrentDate());
1201      }
1202   }
1203
1204   //! Server side: Create header of netcdf file
1205   void CContext::createFileHeader(void )
1206   {
1207      vector<CFile*>::const_iterator it;
1208
1209      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1210      {
1211         (*it)->initFile();
1212      }
1213   }
1214
1215   //! Get current context
1216   CContext* CContext::getCurrent(void)
1217   {
1218     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1219   }
1220
1221   /*!
1222   \brief Set context with an id be the current context
1223   \param [in] id identity of context to be set to current
1224   */
1225   void CContext::setCurrent(const string& id)
1226   {
1227     CObjectFactory::SetCurrentContextId(id);
1228     CGroupFactory::SetCurrentContextId(id);
1229   }
1230
1231  /*!
1232  \brief Create a context with specific id
1233  \param [in] id identity of new context
1234  \return pointer to the new context or already-existed one with identity id
1235  */
1236  CContext* CContext::create(const StdString& id)
1237  {
1238    CContext::setCurrent(id);
1239
1240    bool hasctxt = CContext::has(id);
1241    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1242    getRoot();
1243    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1244
1245#define DECLARE_NODE(Name_, name_) \
1246    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1247#define DECLARE_NODE_PAR(Name_, name_)
1248#include "node_type.conf"
1249
1250    return (context);
1251  }
1252
1253
1254
1255     //! Server side: Receive a message to do some post processing
1256  void CContext::recvRegistry(CEventServer& event)
1257  {
1258    CBufferIn* buffer=event.subEvents.begin()->buffer;
1259    string id;
1260    *buffer>>id;
1261    get(id)->recvRegistry(*buffer);
1262  }
1263
1264  void CContext::recvRegistry(CBufferIn& buffer)
1265  {
1266    if (server->intraCommRank==0)
1267    {
1268      CRegistry registry(server->intraComm) ;
1269      registry.fromBuffer(buffer) ;
1270      registryOut->mergeRegistry(registry) ;
1271    }
1272  }
1273
1274  void CContext::sendRegistry(void)
1275  {
1276    registryOut->hierarchicalGatherRegistry() ;
1277
1278    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1279    if (client->isServerLeader())
1280    {
1281       CMessage msg ;
1282       msg<<this->getIdServer();
1283       if (client->clientRank==0) msg<<*registryOut ;
1284       const std::list<int>& ranks = client->getRanksServerLeader();
1285       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1286         event.push(*itRank,1,msg);
1287       client->sendEvent(event);
1288     }
1289     else client->sendEvent(event);
1290  }
1291
1292} // namespace xios
Note: See TracBrowser for help on using the repository browser.