source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1081

Last change on this file since 1081 was 1081, checked in by yushan, 8 years ago

save current modifications

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 41.8 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     
242     #pragma omp critical
243     client = new CContextClient(this, intraComm, interComm, cxtServer);
244
245
246     int tmp_rank;
247     MPI_Comm_rank(intraComm, &tmp_rank);
248     MPI_Barrier(intraComm);
249     
250     #pragma omp critical
251     registryIn=new CRegistry(intraComm);
252     
253
254     registryIn->setPath(getId()) ;
255     
256     #pragma omp critical (_output)
257     printf("Client %d : registryIn->setPath(getId()=%s), clientRank = %d (%p) \n", tmp_rank, getId(), client->clientRank, &(client->clientRank)) ;
258     printf("Client %d : context.identifier = %d\n", tmp_rank, this->get_identifier());
259
260     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
261     registryIn->bcastRegistry() ;
262
263     registryOut=new CRegistry(intraComm) ;
264     registryOut->setPath(getId()) ;
265     
266     #pragma omp critical (_output)
267     printf("Client %d : registryOut->setPath(getId()=%s) \n", tmp_rank, getId()) ;
268
269     ep_lib::MPI_Comm intraCommServer, interCommServer;
270     if (cxtServer) // Attached mode
271     {
272       intraCommServer = intraComm;
273       interCommServer = interComm;
274     }
275     else
276     {
277       MPI_Comm_dup(intraComm, &intraCommServer);
278       comms.push_back(intraCommServer);
279       MPI_Comm_dup(interComm, &interCommServer);
280       comms.push_back(interCommServer);
281       
282     }
283     server = new CContextServer(this,intraCommServer,interCommServer);
284   }
285
286   void CContext::setClientServerBuffer()
287   {
288     size_t minBufferSize = CXios::minBufferSize;
289#define DECLARE_NODE(Name_, name_)    \
290     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
291#define DECLARE_NODE_PAR(Name_, name_)
292#include "node_type.conf"
293#undef DECLARE_NODE
294#undef DECLARE_NODE_PAR
295
296     std::map<int, StdSize> maxEventSize;
297     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
298     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
299
300     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
301     for (it = dataBufferSize.begin(); it != ite; ++it)
302       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
303
304     ite = bufferSize.end();
305     for (it = bufferSize.begin(); it != ite; ++it)
306     {
307       it->second *= CXios::bufferSizeFactor;
308       if (it->second < minBufferSize) it->second = minBufferSize;
309     }
310
311     // We consider that the minimum buffer size is also the minimum event size
312     ite = maxEventSize.end();
313     for (it = maxEventSize.begin(); it != ite; ++it)
314       if (it->second < minBufferSize) it->second = minBufferSize;
315
316     if (client->isServerLeader())
317     {
318       const std::list<int>& ranks = client->getRanksServerLeader();
319       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
320         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
321     }
322
323     client->setBufferSize(bufferSize, maxEventSize);
324   }
325
326   //! Verify whether a context is initialized
327   bool CContext::isInitialized(void)
328   {
329     return hasClient;
330   }
331
332   //! Initialize server
333   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
334   {
335     hasServer=true;
336     server = new CContextServer(this,intraComm,interComm);
337
338     registryIn=new CRegistry(intraComm);
339     registryIn->setPath(getId()) ;
340     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
341     registryIn->bcastRegistry() ;
342     registryOut=new CRegistry(intraComm) ;
343     registryOut->setPath(getId()) ;
344
345     ep_lib::MPI_Comm intraCommClient, interCommClient;
346     if (cxtClient) // Attached mode
347     {
348       intraCommClient = intraComm;
349       interCommClient = interComm;
350     }
351     else
352     {
353       MPI_Comm_dup(intraComm, &intraCommClient);
354       comms.push_back(intraCommClient);
355       MPI_Comm_dup(interComm, &interCommClient);
356       comms.push_back(interCommClient);
357     }
358     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
359   }
360
361   //! Server side: Put server into a loop in order to listen message from client
362   bool CContext::eventLoop(void)
363   {
364     return server->eventLoop();
365   }
366
367   //! Try to send the buffers and receive possible answers
368   bool CContext::checkBuffersAndListen(void)
369   {
370     client->checkBuffers();
371     return server->eventLoop();
372   }
373
374   //! Terminate a context
375   void CContext::finalize(void)
376   {
377     
378      if (!finalized)
379      {
380        finalized = true;
381        if (hasClient) sendRegistry() ;
382        client->finalize();
383        while (!server->hasFinished())
384        {
385          server->eventLoop();
386        }
387
388        if (hasServer)
389        {
390          closeAllFile();
391          registryOut->hierarchicalGatherRegistry() ;
392          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
393        }
394
395        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
396          MPI_Comm_free(&(*it));
397        comms.clear();
398      }
399   }
400
401   /*!
402   \brief Close all the context defintion and do processing data
403      After everything is well defined on client side, they will be processed and sent to server
404   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
405   all necessary information to server, from which each server can build its own database.
406   Because the role of server is to write out field data on a specific netcdf file,
407   the only information that it needs is the enabled files
408   and the active fields (fields will be written onto active files)
409   */
410   void CContext::closeDefinition(void)
411   {
412
413     // There is nothing client need to send to server
414     if (hasClient)
415     {
416       // After xml is parsed, there are some more works with post processing
417       postProcessing(); 
418     }
419     setClientServerBuffer(); 
420
421     if (hasClient && !hasServer)
422     {
423      // Send all attributes of current context to server
424      this->sendAllAttributesToServer();
425
426      // Send all attributes of current calendar
427      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
428
429      // We have enough information to send to server
430      // First of all, send all enabled files
431       sendEnabledFiles(); 
432
433      // Then, send all enabled fields
434       sendEnabledFields(); 
435
436      // At last, we have all info of domain and axis, then send them
437       sendRefDomainsAxis(); 
438      // After that, send all grid (if any)
439       sendRefGrid(); 
440    }
441
442    // We have a xml tree on the server side and now, it should be also processed
443    if (hasClient && !hasServer) sendPostProcessing(); 
444
445    // There are some processings that should be done after all of above. For example: check mask or index
446    if (hasClient)
447    {
448      this->buildFilterGraphOfEnabledFields(); 
449      buildFilterGraphOfFieldsWithReadAccess(); 
450      this->solveAllRefOfEnabledFields(true); 
451    }
452
453    // Now tell server that it can process all messages from client
454    if (hasClient && !hasServer) this->sendCloseDefinition();
455
456    // Nettoyage de l'arborescence
457    if (hasClient && !hasServer) CleanTree();
458
459    if (hasClient)
460    {
461      sendCreateFileHeader(); 
462
463      startPrefetchingOfEnabledReadModeFiles(); 
464    }
465   }
466
467   void CContext::findAllEnabledFields(void)
468   {
469     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
470     (void)this->enabledFiles[i]->getEnabledFields();
471   }
472
473   void CContext::findAllEnabledFieldsInReadModeFiles(void)
474   {
475     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
476     (void)this->enabledReadModeFiles[i]->getEnabledFields();
477   }
478
479   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
480   {
481      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
482        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
483   }
484
485   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
486   {
487     int size = this->enabledFiles.size();
488     for (int i = 0; i < size; ++i)
489     {
490       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
491     }
492
493     for (int i = 0; i < size; ++i)
494     {
495       this->enabledFiles[i]->generateNewTransformationGridDest();
496     }
497   }
498
499   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
500   {
501     int size = this->enabledFiles.size();
502     
503     for (int i = 0; i < size; ++i)
504     {
505       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
506     }
507   }
508
509   void CContext::buildFilterGraphOfEnabledFields()
510   {
511     int size = this->enabledFiles.size();
512     for (int i = 0; i < size; ++i)
513     {
514       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
515     }
516   }
517
518   void CContext::startPrefetchingOfEnabledReadModeFiles()
519   {
520     int size = enabledReadModeFiles.size();
521     for (int i = 0; i < size; ++i)
522     {
523        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
524     }
525   }
526
527   void CContext::checkPrefetchingOfEnabledReadModeFiles()
528   {
529     int size = enabledReadModeFiles.size();
530     for (int i = 0; i < size; ++i)
531     {
532        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
533     }
534   }
535
536  void CContext::findFieldsWithReadAccess(void)
537  {
538    fieldsWithReadAccess.clear();
539    const vector<CField*> allFields = CField::getAll();
540    for (size_t i = 0; i < allFields.size(); ++i)
541    {
542      CField* field = allFields[i];
543
544      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
545        field->read_access = true;
546      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
547        fieldsWithReadAccess.push_back(field);
548    }
549  }
550
551  void CContext::solveAllRefOfFieldsWithReadAccess()
552  {
553    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
554      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
555  }
556
557  void CContext::buildFilterGraphOfFieldsWithReadAccess()
558  {
559    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
560      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
561  }
562
563   void CContext::solveAllInheritance(bool apply)
564   {
565     // Rsolution des hritages descendants (cd des hritages de groupes)
566     // pour chacun des contextes.
567      solveDescInheritance(apply);
568
569     // Rsolution des hritages par rfrence au niveau des fichiers.
570      const vector<CFile*> allFiles=CFile::getAll();
571      const vector<CGrid*> allGrids= CGrid::getAll();
572
573     //if (hasClient && !hasServer)
574      if (hasClient)
575      {
576        for (unsigned int i = 0; i < allFiles.size(); i++)
577          allFiles[i]->solveFieldRefInheritance(apply);
578      }
579
580      unsigned int vecSize = allGrids.size();
581      unsigned int i = 0;
582      for (i = 0; i < vecSize; ++i)
583        allGrids[i]->solveDomainAxisRefInheritance(apply);
584
585   }
586
587   void CContext::findEnabledFiles(void)
588   {
589      const std::vector<CFile*> allFiles = CFile::getAll();
590
591      for (unsigned int i = 0; i < allFiles.size(); i++)
592         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
593         {
594            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
595               enabledFiles.push_back(allFiles[i]);
596         }
597         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
598
599
600      if (enabledFiles.size() == 0)
601         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
602               << getId() << "\" !");
603   }
604
605   void CContext::findEnabledReadModeFiles(void)
606   {
607     int size = this->enabledFiles.size();
608     for (int i = 0; i < size; ++i)
609     {
610       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
611        enabledReadModeFiles.push_back(enabledFiles[i]);
612     }
613   }
614
615   void CContext::closeAllFile(void)
616   {
617     std::vector<CFile*>::const_iterator
618            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
619
620     for (; it != end; it++)
621     {
622       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
623       (*it)->close();
624     }
625   }
626
627   /*!
628   \brief Dispatch event received from client
629      Whenever a message is received in buffer of server, it will be processed depending on
630   its event type. A new event type should be added in the switch list to make sure
631   it processed on server side.
632   \param [in] event: Received message
633   */
634   bool CContext::dispatchEvent(CEventServer& event)
635   {
636
637      if (SuperClass::dispatchEvent(event)) return true;
638      else
639      {
640        switch(event.type)
641        {
642           case EVENT_ID_CLOSE_DEFINITION :
643             recvCloseDefinition(event);
644             return true;
645             break;
646           case EVENT_ID_UPDATE_CALENDAR:
647             recvUpdateCalendar(event);
648             return true;
649             break;
650           case EVENT_ID_CREATE_FILE_HEADER :
651             recvCreateFileHeader(event);
652             return true;
653             break;
654           case EVENT_ID_POST_PROCESS:
655             recvPostProcessing(event);
656             return true;
657            case EVENT_ID_SEND_REGISTRY:
658             recvRegistry(event);
659             return true;
660            break;
661
662           default :
663             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
664                    <<"Unknown Event");
665           return false;
666         }
667      }
668   }
669
670   //! Client side: Send a message to server to make it close
671   void CContext::sendCloseDefinition(void)
672   {
673     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
674     if (client->isServerLeader())
675     {
676       CMessage msg;
677       msg<<this->getIdServer();
678       const std::list<int>& ranks = client->getRanksServerLeader();
679       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
680         event.push(*itRank,1,msg);
681       client->sendEvent(event);
682     }
683     else client->sendEvent(event);
684   }
685
686   //! Server side: Receive a message of client announcing a context close
687   void CContext::recvCloseDefinition(CEventServer& event)
688   {
689
690      CBufferIn* buffer=event.subEvents.begin()->buffer;
691      string id;
692      *buffer>>id;
693      get(id)->closeDefinition();
694   }
695
696   //! Client side: Send a message to update calendar in each time step
697   void CContext::sendUpdateCalendar(int step)
698   {
699     if (!hasServer)
700     {
701       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
702       if (client->isServerLeader())
703       {
704         CMessage msg;
705         msg<<this->getIdServer()<<step;
706         const std::list<int>& ranks = client->getRanksServerLeader();
707         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
708           event.push(*itRank,1,msg);
709         client->sendEvent(event);
710       }
711       else client->sendEvent(event);
712     }
713   }
714
715   //! Server side: Receive a message of client annoucing calendar update
716   void CContext::recvUpdateCalendar(CEventServer& event)
717   {
718      CBufferIn* buffer=event.subEvents.begin()->buffer;
719      string id;
720      *buffer>>id;
721      get(id)->recvUpdateCalendar(*buffer);
722   }
723
724   //! Server side: Receive a message of client annoucing calendar update
725   void CContext::recvUpdateCalendar(CBufferIn& buffer)
726   {
727      int step;
728      buffer>>step;
729      updateCalendar(step);
730   }
731
732   //! Client side: Send a message to create header part of netcdf file
733   void CContext::sendCreateFileHeader(void)
734   {
735     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
736     if (client->isServerLeader())
737     {
738       CMessage msg;
739       msg<<this->getIdServer();
740       const std::list<int>& ranks = client->getRanksServerLeader();
741       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
742         event.push(*itRank,1,msg) ;
743       client->sendEvent(event);
744     }
745     else client->sendEvent(event);
746   }
747
748   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
749   void CContext::recvCreateFileHeader(CEventServer& event)
750   {
751      CBufferIn* buffer=event.subEvents.begin()->buffer;
752      string id;
753      *buffer>>id;
754      get(id)->recvCreateFileHeader(*buffer);
755   }
756
757   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
758   void CContext::recvCreateFileHeader(CBufferIn& buffer)
759   {
760      createFileHeader();
761   }
762
763   //! Client side: Send a message to do some post processing on server
764   void CContext::sendPostProcessing()
765   {
766     if (!hasServer)
767     {
768       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
769       if (client->isServerLeader())
770       {
771         CMessage msg;
772         msg<<this->getIdServer();
773         const std::list<int>& ranks = client->getRanksServerLeader();
774         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
775           event.push(*itRank,1,msg);
776         client->sendEvent(event);
777       }
778       else client->sendEvent(event);
779     }
780   }
781
782   //! Server side: Receive a message to do some post processing
783   void CContext::recvPostProcessing(CEventServer& event)
784   {
785      CBufferIn* buffer=event.subEvents.begin()->buffer;
786      string id;
787      *buffer>>id;
788      get(id)->recvPostProcessing(*buffer);
789   }
790
791   //! Server side: Receive a message to do some post processing
792   void CContext::recvPostProcessing(CBufferIn& buffer)
793   {
794      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
795      postProcessing();
796   }
797
798   const StdString& CContext::getIdServer()
799   {
800      if (hasClient)
801      {
802        idServer_ = this->getId();
803        idServer_ += "_server";
804        return idServer_;
805      }
806      if (hasServer) return (this->getId());
807   }
808
809   /*!
810   \brief Do some simple post processings after parsing xml file
811      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
812   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
813   which will be written out into netcdf files, are processed
814   */
815   void CContext::postProcessing()
816   {
817     int myRank;
818     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
819
820     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
821     if (isPostProcessed) return;
822
823      // Make sure the calendar was correctly created
824      if (!calendar)
825        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
826      else if (calendar->getTimeStep() == NoneDu)
827        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
828      // Calendar first update to set the current date equals to the start date
829      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
830
831      // Find all inheritance in xml structure
832      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
833
834      // Check if some axis, domains or grids are eligible to for compressed indexed output.
835      // Warning: This must be done after solving the inheritance and before the rest of post-processing
836      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
837
838      // Check if some automatic time series should be generated
839      // Warning: This must be done after solving the inheritance and before the rest of post-processing
840      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
841
842      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
843      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
844      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
845
846      // Find all enabled fields of each file
847      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
848      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
849
850     if (hasClient && !hasServer)
851     {
852      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
853      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
854     }
855
856      // Only search and rebuild all reference objects of enable fields, don't transform
857      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
858
859      // Search and rebuild all reference object of enabled fields
860      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
861
862      // Find all fields with read access from the public API
863      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
864      // and solve the all reference for them
865      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
866
867      isPostProcessed = true;
868   }
869
870   /*!
871    * Compute the required buffer size to send the attributes (mostly those grid related).
872    *
873    * \param maxEventSize [in/out] the size of the bigger event for each connected server
874    */
875   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
876   {
877     std::map<int, StdSize> attributesSize;
878
879     if (hasClient)
880     {
881       size_t numEnabledFiles = this->enabledFiles.size();
882       for (size_t i = 0; i < numEnabledFiles; ++i)
883       {
884         CFile* file = this->enabledFiles[i];
885
886         std::vector<CField*> enabledFields = file->getEnabledFields();
887         size_t numEnabledFields = enabledFields.size();
888         for (size_t j = 0; j < numEnabledFields; ++j)
889         {
890           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
891           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
892           for (; it != itE; ++it)
893           {
894             // If attributesSize[it->first] does not exist, it will be zero-initialized
895             // so we can use it safely without checking for its existance
896             if (attributesSize[it->first] < it->second)
897               attributesSize[it->first] = it->second;
898
899             if (maxEventSize[it->first] < it->second)
900               maxEventSize[it->first] = it->second;
901           }
902         }
903       }
904     }
905
906     return attributesSize;
907   }
908
909   /*!
910    * Compute the required buffer size to send the fields data.
911    *
912    * \param maxEventSize [in/out] the size of the bigger event for each connected server
913    */
914   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
915   {
916     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
917
918     std::map<int, StdSize> dataSize;
919
920     // Find all reference domain and axis of all active fields
921     size_t numEnabledFiles = this->enabledFiles.size();
922     for (size_t i = 0; i < numEnabledFiles; ++i)
923     {
924       CFile* file = this->enabledFiles[i];
925       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
926
927       if (fileMode == mode)
928       {
929         std::vector<CField*> enabledFields = file->getEnabledFields();
930         size_t numEnabledFields = enabledFields.size();
931         for (size_t j = 0; j < numEnabledFields; ++j)
932         {
933           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
934           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
935           for (; it != itE; ++it)
936           {
937             // If dataSize[it->first] does not exist, it will be zero-initialized
938             // so we can use it safely without checking for its existance
939             if (CXios::isOptPerformance)
940               dataSize[it->first] += it->second;
941             else if (dataSize[it->first] < it->second)
942               dataSize[it->first] = it->second;
943
944             if (maxEventSize[it->first] < it->second)
945               maxEventSize[it->first] = it->second;
946           }
947         }
948       }
949     }
950
951     return dataSize;
952   }
953
954   //! Client side: Send infomation of active files (files are enabled to write out)
955   void CContext::sendEnabledFiles()
956   {
957     int size = this->enabledFiles.size();
958
959     // In a context, each type has a root definition, e.g: axis, domain, field.
960     // Every object must be a child of one of these root definition. In this case
961     // all new file objects created on server must be children of the root "file_definition"
962     StdString fileDefRoot("file_definition");
963     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
964
965     for (int i = 0; i < size; ++i)
966     {
967       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
968       this->enabledFiles[i]->sendAllAttributesToServer();
969       this->enabledFiles[i]->sendAddAllVariables();
970     }
971   }
972
973   //! Client side: Send information of active fields (ones are written onto files)
974   void CContext::sendEnabledFields()
975   {
976     int size = this->enabledFiles.size();
977     for (int i = 0; i < size; ++i)
978     {
979       this->enabledFiles[i]->sendEnabledFields();
980     }
981   }
982
983   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
984   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
985   {
986     if (!hasClient) return;
987
988     const vector<CAxis*> allAxis = CAxis::getAll();
989     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
990       (*it)->checkEligibilityForCompressedOutput();
991
992     const vector<CDomain*> allDomains = CDomain::getAll();
993     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
994       (*it)->checkEligibilityForCompressedOutput();
995
996     const vector<CGrid*> allGrids = CGrid::getAll();
997     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
998       (*it)->checkEligibilityForCompressedOutput();
999   }
1000
1001   //! Client side: Prepare the timeseries by adding the necessary files
1002   void CContext::prepareTimeseries()
1003   {
1004     if (!hasClient) return;
1005
1006     const std::vector<CFile*> allFiles = CFile::getAll();
1007     for (size_t i = 0; i < allFiles.size(); i++)
1008     {
1009       CFile* file = allFiles[i];
1010
1011       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1012       {
1013         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1014
1015         const std::vector<CField*> allFields = file->getAllFields();
1016         for (size_t j = 0; j < allFields.size(); j++)
1017         {
1018           CField* field = allFields[j];
1019
1020           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1021           {
1022             CFile* tsFile = CFile::create();
1023             tsFile->duplicateAttributes(file);
1024             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1025
1026             tsFile->name = tsPrefix + "_";
1027             if (!field->name.isEmpty())
1028               tsFile->name.get() += field->name;
1029             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1030               tsFile->name.get() += field->field_ref;
1031             else
1032               tsFile->name.get() += field->getId();
1033
1034             if (!field->ts_split_freq.isEmpty())
1035               tsFile->split_freq = field->ts_split_freq;
1036
1037             CField* tsField = tsFile->addField();
1038             tsField->field_ref = field->getId();
1039             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1040
1041             tsFile->solveFieldRefInheritance(true);
1042
1043             if (file->timeseries == CFile::timeseries_attr::exclusive)
1044               field->enabled = false;
1045           }
1046         }
1047
1048         // Finally disable the original file is need be
1049         if (file->timeseries == CFile::timeseries_attr::only)
1050          file->enabled = false;
1051       }
1052     }
1053   }
1054
1055   //! Client side: Send information of reference grid of active fields
1056   void CContext::sendRefGrid()
1057   {
1058     std::set<StdString> gridIds;
1059     int sizeFile = this->enabledFiles.size();
1060     CFile* filePtr(NULL);
1061
1062     // Firstly, find all reference grids of all active fields
1063     for (int i = 0; i < sizeFile; ++i)
1064     {
1065       filePtr = this->enabledFiles[i];
1066       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1067       int sizeField = enabledFields.size();
1068       for (int numField = 0; numField < sizeField; ++numField)
1069       {
1070         if (0 != enabledFields[numField]->getRelGrid())
1071           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1072       }
1073     }
1074
1075     // Create all reference grids on server side
1076     StdString gridDefRoot("grid_definition");
1077     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1078     std::set<StdString>::const_iterator it, itE = gridIds.end();
1079     for (it = gridIds.begin(); it != itE; ++it)
1080     {
1081       gridPtr->sendCreateChild(*it);
1082       CGrid::get(*it)->sendAllAttributesToServer();
1083       CGrid::get(*it)->sendAllDomains();
1084       CGrid::get(*it)->sendAllAxis();
1085       CGrid::get(*it)->sendAllScalars();
1086     }
1087   }
1088
1089
1090   //! Client side: Send information of reference domain and axis of active fields
1091   void CContext::sendRefDomainsAxis()
1092   {
1093     std::set<StdString> domainIds, axisIds, scalarIds;
1094
1095     // Find all reference domain and axis of all active fields
1096     int numEnabledFiles = this->enabledFiles.size();
1097     for (int i = 0; i < numEnabledFiles; ++i)
1098     {
1099       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1100       int numEnabledFields = enabledFields.size();
1101       for (int j = 0; j < numEnabledFields; ++j)
1102       {
1103         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1104         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1105         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1106         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1107       }
1108     }
1109
1110     // Create all reference axis on server side
1111     std::set<StdString>::iterator itDom, itAxis, itScalar;
1112     std::set<StdString>::const_iterator itE;
1113
1114     StdString scalarDefRoot("scalar_definition");
1115     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1116     itE = scalarIds.end();
1117     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1118     {
1119       if (!itScalar->empty())
1120       {
1121         scalarPtr->sendCreateChild(*itScalar);
1122         CScalar::get(*itScalar)->sendAllAttributesToServer();
1123       }
1124     }
1125
1126     StdString axiDefRoot("axis_definition");
1127     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1128     itE = axisIds.end();
1129     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1130     {
1131       if (!itAxis->empty())
1132       {
1133         axisPtr->sendCreateChild(*itAxis);
1134         CAxis::get(*itAxis)->sendAllAttributesToServer();
1135       }
1136     }
1137
1138     // Create all reference domains on server side
1139     StdString domDefRoot("domain_definition");
1140     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1141     itE = domainIds.end();
1142     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1143     {
1144       if (!itDom->empty()) {
1145          domPtr->sendCreateChild(*itDom);
1146          CDomain::get(*itDom)->sendAllAttributesToServer();
1147       }
1148     }
1149   }
1150
1151   //! Update calendar in each time step
1152   void CContext::updateCalendar(int step)
1153   {
1154      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1155      calendar->update(step);
1156      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1157
1158      if (hasClient)
1159      {
1160        checkPrefetchingOfEnabledReadModeFiles();
1161        garbageCollector.invalidate(calendar->getCurrentDate());
1162      }
1163   }
1164
1165   //! Server side: Create header of netcdf file
1166   void CContext::createFileHeader(void )
1167   {
1168      vector<CFile*>::const_iterator it;
1169
1170      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1171      {
1172         (*it)->initFile();
1173      }
1174   }
1175
1176   //! Get current context
1177   CContext* CContext::getCurrent(void)
1178   {
1179     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1180   }
1181
1182   /*!
1183   \brief Set context with an id be the current context
1184   \param [in] id identity of context to be set to current
1185   */
1186   void CContext::setCurrent(const string& id)
1187   {
1188     CObjectFactory::SetCurrentContextId(id);
1189     CGroupFactory::SetCurrentContextId(id);
1190   }
1191
1192  /*!
1193  \brief Create a context with specific id
1194  \param [in] id identity of new context
1195  \return pointer to the new context or already-existed one with identity id
1196  */
1197   //bkp
1198  CContext* CContext::create(const StdString& id)
1199  {
1200    CContext::setCurrent(id);
1201
1202    bool hasctxt = CContext::has(id);
1203    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1204    getRoot();
1205    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1206
1207#define DECLARE_NODE(Name_, name_) \
1208    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1209#define DECLARE_NODE_PAR(Name_, name_)
1210#include "node_type.conf"
1211
1212    return (context);
1213  }
1214
1215  int CContext::get_identifier()
1216  {
1217    return this->identifier;
1218  }
1219
1220
1221     //! Server side: Receive a message to do some post processing
1222  void CContext::recvRegistry(CEventServer& event)
1223  {
1224    CBufferIn* buffer=event.subEvents.begin()->buffer;
1225    string id;
1226    *buffer>>id;
1227    get(id)->recvRegistry(*buffer);
1228  }
1229
1230  void CContext::recvRegistry(CBufferIn& buffer)
1231  {
1232    if (server->intraCommRank==0)
1233    {
1234      CRegistry registry(server->intraComm) ;
1235      registry.fromBuffer(buffer) ;
1236      registryOut->mergeRegistry(registry) ;
1237    }
1238  }
1239
1240  void CContext::sendRegistry(void)
1241  {
1242    registryOut->hierarchicalGatherRegistry() ;
1243
1244    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1245    if (client->isServerLeader())
1246    {
1247       CMessage msg ;
1248       msg<<this->getIdServer();
1249       if (client->clientRank==0) msg<<*registryOut ;
1250       const std::list<int>& ranks = client->getRanksServerLeader();
1251       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1252         event.push(*itRank,1,msg);
1253       client->sendEvent(event);
1254    }
1255    else client->sendEvent(event);
1256  }
1257
1258} // namespace xios
Note: See TracBrowser for help on using the repository browser.