source: XIOS/dev/XIOS_DEV_CMIP6/src/node/context.cpp @ 1250

Last change on this file since 1250 was 1239, checked in by mhnguyen, 7 years ago

Correcting a minor bug on detecting server reading level

+) Reading server level should be 1 (for now). In the future, maybe level-2 server can be used
+) Correct nbReadSenders with the right ContextClient?

Test
+) On Curie
+) Work with toy

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 63.9 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18#include <limits>
19#include "server.hpp"
20
21namespace xios {
22
23  shared_ptr<CContextGroup> CContext::root;
24
25   /// ////////////////////// Définitions ////////////////////// ///
26
27   CContext::CContext(void)
28      : CObjectTemplate<CContext>(), CContextAttributes()
29      , calendar(), hasClient(false), hasServer(false)
30      , isPostProcessed(false), finalized(false)
31      , idServer_(), client(0), server(0)
32      , allProcessed(false), countChildCtx_(0)
33   { /* Ne rien faire de plus */ }
34
35   CContext::CContext(const StdString & id)
36      : CObjectTemplate<CContext>(id), CContextAttributes()
37      , calendar(), hasClient(false), hasServer(false)
38      , isPostProcessed(false), finalized(false)
39      , idServer_(), client(0), server(0)
40      , allProcessed(false), countChildCtx_(0)
41   { /* Ne rien faire de plus */ }
42
43   CContext::~CContext(void)
44   {
45     delete client;
46     delete server;
47     for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++)  delete *it;
48     for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++)  delete *it;
49
50   }
51
52   //----------------------------------------------------------------
53   //! Get name of context
54   StdString CContext::GetName(void)   { return (StdString("context")); }
55   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
56   ENodeType CContext::GetType(void)   { return (eContext); }
57
58   //----------------------------------------------------------------
59
60   /*!
61   \brief Get context group (context root)
62   \return Context root
63   */
64   CContextGroup* CContext::getRoot(void)
65   {
66      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
67      return root.get();
68   }
69
70   //----------------------------------------------------------------
71
72   /*!
73   \brief Get calendar of a context
74   \return Calendar
75   */
76   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
77   {
78      return (this->calendar);
79   }
80
81   //----------------------------------------------------------------
82
83   /*!
84   \brief Set a context with a calendar
85   \param[in] newCalendar new calendar
86   */
87   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
88   {
89      this->calendar = newCalendar;
90   }
91
92   //----------------------------------------------------------------
93   /*!
94   \brief Parse xml file and write information into context object
95   \param [in] node xmld node corresponding in xml file
96   */
97   void CContext::parse(xml::CXMLNode & node)
98   {
99      CContext::SuperClass::parse(node);
100
101      // PARSING POUR GESTION DES ENFANTS
102      xml::THashAttributes attributes = node.getAttributes();
103
104      if (attributes.end() != attributes.find("src"))
105      {
106         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
107         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
108            ERROR("void CContext::parse(xml::CXMLNode & node)",
109                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
110         if (!ifs.good())
111            ERROR("CContext::parse(xml::CXMLNode & node)",
112                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
113         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
114      }
115
116      if (node.getElementName().compare(CContext::GetName()))
117         DEBUG("Le noeud is wrong defined but will be considered as a context !");
118
119      if (!(node.goToChildElement()))
120      {
121         DEBUG("Le context ne contient pas d'enfant !");
122      }
123      else
124      {
125         do { // Parcours des contextes pour traitement.
126
127            StdString name = node.getElementName();
128            attributes.clear();
129            attributes = node.getAttributes();
130
131            if (attributes.end() != attributes.find("id"))
132            {
133              DEBUG(<< "Definition node has an id,"
134                    << "it will not be taking account !");
135            }
136
137#define DECLARE_NODE(Name_, name_)    \
138   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
139   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
140#define DECLARE_NODE_PAR(Name_, name_)
141#include "node_type.conf"
142
143            DEBUG(<< "The element \'"     << name
144                  << "\' in the context \'" << CContext::getCurrent()->getId()
145                  << "\' is not a definition !");
146
147         } while (node.goToNextElement());
148
149         node.goToParentElement(); // Retour au parent
150      }
151   }
152
153   //----------------------------------------------------------------
154   //! Show tree structure of context
155   void CContext::ShowTree(StdOStream & out)
156   {
157      StdString currentContextId = CContext::getCurrent() -> getId();
158      std::vector<CContext*> def_vector =
159         CContext::getRoot()->getChildList();
160      std::vector<CContext*>::iterator
161         it = def_vector.begin(), end = def_vector.end();
162
163      out << "<? xml version=\"1.0\" ?>" << std::endl;
164      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
165
166      for (; it != end; it++)
167      {
168         CContext* context = *it;
169         CContext::setCurrent(context->getId());
170         out << *context << std::endl;
171      }
172
173      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
174      CContext::setCurrent(currentContextId);
175   }
176
177
178   //----------------------------------------------------------------
179
180   //! Convert context object into string (to print)
181   StdString CContext::toString(void) const
182   {
183      StdOStringStream oss;
184      oss << "<" << CContext::GetName()
185          << " id=\"" << this->getId() << "\" "
186          << SuperClassAttribute::toString() << ">" << std::endl;
187      if (!this->hasChild())
188      {
189         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
190      }
191      else
192      {
193
194#define DECLARE_NODE(Name_, name_)    \
195   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
196   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
197#define DECLARE_NODE_PAR(Name_, name_)
198#include "node_type.conf"
199
200      }
201
202      oss << "</" << CContext::GetName() << " >";
203
204      return (oss.str());
205   }
206
207   //----------------------------------------------------------------
208
209   /*!
210   \brief Find all inheritace among objects in a context.
211   \param [in] apply (true) write attributes of parent into ones of child if they are empty
212                     (false) write attributes of parent into a new container of child
213   \param [in] parent unused
214   */
215   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
216   {
217#define DECLARE_NODE(Name_, name_)    \
218   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
219     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
220#define DECLARE_NODE_PAR(Name_, name_)
221#include "node_type.conf"
222   }
223
224   //----------------------------------------------------------------
225
226   //! Verify if all root definition in the context have child.
227   bool CContext::hasChild(void) const
228   {
229      return (
230#define DECLARE_NODE(Name_, name_)    \
231   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234      false);
235}
236
237   //----------------------------------------------------------------
238
239   void CContext::CleanTree(void)
240   {
241#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
242#define DECLARE_NODE_PAR(Name_, name_)
243#include "node_type.conf"
244   }
245   ///---------------------------------------------------------------
246
247   //! Initialize client side
248   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
249   {
250
251     hasClient = true;
252     MPI_Comm intraCommServer, interCommServer;
253
254     if (CServer::serverLevel != 1)
255      // initClient is called by client
256     {
257       client = new CContextClient(this, intraComm, interComm, cxtServer);
258       if (cxtServer) // Attached mode
259       {
260         intraCommServer = intraComm;
261         interCommServer = interComm;
262       }
263       else
264       {
265         MPI_Comm_dup(intraComm, &intraCommServer);
266         comms.push_back(intraCommServer);
267         MPI_Comm_dup(interComm, &interCommServer);
268         comms.push_back(interCommServer);
269       }
270       registryIn=new CRegistry(intraComm);
271       registryIn->setPath(getId()) ;
272       if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
273       registryIn->bcastRegistry() ;
274       registryOut=new CRegistry(intraComm) ;
275       registryOut->setPath(getId()) ;
276
277       server = new CContextServer(this, intraCommServer, interCommServer);
278     }
279     else
280     // initClient is called by primary server
281     {
282       clientPrimServer.push_back(new CContextClient(this, intraComm, interComm));
283       MPI_Comm_dup(intraComm, &intraCommServer);
284       comms.push_back(intraCommServer);
285       MPI_Comm_dup(interComm, &interCommServer);
286       comms.push_back(interCommServer);
287       serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer));
288     }
289   }
290
291   void CContext::setClientServerBuffer(CContextClient* contextClient)
292   {
293      // Estimated minimum event size for small events (10 is an arbitrary constant just for safety)
294      const size_t minEventSize = CEventClient::headerSize + getIdServer().size() + 10 * sizeof(int);
295
296      // Ensure there is at least some room for 20 of such events in the buffers
297      size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize);
298
299#define DECLARE_NODE(Name_, name_)    \
300     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
301#define DECLARE_NODE_PAR(Name_, name_)
302#include "node_type.conf"
303#undef DECLARE_NODE
304#undef DECLARE_NODE_PAR
305
306     // Compute the buffer sizes needed to send the attributes and data corresponding to fields
307     std::map<int, StdSize> maxEventSize;
308     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize, contextClient);
309     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize, contextClient);
310
311     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
312     for (it = dataBufferSize.begin(); it != ite; ++it)
313       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
314
315     // Apply the buffer size factor, check that we are above the minimum buffer size and below the maximum size
316     ite = bufferSize.end();
317     for (it = bufferSize.begin(); it != ite; ++it)
318     {
319       it->second *= CXios::bufferSizeFactor;
320       if (it->second < minBufferSize) it->second = minBufferSize;
321       if (it->second > CXios::maxBufferSize) it->second = CXios::maxBufferSize;
322     }
323
324     // Leaders will have to send some control events so ensure there is some room for those in the buffers
325     if (contextClient->isServerLeader())
326     {
327       const std::list<int>& ranks = contextClient->getRanksServerLeader();
328       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
329       {
330         if (!bufferSize.count(*itRank))
331         {
332           bufferSize[*itRank] = minBufferSize;
333           maxEventSize[*itRank] = minEventSize;
334         }
335       }
336     }
337     contextClient->setBufferSize(bufferSize, maxEventSize);
338
339   }
340
341   //! Verify whether a context is initialized
342   bool CContext::isInitialized(void)
343   {
344     return hasClient;
345   }
346
347   void CContext::initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient /*= 0*/)
348   {
349     hasServer=true;
350     server = new CContextServer(this,intraComm,interComm);
351
352     registryIn=new CRegistry(intraComm);
353     registryIn->setPath(getId()) ;
354     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
355     registryIn->bcastRegistry() ;
356     registryOut=new CRegistry(intraComm) ;
357     registryOut->setPath(getId()) ;
358
359     MPI_Comm intraCommClient, interCommClient;
360     if (cxtClient) // Attached mode
361     {
362       intraCommClient = intraComm;
363       interCommClient = interComm;
364     }
365     else
366     {
367       MPI_Comm_dup(intraComm, &intraCommClient);
368       comms.push_back(intraCommClient);
369       MPI_Comm_dup(interComm, &interCommClient);
370       comms.push_back(interCommClient);
371     }
372     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
373   }
374
375   //! Try to send the buffers and receive possible answers
376  bool CContext::checkBuffersAndListen(void)
377  {
378    bool clientReady, serverFinished;
379
380    // Only classical servers are non-blocking
381    if (CServer::serverLevel == 0)
382    {
383      client->checkBuffers();
384      bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
385      if (hasTmpBufferedEvent)
386        hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
387      // Don't process events if there is a temporarily buffered event
388      return server->eventLoop(!hasTmpBufferedEvent);
389    }
390    else if (CServer::serverLevel == 1)
391    {
392      if (!finalized)
393        client->checkBuffers();
394      bool serverFinished = true;
395      if (!finalized)
396        serverFinished = server->eventLoop();
397      bool serverPrimFinished = true;
398      for (int i = 0; i < clientPrimServer.size(); ++i)
399      {
400        if (!finalized)
401          clientPrimServer[i]->checkBuffers();
402        if (!finalized)
403          serverPrimFinished *= serverPrimServer[i]->eventLoop();
404      }
405      return ( serverFinished && serverPrimFinished);
406    }
407
408    else if (CServer::serverLevel == 2)
409    {
410      client->checkBuffers();
411      return server->eventLoop();
412    }
413  }
414
415   //! Terminate a context
416   void CContext::finalize(void)
417   {
418     // Send registry upon calling the function the first time
419     if (countChildCtx_ == 0)
420       if (hasClient) sendRegistry() ;
421
422     // Client:
423     // (1) blocking send context finalize to its server
424     // (2) blocking receive context finalize from its server
425     // (3) some memory deallocations
426     if (CXios::isClient)
427     {
428       // Make sure that client (model) enters the loop only once
429       if (countChildCtx_ < 1)
430       {
431         ++countChildCtx_;
432
433         client->finalize();
434         while (client->havePendingRequests())
435            client->checkBuffers();
436
437         while (!server->hasFinished())
438           server->eventLoop();
439
440         if (hasServer) // Mode attache
441         {
442           closeAllFile();
443           registryOut->hierarchicalGatherRegistry() ;
444           if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
445         }
446
447         //! Deallocate client buffers
448         client->releaseBuffers();
449
450         //! Free internally allocated communicators
451         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
452           MPI_Comm_free(&(*it));
453         comms.clear();
454
455         info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
456       }
457     }
458     else if (CXios::isServer)
459     {
460       // First context finalize message received from a model
461       // Send context finalize to its child contexts (if any)
462       if (countChildCtx_ == 0)
463         for (int i = 0; i < clientPrimServer.size(); ++i)
464           clientPrimServer[i]->finalize();
465
466       // (Last) context finalized message received
467       if (countChildCtx_ == clientPrimServer.size())
468       {
469         // Blocking send of context finalize message to its client (e.g. primary server or model)
470         client->finalize();
471         bool bufferReleased;
472         do
473         {
474           client->checkBuffers();
475           bufferReleased = !client->havePendingRequests();
476         } while (!bufferReleased);
477         finalized = true;
478
479         closeAllFile(); // Just move to here to make sure that server-level 1 can close files
480         if (hasServer && !hasClient)
481         {           
482           registryOut->hierarchicalGatherRegistry() ;
483           if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
484         }
485
486         //! Deallocate client buffers
487         client->releaseBuffers();
488         for (int i = 0; i < clientPrimServer.size(); ++i)
489           clientPrimServer[i]->releaseBuffers();
490
491         //! Free internally allocated communicators
492         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
493           MPI_Comm_free(&(*it));
494         comms.clear();
495
496         info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
497       }
498
499       ++countChildCtx_;
500     }
501   }
502
503   //! Free internally allocated communicators
504   void CContext::freeComms(void)
505   {
506     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
507       MPI_Comm_free(&(*it));
508     comms.clear();
509   }
510
511   //! Deallocate buffers allocated by clientContexts
512   void CContext::releaseClientBuffers(void)
513   {
514     client->releaseBuffers();
515     for (int i = 0; i < clientPrimServer.size(); ++i)
516       clientPrimServer[i]->releaseBuffers();
517   }
518
519   void CContext::postProcessingGlobalAttributes()
520   {
521     if (allProcessed) return; 
522     
523     // After xml is parsed, there are some more works with post processing
524     postProcessing();
525
526     // Check grid and calculate its distribution
527     checkGridEnabledFields();
528 
529     // Distribute files between secondary servers according to the data size
530     distributeFiles();
531
532     // Buffer for primary server for connection to client will be allocated by default (to min size)
533     if (CServer::serverLevel != 1)
534       setClientServerBuffer(client);
535     if (hasServer)
536       for (int i = 0; i < clientPrimServer.size(); ++i)
537         setClientServerBuffer(clientPrimServer[i]);
538
539
540     if (hasClient)
541     {
542      // Send all attributes of current context to server
543      this->sendAllAttributesToServer();
544
545      // Send all attributes of current calendar
546      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
547
548      // We have enough information to send to server
549      // First of all, send all enabled files
550      sendEnabledFiles(this->enabledWriteModeFiles);
551      // We only use server-level 1 (for now) to read data
552      if (!hasServer)
553        sendEnabledFiles(this->enabledReadModeFiles);
554
555      // Then, send all enabled fields     
556      sendEnabledFieldsInFiles(this->enabledWriteModeFiles);
557      if (!hasServer)
558        sendEnabledFieldsInFiles(this->enabledReadModeFiles);
559
560      // Then, check whether we have domain_ref, axis_ref or scalar_ref attached to the enabled fields
561      // If any, so send them to server
562       sendRefDomainsAxisScalars(this->enabledWriteModeFiles);
563      if (!hasServer)
564        sendRefDomainsAxisScalars(this->enabledReadModeFiles);       
565
566       // Check whether enabled fields have grid_ref, if any, send this info to server
567      sendRefGrid(this->enabledFiles);
568      // This code may be useful in the future when we want to seperate completely read and write
569      // sendRefGrid(this->enabledWriteModeFiles);
570      // if (!hasServer)
571      //   sendRefGrid(this->enabledReadModeFiles);
572     
573      // A grid of enabled fields composed of several components which must be checked then their
574      // checked attributes should be sent to server
575      sendGridComponentEnabledFieldsInFiles(this->enabledFiles); // This code can be seperated in two (one for reading, another for writing)
576
577       // We have a xml tree on the server side and now, it should be also processed
578      sendPostProcessing();
579       
580      // Finally, we send information of grid itself to server
581      sendGridEnabledFieldsInFiles(this->enabledWriteModeFiles);       
582      if (!hasServer)
583        sendGridEnabledFieldsInFiles(this->enabledReadModeFiles);       
584     }
585     allProcessed = true;
586   }
587
588   void CContext::sendPostProcessingGlobalAttributes()
589   {
590      // Use correct context client to send message
591     // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;
592    int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
593     for (int i = 0; i < nbSrvPools; ++i)
594     {
595       CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client;
596       CEventClient event(getType(),EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES);
597
598       if (contextClientTmp->isServerLeader())
599       {
600         CMessage msg;
601         if (hasServer)
602           msg<<this->getIdServer(i);
603         else
604           msg<<this->getIdServer();
605         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
606         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
607           event.push(*itRank,1,msg);
608         contextClientTmp->sendEvent(event);
609       }
610       else contextClientTmp->sendEvent(event);
611     }
612   }
613
614   void CContext::recvPostProcessingGlobalAttributes(CEventServer& event)
615   {
616      CBufferIn* buffer=event.subEvents.begin()->buffer;
617      string id;
618      *buffer>>id;
619      get(id)->recvPostProcessingGlobalAttributes(*buffer);
620   }
621
622   void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer)
623   {     
624      postProcessingGlobalAttributes();
625   }
626
627   /*!
628   \brief Close all the context defintion and do processing data
629      After everything is well defined on client side, they will be processed and sent to server
630   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
631   all necessary information to server, from which each server can build its own database.
632   Because the role of server is to write out field data on a specific netcdf file,
633   the only information that it needs is the enabled files
634   and the active fields (fields will be written onto active files)
635   */
636   void CContext::closeDefinition(void)
637   {
638    CTimer::get("Context : close definition").resume() ;
639    postProcessingGlobalAttributes();
640
641    if (hasClient) sendPostProcessingGlobalAttributes();
642
643    // There are some processings that should be done after all of above. For example: check mask or index
644    this->buildFilterGraphOfEnabledFields();
645   
646     if (hasClient && !hasServer)
647    {
648      buildFilterGraphOfFieldsWithReadAccess();
649    }
650   
651    checkGridEnabledFields();   
652
653    if (hasClient) this->sendProcessingGridOfEnabledFields();
654    if (hasClient) this->sendCloseDefinition();
655
656    // Nettoyage de l'arborescence
657    if (hasClient) CleanTree(); // Only on client side??
658
659    if (hasClient)
660    {
661      sendCreateFileHeader();
662      if (!hasServer) startPrefetchingOfEnabledReadModeFiles();
663    }
664    CTimer::get("Context : close definition").suspend() ;
665   }
666
667   void CContext::findAllEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
668   {
669     for (unsigned int i = 0; i < activeFiles.size(); i++)
670     (void)activeFiles[i]->getEnabledFields();
671   }
672
673   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
674   {
675      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
676        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
677   }
678
679   void CContext::sendGridComponentEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
680   {
681     int size = activeFiles.size();
682     for (int i = 0; i < size; ++i)
683     {       
684       activeFiles[i]->sendGridComponentOfEnabledFields();
685     }
686   }
687
688   /*!
689      Send active (enabled) fields in file from a client to others
690      \param [in] activeFiles files contains enabled fields to send
691   */
692   void CContext::sendGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
693   {
694     int size = activeFiles.size();
695     for (int i = 0; i < size; ++i)
696     {       
697       activeFiles[i]->sendGridOfEnabledFields();
698     }
699   }
700
701   void CContext::checkGridEnabledFields()
702   {
703     int size = enabledFiles.size();
704     for (int i = 0; i < size; ++i)
705     {
706       enabledFiles[i]->checkGridOfEnabledFields();       
707     }
708   }
709
710   /*!
711      Check grid of active (enabled) fields in file
712      \param [in] activeFiles files contains enabled fields whose grid needs checking
713   */
714   void CContext::checkGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
715   {
716     int size = activeFiles.size();
717     for (int i = 0; i < size; ++i)
718     {
719       activeFiles[i]->checkGridOfEnabledFields();       
720     }
721   }
722
723    /*!
724      Go up the hierachical tree via field_ref and do check of attributes of fields
725      This can be done in a client then all computed information will be sent from this client to others
726      \param [in] sendToServer Flag to indicate whether calculated information will be sent
727   */
728   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
729   {
730     int size = this->enabledFiles.size();
731     for (int i = 0; i < size; ++i)
732     {
733       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
734     }
735
736     for (int i = 0; i < size; ++i)
737     {
738       this->enabledFiles[i]->generateNewTransformationGridDest();
739     }
740   }
741
742    /*!
743      Go up the hierachical tree via field_ref and do check of attributes of fields.
744      The transformation can be done in this step.
745      All computed information will be sent from this client to others.
746      \param [in] sendToServer Flag to indicate whether calculated information will be sent
747   */
748   void CContext::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer)
749   {
750     int size = this->enabledFiles.size();
751     for (int i = 0; i < size; ++i)
752     {
753       this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer);
754     }
755   }
756
757   void CContext::buildFilterGraphOfEnabledFields()
758   {
759     int size = this->enabledFiles.size();
760     for (int i = 0; i < size; ++i)
761     {
762       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
763     }
764   }
765
766   void CContext::startPrefetchingOfEnabledReadModeFiles()
767   {
768     int size = enabledReadModeFiles.size();
769     for (int i = 0; i < size; ++i)
770     {
771        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
772     }
773   }
774
775   void CContext::checkPrefetchingOfEnabledReadModeFiles()
776   {
777     int size = enabledReadModeFiles.size();
778     for (int i = 0; i < size; ++i)
779     {
780        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
781     }
782   }
783
784  void CContext::findFieldsWithReadAccess(void)
785  {
786    fieldsWithReadAccess.clear();
787    const vector<CField*> allFields = CField::getAll();
788    for (size_t i = 0; i < allFields.size(); ++i)
789    {
790      CField* field = allFields[i];
791
792      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
793        field->read_access = true;
794      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
795        fieldsWithReadAccess.push_back(field);
796    }
797  }
798
799  void CContext::solveAllRefOfFieldsWithReadAccess()
800  {
801    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
802      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
803  }
804
805  void CContext::buildFilterGraphOfFieldsWithReadAccess()
806  {
807    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
808      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
809  }
810
811   void CContext::solveAllInheritance(bool apply)
812   {
813     // Résolution des héritages descendants (càd des héritages de groupes)
814     // pour chacun des contextes.
815      solveDescInheritance(apply);
816
817     // Résolution des héritages par référence au niveau des fichiers.
818      const vector<CFile*> allFiles=CFile::getAll();
819      const vector<CGrid*> allGrids= CGrid::getAll();
820
821      if (hasClient && !hasServer)
822      //if (hasClient)
823      {
824        for (unsigned int i = 0; i < allFiles.size(); i++)
825          allFiles[i]->solveFieldRefInheritance(apply);
826      }
827
828      unsigned int vecSize = allGrids.size();
829      unsigned int i = 0;
830      for (i = 0; i < vecSize; ++i)
831        allGrids[i]->solveDomainAxisRefInheritance(apply);
832
833   }
834
835   void CContext::findEnabledFiles(void)
836   {
837      const std::vector<CFile*> allFiles = CFile::getAll();
838      const CDate& initDate = calendar->getInitDate();
839
840      for (unsigned int i = 0; i < allFiles.size(); i++)
841         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
842         {
843            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
844            {
845              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
846              {
847                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
848                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
849                    <<"\" is less than the time step. File will not be written."<<endl;
850              }
851              else
852               enabledFiles.push_back(allFiles[i]);
853            }
854         }
855         else
856         {
857           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
858           {
859             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
860                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
861                 <<"\" is less than the time step. File will not be written."<<endl;
862           }
863           else
864             enabledFiles.push_back(allFiles[i]); // otherwise true by default
865         }
866
867      if (enabledFiles.size() == 0)
868         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
869               << getId() << "\" !");
870
871   }
872
873   void CContext::distributeFiles(void)
874   {
875     double eps=std::numeric_limits<double>::epsilon()*10 ;
876     
877     // If primary server
878     if (hasServer && hasClient)
879     {
880       int nbPools = clientPrimServer.size();
881
882       // (1) Find all enabled files in write mode
883       // for (int i = 0; i < this->enabledFiles.size(); ++i)
884       // {
885       //   if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
886       //    enabledWriteModeFiles.push_back(enabledFiles[i]);
887       // }
888
889       // (2) Estimate the data volume for each file
890       int size = this->enabledWriteModeFiles.size();
891       std::vector<std::pair<double, CFile*> > dataSizeMap;
892       double dataPerPool = 0;
893       int nfield=0 ;
894       for (size_t i = 0; i < size; ++i)
895       {
896         CFile* file = this->enabledWriteModeFiles[i];
897         StdSize dataSize=0;
898         std::vector<CField*> enabledFields = file->getEnabledFields();
899         size_t numEnabledFields = enabledFields.size();
900         for (size_t j = 0; j < numEnabledFields; ++j) dataSize += enabledFields[j]->getGlobalWrittenSize() ;
901
902         double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
903         double dataSizeSec= dataSize/ outFreqSec;
904         nfield++ ;
905// add epsilon*nField to dataSizeSec in order to  preserve reproductive ordering when sorting
906         dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
907         dataPerPool += dataSizeSec;
908       }
909       dataPerPool /= nbPools;
910       std::sort(dataSizeMap.begin(), dataSizeMap.end());
911
912       // (3) Assign contextClient to each enabled file
913
914       std::multimap<double,int> poolDataSize ;
915// multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11
916
917       int j;
918       double dataSize ;
919       for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 
920             
921       for (int i = dataSizeMap.size()-1; i >= 0; --i)
922       {
923         dataSize=(*poolDataSize.begin()).first ;
924         j=(*poolDataSize.begin()).second ;
925         dataSizeMap[i].second->setContextClient(clientPrimServer[j]);
926         dataSize+=dataSizeMap[i].first;
927         poolDataSize.erase(poolDataSize.begin()) ;
928         poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 
929       }
930
931       for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" :  ratio "<<it->first*1./dataPerPool<<endl ;
932 
933       for (int i = 0; i < this->enabledReadModeFiles.size(); ++i)
934       {
935         enabledReadModeFiles[i]->setContextClient(client);         
936       }
937     }
938     else
939     {
940       for (int i = 0; i < this->enabledFiles.size(); ++i)
941         enabledFiles[i]->setContextClient(client);
942     }
943   }
944
945   /*!
946      Find all files in write mode
947   */
948   void CContext::findEnabledWriteModeFiles(void)
949   {
950     int size = this->enabledFiles.size();
951     for (int i = 0; i < size; ++i)
952     {
953       if (enabledFiles[i]->mode.isEmpty() || 
954          (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
955        enabledWriteModeFiles.push_back(enabledFiles[i]);
956     }
957   }
958
959   /*!
960      Find all files in read mode
961   */
962   void CContext::findEnabledReadModeFiles(void)
963   {
964     int size = this->enabledFiles.size();
965     for (int i = 0; i < size; ++i)
966     {
967       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
968        enabledReadModeFiles.push_back(enabledFiles[i]);
969     }
970   }
971
972   void CContext::closeAllFile(void)
973   {
974     std::vector<CFile*>::const_iterator
975            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
976
977     for (; it != end; it++)
978     {
979       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
980       (*it)->close();
981     }
982   }
983
984   /*!
985   \brief Dispatch event received from client
986      Whenever a message is received in buffer of server, it will be processed depending on
987   its event type. A new event type should be added in the switch list to make sure
988   it processed on server side.
989   \param [in] event: Received message
990   */
991   bool CContext::dispatchEvent(CEventServer& event)
992   {
993
994      if (SuperClass::dispatchEvent(event)) return true;
995      else
996      {
997        switch(event.type)
998        {
999           case EVENT_ID_CLOSE_DEFINITION :
1000             recvCloseDefinition(event);
1001             return true;
1002             break;
1003           case EVENT_ID_UPDATE_CALENDAR:
1004             recvUpdateCalendar(event);
1005             return true;
1006             break;
1007           case EVENT_ID_CREATE_FILE_HEADER :
1008             recvCreateFileHeader(event);
1009             return true;
1010             break;
1011           case EVENT_ID_POST_PROCESS:
1012             recvPostProcessing(event);
1013             return true;
1014            case EVENT_ID_SEND_REGISTRY:
1015             recvRegistry(event);
1016             return true;
1017             break;
1018            case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES:
1019             recvPostProcessingGlobalAttributes(event);
1020             return true;
1021             break;
1022            case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS:
1023             recvProcessingGridOfEnabledFields(event);
1024             return true;
1025             break;
1026           default :
1027             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
1028                    <<"Unknown Event");
1029           return false;
1030         }
1031      }
1032   }
1033
1034   //! Client side: Send a message to server to make it close
1035   void CContext::sendCloseDefinition(void)
1036   {
1037     // Use correct context client to send message
1038     int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1039     for (int i = 0; i < nbSrvPools; ++i)
1040     {
1041       CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1042       CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1043       if (contextClientTmp->isServerLeader())
1044       {
1045         CMessage msg;
1046         if (hasServer)
1047           msg<<this->getIdServer(i);
1048         else
1049           msg<<this->getIdServer();
1050         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1051         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1052           event.push(*itRank,1,msg);
1053         contextClientTmp->sendEvent(event);
1054       }
1055       else contextClientTmp->sendEvent(event);
1056     }
1057   }
1058
1059   //! Server side: Receive a message of client announcing a context close
1060   void CContext::recvCloseDefinition(CEventServer& event)
1061   {
1062      CBufferIn* buffer=event.subEvents.begin()->buffer;
1063      string id;
1064      *buffer>>id;
1065      get(id)->closeDefinition();
1066   }
1067
1068   //! Client side: Send a message to update calendar in each time step
1069   void CContext::sendUpdateCalendar(int step)
1070   {
1071     // Use correct context client to send message
1072    int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1073     for (int i = 0; i < nbSrvPools; ++i)
1074     {
1075       CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1076       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1077
1078         if (contextClientTmp->isServerLeader())
1079         {
1080           CMessage msg;
1081           if (hasServer)
1082             msg<<this->getIdServer(i)<<step;
1083           else
1084             msg<<this->getIdServer()<<step;
1085           const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1086           for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1087             event.push(*itRank,1,msg);
1088           contextClientTmp->sendEvent(event);
1089         }
1090         else contextClientTmp->sendEvent(event);
1091     }
1092   }
1093
1094   //! Server side: Receive a message of client annoucing calendar update
1095   void CContext::recvUpdateCalendar(CEventServer& event)
1096   {
1097      CBufferIn* buffer=event.subEvents.begin()->buffer;
1098      string id;
1099      *buffer>>id;
1100      get(id)->recvUpdateCalendar(*buffer);
1101   }
1102
1103   //! Server side: Receive a message of client annoucing calendar update
1104   void CContext::recvUpdateCalendar(CBufferIn& buffer)
1105   {
1106      int step;
1107      buffer>>step;
1108      updateCalendar(step);
1109      if (hasClient && hasServer)
1110      {       
1111        sendUpdateCalendar(step);
1112      }
1113   }
1114
1115   //! Client side: Send a message to create header part of netcdf file
1116   void CContext::sendCreateFileHeader(void)
1117   {
1118     // Use correct context client to send message
1119     // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;
1120     int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1121     for (int i = 0; i < nbSrvPools; ++i)
1122     {
1123       CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1124       CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
1125
1126       if (contextClientTmp->isServerLeader())
1127       {
1128         CMessage msg;
1129         if (hasServer)
1130           msg<<this->getIdServer(i);
1131         else
1132           msg<<this->getIdServer();
1133         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1134         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1135           event.push(*itRank,1,msg) ;
1136         contextClientTmp->sendEvent(event);
1137       }
1138       else contextClientTmp->sendEvent(event);
1139     }
1140   }
1141
1142   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1143   void CContext::recvCreateFileHeader(CEventServer& event)
1144   {
1145      CBufferIn* buffer=event.subEvents.begin()->buffer;
1146      string id;
1147      *buffer>>id;
1148      get(id)->recvCreateFileHeader(*buffer);
1149   }
1150
1151   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1152   void CContext::recvCreateFileHeader(CBufferIn& buffer)
1153   {
1154      if (!hasClient && hasServer) 
1155        createFileHeader();
1156   }
1157
1158   //! Client side: Send a message to do some post processing on server
1159   void CContext::sendProcessingGridOfEnabledFields()
1160   {
1161      // Use correct context client to send message
1162     int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1163     for (int i = 0; i < nbSrvPools; ++i)
1164     {
1165       CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client;
1166       CEventClient event(getType(),EVENT_ID_PROCESS_GRID_ENABLED_FIELDS);
1167
1168       if (contextClientTmp->isServerLeader())
1169       {
1170         CMessage msg;
1171         if (hasServer)
1172           msg<<this->getIdServer(i);
1173         else
1174           msg<<this->getIdServer();
1175         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1176         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1177           event.push(*itRank,1,msg);
1178         contextClientTmp->sendEvent(event);
1179       }
1180       else contextClientTmp->sendEvent(event);
1181     }
1182   }
1183
1184   //! Server side: Receive a message to do some post processing
1185   void CContext::recvProcessingGridOfEnabledFields(CEventServer& event)
1186   {
1187      CBufferIn* buffer=event.subEvents.begin()->buffer;
1188      string id;
1189      *buffer>>id;     
1190   }
1191
1192   //! Client side: Send a message to do some post processing on server
1193   void CContext::sendPostProcessing()
1194   {
1195      // Use correct context client to send message
1196     // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;
1197     int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1198     for (int i = 0; i < nbSrvPools; ++i)
1199     {
1200       CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1201       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
1202       if (contextClientTmp->isServerLeader())
1203       {
1204         CMessage msg;
1205         if (hasServer)
1206           msg<<this->getIdServer(i);
1207         else
1208           msg<<this->getIdServer();
1209         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1210         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1211         event.push(*itRank,1,msg);
1212         contextClientTmp->sendEvent(event);
1213       }
1214       else contextClientTmp->sendEvent(event);
1215     }
1216   }
1217
1218   //! Server side: Receive a message to do some post processing
1219   void CContext::recvPostProcessing(CEventServer& event)
1220   {
1221      CBufferIn* buffer=event.subEvents.begin()->buffer;
1222      string id;
1223      *buffer>>id;
1224      get(id)->recvPostProcessing(*buffer);
1225   }
1226
1227   //! Server side: Receive a message to do some post processing
1228   void CContext::recvPostProcessing(CBufferIn& buffer)
1229   {
1230      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
1231      postProcessing();
1232   }
1233
1234   const StdString& CContext::getIdServer()
1235   {
1236      if (hasClient)
1237      {
1238        idServer_ = this->getId();
1239        idServer_ += "_server";
1240        return idServer_;
1241      }
1242      if (hasServer) return (this->getId());
1243   }
1244
1245   const StdString& CContext::getIdServer(const int i)
1246   {
1247     idServer_ = this->getId();
1248     idServer_ += "_server_";
1249     idServer_ += boost::lexical_cast<string>(i);
1250     return idServer_;
1251   }
1252
1253
1254   /*!
1255   \brief Do some simple post processings after parsing xml file
1256      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
1257   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
1258   which will be written out into netcdf files, are processed
1259   */
1260   void CContext::postProcessing()
1261   {
1262     if (isPostProcessed) return;
1263
1264      // Make sure the calendar was correctly created
1265      if (!calendar)
1266        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
1267      else if (calendar->getTimeStep() == NoneDu)
1268        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
1269      // Calendar first update to set the current date equals to the start date
1270      calendar->update(0);
1271
1272      // Find all inheritance in xml structure
1273      this->solveAllInheritance();
1274
1275//      ShowTree(info(10));
1276
1277      // Check if some axis, domains or grids are eligible to for compressed indexed output.
1278      // Warning: This must be done after solving the inheritance and before the rest of post-processing
1279      checkAxisDomainsGridsEligibilityForCompressedOutput();     
1280
1281      // Check if some automatic time series should be generated
1282      // Warning: This must be done after solving the inheritance and before the rest of post-processing     
1283
1284      // The timeseries should only be prepared in client
1285      if (hasClient && !hasServer) prepareTimeseries();
1286
1287      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
1288      findEnabledFiles();
1289      findEnabledWriteModeFiles();
1290      findEnabledReadModeFiles();
1291
1292      // For now, only read files with client and only one level server
1293      // if (hasClient && !hasServer) findEnabledReadModeFiles();     
1294
1295      // Find all enabled fields of each file     
1296      findAllEnabledFieldsInFiles(this->enabledWriteModeFiles);
1297      findAllEnabledFieldsInFiles(this->enabledReadModeFiles);
1298
1299      // For now, only read files with client and only one level server
1300      // if (hasClient && !hasServer)
1301      //   findAllEnabledFieldsInFiles(this->enabledReadModeFiles);     
1302
1303      if (hasClient && !hasServer)
1304      {
1305        initReadFiles();
1306        // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
1307        this->readAttributesOfEnabledFieldsInReadModeFiles();
1308      }
1309
1310      // Only search and rebuild all reference objects of enable fields, don't transform
1311      this->solveOnlyRefOfEnabledFields(false);
1312
1313      // Search and rebuild all reference object of enabled fields, and transform
1314      this->solveAllRefOfEnabledFieldsAndTransform(false);
1315
1316      // Find all fields with read access from the public API
1317      if (hasClient && !hasServer) findFieldsWithReadAccess();
1318      // and solve the all reference for them
1319      if (hasClient && !hasServer) solveAllRefOfFieldsWithReadAccess();
1320
1321      isPostProcessed = true;
1322   }
1323
1324   /*!
1325    * Compute the required buffer size to send the attributes (mostly those grid related).
1326    *
1327    * \param maxEventSize [in/out] the size of the bigger event for each connected server
1328    */
1329   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient)
1330   {
1331     std::map<int, StdSize> attributesSize;
1332
1333     if (hasClient)
1334     {
1335       size_t numEnabledFiles = this->enabledWriteModeFiles.size();
1336       for (size_t i = 0; i < numEnabledFiles; ++i)
1337       {
1338         CFile* file = this->enabledWriteModeFiles[i];
1339         // if (file->getContextClient() == contextClient)
1340         {
1341           std::vector<CField*> enabledFields = file->getEnabledFields();
1342           size_t numEnabledFields = enabledFields.size();
1343           for (size_t j = 0; j < numEnabledFields; ++j)
1344           {
1345             const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient);
1346             std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
1347             for (; it != itE; ++it)
1348             {
1349               // If attributesSize[it->first] does not exist, it will be zero-initialized
1350               // so we can use it safely without checking for its existance
1351               if (attributesSize[it->first] < it->second)
1352                 attributesSize[it->first] = it->second;
1353
1354               if (maxEventSize[it->first] < it->second)
1355                 maxEventSize[it->first] = it->second;
1356             }
1357           }
1358         }
1359       }
1360
1361      // Not a good approach here, duplicate code
1362       if (!hasServer)
1363       {
1364         size_t numEnabledFiles = this->enabledReadModeFiles.size();
1365         for (size_t i = 0; i < numEnabledFiles; ++i)
1366         {
1367           CFile* file = this->enabledReadModeFiles[i]; 
1368           {
1369             std::vector<CField*> enabledFields = file->getEnabledFields();
1370             size_t numEnabledFields = enabledFields.size();
1371             for (size_t j = 0; j < numEnabledFields; ++j)
1372             {
1373               const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient);
1374               std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
1375               for (; it != itE; ++it)
1376               {
1377                 // If attributesSize[it->first] does not exist, it will be zero-initialized
1378                 // so we can use it safely without checking for its existance
1379                 if (attributesSize[it->first] < it->second)
1380                   attributesSize[it->first] = it->second;
1381
1382                 if (maxEventSize[it->first] < it->second)
1383                   maxEventSize[it->first] = it->second;
1384               }
1385             }
1386           }
1387         }
1388       }
1389     }
1390
1391     return attributesSize;
1392   }
1393
1394   /*!
1395    * Compute the required buffer size to send the fields data.
1396    *
1397    * \param maxEventSize [in/out] the size of the bigger event for each connected server
1398    */
1399   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient)
1400   {
1401     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
1402
1403     std::map<int, StdSize> dataSize;
1404
1405     // Find all reference domain and axis of all active fields
1406     size_t numEnabledFiles = this->enabledFiles.size();
1407     for (size_t i = 0; i < numEnabledFiles; ++i)
1408     {
1409       CFile* file = this->enabledFiles[i];
1410       if (file->getContextClient() == contextClient)
1411       {
1412         CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
1413
1414         if (fileMode == mode)
1415         {
1416           std::vector<CField*> enabledFields = file->getEnabledFields();
1417           size_t numEnabledFields = enabledFields.size();
1418           for (size_t j = 0; j < numEnabledFields; ++j)
1419           {
1420             // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
1421            const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
1422             // for (size_t c = 0; c < mapSize.size(); ++c)
1423             // {
1424               std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
1425               for (; it != itE; ++it)
1426               {
1427                 // If dataSize[it->first] does not exist, it will be zero-initialized
1428                 // so we can use it safely without checking for its existance
1429                 if (CXios::isOptPerformance)
1430                   dataSize[it->first] += it->second;
1431                 else if (dataSize[it->first] < it->second)
1432                   dataSize[it->first] = it->second;
1433
1434                 if (maxEventSize[it->first] < it->second)
1435                   maxEventSize[it->first] = it->second;
1436               }
1437             // }
1438           }
1439         }
1440       }
1441     }
1442
1443     return dataSize;
1444   }
1445
1446   //! Client side: Send infomation of active files (files are enabled to write out)
1447   void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles)
1448   {
1449     int size = activeFiles.size();
1450
1451     // In a context, each type has a root definition, e.g: axis, domain, field.
1452     // Every object must be a child of one of these root definition. In this case
1453     // all new file objects created on server must be children of the root "file_definition"
1454     StdString fileDefRoot("file_definition");
1455     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1456
1457     for (int i = 0; i < size; ++i)
1458     {
1459       CFile* f = activeFiles[i];
1460       cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient());
1461       f->sendAllAttributesToServer(f->getContextClient());
1462       f->sendAddAllVariables(f->getContextClient());
1463     }
1464   }
1465
1466   //! Client side: Send information of active fields (ones are written onto files)
1467   void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
1468   {
1469     int size = activeFiles.size();
1470     for (int i = 0; i < size; ++i)
1471     {
1472       activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1473     }
1474   }
1475
1476   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
1477   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
1478   {
1479     if (!hasClient) return;
1480
1481     const vector<CAxis*> allAxis = CAxis::getAll();
1482     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
1483       (*it)->checkEligibilityForCompressedOutput();
1484
1485     const vector<CDomain*> allDomains = CDomain::getAll();
1486     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
1487       (*it)->checkEligibilityForCompressedOutput();
1488
1489     const vector<CGrid*> allGrids = CGrid::getAll();
1490     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1491       (*it)->checkEligibilityForCompressedOutput();
1492   }
1493
1494   //! Client side: Prepare the timeseries by adding the necessary files
1495   void CContext::prepareTimeseries()
1496   {
1497     if (!hasClient) return;
1498
1499     const std::vector<CFile*> allFiles = CFile::getAll();
1500     for (size_t i = 0; i < allFiles.size(); i++)
1501     {
1502       CFile* file = allFiles[i];
1503
1504       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1505       for (size_t k = 0; k < vars.size(); k++)
1506       {
1507         CVariable* var = vars[k];
1508
1509         if (var->ts_target.isEmpty()
1510              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1511           fileVars.push_back(var);
1512
1513         if (!var->ts_target.isEmpty()
1514              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1515           fieldVars.push_back(var);
1516       }
1517
1518       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1519       {
1520         StdString fileNameStr("%file_name%") ;
1521         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1522         
1523         StdString fileName=file->getFileOutputName();
1524         size_t pos=tsPrefix.find(fileNameStr) ;
1525         while (pos!=std::string::npos)
1526         {
1527           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1528           pos=tsPrefix.find(fileNameStr) ;
1529         }
1530       
1531         const std::vector<CField*> allFields = file->getAllFields();
1532         for (size_t j = 0; j < allFields.size(); j++)
1533         {
1534           CField* field = allFields[j];
1535
1536           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1537           {
1538             CFile* tsFile = CFile::create();
1539             tsFile->duplicateAttributes(file);
1540
1541             // Add variables originating from file and targeted to timeserie file
1542             for (size_t k = 0; k < fileVars.size(); k++)
1543               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1544
1545           
1546             tsFile->name = tsPrefix + "_";
1547             if (!field->name.isEmpty())
1548               tsFile->name.get() += field->name;
1549             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1550               tsFile->name.get() += field->field_ref;
1551             else
1552               tsFile->name.get() += field->getId();
1553
1554             if (!field->ts_split_freq.isEmpty())
1555               tsFile->split_freq = field->ts_split_freq;
1556
1557             CField* tsField = tsFile->addField();
1558             tsField->field_ref = field->getId();
1559
1560             // Add variables originating from file and targeted to timeserie field
1561             for (size_t k = 0; k < fieldVars.size(); k++)
1562               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1563
1564             vars = field->getAllVariables();
1565             for (size_t k = 0; k < vars.size(); k++)
1566             {
1567               CVariable* var = vars[k];
1568
1569               // Add variables originating from field and targeted to timeserie field
1570               if (var->ts_target.isEmpty()
1571                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1572                 tsField->getVirtualVariableGroup()->addChild(var);
1573
1574               // Add variables originating from field and targeted to timeserie file
1575               if (!var->ts_target.isEmpty()
1576                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1577                 tsFile->getVirtualVariableGroup()->addChild(var);
1578             }
1579
1580             tsFile->solveFieldRefInheritance(true);
1581
1582             if (file->timeseries == CFile::timeseries_attr::exclusive)
1583               field->enabled = false;
1584           }
1585         }
1586
1587         // Finally disable the original file is need be
1588         if (file->timeseries == CFile::timeseries_attr::only)
1589          file->enabled = false;
1590       }
1591     }
1592   }
1593
1594   //! Client side: Send information of reference grid of active fields
1595   void CContext::sendRefGrid(const std::vector<CFile*>& activeFiles)
1596   {
1597     std::set<StdString> gridIds;
1598     int sizeFile = activeFiles.size();
1599     CFile* filePtr(NULL);
1600
1601     // Firstly, find all reference grids of all active fields
1602     for (int i = 0; i < sizeFile; ++i)
1603     {
1604       filePtr = activeFiles[i];
1605       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1606       int sizeField = enabledFields.size();
1607       for (int numField = 0; numField < sizeField; ++numField)
1608       {
1609         if (0 != enabledFields[numField]->getRelGrid())
1610           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1611       }
1612     }
1613
1614     // Create all reference grids on server side
1615     StdString gridDefRoot("grid_definition");
1616     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1617     std::set<StdString>::const_iterator it, itE = gridIds.end();
1618     for (it = gridIds.begin(); it != itE; ++it)
1619     {
1620       gridPtr->sendCreateChild(*it);
1621       CGrid::get(*it)->sendAllAttributesToServer();
1622       CGrid::get(*it)->sendAllDomains();
1623       CGrid::get(*it)->sendAllAxis();
1624       CGrid::get(*it)->sendAllScalars();
1625     }
1626   }
1627
1628   //! Client side: Send information of reference domain, axis and scalar of active fields
1629   void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles)
1630   {
1631     std::set<StdString> domainIds, axisIds, scalarIds;
1632
1633     // Find all reference domain and axis of all active fields
1634     int numEnabledFiles = activeFiles.size();
1635     for (int i = 0; i < numEnabledFiles; ++i)
1636     {
1637       std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
1638       int numEnabledFields = enabledFields.size();
1639       for (int j = 0; j < numEnabledFields; ++j)
1640       {
1641         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1642         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1643         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1644         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1645       }
1646     }
1647
1648     // Create all reference axis on server side
1649     std::set<StdString>::iterator itDom, itAxis, itScalar;
1650     std::set<StdString>::const_iterator itE;
1651
1652     StdString scalarDefRoot("scalar_definition");
1653     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1654     itE = scalarIds.end();
1655     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1656     {
1657       if (!itScalar->empty())
1658       {
1659         scalarPtr->sendCreateChild(*itScalar);
1660         CScalar::get(*itScalar)->sendAllAttributesToServer();
1661       }
1662     }
1663
1664     StdString axiDefRoot("axis_definition");
1665     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1666     itE = axisIds.end();
1667     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1668     {
1669       if (!itAxis->empty())
1670       {
1671         axisPtr->sendCreateChild(*itAxis);
1672         CAxis::get(*itAxis)->sendAllAttributesToServer();
1673       }
1674     }
1675
1676     // Create all reference domains on server side
1677     StdString domDefRoot("domain_definition");
1678     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1679     itE = domainIds.end();
1680     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1681     {
1682       if (!itDom->empty()) {
1683          domPtr->sendCreateChild(*itDom);
1684          CDomain::get(*itDom)->sendAllAttributesToServer();
1685       }
1686     }
1687   }
1688
1689   //! Update calendar in each time step
1690   void CContext::updateCalendar(int step)
1691   {
1692      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1693      calendar->update(step);
1694      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1695#ifdef XIOS_MEMTRACK_LIGHT
1696      info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
1697#endif
1698      //if (hasClient)
1699      if (hasClient && !hasServer) // For now we only use server level 1 to read data
1700      {
1701        checkPrefetchingOfEnabledReadModeFiles();
1702        garbageCollector.invalidate(calendar->getCurrentDate());
1703      }
1704   }
1705
1706   void CContext::initReadFiles(void)
1707   {
1708      vector<CFile*>::const_iterator it;
1709
1710      for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++)
1711      {
1712         (*it)->initRead();
1713      }
1714   }
1715
1716   //! Server side: Create header of netcdf file
1717   void CContext::createFileHeader(void)
1718   {
1719      vector<CFile*>::const_iterator it;
1720
1721      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1722      // for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)
1723      {
1724         (*it)->initWrite();
1725      }
1726   }
1727
1728   //! Get current context
1729   CContext* CContext::getCurrent(void)
1730   {
1731     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1732   }
1733
1734   /*!
1735   \brief Set context with an id be the current context
1736   \param [in] id identity of context to be set to current
1737   */
1738   void CContext::setCurrent(const string& id)
1739   {
1740     CObjectFactory::SetCurrentContextId(id);
1741     CGroupFactory::SetCurrentContextId(id);
1742   }
1743
1744  /*!
1745  \brief Create a context with specific id
1746  \param [in] id identity of new context
1747  \return pointer to the new context or already-existed one with identity id
1748  */
1749  CContext* CContext::create(const StdString& id)
1750  {
1751    CContext::setCurrent(id);
1752
1753    bool hasctxt = CContext::has(id);
1754    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1755    getRoot();
1756    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1757
1758#define DECLARE_NODE(Name_, name_) \
1759    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1760#define DECLARE_NODE_PAR(Name_, name_)
1761#include "node_type.conf"
1762
1763    return (context);
1764  }
1765
1766
1767     //! Server side: Receive a message to do some post processing
1768  void CContext::recvRegistry(CEventServer& event)
1769  {
1770    CBufferIn* buffer=event.subEvents.begin()->buffer;
1771    string id;
1772    *buffer>>id;
1773    get(id)->recvRegistry(*buffer);
1774  }
1775
1776  void CContext::recvRegistry(CBufferIn& buffer)
1777  {
1778    if (server->intraCommRank==0)
1779    {
1780      CRegistry registry(server->intraComm) ;
1781      registry.fromBuffer(buffer) ;
1782      registryOut->mergeRegistry(registry) ;
1783    }
1784  }
1785
1786  void CContext::sendRegistry(void)
1787  {
1788    registryOut->hierarchicalGatherRegistry() ;
1789
1790    // Use correct context client to send message
1791    int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1792    for (int i = 0; i < nbSrvPools; ++i)
1793    {
1794      CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1795      CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1796        if (contextClientTmp->isServerLeader())
1797        {
1798           CMessage msg ;
1799           if (hasServer)
1800             msg<<this->getIdServer(i);
1801           else
1802             msg<<this->getIdServer();
1803           if (contextClientTmp->clientRank==0) msg<<*registryOut ;
1804           const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1805           for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1806             event.push(*itRank,1,msg);
1807           contextClientTmp->sendEvent(event);
1808         }
1809         else contextClientTmp->sendEvent(event);
1810    }
1811  }
1812
1813  /*!
1814  * \fn bool CContext::isFinalized(void)
1815  * Context is finalized if it received context post finalize event.
1816  */
1817  bool CContext::isFinalized(void)
1818  {
1819    return finalized;
1820  }
1821
1822} // namespace xios
Note: See TracBrowser for help on using the repository browser.