source: XIOS/dev/branch_yushan_merged/src/node/context.cpp @ 1134

Last change on this file since 1134 was 1134, checked in by yushan, 7 years ago

branch merged with trunk r1130

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 43.5 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  //shared_ptr<CContextGroup> CContext::root;
21  shared_ptr<CContextGroup> * CContext::root_ptr = 0;
22
23   /// ////////////////////// Définitions ////////////////////// ///
24
25   CContext::CContext(void)
26      : CObjectTemplate<CContext>(), CContextAttributes()
27      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
28      , idServer_(), client(0), server(0)
29   { /* Ne rien faire de plus */ }
30
31   CContext::CContext(const StdString & id)
32      : CObjectTemplate<CContext>(id), CContextAttributes()
33      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
34      , idServer_(), client(0), server(0)
35   { /* Ne rien faire de plus */ }
36
37   CContext::~CContext(void)
38   {
39     delete client;
40     delete server;
41   }
42
43   //----------------------------------------------------------------
44   //! Get name of context
45   StdString CContext::GetName(void)   { return (StdString("context")); }
46   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
47   ENodeType CContext::GetType(void)   { return (eContext); }
48
49   //----------------------------------------------------------------
50
51   /*!
52   \brief Get context group (context root)
53   \return Context root
54   */
55   CContextGroup* CContext::getRoot(void)
56   {
57      //if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
58      //return root.get();
59
60      //static shared_ptr<CContextGroup> *root_ptr;
61      if(root_ptr == 0) //root_ptr = new shared_ptr<CContextGroup>;
62      // if (root_ptr->get()==NULL)
63      root_ptr = new shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
64      return root_ptr->get();
65   }
66
67   //----------------------------------------------------------------
68
69   /*!
70   \brief Get calendar of a context
71   \return Calendar
72   */
73   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
74   {
75      return (this->calendar);
76   }
77
78   //----------------------------------------------------------------
79
80   /*!
81   \brief Set a context with a calendar
82   \param[in] newCalendar new calendar
83   */
84   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
85   {
86      this->calendar = newCalendar;
87   }
88
89   //----------------------------------------------------------------
90   /*!
91   \brief Parse xml file and write information into context object
92   \param [in] node xmld node corresponding in xml file
93   */
94   void CContext::parse(xml::CXMLNode & node)
95   {
96      CContext::SuperClass::parse(node);
97
98      // PARSING POUR GESTION DES ENFANTS
99      xml::THashAttributes attributes = node.getAttributes();
100
101      if (attributes.end() != attributes.find("src"))
102      {
103         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
104         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
105            ERROR("void CContext::parse(xml::CXMLNode & node)",
106                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
107         if (!ifs.good())
108            ERROR("CContext::parse(xml::CXMLNode & node)",
109                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
110         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
111      }
112
113      if (node.getElementName().compare(CContext::GetName()))
114         DEBUG("Le noeud is wrong defined but will be considered as a context !");
115
116      if (!(node.goToChildElement()))
117      {
118         DEBUG("Le context ne contient pas d'enfant !");
119      }
120      else
121      {
122         do { // Parcours des contextes pour traitement.
123
124            StdString name = node.getElementName();
125            attributes.clear();
126            attributes = node.getAttributes();
127
128            if (attributes.end() != attributes.find("id"))
129            {
130              DEBUG(<< "Definition node has an id,"
131                    << "it will not be taking account !");
132            }
133
134#define DECLARE_NODE(Name_, name_)    \
135   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
136   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
137#define DECLARE_NODE_PAR(Name_, name_)
138#include "node_type.conf"
139
140            DEBUG(<< "The element \'"     << name
141                  << "\' in the context \'" << CContext::getCurrent()->getId()
142                  << "\' is not a definition !");
143
144         } while (node.goToNextElement());
145
146         node.goToParentElement(); // Retour au parent
147      }
148   }
149
150   //----------------------------------------------------------------
151   //! Show tree structure of context
152   void CContext::ShowTree(StdOStream & out)
153   {
154      StdString currentContextId = CContext::getCurrent() -> getId();
155      std::vector<CContext*> def_vector =
156         CContext::getRoot()->getChildList();
157      std::vector<CContext*>::iterator
158         it = def_vector.begin(), end = def_vector.end();
159
160      out << "<? xml version=\"1.0\" ?>" << std::endl;
161      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
162
163      for (; it != end; it++)
164      {
165         CContext* context = *it;
166         CContext::setCurrent(context->getId());
167         out << *context << std::endl;
168      }
169
170      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
171      CContext::setCurrent(currentContextId);
172   }
173
174
175   //----------------------------------------------------------------
176
177   //! Convert context object into string (to print)
178   StdString CContext::toString(void) const
179   {
180      StdOStringStream oss;
181      oss << "<" << CContext::GetName()
182          << " id=\"" << this->getId() << "\" "
183          << SuperClassAttribute::toString() << ">" << std::endl;
184      if (!this->hasChild())
185      {
186         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
187      }
188      else
189      {
190
191#define DECLARE_NODE(Name_, name_)    \
192   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
193   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
194#define DECLARE_NODE_PAR(Name_, name_)
195#include "node_type.conf"
196
197      }
198
199      oss << "</" << CContext::GetName() << " >";
200
201      return (oss.str());
202   }
203
204   //----------------------------------------------------------------
205
206   /*!
207   \brief Find all inheritace among objects in a context.
208   \param [in] apply (true) write attributes of parent into ones of child if they are empty
209                     (false) write attributes of parent into a new container of child
210   \param [in] parent unused
211   */
212   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
213   {
214#define DECLARE_NODE(Name_, name_)    \
215   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
216     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
217#define DECLARE_NODE_PAR(Name_, name_)
218#include "node_type.conf"
219   }
220
221   //----------------------------------------------------------------
222
223   //! Verify if all root definition in the context have child.
224   bool CContext::hasChild(void) const
225   {
226      return (
227#define DECLARE_NODE(Name_, name_)    \
228   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
229#define DECLARE_NODE_PAR(Name_, name_)
230#include "node_type.conf"
231      false);
232}
233
234   //----------------------------------------------------------------
235
236   void CContext::CleanTree(void)
237   {
238#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
239#define DECLARE_NODE_PAR(Name_, name_)
240#include "node_type.conf"
241   }
242   ///---------------------------------------------------------------
243
244   //! Initialize client side
245   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
246   {
247     hasClient=true;
248     client = new CContextClient(this, intraComm, interComm, cxtServer);
249
250     int tmp_rank;
251     MPI_Comm_rank(intraComm, &tmp_rank);
252     MPI_Barrier(intraComm);
253     
254
255     registryIn=new CRegistry(intraComm);
256     registryIn->setPath(getId()) ;
257     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
258     registryIn->bcastRegistry() ;
259
260     registryOut=new CRegistry(intraComm) ;
261     registryOut->setPath(getId()) ;
262
263     ep_lib::MPI_Comm intraCommServer, interCommServer;
264     if (cxtServer) // Attached mode
265     {
266       intraCommServer = intraComm;
267       interCommServer = interComm;
268     }
269     else
270     {
271       MPI_Comm_dup(intraComm, &intraCommServer);
272       comms.push_back(intraCommServer);
273       MPI_Comm_dup(interComm, &interCommServer);
274       comms.push_back(interCommServer);
275     }
276     server = new CContextServer(this,intraCommServer,interCommServer);
277   }
278
279   void CContext::setClientServerBuffer()
280   {
281     size_t minBufferSize = CXios::minBufferSize;
282#define DECLARE_NODE(Name_, name_)    \
283     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
284#define DECLARE_NODE_PAR(Name_, name_)
285#include "node_type.conf"
286#undef DECLARE_NODE
287#undef DECLARE_NODE_PAR
288
289     std::map<int, StdSize> maxEventSize;
290     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
291     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
292
293     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
294     for (it = dataBufferSize.begin(); it != ite; ++it)
295       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
296
297     ite = bufferSize.end();
298     for (it = bufferSize.begin(); it != ite; ++it)
299     {
300       it->second *= CXios::bufferSizeFactor;
301       if (it->second < minBufferSize) it->second = minBufferSize;
302     }
303
304     // We consider that the minimum buffer size is also the minimum event size
305     ite = maxEventSize.end();
306     for (it = maxEventSize.begin(); it != ite; ++it)
307       if (it->second < minBufferSize) it->second = minBufferSize;
308
309     if (client->isServerLeader())
310     {
311       const std::list<int>& ranks = client->getRanksServerLeader();
312       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
313         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
314     }
315
316     client->setBufferSize(bufferSize, maxEventSize);
317   }
318
319   //! Verify whether a context is initialized
320   bool CContext::isInitialized(void)
321   {
322     return hasClient;
323   }
324
325   //! Initialize server
326   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
327   {
328     hasServer=true;
329     server = new CContextServer(this,intraComm,interComm);
330
331     registryIn=new CRegistry(intraComm);
332     registryIn->setPath(getId()) ;
333     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
334     registryIn->bcastRegistry() ;
335     registryOut=new CRegistry(intraComm) ;
336     registryOut->setPath(getId()) ;
337
338     ep_lib::MPI_Comm intraCommClient, interCommClient;
339     if (cxtClient) // Attached mode
340     {
341       intraCommClient = intraComm;
342       interCommClient = interComm;
343     }
344     else
345     {
346       MPI_Comm_dup(intraComm, &intraCommClient);
347       comms.push_back(intraCommClient);
348       MPI_Comm_dup(interComm, &interCommClient);
349       comms.push_back(interCommClient);
350     }
351     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
352   }
353
354   //! Try to send the buffers and receive possible answers
355   bool CContext::checkBuffersAndListen(void)
356   {
357     client->checkBuffers();
358
359     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
360     if (hasTmpBufferedEvent)
361       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
362
363     // Don't process events if there is a temporarily buffered event
364     return server->eventLoop(!hasTmpBufferedEvent);
365   }
366
367   //! Terminate a context
368   void CContext::finalize(void)
369   {
370      if (!finalized)
371      {
372        finalized = true;
373        if (hasClient) sendRegistry() ;
374        client->finalize();
375        while (!server->hasFinished())
376        {
377          server->eventLoop();
378        }
379
380        if (hasServer)
381        {
382          closeAllFile();
383          registryOut->hierarchicalGatherRegistry() ;
384          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
385        }
386
387        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
388          MPI_Comm_free(&(*it));
389        comms.clear();
390      }
391   }
392
393   /*!
394   \brief Close all the context defintion and do processing data
395      After everything is well defined on client side, they will be processed and sent to server
396   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
397   all necessary information to server, from which each server can build its own database.
398   Because the role of server is to write out field data on a specific netcdf file,
399   the only information that it needs is the enabled files
400   and the active fields (fields will be written onto active files)
401   */
402   void CContext::closeDefinition(void)
403   {
404     // There is nothing client need to send to server
405     if (hasClient)
406     {
407       // After xml is parsed, there are some more works with post processing
408       postProcessing();
409     }
410     setClientServerBuffer();
411
412     if (hasClient && !hasServer)
413     {
414      // Send all attributes of current context to server
415      this->sendAllAttributesToServer();
416
417      // Send all attributes of current calendar
418      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
419
420      // We have enough information to send to server
421      // First of all, send all enabled files
422       sendEnabledFiles();
423
424      // Then, send all enabled fields
425       sendEnabledFields();
426
427      // At last, we have all info of domain and axis, then send them
428       sendRefDomainsAxis();
429
430      // After that, send all grid (if any)
431       sendRefGrid();
432    }
433
434    // We have a xml tree on the server side and now, it should be also processed
435    if (hasClient && !hasServer) sendPostProcessing();
436
437    // There are some processings that should be done after all of above. For example: check mask or index
438    if (hasClient)
439    {
440      this->buildFilterGraphOfEnabledFields();
441      buildFilterGraphOfFieldsWithReadAccess();
442      this->solveAllRefOfEnabledFields(true);
443    }
444
445    // Now tell server that it can process all messages from client
446    if (hasClient && !hasServer) this->sendCloseDefinition();
447
448    // Nettoyage de l'arborescence
449    if (hasClient && !hasServer) CleanTree(); // Only on client side??
450
451    if (hasClient)
452    {
453      sendCreateFileHeader();
454
455      startPrefetchingOfEnabledReadModeFiles();
456    }
457   }
458
459   void CContext::findAllEnabledFields(void)
460   {
461     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
462     (void)this->enabledFiles[i]->getEnabledFields();
463   }
464
465   void CContext::findAllEnabledFieldsInReadModeFiles(void)
466   {
467     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
468     (void)this->enabledReadModeFiles[i]->getEnabledFields();
469   }
470
471   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
472   {
473      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
474        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
475   }
476
477   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
478   {
479     int size = this->enabledFiles.size();
480     for (int i = 0; i < size; ++i)
481     {
482       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
483     }
484
485     for (int i = 0; i < size; ++i)
486     {
487       this->enabledFiles[i]->generateNewTransformationGridDest();
488     }
489   }
490
491   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
492   {
493     int size = this->enabledFiles.size();
494     for (int i = 0; i < size; ++i)
495     {
496       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
497     }
498   }
499
500   void CContext::buildFilterGraphOfEnabledFields()
501   {
502     int size = this->enabledFiles.size();
503     for (int i = 0; i < size; ++i)
504     {
505       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
506     }
507   }
508
509   void CContext::startPrefetchingOfEnabledReadModeFiles()
510   {
511     int size = enabledReadModeFiles.size();
512     for (int i = 0; i < size; ++i)
513     {
514        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
515     }
516   }
517
518   void CContext::checkPrefetchingOfEnabledReadModeFiles()
519   {
520     int size = enabledReadModeFiles.size();
521     for (int i = 0; i < size; ++i)
522     {
523        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
524     }
525   }
526
527  void CContext::findFieldsWithReadAccess(void)
528  {
529    fieldsWithReadAccess.clear();
530    const vector<CField*> allFields = CField::getAll();
531    for (size_t i = 0; i < allFields.size(); ++i)
532    {
533      CField* field = allFields[i];
534
535      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
536        field->read_access = true;
537      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
538        fieldsWithReadAccess.push_back(field);
539    }
540  }
541
542  void CContext::solveAllRefOfFieldsWithReadAccess()
543  {
544    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
545      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
546  }
547
548  void CContext::buildFilterGraphOfFieldsWithReadAccess()
549  {
550    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
551      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
552  }
553
554   void CContext::solveAllInheritance(bool apply)
555   {
556     // Résolution des héritages descendants (càd des héritages de groupes)
557     // pour chacun des contextes.
558      solveDescInheritance(apply);
559
560     // Résolution des héritages par référence au niveau des fichiers.
561      const vector<CFile*> allFiles=CFile::getAll();
562      const vector<CGrid*> allGrids= CGrid::getAll();
563
564     //if (hasClient && !hasServer)
565      if (hasClient)
566      {
567        for (unsigned int i = 0; i < allFiles.size(); i++)
568          allFiles[i]->solveFieldRefInheritance(apply);
569      }
570
571      unsigned int vecSize = allGrids.size();
572      unsigned int i = 0;
573      for (i = 0; i < vecSize; ++i)
574        allGrids[i]->solveDomainAxisRefInheritance(apply);
575
576   }
577
578   void CContext::findEnabledFiles(void)
579   {
580      const std::vector<CFile*> allFiles = CFile::getAll();
581      const CDate& initDate = calendar->getInitDate();
582
583      for (unsigned int i = 0; i < allFiles.size(); i++)
584         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
585         {
586            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
587            {
588              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
589              {
590                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
591                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
592                    <<"\" is less than the time step. File will not be written."<<endl;
593              }
594              else
595               enabledFiles.push_back(allFiles[i]);
596            }
597         }
598         else
599         {
600           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
601           {
602             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
603                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
604                 <<"\" is less than the time step. File will not be written."<<endl;
605           }
606           else
607             enabledFiles.push_back(allFiles[i]); // otherwise true by default
608         }
609
610      if (enabledFiles.size() == 0)
611         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
612               << getId() << "\" !");
613   }
614
615   void CContext::findEnabledReadModeFiles(void)
616   {
617     int size = this->enabledFiles.size();
618     for (int i = 0; i < size; ++i)
619     {
620       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
621        enabledReadModeFiles.push_back(enabledFiles[i]);
622     }
623   }
624
625   void CContext::closeAllFile(void)
626   {
627     std::vector<CFile*>::const_iterator
628            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
629
630     for (; it != end; it++)
631     {
632       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
633       (*it)->close();
634     }
635   }
636
637   /*!
638   \brief Dispatch event received from client
639      Whenever a message is received in buffer of server, it will be processed depending on
640   its event type. A new event type should be added in the switch list to make sure
641   it processed on server side.
642   \param [in] event: Received message
643   */
644   bool CContext::dispatchEvent(CEventServer& event)
645   {
646
647      if (SuperClass::dispatchEvent(event)) return true;
648      else
649      {
650        switch(event.type)
651        {
652           case EVENT_ID_CLOSE_DEFINITION :
653             recvCloseDefinition(event);
654             return true;
655             break;
656           case EVENT_ID_UPDATE_CALENDAR:
657             recvUpdateCalendar(event);
658             return true;
659             break;
660           case EVENT_ID_CREATE_FILE_HEADER :
661             recvCreateFileHeader(event);
662             return true;
663             break;
664           case EVENT_ID_POST_PROCESS:
665             recvPostProcessing(event);
666             return true;
667            case EVENT_ID_SEND_REGISTRY:
668             recvRegistry(event);
669             return true;
670            break;
671
672           default :
673             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
674                    <<"Unknown Event");
675           return false;
676         }
677      }
678   }
679
680   //! Client side: Send a message to server to make it close
681   void CContext::sendCloseDefinition(void)
682   {
683     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
684     if (client->isServerLeader())
685     {
686       CMessage msg;
687       msg<<this->getIdServer();
688       const std::list<int>& ranks = client->getRanksServerLeader();
689       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
690         event.push(*itRank,1,msg);
691       client->sendEvent(event);
692     }
693     else client->sendEvent(event);
694   }
695
696   //! Server side: Receive a message of client announcing a context close
697   void CContext::recvCloseDefinition(CEventServer& event)
698   {
699
700      CBufferIn* buffer=event.subEvents.begin()->buffer;
701      string id;
702      *buffer>>id;
703      get(id)->closeDefinition();
704   }
705
706   //! Client side: Send a message to update calendar in each time step
707   void CContext::sendUpdateCalendar(int step)
708   {
709     if (!hasServer)
710     {
711       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
712       if (client->isServerLeader())
713       {
714         CMessage msg;
715         msg<<this->getIdServer()<<step;
716         const std::list<int>& ranks = client->getRanksServerLeader();
717         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
718           event.push(*itRank,1,msg);
719         client->sendEvent(event);
720       }
721       else client->sendEvent(event);
722     }
723   }
724
725   //! Server side: Receive a message of client annoucing calendar update
726   void CContext::recvUpdateCalendar(CEventServer& event)
727   {
728      CBufferIn* buffer=event.subEvents.begin()->buffer;
729      string id;
730      *buffer>>id;
731      get(id)->recvUpdateCalendar(*buffer);
732   }
733
734   //! Server side: Receive a message of client annoucing calendar update
735   void CContext::recvUpdateCalendar(CBufferIn& buffer)
736   {
737      int step;
738      buffer>>step;
739      updateCalendar(step);
740   }
741
742   //! Client side: Send a message to create header part of netcdf file
743   void CContext::sendCreateFileHeader(void)
744   {
745     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
746     if (client->isServerLeader())
747     {
748       CMessage msg;
749       msg<<this->getIdServer();
750       const std::list<int>& ranks = client->getRanksServerLeader();
751       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
752         event.push(*itRank,1,msg) ;
753       client->sendEvent(event);
754     }
755     else client->sendEvent(event);
756   }
757
758   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
759   void CContext::recvCreateFileHeader(CEventServer& event)
760   {
761      CBufferIn* buffer=event.subEvents.begin()->buffer;
762      string id;
763      *buffer>>id;
764      get(id)->recvCreateFileHeader(*buffer);
765   }
766
767   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
768   void CContext::recvCreateFileHeader(CBufferIn& buffer)
769   {
770      createFileHeader();
771   }
772
773   //! Client side: Send a message to do some post processing on server
774   void CContext::sendPostProcessing()
775   {
776     if (!hasServer)
777     {
778       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
779       if (client->isServerLeader())
780       {
781         CMessage msg;
782         msg<<this->getIdServer();
783         const std::list<int>& ranks = client->getRanksServerLeader();
784         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
785           event.push(*itRank,1,msg);
786         client->sendEvent(event);
787       }
788       else client->sendEvent(event);
789     }
790   }
791
792   //! Server side: Receive a message to do some post processing
793   void CContext::recvPostProcessing(CEventServer& event)
794   {
795      CBufferIn* buffer=event.subEvents.begin()->buffer;
796      string id;
797      *buffer>>id;
798      get(id)->recvPostProcessing(*buffer);
799   }
800
801   //! Server side: Receive a message to do some post processing
802   void CContext::recvPostProcessing(CBufferIn& buffer)
803   {
804      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
805      postProcessing();
806   }
807
808   const StdString& CContext::getIdServer()
809   {
810      if (hasClient)
811      {
812        idServer_ = this->getId();
813        idServer_ += "_server";
814        return idServer_;
815      }
816      if (hasServer) return (this->getId());
817   }
818
819   /*!
820   \brief Do some simple post processings after parsing xml file
821      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
822   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
823   which will be written out into netcdf files, are processed
824   */
825   void CContext::postProcessing()
826   {
827     int myRank;
828     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
829
830     if (isPostProcessed) return;
831
832      // Make sure the calendar was correctly created
833      if (!calendar)
834        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
835      else if (calendar->getTimeStep() == NoneDu)
836        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
837      // Calendar first update to set the current date equals to the start date
838      calendar->update(0);
839
840      // Find all inheritance in xml structure
841      this->solveAllInheritance();
842
843      // Check if some axis, domains or grids are eligible to for compressed indexed output.
844      // Warning: This must be done after solving the inheritance and before the rest of post-processing
845      checkAxisDomainsGridsEligibilityForCompressedOutput();
846
847      // Check if some automatic time series should be generated
848      // Warning: This must be done after solving the inheritance and before the rest of post-processing
849      prepareTimeseries();
850
851      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
852      this->findEnabledFiles();
853      this->findEnabledReadModeFiles();
854
855      // Find all enabled fields of each file
856      this->findAllEnabledFields();
857      this->findAllEnabledFieldsInReadModeFiles();
858
859     if (hasClient && !hasServer)
860     {
861      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
862      this->readAttributesOfEnabledFieldsInReadModeFiles();
863     }
864
865      // Only search and rebuild all reference objects of enable fields, don't transform
866      this->solveOnlyRefOfEnabledFields(false);
867
868      // Search and rebuild all reference object of enabled fields
869      this->solveAllRefOfEnabledFields(false);
870
871      // Find all fields with read access from the public API
872      findFieldsWithReadAccess();
873      // and solve the all reference for them
874      solveAllRefOfFieldsWithReadAccess();
875
876      isPostProcessed = true;
877   }
878
879   /*!
880    * Compute the required buffer size to send the attributes (mostly those grid related).
881    *
882    * \param maxEventSize [in/out] the size of the bigger event for each connected server
883    */
884   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
885   {
886     std::map<int, StdSize> attributesSize;
887
888     if (hasClient)
889     {
890       size_t numEnabledFiles = this->enabledFiles.size();
891       for (size_t i = 0; i < numEnabledFiles; ++i)
892       {
893         CFile* file = this->enabledFiles[i];
894
895         std::vector<CField*> enabledFields = file->getEnabledFields();
896         size_t numEnabledFields = enabledFields.size();
897         for (size_t j = 0; j < numEnabledFields; ++j)
898         {
899           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
900           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
901           for (; it != itE; ++it)
902           {
903             // If attributesSize[it->first] does not exist, it will be zero-initialized
904             // so we can use it safely without checking for its existance
905             if (attributesSize[it->first] < it->second)
906               attributesSize[it->first] = it->second;
907
908             if (maxEventSize[it->first] < it->second)
909               maxEventSize[it->first] = it->second;
910           }
911         }
912       }
913     }
914
915     return attributesSize;
916   }
917
918   /*!
919    * Compute the required buffer size to send the fields data.
920    *
921    * \param maxEventSize [in/out] the size of the bigger event for each connected server
922    */
923   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
924   {
925     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
926
927     std::map<int, StdSize> dataSize;
928
929     // Find all reference domain and axis of all active fields
930     size_t numEnabledFiles = this->enabledFiles.size();
931     for (size_t i = 0; i < numEnabledFiles; ++i)
932     {
933       CFile* file = this->enabledFiles[i];
934       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
935
936       if (fileMode == mode)
937       {
938         std::vector<CField*> enabledFields = file->getEnabledFields();
939         size_t numEnabledFields = enabledFields.size();
940         for (size_t j = 0; j < numEnabledFields; ++j)
941         {
942           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
943           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
944           for (; it != itE; ++it)
945           {
946             // If dataSize[it->first] does not exist, it will be zero-initialized
947             // so we can use it safely without checking for its existance
948             if (CXios::isOptPerformance)
949               dataSize[it->first] += it->second;
950             else if (dataSize[it->first] < it->second)
951               dataSize[it->first] = it->second;
952
953             if (maxEventSize[it->first] < it->second)
954               maxEventSize[it->first] = it->second;
955           }
956         }
957       }
958     }
959
960     return dataSize;
961   }
962
963   //! Client side: Send infomation of active files (files are enabled to write out)
964   void CContext::sendEnabledFiles()
965   {
966     int size = this->enabledFiles.size();
967
968     // In a context, each type has a root definition, e.g: axis, domain, field.
969     // Every object must be a child of one of these root definition. In this case
970     // all new file objects created on server must be children of the root "file_definition"
971     StdString fileDefRoot("file_definition");
972     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
973
974     for (int i = 0; i < size; ++i)
975     {
976       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
977       this->enabledFiles[i]->sendAllAttributesToServer();
978       this->enabledFiles[i]->sendAddAllVariables();
979     }
980   }
981
982   //! Client side: Send information of active fields (ones are written onto files)
983   void CContext::sendEnabledFields()
984   {
985     int size = this->enabledFiles.size();
986     for (int i = 0; i < size; ++i)
987     {
988       this->enabledFiles[i]->sendEnabledFields();
989     }
990   }
991
992   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
993   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
994   {
995     if (!hasClient) return;
996
997     const vector<CAxis*> allAxis = CAxis::getAll();
998     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
999       (*it)->checkEligibilityForCompressedOutput();
1000
1001     const vector<CDomain*> allDomains = CDomain::getAll();
1002     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
1003       (*it)->checkEligibilityForCompressedOutput();
1004
1005     const vector<CGrid*> allGrids = CGrid::getAll();
1006     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1007       (*it)->checkEligibilityForCompressedOutput();
1008   }
1009
1010   //! Client side: Prepare the timeseries by adding the necessary files
1011   void CContext::prepareTimeseries()
1012   {
1013     if (!hasClient) return;
1014
1015     const std::vector<CFile*> allFiles = CFile::getAll();
1016     for (size_t i = 0; i < allFiles.size(); i++)
1017     {
1018       CFile* file = allFiles[i];
1019
1020       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1021       for (size_t k = 0; k < vars.size(); k++)
1022       {
1023         CVariable* var = vars[k];
1024
1025         if (var->ts_target.isEmpty()
1026              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1027           fileVars.push_back(var);
1028
1029         if (!var->ts_target.isEmpty()
1030              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1031           fieldVars.push_back(var);
1032       }
1033
1034       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1035       {
1036         StdString fileNameStr("%file_name%") ;
1037         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1038         
1039         StdString fileName=file->getFileOutputName();
1040         size_t pos=tsPrefix.find(fileNameStr) ;
1041         while (pos!=std::string::npos)
1042         {
1043           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1044           pos=tsPrefix.find(fileNameStr) ;
1045         }
1046       
1047         const std::vector<CField*> allFields = file->getAllFields();
1048         for (size_t j = 0; j < allFields.size(); j++)
1049         {
1050           CField* field = allFields[j];
1051
1052           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1053           {
1054             CFile* tsFile = CFile::create();
1055             tsFile->duplicateAttributes(file);
1056
1057             // Add variables originating from file and targeted to timeserie file
1058             for (size_t k = 0; k < fileVars.size(); k++)
1059               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1060
1061           
1062             tsFile->name = tsPrefix + "_";
1063             if (!field->name.isEmpty())
1064               tsFile->name.get() += field->name;
1065             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1066               tsFile->name.get() += field->field_ref;
1067             else
1068               tsFile->name.get() += field->getId();
1069
1070             if (!field->ts_split_freq.isEmpty())
1071               tsFile->split_freq = field->ts_split_freq;
1072
1073             CField* tsField = tsFile->addField();
1074             tsField->field_ref = field->getId();
1075
1076             // Add variables originating from file and targeted to timeserie field
1077             for (size_t k = 0; k < fieldVars.size(); k++)
1078               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1079
1080             vars = field->getAllVariables();
1081             for (size_t k = 0; k < vars.size(); k++)
1082             {
1083               CVariable* var = vars[k];
1084
1085               // Add variables originating from field and targeted to timeserie field
1086               if (var->ts_target.isEmpty()
1087                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1088                 tsField->getVirtualVariableGroup()->addChild(var);
1089
1090               // Add variables originating from field and targeted to timeserie file
1091               if (!var->ts_target.isEmpty()
1092                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1093                 tsFile->getVirtualVariableGroup()->addChild(var);
1094             }
1095
1096             tsFile->solveFieldRefInheritance(true);
1097
1098             if (file->timeseries == CFile::timeseries_attr::exclusive)
1099               field->enabled = false;
1100           }
1101         }
1102
1103         // Finally disable the original file is need be
1104         if (file->timeseries == CFile::timeseries_attr::only)
1105          file->enabled = false;
1106       }
1107     }
1108   }
1109
1110   //! Client side: Send information of reference grid of active fields
1111   void CContext::sendRefGrid()
1112   {
1113     std::set<StdString> gridIds;
1114     int sizeFile = this->enabledFiles.size();
1115     CFile* filePtr(NULL);
1116
1117     // Firstly, find all reference grids of all active fields
1118     for (int i = 0; i < sizeFile; ++i)
1119     {
1120       filePtr = this->enabledFiles[i];
1121       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1122       int sizeField = enabledFields.size();
1123       for (int numField = 0; numField < sizeField; ++numField)
1124       {
1125         if (0 != enabledFields[numField]->getRelGrid())
1126           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1127       }
1128     }
1129
1130     // Create all reference grids on server side
1131     StdString gridDefRoot("grid_definition");
1132     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1133     std::set<StdString>::const_iterator it, itE = gridIds.end();
1134     for (it = gridIds.begin(); it != itE; ++it)
1135     {
1136       gridPtr->sendCreateChild(*it);
1137       CGrid::get(*it)->sendAllAttributesToServer();
1138       CGrid::get(*it)->sendAllDomains();
1139       CGrid::get(*it)->sendAllAxis();
1140       CGrid::get(*it)->sendAllScalars();
1141     }
1142   }
1143
1144
1145   //! Client side: Send information of reference domain and axis of active fields
1146   void CContext::sendRefDomainsAxis()
1147   {
1148     std::set<StdString> domainIds, axisIds, scalarIds;
1149
1150     // Find all reference domain and axis of all active fields
1151     int numEnabledFiles = this->enabledFiles.size();
1152     for (int i = 0; i < numEnabledFiles; ++i)
1153     {
1154       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1155       int numEnabledFields = enabledFields.size();
1156       for (int j = 0; j < numEnabledFields; ++j)
1157       {
1158         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1159         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1160         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1161         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1162       }
1163     }
1164
1165     // Create all reference axis on server side
1166     std::set<StdString>::iterator itDom, itAxis, itScalar;
1167     std::set<StdString>::const_iterator itE;
1168
1169     StdString scalarDefRoot("scalar_definition");
1170     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1171     itE = scalarIds.end();
1172     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1173     {
1174       if (!itScalar->empty())
1175       {
1176         scalarPtr->sendCreateChild(*itScalar);
1177         CScalar::get(*itScalar)->sendAllAttributesToServer();
1178       }
1179     }
1180
1181     StdString axiDefRoot("axis_definition");
1182     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1183     itE = axisIds.end();
1184     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1185     {
1186       if (!itAxis->empty())
1187       {
1188         axisPtr->sendCreateChild(*itAxis);
1189         CAxis::get(*itAxis)->sendAllAttributesToServer();
1190       }
1191     }
1192
1193     // Create all reference domains on server side
1194     StdString domDefRoot("domain_definition");
1195     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1196     itE = domainIds.end();
1197     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1198     {
1199       if (!itDom->empty()) {
1200          domPtr->sendCreateChild(*itDom);
1201          CDomain::get(*itDom)->sendAllAttributesToServer();
1202       }
1203     }
1204   }
1205
1206   //! Update calendar in each time step
1207   void CContext::updateCalendar(int step)
1208   {
1209      calendar->update(step);
1210
1211      if (hasClient)
1212      {
1213        checkPrefetchingOfEnabledReadModeFiles();
1214        garbageCollector.invalidate(calendar->getCurrentDate());
1215      }
1216   }
1217
1218   //! Server side: Create header of netcdf file
1219   void CContext::createFileHeader(void )
1220   {
1221      vector<CFile*>::const_iterator it;
1222
1223      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1224      {
1225         (*it)->initFile();
1226      }
1227   }
1228
1229   //! Get current context
1230   CContext* CContext::getCurrent(void)
1231   {
1232     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1233   }
1234
1235   /*!
1236   \brief Set context with an id be the current context
1237   \param [in] id identity of context to be set to current
1238   */
1239   void CContext::setCurrent(const string& id)
1240   {
1241     CObjectFactory::SetCurrentContextId(id);
1242     CGroupFactory::SetCurrentContextId(id);
1243   }
1244
1245  /*!
1246  \brief Create a context with specific id
1247  \param [in] id identity of new context
1248  \return pointer to the new context or already-existed one with identity id
1249  */
1250  CContext* CContext::create(const StdString& id)
1251  {
1252    CContext::setCurrent(id);
1253
1254    bool hasctxt = CContext::has(id);
1255    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1256    getRoot();
1257    //if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1258    if (!hasctxt) CGroupFactory::AddChild(*root_ptr, context->getShared());
1259
1260#define DECLARE_NODE(Name_, name_) \
1261    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1262#define DECLARE_NODE_PAR(Name_, name_)
1263#include "node_type.conf"
1264
1265    return (context);
1266  }
1267
1268
1269
1270     //! Server side: Receive a message to do some post processing
1271  void CContext::recvRegistry(CEventServer& event)
1272  {
1273    CBufferIn* buffer=event.subEvents.begin()->buffer;
1274    string id;
1275    *buffer>>id;
1276    get(id)->recvRegistry(*buffer);
1277  }
1278
1279  void CContext::recvRegistry(CBufferIn& buffer)
1280  {
1281    if (server->intraCommRank==0)
1282    {
1283      CRegistry registry(server->intraComm) ;
1284      registry.fromBuffer(buffer) ;
1285      registryOut->mergeRegistry(registry) ;
1286    }
1287  }
1288
1289  void CContext::sendRegistry(void)
1290  {
1291    registryOut->hierarchicalGatherRegistry() ;
1292
1293    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1294    if (client->isServerLeader())
1295    {
1296       CMessage msg ;
1297       msg<<this->getIdServer();
1298       if (client->clientRank==0) msg<<*registryOut ;
1299       const std::list<int>& ranks = client->getRanksServerLeader();
1300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1301         event.push(*itRank,1,msg);
1302       client->sendEvent(event);
1303     }
1304     else client->sendEvent(event);
1305  }
1306
1307} // namespace xios
Note: See TracBrowser for help on using the repository browser.