New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 13438

Last change on this file since 13438 was 13438, checked in by smasson, 4 years ago

trunk: bugfix to compile and run the code without key_mpp_mpi, see #2495

  • Property svn:keywords set to Id
File size: 69.2 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
71   PUBLIC   mpp_report
72   PUBLIC   mpp_bcast_nml
73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
75   PUBLIC MPI_wait
76   PUBLIC MPI_Wtime
77#endif
78   
79   !! * Interfaces
80   !! define generic interface for these routine as they are called sometimes
81   !! with scalar arguments instead of array arguments, which causes problems
82   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
83   INTERFACE mpp_min
84      MODULE PROCEDURE mppmin_a_int, mppmin_int
85      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
86      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
87   END INTERFACE
88   INTERFACE mpp_max
89      MODULE PROCEDURE mppmax_a_int, mppmax_int
90      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
91      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
92   END INTERFACE
93   INTERFACE mpp_sum
94      MODULE PROCEDURE mppsum_a_int, mppsum_int
95      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
96      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
97      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
98   END INTERFACE
99   INTERFACE mpp_minloc
100      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
101      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
102   END INTERFACE
103   INTERFACE mpp_maxloc
104      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
105      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
106   END INTERFACE
107
108   !! ========================= !!
109   !!  MPI  variable definition !!
110   !! ========================= !!
111#if   defined key_mpp_mpi
112!$AGRIF_DO_NOT_TREAT
113   INCLUDE 'mpif.h'
114!$AGRIF_END_DO_NOT_TREAT
115   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
116#else   
117   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
118   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
119   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
120   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
121#endif
122
123   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
124
125   INTEGER, PUBLIC ::   mppsize        ! number of process
126   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
127!$AGRIF_DO_NOT_TREAT
128   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
129!$AGRIF_END_DO_NOT_TREAT
130
131   INTEGER :: MPI_SUMDD
132
133   ! variables used for zonal integration
134   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
135   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
136   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
137   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
138   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
139
140   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
141   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
142   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
143   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
144   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
145   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
146   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
147   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
148   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
149
150   ! Communications summary report
151   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
152   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
153   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
154   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
155   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
156   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
157   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
158   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
159   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
160   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
161   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
162   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
163   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
164   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
165   !: name (used as id) of allreduce-delayed operations
166   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
167   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
168   !: component name where the allreduce-delayed operation is performed
169   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
170   TYPE, PUBLIC ::   DELAYARR
171      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
172      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
173   END TYPE DELAYARR
174   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
175   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
176
177   ! timing summary report
178   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
179   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
180   
181   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
182
183   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
184   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
185   
186   !! * Substitutions
187#  include "do_loop_substitute.h90"
188   !!----------------------------------------------------------------------
189   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
190   !! $Id$
191   !! Software governed by the CeCILL license (see ./LICENSE)
192   !!----------------------------------------------------------------------
193CONTAINS
194
195   SUBROUTINE mpp_start( localComm )
196      !!----------------------------------------------------------------------
197      !!                  ***  routine mpp_start  ***
198      !!
199      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
200      !!----------------------------------------------------------------------
201      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
202      !
203      INTEGER ::   ierr
204      LOGICAL ::   llmpi_init
205      !!----------------------------------------------------------------------
206#if defined key_mpp_mpi
207      !
208      CALL mpi_initialized ( llmpi_init, ierr )
209      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
210
211      IF( .NOT. llmpi_init ) THEN
212         IF( PRESENT(localComm) ) THEN
213            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
214            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
215            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
216         ENDIF
217         CALL mpi_init( ierr )
218         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
219      ENDIF
220       
221      IF( PRESENT(localComm) ) THEN
222         IF( Agrif_Root() ) THEN
223            mpi_comm_oce = localComm
224         ENDIF
225      ELSE
226         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
227         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
228      ENDIF
229
230# if defined key_agrif
231      IF( Agrif_Root() ) THEN
232         CALL Agrif_MPI_Init(mpi_comm_oce)
233      ELSE
234         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
235      ENDIF
236# endif
237
238      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
239      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
240      !
241      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
242      !
243#else
244      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
245      mppsize = 1
246      mpprank = 0
247#endif
248   END SUBROUTINE mpp_start
249
250
251   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
252      !!----------------------------------------------------------------------
253      !!                  ***  routine mppsend  ***
254      !!
255      !! ** Purpose :   Send messag passing array
256      !!
257      !!----------------------------------------------------------------------
258      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
259      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
260      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
261      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
262      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
263      !!
264      INTEGER ::   iflag
265      INTEGER :: mpi_working_type
266      !!----------------------------------------------------------------------
267      !
268#if defined key_mpp_mpi
269      IF (wp == dp) THEN
270         mpi_working_type = mpi_double_precision
271      ELSE
272         mpi_working_type = mpi_real
273      END IF
274      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
275#endif
276      !
277   END SUBROUTINE mppsend
278
279
280   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
281      !!----------------------------------------------------------------------
282      !!                  ***  routine mppsend  ***
283      !!
284      !! ** Purpose :   Send messag passing array
285      !!
286      !!----------------------------------------------------------------------
287      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
288      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
289      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
290      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
291      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
292      !!
293      INTEGER ::   iflag
294      !!----------------------------------------------------------------------
295      !
296#if defined key_mpp_mpi
297      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
298#endif
299      !
300   END SUBROUTINE mppsend_dp
301
302
303   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
304      !!----------------------------------------------------------------------
305      !!                  ***  routine mppsend  ***
306      !!
307      !! ** Purpose :   Send messag passing array
308      !!
309      !!----------------------------------------------------------------------
310      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
311      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
312      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
313      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
314      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
315      !!
316      INTEGER ::   iflag
317      !!----------------------------------------------------------------------
318      !
319#if defined key_mpp_mpi
320      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
321#endif
322      !
323   END SUBROUTINE mppsend_sp
324
325
326   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
327      !!----------------------------------------------------------------------
328      !!                  ***  routine mpprecv  ***
329      !!
330      !! ** Purpose :   Receive messag passing array
331      !!
332      !!----------------------------------------------------------------------
333      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
334      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
335      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
336      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
337      !!
338      INTEGER :: istatus(mpi_status_size)
339      INTEGER :: iflag
340      INTEGER :: use_source
341      INTEGER :: mpi_working_type
342      !!----------------------------------------------------------------------
343      !
344#if defined key_mpp_mpi
345      ! If a specific process number has been passed to the receive call,
346      ! use that one. Default is to use mpi_any_source
347      use_source = mpi_any_source
348      IF( PRESENT(ksource) )   use_source = ksource
349      !
350      IF (wp == dp) THEN
351         mpi_working_type = mpi_double_precision
352      ELSE
353         mpi_working_type = mpi_real
354      END IF
355      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
356#endif
357      !
358   END SUBROUTINE mpprecv
359
360   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
361      !!----------------------------------------------------------------------
362      !!                  ***  routine mpprecv  ***
363      !!
364      !! ** Purpose :   Receive messag passing array
365      !!
366      !!----------------------------------------------------------------------
367      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
368      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
369      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
370      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
371      !!
372      INTEGER :: istatus(mpi_status_size)
373      INTEGER :: iflag
374      INTEGER :: use_source
375      !!----------------------------------------------------------------------
376      !
377#if defined key_mpp_mpi
378      ! If a specific process number has been passed to the receive call,
379      ! use that one. Default is to use mpi_any_source
380      use_source = mpi_any_source
381      IF( PRESENT(ksource) )   use_source = ksource
382      !
383      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
384#endif
385      !
386   END SUBROUTINE mpprecv_dp
387
388
389   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
390      !!----------------------------------------------------------------------
391      !!                  ***  routine mpprecv  ***
392      !!
393      !! ** Purpose :   Receive messag passing array
394      !!
395      !!----------------------------------------------------------------------
396      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
397      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
398      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
399      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
400      !!
401      INTEGER :: istatus(mpi_status_size)
402      INTEGER :: iflag
403      INTEGER :: use_source
404      !!----------------------------------------------------------------------
405      !
406#if defined key_mpp_mpi
407      ! If a specific process number has been passed to the receive call,
408      ! use that one. Default is to use mpi_any_source
409      use_source = mpi_any_source
410      IF( PRESENT(ksource) )   use_source = ksource
411      !
412      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
413#endif
414      !
415   END SUBROUTINE mpprecv_sp
416
417
418   SUBROUTINE mppgather( ptab, kp, pio )
419      !!----------------------------------------------------------------------
420      !!                   ***  routine mppgather  ***
421      !!
422      !! ** Purpose :   Transfert between a local subdomain array and a work
423      !!     array which is distributed following the vertical level.
424      !!
425      !!----------------------------------------------------------------------
426      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
427      INTEGER                           , INTENT(in   ) ::   kp     ! record length
428      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
429      !!
430      INTEGER :: itaille, ierror   ! temporary integer
431      !!---------------------------------------------------------------------
432      !
433      itaille = jpi * jpj
434#if defined key_mpp_mpi
435      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
436         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
437#else
438      pio(:,:,1) = ptab(:,:)
439#endif
440      !
441   END SUBROUTINE mppgather
442
443
444   SUBROUTINE mppscatter( pio, kp, ptab )
445      !!----------------------------------------------------------------------
446      !!                  ***  routine mppscatter  ***
447      !!
448      !! ** Purpose :   Transfert between awork array which is distributed
449      !!      following the vertical level and the local subdomain array.
450      !!
451      !!----------------------------------------------------------------------
452      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
453      INTEGER                             ::   kp     ! Tag (not used with MPI
454      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
455      !!
456      INTEGER :: itaille, ierror   ! temporary integer
457      !!---------------------------------------------------------------------
458      !
459      itaille = jpi * jpj
460      !
461#if defined key_mpp_mpi
462      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
463         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
464#else
465      ptab(:,:) = pio(:,:,1)
466#endif
467      !
468   END SUBROUTINE mppscatter
469
470   
471   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
472     !!----------------------------------------------------------------------
473      !!                   ***  routine mpp_delay_sum  ***
474      !!
475      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
476      !!
477      !!----------------------------------------------------------------------
478      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
479      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
480      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
481      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
482      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
483      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
484      !!
485      INTEGER ::   ji, isz
486      INTEGER ::   idvar
487      INTEGER ::   ierr, ilocalcomm
488      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
489      !!----------------------------------------------------------------------
490#if defined key_mpp_mpi
491      ilocalcomm = mpi_comm_oce
492      IF( PRESENT(kcom) )   ilocalcomm = kcom
493
494      isz = SIZE(y_in)
495     
496      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
497
498      idvar = -1
499      DO ji = 1, nbdelay
500         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
501      END DO
502      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
503
504      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
505         !                                       --------------------------
506         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
507            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
508            DEALLOCATE(todelay(idvar)%z1d)
509            ndelayid(idvar) = -1                                      ! do as if we had no restart
510         ELSE
511            ALLOCATE(todelay(idvar)%y1d(isz))
512            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
513         END IF
514      ENDIF
515     
516      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
517         !                                       --------------------------
518         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
519         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
520         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
521      ENDIF
522
523      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
524
525      ! send back pout from todelay(idvar)%z1d defined at previous call
526      pout(:) = todelay(idvar)%z1d(:)
527
528      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
529# if defined key_mpi2
530      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
531      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
532      ndelayid(idvar) = 1
533      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
534# else
535      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
536# endif
537#else
538      pout(:) = REAL(y_in(:), wp)
539#endif
540
541   END SUBROUTINE mpp_delay_sum
542
543   
544   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
545      !!----------------------------------------------------------------------
546      !!                   ***  routine mpp_delay_max  ***
547      !!
548      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
549      !!
550      !!----------------------------------------------------------------------
551      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
552      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
553      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
554      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
555      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
556      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
557      !!
558      INTEGER ::   ji, isz
559      INTEGER ::   idvar
560      INTEGER ::   ierr, ilocalcomm
561      INTEGER ::   MPI_TYPE
562      !!----------------------------------------------------------------------
563     
564#if defined key_mpp_mpi
565      if( wp == dp ) then
566         MPI_TYPE = MPI_DOUBLE_PRECISION
567      else if ( wp == sp ) then
568         MPI_TYPE = MPI_REAL
569      else
570        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
571   
572      end if
573
574      ilocalcomm = mpi_comm_oce
575      IF( PRESENT(kcom) )   ilocalcomm = kcom
576
577      isz = SIZE(p_in)
578
579      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
580
581      idvar = -1
582      DO ji = 1, nbdelay
583         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
584      END DO
585      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
586
587      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
588         !                                       --------------------------
589         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
590            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
591            DEALLOCATE(todelay(idvar)%z1d)
592            ndelayid(idvar) = -1                                      ! do as if we had no restart
593         END IF
594      ENDIF
595
596      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
597         !                                       --------------------------
598         ALLOCATE(todelay(idvar)%z1d(isz))
599         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
600      ENDIF
601
602      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
603
604      ! send back pout from todelay(idvar)%z1d defined at previous call
605      pout(:) = todelay(idvar)%z1d(:)
606
607      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
608# if defined key_mpi2
609      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
610      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
611      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
612# else
613      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
614# endif
615#else
616      pout(:) = p_in(:)
617#endif
618
619   END SUBROUTINE mpp_delay_max
620
621   
622   SUBROUTINE mpp_delay_rcv( kid )
623      !!----------------------------------------------------------------------
624      !!                   ***  routine mpp_delay_rcv  ***
625      !!
626      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
627      !!
628      !!----------------------------------------------------------------------
629      INTEGER,INTENT(in   )      ::  kid 
630      INTEGER ::   ierr
631      !!----------------------------------------------------------------------
632#if defined key_mpp_mpi
633      IF( ndelayid(kid) /= -2 ) THEN 
634#if ! defined key_mpi2
635         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
636         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
637         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
638#endif
639         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
640         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
641      ENDIF
642#endif
643   END SUBROUTINE mpp_delay_rcv
644
645   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
646      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
647      INTEGER                          , INTENT(INOUT) :: kleng
648      !!----------------------------------------------------------------------
649      !!                  ***  routine mpp_bcast_nml  ***
650      !!
651      !! ** Purpose :   broadcast namelist character buffer
652      !!
653      !!----------------------------------------------------------------------
654      !!
655      INTEGER ::   iflag
656      !!----------------------------------------------------------------------
657      !
658#if defined key_mpp_mpi
659      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
660      call MPI_BARRIER(mpi_comm_oce, iflag)
661!$AGRIF_DO_NOT_TREAT
662      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
663!$AGRIF_END_DO_NOT_TREAT
664      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
665      call MPI_BARRIER(mpi_comm_oce, iflag)
666#endif
667      !
668   END SUBROUTINE mpp_bcast_nml
669
670   
671   !!----------------------------------------------------------------------
672   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
673   !!   
674   !!----------------------------------------------------------------------
675   !!
676#  define OPERATION_MAX
677#  define INTEGER_TYPE
678#  define DIM_0d
679#     define ROUTINE_ALLREDUCE           mppmax_int
680#     include "mpp_allreduce_generic.h90"
681#     undef ROUTINE_ALLREDUCE
682#  undef DIM_0d
683#  define DIM_1d
684#     define ROUTINE_ALLREDUCE           mppmax_a_int
685#     include "mpp_allreduce_generic.h90"
686#     undef ROUTINE_ALLREDUCE
687#  undef DIM_1d
688#  undef INTEGER_TYPE
689!
690   !!
691   !!   ----   SINGLE PRECISION VERSIONS
692   !!
693#  define SINGLE_PRECISION
694#  define REAL_TYPE
695#  define DIM_0d
696#     define ROUTINE_ALLREDUCE           mppmax_real_sp
697#     include "mpp_allreduce_generic.h90"
698#     undef ROUTINE_ALLREDUCE
699#  undef DIM_0d
700#  define DIM_1d
701#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
702#     include "mpp_allreduce_generic.h90"
703#     undef ROUTINE_ALLREDUCE
704#  undef DIM_1d
705#  undef SINGLE_PRECISION
706   !!
707   !!
708   !!   ----   DOUBLE PRECISION VERSIONS
709   !!
710!
711#  define DIM_0d
712#     define ROUTINE_ALLREDUCE           mppmax_real_dp
713#     include "mpp_allreduce_generic.h90"
714#     undef ROUTINE_ALLREDUCE
715#  undef DIM_0d
716#  define DIM_1d
717#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
718#     include "mpp_allreduce_generic.h90"
719#     undef ROUTINE_ALLREDUCE
720#  undef DIM_1d
721#  undef REAL_TYPE
722#  undef OPERATION_MAX
723   !!----------------------------------------------------------------------
724   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
725   !!   
726   !!----------------------------------------------------------------------
727   !!
728#  define OPERATION_MIN
729#  define INTEGER_TYPE
730#  define DIM_0d
731#     define ROUTINE_ALLREDUCE           mppmin_int
732#     include "mpp_allreduce_generic.h90"
733#     undef ROUTINE_ALLREDUCE
734#  undef DIM_0d
735#  define DIM_1d
736#     define ROUTINE_ALLREDUCE           mppmin_a_int
737#     include "mpp_allreduce_generic.h90"
738#     undef ROUTINE_ALLREDUCE
739#  undef DIM_1d
740#  undef INTEGER_TYPE
741!
742   !!
743   !!   ----   SINGLE PRECISION VERSIONS
744   !!
745#  define SINGLE_PRECISION
746#  define REAL_TYPE
747#  define DIM_0d
748#     define ROUTINE_ALLREDUCE           mppmin_real_sp
749#     include "mpp_allreduce_generic.h90"
750#     undef ROUTINE_ALLREDUCE
751#  undef DIM_0d
752#  define DIM_1d
753#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
754#     include "mpp_allreduce_generic.h90"
755#     undef ROUTINE_ALLREDUCE
756#  undef DIM_1d
757#  undef SINGLE_PRECISION
758   !!
759   !!   ----   DOUBLE PRECISION VERSIONS
760   !!
761
762#  define DIM_0d
763#     define ROUTINE_ALLREDUCE           mppmin_real_dp
764#     include "mpp_allreduce_generic.h90"
765#     undef ROUTINE_ALLREDUCE
766#  undef DIM_0d
767#  define DIM_1d
768#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
769#     include "mpp_allreduce_generic.h90"
770#     undef ROUTINE_ALLREDUCE
771#  undef DIM_1d
772#  undef REAL_TYPE
773#  undef OPERATION_MIN
774
775   !!----------------------------------------------------------------------
776   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
777   !!   
778   !!   Global sum of 1D array or a variable (integer, real or complex)
779   !!----------------------------------------------------------------------
780   !!
781#  define OPERATION_SUM
782#  define INTEGER_TYPE
783#  define DIM_0d
784#     define ROUTINE_ALLREDUCE           mppsum_int
785#     include "mpp_allreduce_generic.h90"
786#     undef ROUTINE_ALLREDUCE
787#  undef DIM_0d
788#  define DIM_1d
789#     define ROUTINE_ALLREDUCE           mppsum_a_int
790#     include "mpp_allreduce_generic.h90"
791#     undef ROUTINE_ALLREDUCE
792#  undef DIM_1d
793#  undef INTEGER_TYPE
794
795   !!
796   !!   ----   SINGLE PRECISION VERSIONS
797   !!
798#  define OPERATION_SUM
799#  define SINGLE_PRECISION
800#  define REAL_TYPE
801#  define DIM_0d
802#     define ROUTINE_ALLREDUCE           mppsum_real_sp
803#     include "mpp_allreduce_generic.h90"
804#     undef ROUTINE_ALLREDUCE
805#  undef DIM_0d
806#  define DIM_1d
807#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_1d
811#  undef REAL_TYPE
812#  undef OPERATION_SUM
813
814#  undef SINGLE_PRECISION
815
816   !!
817   !!   ----   DOUBLE PRECISION VERSIONS
818   !!
819#  define OPERATION_SUM
820#  define REAL_TYPE
821#  define DIM_0d
822#     define ROUTINE_ALLREDUCE           mppsum_real_dp
823#     include "mpp_allreduce_generic.h90"
824#     undef ROUTINE_ALLREDUCE
825#  undef DIM_0d
826#  define DIM_1d
827#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
828#     include "mpp_allreduce_generic.h90"
829#     undef ROUTINE_ALLREDUCE
830#  undef DIM_1d
831#  undef REAL_TYPE
832#  undef OPERATION_SUM
833
834#  define OPERATION_SUM_DD
835#  define COMPLEX_TYPE
836#  define DIM_0d
837#     define ROUTINE_ALLREDUCE           mppsum_realdd
838#     include "mpp_allreduce_generic.h90"
839#     undef ROUTINE_ALLREDUCE
840#  undef DIM_0d
841#  define DIM_1d
842#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
843#     include "mpp_allreduce_generic.h90"
844#     undef ROUTINE_ALLREDUCE
845#  undef DIM_1d
846#  undef COMPLEX_TYPE
847#  undef OPERATION_SUM_DD
848
849   !!----------------------------------------------------------------------
850   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
851   !!   
852   !!----------------------------------------------------------------------
853   !!
854   !!
855   !!   ----   SINGLE PRECISION VERSIONS
856   !!
857#  define SINGLE_PRECISION
858#  define OPERATION_MINLOC
859#  define DIM_2d
860#     define ROUTINE_LOC           mpp_minloc2d_sp
861#     include "mpp_loc_generic.h90"
862#     undef ROUTINE_LOC
863#  undef DIM_2d
864#  define DIM_3d
865#     define ROUTINE_LOC           mpp_minloc3d_sp
866#     include "mpp_loc_generic.h90"
867#     undef ROUTINE_LOC
868#  undef DIM_3d
869#  undef OPERATION_MINLOC
870
871#  define OPERATION_MAXLOC
872#  define DIM_2d
873#     define ROUTINE_LOC           mpp_maxloc2d_sp
874#     include "mpp_loc_generic.h90"
875#     undef ROUTINE_LOC
876#  undef DIM_2d
877#  define DIM_3d
878#     define ROUTINE_LOC           mpp_maxloc3d_sp
879#     include "mpp_loc_generic.h90"
880#     undef ROUTINE_LOC
881#  undef DIM_3d
882#  undef OPERATION_MAXLOC
883#  undef SINGLE_PRECISION
884   !!
885   !!   ----   DOUBLE PRECISION VERSIONS
886   !!
887#  define OPERATION_MINLOC
888#  define DIM_2d
889#     define ROUTINE_LOC           mpp_minloc2d_dp
890#     include "mpp_loc_generic.h90"
891#     undef ROUTINE_LOC
892#  undef DIM_2d
893#  define DIM_3d
894#     define ROUTINE_LOC           mpp_minloc3d_dp
895#     include "mpp_loc_generic.h90"
896#     undef ROUTINE_LOC
897#  undef DIM_3d
898#  undef OPERATION_MINLOC
899
900#  define OPERATION_MAXLOC
901#  define DIM_2d
902#     define ROUTINE_LOC           mpp_maxloc2d_dp
903#     include "mpp_loc_generic.h90"
904#     undef ROUTINE_LOC
905#  undef DIM_2d
906#  define DIM_3d
907#     define ROUTINE_LOC           mpp_maxloc3d_dp
908#     include "mpp_loc_generic.h90"
909#     undef ROUTINE_LOC
910#  undef DIM_3d
911#  undef OPERATION_MAXLOC
912
913
914   SUBROUTINE mppsync()
915      !!----------------------------------------------------------------------
916      !!                  ***  routine mppsync  ***
917      !!
918      !! ** Purpose :   Massively parallel processors, synchroneous
919      !!
920      !!-----------------------------------------------------------------------
921      INTEGER :: ierror
922      !!-----------------------------------------------------------------------
923      !
924#if defined key_mpp_mpi
925      CALL mpi_barrier( mpi_comm_oce, ierror )
926#endif
927      !
928   END SUBROUTINE mppsync
929
930
931   SUBROUTINE mppstop( ld_abort ) 
932      !!----------------------------------------------------------------------
933      !!                  ***  routine mppstop  ***
934      !!
935      !! ** purpose :   Stop massively parallel processors method
936      !!
937      !!----------------------------------------------------------------------
938      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
939      LOGICAL ::   ll_abort
940      INTEGER ::   info
941      !!----------------------------------------------------------------------
942      ll_abort = .FALSE.
943      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
944      !
945#if defined key_mpp_mpi
946      IF(ll_abort) THEN
947         CALL mpi_abort( MPI_COMM_WORLD )
948      ELSE
949         CALL mppsync
950         CALL mpi_finalize( info )
951      ENDIF
952#endif
953      IF( ll_abort ) STOP 123
954      !
955   END SUBROUTINE mppstop
956
957
958   SUBROUTINE mpp_comm_free( kcom )
959      !!----------------------------------------------------------------------
960      INTEGER, INTENT(in) ::   kcom
961      !!
962      INTEGER :: ierr
963      !!----------------------------------------------------------------------
964      !
965#if defined key_mpp_mpi
966      CALL MPI_COMM_FREE(kcom, ierr)
967#endif
968      !
969   END SUBROUTINE mpp_comm_free
970
971
972   SUBROUTINE mpp_ini_znl( kumout )
973      !!----------------------------------------------------------------------
974      !!               ***  routine mpp_ini_znl  ***
975      !!
976      !! ** Purpose :   Initialize special communicator for computing zonal sum
977      !!
978      !! ** Method  : - Look for processors in the same row
979      !!              - Put their number in nrank_znl
980      !!              - Create group for the znl processors
981      !!              - Create a communicator for znl processors
982      !!              - Determine if processor should write znl files
983      !!
984      !! ** output
985      !!      ndim_rank_znl = number of processors on the same row
986      !!      ngrp_znl = group ID for the znl processors
987      !!      ncomm_znl = communicator for the ice procs.
988      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
989      !!
990      !!----------------------------------------------------------------------
991      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
992      !
993      INTEGER :: jproc      ! dummy loop integer
994      INTEGER :: ierr, ii   ! local integer
995      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
996      !!----------------------------------------------------------------------
997#if defined key_mpp_mpi
998      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
999      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1000      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1001      !
1002      ALLOCATE( kwork(jpnij), STAT=ierr )
1003      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1004
1005      IF( jpnj == 1 ) THEN
1006         ngrp_znl  = ngrp_world
1007         ncomm_znl = mpi_comm_oce
1008      ELSE
1009         !
1010         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1011         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1012         !-$$        CALL flush(numout)
1013         !
1014         ! Count number of processors on the same row
1015         ndim_rank_znl = 0
1016         DO jproc=1,jpnij
1017            IF ( kwork(jproc) == njmpp ) THEN
1018               ndim_rank_znl = ndim_rank_znl + 1
1019            ENDIF
1020         END DO
1021         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1022         !-$$        CALL flush(numout)
1023         ! Allocate the right size to nrank_znl
1024         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1025         ALLOCATE(nrank_znl(ndim_rank_znl))
1026         ii = 0
1027         nrank_znl (:) = 0
1028         DO jproc=1,jpnij
1029            IF ( kwork(jproc) == njmpp) THEN
1030               ii = ii + 1
1031               nrank_znl(ii) = jproc -1
1032            ENDIF
1033         END DO
1034         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1035         !-$$        CALL flush(numout)
1036
1037         ! Create the opa group
1038         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1039         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1040         !-$$        CALL flush(numout)
1041
1042         ! Create the znl group from the opa group
1043         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1044         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1045         !-$$        CALL flush(numout)
1046
1047         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1048         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1049         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1050         !-$$        CALL flush(numout)
1051         !
1052      END IF
1053
1054      ! Determines if processor if the first (starting from i=1) on the row
1055      IF ( jpni == 1 ) THEN
1056         l_znl_root = .TRUE.
1057      ELSE
1058         l_znl_root = .FALSE.
1059         kwork (1) = nimpp
1060         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1061         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1062      END IF
1063
1064      DEALLOCATE(kwork)
1065#endif
1066
1067   END SUBROUTINE mpp_ini_znl
1068
1069
1070   SUBROUTINE mpp_ini_north
1071      !!----------------------------------------------------------------------
1072      !!               ***  routine mpp_ini_north  ***
1073      !!
1074      !! ** Purpose :   Initialize special communicator for north folding
1075      !!      condition together with global variables needed in the mpp folding
1076      !!
1077      !! ** Method  : - Look for northern processors
1078      !!              - Put their number in nrank_north
1079      !!              - Create groups for the world processors and the north processors
1080      !!              - Create a communicator for northern processors
1081      !!
1082      !! ** output
1083      !!      njmppmax = njmpp for northern procs
1084      !!      ndim_rank_north = number of processors in the northern line
1085      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1086      !!      ngrp_world = group ID for the world processors
1087      !!      ngrp_north = group ID for the northern processors
1088      !!      ncomm_north = communicator for the northern procs.
1089      !!      north_root = number (in the world) of proc 0 in the northern comm.
1090      !!
1091      !!----------------------------------------------------------------------
1092      INTEGER ::   ierr
1093      INTEGER ::   jjproc
1094      INTEGER ::   ii, ji
1095      !!----------------------------------------------------------------------
1096      !
1097#if defined key_mpp_mpi
1098      njmppmax = MAXVAL( njmppt )
1099      !
1100      ! Look for how many procs on the northern boundary
1101      ndim_rank_north = 0
1102      DO jjproc = 1, jpni
1103         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1104      END DO
1105      !
1106      ! Allocate the right size to nrank_north
1107      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1108      ALLOCATE( nrank_north(ndim_rank_north) )
1109
1110      ! Fill the nrank_north array with proc. number of northern procs.
1111      ! Note : the rank start at 0 in MPI
1112      ii = 0
1113      DO ji = 1, jpni
1114         IF ( nfproc(ji) /= -1   ) THEN
1115            ii=ii+1
1116            nrank_north(ii)=nfproc(ji)
1117         END IF
1118      END DO
1119      !
1120      ! create the world group
1121      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1122      !
1123      ! Create the North group from the world group
1124      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1125      !
1126      ! Create the North communicator , ie the pool of procs in the north group
1127      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1128      !
1129#endif
1130   END SUBROUTINE mpp_ini_north
1131
1132
1133   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1134      !!---------------------------------------------------------------------
1135      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1136      !!
1137      !!   Modification of original codes written by David H. Bailey
1138      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1139      !!---------------------------------------------------------------------
1140      INTEGER                     , INTENT(in)    ::   ilen, itype
1141      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1142      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1143      !
1144      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1145      INTEGER  :: ji, ztmp           ! local scalar
1146      !!---------------------------------------------------------------------
1147      !
1148      ztmp = itype   ! avoid compilation warning
1149      !
1150      DO ji=1,ilen
1151      ! Compute ydda + yddb using Knuth's trick.
1152         zt1  = real(ydda(ji)) + real(yddb(ji))
1153         zerr = zt1 - real(ydda(ji))
1154         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1155                + aimag(ydda(ji)) + aimag(yddb(ji))
1156
1157         ! The result is zt1 + zt2, after normalization.
1158         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1159      END DO
1160      !
1161   END SUBROUTINE DDPDD_MPI
1162
1163
1164   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1165      !!----------------------------------------------------------------------
1166      !!                  ***  routine mpp_report  ***
1167      !!
1168      !! ** Purpose :   report use of mpp routines per time-setp
1169      !!
1170      !!----------------------------------------------------------------------
1171      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1172      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1173      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1174      !!
1175      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1176      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1177      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1178      !!----------------------------------------------------------------------
1179#if defined key_mpp_mpi
1180      !
1181      ll_lbc = .FALSE.
1182      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1183      ll_glb = .FALSE.
1184      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1185      ll_dlg = .FALSE.
1186      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1187      !
1188      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1189      ncom_freq = ncom_fsbc
1190      !
1191      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1192         IF( ll_lbc ) THEN
1193            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1194            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1195            n_sequence_lbc = n_sequence_lbc + 1
1196            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1197            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1198            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1199            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1200         ENDIF
1201         IF( ll_glb ) THEN
1202            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1203            n_sequence_glb = n_sequence_glb + 1
1204            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1205            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1206         ENDIF
1207         IF( ll_dlg ) THEN
1208            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1209            n_sequence_dlg = n_sequence_dlg + 1
1210            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1211            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1212         ENDIF
1213      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1214         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1215         WRITE(numcom,*) ' '
1216         WRITE(numcom,*) ' ------------------------------------------------------------'
1217         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1218         WRITE(numcom,*) ' ------------------------------------------------------------'
1219         WRITE(numcom,*) ' '
1220         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1221         jj = 0; jk = 0; jf = 0; jh = 0
1222         DO ji = 1, n_sequence_lbc
1223            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1224            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1225            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1226            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1227         END DO
1228         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1229         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1230         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1231         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1232         WRITE(numcom,*) ' '
1233         WRITE(numcom,*) ' lbc_lnk called'
1234         DO ji = 1, n_sequence_lbc - 1
1235            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1236               ccountname = crname_lbc(ji)
1237               crname_lbc(ji) = 'already counted'
1238               jcount = 1
1239               DO jj = ji + 1, n_sequence_lbc
1240                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1241                     jcount = jcount + 1
1242                     crname_lbc(jj) = 'already counted'
1243                  END IF
1244               END DO
1245               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1246            END IF
1247         END DO
1248         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1249            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1250         END IF
1251         WRITE(numcom,*) ' '
1252         IF ( n_sequence_glb > 0 ) THEN
1253            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1254            jj = 1
1255            DO ji = 2, n_sequence_glb
1256               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1257                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1258                  jj = 0
1259               END IF
1260               jj = jj + 1 
1261            END DO
1262            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1263            DEALLOCATE(crname_glb)
1264         ELSE
1265            WRITE(numcom,*) ' No MPI global communication '
1266         ENDIF
1267         WRITE(numcom,*) ' '
1268         IF ( n_sequence_dlg > 0 ) THEN
1269            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1270            jj = 1
1271            DO ji = 2, n_sequence_dlg
1272               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1273                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1274                  jj = 0
1275               END IF
1276               jj = jj + 1 
1277            END DO
1278            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1279            DEALLOCATE(crname_dlg)
1280         ELSE
1281            WRITE(numcom,*) ' No MPI delayed global communication '
1282         ENDIF
1283         WRITE(numcom,*) ' '
1284         WRITE(numcom,*) ' -----------------------------------------------'
1285         WRITE(numcom,*) ' '
1286         DEALLOCATE(ncomm_sequence)
1287         DEALLOCATE(crname_lbc)
1288      ENDIF
1289#endif
1290   END SUBROUTINE mpp_report
1291
1292   
1293   SUBROUTINE tic_tac (ld_tic, ld_global)
1294
1295    LOGICAL,           INTENT(IN) :: ld_tic
1296    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1297    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1298    REAL(dp),               SAVE :: tic_ct = 0._dp
1299    INTEGER :: ii
1300#if defined key_mpp_mpi
1301
1302    IF( ncom_stp <= nit000 ) RETURN
1303    IF( ncom_stp == nitend ) RETURN
1304    ii = 1
1305    IF( PRESENT( ld_global ) ) THEN
1306       IF( ld_global ) ii = 2
1307    END IF
1308   
1309    IF ( ld_tic ) THEN
1310       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1311       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1312    ELSE
1313       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1314       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1315    ENDIF
1316#endif
1317   
1318   END SUBROUTINE tic_tac
1319
1320#if ! defined key_mpp_mpi
1321   SUBROUTINE mpi_wait(request, status, ierror)
1322      INTEGER                            , INTENT(in   ) ::   request
1323      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1324      INTEGER                            , INTENT(  out) ::   ierror
1325   END SUBROUTINE mpi_wait
1326
1327   
1328   FUNCTION MPI_Wtime()
1329      REAL(wp) ::  MPI_Wtime
1330      MPI_Wtime = -1.
1331   END FUNCTION MPI_Wtime
1332#endif
1333
1334   !!----------------------------------------------------------------------
1335   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1336   !!----------------------------------------------------------------------
1337
1338   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1339      &                 cd6, cd7, cd8, cd9, cd10 )
1340      !!----------------------------------------------------------------------
1341      !!                  ***  ROUTINE  stop_opa  ***
1342      !!
1343      !! ** Purpose :   print in ocean.outpput file a error message and
1344      !!                increment the error number (nstop) by one.
1345      !!----------------------------------------------------------------------
1346      CHARACTER(len=*), INTENT(in   )           ::   cd1
1347      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1348      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1349      !
1350      CHARACTER(LEN=8) ::   clfmt            ! writing format
1351      INTEGER          ::   inum
1352      !!----------------------------------------------------------------------
1353      !
1354      nstop = nstop + 1
1355      !
1356      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1357         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1358         WRITE(inum,*)
1359         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1360         CLOSE(inum)
1361      ENDIF
1362      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1363         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1364      ENDIF
1365      !
1366                            WRITE(numout,*)
1367                            WRITE(numout,*) ' ===>>> : E R R O R'
1368                            WRITE(numout,*)
1369                            WRITE(numout,*) '         ==========='
1370                            WRITE(numout,*)
1371                            WRITE(numout,*) TRIM(cd1)
1372      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1373      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1374      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1375      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1376      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1377      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1378      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1379      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1380      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1381                            WRITE(numout,*)
1382      !
1383                               CALL FLUSH(numout    )
1384      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1385      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1386      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1387      !
1388      IF( cd1 == 'STOP' ) THEN
1389         WRITE(numout,*) 
1390         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1391         WRITE(numout,*) 
1392         CALL FLUSH(numout)
1393         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1394         CALL mppstop( ld_abort = .true. )
1395      ENDIF
1396      !
1397   END SUBROUTINE ctl_stop
1398
1399
1400   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1401      &                 cd6, cd7, cd8, cd9, cd10 )
1402      !!----------------------------------------------------------------------
1403      !!                  ***  ROUTINE  stop_warn  ***
1404      !!
1405      !! ** Purpose :   print in ocean.outpput file a error message and
1406      !!                increment the warning number (nwarn) by one.
1407      !!----------------------------------------------------------------------
1408      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1409      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1410      !!----------------------------------------------------------------------
1411      !
1412      nwarn = nwarn + 1
1413      !
1414      IF(lwp) THEN
1415                               WRITE(numout,*)
1416                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1417                               WRITE(numout,*)
1418                               WRITE(numout,*) '         ==============='
1419                               WRITE(numout,*)
1420         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1421         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1422         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1423         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1424         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1425         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1426         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1427         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1428         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1429         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1430                               WRITE(numout,*)
1431      ENDIF
1432      CALL FLUSH(numout)
1433      !
1434   END SUBROUTINE ctl_warn
1435
1436
1437   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1438      !!----------------------------------------------------------------------
1439      !!                  ***  ROUTINE ctl_opn  ***
1440      !!
1441      !! ** Purpose :   Open file and check if required file is available.
1442      !!
1443      !! ** Method  :   Fortan open
1444      !!----------------------------------------------------------------------
1445      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1446      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1447      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1448      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1449      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1450      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1451      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1452      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1453      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1454      !
1455      CHARACTER(len=80) ::   clfile
1456      CHARACTER(LEN=10) ::   clfmt            ! writing format
1457      INTEGER           ::   iost
1458      INTEGER           ::   idg              ! number of digits
1459      !!----------------------------------------------------------------------
1460      !
1461      ! adapt filename
1462      ! ----------------
1463      clfile = TRIM(cdfile)
1464      IF( PRESENT( karea ) ) THEN
1465         IF( karea > 1 ) THEN
1466            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1467            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1468            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1469            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1470         ENDIF
1471      ENDIF
1472#if defined key_agrif
1473      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1474      knum=Agrif_Get_Unit()
1475#else
1476      knum=get_unit()
1477#endif
1478      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1479      !
1480      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1481         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1482      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1483         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1484      ELSE
1485         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1486      ENDIF
1487      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1488         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1489      IF( iost == 0 ) THEN
1490         IF(ldwp .AND. kout > 0) THEN
1491            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1492            WRITE(kout,*) '     unit   = ', knum
1493            WRITE(kout,*) '     status = ', cdstat
1494            WRITE(kout,*) '     form   = ', cdform
1495            WRITE(kout,*) '     access = ', cdacce
1496            WRITE(kout,*)
1497         ENDIF
1498      ENDIF
1499100   CONTINUE
1500      IF( iost /= 0 ) THEN
1501         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1502         WRITE(ctmp2,*) ' =======   ===  '
1503         WRITE(ctmp3,*) '           unit   = ', knum
1504         WRITE(ctmp4,*) '           status = ', cdstat
1505         WRITE(ctmp5,*) '           form   = ', cdform
1506         WRITE(ctmp6,*) '           access = ', cdacce
1507         WRITE(ctmp7,*) '           iostat = ', iost
1508         WRITE(ctmp8,*) '           we stop. verify the file '
1509         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1510      ENDIF
1511      !
1512   END SUBROUTINE ctl_opn
1513
1514
1515   SUBROUTINE ctl_nam ( kios, cdnam )
1516      !!----------------------------------------------------------------------
1517      !!                  ***  ROUTINE ctl_nam  ***
1518      !!
1519      !! ** Purpose :   Informations when error while reading a namelist
1520      !!
1521      !! ** Method  :   Fortan open
1522      !!----------------------------------------------------------------------
1523      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1524      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1525      !
1526      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1527      !!----------------------------------------------------------------------
1528      !
1529      WRITE (clios, '(I5.0)')   kios
1530      IF( kios < 0 ) THEN         
1531         CALL ctl_warn( 'end of record or file while reading namelist '   &
1532            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1533      ENDIF
1534      !
1535      IF( kios > 0 ) THEN
1536         CALL ctl_stop( 'misspelled variable in namelist '   &
1537            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1538      ENDIF
1539      kios = 0
1540      !
1541   END SUBROUTINE ctl_nam
1542
1543
1544   INTEGER FUNCTION get_unit()
1545      !!----------------------------------------------------------------------
1546      !!                  ***  FUNCTION  get_unit  ***
1547      !!
1548      !! ** Purpose :   return the index of an unused logical unit
1549      !!----------------------------------------------------------------------
1550      LOGICAL :: llopn
1551      !!----------------------------------------------------------------------
1552      !
1553      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1554      llopn = .TRUE.
1555      DO WHILE( (get_unit < 998) .AND. llopn )
1556         get_unit = get_unit + 1
1557         INQUIRE( unit = get_unit, opened = llopn )
1558      END DO
1559      IF( (get_unit == 999) .AND. llopn ) THEN
1560         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1561      ENDIF
1562      !
1563   END FUNCTION get_unit
1564
1565   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1566      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1567      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1568      CHARACTER(LEN=256)                           :: chline
1569      CHARACTER(LEN=1)                             :: csp
1570      INTEGER, INTENT(IN)                          :: kout
1571      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1572      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1573      !
1574      !csp = NEW_LINE('A')
1575      ! a new line character is the best seperator but some systems (e.g.Cray)
1576      ! seem to terminate namelist reads from internal files early if they
1577      ! encounter new-lines. Use a single space for safety.
1578      csp = ' '
1579      !
1580      ! Check if the namelist buffer has already been allocated. Return if it has.
1581      !
1582      IF ( ALLOCATED( cdnambuff ) ) RETURN
1583      IF( ldwp ) THEN
1584         !
1585         ! Open namelist file
1586         !
1587         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1588         !
1589         ! First pass: count characters excluding comments and trimable white space
1590         !
1591         itot=0
1592     10  READ(iun,'(A256)',END=20,ERR=20) chline
1593         iltc = LEN_TRIM(chline)
1594         IF ( iltc.GT.0 ) THEN
1595          inl = INDEX(chline, '!') 
1596          IF( inl.eq.0 ) THEN
1597           itot = itot + iltc + 1                                ! +1 for the newline character
1598          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1599           itot = itot + inl                                  !  includes +1 for the newline character
1600          ENDIF
1601         ENDIF
1602         GOTO 10
1603     20  CONTINUE
1604         !
1605         ! Allocate text cdnambuff for condensed namelist
1606         !
1607!$AGRIF_DO_NOT_TREAT
1608         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1609!$AGRIF_END_DO_NOT_TREAT
1610         itotsav = itot
1611         !
1612         ! Second pass: read and transfer pruned characters into cdnambuff
1613         !
1614         REWIND(iun)
1615         itot=1
1616     30  READ(iun,'(A256)',END=40,ERR=40) chline
1617         iltc = LEN_TRIM(chline)
1618         IF ( iltc.GT.0 ) THEN
1619          inl = INDEX(chline, '!')
1620          IF( inl.eq.0 ) THEN
1621           inl = iltc
1622          ELSE
1623           inl = inl - 1
1624          ENDIF
1625          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1626             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1627             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1628             itot = itot + inl + 1
1629          ENDIF
1630         ENDIF
1631         GOTO 30
1632     40  CONTINUE
1633         itot = itot - 1
1634         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1635         !
1636         ! Close namelist file
1637         !
1638         CLOSE(iun)
1639         !write(*,'(32A)') cdnambuff
1640      ENDIF
1641#if defined key_mpp_mpi
1642      CALL mpp_bcast_nml( cdnambuff, itot )
1643#endif
1644  END SUBROUTINE load_nml
1645
1646
1647   !!----------------------------------------------------------------------
1648END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.