New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC/lib_mpp.F90 @ 11624

Last change on this file since 11624 was 11624, checked in by acc, 5 years ago

Branch 2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles. Substantive changes required to replace all namelists with internal files. These are the key changes only; to compile and run tests all REWIND and CLOSE operations on the (no longer) units have to be removed. These changes affect many more files but can be scripted so are not included here in order to make a later merge easier. The scripts used to prepare code for testing are included on: wiki:2019WP/ENHANCE-04_AndrewC-reporting/Internal_Namelists. With these additional changes this code passes most SETTE tests but the AGRIF preprocessor does not currently accept the new allocatable character strings. To be investigated.

  • Property svn:keywords set to Id
File size: 58.6 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[11624]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[1344]51   !!   mpp_ini_north : initialisation of north fold
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]53   !!----------------------------------------------------------------------
[3764]54   USE dom_oce        ! ocean space and time domain
[2715]55   USE in_out_manager ! I/O manager
[3]56
[13]57   IMPLICIT NONE
[415]58   PRIVATE
[9019]59   !
[11624]60   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]61   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]62   PUBLIC   mpp_ini_north
[1344]63   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]64   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]65   PUBLIC   mppscatter, mppgather
[10425]66   PUBLIC   mpp_ini_znl
[3764]67   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[11536]68   PUBLIC   mpp_report
69   PUBLIC   tic_tac
70#if ! defined key_mpp_mpi
71   PUBLIC MPI_Wtime
72#endif
[5429]73   
[13]74   !! * Interfaces
75   !! define generic interface for these routine as they are called sometimes
[1344]76   !! with scalar arguments instead of array arguments, which causes problems
77   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]78   INTERFACE mpp_min
79      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
80   END INTERFACE
81   INTERFACE mpp_max
[681]82      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]83   END INTERFACE
84   INTERFACE mpp_sum
[6140]85      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]86         &             mppsum_realdd, mppsum_a_realdd
[13]87   END INTERFACE
[1344]88   INTERFACE mpp_minloc
89      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
90   END INTERFACE
91   INTERFACE mpp_maxloc
92      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
93   END INTERFACE
[6490]94
[51]95   !! ========================= !!
96   !!  MPI  variable definition !!
97   !! ========================= !!
[11536]98#if   defined key_mpp_mpi
[1629]99!$AGRIF_DO_NOT_TREAT
[2004]100   INCLUDE 'mpif.h'
[1629]101!$AGRIF_END_DO_NOT_TREAT
[1344]102   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]103#else   
104   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
105   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
106   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
107#endif
[3]108
[1344]109   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]110
[10425]111   INTEGER, PUBLIC ::   mppsize        ! number of process
112   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]113!$AGRIF_DO_NOT_TREAT
[9570]114   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]115!$AGRIF_END_DO_NOT_TREAT
[3]116
[2480]117   INTEGER :: MPI_SUMDD
[1976]118
[1345]119   ! variables used for zonal integration
120   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]121   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
122   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
123   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]124   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]125
[3764]126   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]127   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
128   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
129   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
130   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
131   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
132   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
133   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
134   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]135
[10425]136   ! Communications summary report
137   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
[10437]138   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]140   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
141   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
142   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
143   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
144   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]145   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]146   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
147   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]148   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]149   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
150   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
151   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
152   !: name (used as id) of allreduce-delayed operations
[10521]153   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
154   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]155   !: component name where the allreduce-delayed operation is performed
156   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
157   TYPE, PUBLIC ::   DELAYARR
158      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
159      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
160   END TYPE DELAYARR
[10817]161   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
162   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]163
164   ! timing summary report
165   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
166   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
167   
[9019]168   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]169
[9019]170   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
171   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]172   
[51]173   !!----------------------------------------------------------------------
[9598]174   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]175   !! $Id$
[10068]176   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]177   !!----------------------------------------------------------------------
[3]178CONTAINS
179
[11536]180   SUBROUTINE mpp_start( localComm )
[2715]181      !!----------------------------------------------------------------------
[11536]182      !!                  ***  routine mpp_start  ***
[3764]183      !!
[11536]184      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]185      !!----------------------------------------------------------------------
[6140]186      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]187      !
[11536]188      INTEGER ::   ierr
189      LOGICAL ::   llmpi_init
[51]190      !!----------------------------------------------------------------------
[11536]191#if defined key_mpp_mpi
[1344]192      !
[11536]193      CALL mpi_initialized ( llmpi_init, ierr )
194      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]195
[11536]196      IF( .NOT. llmpi_init ) THEN
197         IF( PRESENT(localComm) ) THEN
198            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
199            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
200            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
201         ENDIF
202         CALL mpi_init( ierr )
203         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]204      ENDIF
[11536]205       
[3764]206      IF( PRESENT(localComm) ) THEN
[2480]207         IF( Agrif_Root() ) THEN
[9570]208            mpi_comm_oce = localComm
[2480]209         ENDIF
210      ELSE
[11536]211         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
212         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]213      ENDIF
[2480]214
[11536]215# if defined key_agrif
[9019]216      IF( Agrif_Root() ) THEN
[9570]217         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]218      ELSE
[9570]219         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]220      ENDIF
[11536]221# endif
[5656]222
[9570]223      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
224      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]225      !
[1976]226      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
227      !
[11536]228#else
229      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
230      mppsize = 1
231      mpprank = 0
232#endif
233   END SUBROUTINE mpp_start
[3]234
[6140]235
[1344]236   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]237      !!----------------------------------------------------------------------
238      !!                  ***  routine mppsend  ***
[3764]239      !!
[51]240      !! ** Purpose :   Send messag passing array
241      !!
242      !!----------------------------------------------------------------------
[1344]243      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
244      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
245      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
246      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
247      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
248      !!
249      INTEGER ::   iflag
[51]250      !!----------------------------------------------------------------------
[1344]251      !
[11536]252#if defined key_mpp_mpi
253      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
254#endif
[1344]255      !
[51]256   END SUBROUTINE mppsend
[3]257
258
[3294]259   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]260      !!----------------------------------------------------------------------
261      !!                  ***  routine mpprecv  ***
262      !!
263      !! ** Purpose :   Receive messag passing array
264      !!
265      !!----------------------------------------------------------------------
[1344]266      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
267      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
268      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]269      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]270      !!
[51]271      INTEGER :: istatus(mpi_status_size)
272      INTEGER :: iflag
[3294]273      INTEGER :: use_source
[1344]274      !!----------------------------------------------------------------------
275      !
[11536]276#if defined key_mpp_mpi
[3764]277      ! If a specific process number has been passed to the receive call,
[3294]278      ! use that one. Default is to use mpi_any_source
[6140]279      use_source = mpi_any_source
280      IF( PRESENT(ksource) )   use_source = ksource
281      !
[9570]282      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]283#endif
[1344]284      !
[51]285   END SUBROUTINE mpprecv
[3]286
287
[51]288   SUBROUTINE mppgather( ptab, kp, pio )
289      !!----------------------------------------------------------------------
290      !!                   ***  routine mppgather  ***
[3764]291      !!
292      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]293      !!     array which is distributed following the vertical level.
294      !!
[1344]295      !!----------------------------------------------------------------------
[6140]296      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
297      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]298      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]299      !!
[1344]300      INTEGER :: itaille, ierror   ! temporary integer
[51]301      !!---------------------------------------------------------------------
[1344]302      !
303      itaille = jpi * jpj
[11536]304#if defined key_mpp_mpi
[1344]305      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]306         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]307#else
308      pio(:,:,1) = ptab(:,:)
309#endif
[1344]310      !
[51]311   END SUBROUTINE mppgather
[3]312
313
[51]314   SUBROUTINE mppscatter( pio, kp, ptab )
315      !!----------------------------------------------------------------------
316      !!                  ***  routine mppscatter  ***
317      !!
[3764]318      !! ** Purpose :   Transfert between awork array which is distributed
[51]319      !!      following the vertical level and the local subdomain array.
320      !!
321      !!----------------------------------------------------------------------
[6140]322      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
323      INTEGER                             ::   kp     ! Tag (not used with MPI
324      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]325      !!
326      INTEGER :: itaille, ierror   ! temporary integer
[51]327      !!---------------------------------------------------------------------
[1344]328      !
[6140]329      itaille = jpi * jpj
[1344]330      !
[11536]331#if defined key_mpp_mpi
[1344]332      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]333         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]334#else
335      ptab(:,:) = pio(:,:,1)
336#endif
[1344]337      !
[51]338   END SUBROUTINE mppscatter
[3]339
[10425]340   
341   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
342     !!----------------------------------------------------------------------
343      !!                   ***  routine mpp_delay_sum  ***
[1344]344      !!
[10425]345      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
346      !!
[1344]347      !!----------------------------------------------------------------------
[10425]348      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
349      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
350      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
351      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
352      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
353      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]354      !!
[10425]355      INTEGER ::   ji, isz
356      INTEGER ::   idvar
357      INTEGER ::   ierr, ilocalcomm
358      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]359      !!----------------------------------------------------------------------
[11536]360#if defined key_mpp_mpi
[9570]361      ilocalcomm = mpi_comm_oce
[9019]362      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]363
[10425]364      isz = SIZE(y_in)
365     
[10437]366      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]367
[10425]368      idvar = -1
369      DO ji = 1, nbdelay
370         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
371      END DO
372      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
373
374      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
375         !                                       --------------------------
376         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
377            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
378            DEALLOCATE(todelay(idvar)%z1d)
379            ndelayid(idvar) = -1                                      ! do as if we had no restart
380         ELSE
381            ALLOCATE(todelay(idvar)%y1d(isz))
382            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
383         END IF
384      ENDIF
385     
386      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
387         !                                       --------------------------
388         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
389         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
390         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
391      ENDIF
392
393      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
394
395      ! send back pout from todelay(idvar)%z1d defined at previous call
396      pout(:) = todelay(idvar)%z1d(:)
397
398      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]399# if defined key_mpi2
[10526]400      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
401      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
402      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]403# else
404      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
405# endif
[10526]406#else
[11536]407      pout(:) = REAL(y_in(:), wp)
[10526]408#endif
[10425]409
410   END SUBROUTINE mpp_delay_sum
411
[9019]412   
[10425]413   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]414      !!----------------------------------------------------------------------
[10425]415      !!                   ***  routine mpp_delay_max  ***
[9019]416      !!
[10425]417      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]418      !!
419      !!----------------------------------------------------------------------
[10425]420      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
421      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
422      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
423      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
424      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
425      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]426      !!
[10425]427      INTEGER ::   ji, isz
428      INTEGER ::   idvar
429      INTEGER ::   ierr, ilocalcomm
[9019]430      !!----------------------------------------------------------------------
[11536]431#if defined key_mpp_mpi
[9570]432      ilocalcomm = mpi_comm_oce
[9019]433      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]434
[10425]435      isz = SIZE(p_in)
[9019]436
[10437]437      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]438
[10425]439      idvar = -1
440      DO ji = 1, nbdelay
441         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
442      END DO
443      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]444
[10425]445      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
446         !                                       --------------------------
447         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
448            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
449            DEALLOCATE(todelay(idvar)%z1d)
450            ndelayid(idvar) = -1                                      ! do as if we had no restart
451         END IF
452      ENDIF
[13]453
[10425]454      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
455         !                                       --------------------------
456         ALLOCATE(todelay(idvar)%z1d(isz))
457         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
458      ENDIF
[3]459
[10425]460      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]461
[10425]462      ! send back pout from todelay(idvar)%z1d defined at previous call
463      pout(:) = todelay(idvar)%z1d(:)
[13]464
[10425]465      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]466# if defined key_mpi2
[10526]467      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
468      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
469      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]470# else
471      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
472# endif
[10526]473#else
[11536]474      pout(:) = p_in(:)
[10526]475#endif
[10425]476
477   END SUBROUTINE mpp_delay_max
478
479   
480   SUBROUTINE mpp_delay_rcv( kid )
481      !!----------------------------------------------------------------------
482      !!                   ***  routine mpp_delay_rcv  ***
[1344]483      !!
[10425]484      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]485      !!
[10425]486      !!----------------------------------------------------------------------
487      INTEGER,INTENT(in   )      ::  kid 
488      INTEGER ::   ierr
489      !!----------------------------------------------------------------------
[11536]490#if defined key_mpp_mpi
[10425]491      IF( ndelayid(kid) /= -2 ) THEN 
[10526]492#if ! defined key_mpi2
[10425]493         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
494         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
495         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]496#endif
[10425]497         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
498         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
499      ENDIF
[11536]500#endif
[10425]501   END SUBROUTINE mpp_delay_rcv
[3]502
[10425]503   
504   !!----------------------------------------------------------------------
505   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
506   !!   
507   !!----------------------------------------------------------------------
508   !!
509#  define OPERATION_MAX
510#  define INTEGER_TYPE
511#  define DIM_0d
512#     define ROUTINE_ALLREDUCE           mppmax_int
513#     include "mpp_allreduce_generic.h90"
514#     undef ROUTINE_ALLREDUCE
515#  undef DIM_0d
516#  define DIM_1d
517#     define ROUTINE_ALLREDUCE           mppmax_a_int
518#     include "mpp_allreduce_generic.h90"
519#     undef ROUTINE_ALLREDUCE
520#  undef DIM_1d
521#  undef INTEGER_TYPE
522!
523#  define REAL_TYPE
524#  define DIM_0d
525#     define ROUTINE_ALLREDUCE           mppmax_real
526#     include "mpp_allreduce_generic.h90"
527#     undef ROUTINE_ALLREDUCE
528#  undef DIM_0d
529#  define DIM_1d
530#     define ROUTINE_ALLREDUCE           mppmax_a_real
531#     include "mpp_allreduce_generic.h90"
532#     undef ROUTINE_ALLREDUCE
533#  undef DIM_1d
534#  undef REAL_TYPE
535#  undef OPERATION_MAX
536   !!----------------------------------------------------------------------
537   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
538   !!   
539   !!----------------------------------------------------------------------
540   !!
541#  define OPERATION_MIN
542#  define INTEGER_TYPE
543#  define DIM_0d
544#     define ROUTINE_ALLREDUCE           mppmin_int
545#     include "mpp_allreduce_generic.h90"
546#     undef ROUTINE_ALLREDUCE
547#  undef DIM_0d
548#  define DIM_1d
549#     define ROUTINE_ALLREDUCE           mppmin_a_int
550#     include "mpp_allreduce_generic.h90"
551#     undef ROUTINE_ALLREDUCE
552#  undef DIM_1d
553#  undef INTEGER_TYPE
554!
555#  define REAL_TYPE
556#  define DIM_0d
557#     define ROUTINE_ALLREDUCE           mppmin_real
558#     include "mpp_allreduce_generic.h90"
559#     undef ROUTINE_ALLREDUCE
560#  undef DIM_0d
561#  define DIM_1d
562#     define ROUTINE_ALLREDUCE           mppmin_a_real
563#     include "mpp_allreduce_generic.h90"
564#     undef ROUTINE_ALLREDUCE
565#  undef DIM_1d
566#  undef REAL_TYPE
567#  undef OPERATION_MIN
[869]568
[10425]569   !!----------------------------------------------------------------------
570   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
571   !!   
572   !!   Global sum of 1D array or a variable (integer, real or complex)
573   !!----------------------------------------------------------------------
574   !!
575#  define OPERATION_SUM
576#  define INTEGER_TYPE
577#  define DIM_0d
578#     define ROUTINE_ALLREDUCE           mppsum_int
579#     include "mpp_allreduce_generic.h90"
580#     undef ROUTINE_ALLREDUCE
581#  undef DIM_0d
582#  define DIM_1d
583#     define ROUTINE_ALLREDUCE           mppsum_a_int
584#     include "mpp_allreduce_generic.h90"
585#     undef ROUTINE_ALLREDUCE
586#  undef DIM_1d
587#  undef INTEGER_TYPE
588!
589#  define REAL_TYPE
590#  define DIM_0d
591#     define ROUTINE_ALLREDUCE           mppsum_real
592#     include "mpp_allreduce_generic.h90"
593#     undef ROUTINE_ALLREDUCE
594#  undef DIM_0d
595#  define DIM_1d
596#     define ROUTINE_ALLREDUCE           mppsum_a_real
597#     include "mpp_allreduce_generic.h90"
598#     undef ROUTINE_ALLREDUCE
599#  undef DIM_1d
600#  undef REAL_TYPE
601#  undef OPERATION_SUM
602
603#  define OPERATION_SUM_DD
604#  define COMPLEX_TYPE
605#  define DIM_0d
606#     define ROUTINE_ALLREDUCE           mppsum_realdd
607#     include "mpp_allreduce_generic.h90"
608#     undef ROUTINE_ALLREDUCE
609#  undef DIM_0d
610#  define DIM_1d
611#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
612#     include "mpp_allreduce_generic.h90"
613#     undef ROUTINE_ALLREDUCE
614#  undef DIM_1d
615#  undef COMPLEX_TYPE
616#  undef OPERATION_SUM_DD
617
618   !!----------------------------------------------------------------------
619   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
620   !!   
621   !!----------------------------------------------------------------------
622   !!
623#  define OPERATION_MINLOC
624#  define DIM_2d
625#     define ROUTINE_LOC           mpp_minloc2d
626#     include "mpp_loc_generic.h90"
627#     undef ROUTINE_LOC
628#  undef DIM_2d
629#  define DIM_3d
630#     define ROUTINE_LOC           mpp_minloc3d
631#     include "mpp_loc_generic.h90"
632#     undef ROUTINE_LOC
633#  undef DIM_3d
634#  undef OPERATION_MINLOC
635
636#  define OPERATION_MAXLOC
637#  define DIM_2d
638#     define ROUTINE_LOC           mpp_maxloc2d
639#     include "mpp_loc_generic.h90"
640#     undef ROUTINE_LOC
641#  undef DIM_2d
642#  define DIM_3d
643#     define ROUTINE_LOC           mpp_maxloc3d
644#     include "mpp_loc_generic.h90"
645#     undef ROUTINE_LOC
646#  undef DIM_3d
647#  undef OPERATION_MAXLOC
648
[1344]649   SUBROUTINE mppsync()
650      !!----------------------------------------------------------------------
651      !!                  ***  routine mppsync  ***
[3764]652      !!
[1344]653      !! ** Purpose :   Massively parallel processors, synchroneous
654      !!
655      !!-----------------------------------------------------------------------
656      INTEGER :: ierror
657      !!-----------------------------------------------------------------------
658      !
[11536]659#if defined key_mpp_mpi
[9570]660      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]661#endif
[1344]662      !
663   END SUBROUTINE mppsync
[3]664
665
[11536]666   SUBROUTINE mppstop( ld_abort ) 
[1344]667      !!----------------------------------------------------------------------
668      !!                  ***  routine mppstop  ***
[3764]669      !!
[3294]670      !! ** purpose :   Stop massively parallel processors method
[1344]671      !!
672      !!----------------------------------------------------------------------
[11536]673      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
674      LOGICAL ::   ll_abort
[1344]675      INTEGER ::   info
676      !!----------------------------------------------------------------------
[11536]677      ll_abort = .FALSE.
678      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]679      !
[11536]680#if defined key_mpp_mpi
681      IF(ll_abort) THEN
[10425]682         CALL mpi_abort( MPI_COMM_WORLD )
683      ELSE
684         CALL mppsync
685         CALL mpi_finalize( info )
686      ENDIF
[11536]687#endif
688      IF( ll_abort ) STOP 123
[1344]689      !
690   END SUBROUTINE mppstop
[3]691
692
[1344]693   SUBROUTINE mpp_comm_free( kcom )
694      !!----------------------------------------------------------------------
695      INTEGER, INTENT(in) ::   kcom
696      !!
697      INTEGER :: ierr
698      !!----------------------------------------------------------------------
699      !
[11536]700#if defined key_mpp_mpi
[1344]701      CALL MPI_COMM_FREE(kcom, ierr)
[11536]702#endif
[1344]703      !
704   END SUBROUTINE mpp_comm_free
[3]705
[869]706
[2715]707   SUBROUTINE mpp_ini_znl( kumout )
[1345]708      !!----------------------------------------------------------------------
709      !!               ***  routine mpp_ini_znl  ***
710      !!
711      !! ** Purpose :   Initialize special communicator for computing zonal sum
712      !!
713      !! ** Method  : - Look for processors in the same row
714      !!              - Put their number in nrank_znl
715      !!              - Create group for the znl processors
716      !!              - Create a communicator for znl processors
717      !!              - Determine if processor should write znl files
718      !!
719      !! ** output
720      !!      ndim_rank_znl = number of processors on the same row
721      !!      ngrp_znl = group ID for the znl processors
722      !!      ncomm_znl = communicator for the ice procs.
723      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
724      !!
725      !!----------------------------------------------------------------------
[2715]726      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]727      !
[2715]728      INTEGER :: jproc      ! dummy loop integer
729      INTEGER :: ierr, ii   ! local integer
730      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
731      !!----------------------------------------------------------------------
[11536]732#if defined key_mpp_mpi
[1345]733      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
734      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]735      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]736      !
[2715]737      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]738      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]739
740      IF( jpnj == 1 ) THEN
[1345]741         ngrp_znl  = ngrp_world
[9570]742         ncomm_znl = mpi_comm_oce
[1345]743      ELSE
744         !
[9570]745         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]746         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
747         !-$$        CALL flush(numout)
748         !
749         ! Count number of processors on the same row
750         ndim_rank_znl = 0
751         DO jproc=1,jpnij
752            IF ( kwork(jproc) == njmpp ) THEN
753               ndim_rank_znl = ndim_rank_znl + 1
754            ENDIF
755         END DO
756         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
757         !-$$        CALL flush(numout)
758         ! Allocate the right size to nrank_znl
[1441]759         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]760         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]761         ii = 0
[1345]762         nrank_znl (:) = 0
763         DO jproc=1,jpnij
764            IF ( kwork(jproc) == njmpp) THEN
765               ii = ii + 1
[3764]766               nrank_znl(ii) = jproc -1
[1345]767            ENDIF
768         END DO
769         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
770         !-$$        CALL flush(numout)
771
772         ! Create the opa group
[9570]773         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]774         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
775         !-$$        CALL flush(numout)
776
777         ! Create the znl group from the opa group
778         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
779         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
780         !-$$        CALL flush(numout)
781
782         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]783         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]784         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
785         !-$$        CALL flush(numout)
786         !
787      END IF
788
789      ! Determines if processor if the first (starting from i=1) on the row
[3764]790      IF ( jpni == 1 ) THEN
[1345]791         l_znl_root = .TRUE.
792      ELSE
793         l_znl_root = .FALSE.
794         kwork (1) = nimpp
[10425]795         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]796         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
797      END IF
798
[2715]799      DEALLOCATE(kwork)
[11536]800#endif
[2715]801
[1345]802   END SUBROUTINE mpp_ini_znl
803
804
[1344]805   SUBROUTINE mpp_ini_north
806      !!----------------------------------------------------------------------
807      !!               ***  routine mpp_ini_north  ***
808      !!
[3764]809      !! ** Purpose :   Initialize special communicator for north folding
[1344]810      !!      condition together with global variables needed in the mpp folding
811      !!
812      !! ** Method  : - Look for northern processors
813      !!              - Put their number in nrank_north
814      !!              - Create groups for the world processors and the north processors
815      !!              - Create a communicator for northern processors
816      !!
817      !! ** output
818      !!      njmppmax = njmpp for northern procs
819      !!      ndim_rank_north = number of processors in the northern line
820      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
821      !!      ngrp_world = group ID for the world processors
822      !!      ngrp_north = group ID for the northern processors
823      !!      ncomm_north = communicator for the northern procs.
824      !!      north_root = number (in the world) of proc 0 in the northern comm.
825      !!
826      !!----------------------------------------------------------------------
827      INTEGER ::   ierr
828      INTEGER ::   jjproc
829      INTEGER ::   ii, ji
830      !!----------------------------------------------------------------------
831      !
[11536]832#if defined key_mpp_mpi
[1344]833      njmppmax = MAXVAL( njmppt )
834      !
835      ! Look for how many procs on the northern boundary
836      ndim_rank_north = 0
837      DO jjproc = 1, jpnij
838         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
839      END DO
840      !
841      ! Allocate the right size to nrank_north
[1441]842      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]843      ALLOCATE( nrank_north(ndim_rank_north) )
[869]844
[1344]845      ! Fill the nrank_north array with proc. number of northern procs.
846      ! Note : the rank start at 0 in MPI
847      ii = 0
848      DO ji = 1, jpnij
849         IF ( njmppt(ji) == njmppmax   ) THEN
850            ii=ii+1
851            nrank_north(ii)=ji-1
852         END IF
853      END DO
854      !
855      ! create the world group
[9570]856      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]857      !
858      ! Create the North group from the world group
859      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
860      !
861      ! Create the North communicator , ie the pool of procs in the north group
[9570]862      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]863      !
[11536]864#endif
[1344]865   END SUBROUTINE mpp_ini_north
[869]866
867
[9019]868   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]869      !!---------------------------------------------------------------------
870      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
871      !!
872      !!   Modification of original codes written by David H. Bailey
873      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
874      !!---------------------------------------------------------------------
[9019]875      INTEGER                     , INTENT(in)    ::   ilen, itype
876      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
877      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]878      !
879      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]880      INTEGER  :: ji, ztmp           ! local scalar
881      !!---------------------------------------------------------------------
882      !
[1976]883      ztmp = itype   ! avoid compilation warning
[9019]884      !
[1976]885      DO ji=1,ilen
886      ! Compute ydda + yddb using Knuth's trick.
887         zt1  = real(ydda(ji)) + real(yddb(ji))
888         zerr = zt1 - real(ydda(ji))
889         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
890                + aimag(ydda(ji)) + aimag(yddb(ji))
891
892         ! The result is zt1 + zt2, after normalization.
893         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
894      END DO
[9019]895      !
[1976]896   END SUBROUTINE DDPDD_MPI
897
[6140]898
[10437]899   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]900      !!----------------------------------------------------------------------
901      !!                  ***  routine mpp_report  ***
902      !!
903      !! ** Purpose :   report use of mpp routines per time-setp
904      !!
905      !!----------------------------------------------------------------------
906      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
907      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]908      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]909      !!
[10982]910      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]911      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]912      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]913      !!----------------------------------------------------------------------
[11536]914#if defined key_mpp_mpi
[10425]915      !
916      ll_lbc = .FALSE.
917      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
918      ll_glb = .FALSE.
919      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]920      ll_dlg = .FALSE.
921      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]922      !
923      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
924      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
925      ncom_freq = ncom_fsbc
926      !
927      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
928         IF( ll_lbc ) THEN
929            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
930            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
931            n_sequence_lbc = n_sequence_lbc + 1
932            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
933            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
934            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
935            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
936         ENDIF
937         IF( ll_glb ) THEN
938            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
939            n_sequence_glb = n_sequence_glb + 1
940            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
941            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
942         ENDIF
[10437]943         IF( ll_dlg ) THEN
944            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
945            n_sequence_dlg = n_sequence_dlg + 1
946            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
947            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
948         ENDIF
[10425]949      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
950         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
951         WRITE(numcom,*) ' '
952         WRITE(numcom,*) ' ------------------------------------------------------------'
953         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
954         WRITE(numcom,*) ' ------------------------------------------------------------'
955         WRITE(numcom,*) ' '
956         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
957         jj = 0; jk = 0; jf = 0; jh = 0
958         DO ji = 1, n_sequence_lbc
959            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
960            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
961            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
962            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
963         END DO
964         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
965         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
966         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
967         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
968         WRITE(numcom,*) ' '
969         WRITE(numcom,*) ' lbc_lnk called'
[10982]970         DO ji = 1, n_sequence_lbc - 1
971            IF ( crname_lbc(ji) /= 'already counted' ) THEN
972               ccountname = crname_lbc(ji)
973               crname_lbc(ji) = 'already counted'
974               jcount = 1
975               DO jj = ji + 1, n_sequence_lbc
976                  IF ( ccountname ==  crname_lbc(jj) ) THEN
977                     jcount = jcount + 1
978                     crname_lbc(jj) = 'already counted'
979                  END IF
980               END DO
981               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]982            END IF
983         END DO
[10982]984         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
985            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
986         END IF
[10425]987         WRITE(numcom,*) ' '
988         IF ( n_sequence_glb > 0 ) THEN
989            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
990            jj = 1
991            DO ji = 2, n_sequence_glb
992               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
993                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
994                  jj = 0
995               END IF
996               jj = jj + 1 
997            END DO
998            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
999            DEALLOCATE(crname_glb)
1000         ELSE
1001            WRITE(numcom,*) ' No MPI global communication '
1002         ENDIF
1003         WRITE(numcom,*) ' '
[10437]1004         IF ( n_sequence_dlg > 0 ) THEN
1005            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1006            jj = 1
1007            DO ji = 2, n_sequence_dlg
1008               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1009                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1010                  jj = 0
1011               END IF
1012               jj = jj + 1 
1013            END DO
1014            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1015            DEALLOCATE(crname_dlg)
1016         ELSE
1017            WRITE(numcom,*) ' No MPI delayed global communication '
1018         ENDIF
1019         WRITE(numcom,*) ' '
[10425]1020         WRITE(numcom,*) ' -----------------------------------------------'
1021         WRITE(numcom,*) ' '
1022         DEALLOCATE(ncomm_sequence)
1023         DEALLOCATE(crname_lbc)
1024      ENDIF
[11536]1025#endif
[10425]1026   END SUBROUTINE mpp_report
1027
[6140]1028   
[10425]1029   SUBROUTINE tic_tac (ld_tic, ld_global)
1030
1031    LOGICAL,           INTENT(IN) :: ld_tic
1032    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1033    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1034    REAL(wp),               SAVE :: tic_ct = 0._wp
1035    INTEGER :: ii
[11536]1036#if defined key_mpp_mpi
[10425]1037
1038    IF( ncom_stp <= nit000 ) RETURN
1039    IF( ncom_stp == nitend ) RETURN
1040    ii = 1
1041    IF( PRESENT( ld_global ) ) THEN
1042       IF( ld_global ) ii = 2
1043    END IF
1044   
1045    IF ( ld_tic ) THEN
1046       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1047       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1048    ELSE
1049       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1050       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1051    ENDIF
[11536]1052#endif
[10425]1053   
1054   END SUBROUTINE tic_tac
1055
[11536]1056#if ! defined key_mpp_mpi
1057   SUBROUTINE mpi_wait(request, status, ierror)
1058      INTEGER                            , INTENT(in   ) ::   request
1059      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1060      INTEGER                            , INTENT(  out) ::   ierror
1061   END SUBROUTINE mpi_wait
[1976]1062
[10425]1063   
[11536]1064   FUNCTION MPI_Wtime()
1065      REAL(wp) ::  MPI_Wtime
1066      MPI_Wtime = -1.
1067   END FUNCTION MPI_Wtime
[3]1068#endif
[2715]1069
[13]1070   !!----------------------------------------------------------------------
[11624]1071   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1072   !!----------------------------------------------------------------------
1073
1074   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1075      &                 cd6, cd7, cd8, cd9, cd10 )
1076      !!----------------------------------------------------------------------
1077      !!                  ***  ROUTINE  stop_opa  ***
1078      !!
[3764]1079      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1080      !!                increment the error number (nstop) by one.
1081      !!----------------------------------------------------------------------
[11536]1082      CHARACTER(len=*), INTENT(in   )           ::   cd1
1083      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1084      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[2715]1085      !!----------------------------------------------------------------------
1086      !
[3764]1087      nstop = nstop + 1
[11536]1088      !
1089      ! force to open ocean.output file if not already opened
[10425]1090      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
[11536]1091      !
1092                            WRITE(numout,*)
1093                            WRITE(numout,*) ' ===>>> : E R R O R'
1094                            WRITE(numout,*)
1095                            WRITE(numout,*) '         ==========='
1096                            WRITE(numout,*)
1097                            WRITE(numout,*) TRIM(cd1)
[10425]1098      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1099      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1100      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1101      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1102      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1103      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1104      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1105      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1106      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1107                            WRITE(numout,*)
1108      !
[2715]1109                               CALL FLUSH(numout    )
1110      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1111      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1112      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1113      !
1114      IF( cd1 == 'STOP' ) THEN
[11536]1115         WRITE(numout,*) 
[10425]1116         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1117         WRITE(numout,*) 
1118         CALL mppstop( ld_abort = .true. )
[2715]1119      ENDIF
1120      !
1121   END SUBROUTINE ctl_stop
1122
1123
1124   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1125      &                 cd6, cd7, cd8, cd9, cd10 )
1126      !!----------------------------------------------------------------------
1127      !!                  ***  ROUTINE  stop_warn  ***
1128      !!
[3764]1129      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1130      !!                increment the warning number (nwarn) by one.
1131      !!----------------------------------------------------------------------
1132      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1133      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1134      !!----------------------------------------------------------------------
[3764]1135      !
1136      nwarn = nwarn + 1
[11536]1137      !
[2715]1138      IF(lwp) THEN
[11536]1139                               WRITE(numout,*)
1140                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1141                               WRITE(numout,*)
1142                               WRITE(numout,*) '         ==============='
1143                               WRITE(numout,*)
1144         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1145         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1146         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1147         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1148         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1149         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1150         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1151         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1152         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1153         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1154                               WRITE(numout,*)
[2715]1155      ENDIF
1156      CALL FLUSH(numout)
1157      !
1158   END SUBROUTINE ctl_warn
1159
1160
1161   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1162      !!----------------------------------------------------------------------
1163      !!                  ***  ROUTINE ctl_opn  ***
1164      !!
1165      !! ** Purpose :   Open file and check if required file is available.
1166      !!
1167      !! ** Method  :   Fortan open
1168      !!----------------------------------------------------------------------
1169      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1170      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1171      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1172      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1173      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1174      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1175      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1176      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1177      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1178      !
[2715]1179      CHARACTER(len=80) ::   clfile
1180      INTEGER           ::   iost
1181      !!----------------------------------------------------------------------
[5836]1182      !
[2715]1183      ! adapt filename
1184      ! ----------------
1185      clfile = TRIM(cdfile)
1186      IF( PRESENT( karea ) ) THEN
1187         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1188      ENDIF
1189#if defined key_agrif
1190      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1191      knum=Agrif_Get_Unit()
1192#else
1193      knum=get_unit()
1194#endif
[10425]1195      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1196      !
[11536]1197      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1198         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1199      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1200         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1201      ELSE
[10425]1202         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1203      ENDIF
[10425]1204      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1205         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1206      IF( iost == 0 ) THEN
1207         IF(ldwp) THEN
[10425]1208            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1209            WRITE(kout,*) '     unit   = ', knum
1210            WRITE(kout,*) '     status = ', cdstat
1211            WRITE(kout,*) '     form   = ', cdform
1212            WRITE(kout,*) '     access = ', cdacce
1213            WRITE(kout,*)
1214         ENDIF
1215      ENDIF
1216100   CONTINUE
1217      IF( iost /= 0 ) THEN
[11536]1218         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1219         WRITE(ctmp2,*) ' =======   ===  '
1220         WRITE(ctmp3,*) '           unit   = ', knum
1221         WRITE(ctmp4,*) '           status = ', cdstat
1222         WRITE(ctmp5,*) '           form   = ', cdform
1223         WRITE(ctmp6,*) '           access = ', cdacce
1224         WRITE(ctmp7,*) '           iostat = ', iost
1225         WRITE(ctmp8,*) '           we stop. verify the file '
1226         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1227      ENDIF
[5836]1228      !
[2715]1229   END SUBROUTINE ctl_opn
1230
[5836]1231
[11536]1232   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1233      !!----------------------------------------------------------------------
1234      !!                  ***  ROUTINE ctl_nam  ***
1235      !!
1236      !! ** Purpose :   Informations when error while reading a namelist
1237      !!
1238      !! ** Method  :   Fortan open
1239      !!----------------------------------------------------------------------
[11536]1240      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1241      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1242      !
1243      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1244      !!----------------------------------------------------------------------
[5836]1245      !
[7646]1246      WRITE (clios, '(I5.0)')   kios
[4147]1247      IF( kios < 0 ) THEN         
[5836]1248         CALL ctl_warn( 'end of record or file while reading namelist '   &
1249            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1250      ENDIF
[5836]1251      !
[4147]1252      IF( kios > 0 ) THEN
[5836]1253         CALL ctl_stop( 'misspelled variable in namelist '   &
1254            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1255      ENDIF
1256      kios = 0
[5836]1257      !
[4147]1258   END SUBROUTINE ctl_nam
1259
[5836]1260
[2715]1261   INTEGER FUNCTION get_unit()
1262      !!----------------------------------------------------------------------
1263      !!                  ***  FUNCTION  get_unit  ***
1264      !!
1265      !! ** Purpose :   return the index of an unused logical unit
1266      !!----------------------------------------------------------------------
[3764]1267      LOGICAL :: llopn
[2715]1268      !!----------------------------------------------------------------------
1269      !
1270      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1271      llopn = .TRUE.
1272      DO WHILE( (get_unit < 998) .AND. llopn )
1273         get_unit = get_unit + 1
1274         INQUIRE( unit = get_unit, opened = llopn )
1275      END DO
1276      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1277         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1278      ENDIF
1279      !
1280   END FUNCTION get_unit
1281
[11624]1282   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1283      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1284      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1285      CHARACTER(LEN=256)                           :: chline
1286      INTEGER, INTENT(IN)                          :: kout
1287      LOGICAL, INTENT(IN)                          :: ldwp
1288      INTEGER                                      :: itot, iun, iltc, inl, ios
1289      !
1290      ! Check if the namelist buffer has already been allocated. Return if it has.
1291      !
1292      IF ( ALLOCATED( cdnambuff ) ) RETURN
1293      !
1294      ! Open namelist file
1295      !
1296      CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1297      !
1298      ! First pass: count characters excluding comments and trimable white space
1299      !
1300      itot=0
1301  10  READ(iun,'(A256)',END=20,ERR=20) chline
1302      iltc = LEN_TRIM(chline)
1303      IF ( iltc.GT.0 ) THEN
1304       inl = INDEX(chline, '!') 
1305       IF( inl.eq.0 ) THEN
1306        itot = itot + iltc + 1                                ! +1 for the newline character
1307       ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1308        itot = itot + inl                                  !  includes +1 for the newline character
1309       ENDIF
1310      ENDIF
1311      GOTO 10
1312  20  CONTINUE
1313      !
1314      ! Allocate text cdnambuff for condensed namelist
1315      !
1316      ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1317      WRITE(*,*) 'ALLOCATED ', itot
1318      !
1319      ! Second pass: read and transfer pruned characters into cdnambuff
1320      !
1321      REWIND(iun)
1322      itot=1
1323  30  READ(iun,'(A256)',END=40,ERR=40) chline
1324      iltc = LEN_TRIM(chline)
1325      IF ( iltc.GT.0 ) THEN
1326       inl = INDEX(chline, '!')
1327       IF( inl.eq.0 ) THEN
1328        inl = iltc
1329       ELSE
1330        inl = inl - 1
1331       ENDIF
1332       IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1333          cdnambuff(itot:itot+inl-1) = chline(1:inl)
1334          WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) NEW_LINE('A')
1335          itot = itot + inl + 1
1336       ENDIF
1337      ENDIF
1338      GOTO 30
1339  40  CONTINUE
1340      WRITE(*,*) 'ASSIGNED ',itot - 1
1341      !
1342      ! Close namelist file
1343      !
1344      CLOSE(iun)
1345      !write(*,'(32A)') cdnambuff
1346  END SUBROUTINE load_nml
1347
1348
[2715]1349   !!----------------------------------------------------------------------
[3]1350END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.