source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     int myRank;
392     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
393
394     //printf("myRank = %d, hasClient = %d, hasServer = %d\n", myRank, hasClient, hasServer);
395
396     // There is nothing client need to send to server
397     if (hasClient)
398     {
399       // After xml is parsed, there are some more works with post processing
400       postProcessing(); 
401       //printf("myRank = %d,                postProcessing OK\n", myRank);
402     }
403     setClientServerBuffer(); //printf("myRank = %d, setClientServerBuffer OK\n", myRank);
404
405     if (hasClient && !hasServer)
406     {
407      // Send all attributes of current context to server
408      this->sendAllAttributesToServer(); //printf("myRank = %d, this->sendAllAttributesToServer OK\n", myRank);
409
410      // Send all attributes of current calendar
411      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
412      //printf("myRank = %d, CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer OK\n", myRank);
413
414      // We have enough information to send to server
415      // First of all, send all enabled files
416       sendEnabledFiles();  //printf("myRank = %d, sendEnabledFiles OK\n", myRank);
417
418      // Then, send all enabled fields
419       sendEnabledFields();  //printf("myRank = %d, sendEnabledFields OK\n", myRank);
420
421      // At last, we have all info of domain and axis, then send them
422       sendRefDomainsAxis();  //printf("myRank = %d, sendRefDomainsAxis OK\n", myRank);
423
424      // After that, send all grid (if any)
425       sendRefGrid(); //printf("myRank = %d, sendRefGrid OK\n", myRank);
426    }
427
428    // We have a xml tree on the server side and now, it should be also processed
429    if (hasClient && !hasServer) sendPostProcessing(); 
430
431    // There are some processings that should be done after all of above. For example: check mask or index
432    if (hasClient)
433    {
434      this->buildFilterGraphOfEnabledFields();  //printf("myRank = %d, buildFilterGraphOfEnabledFields OK\n", myRank);
435      buildFilterGraphOfFieldsWithReadAccess();  //printf("myRank = %d, buildFilterGraphOfFieldsWithReadAccess OK\n", myRank);
436      this->solveAllRefOfEnabledFields(true);  //printf("myRank = %d, solveAllRefOfEnabledFields OK\n", myRank);
437    }
438
439    // Now tell server that it can process all messages from client
440    if (hasClient && !hasServer) this->sendCloseDefinition();
441
442    // Nettoyage de l'arborescence
443    if (hasClient && !hasServer) CleanTree();
444
445    if (hasClient)
446    {
447      sendCreateFileHeader();  //printf("myRank = %d, sendCreateFileHeader OK\n", myRank);
448
449      startPrefetchingOfEnabledReadModeFiles();  //printf("myRank = %d, startPrefetchingOfEnabledReadModeFiles OK\n", myRank);
450    }
451   }
452
453   void CContext::findAllEnabledFields(void)
454   {
455     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
456     (void)this->enabledFiles[i]->getEnabledFields();
457   }
458
459   void CContext::findAllEnabledFieldsInReadModeFiles(void)
460   {
461     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
462     (void)this->enabledReadModeFiles[i]->getEnabledFields();
463   }
464
465   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
466   {
467      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
468        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
469   }
470
471   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
472   {
473     int size = this->enabledFiles.size();
474     for (int i = 0; i < size; ++i)
475     {
476       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
477     }
478
479     for (int i = 0; i < size; ++i)
480     {
481       this->enabledFiles[i]->generateNewTransformationGridDest();
482     }
483   }
484
485   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
486   {
487     int size = this->enabledFiles.size();
488     
489     for (int i = 0; i < size; ++i)
490     {
491       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
492     }
493   }
494
495   void CContext::buildFilterGraphOfEnabledFields()
496   {
497     int size = this->enabledFiles.size();
498     for (int i = 0; i < size; ++i)
499     {
500       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
501     }
502   }
503
504   void CContext::startPrefetchingOfEnabledReadModeFiles()
505   {
506     int size = enabledReadModeFiles.size();
507     for (int i = 0; i < size; ++i)
508     {
509        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
510     }
511   }
512
513   void CContext::checkPrefetchingOfEnabledReadModeFiles()
514   {
515     int size = enabledReadModeFiles.size();
516     for (int i = 0; i < size; ++i)
517     {
518        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
519     }
520   }
521
522  void CContext::findFieldsWithReadAccess(void)
523  {
524    fieldsWithReadAccess.clear();
525    const vector<CField*> allFields = CField::getAll();
526    for (size_t i = 0; i < allFields.size(); ++i)
527    {
528      CField* field = allFields[i];
529
530      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
531        field->read_access = true;
532      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
533        fieldsWithReadAccess.push_back(field);
534    }
535  }
536
537  void CContext::solveAllRefOfFieldsWithReadAccess()
538  {
539    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
540      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
541  }
542
543  void CContext::buildFilterGraphOfFieldsWithReadAccess()
544  {
545    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
546      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
547  }
548
549   void CContext::solveAllInheritance(bool apply)
550   {
551     // Résolution des héritages descendants (càd des héritages de groupes)
552     // pour chacun des contextes.
553      solveDescInheritance(apply);
554
555     // Résolution des héritages par référence au niveau des fichiers.
556      const vector<CFile*> allFiles=CFile::getAll();
557      const vector<CGrid*> allGrids= CGrid::getAll();
558
559     //if (hasClient && !hasServer)
560      if (hasClient)
561      {
562        for (unsigned int i = 0; i < allFiles.size(); i++)
563          allFiles[i]->solveFieldRefInheritance(apply);
564      }
565
566      unsigned int vecSize = allGrids.size();
567      unsigned int i = 0;
568      for (i = 0; i < vecSize; ++i)
569        allGrids[i]->solveDomainAxisRefInheritance(apply);
570
571   }
572
573   void CContext::findEnabledFiles(void)
574   {
575      const std::vector<CFile*> allFiles = CFile::getAll();
576
577      for (unsigned int i = 0; i < allFiles.size(); i++)
578         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
579         {
580            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
581               enabledFiles.push_back(allFiles[i]);
582         }
583         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
584
585
586      if (enabledFiles.size() == 0)
587         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
588               << getId() << "\" !");
589   }
590
591   void CContext::findEnabledReadModeFiles(void)
592   {
593     int size = this->enabledFiles.size();
594     for (int i = 0; i < size; ++i)
595     {
596       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
597        enabledReadModeFiles.push_back(enabledFiles[i]);
598     }
599   }
600
601   void CContext::closeAllFile(void)
602   {
603     std::vector<CFile*>::const_iterator
604            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
605
606     for (; it != end; it++)
607     {
608       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
609       (*it)->close();
610     }
611   }
612
613   /*!
614   \brief Dispatch event received from client
615      Whenever a message is received in buffer of server, it will be processed depending on
616   its event type. A new event type should be added in the switch list to make sure
617   it processed on server side.
618   \param [in] event: Received message
619   */
620   bool CContext::dispatchEvent(CEventServer& event)
621   {
622
623      if (SuperClass::dispatchEvent(event)) return true;
624      else
625      {
626        switch(event.type)
627        {
628           case EVENT_ID_CLOSE_DEFINITION :
629             recvCloseDefinition(event);
630             return true;
631             break;
632           case EVENT_ID_UPDATE_CALENDAR:
633             recvUpdateCalendar(event);
634             return true;
635             break;
636           case EVENT_ID_CREATE_FILE_HEADER :
637             recvCreateFileHeader(event);
638             return true;
639             break;
640           case EVENT_ID_POST_PROCESS:
641             recvPostProcessing(event);
642             return true;
643            case EVENT_ID_SEND_REGISTRY:
644             recvRegistry(event);
645             return true;
646            break;
647
648           default :
649             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
650                    <<"Unknown Event");
651           return false;
652         }
653      }
654   }
655
656   //! Client side: Send a message to server to make it close
657   void CContext::sendCloseDefinition(void)
658   {
659     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
660     if (client->isServerLeader())
661     {
662       CMessage msg;
663       msg<<this->getIdServer();
664       const std::list<int>& ranks = client->getRanksServerLeader();
665       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
666         event.push(*itRank,1,msg);
667       client->sendEvent(event);
668     }
669     else client->sendEvent(event);
670   }
671
672   //! Server side: Receive a message of client announcing a context close
673   void CContext::recvCloseDefinition(CEventServer& event)
674   {
675
676      CBufferIn* buffer=event.subEvents.begin()->buffer;
677      string id;
678      *buffer>>id;
679      get(id)->closeDefinition();
680   }
681
682   //! Client side: Send a message to update calendar in each time step
683   void CContext::sendUpdateCalendar(int step)
684   {
685     if (!hasServer)
686     {
687       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
688       if (client->isServerLeader())
689       {
690         CMessage msg;
691         msg<<this->getIdServer()<<step;
692         const std::list<int>& ranks = client->getRanksServerLeader();
693         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
694           event.push(*itRank,1,msg);
695         client->sendEvent(event);
696       }
697       else client->sendEvent(event);
698     }
699   }
700
701   //! Server side: Receive a message of client annoucing calendar update
702   void CContext::recvUpdateCalendar(CEventServer& event)
703   {
704      CBufferIn* buffer=event.subEvents.begin()->buffer;
705      string id;
706      *buffer>>id;
707      get(id)->recvUpdateCalendar(*buffer);
708   }
709
710   //! Server side: Receive a message of client annoucing calendar update
711   void CContext::recvUpdateCalendar(CBufferIn& buffer)
712   {
713      int step;
714      buffer>>step;
715      updateCalendar(step);
716   }
717
718   //! Client side: Send a message to create header part of netcdf file
719   void CContext::sendCreateFileHeader(void)
720   {
721     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
722     if (client->isServerLeader())
723     {
724       CMessage msg;
725       msg<<this->getIdServer();
726       const std::list<int>& ranks = client->getRanksServerLeader();
727       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
728         event.push(*itRank,1,msg) ;
729       client->sendEvent(event);
730     }
731     else client->sendEvent(event);
732   }
733
734   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
735   void CContext::recvCreateFileHeader(CEventServer& event)
736   {
737      CBufferIn* buffer=event.subEvents.begin()->buffer;
738      string id;
739      *buffer>>id;
740      get(id)->recvCreateFileHeader(*buffer);
741   }
742
743   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
744   void CContext::recvCreateFileHeader(CBufferIn& buffer)
745   {
746      createFileHeader();
747   }
748
749   //! Client side: Send a message to do some post processing on server
750   void CContext::sendPostProcessing()
751   {
752     if (!hasServer)
753     {
754       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
755       if (client->isServerLeader())
756       {
757         CMessage msg;
758         msg<<this->getIdServer();
759         const std::list<int>& ranks = client->getRanksServerLeader();
760         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
761           event.push(*itRank,1,msg);
762         client->sendEvent(event);
763       }
764       else client->sendEvent(event);
765     }
766   }
767
768   //! Server side: Receive a message to do some post processing
769   void CContext::recvPostProcessing(CEventServer& event)
770   {
771      CBufferIn* buffer=event.subEvents.begin()->buffer;
772      string id;
773      *buffer>>id;
774      get(id)->recvPostProcessing(*buffer);
775   }
776
777   //! Server side: Receive a message to do some post processing
778   void CContext::recvPostProcessing(CBufferIn& buffer)
779   {
780      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
781      postProcessing();
782   }
783
784   const StdString& CContext::getIdServer()
785   {
786      if (hasClient)
787      {
788        idServer_ = this->getId();
789        idServer_ += "_server";
790        return idServer_;
791      }
792      if (hasServer) return (this->getId());
793   }
794
795   /*!
796   \brief Do some simple post processings after parsing xml file
797      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
798   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
799   which will be written out into netcdf files, are processed
800   */
801   void CContext::postProcessing()
802   {
803     int myRank;
804     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
805
806     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
807     if (isPostProcessed) return;
808
809      // Make sure the calendar was correctly created
810      if (!calendar)
811        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
812      else if (calendar->getTimeStep() == NoneDu)
813        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
814      // Calendar first update to set the current date equals to the start date
815      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
816
817      // Find all inheritance in xml structure
818      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
819
820      // Check if some axis, domains or grids are eligible to for compressed indexed output.
821      // Warning: This must be done after solving the inheritance and before the rest of post-processing
822      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
823
824      // Check if some automatic time series should be generated
825      // Warning: This must be done after solving the inheritance and before the rest of post-processing
826      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
827
828      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
829      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
830      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
831
832      // Find all enabled fields of each file
833      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
834      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
835
836     if (hasClient && !hasServer)
837     {
838      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
839      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
840     }
841
842      // Only search and rebuild all reference objects of enable fields, don't transform
843      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
844
845      // Search and rebuild all reference object of enabled fields
846      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
847
848      // Find all fields with read access from the public API
849      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
850      // and solve the all reference for them
851      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
852
853      isPostProcessed = true;
854   }
855
856   /*!
857    * Compute the required buffer size to send the attributes (mostly those grid related).
858    *
859    * \param maxEventSize [in/out] the size of the bigger event for each connected server
860    */
861   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
862   {
863     std::map<int, StdSize> attributesSize;
864
865     if (hasClient)
866     {
867       size_t numEnabledFiles = this->enabledFiles.size();
868       for (size_t i = 0; i < numEnabledFiles; ++i)
869       {
870         CFile* file = this->enabledFiles[i];
871
872         std::vector<CField*> enabledFields = file->getEnabledFields();
873         size_t numEnabledFields = enabledFields.size();
874         for (size_t j = 0; j < numEnabledFields; ++j)
875         {
876           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
877           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
878           for (; it != itE; ++it)
879           {
880             // If attributesSize[it->first] does not exist, it will be zero-initialized
881             // so we can use it safely without checking for its existance
882             if (attributesSize[it->first] < it->second)
883               attributesSize[it->first] = it->second;
884
885             if (maxEventSize[it->first] < it->second)
886               maxEventSize[it->first] = it->second;
887           }
888         }
889       }
890     }
891
892     return attributesSize;
893   }
894
895   /*!
896    * Compute the required buffer size to send the fields data.
897    *
898    * \param maxEventSize [in/out] the size of the bigger event for each connected server
899    */
900   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
901   {
902     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
903
904     std::map<int, StdSize> dataSize;
905
906     // Find all reference domain and axis of all active fields
907     size_t numEnabledFiles = this->enabledFiles.size();
908     for (size_t i = 0; i < numEnabledFiles; ++i)
909     {
910       CFile* file = this->enabledFiles[i];
911       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
912
913       if (fileMode == mode)
914       {
915         std::vector<CField*> enabledFields = file->getEnabledFields();
916         size_t numEnabledFields = enabledFields.size();
917         for (size_t j = 0; j < numEnabledFields; ++j)
918         {
919           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
920           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
921           for (; it != itE; ++it)
922           {
923             // If dataSize[it->first] does not exist, it will be zero-initialized
924             // so we can use it safely without checking for its existance
925             if (CXios::isOptPerformance)
926               dataSize[it->first] += it->second;
927             else if (dataSize[it->first] < it->second)
928               dataSize[it->first] = it->second;
929
930             if (maxEventSize[it->first] < it->second)
931               maxEventSize[it->first] = it->second;
932           }
933         }
934       }
935     }
936
937     return dataSize;
938   }
939
940   //! Client side: Send infomation of active files (files are enabled to write out)
941   void CContext::sendEnabledFiles()
942   {
943     int size = this->enabledFiles.size();
944
945     // In a context, each type has a root definition, e.g: axis, domain, field.
946     // Every object must be a child of one of these root definition. In this case
947     // all new file objects created on server must be children of the root "file_definition"
948     StdString fileDefRoot("file_definition");
949     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
950
951     for (int i = 0; i < size; ++i)
952     {
953       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
954       this->enabledFiles[i]->sendAllAttributesToServer();
955       this->enabledFiles[i]->sendAddAllVariables();
956     }
957   }
958
959   //! Client side: Send information of active fields (ones are written onto files)
960   void CContext::sendEnabledFields()
961   {
962     int size = this->enabledFiles.size();
963     for (int i = 0; i < size; ++i)
964     {
965       this->enabledFiles[i]->sendEnabledFields();
966     }
967   }
968
969   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
970   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
971   {
972     if (!hasClient) return;
973
974     const vector<CAxis*> allAxis = CAxis::getAll();
975     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
976       (*it)->checkEligibilityForCompressedOutput();
977
978     const vector<CDomain*> allDomains = CDomain::getAll();
979     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
980       (*it)->checkEligibilityForCompressedOutput();
981
982     const vector<CGrid*> allGrids = CGrid::getAll();
983     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
984       (*it)->checkEligibilityForCompressedOutput();
985   }
986
987   //! Client side: Prepare the timeseries by adding the necessary files
988   void CContext::prepareTimeseries()
989   {
990     if (!hasClient) return;
991
992     const std::vector<CFile*> allFiles = CFile::getAll();
993     for (size_t i = 0; i < allFiles.size(); i++)
994     {
995       CFile* file = allFiles[i];
996
997       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
998       {
999         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1000
1001         const std::vector<CField*> allFields = file->getAllFields();
1002         for (size_t j = 0; j < allFields.size(); j++)
1003         {
1004           CField* field = allFields[j];
1005
1006           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1007           {
1008             CFile* tsFile = CFile::create();
1009             tsFile->duplicateAttributes(file);
1010             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1011
1012             tsFile->name = tsPrefix + "_";
1013             if (!field->name.isEmpty())
1014               tsFile->name.get() += field->name;
1015             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1016               tsFile->name.get() += field->field_ref;
1017             else
1018               tsFile->name.get() += field->getId();
1019
1020             if (!field->ts_split_freq.isEmpty())
1021               tsFile->split_freq = field->ts_split_freq;
1022
1023             CField* tsField = tsFile->addField();
1024             tsField->field_ref = field->getId();
1025             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1026
1027             tsFile->solveFieldRefInheritance(true);
1028
1029             if (file->timeseries == CFile::timeseries_attr::exclusive)
1030               field->enabled = false;
1031           }
1032         }
1033
1034         // Finally disable the original file is need be
1035         if (file->timeseries == CFile::timeseries_attr::only)
1036          file->enabled = false;
1037       }
1038     }
1039   }
1040
1041   //! Client side: Send information of reference grid of active fields
1042   void CContext::sendRefGrid()
1043   {
1044     std::set<StdString> gridIds;
1045     int sizeFile = this->enabledFiles.size();
1046     CFile* filePtr(NULL);
1047
1048     // Firstly, find all reference grids of all active fields
1049     for (int i = 0; i < sizeFile; ++i)
1050     {
1051       filePtr = this->enabledFiles[i];
1052       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1053       int sizeField = enabledFields.size();
1054       for (int numField = 0; numField < sizeField; ++numField)
1055       {
1056         if (0 != enabledFields[numField]->getRelGrid())
1057           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1058       }
1059     }
1060
1061     // Create all reference grids on server side
1062     StdString gridDefRoot("grid_definition");
1063     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1064     std::set<StdString>::const_iterator it, itE = gridIds.end();
1065     for (it = gridIds.begin(); it != itE; ++it)
1066     {
1067       gridPtr->sendCreateChild(*it);
1068       CGrid::get(*it)->sendAllAttributesToServer();
1069       CGrid::get(*it)->sendAllDomains();
1070       CGrid::get(*it)->sendAllAxis();
1071       CGrid::get(*it)->sendAllScalars();
1072     }
1073   }
1074
1075
1076   //! Client side: Send information of reference domain and axis of active fields
1077   void CContext::sendRefDomainsAxis()
1078   {
1079     std::set<StdString> domainIds, axisIds, scalarIds;
1080
1081     // Find all reference domain and axis of all active fields
1082     int numEnabledFiles = this->enabledFiles.size();
1083     for (int i = 0; i < numEnabledFiles; ++i)
1084     {
1085       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1086       int numEnabledFields = enabledFields.size();
1087       for (int j = 0; j < numEnabledFields; ++j)
1088       {
1089         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1090         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1091         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1092         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1093       }
1094     }
1095
1096     // Create all reference axis on server side
1097     std::set<StdString>::iterator itDom, itAxis, itScalar;
1098     std::set<StdString>::const_iterator itE;
1099
1100     StdString scalarDefRoot("scalar_definition");
1101     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1102     itE = scalarIds.end();
1103     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1104     {
1105       if (!itScalar->empty())
1106       {
1107         scalarPtr->sendCreateChild(*itScalar);
1108         CScalar::get(*itScalar)->sendAllAttributesToServer();
1109       }
1110     }
1111
1112     StdString axiDefRoot("axis_definition");
1113     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1114     itE = axisIds.end();
1115     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1116     {
1117       if (!itAxis->empty())
1118       {
1119         axisPtr->sendCreateChild(*itAxis);
1120         CAxis::get(*itAxis)->sendAllAttributesToServer();
1121       }
1122     }
1123
1124     // Create all reference domains on server side
1125     StdString domDefRoot("domain_definition");
1126     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1127     itE = domainIds.end();
1128     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1129     {
1130       if (!itDom->empty()) {
1131          domPtr->sendCreateChild(*itDom);
1132          CDomain::get(*itDom)->sendAllAttributesToServer();
1133       }
1134     }
1135   }
1136
1137   //! Update calendar in each time step
1138   void CContext::updateCalendar(int step)
1139   {
1140      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1141      calendar->update(step);
1142      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1143
1144      if (hasClient)
1145      {
1146        checkPrefetchingOfEnabledReadModeFiles();
1147        garbageCollector.invalidate(calendar->getCurrentDate());
1148      }
1149   }
1150
1151   //! Server side: Create header of netcdf file
1152   void CContext::createFileHeader(void )
1153   {
1154      vector<CFile*>::const_iterator it;
1155
1156      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1157      {
1158         (*it)->initFile();
1159      }
1160   }
1161
1162   //! Get current context
1163   CContext* CContext::getCurrent(void)
1164   {
1165     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1166   }
1167
1168   /*!
1169   \brief Set context with an id be the current context
1170   \param [in] id identity of context to be set to current
1171   */
1172   void CContext::setCurrent(const string& id)
1173   {
1174     CObjectFactory::SetCurrentContextId(id);
1175     CGroupFactory::SetCurrentContextId(id);
1176   }
1177
1178  /*!
1179  \brief Create a context with specific id
1180  \param [in] id identity of new context
1181  \return pointer to the new context or already-existed one with identity id
1182  */
1183  CContext* CContext::create(const StdString& id)
1184  {
1185    CContext::setCurrent(id);
1186
1187    bool hasctxt = CContext::has(id);
1188    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1189    getRoot();
1190    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1191
1192#define DECLARE_NODE(Name_, name_) \
1193    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1194#define DECLARE_NODE_PAR(Name_, name_)
1195#include "node_type.conf"
1196
1197    return (context);
1198  }
1199
1200
1201
1202     //! Server side: Receive a message to do some post processing
1203  void CContext::recvRegistry(CEventServer& event)
1204  {
1205    CBufferIn* buffer=event.subEvents.begin()->buffer;
1206    string id;
1207    *buffer>>id;
1208    get(id)->recvRegistry(*buffer);
1209  }
1210
1211  void CContext::recvRegistry(CBufferIn& buffer)
1212  {
1213    if (server->intraCommRank==0)
1214    {
1215      CRegistry registry(server->intraComm) ;
1216      registry.fromBuffer(buffer) ;
1217      registryOut->mergeRegistry(registry) ;
1218    }
1219  }
1220
1221  void CContext::sendRegistry(void)
1222  {
1223    registryOut->hierarchicalGatherRegistry() ;
1224
1225    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1226    if (client->isServerLeader())
1227    {
1228       CMessage msg ;
1229       msg<<this->getIdServer();
1230       if (client->clientRank==0) msg<<*registryOut ;
1231       const std::list<int>& ranks = client->getRanksServerLeader();
1232       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1233         event.push(*itRank,1,msg);
1234       client->sendEvent(event);
1235    }
1236    else client->sendEvent(event);
1237  }
1238
1239} // namespace xios
Note: See TracBrowser for help on using the repository browser.