New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in utils/tools_dev_r12970_AGRIF_CMEMS/DOMAINcfg/src – NEMO

source: utils/tools_dev_r12970_AGRIF_CMEMS/DOMAINcfg/src/lib_mpp.F90 @ 13056

Last change on this file since 13056 was 13056, checked in by rblod, 4 years ago

ticket #2129 : cleaning domcfg

File size: 88.1 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
22   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
23   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
24   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
25   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
26   !!----------------------------------------------------------------------
27
28   !!----------------------------------------------------------------------
29   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
30   !!   ctl_warn      : initialization, namelist read, and parameters control
31   !!   ctl_opn       : Open file and check if required file is available.
32   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
33   !!   get_unit      : give the index of an unused logical unit
34   !!----------------------------------------------------------------------
35#if   defined key_mpp_mpi
36   !!----------------------------------------------------------------------
37   !!   'key_mpp_mpi'             MPI massively parallel processing library
38   !!----------------------------------------------------------------------
39   !!   lib_mpp_alloc : allocate mpp arrays
40   !!   mynode        : indentify the processor unit
41   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
42   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
43   !!   mpprecv       :
44   !!   mppsend       :
45   !!   mppscatter    :
46   !!   mppgather     :
47   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
48   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
49   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
50   !!   mpp_minloc    :
51   !!   mpp_maxloc    :
52   !!   mppsync       :
53   !!   mppstop       :
54   !!   mpp_ini_north : initialisation of north fold
55   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
56   !!----------------------------------------------------------------------
57   USE dom_oce        ! ocean space and time domain
58   USE lbcnfd         ! north fold treatment
59   USE in_out_manager ! I/O manager
60
61   IMPLICIT NONE
62   PRIVATE
63
64   INTERFACE mpp_nfd
65      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
66      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
67   END INTERFACE
68
69   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
70   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
71   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
72   !
73!!gm  this should be useless
74   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
75   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
76!!gm end
77   !
78   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
79   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
80   PUBLIC   mpp_ini_north
81   PUBLIC   mpp_lnk_2d_icb
82   PUBLIC   mpp_lbc_north_icb
83   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
84   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
85   PUBLIC   mppscatter, mppgather
86   PUBLIC   mpp_ini_znl
87   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
88   
89   !! * Interfaces
90   !! define generic interface for these routine as they are called sometimes
91   !! with scalar arguments instead of array arguments, which causes problems
92   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
93   INTERFACE mpp_min
94      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
95   END INTERFACE
96   INTERFACE mpp_max
97      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
98   END INTERFACE
99   INTERFACE mpp_sum
100      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
101         &             mppsum_realdd, mppsum_a_realdd
102   END INTERFACE
103   INTERFACE mpp_minloc
104      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
105   END INTERFACE
106   INTERFACE mpp_maxloc
107      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
108   END INTERFACE
109
110   !! ========================= !!
111   !!  MPI  variable definition !!
112   !! ========================= !!
113!$AGRIF_DO_NOT_TREAT
114   INCLUDE 'mpif.h'
115!$AGRIF_END_DO_NOT_TREAT
116
117   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
118
119   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
120
121   INTEGER, PUBLIC ::   mppsize        ! number of process
122   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
123!$AGRIF_DO_NOT_TREAT
124   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
125!$AGRIF_END_DO_NOT_TREAT
126
127   INTEGER :: MPI_SUMDD
128
129   ! variables used for zonal integration
130   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
131   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
132   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
133   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
134   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
135
136   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
137   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
138   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
139   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
140   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
141   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
142   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
143   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
144   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
145
146   ! Type of send : standard, buffered, immediate
147   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
148   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
149   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
150
151   ! Communications summary report
152   CHARACTER(len=400), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
153   CHARACTER(len=400), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
154   CHARACTER(len=400), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
155   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
156   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
157   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
158   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
159   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
160   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 3000          !: max number of communication record
161   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
162   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
163   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
166   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
167   !: name (used as id) of allreduce-delayed operations
168   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
169   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
170   !: component name where the allreduce-delayed operation is performed
171   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
172   TYPE, PUBLIC ::   DELAYARR
173      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
174      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
175   END TYPE DELAYARR
176   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
177   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
178
179   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
180
181   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
182   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
183
184   !!----------------------------------------------------------------------
185   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
186   !! $Id: lib_mpp.F90 10538 2019-01-17 10:41:10Z clem $
187   !! Software governed by the CeCILL license (see ./LICENSE)
188   !!----------------------------------------------------------------------
189CONTAINS
190
191   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
192      !!----------------------------------------------------------------------
193      !!                  ***  routine mynode  ***
194      !!
195      !! ** Purpose :   Find processor unit
196      !!----------------------------------------------------------------------
197      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
198      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
199      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
200      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
201      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
202      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
203      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
204      !
205      INTEGER ::   mynode, ierr, code, ji, ii, ios
206      LOGICAL ::   mpi_was_called
207      !
208      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
209      !!----------------------------------------------------------------------
210      !
211      ii = 1
212      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
213      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
214      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
215      !
216      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
217      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
218901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
219      !
220      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
221      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
222902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
223      !
224      !                              ! control print
225      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
226      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
227      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
228      !
229      IF( jpni < 1 .OR. jpnj < 1  ) THEN
230         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
231      ELSE
232         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
233         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
234      ENDIF
235
236      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
237
238      CALL mpi_initialized ( mpi_was_called, code )
239      IF( code /= MPI_SUCCESS ) THEN
240         DO ji = 1, SIZE(ldtxt)
241            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
242         END DO
243         WRITE(*, cform_err)
244         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
245         CALL mpi_abort( mpi_comm_world, code, ierr )
246      ENDIF
247
248      IF( mpi_was_called ) THEN
249         !
250         SELECT CASE ( cn_mpi_send )
251         CASE ( 'S' )                ! Standard mpi send (blocking)
252            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
253         CASE ( 'B' )                ! Buffer mpi send (blocking)
254            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
255            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
256         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
257            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
258            l_isend = .TRUE.
259         CASE DEFAULT
260            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
261            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
262            kstop = kstop + 1
263         END SELECT
264         !
265      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
266         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
267         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
268         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
269         kstop = kstop + 1
270      ELSE
271         SELECT CASE ( cn_mpi_send )
272         CASE ( 'S' )                ! Standard mpi send (blocking)
273            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
274            CALL mpi_init( ierr )
275         CASE ( 'B' )                ! Buffer mpi send (blocking)
276            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
277            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
278         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
279            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
280            l_isend = .TRUE.
281            CALL mpi_init( ierr )
282         CASE DEFAULT
283            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
284            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
285            kstop = kstop + 1
286         END SELECT
287         !
288      ENDIF
289
290      IF( PRESENT(localComm) ) THEN
291         IF( Agrif_Root() ) THEN
292            mpi_comm_oce = localComm
293         ENDIF
294      ELSE
295         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
296         IF( code /= MPI_SUCCESS ) THEN
297            DO ji = 1, SIZE(ldtxt)
298               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
299            END DO
300            WRITE(*, cform_err)
301            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
302            CALL mpi_abort( mpi_comm_world, code, ierr )
303         ENDIF
304      ENDIF
305
306#if defined key_agrif
307      IF( Agrif_Root() ) THEN
308         CALL Agrif_MPI_Init(mpi_comm_oce)
309      ELSE
310         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
311      ENDIF
312#endif
313
314      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
315      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
316      mynode = mpprank
317
318      IF( mynode == 0 ) THEN
319         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
320         WRITE(kumond, nammpp)     
321      ENDIF
322      !
323      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
324      !
325   END FUNCTION mynode
326
327   !!----------------------------------------------------------------------
328   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
329   !!
330   !!   * Argument : dummy argument use in mpp_lnk_... routines
331   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
332   !!                cd_nat :   nature of array grid-points
333   !!                psgn   :   sign used across the north fold boundary
334   !!                kfld   :   optional, number of pt3d arrays
335   !!                cd_mpp :   optional, fill the overlap area only
336   !!                pval   :   optional, background value (used at closed boundaries)
337   !!----------------------------------------------------------------------
338   !
339   !                       !==  2D array and array of 2D pointer  ==!
340   !
341#  define DIM_2d
342#     define ROUTINE_LNK           mpp_lnk_2d
343#     include "mpp_lnk_generic.h90"
344#     undef ROUTINE_LNK
345#     define MULTI
346#     define ROUTINE_LNK           mpp_lnk_2d_ptr
347#     include "mpp_lnk_generic.h90"
348#     undef ROUTINE_LNK
349#     undef MULTI
350#  undef DIM_2d
351   !
352   !                       !==  3D array and array of 3D pointer  ==!
353   !
354#  define DIM_3d
355#     define ROUTINE_LNK           mpp_lnk_3d
356#     include "mpp_lnk_generic.h90"
357#     undef ROUTINE_LNK
358#     define MULTI
359#     define ROUTINE_LNK           mpp_lnk_3d_ptr
360#     include "mpp_lnk_generic.h90"
361#     undef ROUTINE_LNK
362#     undef MULTI
363#  undef DIM_3d
364   !
365   !                       !==  4D array and array of 4D pointer  ==!
366   !
367#  define DIM_4d
368#     define ROUTINE_LNK           mpp_lnk_4d
369#     include "mpp_lnk_generic.h90"
370#     undef ROUTINE_LNK
371#     define MULTI
372#     define ROUTINE_LNK           mpp_lnk_4d_ptr
373#     include "mpp_lnk_generic.h90"
374#     undef ROUTINE_LNK
375#     undef MULTI
376#  undef DIM_4d
377
378   !!----------------------------------------------------------------------
379   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
380   !!
381   !!   * Argument : dummy argument use in mpp_nfd_... routines
382   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
383   !!                cd_nat :   nature of array grid-points
384   !!                psgn   :   sign used across the north fold boundary
385   !!                kfld   :   optional, number of pt3d arrays
386   !!                cd_mpp :   optional, fill the overlap area only
387   !!                pval   :   optional, background value (used at closed boundaries)
388   !!----------------------------------------------------------------------
389   !
390   !                       !==  2D array and array of 2D pointer  ==!
391   !
392#  define DIM_2d
393#     define ROUTINE_NFD           mpp_nfd_2d
394#     include "mpp_nfd_generic.h90"
395#     undef ROUTINE_NFD
396#     define MULTI
397#     define ROUTINE_NFD           mpp_nfd_2d_ptr
398#     include "mpp_nfd_generic.h90"
399#     undef ROUTINE_NFD
400#     undef MULTI
401#  undef DIM_2d
402   !
403   !                       !==  3D array and array of 3D pointer  ==!
404   !
405#  define DIM_3d
406#     define ROUTINE_NFD           mpp_nfd_3d
407#     include "mpp_nfd_generic.h90"
408#     undef ROUTINE_NFD
409#     define MULTI
410#     define ROUTINE_NFD           mpp_nfd_3d_ptr
411#     include "mpp_nfd_generic.h90"
412#     undef ROUTINE_NFD
413#     undef MULTI
414#  undef DIM_3d
415   !
416   !                       !==  4D array and array of 4D pointer  ==!
417   !
418#  define DIM_4d
419#     define ROUTINE_NFD           mpp_nfd_4d
420#     include "mpp_nfd_generic.h90"
421#     undef ROUTINE_NFD
422#     define MULTI
423#     define ROUTINE_NFD           mpp_nfd_4d_ptr
424#     include "mpp_nfd_generic.h90"
425#     undef ROUTINE_NFD
426#     undef MULTI
427#  undef DIM_4d
428
429
430   !!----------------------------------------------------------------------
431   !!
432   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
433   
434   
435   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
436   
437   
438   !!----------------------------------------------------------------------
439
440
441
442   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
443      !!----------------------------------------------------------------------
444      !!                  ***  routine mppsend  ***
445      !!
446      !! ** Purpose :   Send messag passing array
447      !!
448      !!----------------------------------------------------------------------
449      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
450      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
451      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
452      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
453      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
454      !!
455      INTEGER ::   iflag
456      !!----------------------------------------------------------------------
457      !
458      SELECT CASE ( cn_mpi_send )
459      CASE ( 'S' )                ! Standard mpi send (blocking)
460         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
461      CASE ( 'B' )                ! Buffer mpi send (blocking)
462         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
463      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
464         ! be carefull, one more argument here : the mpi request identifier..
465         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
466      END SELECT
467      !
468   END SUBROUTINE mppsend
469
470
471   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
472      !!----------------------------------------------------------------------
473      !!                  ***  routine mpprecv  ***
474      !!
475      !! ** Purpose :   Receive messag passing array
476      !!
477      !!----------------------------------------------------------------------
478      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
479      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
480      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
481      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
482      !!
483      INTEGER :: istatus(mpi_status_size)
484      INTEGER :: iflag
485      INTEGER :: use_source
486      !!----------------------------------------------------------------------
487      !
488      ! If a specific process number has been passed to the receive call,
489      ! use that one. Default is to use mpi_any_source
490      use_source = mpi_any_source
491      IF( PRESENT(ksource) )   use_source = ksource
492      !
493      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
494      !
495   END SUBROUTINE mpprecv
496
497
498   SUBROUTINE mppgather( ptab, kp, pio )
499      !!----------------------------------------------------------------------
500      !!                   ***  routine mppgather  ***
501      !!
502      !! ** Purpose :   Transfert between a local subdomain array and a work
503      !!     array which is distributed following the vertical level.
504      !!
505      !!----------------------------------------------------------------------
506      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
507      INTEGER                           , INTENT(in   ) ::   kp     ! record length
508      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
509      !!
510      INTEGER :: itaille, ierror   ! temporary integer
511      !!---------------------------------------------------------------------
512      !
513      itaille = jpi * jpj
514      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
515         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
516      !
517   END SUBROUTINE mppgather
518
519
520   SUBROUTINE mppscatter( pio, kp, ptab )
521      !!----------------------------------------------------------------------
522      !!                  ***  routine mppscatter  ***
523      !!
524      !! ** Purpose :   Transfert between awork array which is distributed
525      !!      following the vertical level and the local subdomain array.
526      !!
527      !!----------------------------------------------------------------------
528      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
529      INTEGER                             ::   kp     ! Tag (not used with MPI
530      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
531      !!
532      INTEGER :: itaille, ierror   ! temporary integer
533      !!---------------------------------------------------------------------
534      !
535      itaille = jpi * jpj
536      !
537      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
538         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
539      !
540   END SUBROUTINE mppscatter
541
542   
543   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
544     !!----------------------------------------------------------------------
545      !!                   ***  routine mpp_delay_sum  ***
546      !!
547      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
548      !!
549      !!----------------------------------------------------------------------
550      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
551      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
552      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
553      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
554      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
555      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
556      !!
557      INTEGER ::   ji, isz
558      INTEGER ::   idvar
559      INTEGER ::   ierr, ilocalcomm
560      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
561      !!----------------------------------------------------------------------
562      ilocalcomm = mpi_comm_oce
563      IF( PRESENT(kcom) )   ilocalcomm = kcom
564
565      isz = SIZE(y_in)
566     
567      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
568
569      idvar = -1
570      DO ji = 1, nbdelay
571         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
572      END DO
573      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
574
575      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
576         !                                       --------------------------
577         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
578            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
579            DEALLOCATE(todelay(idvar)%z1d)
580            ndelayid(idvar) = -1                                      ! do as if we had no restart
581         ELSE
582            ALLOCATE(todelay(idvar)%y1d(isz))
583            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
584         END IF
585      ENDIF
586     
587      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
588         !                                       --------------------------
589         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
590         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
591         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
592      ENDIF
593
594      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
595
596      ! send back pout from todelay(idvar)%z1d defined at previous call
597      pout(:) = todelay(idvar)%z1d(:)
598
599      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
600#if defined key_mpi2
601      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
602#else
603      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
604#endif
605
606   END SUBROUTINE mpp_delay_sum
607
608   
609   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
610      !!----------------------------------------------------------------------
611      !!                   ***  routine mpp_delay_max  ***
612      !!
613      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
614      !!
615      !!----------------------------------------------------------------------
616      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
617      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
618      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
619      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
620      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
621      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
622      !!
623      INTEGER ::   ji, isz
624      INTEGER ::   idvar
625      INTEGER ::   ierr, ilocalcomm
626      !!----------------------------------------------------------------------
627      ilocalcomm = mpi_comm_oce
628      IF( PRESENT(kcom) )   ilocalcomm = kcom
629
630      isz = SIZE(p_in)
631
632      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
633
634      idvar = -1
635      DO ji = 1, nbdelay
636         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
637      END DO
638      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
639
640      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
641         !                                       --------------------------
642         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
643            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
644            DEALLOCATE(todelay(idvar)%z1d)
645            ndelayid(idvar) = -1                                      ! do as if we had no restart
646         END IF
647      ENDIF
648
649      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
650         !                                       --------------------------
651         ALLOCATE(todelay(idvar)%z1d(isz))
652         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
653      ENDIF
654
655      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
656
657      ! send back pout from todelay(idvar)%z1d defined at previous call
658      pout(:) = todelay(idvar)%z1d(:)
659
660      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
661#if defined key_mpi2
662      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
663#else
664      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
665#endif
666
667   END SUBROUTINE mpp_delay_max
668
669   
670   SUBROUTINE mpp_delay_rcv( kid )
671      !!----------------------------------------------------------------------
672      !!                   ***  routine mpp_delay_rcv  ***
673      !!
674      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
675      !!
676      !!----------------------------------------------------------------------
677      INTEGER,INTENT(in   )      ::  kid 
678      INTEGER ::   ierr
679      !!----------------------------------------------------------------------
680      IF( ndelayid(kid) /= -2 ) THEN 
681#if ! defined key_mpi2
682         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
683#endif
684         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
685         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
686      ENDIF
687   END SUBROUTINE mpp_delay_rcv
688
689   
690   !!----------------------------------------------------------------------
691   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
692   !!   
693   !!----------------------------------------------------------------------
694   !!
695#  define OPERATION_MAX
696#  define INTEGER_TYPE
697#  define DIM_0d
698#     define ROUTINE_ALLREDUCE           mppmax_int
699#     include "mpp_allreduce_generic.h90"
700#     undef ROUTINE_ALLREDUCE
701#  undef DIM_0d
702#  define DIM_1d
703#     define ROUTINE_ALLREDUCE           mppmax_a_int
704#     include "mpp_allreduce_generic.h90"
705#     undef ROUTINE_ALLREDUCE
706#  undef DIM_1d
707#  undef INTEGER_TYPE
708!
709#  define REAL_TYPE
710#  define DIM_0d
711#     define ROUTINE_ALLREDUCE           mppmax_real
712#     include "mpp_allreduce_generic.h90"
713#     undef ROUTINE_ALLREDUCE
714#  undef DIM_0d
715#  define DIM_1d
716#     define ROUTINE_ALLREDUCE           mppmax_a_real
717#     include "mpp_allreduce_generic.h90"
718#     undef ROUTINE_ALLREDUCE
719#  undef DIM_1d
720#  undef REAL_TYPE
721#  undef OPERATION_MAX
722   !!----------------------------------------------------------------------
723   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
724   !!   
725   !!----------------------------------------------------------------------
726   !!
727#  define OPERATION_MIN
728#  define INTEGER_TYPE
729#  define DIM_0d
730#     define ROUTINE_ALLREDUCE           mppmin_int
731#     include "mpp_allreduce_generic.h90"
732#     undef ROUTINE_ALLREDUCE
733#  undef DIM_0d
734#  define DIM_1d
735#     define ROUTINE_ALLREDUCE           mppmin_a_int
736#     include "mpp_allreduce_generic.h90"
737#     undef ROUTINE_ALLREDUCE
738#  undef DIM_1d
739#  undef INTEGER_TYPE
740!
741#  define REAL_TYPE
742#  define DIM_0d
743#     define ROUTINE_ALLREDUCE           mppmin_real
744#     include "mpp_allreduce_generic.h90"
745#     undef ROUTINE_ALLREDUCE
746#  undef DIM_0d
747#  define DIM_1d
748#     define ROUTINE_ALLREDUCE           mppmin_a_real
749#     include "mpp_allreduce_generic.h90"
750#     undef ROUTINE_ALLREDUCE
751#  undef DIM_1d
752#  undef REAL_TYPE
753#  undef OPERATION_MIN
754
755   !!----------------------------------------------------------------------
756   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
757   !!   
758   !!   Global sum of 1D array or a variable (integer, real or complex)
759   !!----------------------------------------------------------------------
760   !!
761#  define OPERATION_SUM
762#  define INTEGER_TYPE
763#  define DIM_0d
764#     define ROUTINE_ALLREDUCE           mppsum_int
765#     include "mpp_allreduce_generic.h90"
766#     undef ROUTINE_ALLREDUCE
767#  undef DIM_0d
768#  define DIM_1d
769#     define ROUTINE_ALLREDUCE           mppsum_a_int
770#     include "mpp_allreduce_generic.h90"
771#     undef ROUTINE_ALLREDUCE
772#  undef DIM_1d
773#  undef INTEGER_TYPE
774!
775#  define REAL_TYPE
776#  define DIM_0d
777#     define ROUTINE_ALLREDUCE           mppsum_real
778#     include "mpp_allreduce_generic.h90"
779#     undef ROUTINE_ALLREDUCE
780#  undef DIM_0d
781#  define DIM_1d
782#     define ROUTINE_ALLREDUCE           mppsum_a_real
783#     include "mpp_allreduce_generic.h90"
784#     undef ROUTINE_ALLREDUCE
785#  undef DIM_1d
786#  undef REAL_TYPE
787#  undef OPERATION_SUM
788
789#  define OPERATION_SUM_DD
790#  define COMPLEX_TYPE
791#  define DIM_0d
792#     define ROUTINE_ALLREDUCE           mppsum_realdd
793#     include "mpp_allreduce_generic.h90"
794#     undef ROUTINE_ALLREDUCE
795#  undef DIM_0d
796#  define DIM_1d
797#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
798#     include "mpp_allreduce_generic.h90"
799#     undef ROUTINE_ALLREDUCE
800#  undef DIM_1d
801#  undef COMPLEX_TYPE
802#  undef OPERATION_SUM_DD
803
804   !!----------------------------------------------------------------------
805   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
806   !!   
807   !!----------------------------------------------------------------------
808   !!
809#  define OPERATION_MINLOC
810#  define DIM_2d
811#     define ROUTINE_LOC           mpp_minloc2d
812#     include "mpp_loc_generic.h90"
813#     undef ROUTINE_LOC
814#  undef DIM_2d
815#  define DIM_3d
816#     define ROUTINE_LOC           mpp_minloc3d
817#     include "mpp_loc_generic.h90"
818#     undef ROUTINE_LOC
819#  undef DIM_3d
820#  undef OPERATION_MINLOC
821
822#  define OPERATION_MAXLOC
823#  define DIM_2d
824#     define ROUTINE_LOC           mpp_maxloc2d
825#     include "mpp_loc_generic.h90"
826#     undef ROUTINE_LOC
827#  undef DIM_2d
828#  define DIM_3d
829#     define ROUTINE_LOC           mpp_maxloc3d
830#     include "mpp_loc_generic.h90"
831#     undef ROUTINE_LOC
832#  undef DIM_3d
833#  undef OPERATION_MAXLOC
834
835   SUBROUTINE mppsync()
836      !!----------------------------------------------------------------------
837      !!                  ***  routine mppsync  ***
838      !!
839      !! ** Purpose :   Massively parallel processors, synchroneous
840      !!
841      !!-----------------------------------------------------------------------
842      INTEGER :: ierror
843      !!-----------------------------------------------------------------------
844      !
845      CALL mpi_barrier( mpi_comm_oce, ierror )
846      !
847   END SUBROUTINE mppsync
848
849
850   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
851      !!----------------------------------------------------------------------
852      !!                  ***  routine mppstop  ***
853      !!
854      !! ** purpose :   Stop massively parallel processors method
855      !!
856      !!----------------------------------------------------------------------
857      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
858      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
859      LOGICAL ::   llfinal, ll_force_abort
860      INTEGER ::   info
861      !!----------------------------------------------------------------------
862      llfinal = .FALSE.
863      IF( PRESENT(ldfinal) ) llfinal = ldfinal
864      ll_force_abort = .FALSE.
865      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
866      !
867      IF(ll_force_abort) THEN
868         CALL mpi_abort( MPI_COMM_WORLD )
869      ELSE
870         CALL mppsync
871         CALL mpi_finalize( info )
872      ENDIF
873      IF( .NOT. llfinal ) STOP 123456
874      !
875   END SUBROUTINE mppstop
876
877
878   SUBROUTINE mpp_comm_free( kcom )
879      !!----------------------------------------------------------------------
880      INTEGER, INTENT(in) ::   kcom
881      !!
882      INTEGER :: ierr
883      !!----------------------------------------------------------------------
884      !
885      CALL MPI_COMM_FREE(kcom, ierr)
886      !
887   END SUBROUTINE mpp_comm_free
888
889
890   SUBROUTINE mpp_ini_znl( kumout )
891      !!----------------------------------------------------------------------
892      !!               ***  routine mpp_ini_znl  ***
893      !!
894      !! ** Purpose :   Initialize special communicator for computing zonal sum
895      !!
896      !! ** Method  : - Look for processors in the same row
897      !!              - Put their number in nrank_znl
898      !!              - Create group for the znl processors
899      !!              - Create a communicator for znl processors
900      !!              - Determine if processor should write znl files
901      !!
902      !! ** output
903      !!      ndim_rank_znl = number of processors on the same row
904      !!      ngrp_znl = group ID for the znl processors
905      !!      ncomm_znl = communicator for the ice procs.
906      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
907      !!
908      !!----------------------------------------------------------------------
909      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
910      !
911      INTEGER :: jproc      ! dummy loop integer
912      INTEGER :: ierr, ii   ! local integer
913      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
914      !!----------------------------------------------------------------------
915      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
916      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
917      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
918      !
919      ALLOCATE( kwork(jpnij), STAT=ierr )
920      IF( ierr /= 0 ) THEN
921         WRITE(kumout, cform_err)
922         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
923         CALL mppstop
924      ENDIF
925
926      IF( jpnj == 1 ) THEN
927         ngrp_znl  = ngrp_world
928         ncomm_znl = mpi_comm_oce
929      ELSE
930         !
931         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
932         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
933         !-$$        CALL flush(numout)
934         !
935         ! Count number of processors on the same row
936         ndim_rank_znl = 0
937         DO jproc=1,jpnij
938            IF ( kwork(jproc) == njmpp ) THEN
939               ndim_rank_znl = ndim_rank_znl + 1
940            ENDIF
941         END DO
942         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
943         !-$$        CALL flush(numout)
944         ! Allocate the right size to nrank_znl
945         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
946         ALLOCATE(nrank_znl(ndim_rank_znl))
947         ii = 0
948         nrank_znl (:) = 0
949         DO jproc=1,jpnij
950            IF ( kwork(jproc) == njmpp) THEN
951               ii = ii + 1
952               nrank_znl(ii) = jproc -1
953            ENDIF
954         END DO
955         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
956         !-$$        CALL flush(numout)
957
958         ! Create the opa group
959         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
960         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
961         !-$$        CALL flush(numout)
962
963         ! Create the znl group from the opa group
964         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
965         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
966         !-$$        CALL flush(numout)
967
968         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
969         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
970         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
971         !-$$        CALL flush(numout)
972         !
973      END IF
974
975      ! Determines if processor if the first (starting from i=1) on the row
976      IF ( jpni == 1 ) THEN
977         l_znl_root = .TRUE.
978      ELSE
979         l_znl_root = .FALSE.
980         kwork (1) = nimpp
981         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
982         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
983      END IF
984
985      DEALLOCATE(kwork)
986
987   END SUBROUTINE mpp_ini_znl
988
989
990   SUBROUTINE mpp_ini_north
991      !!----------------------------------------------------------------------
992      !!               ***  routine mpp_ini_north  ***
993      !!
994      !! ** Purpose :   Initialize special communicator for north folding
995      !!      condition together with global variables needed in the mpp folding
996      !!
997      !! ** Method  : - Look for northern processors
998      !!              - Put their number in nrank_north
999      !!              - Create groups for the world processors and the north processors
1000      !!              - Create a communicator for northern processors
1001      !!
1002      !! ** output
1003      !!      njmppmax = njmpp for northern procs
1004      !!      ndim_rank_north = number of processors in the northern line
1005      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1006      !!      ngrp_world = group ID for the world processors
1007      !!      ngrp_north = group ID for the northern processors
1008      !!      ncomm_north = communicator for the northern procs.
1009      !!      north_root = number (in the world) of proc 0 in the northern comm.
1010      !!
1011      !!----------------------------------------------------------------------
1012      INTEGER ::   ierr
1013      INTEGER ::   jjproc
1014      INTEGER ::   ii, ji
1015      !!----------------------------------------------------------------------
1016      !
1017      njmppmax = MAXVAL( njmppt )
1018      !
1019      ! Look for how many procs on the northern boundary
1020      ndim_rank_north = 0
1021      DO jjproc = 1, jpnij
1022         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1023      END DO
1024      !
1025      ! Allocate the right size to nrank_north
1026      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1027      ALLOCATE( nrank_north(ndim_rank_north) )
1028
1029      ! Fill the nrank_north array with proc. number of northern procs.
1030      ! Note : the rank start at 0 in MPI
1031      ii = 0
1032      DO ji = 1, jpnij
1033         IF ( njmppt(ji) == njmppmax   ) THEN
1034            ii=ii+1
1035            nrank_north(ii)=ji-1
1036         END IF
1037      END DO
1038      !
1039      ! create the world group
1040      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1041      !
1042      ! Create the North group from the world group
1043      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1044      !
1045      ! Create the North communicator , ie the pool of procs in the north group
1046      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1047      !
1048   END SUBROUTINE mpp_ini_north
1049
1050
1051   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1052      !!---------------------------------------------------------------------
1053      !!                   ***  routine mpp_init.opa  ***
1054      !!
1055      !! ** Purpose :: export and attach a MPI buffer for bsend
1056      !!
1057      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1058      !!            but classical mpi_init
1059      !!
1060      !! History :: 01/11 :: IDRIS initial version for IBM only
1061      !!            08/04 :: R. Benshila, generalisation
1062      !!---------------------------------------------------------------------
1063      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1064      INTEGER                      , INTENT(inout) ::   ksft
1065      INTEGER                      , INTENT(  out) ::   code
1066      INTEGER                                      ::   ierr, ji
1067      LOGICAL                                      ::   mpi_was_called
1068      !!---------------------------------------------------------------------
1069      !
1070      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1071      IF ( code /= MPI_SUCCESS ) THEN
1072         DO ji = 1, SIZE(ldtxt)
1073            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1074         END DO
1075         WRITE(*, cform_err)
1076         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1077         CALL mpi_abort( mpi_comm_world, code, ierr )
1078      ENDIF
1079      !
1080      IF( .NOT. mpi_was_called ) THEN
1081         CALL mpi_init( code )
1082         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1083         IF ( code /= MPI_SUCCESS ) THEN
1084            DO ji = 1, SIZE(ldtxt)
1085               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1086            END DO
1087            WRITE(*, cform_err)
1088            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1089            CALL mpi_abort( mpi_comm_world, code, ierr )
1090         ENDIF
1091      ENDIF
1092      !
1093      IF( nn_buffer > 0 ) THEN
1094         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1095         ! Buffer allocation and attachment
1096         ALLOCATE( tampon(nn_buffer), stat = ierr )
1097         IF( ierr /= 0 ) THEN
1098            DO ji = 1, SIZE(ldtxt)
1099               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1100            END DO
1101            WRITE(*, cform_err)
1102            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1103            CALL mpi_abort( mpi_comm_world, code, ierr )
1104         END IF
1105         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1106      ENDIF
1107      !
1108   END SUBROUTINE mpi_init_oce
1109
1110
1111   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1112      !!---------------------------------------------------------------------
1113      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1114      !!
1115      !!   Modification of original codes written by David H. Bailey
1116      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1117      !!---------------------------------------------------------------------
1118      INTEGER                     , INTENT(in)    ::   ilen, itype
1119      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1120      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1121      !
1122      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1123      INTEGER  :: ji, ztmp           ! local scalar
1124      !!---------------------------------------------------------------------
1125      !
1126      ztmp = itype   ! avoid compilation warning
1127      !
1128      DO ji=1,ilen
1129      ! Compute ydda + yddb using Knuth's trick.
1130         zt1  = real(ydda(ji)) + real(yddb(ji))
1131         zerr = zt1 - real(ydda(ji))
1132         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1133                + aimag(ydda(ji)) + aimag(yddb(ji))
1134
1135         ! The result is zt1 + zt2, after normalization.
1136         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1137      END DO
1138      !
1139   END SUBROUTINE DDPDD_MPI
1140
1141
1142   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1143      !!---------------------------------------------------------------------
1144      !!                   ***  routine mpp_lbc_north_icb  ***
1145      !!
1146      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1147      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1148      !!              array with outer extra halo
1149      !!
1150      !! ** Method  :   North fold condition and mpp with more than one proc
1151      !!              in i-direction require a specific treatment. We gather
1152      !!              the 4+kextj northern lines of the global domain on 1
1153      !!              processor and apply lbc north-fold on this sub array.
1154      !!              Then we scatter the north fold array back to the processors.
1155      !!              This routine accounts for an extra halo with icebergs
1156      !!              and assumes ghost rows and columns have been suppressed.
1157      !!
1158      !!----------------------------------------------------------------------
1159      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1160      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1161      !                                                     !   = T ,  U , V , F or W -points
1162      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1163      !!                                                    ! north fold, =  1. otherwise
1164      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1165      !
1166      INTEGER ::   ji, jj, jr
1167      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1168      INTEGER ::   ipj, ij, iproc
1169      !
1170      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1171      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1172      !!----------------------------------------------------------------------
1173      !
1174      ipj=4
1175      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1176     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1177     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1178      !
1179      ztab_e(:,:)      = 0._wp
1180      znorthloc_e(:,:) = 0._wp
1181      !
1182      ij = 1 - kextj
1183      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1184      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1185         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1186         ij = ij + 1
1187      END DO
1188      !
1189      itaille = jpimax * ( ipj + 2*kextj )
1190      !
1191      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1192         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1193         &                ncomm_north, ierr )
1194      !
1195      !
1196      DO jr = 1, ndim_rank_north            ! recover the global north array
1197         iproc = nrank_north(jr) + 1
1198         ildi = nldit (iproc)
1199         ilei = nleit (iproc)
1200         iilb = nimppt(iproc)
1201         DO jj = 1-kextj, ipj+kextj
1202            DO ji = ildi, ilei
1203               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1204            END DO
1205         END DO
1206      END DO
1207
1208      ! 2. North-Fold boundary conditions
1209      ! ----------------------------------
1210      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1211
1212      ij = 1 - kextj
1213      !! Scatter back to pt2d
1214      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1215         DO ji= 1, jpi
1216            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1217         END DO
1218         ij  = ij +1
1219      END DO
1220      !
1221      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1222      !
1223   END SUBROUTINE mpp_lbc_north_icb
1224
1225
1226   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1227      !!----------------------------------------------------------------------
1228      !!                  ***  routine mpp_lnk_2d_icb  ***
1229      !!
1230      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1231      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1232      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1233      !!
1234      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1235      !!      between processors following neighboring subdomains.
1236      !!            domain parameters
1237      !!                    jpi    : first dimension of the local subdomain
1238      !!                    jpj    : second dimension of the local subdomain
1239      !!                    kexti  : number of columns for extra outer halo
1240      !!                    kextj  : number of rows for extra outer halo
1241      !!                    nbondi : mark for "east-west local boundary"
1242      !!                    nbondj : mark for "north-south local boundary"
1243      !!                    noea   : number for local neighboring processors
1244      !!                    nowe   : number for local neighboring processors
1245      !!                    noso   : number for local neighboring processors
1246      !!                    nono   : number for local neighboring processors
1247      !!----------------------------------------------------------------------
1248      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1249      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1250      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1251      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1252      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1253      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1254      !
1255      INTEGER  ::   jl   ! dummy loop indices
1256      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1257      INTEGER  ::   ipreci, iprecj             !   -       -
1258      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1259      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1260      !!
1261      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1262      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1263      !!----------------------------------------------------------------------
1264
1265      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1266      iprecj = nn_hls + kextj
1267
1268      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1269
1270      ! 1. standard boundary treatment
1271      ! ------------------------------
1272      ! Order matters Here !!!!
1273      !
1274      !                                      ! East-West boundaries
1275      !                                           !* Cyclic east-west
1276      IF( l_Iperio ) THEN
1277         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1278         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1279         !
1280      ELSE                                        !* closed
1281         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1282                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1283      ENDIF
1284      !                                      ! North-South boundaries
1285      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1286         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1287         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1288      ELSE                                        !* closed
1289         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1290                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1291      ENDIF
1292      !
1293
1294      ! north fold treatment
1295      ! -----------------------
1296      IF( npolj /= 0 ) THEN
1297         !
1298         SELECT CASE ( jpni )
1299                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1300                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1301         END SELECT
1302         !
1303      ENDIF
1304
1305      ! 2. East and west directions exchange
1306      ! ------------------------------------
1307      ! we play with the neigbours AND the row number because of the periodicity
1308      !
1309      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1310      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1311         iihom = jpi-nreci-kexti
1312         DO jl = 1, ipreci
1313            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1314            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1315         END DO
1316      END SELECT
1317      !
1318      !                           ! Migrations
1319      imigr = ipreci * ( jpj + 2*kextj )
1320      !
1321      SELECT CASE ( nbondi )
1322      CASE ( -1 )
1323         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1324         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1325         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1326      CASE ( 0 )
1327         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1328         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1329         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1330         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1331         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1332         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1333      CASE ( 1 )
1334         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1335         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1336         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1337      END SELECT
1338      !
1339      !                           ! Write Dirichlet lateral conditions
1340      iihom = jpi - nn_hls
1341      !
1342      SELECT CASE ( nbondi )
1343      CASE ( -1 )
1344         DO jl = 1, ipreci
1345            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1346         END DO
1347      CASE ( 0 )
1348         DO jl = 1, ipreci
1349            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1350            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1351         END DO
1352      CASE ( 1 )
1353         DO jl = 1, ipreci
1354            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1355         END DO
1356      END SELECT
1357
1358
1359      ! 3. North and south directions
1360      ! -----------------------------
1361      ! always closed : we play only with the neigbours
1362      !
1363      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1364         ijhom = jpj-nrecj-kextj
1365         DO jl = 1, iprecj
1366            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1367            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1368         END DO
1369      ENDIF
1370      !
1371      !                           ! Migrations
1372      imigr = iprecj * ( jpi + 2*kexti )
1373      !
1374      SELECT CASE ( nbondj )
1375      CASE ( -1 )
1376         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1377         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1378         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1379      CASE ( 0 )
1380         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1381         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1382         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1383         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1384         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1385         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1386      CASE ( 1 )
1387         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1388         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1389         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1390      END SELECT
1391      !
1392      !                           ! Write Dirichlet lateral conditions
1393      ijhom = jpj - nn_hls
1394      !
1395      SELECT CASE ( nbondj )
1396      CASE ( -1 )
1397         DO jl = 1, iprecj
1398            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1399         END DO
1400      CASE ( 0 )
1401         DO jl = 1, iprecj
1402            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1403            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1404         END DO
1405      CASE ( 1 )
1406         DO jl = 1, iprecj
1407            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1408         END DO
1409      END SELECT
1410      !
1411   END SUBROUTINE mpp_lnk_2d_icb
1412
1413
1414   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1415      !!----------------------------------------------------------------------
1416      !!                  ***  routine mpp_report  ***
1417      !!
1418      !! ** Purpose :   report use of mpp routines per time-setp
1419      !!
1420      !!----------------------------------------------------------------------
1421      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1422      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1423      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1424      !!
1425      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1426      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1427      !!----------------------------------------------------------------------
1428      !
1429      ll_lbc = .FALSE.
1430      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1431      ll_glb = .FALSE.
1432      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1433      ll_dlg = .FALSE.
1434      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1435      !
1436      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1437      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1438      ncom_freq = ncom_fsbc
1439      !
1440      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1441         IF( ll_lbc ) THEN
1442            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1443            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1444            n_sequence_lbc = n_sequence_lbc + 1
1445            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1446            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1447            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1448            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1449         ENDIF
1450         IF( ll_glb ) THEN
1451            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1452            n_sequence_glb = n_sequence_glb + 1
1453            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1454            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1455         ENDIF
1456         IF( ll_dlg ) THEN
1457            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1458            n_sequence_dlg = n_sequence_dlg + 1
1459            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1460            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1461         ENDIF
1462      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1463         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1464         WRITE(numcom,*) ' '
1465         WRITE(numcom,*) ' ------------------------------------------------------------'
1466         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1467         WRITE(numcom,*) ' ------------------------------------------------------------'
1468         WRITE(numcom,*) ' '
1469         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1470         jj = 0; jk = 0; jf = 0; jh = 0
1471         DO ji = 1, n_sequence_lbc
1472            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1473            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1474            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1475            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1476         END DO
1477         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1478         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1479         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1480         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1481         WRITE(numcom,*) ' '
1482         WRITE(numcom,*) ' lbc_lnk called'
1483         jj = 1
1484         DO ji = 2, n_sequence_lbc
1485            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1486               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1487               jj = 0
1488            END IF
1489            jj = jj + 1 
1490         END DO
1491         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1492         WRITE(numcom,*) ' '
1493         IF ( n_sequence_glb > 0 ) THEN
1494            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1495            jj = 1
1496            DO ji = 2, n_sequence_glb
1497               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1498                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1499                  jj = 0
1500               END IF
1501               jj = jj + 1 
1502            END DO
1503            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1504            DEALLOCATE(crname_glb)
1505         ELSE
1506            WRITE(numcom,*) ' No MPI global communication '
1507         ENDIF
1508         WRITE(numcom,*) ' '
1509         IF ( n_sequence_dlg > 0 ) THEN
1510            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1511            jj = 1
1512            DO ji = 2, n_sequence_dlg
1513               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1514                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1515                  jj = 0
1516               END IF
1517               jj = jj + 1 
1518            END DO
1519            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1520            DEALLOCATE(crname_dlg)
1521         ELSE
1522            WRITE(numcom,*) ' No MPI delayed global communication '
1523         ENDIF
1524         WRITE(numcom,*) ' '
1525         WRITE(numcom,*) ' -----------------------------------------------'
1526         WRITE(numcom,*) ' '
1527         DEALLOCATE(ncomm_sequence)
1528         DEALLOCATE(crname_lbc)
1529      ENDIF
1530   END SUBROUTINE mpp_report
1531
1532   
1533   SUBROUTINE tic_tac (ld_tic, ld_global)
1534
1535    LOGICAL,           INTENT(IN) :: ld_tic
1536    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1537    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1538    REAL(wp),               SAVE :: tic_ct = 0._wp
1539    INTEGER :: ii
1540
1541    IF( ncom_stp <= nit000 ) RETURN
1542    IF( ncom_stp == nitend ) RETURN
1543    ii = 1
1544    IF( PRESENT( ld_global ) ) THEN
1545       IF( ld_global ) ii = 2
1546    END IF
1547   
1548    IF ( ld_tic ) THEN
1549       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1550       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1551    ELSE
1552       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1553       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1554    ENDIF
1555   
1556   END SUBROUTINE tic_tac
1557
1558   
1559#else
1560   !!----------------------------------------------------------------------
1561   !!   Default case:            Dummy module        share memory computing
1562   !!----------------------------------------------------------------------
1563   USE in_out_manager
1564
1565   INTERFACE mpp_sum
1566      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1567   END INTERFACE
1568   INTERFACE mpp_max
1569      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1570   END INTERFACE
1571   INTERFACE mpp_min
1572      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1573   END INTERFACE
1574   INTERFACE mpp_minloc
1575      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1576   END INTERFACE
1577   INTERFACE mpp_maxloc
1578      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1579   END INTERFACE
1580
1581   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1582   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1583   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1584
1585   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1586   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1587   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1588   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1589   TYPE ::   DELAYARR
1590      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1591      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1592   END TYPE DELAYARR
1593   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1594   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1595   !!----------------------------------------------------------------------
1596CONTAINS
1597
1598   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1599      INTEGER, INTENT(in) ::   kumout
1600      lib_mpp_alloc = 0
1601   END FUNCTION lib_mpp_alloc
1602
1603   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1604      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1605      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1606      CHARACTER(len=*) ::   ldname
1607      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1608      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1609      function_value = 0
1610      IF( .FALSE. )   ldtxt(:) = 'never done'
1611      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1612   END FUNCTION mynode
1613
1614   SUBROUTINE mppsync                       ! Dummy routine
1615   END SUBROUTINE mppsync
1616
1617   !!----------------------------------------------------------------------
1618   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1619   !!   
1620   !!----------------------------------------------------------------------
1621   !!
1622#  define OPERATION_MAX
1623#  define INTEGER_TYPE
1624#  define DIM_0d
1625#     define ROUTINE_ALLREDUCE           mppmax_int
1626#     include "mpp_allreduce_generic.h90"
1627#     undef ROUTINE_ALLREDUCE
1628#  undef DIM_0d
1629#  define DIM_1d
1630#     define ROUTINE_ALLREDUCE           mppmax_a_int
1631#     include "mpp_allreduce_generic.h90"
1632#     undef ROUTINE_ALLREDUCE
1633#  undef DIM_1d
1634#  undef INTEGER_TYPE
1635!
1636#  define REAL_TYPE
1637#  define DIM_0d
1638#     define ROUTINE_ALLREDUCE           mppmax_real
1639#     include "mpp_allreduce_generic.h90"
1640#     undef ROUTINE_ALLREDUCE
1641#  undef DIM_0d
1642#  define DIM_1d
1643#     define ROUTINE_ALLREDUCE           mppmax_a_real
1644#     include "mpp_allreduce_generic.h90"
1645#     undef ROUTINE_ALLREDUCE
1646#  undef DIM_1d
1647#  undef REAL_TYPE
1648#  undef OPERATION_MAX
1649   !!----------------------------------------------------------------------
1650   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1651   !!   
1652   !!----------------------------------------------------------------------
1653   !!
1654#  define OPERATION_MIN
1655#  define INTEGER_TYPE
1656#  define DIM_0d
1657#     define ROUTINE_ALLREDUCE           mppmin_int
1658#     include "mpp_allreduce_generic.h90"
1659#     undef ROUTINE_ALLREDUCE
1660#  undef DIM_0d
1661#  define DIM_1d
1662#     define ROUTINE_ALLREDUCE           mppmin_a_int
1663#     include "mpp_allreduce_generic.h90"
1664#     undef ROUTINE_ALLREDUCE
1665#  undef DIM_1d
1666#  undef INTEGER_TYPE
1667!
1668#  define REAL_TYPE
1669#  define DIM_0d
1670#     define ROUTINE_ALLREDUCE           mppmin_real
1671#     include "mpp_allreduce_generic.h90"
1672#     undef ROUTINE_ALLREDUCE
1673#  undef DIM_0d
1674#  define DIM_1d
1675#     define ROUTINE_ALLREDUCE           mppmin_a_real
1676#     include "mpp_allreduce_generic.h90"
1677#     undef ROUTINE_ALLREDUCE
1678#  undef DIM_1d
1679#  undef REAL_TYPE
1680#  undef OPERATION_MIN
1681
1682   !!----------------------------------------------------------------------
1683   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1684   !!   
1685   !!   Global sum of 1D array or a variable (integer, real or complex)
1686   !!----------------------------------------------------------------------
1687   !!
1688#  define OPERATION_SUM
1689#  define INTEGER_TYPE
1690#  define DIM_0d
1691#     define ROUTINE_ALLREDUCE           mppsum_int
1692#     include "mpp_allreduce_generic.h90"
1693#     undef ROUTINE_ALLREDUCE
1694#  undef DIM_0d
1695#  define DIM_1d
1696#     define ROUTINE_ALLREDUCE           mppsum_a_int
1697#     include "mpp_allreduce_generic.h90"
1698#     undef ROUTINE_ALLREDUCE
1699#  undef DIM_1d
1700#  undef INTEGER_TYPE
1701!
1702#  define REAL_TYPE
1703#  define DIM_0d
1704#     define ROUTINE_ALLREDUCE           mppsum_real
1705#     include "mpp_allreduce_generic.h90"
1706#     undef ROUTINE_ALLREDUCE
1707#  undef DIM_0d
1708#  define DIM_1d
1709#     define ROUTINE_ALLREDUCE           mppsum_a_real
1710#     include "mpp_allreduce_generic.h90"
1711#     undef ROUTINE_ALLREDUCE
1712#  undef DIM_1d
1713#  undef REAL_TYPE
1714#  undef OPERATION_SUM
1715
1716#  define OPERATION_SUM_DD
1717#  define COMPLEX_TYPE
1718#  define DIM_0d
1719#     define ROUTINE_ALLREDUCE           mppsum_realdd
1720#     include "mpp_allreduce_generic.h90"
1721#     undef ROUTINE_ALLREDUCE
1722#  undef DIM_0d
1723#  define DIM_1d
1724#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1725#     include "mpp_allreduce_generic.h90"
1726#     undef ROUTINE_ALLREDUCE
1727#  undef DIM_1d
1728#  undef COMPLEX_TYPE
1729#  undef OPERATION_SUM_DD
1730
1731   !!----------------------------------------------------------------------
1732   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1733   !!   
1734   !!----------------------------------------------------------------------
1735   !!
1736#  define OPERATION_MINLOC
1737#  define DIM_2d
1738#     define ROUTINE_LOC           mpp_minloc2d
1739#     include "mpp_loc_generic.h90"
1740#     undef ROUTINE_LOC
1741#  undef DIM_2d
1742#  define DIM_3d
1743#     define ROUTINE_LOC           mpp_minloc3d
1744#     include "mpp_loc_generic.h90"
1745#     undef ROUTINE_LOC
1746#  undef DIM_3d
1747#  undef OPERATION_MINLOC
1748
1749#  define OPERATION_MAXLOC
1750#  define DIM_2d
1751#     define ROUTINE_LOC           mpp_maxloc2d
1752#     include "mpp_loc_generic.h90"
1753#     undef ROUTINE_LOC
1754#  undef DIM_2d
1755#  define DIM_3d
1756#     define ROUTINE_LOC           mpp_maxloc3d
1757#     include "mpp_loc_generic.h90"
1758#     undef ROUTINE_LOC
1759#  undef DIM_3d
1760#  undef OPERATION_MAXLOC
1761
1762   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1763      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1764      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1765      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1766      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1767      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1768      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1769      !
1770      pout(:) = REAL(y_in(:), wp)
1771   END SUBROUTINE mpp_delay_sum
1772
1773   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1774      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1775      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1776      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1777      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1778      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1779      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1780      !
1781      pout(:) = p_in(:)
1782   END SUBROUTINE mpp_delay_max
1783
1784   SUBROUTINE mpp_delay_rcv( kid )
1785      INTEGER,INTENT(in   )      ::  kid 
1786      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1787   END SUBROUTINE mpp_delay_rcv
1788   
1789   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1790      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1791      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1792      STOP      ! non MPP case, just stop the run
1793   END SUBROUTINE mppstop
1794
1795   SUBROUTINE mpp_ini_znl( knum )
1796      INTEGER :: knum
1797      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1798   END SUBROUTINE mpp_ini_znl
1799
1800   SUBROUTINE mpp_comm_free( kcom )
1801      INTEGER :: kcom
1802      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1803   END SUBROUTINE mpp_comm_free
1804   
1805#endif
1806
1807   !!----------------------------------------------------------------------
1808   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1809   !!----------------------------------------------------------------------
1810
1811   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1812      &                 cd6, cd7, cd8, cd9, cd10 )
1813      !!----------------------------------------------------------------------
1814      !!                  ***  ROUTINE  stop_opa  ***
1815      !!
1816      !! ** Purpose :   print in ocean.outpput file a error message and
1817      !!                increment the error number (nstop) by one.
1818      !!----------------------------------------------------------------------
1819      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1820      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1821      !!----------------------------------------------------------------------
1822      !
1823      nstop = nstop + 1
1824
1825      ! force to open ocean.output file
1826      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1827       
1828      WRITE(numout,cform_err)
1829      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1830      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1831      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1832      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1833      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1834      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1835      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1836      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1837      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1838      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1839
1840                               CALL FLUSH(numout    )
1841      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1842      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1843      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1844      !
1845      IF( cd1 == 'STOP' ) THEN
1846         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1847         CALL mppstop(ld_force_abort = .true.)
1848      ENDIF
1849      !
1850   END SUBROUTINE ctl_stop
1851
1852
1853   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1854      &                 cd6, cd7, cd8, cd9, cd10 )
1855      !!----------------------------------------------------------------------
1856      !!                  ***  ROUTINE  stop_warn  ***
1857      !!
1858      !! ** Purpose :   print in ocean.outpput file a error message and
1859      !!                increment the warning number (nwarn) by one.
1860      !!----------------------------------------------------------------------
1861      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1862      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1863      !!----------------------------------------------------------------------
1864      !
1865      nwarn = nwarn + 1
1866      IF(lwp) THEN
1867         WRITE(numout,cform_war)
1868         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1869         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1870         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1871         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1872         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1873         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1874         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1875         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1876         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1877         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1878      ENDIF
1879      CALL FLUSH(numout)
1880      !
1881   END SUBROUTINE ctl_warn
1882
1883
1884   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1885      !!----------------------------------------------------------------------
1886      !!                  ***  ROUTINE ctl_opn  ***
1887      !!
1888      !! ** Purpose :   Open file and check if required file is available.
1889      !!
1890      !! ** Method  :   Fortan open
1891      !!----------------------------------------------------------------------
1892      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1893      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1894      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1895      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1896      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1897      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1898      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1899      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1900      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1901      !
1902      CHARACTER(len=80) ::   clfile
1903      INTEGER           ::   iost
1904      !!----------------------------------------------------------------------
1905      !
1906      ! adapt filename
1907      ! ----------------
1908      clfile = TRIM(cdfile)
1909      IF( PRESENT( karea ) ) THEN
1910         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1911      ENDIF
1912#if defined key_agrif
1913      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1914      knum=Agrif_Get_Unit()
1915#else
1916      knum=get_unit()
1917#endif
1918      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1919      !
1920      iost=0
1921      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1922         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1923      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1924         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1925      ELSE
1926         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1927      ENDIF
1928      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1929         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1930      IF( iost == 0 ) THEN
1931         IF(ldwp) THEN
1932            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1933            WRITE(kout,*) '     unit   = ', knum
1934            WRITE(kout,*) '     status = ', cdstat
1935            WRITE(kout,*) '     form   = ', cdform
1936            WRITE(kout,*) '     access = ', cdacce
1937            WRITE(kout,*)
1938         ENDIF
1939      ENDIF
1940100   CONTINUE
1941      IF( iost /= 0 ) THEN
1942         IF(ldwp) THEN
1943            WRITE(kout,*)
1944            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1945            WRITE(kout,*) ' =======   ===  '
1946            WRITE(kout,*) '           unit   = ', knum
1947            WRITE(kout,*) '           status = ', cdstat
1948            WRITE(kout,*) '           form   = ', cdform
1949            WRITE(kout,*) '           access = ', cdacce
1950            WRITE(kout,*) '           iostat = ', iost
1951            WRITE(kout,*) '           we stop. verify the file '
1952            WRITE(kout,*)
1953         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1954            WRITE(*,*)
1955            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1956            WRITE(*,*) ' =======   ===  '
1957            WRITE(*,*) '           unit   = ', knum
1958            WRITE(*,*) '           status = ', cdstat
1959            WRITE(*,*) '           form   = ', cdform
1960            WRITE(*,*) '           access = ', cdacce
1961            WRITE(*,*) '           iostat = ', iost
1962            WRITE(*,*) '           we stop. verify the file '
1963            WRITE(*,*)
1964         ENDIF
1965         CALL FLUSH( kout ) 
1966         STOP 'ctl_opn bad opening'
1967      ENDIF
1968      !
1969   END SUBROUTINE ctl_opn
1970
1971
1972   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1973      !!----------------------------------------------------------------------
1974      !!                  ***  ROUTINE ctl_nam  ***
1975      !!
1976      !! ** Purpose :   Informations when error while reading a namelist
1977      !!
1978      !! ** Method  :   Fortan open
1979      !!----------------------------------------------------------------------
1980      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1981      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1982      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1983      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1984      !!----------------------------------------------------------------------
1985      !
1986      WRITE (clios, '(I5.0)')   kios
1987      IF( kios < 0 ) THEN         
1988         CALL ctl_warn( 'end of record or file while reading namelist '   &
1989            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1990      ENDIF
1991      !
1992      IF( kios > 0 ) THEN
1993         CALL ctl_stop( 'misspelled variable in namelist '   &
1994            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1995      ENDIF
1996      kios = 0
1997      RETURN
1998      !
1999   END SUBROUTINE ctl_nam
2000
2001
2002   INTEGER FUNCTION get_unit()
2003      !!----------------------------------------------------------------------
2004      !!                  ***  FUNCTION  get_unit  ***
2005      !!
2006      !! ** Purpose :   return the index of an unused logical unit
2007      !!----------------------------------------------------------------------
2008      LOGICAL :: llopn
2009      !!----------------------------------------------------------------------
2010      !
2011      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2012      llopn = .TRUE.
2013      DO WHILE( (get_unit < 998) .AND. llopn )
2014         get_unit = get_unit + 1
2015         INQUIRE( unit = get_unit, opened = llopn )
2016      END DO
2017      IF( (get_unit == 999) .AND. llopn ) THEN
2018         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2019         get_unit = -1
2020      ENDIF
2021      !
2022   END FUNCTION get_unit
2023
2024   !!----------------------------------------------------------------------
2025END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.