New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2020/dev_r12512_HPC-04_mcastril_Mixed_Precision_implementation/src/OCE/LBC – NEMO

source: NEMO/branches/2020/dev_r12512_HPC-04_mcastril_Mixed_Precision_implementation/src/OCE/LBC/lib_mpp.F90 @ 13181

Last change on this file since 13181 was 13181, checked in by orioltp, 4 years ago

Several bugfixes added by Sam Hatfield (ECMWF).

  • Property svn:keywords set to Id
File size: 69.2 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
71   PUBLIC   mpp_report
72   PUBLIC   mpp_bcast_nml
73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
75   PUBLIC MPI_Wtime
76#endif
77   
78   !! * Interfaces
79   !! define generic interface for these routine as they are called sometimes
80   !! with scalar arguments instead of array arguments, which causes problems
81   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
82   INTERFACE mpp_min
83      MODULE PROCEDURE mppmin_a_int, mppmin_int
84      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
85      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
86   END INTERFACE
87   INTERFACE mpp_max
88      MODULE PROCEDURE mppmax_a_int, mppmax_int
89      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
90      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
91   END INTERFACE
92   INTERFACE mpp_sum
93      MODULE PROCEDURE mppsum_a_int, mppsum_int
94      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
95      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
96      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
97   END INTERFACE
98   INTERFACE mpp_minloc
99      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
100      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
101   END INTERFACE
102   INTERFACE mpp_maxloc
103      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
104      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
105   END INTERFACE
106
107   !! ========================= !!
108   !!  MPI  variable definition !!
109   !! ========================= !!
110#if   defined key_mpp_mpi
111!$AGRIF_DO_NOT_TREAT
112   INCLUDE 'mpif.h'
113!$AGRIF_END_DO_NOT_TREAT
114   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
115#else   
116   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
117   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
118   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
119#endif
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Communications summary report
149   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
150   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
151   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
152   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
153   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
154   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
155   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
156   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
157   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
158   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
159   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
160   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
161   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
162   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
163   !: name (used as id) of allreduce-delayed operations
164   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
165   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
166   !: component name where the allreduce-delayed operation is performed
167   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
168   TYPE, PUBLIC ::   DELAYARR
169      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
170      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
171   END TYPE DELAYARR
172   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
173   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
174
175   ! timing summary report
176   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
177   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
178   
179   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
180
181   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
182   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
183   
184   !! * Substitutions
185#  include "do_loop_substitute.h90"
186   !!----------------------------------------------------------------------
187   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
188   !! $Id$
189   !! Software governed by the CeCILL license (see ./LICENSE)
190   !!----------------------------------------------------------------------
191CONTAINS
192
193   SUBROUTINE mpp_start( localComm )
194      !!----------------------------------------------------------------------
195      !!                  ***  routine mpp_start  ***
196      !!
197      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
198      !!----------------------------------------------------------------------
199      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
200      !
201      INTEGER ::   ierr
202      LOGICAL ::   llmpi_init
203      !!----------------------------------------------------------------------
204#if defined key_mpp_mpi
205      !
206      CALL mpi_initialized ( llmpi_init, ierr )
207      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
208
209      IF( .NOT. llmpi_init ) THEN
210         IF( PRESENT(localComm) ) THEN
211            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
212            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
213            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
214         ENDIF
215         CALL mpi_init( ierr )
216         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
217      ENDIF
218       
219      IF( PRESENT(localComm) ) THEN
220         IF( Agrif_Root() ) THEN
221            mpi_comm_oce = localComm
222         ENDIF
223      ELSE
224         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
225         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
226      ENDIF
227
228# if defined key_agrif
229      IF( Agrif_Root() ) THEN
230         CALL Agrif_MPI_Init(mpi_comm_oce)
231      ELSE
232         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
233      ENDIF
234# endif
235
236      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
237      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
238      !
239      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
240      !
241#else
242      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
243      mppsize = 1
244      mpprank = 0
245#endif
246   END SUBROUTINE mpp_start
247
248
249   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
250      !!----------------------------------------------------------------------
251      !!                  ***  routine mppsend  ***
252      !!
253      !! ** Purpose :   Send messag passing array
254      !!
255      !!----------------------------------------------------------------------
256      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
257      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
258      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
259      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
260      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
261      !!
262      INTEGER ::   iflag
263      INTEGER :: mpi_working_type
264      !!----------------------------------------------------------------------
265      !
266#if defined key_mpp_mpi
267      IF (wp == dp) THEN
268         mpi_working_type = mpi_double_precision
269      ELSE
270         mpi_working_type = mpi_real
271      END IF
272      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
273#endif
274      !
275   END SUBROUTINE mppsend
276
277
278   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
279      !!----------------------------------------------------------------------
280      !!                  ***  routine mppsend  ***
281      !!
282      !! ** Purpose :   Send messag passing array
283      !!
284      !!----------------------------------------------------------------------
285      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
286      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
287      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
288      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
289      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
290      !!
291      INTEGER ::   iflag
292      !!----------------------------------------------------------------------
293      !
294#if defined key_mpp_mpi
295      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
296#endif
297      !
298   END SUBROUTINE mppsend_dp
299
300
301   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
302      !!----------------------------------------------------------------------
303      !!                  ***  routine mppsend  ***
304      !!
305      !! ** Purpose :   Send messag passing array
306      !!
307      !!----------------------------------------------------------------------
308      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
309      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
310      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
311      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
312      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
313      !!
314      INTEGER ::   iflag
315      !!----------------------------------------------------------------------
316      !
317#if defined key_mpp_mpi
318      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
319#endif
320      !
321   END SUBROUTINE mppsend_sp
322
323
324   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
325      !!----------------------------------------------------------------------
326      !!                  ***  routine mpprecv  ***
327      !!
328      !! ** Purpose :   Receive messag passing array
329      !!
330      !!----------------------------------------------------------------------
331      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
332      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
333      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
334      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
335      !!
336      INTEGER :: istatus(mpi_status_size)
337      INTEGER :: iflag
338      INTEGER :: use_source
339      INTEGER :: mpi_working_type
340      !!----------------------------------------------------------------------
341      !
342#if defined key_mpp_mpi
343      ! If a specific process number has been passed to the receive call,
344      ! use that one. Default is to use mpi_any_source
345      use_source = mpi_any_source
346      IF( PRESENT(ksource) )   use_source = ksource
347      !
348      IF (wp == dp) THEN
349         mpi_working_type = mpi_double_precision
350      ELSE
351         mpi_working_type = mpi_real
352      END IF
353      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
354#endif
355      !
356   END SUBROUTINE mpprecv
357
358   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
359      !!----------------------------------------------------------------------
360      !!                  ***  routine mpprecv  ***
361      !!
362      !! ** Purpose :   Receive messag passing array
363      !!
364      !!----------------------------------------------------------------------
365      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
366      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
367      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
368      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
369      !!
370      INTEGER :: istatus(mpi_status_size)
371      INTEGER :: iflag
372      INTEGER :: use_source
373      !!----------------------------------------------------------------------
374      !
375#if defined key_mpp_mpi
376      ! If a specific process number has been passed to the receive call,
377      ! use that one. Default is to use mpi_any_source
378      use_source = mpi_any_source
379      IF( PRESENT(ksource) )   use_source = ksource
380      !
381      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
382#endif
383      !
384   END SUBROUTINE mpprecv_dp
385
386
387   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
388      !!----------------------------------------------------------------------
389      !!                  ***  routine mpprecv  ***
390      !!
391      !! ** Purpose :   Receive messag passing array
392      !!
393      !!----------------------------------------------------------------------
394      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
395      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
396      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
397      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
398      !!
399      INTEGER :: istatus(mpi_status_size)
400      INTEGER :: iflag
401      INTEGER :: use_source
402      !!----------------------------------------------------------------------
403      !
404#if defined key_mpp_mpi
405      ! If a specific process number has been passed to the receive call,
406      ! use that one. Default is to use mpi_any_source
407      use_source = mpi_any_source
408      IF( PRESENT(ksource) )   use_source = ksource
409      !
410      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
411#endif
412      !
413   END SUBROUTINE mpprecv_sp
414
415
416   SUBROUTINE mppgather( ptab, kp, pio )
417      !!----------------------------------------------------------------------
418      !!                   ***  routine mppgather  ***
419      !!
420      !! ** Purpose :   Transfert between a local subdomain array and a work
421      !!     array which is distributed following the vertical level.
422      !!
423      !!----------------------------------------------------------------------
424      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
425      INTEGER                           , INTENT(in   ) ::   kp     ! record length
426      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
427      !!
428      INTEGER :: itaille, ierror   ! temporary integer
429      !!---------------------------------------------------------------------
430      !
431      itaille = jpi * jpj
432#if defined key_mpp_mpi
433      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
434         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
435#else
436      pio(:,:,1) = ptab(:,:)
437#endif
438      !
439   END SUBROUTINE mppgather
440
441
442   SUBROUTINE mppscatter( pio, kp, ptab )
443      !!----------------------------------------------------------------------
444      !!                  ***  routine mppscatter  ***
445      !!
446      !! ** Purpose :   Transfert between awork array which is distributed
447      !!      following the vertical level and the local subdomain array.
448      !!
449      !!----------------------------------------------------------------------
450      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
451      INTEGER                             ::   kp     ! Tag (not used with MPI
452      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
453      !!
454      INTEGER :: itaille, ierror   ! temporary integer
455      !!---------------------------------------------------------------------
456      !
457      itaille = jpi * jpj
458      !
459#if defined key_mpp_mpi
460      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
461         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
462#else
463      ptab(:,:) = pio(:,:,1)
464#endif
465      !
466   END SUBROUTINE mppscatter
467
468   
469   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
470     !!----------------------------------------------------------------------
471      !!                   ***  routine mpp_delay_sum  ***
472      !!
473      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
474      !!
475      !!----------------------------------------------------------------------
476      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
477      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
478      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
479      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
480      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
481      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
482      !!
483      INTEGER ::   ji, isz
484      INTEGER ::   idvar
485      INTEGER ::   ierr, ilocalcomm
486      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
487      !!----------------------------------------------------------------------
488#if defined key_mpp_mpi
489      ilocalcomm = mpi_comm_oce
490      IF( PRESENT(kcom) )   ilocalcomm = kcom
491
492      isz = SIZE(y_in)
493     
494      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
495
496      idvar = -1
497      DO ji = 1, nbdelay
498         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
499      END DO
500      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
501
502      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
503         !                                       --------------------------
504         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
505            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
506            DEALLOCATE(todelay(idvar)%z1d)
507            ndelayid(idvar) = -1                                      ! do as if we had no restart
508         ELSE
509            ALLOCATE(todelay(idvar)%y1d(isz))
510            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
511         END IF
512      ENDIF
513     
514      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
515         !                                       --------------------------
516         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
517         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
518         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
519      ENDIF
520
521      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
522
523      ! send back pout from todelay(idvar)%z1d defined at previous call
524      pout(:) = todelay(idvar)%z1d(:)
525
526      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
527# if defined key_mpi2
528      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
529      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
530      ndelayid(idvar) = 1
531      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
532# else
533      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
534# endif
535#else
536      pout(:) = REAL(y_in(:), wp)
537#endif
538
539   END SUBROUTINE mpp_delay_sum
540
541   
542   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
543      !!----------------------------------------------------------------------
544      !!                   ***  routine mpp_delay_max  ***
545      !!
546      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
547      !!
548      !!----------------------------------------------------------------------
549      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
550      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
551      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
552      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
553      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
554      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
555      !!
556      INTEGER ::   ji, isz
557      INTEGER ::   idvar
558      INTEGER ::   ierr, ilocalcomm
559      INTEGER ::   MPI_TYPE
560      !!----------------------------------------------------------------------
561     
562#if defined key_mpp_mpi
563      if( wp == dp ) then
564         MPI_TYPE = MPI_DOUBLE_PRECISION
565      else if ( wp == sp ) then
566         MPI_TYPE = MPI_REAL
567      else
568        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
569   
570      end if
571
572      ilocalcomm = mpi_comm_oce
573      IF( PRESENT(kcom) )   ilocalcomm = kcom
574
575      isz = SIZE(p_in)
576
577      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
578
579      idvar = -1
580      DO ji = 1, nbdelay
581         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
582      END DO
583      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
584
585      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
586         !                                       --------------------------
587         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
588            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
589            DEALLOCATE(todelay(idvar)%z1d)
590            ndelayid(idvar) = -1                                      ! do as if we had no restart
591         END IF
592      ENDIF
593
594      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
595         !                                       --------------------------
596         ALLOCATE(todelay(idvar)%z1d(isz))
597         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
598      ENDIF
599
600      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
601
602      ! send back pout from todelay(idvar)%z1d defined at previous call
603      pout(:) = todelay(idvar)%z1d(:)
604
605      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
606# if defined key_mpi2
607      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
608      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
609      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
610# else
611      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
612# endif
613#else
614      pout(:) = p_in(:)
615#endif
616
617   END SUBROUTINE mpp_delay_max
618
619   
620   SUBROUTINE mpp_delay_rcv( kid )
621      !!----------------------------------------------------------------------
622      !!                   ***  routine mpp_delay_rcv  ***
623      !!
624      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
625      !!
626      !!----------------------------------------------------------------------
627      INTEGER,INTENT(in   )      ::  kid 
628      INTEGER ::   ierr
629      !!----------------------------------------------------------------------
630#if defined key_mpp_mpi
631      IF( ndelayid(kid) /= -2 ) THEN 
632#if ! defined key_mpi2
633         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
634         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
635         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
636#endif
637         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
638         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
639      ENDIF
640#endif
641   END SUBROUTINE mpp_delay_rcv
642
643   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
644      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
645      INTEGER                          , INTENT(INOUT) :: kleng
646      !!----------------------------------------------------------------------
647      !!                  ***  routine mpp_bcast_nml  ***
648      !!
649      !! ** Purpose :   broadcast namelist character buffer
650      !!
651      !!----------------------------------------------------------------------
652      !!
653      INTEGER ::   iflag
654      !!----------------------------------------------------------------------
655      !
656#if defined key_mpp_mpi
657      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
658      call MPI_BARRIER(mpi_comm_oce, iflag)
659!$AGRIF_DO_NOT_TREAT
660      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
661!$AGRIF_END_DO_NOT_TREAT
662      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
663      call MPI_BARRIER(mpi_comm_oce, iflag)
664#endif
665      !
666   END SUBROUTINE mpp_bcast_nml
667
668   
669   !!----------------------------------------------------------------------
670   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
671   !!   
672   !!----------------------------------------------------------------------
673   !!
674#  define OPERATION_MAX
675#  define INTEGER_TYPE
676#  define DIM_0d
677#     define ROUTINE_ALLREDUCE           mppmax_int
678#     include "mpp_allreduce_generic.h90"
679#     undef ROUTINE_ALLREDUCE
680#  undef DIM_0d
681#  define DIM_1d
682#     define ROUTINE_ALLREDUCE           mppmax_a_int
683#     include "mpp_allreduce_generic.h90"
684#     undef ROUTINE_ALLREDUCE
685#  undef DIM_1d
686#  undef INTEGER_TYPE
687!
688   !!
689   !!   ----   SINGLE PRECISION VERSIONS
690   !!
691#  define SINGLE_PRECISION
692#  define REAL_TYPE
693#  define DIM_0d
694#     define ROUTINE_ALLREDUCE           mppmax_real_sp
695#     include "mpp_allreduce_generic.h90"
696#     undef ROUTINE_ALLREDUCE
697#  undef DIM_0d
698#  define DIM_1d
699#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
700#     include "mpp_allreduce_generic.h90"
701#     undef ROUTINE_ALLREDUCE
702#  undef DIM_1d
703#  undef SINGLE_PRECISION
704   !!
705   !!
706   !!   ----   DOUBLE PRECISION VERSIONS
707   !!
708!
709#  define DIM_0d
710#     define ROUTINE_ALLREDUCE           mppmax_real_dp
711#     include "mpp_allreduce_generic.h90"
712#     undef ROUTINE_ALLREDUCE
713#  undef DIM_0d
714#  define DIM_1d
715#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
716#     include "mpp_allreduce_generic.h90"
717#     undef ROUTINE_ALLREDUCE
718#  undef DIM_1d
719#  undef REAL_TYPE
720#  undef OPERATION_MAX
721   !!----------------------------------------------------------------------
722   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
723   !!   
724   !!----------------------------------------------------------------------
725   !!
726#  define OPERATION_MIN
727#  define INTEGER_TYPE
728#  define DIM_0d
729#     define ROUTINE_ALLREDUCE           mppmin_int
730#     include "mpp_allreduce_generic.h90"
731#     undef ROUTINE_ALLREDUCE
732#  undef DIM_0d
733#  define DIM_1d
734#     define ROUTINE_ALLREDUCE           mppmin_a_int
735#     include "mpp_allreduce_generic.h90"
736#     undef ROUTINE_ALLREDUCE
737#  undef DIM_1d
738#  undef INTEGER_TYPE
739!
740   !!
741   !!   ----   SINGLE PRECISION VERSIONS
742   !!
743#  define SINGLE_PRECISION
744#  define REAL_TYPE
745#  define DIM_0d
746#     define ROUTINE_ALLREDUCE           mppmin_real_sp
747#     include "mpp_allreduce_generic.h90"
748#     undef ROUTINE_ALLREDUCE
749#  undef DIM_0d
750#  define DIM_1d
751#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_1d
755#  undef SINGLE_PRECISION
756   !!
757   !!   ----   DOUBLE PRECISION VERSIONS
758   !!
759
760#  define DIM_0d
761#     define ROUTINE_ALLREDUCE           mppmin_real_dp
762#     include "mpp_allreduce_generic.h90"
763#     undef ROUTINE_ALLREDUCE
764#  undef DIM_0d
765#  define DIM_1d
766#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
767#     include "mpp_allreduce_generic.h90"
768#     undef ROUTINE_ALLREDUCE
769#  undef DIM_1d
770#  undef REAL_TYPE
771#  undef OPERATION_MIN
772
773   !!----------------------------------------------------------------------
774   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
775   !!   
776   !!   Global sum of 1D array or a variable (integer, real or complex)
777   !!----------------------------------------------------------------------
778   !!
779#  define OPERATION_SUM
780#  define INTEGER_TYPE
781#  define DIM_0d
782#     define ROUTINE_ALLREDUCE           mppsum_int
783#     include "mpp_allreduce_generic.h90"
784#     undef ROUTINE_ALLREDUCE
785#  undef DIM_0d
786#  define DIM_1d
787#     define ROUTINE_ALLREDUCE           mppsum_a_int
788#     include "mpp_allreduce_generic.h90"
789#     undef ROUTINE_ALLREDUCE
790#  undef DIM_1d
791#  undef INTEGER_TYPE
792
793   !!
794   !!   ----   SINGLE PRECISION VERSIONS
795   !!
796#  define OPERATION_SUM
797#  define SINGLE_PRECISION
798#  define REAL_TYPE
799#  define DIM_0d
800#     define ROUTINE_ALLREDUCE           mppsum_real_sp
801#     include "mpp_allreduce_generic.h90"
802#     undef ROUTINE_ALLREDUCE
803#  undef DIM_0d
804#  define DIM_1d
805#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
806#     include "mpp_allreduce_generic.h90"
807#     undef ROUTINE_ALLREDUCE
808#  undef DIM_1d
809#  undef REAL_TYPE
810#  undef OPERATION_SUM
811
812#  undef SINGLE_PRECISION
813
814   !!
815   !!   ----   DOUBLE PRECISION VERSIONS
816   !!
817#  define OPERATION_SUM
818#  define REAL_TYPE
819#  define DIM_0d
820#     define ROUTINE_ALLREDUCE           mppsum_real_dp
821#     include "mpp_allreduce_generic.h90"
822#     undef ROUTINE_ALLREDUCE
823#  undef DIM_0d
824#  define DIM_1d
825#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
826#     include "mpp_allreduce_generic.h90"
827#     undef ROUTINE_ALLREDUCE
828#  undef DIM_1d
829#  undef REAL_TYPE
830#  undef OPERATION_SUM
831
832#  define OPERATION_SUM_DD
833#  define COMPLEX_TYPE
834#  define DIM_0d
835#     define ROUTINE_ALLREDUCE           mppsum_realdd
836#     include "mpp_allreduce_generic.h90"
837#     undef ROUTINE_ALLREDUCE
838#  undef DIM_0d
839#  define DIM_1d
840#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
841#     include "mpp_allreduce_generic.h90"
842#     undef ROUTINE_ALLREDUCE
843#  undef DIM_1d
844#  undef COMPLEX_TYPE
845#  undef OPERATION_SUM_DD
846
847   !!----------------------------------------------------------------------
848   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
849   !!   
850   !!----------------------------------------------------------------------
851   !!
852   !!
853   !!   ----   SINGLE PRECISION VERSIONS
854   !!
855#  define SINGLE_PRECISION
856#  define OPERATION_MINLOC
857#  define DIM_2d
858#     define ROUTINE_LOC           mpp_minloc2d_sp
859#     include "mpp_loc_generic.h90"
860#     undef ROUTINE_LOC
861#  undef DIM_2d
862#  define DIM_3d
863#     define ROUTINE_LOC           mpp_minloc3d_sp
864#     include "mpp_loc_generic.h90"
865#     undef ROUTINE_LOC
866#  undef DIM_3d
867#  undef OPERATION_MINLOC
868
869#  define OPERATION_MAXLOC
870#  define DIM_2d
871#     define ROUTINE_LOC           mpp_maxloc2d_sp
872#     include "mpp_loc_generic.h90"
873#     undef ROUTINE_LOC
874#  undef DIM_2d
875#  define DIM_3d
876#     define ROUTINE_LOC           mpp_maxloc3d_sp
877#     include "mpp_loc_generic.h90"
878#     undef ROUTINE_LOC
879#  undef DIM_3d
880#  undef OPERATION_MAXLOC
881#  undef SINGLE_PRECISION
882   !!
883   !!   ----   DOUBLE PRECISION VERSIONS
884   !!
885#  define OPERATION_MINLOC
886#  define DIM_2d
887#     define ROUTINE_LOC           mpp_minloc2d_dp
888#     include "mpp_loc_generic.h90"
889#     undef ROUTINE_LOC
890#  undef DIM_2d
891#  define DIM_3d
892#     define ROUTINE_LOC           mpp_minloc3d_dp
893#     include "mpp_loc_generic.h90"
894#     undef ROUTINE_LOC
895#  undef DIM_3d
896#  undef OPERATION_MINLOC
897
898#  define OPERATION_MAXLOC
899#  define DIM_2d
900#     define ROUTINE_LOC           mpp_maxloc2d_dp
901#     include "mpp_loc_generic.h90"
902#     undef ROUTINE_LOC
903#  undef DIM_2d
904#  define DIM_3d
905#     define ROUTINE_LOC           mpp_maxloc3d_dp
906#     include "mpp_loc_generic.h90"
907#     undef ROUTINE_LOC
908#  undef DIM_3d
909#  undef OPERATION_MAXLOC
910
911
912   SUBROUTINE mppsync()
913      !!----------------------------------------------------------------------
914      !!                  ***  routine mppsync  ***
915      !!
916      !! ** Purpose :   Massively parallel processors, synchroneous
917      !!
918      !!-----------------------------------------------------------------------
919      INTEGER :: ierror
920      !!-----------------------------------------------------------------------
921      !
922#if defined key_mpp_mpi
923      CALL mpi_barrier( mpi_comm_oce, ierror )
924#endif
925      !
926   END SUBROUTINE mppsync
927
928
929   SUBROUTINE mppstop( ld_abort ) 
930      !!----------------------------------------------------------------------
931      !!                  ***  routine mppstop  ***
932      !!
933      !! ** purpose :   Stop massively parallel processors method
934      !!
935      !!----------------------------------------------------------------------
936      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
937      LOGICAL ::   ll_abort
938      INTEGER ::   info
939      !!----------------------------------------------------------------------
940      ll_abort = .FALSE.
941      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
942      !
943#if defined key_mpp_mpi
944      IF(ll_abort) THEN
945         CALL mpi_abort( MPI_COMM_WORLD )
946      ELSE
947         CALL mppsync
948         CALL mpi_finalize( info )
949      ENDIF
950#endif
951      IF( ll_abort ) STOP 123
952      !
953   END SUBROUTINE mppstop
954
955
956   SUBROUTINE mpp_comm_free( kcom )
957      !!----------------------------------------------------------------------
958      INTEGER, INTENT(in) ::   kcom
959      !!
960      INTEGER :: ierr
961      !!----------------------------------------------------------------------
962      !
963#if defined key_mpp_mpi
964      CALL MPI_COMM_FREE(kcom, ierr)
965#endif
966      !
967   END SUBROUTINE mpp_comm_free
968
969
970   SUBROUTINE mpp_ini_znl( kumout )
971      !!----------------------------------------------------------------------
972      !!               ***  routine mpp_ini_znl  ***
973      !!
974      !! ** Purpose :   Initialize special communicator for computing zonal sum
975      !!
976      !! ** Method  : - Look for processors in the same row
977      !!              - Put their number in nrank_znl
978      !!              - Create group for the znl processors
979      !!              - Create a communicator for znl processors
980      !!              - Determine if processor should write znl files
981      !!
982      !! ** output
983      !!      ndim_rank_znl = number of processors on the same row
984      !!      ngrp_znl = group ID for the znl processors
985      !!      ncomm_znl = communicator for the ice procs.
986      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
987      !!
988      !!----------------------------------------------------------------------
989      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
990      !
991      INTEGER :: jproc      ! dummy loop integer
992      INTEGER :: ierr, ii   ! local integer
993      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
994      !!----------------------------------------------------------------------
995#if defined key_mpp_mpi
996      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
997      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
998      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
999      !
1000      ALLOCATE( kwork(jpnij), STAT=ierr )
1001      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1002
1003      IF( jpnj == 1 ) THEN
1004         ngrp_znl  = ngrp_world
1005         ncomm_znl = mpi_comm_oce
1006      ELSE
1007         !
1008         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1009         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1010         !-$$        CALL flush(numout)
1011         !
1012         ! Count number of processors on the same row
1013         ndim_rank_znl = 0
1014         DO jproc=1,jpnij
1015            IF ( kwork(jproc) == njmpp ) THEN
1016               ndim_rank_znl = ndim_rank_znl + 1
1017            ENDIF
1018         END DO
1019         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1020         !-$$        CALL flush(numout)
1021         ! Allocate the right size to nrank_znl
1022         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1023         ALLOCATE(nrank_znl(ndim_rank_znl))
1024         ii = 0
1025         nrank_znl (:) = 0
1026         DO jproc=1,jpnij
1027            IF ( kwork(jproc) == njmpp) THEN
1028               ii = ii + 1
1029               nrank_znl(ii) = jproc -1
1030            ENDIF
1031         END DO
1032         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1033         !-$$        CALL flush(numout)
1034
1035         ! Create the opa group
1036         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1037         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1038         !-$$        CALL flush(numout)
1039
1040         ! Create the znl group from the opa group
1041         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1042         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1043         !-$$        CALL flush(numout)
1044
1045         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1046         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1047         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1048         !-$$        CALL flush(numout)
1049         !
1050      END IF
1051
1052      ! Determines if processor if the first (starting from i=1) on the row
1053      IF ( jpni == 1 ) THEN
1054         l_znl_root = .TRUE.
1055      ELSE
1056         l_znl_root = .FALSE.
1057         kwork (1) = nimpp
1058         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1059         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1060      END IF
1061
1062      DEALLOCATE(kwork)
1063#endif
1064
1065   END SUBROUTINE mpp_ini_znl
1066
1067
1068   SUBROUTINE mpp_ini_north
1069      !!----------------------------------------------------------------------
1070      !!               ***  routine mpp_ini_north  ***
1071      !!
1072      !! ** Purpose :   Initialize special communicator for north folding
1073      !!      condition together with global variables needed in the mpp folding
1074      !!
1075      !! ** Method  : - Look for northern processors
1076      !!              - Put their number in nrank_north
1077      !!              - Create groups for the world processors and the north processors
1078      !!              - Create a communicator for northern processors
1079      !!
1080      !! ** output
1081      !!      njmppmax = njmpp for northern procs
1082      !!      ndim_rank_north = number of processors in the northern line
1083      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1084      !!      ngrp_world = group ID for the world processors
1085      !!      ngrp_north = group ID for the northern processors
1086      !!      ncomm_north = communicator for the northern procs.
1087      !!      north_root = number (in the world) of proc 0 in the northern comm.
1088      !!
1089      !!----------------------------------------------------------------------
1090      INTEGER ::   ierr
1091      INTEGER ::   jjproc
1092      INTEGER ::   ii, ji
1093      !!----------------------------------------------------------------------
1094      !
1095#if defined key_mpp_mpi
1096      njmppmax = MAXVAL( njmppt )
1097      !
1098      ! Look for how many procs on the northern boundary
1099      ndim_rank_north = 0
1100      DO jjproc = 1, jpnij
1101         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1102      END DO
1103      !
1104      ! Allocate the right size to nrank_north
1105      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1106      ALLOCATE( nrank_north(ndim_rank_north) )
1107
1108      ! Fill the nrank_north array with proc. number of northern procs.
1109      ! Note : the rank start at 0 in MPI
1110      ii = 0
1111      DO ji = 1, jpnij
1112         IF ( njmppt(ji) == njmppmax   ) THEN
1113            ii=ii+1
1114            nrank_north(ii)=ji-1
1115         END IF
1116      END DO
1117      !
1118      ! create the world group
1119      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1120      !
1121      ! Create the North group from the world group
1122      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1123      !
1124      ! Create the North communicator , ie the pool of procs in the north group
1125      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1126      !
1127#endif
1128   END SUBROUTINE mpp_ini_north
1129
1130
1131   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1132      !!---------------------------------------------------------------------
1133      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1134      !!
1135      !!   Modification of original codes written by David H. Bailey
1136      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1137      !!---------------------------------------------------------------------
1138      INTEGER                     , INTENT(in)    ::   ilen, itype
1139      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1140      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1141      !
1142      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1143      INTEGER  :: ji, ztmp           ! local scalar
1144      !!---------------------------------------------------------------------
1145      !
1146      ztmp = itype   ! avoid compilation warning
1147      !
1148      DO ji=1,ilen
1149      ! Compute ydda + yddb using Knuth's trick.
1150         zt1  = real(ydda(ji)) + real(yddb(ji))
1151         zerr = zt1 - real(ydda(ji))
1152         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1153                + aimag(ydda(ji)) + aimag(yddb(ji))
1154
1155         ! The result is zt1 + zt2, after normalization.
1156         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1157      END DO
1158      !
1159   END SUBROUTINE DDPDD_MPI
1160
1161
1162   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1163      !!----------------------------------------------------------------------
1164      !!                  ***  routine mpp_report  ***
1165      !!
1166      !! ** Purpose :   report use of mpp routines per time-setp
1167      !!
1168      !!----------------------------------------------------------------------
1169      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1170      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1171      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1172      !!
1173      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1174      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1175      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1176      !!----------------------------------------------------------------------
1177#if defined key_mpp_mpi
1178      !
1179      ll_lbc = .FALSE.
1180      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1181      ll_glb = .FALSE.
1182      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1183      ll_dlg = .FALSE.
1184      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1185      !
1186      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1187      ncom_freq = ncom_fsbc
1188      !
1189      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1190         IF( ll_lbc ) THEN
1191            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1192            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1193            n_sequence_lbc = n_sequence_lbc + 1
1194            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1195            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1196            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1197            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1198         ENDIF
1199         IF( ll_glb ) THEN
1200            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1201            n_sequence_glb = n_sequence_glb + 1
1202            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1203            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1204         ENDIF
1205         IF( ll_dlg ) THEN
1206            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1207            n_sequence_dlg = n_sequence_dlg + 1
1208            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1209            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1210         ENDIF
1211      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1212         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1213         WRITE(numcom,*) ' '
1214         WRITE(numcom,*) ' ------------------------------------------------------------'
1215         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1216         WRITE(numcom,*) ' ------------------------------------------------------------'
1217         WRITE(numcom,*) ' '
1218         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1219         jj = 0; jk = 0; jf = 0; jh = 0
1220         DO ji = 1, n_sequence_lbc
1221            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1222            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1223            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1224            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1225         END DO
1226         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1227         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1228         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1229         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1230         WRITE(numcom,*) ' '
1231         WRITE(numcom,*) ' lbc_lnk called'
1232         DO ji = 1, n_sequence_lbc - 1
1233            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1234               ccountname = crname_lbc(ji)
1235               crname_lbc(ji) = 'already counted'
1236               jcount = 1
1237               DO jj = ji + 1, n_sequence_lbc
1238                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1239                     jcount = jcount + 1
1240                     crname_lbc(jj) = 'already counted'
1241                  END IF
1242               END DO
1243               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1244            END IF
1245         END DO
1246         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1247            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1248         END IF
1249         WRITE(numcom,*) ' '
1250         IF ( n_sequence_glb > 0 ) THEN
1251            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1252            jj = 1
1253            DO ji = 2, n_sequence_glb
1254               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1255                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1256                  jj = 0
1257               END IF
1258               jj = jj + 1 
1259            END DO
1260            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1261            DEALLOCATE(crname_glb)
1262         ELSE
1263            WRITE(numcom,*) ' No MPI global communication '
1264         ENDIF
1265         WRITE(numcom,*) ' '
1266         IF ( n_sequence_dlg > 0 ) THEN
1267            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1268            jj = 1
1269            DO ji = 2, n_sequence_dlg
1270               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1271                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1272                  jj = 0
1273               END IF
1274               jj = jj + 1 
1275            END DO
1276            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1277            DEALLOCATE(crname_dlg)
1278         ELSE
1279            WRITE(numcom,*) ' No MPI delayed global communication '
1280         ENDIF
1281         WRITE(numcom,*) ' '
1282         WRITE(numcom,*) ' -----------------------------------------------'
1283         WRITE(numcom,*) ' '
1284         DEALLOCATE(ncomm_sequence)
1285         DEALLOCATE(crname_lbc)
1286      ENDIF
1287#endif
1288   END SUBROUTINE mpp_report
1289
1290   
1291   SUBROUTINE tic_tac (ld_tic, ld_global)
1292
1293    LOGICAL,           INTENT(IN) :: ld_tic
1294    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1295    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1296    REAL(dp),               SAVE :: tic_ct = 0._dp
1297    INTEGER :: ii
1298#if defined key_mpp_mpi
1299
1300    IF( ncom_stp <= nit000 ) RETURN
1301    IF( ncom_stp == nitend ) RETURN
1302    ii = 1
1303    IF( PRESENT( ld_global ) ) THEN
1304       IF( ld_global ) ii = 2
1305    END IF
1306   
1307    IF ( ld_tic ) THEN
1308       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1309       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1310    ELSE
1311       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1312       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1313    ENDIF
1314#endif
1315   
1316   END SUBROUTINE tic_tac
1317
1318#if ! defined key_mpp_mpi
1319   SUBROUTINE mpi_wait(request, status, ierror)
1320      INTEGER                            , INTENT(in   ) ::   request
1321      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1322      INTEGER                            , INTENT(  out) ::   ierror
1323   END SUBROUTINE mpi_wait
1324
1325   
1326   FUNCTION MPI_Wtime()
1327      REAL(wp) ::  MPI_Wtime
1328      MPI_Wtime = -1.
1329   END FUNCTION MPI_Wtime
1330#endif
1331
1332   !!----------------------------------------------------------------------
1333   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1334   !!----------------------------------------------------------------------
1335
1336   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1337      &                 cd6, cd7, cd8, cd9, cd10 )
1338      !!----------------------------------------------------------------------
1339      !!                  ***  ROUTINE  stop_opa  ***
1340      !!
1341      !! ** Purpose :   print in ocean.outpput file a error message and
1342      !!                increment the error number (nstop) by one.
1343      !!----------------------------------------------------------------------
1344      CHARACTER(len=*), INTENT(in   )           ::   cd1
1345      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1346      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1347      !
1348      CHARACTER(LEN=8) ::   clfmt            ! writing format
1349      INTEGER          ::   inum
1350      !!----------------------------------------------------------------------
1351      !
1352      nstop = nstop + 1
1353      !
1354      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1355         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1356         WRITE(inum,*)
1357         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1358         CLOSE(inum)
1359      ENDIF
1360      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1361         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1362      ENDIF
1363      !
1364                            WRITE(numout,*)
1365                            WRITE(numout,*) ' ===>>> : E R R O R'
1366                            WRITE(numout,*)
1367                            WRITE(numout,*) '         ==========='
1368                            WRITE(numout,*)
1369                            WRITE(numout,*) TRIM(cd1)
1370      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1371      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1372      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1373      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1374      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1375      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1376      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1377      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1378      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1379                            WRITE(numout,*)
1380      !
1381                               CALL FLUSH(numout    )
1382      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1383      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1384      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1385      !
1386      IF( cd1 == 'STOP' ) THEN
1387         WRITE(numout,*) 
1388         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1389         WRITE(numout,*) 
1390         CALL FLUSH(numout)
1391         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1392         CALL mppstop( ld_abort = .true. )
1393      ENDIF
1394      !
1395   END SUBROUTINE ctl_stop
1396
1397
1398   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1399      &                 cd6, cd7, cd8, cd9, cd10 )
1400      !!----------------------------------------------------------------------
1401      !!                  ***  ROUTINE  stop_warn  ***
1402      !!
1403      !! ** Purpose :   print in ocean.outpput file a error message and
1404      !!                increment the warning number (nwarn) by one.
1405      !!----------------------------------------------------------------------
1406      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1407      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1408      !!----------------------------------------------------------------------
1409      !
1410      nwarn = nwarn + 1
1411      !
1412      IF(lwp) THEN
1413                               WRITE(numout,*)
1414                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1415                               WRITE(numout,*)
1416                               WRITE(numout,*) '         ==============='
1417                               WRITE(numout,*)
1418         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1419         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1420         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1421         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1422         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1423         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1424         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1425         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1426         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1427         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1428                               WRITE(numout,*)
1429      ENDIF
1430      CALL FLUSH(numout)
1431      !
1432   END SUBROUTINE ctl_warn
1433
1434
1435   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1436      !!----------------------------------------------------------------------
1437      !!                  ***  ROUTINE ctl_opn  ***
1438      !!
1439      !! ** Purpose :   Open file and check if required file is available.
1440      !!
1441      !! ** Method  :   Fortan open
1442      !!----------------------------------------------------------------------
1443      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1444      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1445      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1446      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1447      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1448      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1449      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1450      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1451      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1452      !
1453      CHARACTER(len=80) ::   clfile
1454      CHARACTER(LEN=10) ::   clfmt            ! writing format
1455      INTEGER           ::   iost
1456      INTEGER           ::   idg              ! number of digits
1457      !!----------------------------------------------------------------------
1458      !
1459      ! adapt filename
1460      ! ----------------
1461      clfile = TRIM(cdfile)
1462      IF( PRESENT( karea ) ) THEN
1463         IF( karea > 1 ) THEN
1464            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1465            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1466            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1467            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1468         ENDIF
1469      ENDIF
1470#if defined key_agrif
1471      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1472      knum=Agrif_Get_Unit()
1473#else
1474      knum=get_unit()
1475#endif
1476      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1477      !
1478      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1479         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1480      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1481         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1482      ELSE
1483         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1484      ENDIF
1485      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1486         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1487      IF( iost == 0 ) THEN
1488         IF(ldwp .AND. kout > 0) THEN
1489            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1490            WRITE(kout,*) '     unit   = ', knum
1491            WRITE(kout,*) '     status = ', cdstat
1492            WRITE(kout,*) '     form   = ', cdform
1493            WRITE(kout,*) '     access = ', cdacce
1494            WRITE(kout,*)
1495         ENDIF
1496      ENDIF
1497100   CONTINUE
1498      IF( iost /= 0 ) THEN
1499         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1500         WRITE(ctmp2,*) ' =======   ===  '
1501         WRITE(ctmp3,*) '           unit   = ', knum
1502         WRITE(ctmp4,*) '           status = ', cdstat
1503         WRITE(ctmp5,*) '           form   = ', cdform
1504         WRITE(ctmp6,*) '           access = ', cdacce
1505         WRITE(ctmp7,*) '           iostat = ', iost
1506         WRITE(ctmp8,*) '           we stop. verify the file '
1507         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1508      ENDIF
1509      !
1510   END SUBROUTINE ctl_opn
1511
1512
1513   SUBROUTINE ctl_nam ( kios, cdnam )
1514      !!----------------------------------------------------------------------
1515      !!                  ***  ROUTINE ctl_nam  ***
1516      !!
1517      !! ** Purpose :   Informations when error while reading a namelist
1518      !!
1519      !! ** Method  :   Fortan open
1520      !!----------------------------------------------------------------------
1521      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1522      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1523      !
1524      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1525      !!----------------------------------------------------------------------
1526      !
1527      WRITE (clios, '(I5.0)')   kios
1528      IF( kios < 0 ) THEN         
1529         CALL ctl_warn( 'end of record or file while reading namelist '   &
1530            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1531      ENDIF
1532      !
1533      IF( kios > 0 ) THEN
1534         CALL ctl_stop( 'misspelled variable in namelist '   &
1535            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1536      ENDIF
1537      kios = 0
1538      !
1539   END SUBROUTINE ctl_nam
1540
1541
1542   INTEGER FUNCTION get_unit()
1543      !!----------------------------------------------------------------------
1544      !!                  ***  FUNCTION  get_unit  ***
1545      !!
1546      !! ** Purpose :   return the index of an unused logical unit
1547      !!----------------------------------------------------------------------
1548      LOGICAL :: llopn
1549      !!----------------------------------------------------------------------
1550      !
1551      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1552      llopn = .TRUE.
1553      DO WHILE( (get_unit < 998) .AND. llopn )
1554         get_unit = get_unit + 1
1555         INQUIRE( unit = get_unit, opened = llopn )
1556      END DO
1557      IF( (get_unit == 999) .AND. llopn ) THEN
1558         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1559      ENDIF
1560      !
1561   END FUNCTION get_unit
1562
1563   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1564      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1565      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1566      CHARACTER(LEN=256)                           :: chline
1567      CHARACTER(LEN=1)                             :: csp
1568      INTEGER, INTENT(IN)                          :: kout
1569      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1570      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1571      !
1572      !csp = NEW_LINE('A')
1573      ! a new line character is the best seperator but some systems (e.g.Cray)
1574      ! seem to terminate namelist reads from internal files early if they
1575      ! encounter new-lines. Use a single space for safety.
1576      csp = ' '
1577      !
1578      ! Check if the namelist buffer has already been allocated. Return if it has.
1579      !
1580      IF ( ALLOCATED( cdnambuff ) ) RETURN
1581      IF( ldwp ) THEN
1582         !
1583         ! Open namelist file
1584         !
1585         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1586         !
1587         ! First pass: count characters excluding comments and trimable white space
1588         !
1589         itot=0
1590     10  READ(iun,'(A256)',END=20,ERR=20) chline
1591         iltc = LEN_TRIM(chline)
1592         IF ( iltc.GT.0 ) THEN
1593          inl = INDEX(chline, '!') 
1594          IF( inl.eq.0 ) THEN
1595           itot = itot + iltc + 1                                ! +1 for the newline character
1596          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1597           itot = itot + inl                                  !  includes +1 for the newline character
1598          ENDIF
1599         ENDIF
1600         GOTO 10
1601     20  CONTINUE
1602         !
1603         ! Allocate text cdnambuff for condensed namelist
1604         !
1605!$AGRIF_DO_NOT_TREAT
1606         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1607!$AGRIF_END_DO_NOT_TREAT
1608         itotsav = itot
1609         !
1610         ! Second pass: read and transfer pruned characters into cdnambuff
1611         !
1612         REWIND(iun)
1613         itot=1
1614     30  READ(iun,'(A256)',END=40,ERR=40) chline
1615         iltc = LEN_TRIM(chline)
1616         IF ( iltc.GT.0 ) THEN
1617          inl = INDEX(chline, '!')
1618          IF( inl.eq.0 ) THEN
1619           inl = iltc
1620          ELSE
1621           inl = inl - 1
1622          ENDIF
1623          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1624             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1625             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1626             itot = itot + inl + 1
1627          ENDIF
1628         ENDIF
1629         GOTO 30
1630     40  CONTINUE
1631         itot = itot - 1
1632         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1633         !
1634         ! Close namelist file
1635         !
1636         CLOSE(iun)
1637         !write(*,'(32A)') cdnambuff
1638      ENDIF
1639#if defined key_mpp_mpi
1640      CALL mpp_bcast_nml( cdnambuff, itot )
1641#endif
1642  END SUBROUTINE load_nml
1643
1644
1645   !!----------------------------------------------------------------------
1646END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.