Ignore:
Timestamp:
01/10/17 14:36:29 (7 years ago)
Author:
oabramkina
Message:

Intermeadiate version for merging with new server functionalities.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/field.cpp

    r1009 r1021  
    3636      , domAxisScalarIds_(vector<StdString>(3,"")), areAllReferenceSolved(false), isReferenceSolved(false) 
    3737      , useCompressedOutput(false) 
    38       , isReadDataRequestPending(false) 
     38      , wasDataAlreadyReceivedFromServer(false) 
    3939   { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 
    4040 
     
    4747      , domAxisScalarIds_(vector<StdString>(3,"")), areAllReferenceSolved(false), isReferenceSolved(false) 
    4848      , useCompressedOutput(false) 
    49       , isReadDataRequestPending(false) 
     49      , wasDataAlreadyReceivedFromServer(false) 
    5050   { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 
    5151 
     
    179179  } 
    180180 
    181   void CField::sendUpdateData(const CArray<double,1>& data, const int srvPool) 
     181  void CField::sendUpdateData(const CArray<double,1>& data, CContextClient* client) 
    182182  { 
    183183    CTimer::get("XIOS Send Data").resume(); 
    184  
    185     CContext* context = CContext::getCurrent(); 
    186     CContextClient* client = context->clientPrimServer[srvPool]; 
    187184 
    188185    CEventClient event(getType(), EVENT_ID_UPDATE_DATA); 
     
    312309        int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 
    313310        int srvId = fileIdx % context->clientPrimServer.size(); 
    314         sendUpdateData(fieldData, srvId); 
     311        sendUpdateData(fieldData, context->clientPrimServer[srvId]); 
    315312    } 
    316313    if (!context->hasClient && context->hasServer) 
     
    348345  } 
    349346 
    350   void CField::sendReadDataRequest(void) 
     347  void CField::sendReadDataRequest(const CDate& tsDataRequested) 
    351348  { 
    352349    CContext* context = CContext::getCurrent(); 
    353350    CContextClient* client = context->client; 
    354351 
    355     lastDataRequestedFromServer = context->getCalendar()->getCurrentDate(); 
    356     isReadDataRequestPending = true; 
     352    lastDataRequestedFromServer = tsDataRequested; 
    357353 
    358354    CEventClient event(getType(), EVENT_ID_READ_DATA); 
     
    377373    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 
    378374 
    379     bool requestData = (currentDate >= lastDataRequestedFromServer + file->output_freq.getValue()); 
    380  
    381     if (requestData) 
    382     { 
    383       cout<<"currentDate : "<<currentDate<<endl ; 
    384       cout<<"lastDataRequestedFromServer : "<<lastDataRequestedFromServer<<endl ; 
    385       cout<<"file->output_freq.getValue() : "<<file->output_freq.getValue()<<endl ; 
    386       cout<<"lastDataRequestedFromServer + file->output_freq.getValue() : "<<lastDataRequestedFromServer + file->output_freq.getValue()<<endl ; 
    387  
    388       sendReadDataRequest(); 
    389     } 
    390  
    391     return requestData; 
     375    bool dataRequested = false; 
     376    while (currentDate >= lastDataRequestedFromServer) 
     377    { 
     378      info(20) << "currentDate : " << currentDate << endl ; 
     379      info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer << endl ; 
     380      info(20) << "file->output_freq.getValue() : " << file->output_freq.getValue() << endl ; 
     381      info(20) << "lastDataRequestedFromServer + file->output_freq.getValue() : " << lastDataRequestedFromServer + file->output_freq << endl ; 
     382 
     383      sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 
     384 
     385      dataRequested = true; 
     386    } 
     387    return dataRequested; 
    392388  } 
    393389 
     
    411407 
    412408    map<int, CArray<double,1> >::iterator it; 
    413     for (it = data_srv.begin(); it != data_srv.end(); it++) 
    414     { 
    415       msgs.push_back(CMessage()); 
    416       CMessage& msg = msgs.back(); 
    417       msg << getId(); 
    418       if (hasData) 
    419         msg << getNStep() - 1 << it->second; 
    420       else 
    421         msg << int(-1); 
    422       event.push(it->first, grid->nbSenders[it->first], msg); 
    423     } 
    424     client->sendEvent(event); 
     409//    for (it = data_srv.begin(); it != data_srv.end(); it++) 
     410//    { 
     411//      msgs.push_back(CMessage()); 
     412//      CMessage& msg = msgs.back(); 
     413//      msg << getId(); 
     414//      if (hasData) 
     415//        msg << getNStep() - 1 << it->second; 
     416//      else 
     417//        msg << int(-1); 
     418//      event.push(it->first, grid->nbSenders[it->first], msg); 
     419//    } 
     420//    client->sendEvent(event); 
     421    if (!grid->doGridHaveDataDistributed()) 
     422    { 
     423       if (client->isServerLeader()) 
     424       { 
     425          if (!data_srv.empty()) 
     426          { 
     427            it = data_srv.begin(); 
     428            const std::list<int>& ranks = client->getRanksServerLeader(); 
     429            for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     430            { 
     431              msgs.push_back(CMessage()); 
     432              CMessage& msg = msgs.back(); 
     433              msg << getId(); 
     434              if (hasData) 
     435                msg << getNStep() - 1 << it->second; 
     436              else 
     437                msg << int(-1); 
     438              event.push(*itRank, 1, msg); 
     439            } 
     440          } 
     441          client->sendEvent(event); 
     442       } 
     443       else 
     444       { 
     445          // if (!data_srv.empty()) 
     446          // { 
     447          //   it = data_srv.begin(); 
     448          //   const std::list<int>& ranks = client->getRanksServerNotLeader(); 
     449          //   for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     450          //   { 
     451          //     msgs.push_back(CMessage()); 
     452          //     CMessage& msg = msgs.back(); 
     453          //     msg << getId(); 
     454          //     if (hasData) 
     455          //       msg << getNStep() - 1 << it->second; 
     456          //     else 
     457          //       msg << int(-1); 
     458          //     event.push(*itRank, 1, msg); 
     459          //   } 
     460          // } 
     461          client->sendEvent(event); 
     462       } 
     463    } 
     464    else 
     465    { 
     466      for (it = data_srv.begin(); it != data_srv.end(); it++) 
     467      { 
     468        msgs.push_back(CMessage()); 
     469        CMessage& msg = msgs.back(); 
     470        msg << getId(); 
     471        if (hasData) 
     472          msg << getNStep() - 1 << it->second; 
     473        else 
     474          msg << int(-1); 
     475        event.push(it->first, grid->nbSenders[it->first], msg); 
     476      } 
     477      client->sendEvent(event); 
     478    } 
    425479  } 
    426480 
     
    494548    } 
    495549 
     550    if (wasDataAlreadyReceivedFromServer) 
     551      lastDataReceivedFromServer = lastDataReceivedFromServer + file->output_freq; 
     552    else 
     553    { 
     554      lastDataReceivedFromServer = context->getCalendar()->getInitDate(); 
     555      wasDataAlreadyReceivedFromServer = true; 
     556    } 
     557 
    496558    if (isEOF) 
    497       serverSourceFilter->signalEndOfStream(lastDataRequestedFromServer); 
     559      serverSourceFilter->signalEndOfStream(lastDataReceivedFromServer); 
    498560    else 
    499       serverSourceFilter->streamDataFromServer(lastDataRequestedFromServer, data); 
    500  
    501     isReadDataRequestPending = false; 
     561      serverSourceFilter->streamDataFromServer(lastDataReceivedFromServer, data); 
    502562  } 
    503563 
     
    749809     solveGridDomainAxisRef(doSending2Server); 
    750810 
    751      if (context->hasClient && !context->hasClient) 
     811     if (context->hasClient && !context->hasServer) 
    752812     { 
    753813       solveTransformedGrid(); 
     
    831891     { 
    832892       // Check if we have an expression to parse 
    833        if (!content.empty()) 
    834        { 
    835          boost::scoped_ptr<IFilterExprNode> expr(parseExpr(content + '\0')); 
    836          instantDataFilter = expr->reduce(gc, *this); 
     893       if (hasExpression()) 
     894       { 
     895         boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 
     896         boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 
     897 
     898         // Check if a spatial transformation is needed 
     899         if (!field_ref.isEmpty()) 
     900         { 
     901           CGrid* gridRef = CField::get(field_ref)->grid; 
     902 
     903           if (grid && grid != gridRef && grid->hasTransform()) 
     904           { 
     905             double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 
     906             std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, defaultValue); 
     907 
     908             filter->connectOutput(filters.first, 0); 
     909             filter = filters.second; 
     910           } 
     911         } 
     912 
     913         instantDataFilter = filter; 
    837914       } 
    838915       // Check if we have a reference on another field 
     
    841918       // Check if the data is to be read from a file 
    842919       else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 
    843          instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 
    844                                                                                                      freq_offset.isEmpty() ? NoneDu : freq_offset)); 
     920         instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
     921                                                                                                     freq_offset.isEmpty() ? NoneDu : freq_offset, 
     922                                                                                                     true)); 
    845923       else // The data might be passed from the model 
    846          instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     924         instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    847925     } 
    848926 
     
    863941     } 
    864942   } 
     943 
    865944 
    866945   /*! 
     
    871950    * \return the output pin corresponding to the field reference 
    872951    */ 
    873    boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 
    874    { 
    875      if (instantDataFilter || field_ref.isEmpty()) 
    876        ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 
    877              "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 
    878  
    879      CField* fieldRef = CField::get(field_ref); 
    880      fieldRef->buildFilterGraph(gc, false); 
    881  
    882      std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 
    883      // Check if a spatial transformation is needed 
    884      if (grid && grid != fieldRef->grid && grid->hasTransform()) 
    885      { 
    886        double defaultValue = 0.0; 
    887        if (!default_value.isEmpty()) defaultValue = this->default_value; 
    888        filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 
    889      } 
    890  
    891      else 
    892        filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 
    893  
    894      fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 
    895  
    896      return filters.second; 
    897    } 
     952     boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 
     953     { 
     954       if (instantDataFilter || field_ref.isEmpty()) 
     955         ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 
     956               "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 
     957 
     958       CField* fieldRef = CField::get(field_ref); 
     959       fieldRef->buildFilterGraph(gc, false); 
     960 
     961       std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 
     962       // Check if a spatial transformation is needed 
     963       if (grid && grid != fieldRef->grid && grid->hasTransform()) 
     964       { 
     965         double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 
     966         filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 
     967       } 
     968       else 
     969         filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 
     970 
     971       fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 
     972 
     973       return filters.second; 
     974     } 
    898975 
    899976   /*! 
     
    908985   boost::shared_ptr<COutputPin> CField::getSelfReference(CGarbageCollector& gc) 
    909986   { 
    910      if (instantDataFilter || content.empty()) 
     987     if (instantDataFilter || !hasExpression()) 
    911988       ERROR("COutputPin* CField::getSelfReference(CGarbageCollector& gc)", 
    912989             "Impossible to add a self reference to a field which has already been parsed or which does not have an expression."); 
     
    917994       { 
    918995         if (!serverSourceFilter) 
    919            serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 
    920                                                                                    freq_offset.isEmpty() ? NoneDu : freq_offset)); 
    921  
     996           serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
     997                                                                                   freq_offset.isEmpty() ? NoneDu : freq_offset, 
     998                                                                                   true)); 
    922999         selfReferenceFilter = serverSourceFilter; 
    9231000       } 
    9241001       else if (!field_ref.isEmpty()) 
    925          selfReferenceFilter = getFieldReference(gc); 
     1002       { 
     1003         CField* fieldRef = CField::get(field_ref); 
     1004         fieldRef->buildFilterGraph(gc, false); 
     1005         selfReferenceFilter = fieldRef->getInstantDataFilter(); 
     1006       } 
    9261007       else 
    9271008       { 
    9281009         if (!clientSourceFilter) 
    929            clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     1010           clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    9301011 
    9311012         selfReferenceFilter = clientSourceFilter; 
     
    11261207     else if (grid && grid->hasTransform() && !grid->isTransformed()) 
    11271208     { 
    1128        grid->transformGrid(grid); 
     1209       // Temporarily deactivate the self-transformation of grid 
     1210       //grid->transformGrid(grid); 
    11291211     } 
    11301212   } 
     
    13251407   } 
    13261408 
    1327    void CField::sendAddAllVariables(const int srvPool) 
     1409   void CField::sendAddAllVariables(CContextClient* client) 
    13281410   { 
    13291411     std::vector<CVariable*> allVar = getAllVariables(); 
     
    13331415     for (; it != itE; ++it) 
    13341416     { 
    1335        this->sendAddVariable((*it)->getId()); 
    1336        (*it)->sendAllAttributesToServer(srvPool); 
    1337        (*it)->sendValue(srvPool); 
     1417       this->sendAddVariable((*it)->getId(), client); 
     1418       (*it)->sendAllAttributesToServer(client); 
     1419       (*it)->sendValue(client); 
    13381420     } 
    13391421   } 
     
    13631445   } 
    13641446 
     1447   void CField::sendAddVariable(const string& id, CContextClient* client) 
     1448   { 
     1449      sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE, client); 
     1450    // CContext* context = CContext::getCurrent(); 
     1451 
     1452    // if (!context->hasServer) 
     1453    // { 
     1454    //    CContextClient* client = context->client; 
     1455 
     1456    //    CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE); 
     1457    //    if (client->isServerLeader()) 
     1458    //    { 
     1459    //      CMessage msg; 
     1460    //      msg << this->getId(); 
     1461    //      msg << id; 
     1462    //      const std::list<int>& ranks = client->getRanksServerLeader(); 
     1463    //      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     1464    //        event.push(*itRank,1,msg); 
     1465    //      client->sendEvent(event); 
     1466    //    } 
     1467    //    else client->sendEvent(event); 
     1468    // } 
     1469   } 
     1470 
    13651471   void CField::sendAddVariableGroup(const string& id) 
    13661472   { 
     
    14181524   } 
    14191525 
     1526   /*! 
     1527    * Returns string arithmetic expression associated to the field. 
     1528    * \return if content is defined return content string, otherwise, if "expr" attribute is defined, return expr string. 
     1529    */ 
     1530   const string& CField::getExpression(void) 
     1531   { 
     1532     if (!expr.isEmpty() && content.empty()) 
     1533     { 
     1534       content = expr; 
     1535       expr.reset(); 
     1536     } 
     1537 
     1538     return content; 
     1539   } 
     1540 
     1541   bool CField::hasExpression(void) const 
     1542   { 
     1543     return (!expr.isEmpty() || !content.empty()); 
     1544   } 
     1545 
     1546 
    14201547   DEFINE_REF_FUNC(Field,field) 
    14211548} // namespace xios 
Note: See TracChangeset for help on using the changeset viewer.