New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2020/dev_r13296_HPC-07_mocavero_mpi3/src/OCE/LBC – NEMO

source: NEMO/branches/2020/dev_r13296_HPC-07_mocavero_mpi3/src/OCE/LBC/lib_mpp.F90 @ 13303

Last change on this file since 13303 was 13303, checked in by mocavero, 4 years ago

Add neighborhood collectives lbc routines - ticket #2496

  • Property svn:keywords set to Id
File size: 71.5 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mpp_ini_nc
69   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
71   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
72   PUBLIC   mpp_report
73   PUBLIC   mpp_bcast_nml
74   PUBLIC   tic_tac
75#if ! defined key_mpp_mpi
76   PUBLIC MPI_Wtime
77#endif
78   
79   !! * Interfaces
80   !! define generic interface for these routine as they are called sometimes
81   !! with scalar arguments instead of array arguments, which causes problems
82   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
83   INTERFACE mpp_min
84      MODULE PROCEDURE mppmin_a_int, mppmin_int
85      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
86      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
87   END INTERFACE
88   INTERFACE mpp_max
89      MODULE PROCEDURE mppmax_a_int, mppmax_int
90      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
91      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
92   END INTERFACE
93   INTERFACE mpp_sum
94      MODULE PROCEDURE mppsum_a_int, mppsum_int
95      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
96      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
97      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
98   END INTERFACE
99   INTERFACE mpp_minloc
100      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
101      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
102   END INTERFACE
103   INTERFACE mpp_maxloc
104      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
105      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
106   END INTERFACE
107
108   !! ========================= !!
109   !!  MPI  variable definition !!
110   !! ========================= !!
111#if   defined key_mpp_mpi
112!$AGRIF_DO_NOT_TREAT
113   INCLUDE 'mpif.h'
114!$AGRIF_END_DO_NOT_TREAT
115   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
116#else   
117   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
118   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
120#endif
121
122   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
123
124   INTEGER, PUBLIC ::   mppsize        ! number of process
125   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
126!$AGRIF_DO_NOT_TREAT
127   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
128!$AGRIF_END_DO_NOT_TREAT
129
130   INTEGER :: MPI_SUMDD
131
132   ! variables used for zonal integration
133   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
134   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
135   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
136   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
137   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
138
139   ! variables used for MPI3 neighbourhood collectives
140   INTEGER, PUBLIC :: mpi_nc_com                   ! MPI3 neighbourhood collectives communicator
141   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE :: nranks
142
143   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
144   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
145   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
146   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
147   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
148   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
149   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
150   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
151   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
152
153   ! Communications summary report
154   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
165   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
166   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
167   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
168   !: name (used as id) of allreduce-delayed operations
169   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
170   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
171   !: component name where the allreduce-delayed operation is performed
172   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
173   TYPE, PUBLIC ::   DELAYARR
174      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
175      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
176   END TYPE DELAYARR
177   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
178   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
179
180   ! timing summary report
181   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
182   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
183   
184   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
185
186   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
187   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
188   
189   !! * Substitutions
190#  include "do_loop_substitute.h90"
191   !!----------------------------------------------------------------------
192   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
193   !! $Id$
194   !! Software governed by the CeCILL license (see ./LICENSE)
195   !!----------------------------------------------------------------------
196CONTAINS
197
198   SUBROUTINE mpp_start( localComm )
199      !!----------------------------------------------------------------------
200      !!                  ***  routine mpp_start  ***
201      !!
202      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
203      !!----------------------------------------------------------------------
204      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
205      !
206      INTEGER ::   ierr
207      LOGICAL ::   llmpi_init
208      !!----------------------------------------------------------------------
209#if defined key_mpp_mpi
210      !
211      CALL mpi_initialized ( llmpi_init, ierr )
212      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
213
214      IF( .NOT. llmpi_init ) THEN
215         IF( PRESENT(localComm) ) THEN
216            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
217            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
218            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
219         ENDIF
220         CALL mpi_init( ierr )
221         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
222      ENDIF
223       
224      IF( PRESENT(localComm) ) THEN
225         IF( Agrif_Root() ) THEN
226            mpi_comm_oce = localComm
227         ENDIF
228      ELSE
229         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
230         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
231      ENDIF
232
233# if defined key_agrif
234      IF( Agrif_Root() ) THEN
235         CALL Agrif_MPI_Init(mpi_comm_oce)
236      ELSE
237         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
238      ENDIF
239# endif
240
241      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
242      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
243      !
244      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
245      !
246#else
247      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
248      mppsize = 1
249      mpprank = 0
250#endif
251   END SUBROUTINE mpp_start
252
253
254   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
255      !!----------------------------------------------------------------------
256      !!                  ***  routine mppsend  ***
257      !!
258      !! ** Purpose :   Send messag passing array
259      !!
260      !!----------------------------------------------------------------------
261      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
262      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
263      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
264      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
265      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
266      !!
267      INTEGER ::   iflag
268      INTEGER :: mpi_working_type
269      !!----------------------------------------------------------------------
270      !
271#if defined key_mpp_mpi
272      IF (wp == dp) THEN
273         mpi_working_type = mpi_double_precision
274      ELSE
275         mpi_working_type = mpi_real
276      END IF
277      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
278#endif
279      !
280   END SUBROUTINE mppsend
281
282
283   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
284      !!----------------------------------------------------------------------
285      !!                  ***  routine mppsend  ***
286      !!
287      !! ** Purpose :   Send messag passing array
288      !!
289      !!----------------------------------------------------------------------
290      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
291      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
292      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
293      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
294      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
295      !!
296      INTEGER ::   iflag
297      !!----------------------------------------------------------------------
298      !
299#if defined key_mpp_mpi
300      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
301#endif
302      !
303   END SUBROUTINE mppsend_dp
304
305
306   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
307      !!----------------------------------------------------------------------
308      !!                  ***  routine mppsend  ***
309      !!
310      !! ** Purpose :   Send messag passing array
311      !!
312      !!----------------------------------------------------------------------
313      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
314      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
315      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
316      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
317      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
318      !!
319      INTEGER ::   iflag
320      !!----------------------------------------------------------------------
321      !
322#if defined key_mpp_mpi
323      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
324#endif
325      !
326   END SUBROUTINE mppsend_sp
327
328
329   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
330      !!----------------------------------------------------------------------
331      !!                  ***  routine mpprecv  ***
332      !!
333      !! ** Purpose :   Receive messag passing array
334      !!
335      !!----------------------------------------------------------------------
336      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
337      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
338      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
339      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
340      !!
341      INTEGER :: istatus(mpi_status_size)
342      INTEGER :: iflag
343      INTEGER :: use_source
344      INTEGER :: mpi_working_type
345      !!----------------------------------------------------------------------
346      !
347#if defined key_mpp_mpi
348      ! If a specific process number has been passed to the receive call,
349      ! use that one. Default is to use mpi_any_source
350      use_source = mpi_any_source
351      IF( PRESENT(ksource) )   use_source = ksource
352      !
353      IF (wp == dp) THEN
354         mpi_working_type = mpi_double_precision
355      ELSE
356         mpi_working_type = mpi_real
357      END IF
358      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
359#endif
360      !
361   END SUBROUTINE mpprecv
362
363   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
364      !!----------------------------------------------------------------------
365      !!                  ***  routine mpprecv  ***
366      !!
367      !! ** Purpose :   Receive messag passing array
368      !!
369      !!----------------------------------------------------------------------
370      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
371      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
372      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
373      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
374      !!
375      INTEGER :: istatus(mpi_status_size)
376      INTEGER :: iflag
377      INTEGER :: use_source
378      !!----------------------------------------------------------------------
379      !
380#if defined key_mpp_mpi
381      ! If a specific process number has been passed to the receive call,
382      ! use that one. Default is to use mpi_any_source
383      use_source = mpi_any_source
384      IF( PRESENT(ksource) )   use_source = ksource
385      !
386      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
387#endif
388      !
389   END SUBROUTINE mpprecv_dp
390
391
392   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
393      !!----------------------------------------------------------------------
394      !!                  ***  routine mpprecv  ***
395      !!
396      !! ** Purpose :   Receive messag passing array
397      !!
398      !!----------------------------------------------------------------------
399      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
400      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
401      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
402      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
403      !!
404      INTEGER :: istatus(mpi_status_size)
405      INTEGER :: iflag
406      INTEGER :: use_source
407      !!----------------------------------------------------------------------
408      !
409#if defined key_mpp_mpi
410      ! If a specific process number has been passed to the receive call,
411      ! use that one. Default is to use mpi_any_source
412      use_source = mpi_any_source
413      IF( PRESENT(ksource) )   use_source = ksource
414      !
415      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
416#endif
417      !
418   END SUBROUTINE mpprecv_sp
419
420
421   SUBROUTINE mppgather( ptab, kp, pio )
422      !!----------------------------------------------------------------------
423      !!                   ***  routine mppgather  ***
424      !!
425      !! ** Purpose :   Transfert between a local subdomain array and a work
426      !!     array which is distributed following the vertical level.
427      !!
428      !!----------------------------------------------------------------------
429      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
430      INTEGER                           , INTENT(in   ) ::   kp     ! record length
431      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
432      !!
433      INTEGER :: itaille, ierror   ! temporary integer
434      !!---------------------------------------------------------------------
435      !
436      itaille = jpi * jpj
437#if defined key_mpp_mpi
438      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
439         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
440#else
441      pio(:,:,1) = ptab(:,:)
442#endif
443      !
444   END SUBROUTINE mppgather
445
446
447   SUBROUTINE mppscatter( pio, kp, ptab )
448      !!----------------------------------------------------------------------
449      !!                  ***  routine mppscatter  ***
450      !!
451      !! ** Purpose :   Transfert between awork array which is distributed
452      !!      following the vertical level and the local subdomain array.
453      !!
454      !!----------------------------------------------------------------------
455      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
456      INTEGER                             ::   kp     ! Tag (not used with MPI
457      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
458      !!
459      INTEGER :: itaille, ierror   ! temporary integer
460      !!---------------------------------------------------------------------
461      !
462      itaille = jpi * jpj
463      !
464#if defined key_mpp_mpi
465      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
466         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
467#else
468      ptab(:,:) = pio(:,:,1)
469#endif
470      !
471   END SUBROUTINE mppscatter
472
473   
474   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
475     !!----------------------------------------------------------------------
476      !!                   ***  routine mpp_delay_sum  ***
477      !!
478      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
479      !!
480      !!----------------------------------------------------------------------
481      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
482      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
483      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
484      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
485      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
486      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
487      !!
488      INTEGER ::   ji, isz
489      INTEGER ::   idvar
490      INTEGER ::   ierr, ilocalcomm
491      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
492      !!----------------------------------------------------------------------
493#if defined key_mpp_mpi
494      ilocalcomm = mpi_comm_oce
495      IF( PRESENT(kcom) )   ilocalcomm = kcom
496
497      isz = SIZE(y_in)
498     
499      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
500
501      idvar = -1
502      DO ji = 1, nbdelay
503         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
504      END DO
505      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
506
507      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
508         !                                       --------------------------
509         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
510            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
511            DEALLOCATE(todelay(idvar)%z1d)
512            ndelayid(idvar) = -1                                      ! do as if we had no restart
513         ELSE
514            ALLOCATE(todelay(idvar)%y1d(isz))
515            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
516         END IF
517      ENDIF
518     
519      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
520         !                                       --------------------------
521         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
522         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
523         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
524      ENDIF
525
526      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
527
528      ! send back pout from todelay(idvar)%z1d defined at previous call
529      pout(:) = todelay(idvar)%z1d(:)
530
531      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
532# if defined key_mpi2
533      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
534      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
535      ndelayid(idvar) = 1
536      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
537# else
538      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
539# endif
540#else
541      pout(:) = REAL(y_in(:), wp)
542#endif
543
544   END SUBROUTINE mpp_delay_sum
545
546   
547   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
548      !!----------------------------------------------------------------------
549      !!                   ***  routine mpp_delay_max  ***
550      !!
551      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
552      !!
553      !!----------------------------------------------------------------------
554      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
555      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
556      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
557      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
558      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
559      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
560      !!
561      INTEGER ::   ji, isz
562      INTEGER ::   idvar
563      INTEGER ::   ierr, ilocalcomm
564      INTEGER ::   MPI_TYPE
565      !!----------------------------------------------------------------------
566     
567#if defined key_mpp_mpi
568      if( wp == dp ) then
569         MPI_TYPE = MPI_DOUBLE_PRECISION
570      else if ( wp == sp ) then
571         MPI_TYPE = MPI_REAL
572      else
573        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
574   
575      end if
576
577      ilocalcomm = mpi_comm_oce
578      IF( PRESENT(kcom) )   ilocalcomm = kcom
579
580      isz = SIZE(p_in)
581
582      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
583
584      idvar = -1
585      DO ji = 1, nbdelay
586         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
587      END DO
588      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
589
590      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
591         !                                       --------------------------
592         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
593            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
594            DEALLOCATE(todelay(idvar)%z1d)
595            ndelayid(idvar) = -1                                      ! do as if we had no restart
596         END IF
597      ENDIF
598
599      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
600         !                                       --------------------------
601         ALLOCATE(todelay(idvar)%z1d(isz))
602         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
603      ENDIF
604
605      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
606
607      ! send back pout from todelay(idvar)%z1d defined at previous call
608      pout(:) = todelay(idvar)%z1d(:)
609
610      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
611# if defined key_mpi2
612      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
613      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
614      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
615# else
616      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
617# endif
618#else
619      pout(:) = p_in(:)
620#endif
621
622   END SUBROUTINE mpp_delay_max
623
624   
625   SUBROUTINE mpp_delay_rcv( kid )
626      !!----------------------------------------------------------------------
627      !!                   ***  routine mpp_delay_rcv  ***
628      !!
629      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
630      !!
631      !!----------------------------------------------------------------------
632      INTEGER,INTENT(in   )      ::  kid 
633      INTEGER ::   ierr
634      !!----------------------------------------------------------------------
635#if defined key_mpp_mpi
636      IF( ndelayid(kid) /= -2 ) THEN 
637#if ! defined key_mpi2
638         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
639         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
640         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
641#endif
642         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
643         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
644      ENDIF
645#endif
646   END SUBROUTINE mpp_delay_rcv
647
648   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
649      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
650      INTEGER                          , INTENT(INOUT) :: kleng
651      !!----------------------------------------------------------------------
652      !!                  ***  routine mpp_bcast_nml  ***
653      !!
654      !! ** Purpose :   broadcast namelist character buffer
655      !!
656      !!----------------------------------------------------------------------
657      !!
658      INTEGER ::   iflag
659      !!----------------------------------------------------------------------
660      !
661#if defined key_mpp_mpi
662      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
663      call MPI_BARRIER(mpi_comm_oce, iflag)
664!$AGRIF_DO_NOT_TREAT
665      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
666!$AGRIF_END_DO_NOT_TREAT
667      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
668      call MPI_BARRIER(mpi_comm_oce, iflag)
669#endif
670      !
671   END SUBROUTINE mpp_bcast_nml
672
673   
674   !!----------------------------------------------------------------------
675   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
676   !!   
677   !!----------------------------------------------------------------------
678   !!
679#  define OPERATION_MAX
680#  define INTEGER_TYPE
681#  define DIM_0d
682#     define ROUTINE_ALLREDUCE           mppmax_int
683#     include "mpp_allreduce_generic.h90"
684#     undef ROUTINE_ALLREDUCE
685#  undef DIM_0d
686#  define DIM_1d
687#     define ROUTINE_ALLREDUCE           mppmax_a_int
688#     include "mpp_allreduce_generic.h90"
689#     undef ROUTINE_ALLREDUCE
690#  undef DIM_1d
691#  undef INTEGER_TYPE
692!
693   !!
694   !!   ----   SINGLE PRECISION VERSIONS
695   !!
696#  define SINGLE_PRECISION
697#  define REAL_TYPE
698#  define DIM_0d
699#     define ROUTINE_ALLREDUCE           mppmax_real_sp
700#     include "mpp_allreduce_generic.h90"
701#     undef ROUTINE_ALLREDUCE
702#  undef DIM_0d
703#  define DIM_1d
704#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
705#     include "mpp_allreduce_generic.h90"
706#     undef ROUTINE_ALLREDUCE
707#  undef DIM_1d
708#  undef SINGLE_PRECISION
709   !!
710   !!
711   !!   ----   DOUBLE PRECISION VERSIONS
712   !!
713!
714#  define DIM_0d
715#     define ROUTINE_ALLREDUCE           mppmax_real_dp
716#     include "mpp_allreduce_generic.h90"
717#     undef ROUTINE_ALLREDUCE
718#  undef DIM_0d
719#  define DIM_1d
720#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
721#     include "mpp_allreduce_generic.h90"
722#     undef ROUTINE_ALLREDUCE
723#  undef DIM_1d
724#  undef REAL_TYPE
725#  undef OPERATION_MAX
726   !!----------------------------------------------------------------------
727   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
728   !!   
729   !!----------------------------------------------------------------------
730   !!
731#  define OPERATION_MIN
732#  define INTEGER_TYPE
733#  define DIM_0d
734#     define ROUTINE_ALLREDUCE           mppmin_int
735#     include "mpp_allreduce_generic.h90"
736#     undef ROUTINE_ALLREDUCE
737#  undef DIM_0d
738#  define DIM_1d
739#     define ROUTINE_ALLREDUCE           mppmin_a_int
740#     include "mpp_allreduce_generic.h90"
741#     undef ROUTINE_ALLREDUCE
742#  undef DIM_1d
743#  undef INTEGER_TYPE
744!
745   !!
746   !!   ----   SINGLE PRECISION VERSIONS
747   !!
748#  define SINGLE_PRECISION
749#  define REAL_TYPE
750#  define DIM_0d
751#     define ROUTINE_ALLREDUCE           mppmin_real_sp
752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_0d
755#  define DIM_1d
756#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
757#     include "mpp_allreduce_generic.h90"
758#     undef ROUTINE_ALLREDUCE
759#  undef DIM_1d
760#  undef SINGLE_PRECISION
761   !!
762   !!   ----   DOUBLE PRECISION VERSIONS
763   !!
764
765#  define DIM_0d
766#     define ROUTINE_ALLREDUCE           mppmin_real_dp
767#     include "mpp_allreduce_generic.h90"
768#     undef ROUTINE_ALLREDUCE
769#  undef DIM_0d
770#  define DIM_1d
771#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
772#     include "mpp_allreduce_generic.h90"
773#     undef ROUTINE_ALLREDUCE
774#  undef DIM_1d
775#  undef REAL_TYPE
776#  undef OPERATION_MIN
777
778   !!----------------------------------------------------------------------
779   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
780   !!   
781   !!   Global sum of 1D array or a variable (integer, real or complex)
782   !!----------------------------------------------------------------------
783   !!
784#  define OPERATION_SUM
785#  define INTEGER_TYPE
786#  define DIM_0d
787#     define ROUTINE_ALLREDUCE           mppsum_int
788#     include "mpp_allreduce_generic.h90"
789#     undef ROUTINE_ALLREDUCE
790#  undef DIM_0d
791#  define DIM_1d
792#     define ROUTINE_ALLREDUCE           mppsum_a_int
793#     include "mpp_allreduce_generic.h90"
794#     undef ROUTINE_ALLREDUCE
795#  undef DIM_1d
796#  undef INTEGER_TYPE
797
798   !!
799   !!   ----   SINGLE PRECISION VERSIONS
800   !!
801#  define OPERATION_SUM
802#  define SINGLE_PRECISION
803#  define REAL_TYPE
804#  define DIM_0d
805#     define ROUTINE_ALLREDUCE           mppsum_real_sp
806#     include "mpp_allreduce_generic.h90"
807#     undef ROUTINE_ALLREDUCE
808#  undef DIM_0d
809#  define DIM_1d
810#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
811#     include "mpp_allreduce_generic.h90"
812#     undef ROUTINE_ALLREDUCE
813#  undef DIM_1d
814#  undef REAL_TYPE
815#  undef OPERATION_SUM
816
817#  undef SINGLE_PRECISION
818
819   !!
820   !!   ----   DOUBLE PRECISION VERSIONS
821   !!
822#  define OPERATION_SUM
823#  define REAL_TYPE
824#  define DIM_0d
825#     define ROUTINE_ALLREDUCE           mppsum_real_dp
826#     include "mpp_allreduce_generic.h90"
827#     undef ROUTINE_ALLREDUCE
828#  undef DIM_0d
829#  define DIM_1d
830#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
831#     include "mpp_allreduce_generic.h90"
832#     undef ROUTINE_ALLREDUCE
833#  undef DIM_1d
834#  undef REAL_TYPE
835#  undef OPERATION_SUM
836
837#  define OPERATION_SUM_DD
838#  define COMPLEX_TYPE
839#  define DIM_0d
840#     define ROUTINE_ALLREDUCE           mppsum_realdd
841#     include "mpp_allreduce_generic.h90"
842#     undef ROUTINE_ALLREDUCE
843#  undef DIM_0d
844#  define DIM_1d
845#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
846#     include "mpp_allreduce_generic.h90"
847#     undef ROUTINE_ALLREDUCE
848#  undef DIM_1d
849#  undef COMPLEX_TYPE
850#  undef OPERATION_SUM_DD
851
852   !!----------------------------------------------------------------------
853   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
854   !!   
855   !!----------------------------------------------------------------------
856   !!
857   !!
858   !!   ----   SINGLE PRECISION VERSIONS
859   !!
860#  define SINGLE_PRECISION
861#  define OPERATION_MINLOC
862#  define DIM_2d
863#     define ROUTINE_LOC           mpp_minloc2d_sp
864#     include "mpp_loc_generic.h90"
865#     undef ROUTINE_LOC
866#  undef DIM_2d
867#  define DIM_3d
868#     define ROUTINE_LOC           mpp_minloc3d_sp
869#     include "mpp_loc_generic.h90"
870#     undef ROUTINE_LOC
871#  undef DIM_3d
872#  undef OPERATION_MINLOC
873
874#  define OPERATION_MAXLOC
875#  define DIM_2d
876#     define ROUTINE_LOC           mpp_maxloc2d_sp
877#     include "mpp_loc_generic.h90"
878#     undef ROUTINE_LOC
879#  undef DIM_2d
880#  define DIM_3d
881#     define ROUTINE_LOC           mpp_maxloc3d_sp
882#     include "mpp_loc_generic.h90"
883#     undef ROUTINE_LOC
884#  undef DIM_3d
885#  undef OPERATION_MAXLOC
886#  undef SINGLE_PRECISION
887   !!
888   !!   ----   DOUBLE PRECISION VERSIONS
889   !!
890#  define OPERATION_MINLOC
891#  define DIM_2d
892#     define ROUTINE_LOC           mpp_minloc2d_dp
893#     include "mpp_loc_generic.h90"
894#     undef ROUTINE_LOC
895#  undef DIM_2d
896#  define DIM_3d
897#     define ROUTINE_LOC           mpp_minloc3d_dp
898#     include "mpp_loc_generic.h90"
899#     undef ROUTINE_LOC
900#  undef DIM_3d
901#  undef OPERATION_MINLOC
902
903#  define OPERATION_MAXLOC
904#  define DIM_2d
905#     define ROUTINE_LOC           mpp_maxloc2d_dp
906#     include "mpp_loc_generic.h90"
907#     undef ROUTINE_LOC
908#  undef DIM_2d
909#  define DIM_3d
910#     define ROUTINE_LOC           mpp_maxloc3d_dp
911#     include "mpp_loc_generic.h90"
912#     undef ROUTINE_LOC
913#  undef DIM_3d
914#  undef OPERATION_MAXLOC
915
916
917   SUBROUTINE mppsync()
918      !!----------------------------------------------------------------------
919      !!                  ***  routine mppsync  ***
920      !!
921      !! ** Purpose :   Massively parallel processors, synchroneous
922      !!
923      !!-----------------------------------------------------------------------
924      INTEGER :: ierror
925      !!-----------------------------------------------------------------------
926      !
927#if defined key_mpp_mpi
928      CALL mpi_barrier( mpi_comm_oce, ierror )
929#endif
930      !
931   END SUBROUTINE mppsync
932
933
934   SUBROUTINE mppstop( ld_abort ) 
935      !!----------------------------------------------------------------------
936      !!                  ***  routine mppstop  ***
937      !!
938      !! ** purpose :   Stop massively parallel processors method
939      !!
940      !!----------------------------------------------------------------------
941      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
942      LOGICAL ::   ll_abort
943      INTEGER ::   info
944      !!----------------------------------------------------------------------
945      ll_abort = .FALSE.
946      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
947      !
948#if defined key_mpp_mpi
949      IF(ll_abort) THEN
950         CALL mpi_abort( MPI_COMM_WORLD )
951      ELSE
952         CALL mppsync
953         CALL mpi_finalize( info )
954      ENDIF
955#endif
956      IF( ll_abort ) STOP 123
957      !
958   END SUBROUTINE mppstop
959
960
961   SUBROUTINE mpp_comm_free( kcom )
962      !!----------------------------------------------------------------------
963      INTEGER, INTENT(in) ::   kcom
964      !!
965      INTEGER :: ierr
966      !!----------------------------------------------------------------------
967      !
968#if defined key_mpp_mpi
969      CALL MPI_COMM_FREE(kcom, ierr)
970#endif
971      !
972   END SUBROUTINE mpp_comm_free
973
974
975   SUBROUTINE mpp_ini_znl( kumout )
976      !!----------------------------------------------------------------------
977      !!               ***  routine mpp_ini_znl  ***
978      !!
979      !! ** Purpose :   Initialize special communicator for computing zonal sum
980      !!
981      !! ** Method  : - Look for processors in the same row
982      !!              - Put their number in nrank_znl
983      !!              - Create group for the znl processors
984      !!              - Create a communicator for znl processors
985      !!              - Determine if processor should write znl files
986      !!
987      !! ** output
988      !!      ndim_rank_znl = number of processors on the same row
989      !!      ngrp_znl = group ID for the znl processors
990      !!      ncomm_znl = communicator for the ice procs.
991      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
992      !!
993      !!----------------------------------------------------------------------
994      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
995      !
996      INTEGER :: jproc      ! dummy loop integer
997      INTEGER :: ierr, ii   ! local integer
998      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
999      !!----------------------------------------------------------------------
1000#if defined key_mpp_mpi
1001      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1002      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1003      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1004      !
1005      ALLOCATE( kwork(jpnij), STAT=ierr )
1006      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1007
1008      IF( jpnj == 1 ) THEN
1009         ngrp_znl  = ngrp_world
1010         ncomm_znl = mpi_comm_oce
1011      ELSE
1012         !
1013         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1014         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1015         !-$$        CALL flush(numout)
1016         !
1017         ! Count number of processors on the same row
1018         ndim_rank_znl = 0
1019         DO jproc=1,jpnij
1020            IF ( kwork(jproc) == njmpp ) THEN
1021               ndim_rank_znl = ndim_rank_znl + 1
1022            ENDIF
1023         END DO
1024         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1025         !-$$        CALL flush(numout)
1026         ! Allocate the right size to nrank_znl
1027         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1028         ALLOCATE(nrank_znl(ndim_rank_znl))
1029         ii = 0
1030         nrank_znl (:) = 0
1031         DO jproc=1,jpnij
1032            IF ( kwork(jproc) == njmpp) THEN
1033               ii = ii + 1
1034               nrank_znl(ii) = jproc -1
1035            ENDIF
1036         END DO
1037         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1038         !-$$        CALL flush(numout)
1039
1040         ! Create the opa group
1041         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1042         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1043         !-$$        CALL flush(numout)
1044
1045         ! Create the znl group from the opa group
1046         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1047         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1048         !-$$        CALL flush(numout)
1049
1050         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1051         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1052         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1053         !-$$        CALL flush(numout)
1054         !
1055      END IF
1056
1057      ! Determines if processor if the first (starting from i=1) on the row
1058      IF ( jpni == 1 ) THEN
1059         l_znl_root = .TRUE.
1060      ELSE
1061         l_znl_root = .FALSE.
1062         kwork (1) = nimpp
1063         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1064         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1065      END IF
1066
1067      DEALLOCATE(kwork)
1068#endif
1069
1070   END SUBROUTINE mpp_ini_znl
1071
1072   SUBROUTINE mpp_ini_nc
1073      !!----------------------------------------------------------------------
1074      !!               ***  routine mpp_ini_nc  ***
1075      !!
1076      !! ** Purpose :   Initialize special communicator for MPI3 neighbourhood
1077      !!                collectives
1078      !!
1079      !! ** Method  : - Create a graph communicator starting from the processes   
1080      !!                distribution along i and j directions
1081      !
1082      !! ** output
1083      !!         mpi_nc_com = MPI3 neighbourhood collectives communicator
1084      !!
1085      !!----------------------------------------------------------------------
1086      INTEGER, DIMENSION(:), ALLOCATABLE :: ineigh
1087      INTEGER :: ideg, icont
1088      INTEGER :: iinfo, ierr
1089      LOGICAL, PARAMETER :: ireord = .FALSE.
1090
1091#if defined key_mpp_mpi
1092      ideg = 0
1093      icont = 0
1094
1095      IF (nbondi .eq. 1) THEN
1096         ideg = ideg + 1
1097      ELSEIF (nbondi .eq. -1) THEN
1098         ideg = ideg + 1
1099      ELSEIF (nbondi .eq. 0) THEN
1100         ideg = ideg + 2
1101      ENDIF
1102
1103      IF (nbondj .eq. 1) THEN
1104         ideg = ideg + 1
1105      ELSEIF (nbondj .eq. -1) THEN
1106         ideg = ideg + 1
1107      ELSEIF (nbondj .eq. 0) THEN
1108         ideg = ideg + 2
1109      ENDIF
1110
1111      ALLOCATE(ineigh(ideg))
1112
1113      IF (nbondi .eq. 1) THEN
1114         icont = icont + 1
1115         ineigh(icont) = nowe
1116      ELSEIF (nbondi .eq. -1) THEN
1117         icont = icont + 1
1118         ineigh(icont) = noea
1119      ELSEIF (nbondi .eq. 0) THEN
1120         icont = icont + 1
1121         ineigh(icont) = nowe
1122         icont = icont + 1
1123         ineigh(icont) = noea
1124      ENDIF
1125
1126      IF (nbondj .eq. 1) THEN
1127         icont = icont + 1
1128         ineigh(icont) = noso
1129      ELSEIF (nbondj .eq. -1) THEN
1130         icont = icont + 1
1131         ineigh(icont) = nono
1132      ELSEIF (nbondj .eq. 0) THEN
1133         icont = icont + 1
1134         ineigh(icont) = noso
1135         icont = icont + 1
1136         ineigh(icont) = nono
1137      ENDIF
1138
1139      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, ideg, ineigh, MPI_UNWEIGHTED, ideg, ineigh, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com, ierr)
1140      DEALLOCATE (ineigh)
1141#endif
1142   END SUBROUTINE mpp_ini_nc
1143
1144
1145
1146   SUBROUTINE mpp_ini_north
1147      !!----------------------------------------------------------------------
1148      !!               ***  routine mpp_ini_north  ***
1149      !!
1150      !! ** Purpose :   Initialize special communicator for north folding
1151      !!      condition together with global variables needed in the mpp folding
1152      !!
1153      !! ** Method  : - Look for northern processors
1154      !!              - Put their number in nrank_north
1155      !!              - Create groups for the world processors and the north processors
1156      !!              - Create a communicator for northern processors
1157      !!
1158      !! ** output
1159      !!      njmppmax = njmpp for northern procs
1160      !!      ndim_rank_north = number of processors in the northern line
1161      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1162      !!      ngrp_world = group ID for the world processors
1163      !!      ngrp_north = group ID for the northern processors
1164      !!      ncomm_north = communicator for the northern procs.
1165      !!      north_root = number (in the world) of proc 0 in the northern comm.
1166      !!
1167      !!----------------------------------------------------------------------
1168      INTEGER ::   ierr
1169      INTEGER ::   jjproc
1170      INTEGER ::   ii, ji
1171      !!----------------------------------------------------------------------
1172      !
1173#if defined key_mpp_mpi
1174      njmppmax = MAXVAL( njmppt )
1175      !
1176      ! Look for how many procs on the northern boundary
1177      ndim_rank_north = 0
1178      DO jjproc = 1, jpni
1179         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1180      END DO
1181      !
1182      ! Allocate the right size to nrank_north
1183      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1184      ALLOCATE( nrank_north(ndim_rank_north) )
1185
1186      ! Fill the nrank_north array with proc. number of northern procs.
1187      ! Note : the rank start at 0 in MPI
1188      ii = 0
1189      DO ji = 1, jpni
1190         IF ( nfproc(ji) /= -1   ) THEN
1191            ii=ii+1
1192            nrank_north(ii)=nfproc(ji)
1193         END IF
1194      END DO
1195      !
1196      ! create the world group
1197      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1198      !
1199      ! Create the North group from the world group
1200      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1201      !
1202      ! Create the North communicator , ie the pool of procs in the north group
1203      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1204      !
1205#endif
1206   END SUBROUTINE mpp_ini_north
1207
1208
1209   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1210      !!---------------------------------------------------------------------
1211      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1212      !!
1213      !!   Modification of original codes written by David H. Bailey
1214      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1215      !!---------------------------------------------------------------------
1216      INTEGER                     , INTENT(in)    ::   ilen, itype
1217      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1218      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1219      !
1220      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1221      INTEGER  :: ji, ztmp           ! local scalar
1222      !!---------------------------------------------------------------------
1223      !
1224      ztmp = itype   ! avoid compilation warning
1225      !
1226      DO ji=1,ilen
1227      ! Compute ydda + yddb using Knuth's trick.
1228         zt1  = real(ydda(ji)) + real(yddb(ji))
1229         zerr = zt1 - real(ydda(ji))
1230         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1231                + aimag(ydda(ji)) + aimag(yddb(ji))
1232
1233         ! The result is zt1 + zt2, after normalization.
1234         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1235      END DO
1236      !
1237   END SUBROUTINE DDPDD_MPI
1238
1239
1240   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1241      !!----------------------------------------------------------------------
1242      !!                  ***  routine mpp_report  ***
1243      !!
1244      !! ** Purpose :   report use of mpp routines per time-setp
1245      !!
1246      !!----------------------------------------------------------------------
1247      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1248      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1249      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1250      !!
1251      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1252      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1253      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1254      !!----------------------------------------------------------------------
1255#if defined key_mpp_mpi
1256      !
1257      ll_lbc = .FALSE.
1258      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1259      ll_glb = .FALSE.
1260      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1261      ll_dlg = .FALSE.
1262      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1263      !
1264      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1265      ncom_freq = ncom_fsbc
1266      !
1267      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1268         IF( ll_lbc ) THEN
1269            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1270            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1271            n_sequence_lbc = n_sequence_lbc + 1
1272            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1273            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1274            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1275            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1276         ENDIF
1277         IF( ll_glb ) THEN
1278            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1279            n_sequence_glb = n_sequence_glb + 1
1280            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1281            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1282         ENDIF
1283         IF( ll_dlg ) THEN
1284            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1285            n_sequence_dlg = n_sequence_dlg + 1
1286            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1287            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1288         ENDIF
1289      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1290         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1291         WRITE(numcom,*) ' '
1292         WRITE(numcom,*) ' ------------------------------------------------------------'
1293         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1294         WRITE(numcom,*) ' ------------------------------------------------------------'
1295         WRITE(numcom,*) ' '
1296         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1297         jj = 0; jk = 0; jf = 0; jh = 0
1298         DO ji = 1, n_sequence_lbc
1299            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1300            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1301            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1302            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1303         END DO
1304         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1305         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1306         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1307         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1308         WRITE(numcom,*) ' '
1309         WRITE(numcom,*) ' lbc_lnk called'
1310         DO ji = 1, n_sequence_lbc - 1
1311            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1312               ccountname = crname_lbc(ji)
1313               crname_lbc(ji) = 'already counted'
1314               jcount = 1
1315               DO jj = ji + 1, n_sequence_lbc
1316                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1317                     jcount = jcount + 1
1318                     crname_lbc(jj) = 'already counted'
1319                  END IF
1320               END DO
1321               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1322            END IF
1323         END DO
1324         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1325            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1326         END IF
1327         WRITE(numcom,*) ' '
1328         IF ( n_sequence_glb > 0 ) THEN
1329            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1330            jj = 1
1331            DO ji = 2, n_sequence_glb
1332               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1333                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1334                  jj = 0
1335               END IF
1336               jj = jj + 1 
1337            END DO
1338            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1339            DEALLOCATE(crname_glb)
1340         ELSE
1341            WRITE(numcom,*) ' No MPI global communication '
1342         ENDIF
1343         WRITE(numcom,*) ' '
1344         IF ( n_sequence_dlg > 0 ) THEN
1345            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1346            jj = 1
1347            DO ji = 2, n_sequence_dlg
1348               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1349                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1350                  jj = 0
1351               END IF
1352               jj = jj + 1 
1353            END DO
1354            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1355            DEALLOCATE(crname_dlg)
1356         ELSE
1357            WRITE(numcom,*) ' No MPI delayed global communication '
1358         ENDIF
1359         WRITE(numcom,*) ' '
1360         WRITE(numcom,*) ' -----------------------------------------------'
1361         WRITE(numcom,*) ' '
1362         DEALLOCATE(ncomm_sequence)
1363         DEALLOCATE(crname_lbc)
1364      ENDIF
1365#endif
1366   END SUBROUTINE mpp_report
1367
1368   
1369   SUBROUTINE tic_tac (ld_tic, ld_global)
1370
1371    LOGICAL,           INTENT(IN) :: ld_tic
1372    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1373    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1374    REAL(dp),               SAVE :: tic_ct = 0._dp
1375    INTEGER :: ii
1376#if defined key_mpp_mpi
1377
1378    IF( ncom_stp <= nit000 ) RETURN
1379    IF( ncom_stp == nitend ) RETURN
1380    ii = 1
1381    IF( PRESENT( ld_global ) ) THEN
1382       IF( ld_global ) ii = 2
1383    END IF
1384   
1385    IF ( ld_tic ) THEN
1386       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1387       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1388    ELSE
1389       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1390       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1391    ENDIF
1392#endif
1393   
1394   END SUBROUTINE tic_tac
1395
1396#if ! defined key_mpp_mpi
1397   SUBROUTINE mpi_wait(request, status, ierror)
1398      INTEGER                            , INTENT(in   ) ::   request
1399      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1400      INTEGER                            , INTENT(  out) ::   ierror
1401   END SUBROUTINE mpi_wait
1402
1403   
1404   FUNCTION MPI_Wtime()
1405      REAL(wp) ::  MPI_Wtime
1406      MPI_Wtime = -1.
1407   END FUNCTION MPI_Wtime
1408#endif
1409
1410   !!----------------------------------------------------------------------
1411   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1412   !!----------------------------------------------------------------------
1413
1414   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1415      &                 cd6, cd7, cd8, cd9, cd10 )
1416      !!----------------------------------------------------------------------
1417      !!                  ***  ROUTINE  stop_opa  ***
1418      !!
1419      !! ** Purpose :   print in ocean.outpput file a error message and
1420      !!                increment the error number (nstop) by one.
1421      !!----------------------------------------------------------------------
1422      CHARACTER(len=*), INTENT(in   )           ::   cd1
1423      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1424      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1425      !
1426      CHARACTER(LEN=8) ::   clfmt            ! writing format
1427      INTEGER          ::   inum
1428      !!----------------------------------------------------------------------
1429      !
1430      nstop = nstop + 1
1431      !
1432      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1433         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1434         WRITE(inum,*)
1435         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1436         CLOSE(inum)
1437      ENDIF
1438      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1439         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1440      ENDIF
1441      !
1442                            WRITE(numout,*)
1443                            WRITE(numout,*) ' ===>>> : E R R O R'
1444                            WRITE(numout,*)
1445                            WRITE(numout,*) '         ==========='
1446                            WRITE(numout,*)
1447                            WRITE(numout,*) TRIM(cd1)
1448      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1449      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1450      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1451      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1452      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1453      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1454      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1455      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1456      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1457                            WRITE(numout,*)
1458      !
1459                               CALL FLUSH(numout    )
1460      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1461      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1462      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1463      !
1464      IF( cd1 == 'STOP' ) THEN
1465         WRITE(numout,*) 
1466         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1467         WRITE(numout,*) 
1468         CALL FLUSH(numout)
1469         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1470         CALL mppstop( ld_abort = .true. )
1471      ENDIF
1472      !
1473   END SUBROUTINE ctl_stop
1474
1475
1476   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1477      &                 cd6, cd7, cd8, cd9, cd10 )
1478      !!----------------------------------------------------------------------
1479      !!                  ***  ROUTINE  stop_warn  ***
1480      !!
1481      !! ** Purpose :   print in ocean.outpput file a error message and
1482      !!                increment the warning number (nwarn) by one.
1483      !!----------------------------------------------------------------------
1484      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1485      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1486      !!----------------------------------------------------------------------
1487      !
1488      nwarn = nwarn + 1
1489      !
1490      IF(lwp) THEN
1491                               WRITE(numout,*)
1492                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1493                               WRITE(numout,*)
1494                               WRITE(numout,*) '         ==============='
1495                               WRITE(numout,*)
1496         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1497         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1498         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1499         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1500         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1501         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1502         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1503         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1504         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1505         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1506                               WRITE(numout,*)
1507      ENDIF
1508      CALL FLUSH(numout)
1509      !
1510   END SUBROUTINE ctl_warn
1511
1512
1513   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1514      !!----------------------------------------------------------------------
1515      !!                  ***  ROUTINE ctl_opn  ***
1516      !!
1517      !! ** Purpose :   Open file and check if required file is available.
1518      !!
1519      !! ** Method  :   Fortan open
1520      !!----------------------------------------------------------------------
1521      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1522      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1523      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1524      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1525      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1526      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1527      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1528      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1529      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1530      !
1531      CHARACTER(len=80) ::   clfile
1532      CHARACTER(LEN=10) ::   clfmt            ! writing format
1533      INTEGER           ::   iost
1534      INTEGER           ::   idg              ! number of digits
1535      !!----------------------------------------------------------------------
1536      !
1537      ! adapt filename
1538      ! ----------------
1539      clfile = TRIM(cdfile)
1540      IF( PRESENT( karea ) ) THEN
1541         IF( karea > 1 ) THEN
1542            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1543            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1544            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1545            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1546         ENDIF
1547      ENDIF
1548#if defined key_agrif
1549      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1550      knum=Agrif_Get_Unit()
1551#else
1552      knum=get_unit()
1553#endif
1554      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1555      !
1556      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1557         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1558      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1559         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1560      ELSE
1561         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1562      ENDIF
1563      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1564         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1565      IF( iost == 0 ) THEN
1566         IF(ldwp .AND. kout > 0) THEN
1567            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1568            WRITE(kout,*) '     unit   = ', knum
1569            WRITE(kout,*) '     status = ', cdstat
1570            WRITE(kout,*) '     form   = ', cdform
1571            WRITE(kout,*) '     access = ', cdacce
1572            WRITE(kout,*)
1573         ENDIF
1574      ENDIF
1575100   CONTINUE
1576      IF( iost /= 0 ) THEN
1577         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1578         WRITE(ctmp2,*) ' =======   ===  '
1579         WRITE(ctmp3,*) '           unit   = ', knum
1580         WRITE(ctmp4,*) '           status = ', cdstat
1581         WRITE(ctmp5,*) '           form   = ', cdform
1582         WRITE(ctmp6,*) '           access = ', cdacce
1583         WRITE(ctmp7,*) '           iostat = ', iost
1584         WRITE(ctmp8,*) '           we stop. verify the file '
1585         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1586      ENDIF
1587      !
1588   END SUBROUTINE ctl_opn
1589
1590
1591   SUBROUTINE ctl_nam ( kios, cdnam )
1592      !!----------------------------------------------------------------------
1593      !!                  ***  ROUTINE ctl_nam  ***
1594      !!
1595      !! ** Purpose :   Informations when error while reading a namelist
1596      !!
1597      !! ** Method  :   Fortan open
1598      !!----------------------------------------------------------------------
1599      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1600      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1601      !
1602      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1603      !!----------------------------------------------------------------------
1604      !
1605      WRITE (clios, '(I5.0)')   kios
1606      IF( kios < 0 ) THEN         
1607         CALL ctl_warn( 'end of record or file while reading namelist '   &
1608            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1609      ENDIF
1610      !
1611      IF( kios > 0 ) THEN
1612         CALL ctl_stop( 'misspelled variable in namelist '   &
1613            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1614      ENDIF
1615      kios = 0
1616      !
1617   END SUBROUTINE ctl_nam
1618
1619
1620   INTEGER FUNCTION get_unit()
1621      !!----------------------------------------------------------------------
1622      !!                  ***  FUNCTION  get_unit  ***
1623      !!
1624      !! ** Purpose :   return the index of an unused logical unit
1625      !!----------------------------------------------------------------------
1626      LOGICAL :: llopn
1627      !!----------------------------------------------------------------------
1628      !
1629      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1630      llopn = .TRUE.
1631      DO WHILE( (get_unit < 998) .AND. llopn )
1632         get_unit = get_unit + 1
1633         INQUIRE( unit = get_unit, opened = llopn )
1634      END DO
1635      IF( (get_unit == 999) .AND. llopn ) THEN
1636         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1637      ENDIF
1638      !
1639   END FUNCTION get_unit
1640
1641   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1642      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1643      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1644      CHARACTER(LEN=256)                           :: chline
1645      CHARACTER(LEN=1)                             :: csp
1646      INTEGER, INTENT(IN)                          :: kout
1647      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1648      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1649      !
1650      !csp = NEW_LINE('A')
1651      ! a new line character is the best seperator but some systems (e.g.Cray)
1652      ! seem to terminate namelist reads from internal files early if they
1653      ! encounter new-lines. Use a single space for safety.
1654      csp = ' '
1655      !
1656      ! Check if the namelist buffer has already been allocated. Return if it has.
1657      !
1658      IF ( ALLOCATED( cdnambuff ) ) RETURN
1659      IF( ldwp ) THEN
1660         !
1661         ! Open namelist file
1662         !
1663         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1664         !
1665         ! First pass: count characters excluding comments and trimable white space
1666         !
1667         itot=0
1668     10  READ(iun,'(A256)',END=20,ERR=20) chline
1669         iltc = LEN_TRIM(chline)
1670         IF ( iltc.GT.0 ) THEN
1671          inl = INDEX(chline, '!') 
1672          IF( inl.eq.0 ) THEN
1673           itot = itot + iltc + 1                                ! +1 for the newline character
1674          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1675           itot = itot + inl                                  !  includes +1 for the newline character
1676          ENDIF
1677         ENDIF
1678         GOTO 10
1679     20  CONTINUE
1680         !
1681         ! Allocate text cdnambuff for condensed namelist
1682         !
1683!$AGRIF_DO_NOT_TREAT
1684         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1685!$AGRIF_END_DO_NOT_TREAT
1686         itotsav = itot
1687         !
1688         ! Second pass: read and transfer pruned characters into cdnambuff
1689         !
1690         REWIND(iun)
1691         itot=1
1692     30  READ(iun,'(A256)',END=40,ERR=40) chline
1693         iltc = LEN_TRIM(chline)
1694         IF ( iltc.GT.0 ) THEN
1695          inl = INDEX(chline, '!')
1696          IF( inl.eq.0 ) THEN
1697           inl = iltc
1698          ELSE
1699           inl = inl - 1
1700          ENDIF
1701          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1702             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1703             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1704             itot = itot + inl + 1
1705          ENDIF
1706         ENDIF
1707         GOTO 30
1708     40  CONTINUE
1709         itot = itot - 1
1710         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1711         !
1712         ! Close namelist file
1713         !
1714         CLOSE(iun)
1715         !write(*,'(32A)') cdnambuff
1716      ENDIF
1717#if defined key_mpp_mpi
1718      CALL mpp_bcast_nml( cdnambuff, itot )
1719#endif
1720  END SUBROUTINE load_nml
1721
1722
1723   !!----------------------------------------------------------------------
1724END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.