source: XIOS/trunk/src/node/context.cpp @ 1135

Last change on this file since 1135 was 1135, checked in by ymipsl, 5 years ago
  • Add new timer for better profiling. The full timer output will be provided only for info_level=100
  • Add new file attribute : convention_str (string) : this string will overide the default value wrote in the file (CF-1.6 or UGRID)

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 43.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17
18
19namespace xios {
20
21  shared_ptr<CContextGroup> CContext::root;
22
23   /// ////////////////////// Définitions ////////////////////// ///
24
25   CContext::CContext(void)
26      : CObjectTemplate<CContext>(), CContextAttributes()
27      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
28      , idServer_(), client(0), server(0)
29   { /* Ne rien faire de plus */ }
30
31   CContext::CContext(const StdString & id)
32      : CObjectTemplate<CContext>(id), CContextAttributes()
33      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
34      , idServer_(), client(0), server(0)
35   { /* Ne rien faire de plus */ }
36
37   CContext::~CContext(void)
38   {
39     delete client;
40     delete server;
41   }
42
43   //----------------------------------------------------------------
44   //! Get name of context
45   StdString CContext::GetName(void)   { return (StdString("context")); }
46   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
47   ENodeType CContext::GetType(void)   { return (eContext); }
48
49   //----------------------------------------------------------------
50
51   /*!
52   \brief Get context group (context root)
53   \return Context root
54   */
55   CContextGroup* CContext::getRoot(void)
56   {
57      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
58      return root.get();
59   }
60
61   //----------------------------------------------------------------
62
63   /*!
64   \brief Get calendar of a context
65   \return Calendar
66   */
67   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
68   {
69      return (this->calendar);
70   }
71
72   //----------------------------------------------------------------
73
74   /*!
75   \brief Set a context with a calendar
76   \param[in] newCalendar new calendar
77   */
78   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
79   {
80      this->calendar = newCalendar;
81   }
82
83   //----------------------------------------------------------------
84   /*!
85   \brief Parse xml file and write information into context object
86   \param [in] node xmld node corresponding in xml file
87   */
88   void CContext::parse(xml::CXMLNode & node)
89   {
90      CContext::SuperClass::parse(node);
91
92      // PARSING POUR GESTION DES ENFANTS
93      xml::THashAttributes attributes = node.getAttributes();
94
95      if (attributes.end() != attributes.find("src"))
96      {
97         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
98         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
99            ERROR("void CContext::parse(xml::CXMLNode & node)",
100                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
101         if (!ifs.good())
102            ERROR("CContext::parse(xml::CXMLNode & node)",
103                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
104         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
105      }
106
107      if (node.getElementName().compare(CContext::GetName()))
108         DEBUG("Le noeud is wrong defined but will be considered as a context !");
109
110      if (!(node.goToChildElement()))
111      {
112         DEBUG("Le context ne contient pas d'enfant !");
113      }
114      else
115      {
116         do { // Parcours des contextes pour traitement.
117
118            StdString name = node.getElementName();
119            attributes.clear();
120            attributes = node.getAttributes();
121
122            if (attributes.end() != attributes.find("id"))
123            {
124              DEBUG(<< "Definition node has an id,"
125                    << "it will not be taking account !");
126            }
127
128#define DECLARE_NODE(Name_, name_)    \
129   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
130   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
131#define DECLARE_NODE_PAR(Name_, name_)
132#include "node_type.conf"
133
134            DEBUG(<< "The element \'"     << name
135                  << "\' in the context \'" << CContext::getCurrent()->getId()
136                  << "\' is not a definition !");
137
138         } while (node.goToNextElement());
139
140         node.goToParentElement(); // Retour au parent
141      }
142   }
143
144   //----------------------------------------------------------------
145   //! Show tree structure of context
146   void CContext::ShowTree(StdOStream & out)
147   {
148      StdString currentContextId = CContext::getCurrent() -> getId();
149      std::vector<CContext*> def_vector =
150         CContext::getRoot()->getChildList();
151      std::vector<CContext*>::iterator
152         it = def_vector.begin(), end = def_vector.end();
153
154      out << "<? xml version=\"1.0\" ?>" << std::endl;
155      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
156
157      for (; it != end; it++)
158      {
159         CContext* context = *it;
160         CContext::setCurrent(context->getId());
161         out << *context << std::endl;
162      }
163
164      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
165      CContext::setCurrent(currentContextId);
166   }
167
168
169   //----------------------------------------------------------------
170
171   //! Convert context object into string (to print)
172   StdString CContext::toString(void) const
173   {
174      StdOStringStream oss;
175      oss << "<" << CContext::GetName()
176          << " id=\"" << this->getId() << "\" "
177          << SuperClassAttribute::toString() << ">" << std::endl;
178      if (!this->hasChild())
179      {
180         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
181      }
182      else
183      {
184
185#define DECLARE_NODE(Name_, name_)    \
186   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
187   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
188#define DECLARE_NODE_PAR(Name_, name_)
189#include "node_type.conf"
190
191      }
192
193      oss << "</" << CContext::GetName() << " >";
194
195      return (oss.str());
196   }
197
198   //----------------------------------------------------------------
199
200   /*!
201   \brief Find all inheritace among objects in a context.
202   \param [in] apply (true) write attributes of parent into ones of child if they are empty
203                     (false) write attributes of parent into a new container of child
204   \param [in] parent unused
205   */
206   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
207   {
208#define DECLARE_NODE(Name_, name_)    \
209   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
210     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
211#define DECLARE_NODE_PAR(Name_, name_)
212#include "node_type.conf"
213   }
214
215   //----------------------------------------------------------------
216
217   //! Verify if all root definition in the context have child.
218   bool CContext::hasChild(void) const
219   {
220      return (
221#define DECLARE_NODE(Name_, name_)    \
222   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
223#define DECLARE_NODE_PAR(Name_, name_)
224#include "node_type.conf"
225      false);
226}
227
228   //----------------------------------------------------------------
229
230   void CContext::CleanTree(void)
231   {
232#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
233#define DECLARE_NODE_PAR(Name_, name_)
234#include "node_type.conf"
235   }
236   ///---------------------------------------------------------------
237
238   //! Initialize client side
239   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
240   {
241     hasClient=true;
242     client = new CContextClient(this,intraComm, interComm, cxtServer);
243     registryIn=new CRegistry(intraComm);
244     registryIn->setPath(getId()) ;
245     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
246     registryIn->bcastRegistry() ;
247
248     registryOut=new CRegistry(intraComm) ;
249     registryOut->setPath(getId()) ;
250
251     MPI_Comm intraCommServer, interCommServer;
252     if (cxtServer) // Attached mode
253     {
254       intraCommServer = intraComm;
255       interCommServer = interComm;
256     }
257     else
258     {
259       MPI_Comm_dup(intraComm, &intraCommServer);
260       comms.push_back(intraCommServer);
261       MPI_Comm_dup(interComm, &interCommServer);
262       comms.push_back(interCommServer);
263     }
264     server = new CContextServer(this,intraCommServer,interCommServer);
265   }
266
267   void CContext::setClientServerBuffer()
268   {
269     size_t minBufferSize = CXios::minBufferSize;
270#define DECLARE_NODE(Name_, name_)    \
271     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
272#define DECLARE_NODE_PAR(Name_, name_)
273#include "node_type.conf"
274#undef DECLARE_NODE
275#undef DECLARE_NODE_PAR
276
277     std::map<int, StdSize> maxEventSize;
278     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
279     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
280
281     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
282     for (it = dataBufferSize.begin(); it != ite; ++it)
283       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
284
285     ite = bufferSize.end();
286     for (it = bufferSize.begin(); it != ite; ++it)
287     {
288       it->second *= CXios::bufferSizeFactor;
289       if (it->second < minBufferSize) it->second = minBufferSize;
290     }
291
292     // We consider that the minimum buffer size is also the minimum event size
293     ite = maxEventSize.end();
294     for (it = maxEventSize.begin(); it != ite; ++it)
295       if (it->second < minBufferSize) it->second = minBufferSize;
296
297     if (client->isServerLeader())
298     {
299       const std::list<int>& ranks = client->getRanksServerLeader();
300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
301         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
302     }
303
304     client->setBufferSize(bufferSize, maxEventSize);
305   }
306
307   //! Verify whether a context is initialized
308   bool CContext::isInitialized(void)
309   {
310     return hasClient;
311   }
312
313   //! Initialize server
314   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
315   {
316     hasServer=true;
317     server = new CContextServer(this,intraComm,interComm);
318
319     registryIn=new CRegistry(intraComm);
320     registryIn->setPath(getId()) ;
321     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
322     registryIn->bcastRegistry() ;
323     registryOut=new CRegistry(intraComm) ;
324     registryOut->setPath(getId()) ;
325
326     MPI_Comm intraCommClient, interCommClient;
327     if (cxtClient) // Attached mode
328     {
329       intraCommClient = intraComm;
330       interCommClient = interComm;
331     }
332     else
333     {
334       MPI_Comm_dup(intraComm, &intraCommClient);
335       comms.push_back(intraCommClient);
336       MPI_Comm_dup(interComm, &interCommClient);
337       comms.push_back(interCommClient);
338     }
339     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
340   }
341
342   //! Try to send the buffers and receive possible answers
343   bool CContext::checkBuffersAndListen(void)
344   {
345     client->checkBuffers();
346
347     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
348     if (hasTmpBufferedEvent)
349       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
350
351     // Don't process events if there is a temporarily buffered event
352     return server->eventLoop(!hasTmpBufferedEvent);
353   }
354
355   //! Terminate a context
356   void CContext::finalize(void)
357   {
358      if (!finalized)
359      {
360        finalized = true;
361        if (hasClient) sendRegistry() ;
362        client->finalize();
363        while (!server->hasFinished())
364        {
365          server->eventLoop();
366        }
367
368        if (hasServer)
369        {
370          closeAllFile();
371          registryOut->hierarchicalGatherRegistry() ;
372          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
373        }
374
375        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
376          MPI_Comm_free(&(*it));
377        comms.clear();
378      }
379   }
380
381   /*!
382   \brief Close all the context defintion and do processing data
383      After everything is well defined on client side, they will be processed and sent to server
384   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
385   all necessary information to server, from which each server can build its own database.
386   Because the role of server is to write out field data on a specific netcdf file,
387   the only information that it needs is the enabled files
388   and the active fields (fields will be written onto active files)
389   */
390   void CContext::closeDefinition(void)
391   {
392     CTimer::get("Context : close definition").resume() ;
393     // There is nothing client need to send to server
394     if (hasClient)
395     {
396       // After xml is parsed, there are some more works with post processing
397       postProcessing();
398     }
399     setClientServerBuffer();
400
401     if (hasClient && !hasServer)
402     {
403      // Send all attributes of current context to server
404      this->sendAllAttributesToServer();
405
406      // Send all attributes of current calendar
407      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
408
409      // We have enough information to send to server
410      // First of all, send all enabled files
411       sendEnabledFiles();
412
413      // Then, send all enabled fields
414       sendEnabledFields();
415
416      // At last, we have all info of domain and axis, then send them
417       sendRefDomainsAxis();
418
419      // After that, send all grid (if any)
420       sendRefGrid();
421    }
422
423    // We have a xml tree on the server side and now, it should be also processed
424    if (hasClient && !hasServer) sendPostProcessing();
425
426    // There are some processings that should be done after all of above. For example: check mask or index
427    if (hasClient)
428    {
429      this->buildFilterGraphOfEnabledFields();
430      buildFilterGraphOfFieldsWithReadAccess();
431      this->solveAllRefOfEnabledFields(true);
432    }
433
434    // Now tell server that it can process all messages from client
435    if (hasClient && !hasServer) this->sendCloseDefinition();
436
437    // Nettoyage de l'arborescence
438    if (hasClient && !hasServer) CleanTree(); // Only on client side??
439
440    if (hasClient)
441    {
442      sendCreateFileHeader();
443
444      startPrefetchingOfEnabledReadModeFiles();
445    }
446    CTimer::get("Context : close definition").suspend() ;
447   }
448
449   void CContext::findAllEnabledFields(void)
450   {
451     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
452     (void)this->enabledFiles[i]->getEnabledFields();
453   }
454
455   void CContext::findAllEnabledFieldsInReadModeFiles(void)
456   {
457     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
458     (void)this->enabledReadModeFiles[i]->getEnabledFields();
459   }
460
461   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
462   {
463      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
464        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
465   }
466
467   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
468   {
469     int size = this->enabledFiles.size();
470     for (int i = 0; i < size; ++i)
471     {
472       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
473     }
474
475     for (int i = 0; i < size; ++i)
476     {
477       this->enabledFiles[i]->generateNewTransformationGridDest();
478     }
479   }
480
481   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
482   {
483     int size = this->enabledFiles.size();
484     for (int i = 0; i < size; ++i)
485     {
486       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
487     }
488   }
489
490   void CContext::buildFilterGraphOfEnabledFields()
491   {
492     int size = this->enabledFiles.size();
493     for (int i = 0; i < size; ++i)
494     {
495       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
496     }
497   }
498
499   void CContext::startPrefetchingOfEnabledReadModeFiles()
500   {
501     int size = enabledReadModeFiles.size();
502     for (int i = 0; i < size; ++i)
503     {
504        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
505     }
506   }
507
508   void CContext::checkPrefetchingOfEnabledReadModeFiles()
509   {
510     int size = enabledReadModeFiles.size();
511     for (int i = 0; i < size; ++i)
512     {
513        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
514     }
515   }
516
517  void CContext::findFieldsWithReadAccess(void)
518  {
519    fieldsWithReadAccess.clear();
520    const vector<CField*> allFields = CField::getAll();
521    for (size_t i = 0; i < allFields.size(); ++i)
522    {
523      CField* field = allFields[i];
524
525      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
526        field->read_access = true;
527      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
528        fieldsWithReadAccess.push_back(field);
529    }
530  }
531
532  void CContext::solveAllRefOfFieldsWithReadAccess()
533  {
534    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
535      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
536  }
537
538  void CContext::buildFilterGraphOfFieldsWithReadAccess()
539  {
540    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
541      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
542  }
543
544   void CContext::solveAllInheritance(bool apply)
545   {
546     // Résolution des héritages descendants (càd des héritages de groupes)
547     // pour chacun des contextes.
548      solveDescInheritance(apply);
549
550     // Résolution des héritages par référence au niveau des fichiers.
551      const vector<CFile*> allFiles=CFile::getAll();
552      const vector<CGrid*> allGrids= CGrid::getAll();
553
554     //if (hasClient && !hasServer)
555      if (hasClient)
556      {
557        for (unsigned int i = 0; i < allFiles.size(); i++)
558          allFiles[i]->solveFieldRefInheritance(apply);
559      }
560
561      unsigned int vecSize = allGrids.size();
562      unsigned int i = 0;
563      for (i = 0; i < vecSize; ++i)
564        allGrids[i]->solveDomainAxisRefInheritance(apply);
565
566   }
567
568   void CContext::findEnabledFiles(void)
569   {
570      const std::vector<CFile*> allFiles = CFile::getAll();
571      const CDate& initDate = calendar->getInitDate();
572
573      for (unsigned int i = 0; i < allFiles.size(); i++)
574         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
575         {
576            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
577            {
578              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
579              {
580                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
581                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
582                    <<"\" is less than the time step. File will not be written."<<endl;
583              }
584              else
585               enabledFiles.push_back(allFiles[i]);
586            }
587         }
588         else
589         {
590           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
591           {
592             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
593                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
594                 <<"\" is less than the time step. File will not be written."<<endl;
595           }
596           else
597             enabledFiles.push_back(allFiles[i]); // otherwise true by default
598         }
599
600      if (enabledFiles.size() == 0)
601         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
602               << getId() << "\" !");
603   }
604
605   void CContext::findEnabledReadModeFiles(void)
606   {
607     int size = this->enabledFiles.size();
608     for (int i = 0; i < size; ++i)
609     {
610       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
611        enabledReadModeFiles.push_back(enabledFiles[i]);
612     }
613   }
614
615   void CContext::closeAllFile(void)
616   {
617     std::vector<CFile*>::const_iterator
618            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
619
620     for (; it != end; it++)
621     {
622       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
623       (*it)->close();
624     }
625   }
626
627   /*!
628   \brief Dispatch event received from client
629      Whenever a message is received in buffer of server, it will be processed depending on
630   its event type. A new event type should be added in the switch list to make sure
631   it processed on server side.
632   \param [in] event: Received message
633   */
634   bool CContext::dispatchEvent(CEventServer& event)
635   {
636
637      if (SuperClass::dispatchEvent(event)) return true;
638      else
639      {
640        switch(event.type)
641        {
642           case EVENT_ID_CLOSE_DEFINITION :
643             recvCloseDefinition(event);
644             return true;
645             break;
646           case EVENT_ID_UPDATE_CALENDAR:
647             recvUpdateCalendar(event);
648             return true;
649             break;
650           case EVENT_ID_CREATE_FILE_HEADER :
651             recvCreateFileHeader(event);
652             return true;
653             break;
654           case EVENT_ID_POST_PROCESS:
655             recvPostProcessing(event);
656             return true;
657            case EVENT_ID_SEND_REGISTRY:
658             recvRegistry(event);
659             return true;
660            break;
661
662           default :
663             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
664                    <<"Unknown Event");
665           return false;
666         }
667      }
668   }
669
670   //! Client side: Send a message to server to make it close
671   void CContext::sendCloseDefinition(void)
672   {
673     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
674     if (client->isServerLeader())
675     {
676       CMessage msg;
677       msg<<this->getIdServer();
678       const std::list<int>& ranks = client->getRanksServerLeader();
679       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
680         event.push(*itRank,1,msg);
681       client->sendEvent(event);
682     }
683     else client->sendEvent(event);
684   }
685
686   //! Server side: Receive a message of client announcing a context close
687   void CContext::recvCloseDefinition(CEventServer& event)
688   {
689
690      CBufferIn* buffer=event.subEvents.begin()->buffer;
691      string id;
692      *buffer>>id;
693      get(id)->closeDefinition();
694   }
695
696   //! Client side: Send a message to update calendar in each time step
697   void CContext::sendUpdateCalendar(int step)
698   {
699     if (!hasServer)
700     {
701       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
702       if (client->isServerLeader())
703       {
704         CMessage msg;
705         msg<<this->getIdServer()<<step;
706         const std::list<int>& ranks = client->getRanksServerLeader();
707         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
708           event.push(*itRank,1,msg);
709         client->sendEvent(event);
710       }
711       else client->sendEvent(event);
712     }
713   }
714
715   //! Server side: Receive a message of client annoucing calendar update
716   void CContext::recvUpdateCalendar(CEventServer& event)
717   {
718      CBufferIn* buffer=event.subEvents.begin()->buffer;
719      string id;
720      *buffer>>id;
721      get(id)->recvUpdateCalendar(*buffer);
722   }
723
724   //! Server side: Receive a message of client annoucing calendar update
725   void CContext::recvUpdateCalendar(CBufferIn& buffer)
726   {
727      int step;
728      buffer>>step;
729      updateCalendar(step);
730   }
731
732   //! Client side: Send a message to create header part of netcdf file
733   void CContext::sendCreateFileHeader(void)
734   {
735     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
736     if (client->isServerLeader())
737     {
738       CMessage msg;
739       msg<<this->getIdServer();
740       const std::list<int>& ranks = client->getRanksServerLeader();
741       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
742         event.push(*itRank,1,msg) ;
743       client->sendEvent(event);
744     }
745     else client->sendEvent(event);
746   }
747
748   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
749   void CContext::recvCreateFileHeader(CEventServer& event)
750   {
751      CBufferIn* buffer=event.subEvents.begin()->buffer;
752      string id;
753      *buffer>>id;
754      get(id)->recvCreateFileHeader(*buffer);
755   }
756
757   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
758   void CContext::recvCreateFileHeader(CBufferIn& buffer)
759   {
760      createFileHeader();
761   }
762
763   //! Client side: Send a message to do some post processing on server
764   void CContext::sendPostProcessing()
765   {
766     if (!hasServer)
767     {
768       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
769       if (client->isServerLeader())
770       {
771         CMessage msg;
772         msg<<this->getIdServer();
773         const std::list<int>& ranks = client->getRanksServerLeader();
774         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
775           event.push(*itRank,1,msg);
776         client->sendEvent(event);
777       }
778       else client->sendEvent(event);
779     }
780   }
781
782   //! Server side: Receive a message to do some post processing
783   void CContext::recvPostProcessing(CEventServer& event)
784   {
785      CBufferIn* buffer=event.subEvents.begin()->buffer;
786      string id;
787      *buffer>>id;
788      get(id)->recvPostProcessing(*buffer);
789   }
790
791   //! Server side: Receive a message to do some post processing
792   void CContext::recvPostProcessing(CBufferIn& buffer)
793   {
794      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
795      postProcessing();
796   }
797
798   const StdString& CContext::getIdServer()
799   {
800      if (hasClient)
801      {
802        idServer_ = this->getId();
803        idServer_ += "_server";
804        return idServer_;
805      }
806      if (hasServer) return (this->getId());
807   }
808
809   /*!
810   \brief Do some simple post processings after parsing xml file
811      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
812   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
813   which will be written out into netcdf files, are processed
814   */
815   void CContext::postProcessing()
816   {
817     if (isPostProcessed) return;
818
819      // Make sure the calendar was correctly created
820      if (!calendar)
821        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
822      else if (calendar->getTimeStep() == NoneDu)
823        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
824      // Calendar first update to set the current date equals to the start date
825      calendar->update(0);
826
827      // Find all inheritance in xml structure
828      this->solveAllInheritance();
829
830      // Check if some axis, domains or grids are eligible to for compressed indexed output.
831      // Warning: This must be done after solving the inheritance and before the rest of post-processing
832      checkAxisDomainsGridsEligibilityForCompressedOutput();
833
834      // Check if some automatic time series should be generated
835      // Warning: This must be done after solving the inheritance and before the rest of post-processing
836      prepareTimeseries();
837
838      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
839      this->findEnabledFiles();
840      this->findEnabledReadModeFiles();
841
842      // Find all enabled fields of each file
843      this->findAllEnabledFields();
844      this->findAllEnabledFieldsInReadModeFiles();
845
846     if (hasClient && !hasServer)
847     {
848      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
849      this->readAttributesOfEnabledFieldsInReadModeFiles();
850     }
851
852      // Only search and rebuild all reference objects of enable fields, don't transform
853      this->solveOnlyRefOfEnabledFields(false);
854
855      // Search and rebuild all reference object of enabled fields
856      this->solveAllRefOfEnabledFields(false);
857
858      // Find all fields with read access from the public API
859      findFieldsWithReadAccess();
860      // and solve the all reference for them
861      solveAllRefOfFieldsWithReadAccess();
862
863      isPostProcessed = true;
864   }
865
866   /*!
867    * Compute the required buffer size to send the attributes (mostly those grid related).
868    *
869    * \param maxEventSize [in/out] the size of the bigger event for each connected server
870    */
871   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
872   {
873     std::map<int, StdSize> attributesSize;
874
875     if (hasClient)
876     {
877       size_t numEnabledFiles = this->enabledFiles.size();
878       for (size_t i = 0; i < numEnabledFiles; ++i)
879       {
880         CFile* file = this->enabledFiles[i];
881
882         std::vector<CField*> enabledFields = file->getEnabledFields();
883         size_t numEnabledFields = enabledFields.size();
884         for (size_t j = 0; j < numEnabledFields; ++j)
885         {
886           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
887           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
888           for (; it != itE; ++it)
889           {
890             // If attributesSize[it->first] does not exist, it will be zero-initialized
891             // so we can use it safely without checking for its existance
892             if (attributesSize[it->first] < it->second)
893               attributesSize[it->first] = it->second;
894
895             if (maxEventSize[it->first] < it->second)
896               maxEventSize[it->first] = it->second;
897           }
898         }
899       }
900     }
901
902     return attributesSize;
903   }
904
905   /*!
906    * Compute the required buffer size to send the fields data.
907    *
908    * \param maxEventSize [in/out] the size of the bigger event for each connected server
909    */
910   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
911   {
912     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
913
914     std::map<int, StdSize> dataSize;
915
916     // Find all reference domain and axis of all active fields
917     size_t numEnabledFiles = this->enabledFiles.size();
918     for (size_t i = 0; i < numEnabledFiles; ++i)
919     {
920       CFile* file = this->enabledFiles[i];
921       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
922
923       if (fileMode == mode)
924       {
925         std::vector<CField*> enabledFields = file->getEnabledFields();
926         size_t numEnabledFields = enabledFields.size();
927         for (size_t j = 0; j < numEnabledFields; ++j)
928         {
929           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
930           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
931           for (; it != itE; ++it)
932           {
933             // If dataSize[it->first] does not exist, it will be zero-initialized
934             // so we can use it safely without checking for its existance
935             if (CXios::isOptPerformance)
936               dataSize[it->first] += it->second;
937             else if (dataSize[it->first] < it->second)
938               dataSize[it->first] = it->second;
939
940             if (maxEventSize[it->first] < it->second)
941               maxEventSize[it->first] = it->second;
942           }
943         }
944       }
945     }
946
947     return dataSize;
948   }
949
950   //! Client side: Send infomation of active files (files are enabled to write out)
951   void CContext::sendEnabledFiles()
952   {
953     int size = this->enabledFiles.size();
954
955     // In a context, each type has a root definition, e.g: axis, domain, field.
956     // Every object must be a child of one of these root definition. In this case
957     // all new file objects created on server must be children of the root "file_definition"
958     StdString fileDefRoot("file_definition");
959     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
960
961     for (int i = 0; i < size; ++i)
962     {
963       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
964       this->enabledFiles[i]->sendAllAttributesToServer();
965       this->enabledFiles[i]->sendAddAllVariables();
966     }
967   }
968
969   //! Client side: Send information of active fields (ones are written onto files)
970   void CContext::sendEnabledFields()
971   {
972     int size = this->enabledFiles.size();
973     for (int i = 0; i < size; ++i)
974     {
975       this->enabledFiles[i]->sendEnabledFields();
976     }
977   }
978
979   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
980   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
981   {
982     if (!hasClient) return;
983
984     const vector<CAxis*> allAxis = CAxis::getAll();
985     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
986       (*it)->checkEligibilityForCompressedOutput();
987
988     const vector<CDomain*> allDomains = CDomain::getAll();
989     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
990       (*it)->checkEligibilityForCompressedOutput();
991
992     const vector<CGrid*> allGrids = CGrid::getAll();
993     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
994       (*it)->checkEligibilityForCompressedOutput();
995   }
996
997   //! Client side: Prepare the timeseries by adding the necessary files
998   void CContext::prepareTimeseries()
999   {
1000     if (!hasClient) return;
1001
1002     const std::vector<CFile*> allFiles = CFile::getAll();
1003     for (size_t i = 0; i < allFiles.size(); i++)
1004     {
1005       CFile* file = allFiles[i];
1006
1007       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1008       for (size_t k = 0; k < vars.size(); k++)
1009       {
1010         CVariable* var = vars[k];
1011
1012         if (var->ts_target.isEmpty()
1013              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1014           fileVars.push_back(var);
1015
1016         if (!var->ts_target.isEmpty()
1017              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1018           fieldVars.push_back(var);
1019       }
1020
1021       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1022       {
1023         StdString fileNameStr("%file_name%") ;
1024         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1025         
1026         StdString fileName=file->getFileOutputName();
1027         size_t pos=tsPrefix.find(fileNameStr) ;
1028         while (pos!=std::string::npos)
1029         {
1030           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1031           pos=tsPrefix.find(fileNameStr) ;
1032         }
1033       
1034         const std::vector<CField*> allFields = file->getAllFields();
1035         for (size_t j = 0; j < allFields.size(); j++)
1036         {
1037           CField* field = allFields[j];
1038
1039           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1040           {
1041             CFile* tsFile = CFile::create();
1042             tsFile->duplicateAttributes(file);
1043
1044             // Add variables originating from file and targeted to timeserie file
1045             for (size_t k = 0; k < fileVars.size(); k++)
1046               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1047
1048           
1049             tsFile->name = tsPrefix + "_";
1050             if (!field->name.isEmpty())
1051               tsFile->name.get() += field->name;
1052             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1053               tsFile->name.get() += field->field_ref;
1054             else
1055               tsFile->name.get() += field->getId();
1056
1057             if (!field->ts_split_freq.isEmpty())
1058               tsFile->split_freq = field->ts_split_freq;
1059
1060             CField* tsField = tsFile->addField();
1061             tsField->field_ref = field->getId();
1062
1063             // Add variables originating from file and targeted to timeserie field
1064             for (size_t k = 0; k < fieldVars.size(); k++)
1065               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1066
1067             vars = field->getAllVariables();
1068             for (size_t k = 0; k < vars.size(); k++)
1069             {
1070               CVariable* var = vars[k];
1071
1072               // Add variables originating from field and targeted to timeserie field
1073               if (var->ts_target.isEmpty()
1074                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1075                 tsField->getVirtualVariableGroup()->addChild(var);
1076
1077               // Add variables originating from field and targeted to timeserie file
1078               if (!var->ts_target.isEmpty()
1079                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1080                 tsFile->getVirtualVariableGroup()->addChild(var);
1081             }
1082
1083             tsFile->solveFieldRefInheritance(true);
1084
1085             if (file->timeseries == CFile::timeseries_attr::exclusive)
1086               field->enabled = false;
1087           }
1088         }
1089
1090         // Finally disable the original file is need be
1091         if (file->timeseries == CFile::timeseries_attr::only)
1092          file->enabled = false;
1093       }
1094     }
1095   }
1096
1097   //! Client side: Send information of reference grid of active fields
1098   void CContext::sendRefGrid()
1099   {
1100     std::set<StdString> gridIds;
1101     int sizeFile = this->enabledFiles.size();
1102     CFile* filePtr(NULL);
1103
1104     // Firstly, find all reference grids of all active fields
1105     for (int i = 0; i < sizeFile; ++i)
1106     {
1107       filePtr = this->enabledFiles[i];
1108       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1109       int sizeField = enabledFields.size();
1110       for (int numField = 0; numField < sizeField; ++numField)
1111       {
1112         if (0 != enabledFields[numField]->getRelGrid())
1113           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1114       }
1115     }
1116
1117     // Create all reference grids on server side
1118     StdString gridDefRoot("grid_definition");
1119     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1120     std::set<StdString>::const_iterator it, itE = gridIds.end();
1121     for (it = gridIds.begin(); it != itE; ++it)
1122     {
1123       gridPtr->sendCreateChild(*it);
1124       CGrid::get(*it)->sendAllAttributesToServer();
1125       CGrid::get(*it)->sendAllDomains();
1126       CGrid::get(*it)->sendAllAxis();
1127       CGrid::get(*it)->sendAllScalars();
1128     }
1129   }
1130
1131
1132   //! Client side: Send information of reference domain and axis of active fields
1133   void CContext::sendRefDomainsAxis()
1134   {
1135     std::set<StdString> domainIds, axisIds, scalarIds;
1136
1137     // Find all reference domain and axis of all active fields
1138     int numEnabledFiles = this->enabledFiles.size();
1139     for (int i = 0; i < numEnabledFiles; ++i)
1140     {
1141       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1142       int numEnabledFields = enabledFields.size();
1143       for (int j = 0; j < numEnabledFields; ++j)
1144       {
1145         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1146         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1147         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1148         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1149       }
1150     }
1151
1152     // Create all reference axis on server side
1153     std::set<StdString>::iterator itDom, itAxis, itScalar;
1154     std::set<StdString>::const_iterator itE;
1155
1156     StdString scalarDefRoot("scalar_definition");
1157     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1158     itE = scalarIds.end();
1159     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1160     {
1161       if (!itScalar->empty())
1162       {
1163         scalarPtr->sendCreateChild(*itScalar);
1164         CScalar::get(*itScalar)->sendAllAttributesToServer();
1165       }
1166     }
1167
1168     StdString axiDefRoot("axis_definition");
1169     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1170     itE = axisIds.end();
1171     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1172     {
1173       if (!itAxis->empty())
1174       {
1175         axisPtr->sendCreateChild(*itAxis);
1176         CAxis::get(*itAxis)->sendAllAttributesToServer();
1177       }
1178     }
1179
1180     // Create all reference domains on server side
1181     StdString domDefRoot("domain_definition");
1182     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1183     itE = domainIds.end();
1184     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1185     {
1186       if (!itDom->empty()) {
1187          domPtr->sendCreateChild(*itDom);
1188          CDomain::get(*itDom)->sendAllAttributesToServer();
1189       }
1190     }
1191   }
1192
1193   //! Update calendar in each time step
1194   void CContext::updateCalendar(int step)
1195   {
1196      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1197      calendar->update(step);
1198      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1199
1200      if (hasClient)
1201      {
1202        checkPrefetchingOfEnabledReadModeFiles();
1203        garbageCollector.invalidate(calendar->getCurrentDate());
1204      }
1205   }
1206
1207   //! Server side: Create header of netcdf file
1208   void CContext::createFileHeader(void )
1209   {
1210      vector<CFile*>::const_iterator it;
1211
1212      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1213      {
1214         (*it)->initFile();
1215      }
1216   }
1217
1218   //! Get current context
1219   CContext* CContext::getCurrent(void)
1220   {
1221     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1222   }
1223
1224   /*!
1225   \brief Set context with an id be the current context
1226   \param [in] id identity of context to be set to current
1227   */
1228   void CContext::setCurrent(const string& id)
1229   {
1230     CObjectFactory::SetCurrentContextId(id);
1231     CGroupFactory::SetCurrentContextId(id);
1232   }
1233
1234  /*!
1235  \brief Create a context with specific id
1236  \param [in] id identity of new context
1237  \return pointer to the new context or already-existed one with identity id
1238  */
1239  CContext* CContext::create(const StdString& id)
1240  {
1241    CContext::setCurrent(id);
1242
1243    bool hasctxt = CContext::has(id);
1244    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1245    getRoot();
1246    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1247
1248#define DECLARE_NODE(Name_, name_) \
1249    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1250#define DECLARE_NODE_PAR(Name_, name_)
1251#include "node_type.conf"
1252
1253    return (context);
1254  }
1255
1256
1257
1258     //! Server side: Receive a message to do some post processing
1259  void CContext::recvRegistry(CEventServer& event)
1260  {
1261    CBufferIn* buffer=event.subEvents.begin()->buffer;
1262    string id;
1263    *buffer>>id;
1264    get(id)->recvRegistry(*buffer);
1265  }
1266
1267  void CContext::recvRegistry(CBufferIn& buffer)
1268  {
1269    if (server->intraCommRank==0)
1270    {
1271      CRegistry registry(server->intraComm) ;
1272      registry.fromBuffer(buffer) ;
1273      registryOut->mergeRegistry(registry) ;
1274    }
1275  }
1276
1277  void CContext::sendRegistry(void)
1278  {
1279    registryOut->hierarchicalGatherRegistry() ;
1280
1281    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1282    if (client->isServerLeader())
1283    {
1284       CMessage msg ;
1285       msg<<this->getIdServer();
1286       if (client->clientRank==0) msg<<*registryOut ;
1287       const std::list<int>& ranks = client->getRanksServerLeader();
1288       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1289         event.push(*itRank,1,msg);
1290       client->sendEvent(event);
1291     }
1292     else client->sendEvent(event);
1293  }
1294
1295} // namespace xios
Note: See TracBrowser for help on using the repository browser.