New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10386

Last change on this file since 10386 was 10386, checked in by smasson, 5 years ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: safer use of l_full_nf_update (2 lines north fold exchange), see #2133

  • Property svn:keywords set to Id
File size: 82.3 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
166   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 1       !: number of delayed operations
167!$AGRIF_DO_NOT_TREAT
168   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist       !: name (used as id) of allreduce-delayed operations
169   DATA c_delaylist(1) / 'advumx_delay' /
170!$AGRIF_END_DO_NOT_TREAT
171   REAL(wp),          DIMENSION(nbdelay), PUBLIC ::   todelay           !: current value of the delayed operations
172   INTEGER,           DIMENSION(nbdelay), PUBLIC ::   ndelayid = -1     !: mpi request id of the delayed operations
173
174   ! timing summary report
175   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
176   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
177   
178   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
179
180   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
181   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
182
183   !!----------------------------------------------------------------------
184   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
185   !! $Id$
186   !! Software governed by the CeCILL license (see ./LICENSE)
187   !!----------------------------------------------------------------------
188CONTAINS
189
190   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
191      !!----------------------------------------------------------------------
192      !!                  ***  routine mynode  ***
193      !!
194      !! ** Purpose :   Find processor unit
195      !!----------------------------------------------------------------------
196      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
197      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
198      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
199      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
200      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
201      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
202      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
203      !
204      INTEGER ::   mynode, ierr, code, ji, ii, ios
205      LOGICAL ::   mpi_was_called
206      !
207      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
208      !!----------------------------------------------------------------------
209      !
210      ii = 1
211      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
212      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
213      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
214      !
215      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
216      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
217901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
218      !
219      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
220      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
221902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
222      !
223      !                              ! control print
224      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
225      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
226      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
227      !
228      IF( jpni < 1 .OR. jpnj < 1  ) THEN
229         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
230      ELSE
231         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
232         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
233      ENDIF
234
235      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
236
237      CALL mpi_initialized ( mpi_was_called, code )
238      IF( code /= MPI_SUCCESS ) THEN
239         DO ji = 1, SIZE(ldtxt)
240            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
241         END DO
242         WRITE(*, cform_err)
243         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
244         CALL mpi_abort( mpi_comm_world, code, ierr )
245      ENDIF
246
247      IF( mpi_was_called ) THEN
248         !
249         SELECT CASE ( cn_mpi_send )
250         CASE ( 'S' )                ! Standard mpi send (blocking)
251            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
252         CASE ( 'B' )                ! Buffer mpi send (blocking)
253            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
254            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
255         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
256            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
257            l_isend = .TRUE.
258         CASE DEFAULT
259            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
260            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
261            kstop = kstop + 1
262         END SELECT
263         !
264      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
265         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
266         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
267         kstop = kstop + 1
268      ELSE
269         SELECT CASE ( cn_mpi_send )
270         CASE ( 'S' )                ! Standard mpi send (blocking)
271            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
272            CALL mpi_init( ierr )
273         CASE ( 'B' )                ! Buffer mpi send (blocking)
274            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
275            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
276         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
277            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
278            l_isend = .TRUE.
279            CALL mpi_init( ierr )
280         CASE DEFAULT
281            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
282            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
283            kstop = kstop + 1
284         END SELECT
285         !
286      ENDIF
287
288      IF( PRESENT(localComm) ) THEN
289         IF( Agrif_Root() ) THEN
290            mpi_comm_oce = localComm
291         ENDIF
292      ELSE
293         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
294         IF( code /= MPI_SUCCESS ) THEN
295            DO ji = 1, SIZE(ldtxt)
296               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
297            END DO
298            WRITE(*, cform_err)
299            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
300            CALL mpi_abort( mpi_comm_world, code, ierr )
301         ENDIF
302      ENDIF
303
304#if defined key_agrif
305      IF( Agrif_Root() ) THEN
306         CALL Agrif_MPI_Init(mpi_comm_oce)
307      ELSE
308         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
309      ENDIF
310#endif
311
312      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
313      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
314      mynode = mpprank
315
316      IF( mynode == 0 ) THEN
317         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
318         WRITE(kumond, nammpp)     
319      ENDIF
320      !
321      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
322      !
323   END FUNCTION mynode
324
325   !!----------------------------------------------------------------------
326   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
327   !!
328   !!   * Argument : dummy argument use in mpp_lnk_... routines
329   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
330   !!                cd_nat :   nature of array grid-points
331   !!                psgn   :   sign used across the north fold boundary
332   !!                kfld   :   optional, number of pt3d arrays
333   !!                cd_mpp :   optional, fill the overlap area only
334   !!                pval   :   optional, background value (used at closed boundaries)
335   !!----------------------------------------------------------------------
336   !
337   !                       !==  2D array and array of 2D pointer  ==!
338   !
339#  define DIM_2d
340#     define ROUTINE_LNK           mpp_lnk_2d
341#     include "mpp_lnk_generic.h90"
342#     undef ROUTINE_LNK
343#     define MULTI
344#     define ROUTINE_LNK           mpp_lnk_2d_ptr
345#     include "mpp_lnk_generic.h90"
346#     undef ROUTINE_LNK
347#     undef MULTI
348#  undef DIM_2d
349   !
350   !                       !==  3D array and array of 3D pointer  ==!
351   !
352#  define DIM_3d
353#     define ROUTINE_LNK           mpp_lnk_3d
354#     include "mpp_lnk_generic.h90"
355#     undef ROUTINE_LNK
356#     define MULTI
357#     define ROUTINE_LNK           mpp_lnk_3d_ptr
358#     include "mpp_lnk_generic.h90"
359#     undef ROUTINE_LNK
360#     undef MULTI
361#  undef DIM_3d
362   !
363   !                       !==  4D array and array of 4D pointer  ==!
364   !
365#  define DIM_4d
366#     define ROUTINE_LNK           mpp_lnk_4d
367#     include "mpp_lnk_generic.h90"
368#     undef ROUTINE_LNK
369#     define MULTI
370#     define ROUTINE_LNK           mpp_lnk_4d_ptr
371#     include "mpp_lnk_generic.h90"
372#     undef ROUTINE_LNK
373#     undef MULTI
374#  undef DIM_4d
375
376   !!----------------------------------------------------------------------
377   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
378   !!
379   !!   * Argument : dummy argument use in mpp_nfd_... routines
380   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
381   !!                cd_nat :   nature of array grid-points
382   !!                psgn   :   sign used across the north fold boundary
383   !!                kfld   :   optional, number of pt3d arrays
384   !!                cd_mpp :   optional, fill the overlap area only
385   !!                pval   :   optional, background value (used at closed boundaries)
386   !!----------------------------------------------------------------------
387   !
388   !                       !==  2D array and array of 2D pointer  ==!
389   !
390#  define DIM_2d
391#     define ROUTINE_NFD           mpp_nfd_2d
392#     include "mpp_nfd_generic.h90"
393#     undef ROUTINE_NFD
394#     define MULTI
395#     define ROUTINE_NFD           mpp_nfd_2d_ptr
396#     include "mpp_nfd_generic.h90"
397#     undef ROUTINE_NFD
398#     undef MULTI
399#  undef DIM_2d
400   !
401   !                       !==  3D array and array of 3D pointer  ==!
402   !
403#  define DIM_3d
404#     define ROUTINE_NFD           mpp_nfd_3d
405#     include "mpp_nfd_generic.h90"
406#     undef ROUTINE_NFD
407#     define MULTI
408#     define ROUTINE_NFD           mpp_nfd_3d_ptr
409#     include "mpp_nfd_generic.h90"
410#     undef ROUTINE_NFD
411#     undef MULTI
412#  undef DIM_3d
413   !
414   !                       !==  4D array and array of 4D pointer  ==!
415   !
416#  define DIM_4d
417#     define ROUTINE_NFD           mpp_nfd_4d
418#     include "mpp_nfd_generic.h90"
419#     undef ROUTINE_NFD
420#     define MULTI
421#     define ROUTINE_NFD           mpp_nfd_4d_ptr
422#     include "mpp_nfd_generic.h90"
423#     undef ROUTINE_NFD
424#     undef MULTI
425#  undef DIM_4d
426
427
428   !!----------------------------------------------------------------------
429   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
430   !!
431   !!   * Argument : dummy argument use in mpp_lnk_... routines
432   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
433   !!                cd_nat :   nature of array grid-points
434   !!                psgn   :   sign used across the north fold boundary
435   !!                kb_bdy :   BDY boundary set
436   !!                kfld   :   optional, number of pt3d arrays
437   !!----------------------------------------------------------------------
438   !
439   !                       !==  2D array and array of 2D pointer  ==!
440   !
441#  define DIM_2d
442#     define ROUTINE_BDY           mpp_lnk_bdy_2d
443#     include "mpp_bdy_generic.h90"
444#     undef ROUTINE_BDY
445#  undef DIM_2d
446   !
447   !                       !==  3D array and array of 3D pointer  ==!
448   !
449#  define DIM_3d
450#     define ROUTINE_BDY           mpp_lnk_bdy_3d
451#     include "mpp_bdy_generic.h90"
452#     undef ROUTINE_BDY
453#  undef DIM_3d
454   !
455   !                       !==  4D array and array of 4D pointer  ==!
456   !
457#  define DIM_4d
458#     define ROUTINE_BDY           mpp_lnk_bdy_4d
459#     include "mpp_bdy_generic.h90"
460#     undef ROUTINE_BDY
461#  undef DIM_4d
462
463   !!----------------------------------------------------------------------
464   !!
465   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
466   
467   
468   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
469   
470   
471   !!----------------------------------------------------------------------
472
473
474
475   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
476      !!----------------------------------------------------------------------
477      !!                  ***  routine mppsend  ***
478      !!
479      !! ** Purpose :   Send messag passing array
480      !!
481      !!----------------------------------------------------------------------
482      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
483      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
484      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
485      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
486      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
487      !!
488      INTEGER ::   iflag
489      !!----------------------------------------------------------------------
490      !
491      SELECT CASE ( cn_mpi_send )
492      CASE ( 'S' )                ! Standard mpi send (blocking)
493         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
494      CASE ( 'B' )                ! Buffer mpi send (blocking)
495         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
496      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
497         ! be carefull, one more argument here : the mpi request identifier..
498         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
499      END SELECT
500      !
501   END SUBROUTINE mppsend
502
503
504   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
505      !!----------------------------------------------------------------------
506      !!                  ***  routine mpprecv  ***
507      !!
508      !! ** Purpose :   Receive messag passing array
509      !!
510      !!----------------------------------------------------------------------
511      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
512      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
513      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
514      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
515      !!
516      INTEGER :: istatus(mpi_status_size)
517      INTEGER :: iflag
518      INTEGER :: use_source
519      !!----------------------------------------------------------------------
520      !
521      ! If a specific process number has been passed to the receive call,
522      ! use that one. Default is to use mpi_any_source
523      use_source = mpi_any_source
524      IF( PRESENT(ksource) )   use_source = ksource
525      !
526      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
527      !
528   END SUBROUTINE mpprecv
529
530
531   SUBROUTINE mppgather( ptab, kp, pio )
532      !!----------------------------------------------------------------------
533      !!                   ***  routine mppgather  ***
534      !!
535      !! ** Purpose :   Transfert between a local subdomain array and a work
536      !!     array which is distributed following the vertical level.
537      !!
538      !!----------------------------------------------------------------------
539      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
540      INTEGER                           , INTENT(in   ) ::   kp     ! record length
541      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
542      !!
543      INTEGER :: itaille, ierror   ! temporary integer
544      !!---------------------------------------------------------------------
545      !
546      itaille = jpi * jpj
547      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
548         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
549      !
550   END SUBROUTINE mppgather
551
552
553   SUBROUTINE mppscatter( pio, kp, ptab )
554      !!----------------------------------------------------------------------
555      !!                  ***  routine mppscatter  ***
556      !!
557      !! ** Purpose :   Transfert between awork array which is distributed
558      !!      following the vertical level and the local subdomain array.
559      !!
560      !!----------------------------------------------------------------------
561      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
562      INTEGER                             ::   kp     ! Tag (not used with MPI
563      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
564      !!
565      INTEGER :: itaille, ierror   ! temporary integer
566      !!---------------------------------------------------------------------
567      !
568      itaille = jpi * jpj
569      !
570      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
571         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
572      !
573   END SUBROUTINE mppscatter
574
575 
576   SUBROUTINE mpp_delay_max( cdname, cdelay, pinout, ldlast, kcom )
577      !!----------------------------------------------------------------------
578      !!                   ***  routine mpp_delay_max  ***
579      !!
580      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
581      !!
582      !!----------------------------------------------------------------------
583      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
584      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
585      REAL(wp),         INTENT(inout), DIMENSION(2) ::   pinout  ! pinout(1): in data, pinout(2): out data
586      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
587      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
588      !!
589      INTEGER ::   ji
590      INTEGER ::   idvar
591      INTEGER ::   ierror, ilocalcomm
592      !!----------------------------------------------------------------------
593      ilocalcomm = mpi_comm_oce
594      IF( PRESENT(kcom) )   ilocalcomm = kcom
595
596      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
597
598      idvar = -1
599      DO ji = 1, nbdelay
600         IF( TRIM(cdelay) == trim(c_delaylist(ji)) ) idvar = ji
601      END DO
602      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
603
604      IF( ndelayid(idvar) == -1 ) THEN        ! first call without restart: get pinout(2) from pinout(1) with a blocking allreduce
605         CALL mpi_allreduce(      pinout(1), pinout(2), 1, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierror )
606      ELSE IF ( ndelayid(idvar) == 0 ) THEN   ! first call    with restart: get pinout(2) from todelay with a blocking allreduce
607         CALL mpi_allreduce( todelay(idvar), pinout(2), 1, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierror )
608      ELSE                                    ! from the second call, get value from previous time step
609         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
610         CALL mpi_wait(ndelayid(idvar), MPI_STATUS_IGNORE, ierror )   ! make sure todelay(idvar) is received
611         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
612         pinout(2) = todelay(idvar)           ! send back pinout(2) from todelay(idvar) defined at previous call
613      ENDIF
614
615      IF( ldlast ) THEN      ! last call: put pinout(1) in todelay that will be stored in the restart files
616         todelay(idvar) = pinout(1)
617      ELSE                   ! send pinout(1) to todelay(idvar), to be received at next call
618         CALL mpi_iallreduce( pinout(1), todelay(idvar), 1, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierror )
619      ENDIF
620
621   END SUBROUTINE mpp_delay_max
622   
623   !!----------------------------------------------------------------------
624   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
625   !!   
626   !!----------------------------------------------------------------------
627   !!
628#  define OPERATION_MAX
629#  define INTEGER_TYPE
630#  define DIM_0d
631#     define ROUTINE_ALLREDUCE           mppmax_int
632#     include "mpp_allreduce_generic.h90"
633#     undef ROUTINE_ALLREDUCE
634#  undef DIM_0d
635#  define DIM_1d
636#     define ROUTINE_ALLREDUCE           mppmax_a_int
637#     include "mpp_allreduce_generic.h90"
638#     undef ROUTINE_ALLREDUCE
639#  undef DIM_1d
640#  undef INTEGER_TYPE
641!
642#  define REAL_TYPE
643#  define DIM_0d
644#     define ROUTINE_ALLREDUCE           mppmax_real
645#     include "mpp_allreduce_generic.h90"
646#     undef ROUTINE_ALLREDUCE
647#  undef DIM_0d
648#  define DIM_1d
649#     define ROUTINE_ALLREDUCE           mppmax_a_real
650#     include "mpp_allreduce_generic.h90"
651#     undef ROUTINE_ALLREDUCE
652#  undef DIM_1d
653#  undef REAL_TYPE
654#  undef OPERATION_MAX
655   !!----------------------------------------------------------------------
656   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
657   !!   
658   !!----------------------------------------------------------------------
659   !!
660#  define OPERATION_MIN
661#  define INTEGER_TYPE
662#  define DIM_0d
663#     define ROUTINE_ALLREDUCE           mppmin_int
664#     include "mpp_allreduce_generic.h90"
665#     undef ROUTINE_ALLREDUCE
666#  undef DIM_0d
667#  define DIM_1d
668#     define ROUTINE_ALLREDUCE           mppmin_a_int
669#     include "mpp_allreduce_generic.h90"
670#     undef ROUTINE_ALLREDUCE
671#  undef DIM_1d
672#  undef INTEGER_TYPE
673!
674#  define REAL_TYPE
675#  define DIM_0d
676#     define ROUTINE_ALLREDUCE           mppmin_real
677#     include "mpp_allreduce_generic.h90"
678#     undef ROUTINE_ALLREDUCE
679#  undef DIM_0d
680#  define DIM_1d
681#     define ROUTINE_ALLREDUCE           mppmin_a_real
682#     include "mpp_allreduce_generic.h90"
683#     undef ROUTINE_ALLREDUCE
684#  undef DIM_1d
685#  undef REAL_TYPE
686#  undef OPERATION_MIN
687
688   !!----------------------------------------------------------------------
689   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
690   !!   
691   !!   Global sum of 1D array or a variable (integer, real or complex)
692   !!----------------------------------------------------------------------
693   !!
694#  define OPERATION_SUM
695#  define INTEGER_TYPE
696#  define DIM_0d
697#     define ROUTINE_ALLREDUCE           mppsum_int
698#     include "mpp_allreduce_generic.h90"
699#     undef ROUTINE_ALLREDUCE
700#  undef DIM_0d
701#  define DIM_1d
702#     define ROUTINE_ALLREDUCE           mppsum_a_int
703#     include "mpp_allreduce_generic.h90"
704#     undef ROUTINE_ALLREDUCE
705#  undef DIM_1d
706#  undef INTEGER_TYPE
707!
708#  define REAL_TYPE
709#  define DIM_0d
710#     define ROUTINE_ALLREDUCE           mppsum_real
711#     include "mpp_allreduce_generic.h90"
712#     undef ROUTINE_ALLREDUCE
713#  undef DIM_0d
714#  define DIM_1d
715#     define ROUTINE_ALLREDUCE           mppsum_a_real
716#     include "mpp_allreduce_generic.h90"
717#     undef ROUTINE_ALLREDUCE
718#  undef DIM_1d
719#  undef REAL_TYPE
720#  undef OPERATION_SUM
721
722#  define OPERATION_SUM_DD
723#  define COMPLEX_TYPE
724#  define DIM_0d
725#     define ROUTINE_ALLREDUCE           mppsum_realdd
726#     include "mpp_allreduce_generic.h90"
727#     undef ROUTINE_ALLREDUCE
728#  undef DIM_0d
729#  define DIM_1d
730#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
731#     include "mpp_allreduce_generic.h90"
732#     undef ROUTINE_ALLREDUCE
733#  undef DIM_1d
734#  undef COMPLEX_TYPE
735#  undef OPERATION_SUM_DD
736
737   !!----------------------------------------------------------------------
738   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
739   !!   
740   !!----------------------------------------------------------------------
741   !!
742#  define OPERATION_MINLOC
743#  define DIM_2d
744#     define ROUTINE_LOC           mpp_minloc2d
745#     include "mpp_loc_generic.h90"
746#     undef ROUTINE_LOC
747#  undef DIM_2d
748#  define DIM_3d
749#     define ROUTINE_LOC           mpp_minloc3d
750#     include "mpp_loc_generic.h90"
751#     undef ROUTINE_LOC
752#  undef DIM_3d
753#  undef OPERATION_MINLOC
754
755#  define OPERATION_MAXLOC
756#  define DIM_2d
757#     define ROUTINE_LOC           mpp_maxloc2d
758#     include "mpp_loc_generic.h90"
759#     undef ROUTINE_LOC
760#  undef DIM_2d
761#  define DIM_3d
762#     define ROUTINE_LOC           mpp_maxloc3d
763#     include "mpp_loc_generic.h90"
764#     undef ROUTINE_LOC
765#  undef DIM_3d
766#  undef OPERATION_MAXLOC
767
768   SUBROUTINE mppsync()
769      !!----------------------------------------------------------------------
770      !!                  ***  routine mppsync  ***
771      !!
772      !! ** Purpose :   Massively parallel processors, synchroneous
773      !!
774      !!-----------------------------------------------------------------------
775      INTEGER :: ierror
776      !!-----------------------------------------------------------------------
777      !
778      CALL mpi_barrier( mpi_comm_oce, ierror )
779      !
780   END SUBROUTINE mppsync
781
782
783   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
784      !!----------------------------------------------------------------------
785      !!                  ***  routine mppstop  ***
786      !!
787      !! ** purpose :   Stop massively parallel processors method
788      !!
789      !!----------------------------------------------------------------------
790      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
791      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
792      LOGICAL ::   llfinal, ll_force_abort
793      INTEGER ::   info
794      !!----------------------------------------------------------------------
795      llfinal = .FALSE.
796      IF( PRESENT(ldfinal) ) llfinal = ldfinal
797      ll_force_abort = .FALSE.
798      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
799      !
800      IF(ll_force_abort) THEN
801         CALL mpi_abort( MPI_COMM_WORLD )
802      ELSE
803         CALL mppsync
804         CALL mpi_finalize( info )
805      ENDIF
806      IF( .NOT. llfinal ) STOP 123456
807      !
808   END SUBROUTINE mppstop
809
810
811   SUBROUTINE mpp_comm_free( kcom )
812      !!----------------------------------------------------------------------
813      INTEGER, INTENT(in) ::   kcom
814      !!
815      INTEGER :: ierr
816      !!----------------------------------------------------------------------
817      !
818      CALL MPI_COMM_FREE(kcom, ierr)
819      !
820   END SUBROUTINE mpp_comm_free
821
822
823   SUBROUTINE mpp_ini_znl( kumout )
824      !!----------------------------------------------------------------------
825      !!               ***  routine mpp_ini_znl  ***
826      !!
827      !! ** Purpose :   Initialize special communicator for computing zonal sum
828      !!
829      !! ** Method  : - Look for processors in the same row
830      !!              - Put their number in nrank_znl
831      !!              - Create group for the znl processors
832      !!              - Create a communicator for znl processors
833      !!              - Determine if processor should write znl files
834      !!
835      !! ** output
836      !!      ndim_rank_znl = number of processors on the same row
837      !!      ngrp_znl = group ID for the znl processors
838      !!      ncomm_znl = communicator for the ice procs.
839      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
840      !!
841      !!----------------------------------------------------------------------
842      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
843      !
844      INTEGER :: jproc      ! dummy loop integer
845      INTEGER :: ierr, ii   ! local integer
846      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
847      !!----------------------------------------------------------------------
848      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
849      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
850      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
851      !
852      ALLOCATE( kwork(jpnij), STAT=ierr )
853      IF( ierr /= 0 ) THEN
854         WRITE(kumout, cform_err)
855         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
856         CALL mppstop
857      ENDIF
858
859      IF( jpnj == 1 ) THEN
860         ngrp_znl  = ngrp_world
861         ncomm_znl = mpi_comm_oce
862      ELSE
863         !
864         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
865         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
866         !-$$        CALL flush(numout)
867         !
868         ! Count number of processors on the same row
869         ndim_rank_znl = 0
870         DO jproc=1,jpnij
871            IF ( kwork(jproc) == njmpp ) THEN
872               ndim_rank_znl = ndim_rank_znl + 1
873            ENDIF
874         END DO
875         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
876         !-$$        CALL flush(numout)
877         ! Allocate the right size to nrank_znl
878         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
879         ALLOCATE(nrank_znl(ndim_rank_znl))
880         ii = 0
881         nrank_znl (:) = 0
882         DO jproc=1,jpnij
883            IF ( kwork(jproc) == njmpp) THEN
884               ii = ii + 1
885               nrank_znl(ii) = jproc -1
886            ENDIF
887         END DO
888         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
889         !-$$        CALL flush(numout)
890
891         ! Create the opa group
892         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
893         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
894         !-$$        CALL flush(numout)
895
896         ! Create the znl group from the opa group
897         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
898         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
899         !-$$        CALL flush(numout)
900
901         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
902         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
903         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
904         !-$$        CALL flush(numout)
905         !
906      END IF
907
908      ! Determines if processor if the first (starting from i=1) on the row
909      IF ( jpni == 1 ) THEN
910         l_znl_root = .TRUE.
911      ELSE
912         l_znl_root = .FALSE.
913         kwork (1) = nimpp
914         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
915         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
916      END IF
917
918      DEALLOCATE(kwork)
919
920   END SUBROUTINE mpp_ini_znl
921
922
923   SUBROUTINE mpp_ini_north
924      !!----------------------------------------------------------------------
925      !!               ***  routine mpp_ini_north  ***
926      !!
927      !! ** Purpose :   Initialize special communicator for north folding
928      !!      condition together with global variables needed in the mpp folding
929      !!
930      !! ** Method  : - Look for northern processors
931      !!              - Put their number in nrank_north
932      !!              - Create groups for the world processors and the north processors
933      !!              - Create a communicator for northern processors
934      !!
935      !! ** output
936      !!      njmppmax = njmpp for northern procs
937      !!      ndim_rank_north = number of processors in the northern line
938      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
939      !!      ngrp_world = group ID for the world processors
940      !!      ngrp_north = group ID for the northern processors
941      !!      ncomm_north = communicator for the northern procs.
942      !!      north_root = number (in the world) of proc 0 in the northern comm.
943      !!
944      !!----------------------------------------------------------------------
945      INTEGER ::   ierr
946      INTEGER ::   jjproc
947      INTEGER ::   ii, ji
948      !!----------------------------------------------------------------------
949      !
950      njmppmax = MAXVAL( njmppt )
951      !
952      ! Look for how many procs on the northern boundary
953      ndim_rank_north = 0
954      DO jjproc = 1, jpnij
955         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
956      END DO
957      !
958      ! Allocate the right size to nrank_north
959      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
960      ALLOCATE( nrank_north(ndim_rank_north) )
961
962      ! Fill the nrank_north array with proc. number of northern procs.
963      ! Note : the rank start at 0 in MPI
964      ii = 0
965      DO ji = 1, jpnij
966         IF ( njmppt(ji) == njmppmax   ) THEN
967            ii=ii+1
968            nrank_north(ii)=ji-1
969         END IF
970      END DO
971      !
972      ! create the world group
973      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
974      !
975      ! Create the North group from the world group
976      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
977      !
978      ! Create the North communicator , ie the pool of procs in the north group
979      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
980      !
981   END SUBROUTINE mpp_ini_north
982
983
984   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
985      !!---------------------------------------------------------------------
986      !!                   ***  routine mpp_init.opa  ***
987      !!
988      !! ** Purpose :: export and attach a MPI buffer for bsend
989      !!
990      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
991      !!            but classical mpi_init
992      !!
993      !! History :: 01/11 :: IDRIS initial version for IBM only
994      !!            08/04 :: R. Benshila, generalisation
995      !!---------------------------------------------------------------------
996      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
997      INTEGER                      , INTENT(inout) ::   ksft
998      INTEGER                      , INTENT(  out) ::   code
999      INTEGER                                      ::   ierr, ji
1000      LOGICAL                                      ::   mpi_was_called
1001      !!---------------------------------------------------------------------
1002      !
1003      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1004      IF ( code /= MPI_SUCCESS ) THEN
1005         DO ji = 1, SIZE(ldtxt)
1006            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1007         END DO
1008         WRITE(*, cform_err)
1009         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1010         CALL mpi_abort( mpi_comm_world, code, ierr )
1011      ENDIF
1012      !
1013      IF( .NOT. mpi_was_called ) THEN
1014         CALL mpi_init( code )
1015         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1016         IF ( code /= MPI_SUCCESS ) THEN
1017            DO ji = 1, SIZE(ldtxt)
1018               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1019            END DO
1020            WRITE(*, cform_err)
1021            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1022            CALL mpi_abort( mpi_comm_world, code, ierr )
1023         ENDIF
1024      ENDIF
1025      !
1026      IF( nn_buffer > 0 ) THEN
1027         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1028         ! Buffer allocation and attachment
1029         ALLOCATE( tampon(nn_buffer), stat = ierr )
1030         IF( ierr /= 0 ) THEN
1031            DO ji = 1, SIZE(ldtxt)
1032               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1033            END DO
1034            WRITE(*, cform_err)
1035            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1036            CALL mpi_abort( mpi_comm_world, code, ierr )
1037         END IF
1038         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1039      ENDIF
1040      !
1041   END SUBROUTINE mpi_init_oce
1042
1043
1044   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1045      !!---------------------------------------------------------------------
1046      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1047      !!
1048      !!   Modification of original codes written by David H. Bailey
1049      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1050      !!---------------------------------------------------------------------
1051      INTEGER                     , INTENT(in)    ::   ilen, itype
1052      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1053      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1054      !
1055      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1056      INTEGER  :: ji, ztmp           ! local scalar
1057      !!---------------------------------------------------------------------
1058      !
1059      ztmp = itype   ! avoid compilation warning
1060      !
1061      DO ji=1,ilen
1062      ! Compute ydda + yddb using Knuth's trick.
1063         zt1  = real(ydda(ji)) + real(yddb(ji))
1064         zerr = zt1 - real(ydda(ji))
1065         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1066                + aimag(ydda(ji)) + aimag(yddb(ji))
1067
1068         ! The result is zt1 + zt2, after normalization.
1069         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1070      END DO
1071      !
1072   END SUBROUTINE DDPDD_MPI
1073
1074
1075   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1076      !!---------------------------------------------------------------------
1077      !!                   ***  routine mpp_lbc_north_icb  ***
1078      !!
1079      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1080      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1081      !!              array with outer extra halo
1082      !!
1083      !! ** Method  :   North fold condition and mpp with more than one proc
1084      !!              in i-direction require a specific treatment. We gather
1085      !!              the 4+kextj northern lines of the global domain on 1
1086      !!              processor and apply lbc north-fold on this sub array.
1087      !!              Then we scatter the north fold array back to the processors.
1088      !!              This routine accounts for an extra halo with icebergs
1089      !!              and assumes ghost rows and columns have been suppressed.
1090      !!
1091      !!----------------------------------------------------------------------
1092      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1093      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1094      !                                                     !   = T ,  U , V , F or W -points
1095      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1096      !!                                                    ! north fold, =  1. otherwise
1097      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1098      !
1099      INTEGER ::   ji, jj, jr
1100      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1101      INTEGER ::   ipj, ij, iproc
1102      !
1103      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1104      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1105      !!----------------------------------------------------------------------
1106      !
1107      ipj=4
1108      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1109     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1110     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1111      !
1112      ztab_e(:,:)      = 0._wp
1113      znorthloc_e(:,:) = 0._wp
1114      !
1115      ij = 1 - kextj
1116      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1117      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1118         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1119         ij = ij + 1
1120      END DO
1121      !
1122      itaille = jpimax * ( ipj + 2*kextj )
1123      !
1124      IF( ln_timing ) CALL tic_tac(.TRUE.)
1125      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1126         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1127         &                ncomm_north, ierr )
1128      !
1129      IF( ln_timing ) CALL tic_tac(.FALSE.)
1130      !
1131      DO jr = 1, ndim_rank_north            ! recover the global north array
1132         iproc = nrank_north(jr) + 1
1133         ildi = nldit (iproc)
1134         ilei = nleit (iproc)
1135         iilb = nimppt(iproc)
1136         DO jj = 1-kextj, ipj+kextj
1137            DO ji = ildi, ilei
1138               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1139            END DO
1140         END DO
1141      END DO
1142
1143      ! 2. North-Fold boundary conditions
1144      ! ----------------------------------
1145      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1146
1147      ij = 1 - kextj
1148      !! Scatter back to pt2d
1149      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1150         DO ji= 1, jpi
1151            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1152         END DO
1153         ij  = ij +1
1154      END DO
1155      !
1156      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1157      !
1158   END SUBROUTINE mpp_lbc_north_icb
1159
1160
1161   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1162      !!----------------------------------------------------------------------
1163      !!                  ***  routine mpp_lnk_2d_icb  ***
1164      !!
1165      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1166      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1167      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1168      !!
1169      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1170      !!      between processors following neighboring subdomains.
1171      !!            domain parameters
1172      !!                    jpi    : first dimension of the local subdomain
1173      !!                    jpj    : second dimension of the local subdomain
1174      !!                    kexti  : number of columns for extra outer halo
1175      !!                    kextj  : number of rows for extra outer halo
1176      !!                    nbondi : mark for "east-west local boundary"
1177      !!                    nbondj : mark for "north-south local boundary"
1178      !!                    noea   : number for local neighboring processors
1179      !!                    nowe   : number for local neighboring processors
1180      !!                    noso   : number for local neighboring processors
1181      !!                    nono   : number for local neighboring processors
1182      !!----------------------------------------------------------------------
1183      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1184      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1185      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1186      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1187      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1188      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1189      !
1190      INTEGER  ::   jl   ! dummy loop indices
1191      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1192      INTEGER  ::   ipreci, iprecj             !   -       -
1193      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1194      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1195      !!
1196      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1197      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1198      !!----------------------------------------------------------------------
1199
1200      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1201      iprecj = nn_hls + kextj
1202
1203      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1204
1205      ! 1. standard boundary treatment
1206      ! ------------------------------
1207      ! Order matters Here !!!!
1208      !
1209      !                                      ! East-West boundaries
1210      !                                           !* Cyclic east-west
1211      IF( l_Iperio ) THEN
1212         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1213         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1214         !
1215      ELSE                                        !* closed
1216         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1217                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1218      ENDIF
1219      !                                      ! North-South boundaries
1220      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1221         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1222         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1223      ELSE                                        !* closed
1224         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1225                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1226      ENDIF
1227      !
1228
1229      ! north fold treatment
1230      ! -----------------------
1231      IF( npolj /= 0 ) THEN
1232         !
1233         SELECT CASE ( jpni )
1234                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1235                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1236         END SELECT
1237         !
1238      ENDIF
1239
1240      ! 2. East and west directions exchange
1241      ! ------------------------------------
1242      ! we play with the neigbours AND the row number because of the periodicity
1243      !
1244      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1245      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1246         iihom = jpi-nreci-kexti
1247         DO jl = 1, ipreci
1248            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1249            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1250         END DO
1251      END SELECT
1252      !
1253      !                           ! Migrations
1254      imigr = ipreci * ( jpj + 2*kextj )
1255      !
1256      IF( ln_timing ) CALL tic_tac(.TRUE.)
1257      !
1258      SELECT CASE ( nbondi )
1259      CASE ( -1 )
1260         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1261         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1262         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1263      CASE ( 0 )
1264         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1265         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1266         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1267         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1268         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1269         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1270      CASE ( 1 )
1271         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1272         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1273         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1274      END SELECT
1275      !
1276      IF( ln_timing ) CALL tic_tac(.FALSE.)
1277      !
1278      !                           ! Write Dirichlet lateral conditions
1279      iihom = jpi - nn_hls
1280      !
1281      SELECT CASE ( nbondi )
1282      CASE ( -1 )
1283         DO jl = 1, ipreci
1284            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1285         END DO
1286      CASE ( 0 )
1287         DO jl = 1, ipreci
1288            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1289            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1290         END DO
1291      CASE ( 1 )
1292         DO jl = 1, ipreci
1293            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1294         END DO
1295      END SELECT
1296
1297
1298      ! 3. North and south directions
1299      ! -----------------------------
1300      ! always closed : we play only with the neigbours
1301      !
1302      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1303         ijhom = jpj-nrecj-kextj
1304         DO jl = 1, iprecj
1305            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1306            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1307         END DO
1308      ENDIF
1309      !
1310      !                           ! Migrations
1311      imigr = iprecj * ( jpi + 2*kexti )
1312      !
1313      IF( ln_timing ) CALL tic_tac(.TRUE.)
1314      !
1315      SELECT CASE ( nbondj )
1316      CASE ( -1 )
1317         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1318         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1319         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1320      CASE ( 0 )
1321         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1322         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1323         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1324         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1325         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1326         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1327      CASE ( 1 )
1328         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1329         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1330         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1331      END SELECT
1332      !
1333      IF( ln_timing ) CALL tic_tac(.FALSE.)
1334      !
1335      !                           ! Write Dirichlet lateral conditions
1336      ijhom = jpj - nn_hls
1337      !
1338      SELECT CASE ( nbondj )
1339      CASE ( -1 )
1340         DO jl = 1, iprecj
1341            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1342         END DO
1343      CASE ( 0 )
1344         DO jl = 1, iprecj
1345            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1346            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1347         END DO
1348      CASE ( 1 )
1349         DO jl = 1, iprecj
1350            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1351         END DO
1352      END SELECT
1353      !
1354   END SUBROUTINE mpp_lnk_2d_icb
1355
1356
1357   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1358      !!----------------------------------------------------------------------
1359      !!                  ***  routine mpp_report  ***
1360      !!
1361      !! ** Purpose :   report use of mpp routines per time-setp
1362      !!
1363      !!----------------------------------------------------------------------
1364      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1365      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1366      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1367      !!
1368      LOGICAL ::   ll_lbc, ll_glb
1369      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1370      !!----------------------------------------------------------------------
1371      !
1372      ll_lbc = .FALSE.
1373      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1374      ll_glb = .FALSE.
1375      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1376      !
1377      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1378      ncom_freq = ncom_fsbc * ncom_dttrc
1379      IF( MOD( MAX(ncom_fsbc,ncom_dttrc), MIN(ncom_fsbc,ncom_dttrc) ) == 0 ) ncom_freq = MAX(ncom_fsbc,ncom_dttrc)
1380      !
1381      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1382         IF( ll_lbc ) THEN
1383            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1384            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1385            n_sequence_lbc = n_sequence_lbc + 1
1386            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1387            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1388            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1389            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1390         ENDIF
1391         IF( ll_glb ) THEN
1392            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1393            n_sequence_glb = n_sequence_glb + 1
1394            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1395            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1396         ENDIF
1397      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1398         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1399         WRITE(numcom,*) ' '
1400         WRITE(numcom,*) ' ------------------------------------------------------------'
1401         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1402         WRITE(numcom,*) ' ------------------------------------------------------------'
1403         WRITE(numcom,*) ' '
1404         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1405         jj = 0; jk = 0; jf = 0; jh = 0
1406         DO ji = 1, n_sequence_lbc
1407            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1408            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1409            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1410            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1411         END DO
1412         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1413         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1414         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1415         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1416         WRITE(numcom,*) ' '
1417         WRITE(numcom,*) ' lbc_lnk called'
1418         jj = 1
1419         DO ji = 2, n_sequence_lbc
1420            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1421               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1422               jj = 0
1423            END IF
1424            jj = jj + 1 
1425         END DO
1426         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1427         WRITE(numcom,*) ' '
1428         IF ( n_sequence_glb > 0 ) THEN
1429            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1430            jj = 1
1431            DO ji = 2, n_sequence_glb
1432               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1433                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1434                  jj = 0
1435               END IF
1436               jj = jj + 1 
1437            END DO
1438            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1439            DEALLOCATE(crname_glb)
1440         ELSE
1441            WRITE(numcom,*) ' No MPI global communication '
1442         ENDIF
1443         WRITE(numcom,*) ' '
1444         WRITE(numcom,*) ' -----------------------------------------------'
1445         WRITE(numcom,*) ' '
1446         DEALLOCATE(ncomm_sequence)
1447         DEALLOCATE(crname_lbc)
1448      ENDIF
1449   END SUBROUTINE mpp_report
1450
1451   
1452   SUBROUTINE tic_tac (ld_tic, ld_global)
1453
1454    LOGICAL,           INTENT(IN) :: ld_tic
1455    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1456    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1457    REAL(wp),               SAVE :: tic_ct = 0._wp
1458    INTEGER :: ii
1459
1460    IF( ncom_stp <= nit000 ) RETURN
1461    IF( ncom_stp == nitend ) RETURN
1462    ii = 1
1463    IF( PRESENT( ld_global ) ) THEN
1464       IF( ld_global ) ii = 2
1465    END IF
1466   
1467    IF ( ld_tic ) THEN
1468       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1469       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1470    ELSE
1471       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1472       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1473    ENDIF
1474   
1475   END SUBROUTINE tic_tac
1476
1477   
1478#else
1479   !!----------------------------------------------------------------------
1480   !!   Default case:            Dummy module        share memory computing
1481   !!----------------------------------------------------------------------
1482   USE in_out_manager
1483
1484   INTERFACE mpp_sum
1485      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1486   END INTERFACE
1487   INTERFACE mpp_max
1488      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1489   END INTERFACE
1490   INTERFACE mpp_min
1491      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1492   END INTERFACE
1493   INTERFACE mpp_minloc
1494      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1495   END INTERFACE
1496   INTERFACE mpp_maxloc
1497      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1498   END INTERFACE
1499
1500   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1501   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1502   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1503
1504   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1505   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1506   REAL(wp), PUBLIC, DIMENSION(1)           ::   todelay
1507   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1508   !!----------------------------------------------------------------------
1509CONTAINS
1510
1511   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1512      INTEGER, INTENT(in) ::   kumout
1513      lib_mpp_alloc = 0
1514   END FUNCTION lib_mpp_alloc
1515
1516   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1517      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1518      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1519      CHARACTER(len=*) ::   ldname
1520      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1521      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1522      function_value = 0
1523      IF( .FALSE. )   ldtxt(:) = 'never done'
1524      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1525   END FUNCTION mynode
1526
1527   SUBROUTINE mppsync                       ! Dummy routine
1528   END SUBROUTINE mppsync
1529
1530   !!----------------------------------------------------------------------
1531   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1532   !!   
1533   !!----------------------------------------------------------------------
1534   !!
1535#  define OPERATION_MAX
1536#  define INTEGER_TYPE
1537#  define DIM_0d
1538#     define ROUTINE_ALLREDUCE           mppmax_int
1539#     include "mpp_allreduce_generic.h90"
1540#     undef ROUTINE_ALLREDUCE
1541#  undef DIM_0d
1542#  define DIM_1d
1543#     define ROUTINE_ALLREDUCE           mppmax_a_int
1544#     include "mpp_allreduce_generic.h90"
1545#     undef ROUTINE_ALLREDUCE
1546#  undef DIM_1d
1547#  undef INTEGER_TYPE
1548!
1549#  define REAL_TYPE
1550#  define DIM_0d
1551#     define ROUTINE_ALLREDUCE           mppmax_real
1552#     include "mpp_allreduce_generic.h90"
1553#     undef ROUTINE_ALLREDUCE
1554#  undef DIM_0d
1555#  define DIM_1d
1556#     define ROUTINE_ALLREDUCE           mppmax_a_real
1557#     include "mpp_allreduce_generic.h90"
1558#     undef ROUTINE_ALLREDUCE
1559#  undef DIM_1d
1560#  undef REAL_TYPE
1561#  undef OPERATION_MAX
1562   !!----------------------------------------------------------------------
1563   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1564   !!   
1565   !!----------------------------------------------------------------------
1566   !!
1567#  define OPERATION_MIN
1568#  define INTEGER_TYPE
1569#  define DIM_0d
1570#     define ROUTINE_ALLREDUCE           mppmin_int
1571#     include "mpp_allreduce_generic.h90"
1572#     undef ROUTINE_ALLREDUCE
1573#  undef DIM_0d
1574#  define DIM_1d
1575#     define ROUTINE_ALLREDUCE           mppmin_a_int
1576#     include "mpp_allreduce_generic.h90"
1577#     undef ROUTINE_ALLREDUCE
1578#  undef DIM_1d
1579#  undef INTEGER_TYPE
1580!
1581#  define REAL_TYPE
1582#  define DIM_0d
1583#     define ROUTINE_ALLREDUCE           mppmin_real
1584#     include "mpp_allreduce_generic.h90"
1585#     undef ROUTINE_ALLREDUCE
1586#  undef DIM_0d
1587#  define DIM_1d
1588#     define ROUTINE_ALLREDUCE           mppmin_a_real
1589#     include "mpp_allreduce_generic.h90"
1590#     undef ROUTINE_ALLREDUCE
1591#  undef DIM_1d
1592#  undef REAL_TYPE
1593#  undef OPERATION_MIN
1594
1595   !!----------------------------------------------------------------------
1596   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1597   !!   
1598   !!   Global sum of 1D array or a variable (integer, real or complex)
1599   !!----------------------------------------------------------------------
1600   !!
1601#  define OPERATION_SUM
1602#  define INTEGER_TYPE
1603#  define DIM_0d
1604#     define ROUTINE_ALLREDUCE           mppsum_int
1605#     include "mpp_allreduce_generic.h90"
1606#     undef ROUTINE_ALLREDUCE
1607#  undef DIM_0d
1608#  define DIM_1d
1609#     define ROUTINE_ALLREDUCE           mppsum_a_int
1610#     include "mpp_allreduce_generic.h90"
1611#     undef ROUTINE_ALLREDUCE
1612#  undef DIM_1d
1613#  undef INTEGER_TYPE
1614!
1615#  define REAL_TYPE
1616#  define DIM_0d
1617#     define ROUTINE_ALLREDUCE           mppsum_real
1618#     include "mpp_allreduce_generic.h90"
1619#     undef ROUTINE_ALLREDUCE
1620#  undef DIM_0d
1621#  define DIM_1d
1622#     define ROUTINE_ALLREDUCE           mppsum_a_real
1623#     include "mpp_allreduce_generic.h90"
1624#     undef ROUTINE_ALLREDUCE
1625#  undef DIM_1d
1626#  undef REAL_TYPE
1627#  undef OPERATION_SUM
1628
1629#  define OPERATION_SUM_DD
1630#  define COMPLEX_TYPE
1631#  define DIM_0d
1632#     define ROUTINE_ALLREDUCE           mppsum_realdd
1633#     include "mpp_allreduce_generic.h90"
1634#     undef ROUTINE_ALLREDUCE
1635#  undef DIM_0d
1636#  define DIM_1d
1637#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1638#     include "mpp_allreduce_generic.h90"
1639#     undef ROUTINE_ALLREDUCE
1640#  undef DIM_1d
1641#  undef COMPLEX_TYPE
1642#  undef OPERATION_SUM_DD
1643
1644   !!----------------------------------------------------------------------
1645   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1646   !!   
1647   !!----------------------------------------------------------------------
1648   !!
1649#  define OPERATION_MINLOC
1650#  define DIM_2d
1651#     define ROUTINE_LOC           mpp_minloc2d
1652#     include "mpp_loc_generic.h90"
1653#     undef ROUTINE_LOC
1654#  undef DIM_2d
1655#  define DIM_3d
1656#     define ROUTINE_LOC           mpp_minloc3d
1657#     include "mpp_loc_generic.h90"
1658#     undef ROUTINE_LOC
1659#  undef DIM_3d
1660#  undef OPERATION_MINLOC
1661
1662#  define OPERATION_MAXLOC
1663#  define DIM_2d
1664#     define ROUTINE_LOC           mpp_maxloc2d
1665#     include "mpp_loc_generic.h90"
1666#     undef ROUTINE_LOC
1667#  undef DIM_2d
1668#  define DIM_3d
1669#     define ROUTINE_LOC           mpp_maxloc3d
1670#     include "mpp_loc_generic.h90"
1671#     undef ROUTINE_LOC
1672#  undef DIM_3d
1673#  undef OPERATION_MAXLOC
1674
1675   SUBROUTINE mpp_delay_max( cdname, cdelay, pinout, ldlast, kcom )
1676      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1677      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1678      REAL(wp),         INTENT(inout), DIMENSION(2) ::   pinout  ! pinout(1): in data, pinout(2): out data
1679      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1680      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1681      WRITE(*,*) 'mpp_delay_max: You should not have seen this print! error?', cdname
1682   END SUBROUTINE mpp_delay_max
1683
1684   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1685      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1686      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1687      STOP      ! non MPP case, just stop the run
1688   END SUBROUTINE mppstop
1689
1690   SUBROUTINE mpp_ini_znl( knum )
1691      INTEGER :: knum
1692      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1693   END SUBROUTINE mpp_ini_znl
1694
1695   SUBROUTINE mpp_comm_free( kcom )
1696      INTEGER :: kcom
1697      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1698   END SUBROUTINE mpp_comm_free
1699   
1700#endif
1701
1702   !!----------------------------------------------------------------------
1703   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1704   !!----------------------------------------------------------------------
1705
1706   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1707      &                 cd6, cd7, cd8, cd9, cd10 )
1708      !!----------------------------------------------------------------------
1709      !!                  ***  ROUTINE  stop_opa  ***
1710      !!
1711      !! ** Purpose :   print in ocean.outpput file a error message and
1712      !!                increment the error number (nstop) by one.
1713      !!----------------------------------------------------------------------
1714      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1715      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1716      !!----------------------------------------------------------------------
1717      !
1718      nstop = nstop + 1
1719      IF(lwp) THEN
1720         WRITE(numout,cform_err)
1721         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1722         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1723         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1724         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1725         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1726         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1727         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1728         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1729         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1730         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1731      ENDIF
1732                               CALL FLUSH(numout    )
1733      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1734      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1735      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1736      !
1737      IF( cd1 == 'STOP' ) THEN
1738         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1739         CALL mppstop()
1740      ENDIF
1741      !
1742   END SUBROUTINE ctl_stop
1743
1744
1745   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1746      &                 cd6, cd7, cd8, cd9, cd10 )
1747      !!----------------------------------------------------------------------
1748      !!                  ***  ROUTINE  stop_warn  ***
1749      !!
1750      !! ** Purpose :   print in ocean.outpput file a error message and
1751      !!                increment the warning number (nwarn) by one.
1752      !!----------------------------------------------------------------------
1753      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1754      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1755      !!----------------------------------------------------------------------
1756      !
1757      nwarn = nwarn + 1
1758      IF(lwp) THEN
1759         WRITE(numout,cform_war)
1760         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1761         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1762         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1763         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1764         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1765         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1766         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1767         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1768         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1769         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1770      ENDIF
1771      CALL FLUSH(numout)
1772      !
1773   END SUBROUTINE ctl_warn
1774
1775
1776   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1777      !!----------------------------------------------------------------------
1778      !!                  ***  ROUTINE ctl_opn  ***
1779      !!
1780      !! ** Purpose :   Open file and check if required file is available.
1781      !!
1782      !! ** Method  :   Fortan open
1783      !!----------------------------------------------------------------------
1784      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1785      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1786      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1787      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1788      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1789      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1790      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1791      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1792      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1793      !
1794      CHARACTER(len=80) ::   clfile
1795      INTEGER           ::   iost
1796      !!----------------------------------------------------------------------
1797      !
1798      ! adapt filename
1799      ! ----------------
1800      clfile = TRIM(cdfile)
1801      IF( PRESENT( karea ) ) THEN
1802         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1803      ENDIF
1804#if defined key_agrif
1805      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1806      knum=Agrif_Get_Unit()
1807#else
1808      knum=get_unit()
1809#endif
1810      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1811      !
1812      iost=0
1813      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1814         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1815      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1816         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1817      ELSE
1818         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1819      ENDIF
1820      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1821         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1822      IF( iost == 0 ) THEN
1823         IF(ldwp) THEN
1824            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1825            WRITE(kout,*) '     unit   = ', knum
1826            WRITE(kout,*) '     status = ', cdstat
1827            WRITE(kout,*) '     form   = ', cdform
1828            WRITE(kout,*) '     access = ', cdacce
1829            WRITE(kout,*)
1830         ENDIF
1831      ENDIF
1832100   CONTINUE
1833      IF( iost /= 0 ) THEN
1834         IF(ldwp) THEN
1835            WRITE(kout,*)
1836            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1837            WRITE(kout,*) ' =======   ===  '
1838            WRITE(kout,*) '           unit   = ', knum
1839            WRITE(kout,*) '           status = ', cdstat
1840            WRITE(kout,*) '           form   = ', cdform
1841            WRITE(kout,*) '           access = ', cdacce
1842            WRITE(kout,*) '           iostat = ', iost
1843            WRITE(kout,*) '           we stop. verify the file '
1844            WRITE(kout,*)
1845         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1846            WRITE(*,*)
1847            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1848            WRITE(*,*) ' =======   ===  '
1849            WRITE(*,*) '           unit   = ', knum
1850            WRITE(*,*) '           status = ', cdstat
1851            WRITE(*,*) '           form   = ', cdform
1852            WRITE(*,*) '           access = ', cdacce
1853            WRITE(*,*) '           iostat = ', iost
1854            WRITE(*,*) '           we stop. verify the file '
1855            WRITE(*,*)
1856         ENDIF
1857         CALL FLUSH( kout ) 
1858         STOP 'ctl_opn bad opening'
1859      ENDIF
1860      !
1861   END SUBROUTINE ctl_opn
1862
1863
1864   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1865      !!----------------------------------------------------------------------
1866      !!                  ***  ROUTINE ctl_nam  ***
1867      !!
1868      !! ** Purpose :   Informations when error while reading a namelist
1869      !!
1870      !! ** Method  :   Fortan open
1871      !!----------------------------------------------------------------------
1872      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1873      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1874      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1875      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1876      !!----------------------------------------------------------------------
1877      !
1878      WRITE (clios, '(I5.0)')   kios
1879      IF( kios < 0 ) THEN         
1880         CALL ctl_warn( 'end of record or file while reading namelist '   &
1881            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1882      ENDIF
1883      !
1884      IF( kios > 0 ) THEN
1885         CALL ctl_stop( 'misspelled variable in namelist '   &
1886            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1887      ENDIF
1888      kios = 0
1889      RETURN
1890      !
1891   END SUBROUTINE ctl_nam
1892
1893
1894   INTEGER FUNCTION get_unit()
1895      !!----------------------------------------------------------------------
1896      !!                  ***  FUNCTION  get_unit  ***
1897      !!
1898      !! ** Purpose :   return the index of an unused logical unit
1899      !!----------------------------------------------------------------------
1900      LOGICAL :: llopn
1901      !!----------------------------------------------------------------------
1902      !
1903      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1904      llopn = .TRUE.
1905      DO WHILE( (get_unit < 998) .AND. llopn )
1906         get_unit = get_unit + 1
1907         INQUIRE( unit = get_unit, opened = llopn )
1908      END DO
1909      IF( (get_unit == 999) .AND. llopn ) THEN
1910         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1911         get_unit = -1
1912      ENDIF
1913      !
1914   END FUNCTION get_unit
1915
1916   !!----------------------------------------------------------------------
1917END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.