New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/UKMO/NEMO_4.0_mirror_text_diagnostics/src/OCE/LBC – NEMO

source: NEMO/branches/UKMO/NEMO_4.0_mirror_text_diagnostics/src/OCE/LBC/lib_mpp.F90 @ 10986

Last change on this file since 10986 was 10986, checked in by andmirek, 5 years ago

GMED 462 add flush

File size: 90.6 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
160   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
161   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
162   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
163   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
164   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
165   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
166   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
167   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
168   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
169   !: name (used as id) of allreduce-delayed operations
170   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
171   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
172   !: component name where the allreduce-delayed operation is performed
173   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
174   TYPE, PUBLIC ::   DELAYARR
175      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
176      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
177   END TYPE DELAYARR
178   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
179   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
180
181   ! timing summary report
182   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
183   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
184   
185   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
186
187   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
188   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
189
190   !!----------------------------------------------------------------------
191   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
192   !! $Id$
193   !! Software governed by the CeCILL license (see ./LICENSE)
194   !!----------------------------------------------------------------------
195CONTAINS
196
197   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
198      !!----------------------------------------------------------------------
199      !!                  ***  routine mynode  ***
200      !!
201      !! ** Purpose :   Find processor unit
202      !!----------------------------------------------------------------------
203      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
204      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
205      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
206      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
207      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
208      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
209      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
210      !
211      INTEGER ::   mynode, ierr, code, ji, ii, ios
212      LOGICAL ::   mpi_was_called
213      !
214      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
215      !!----------------------------------------------------------------------
216      !
217      ii = 1
218      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
220      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
221      !
222      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
223      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
224901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
225      !
226      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
227      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
228902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
229      !
230      !                              ! control print
231      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
232      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
233      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
234      !
235      IF( jpni < 1 .OR. jpnj < 1  ) THEN
236         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
237      ELSE
238         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
239         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
240      ENDIF
241
242      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
243
244      CALL mpi_initialized ( mpi_was_called, code )
245      IF( code /= MPI_SUCCESS ) THEN
246         DO ji = 1, SIZE(ldtxt)
247            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
248         END DO
249         WRITE(*, cform_err)
250         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
251         CALL mpi_abort( mpi_comm_world, code, ierr )
252      ENDIF
253
254      IF( mpi_was_called ) THEN
255         !
256         SELECT CASE ( cn_mpi_send )
257         CASE ( 'S' )                ! Standard mpi send (blocking)
258            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
259         CASE ( 'B' )                ! Buffer mpi send (blocking)
260            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
261            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
262         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
263            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
264            l_isend = .TRUE.
265         CASE DEFAULT
266            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
267            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
268            kstop = kstop + 1
269         END SELECT
270         !
271      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
272         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
273         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
274         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
275         kstop = kstop + 1
276      ELSE
277         SELECT CASE ( cn_mpi_send )
278         CASE ( 'S' )                ! Standard mpi send (blocking)
279            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
280            CALL mpi_init( ierr )
281         CASE ( 'B' )                ! Buffer mpi send (blocking)
282            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
283            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
284         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
285            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
286            l_isend = .TRUE.
287            CALL mpi_init( ierr )
288         CASE DEFAULT
289            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
290            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
291            kstop = kstop + 1
292         END SELECT
293         !
294      ENDIF
295
296      IF( PRESENT(localComm) ) THEN
297         IF( Agrif_Root() ) THEN
298            mpi_comm_oce = localComm
299         ENDIF
300      ELSE
301         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
302         IF( code /= MPI_SUCCESS ) THEN
303            DO ji = 1, SIZE(ldtxt)
304               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
305            END DO
306            WRITE(*, cform_err)
307            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
308            CALL mpi_abort( mpi_comm_world, code, ierr )
309         ENDIF
310      ENDIF
311
312#if defined key_agrif
313      IF( Agrif_Root() ) THEN
314         CALL Agrif_MPI_Init(mpi_comm_oce)
315      ELSE
316         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
317      ENDIF
318#endif
319
320      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
321      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
322      mynode = mpprank
323
324      IF( mynode == 0 .AND. nprint > 2) THEN
325         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
326         WRITE(kumond, nammpp)     
327      ENDIF
328      !
329      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
330      !
331   END FUNCTION mynode
332
333   !!----------------------------------------------------------------------
334   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
335   !!
336   !!   * Argument : dummy argument use in mpp_lnk_... routines
337   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
338   !!                cd_nat :   nature of array grid-points
339   !!                psgn   :   sign used across the north fold boundary
340   !!                kfld   :   optional, number of pt3d arrays
341   !!                cd_mpp :   optional, fill the overlap area only
342   !!                pval   :   optional, background value (used at closed boundaries)
343   !!----------------------------------------------------------------------
344   !
345   !                       !==  2D array and array of 2D pointer  ==!
346   !
347#  define DIM_2d
348#     define ROUTINE_LNK           mpp_lnk_2d
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     define MULTI
352#     define ROUTINE_LNK           mpp_lnk_2d_ptr
353#     include "mpp_lnk_generic.h90"
354#     undef ROUTINE_LNK
355#     undef MULTI
356#  undef DIM_2d
357   !
358   !                       !==  3D array and array of 3D pointer  ==!
359   !
360#  define DIM_3d
361#     define ROUTINE_LNK           mpp_lnk_3d
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     define MULTI
365#     define ROUTINE_LNK           mpp_lnk_3d_ptr
366#     include "mpp_lnk_generic.h90"
367#     undef ROUTINE_LNK
368#     undef MULTI
369#  undef DIM_3d
370   !
371   !                       !==  4D array and array of 4D pointer  ==!
372   !
373#  define DIM_4d
374#     define ROUTINE_LNK           mpp_lnk_4d
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     define MULTI
378#     define ROUTINE_LNK           mpp_lnk_4d_ptr
379#     include "mpp_lnk_generic.h90"
380#     undef ROUTINE_LNK
381#     undef MULTI
382#  undef DIM_4d
383
384   !!----------------------------------------------------------------------
385   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
386   !!
387   !!   * Argument : dummy argument use in mpp_nfd_... routines
388   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
389   !!                cd_nat :   nature of array grid-points
390   !!                psgn   :   sign used across the north fold boundary
391   !!                kfld   :   optional, number of pt3d arrays
392   !!                cd_mpp :   optional, fill the overlap area only
393   !!                pval   :   optional, background value (used at closed boundaries)
394   !!----------------------------------------------------------------------
395   !
396   !                       !==  2D array and array of 2D pointer  ==!
397   !
398#  define DIM_2d
399#     define ROUTINE_NFD           mpp_nfd_2d
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     define MULTI
403#     define ROUTINE_NFD           mpp_nfd_2d_ptr
404#     include "mpp_nfd_generic.h90"
405#     undef ROUTINE_NFD
406#     undef MULTI
407#  undef DIM_2d
408   !
409   !                       !==  3D array and array of 3D pointer  ==!
410   !
411#  define DIM_3d
412#     define ROUTINE_NFD           mpp_nfd_3d
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     define MULTI
416#     define ROUTINE_NFD           mpp_nfd_3d_ptr
417#     include "mpp_nfd_generic.h90"
418#     undef ROUTINE_NFD
419#     undef MULTI
420#  undef DIM_3d
421   !
422   !                       !==  4D array and array of 4D pointer  ==!
423   !
424#  define DIM_4d
425#     define ROUTINE_NFD           mpp_nfd_4d
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     define MULTI
429#     define ROUTINE_NFD           mpp_nfd_4d_ptr
430#     include "mpp_nfd_generic.h90"
431#     undef ROUTINE_NFD
432#     undef MULTI
433#  undef DIM_4d
434
435
436   !!----------------------------------------------------------------------
437   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
438   !!
439   !!   * Argument : dummy argument use in mpp_lnk_... routines
440   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
441   !!                cd_nat :   nature of array grid-points
442   !!                psgn   :   sign used across the north fold boundary
443   !!                kb_bdy :   BDY boundary set
444   !!                kfld   :   optional, number of pt3d arrays
445   !!----------------------------------------------------------------------
446   !
447   !                       !==  2D array and array of 2D pointer  ==!
448   !
449#  define DIM_2d
450#     define ROUTINE_BDY           mpp_lnk_bdy_2d
451#     include "mpp_bdy_generic.h90"
452#     undef ROUTINE_BDY
453#  undef DIM_2d
454   !
455   !                       !==  3D array and array of 3D pointer  ==!
456   !
457#  define DIM_3d
458#     define ROUTINE_BDY           mpp_lnk_bdy_3d
459#     include "mpp_bdy_generic.h90"
460#     undef ROUTINE_BDY
461#  undef DIM_3d
462   !
463   !                       !==  4D array and array of 4D pointer  ==!
464   !
465#  define DIM_4d
466#     define ROUTINE_BDY           mpp_lnk_bdy_4d
467#     include "mpp_bdy_generic.h90"
468#     undef ROUTINE_BDY
469#  undef DIM_4d
470
471   !!----------------------------------------------------------------------
472   !!
473   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
474   
475   
476   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
477   
478   
479   !!----------------------------------------------------------------------
480
481
482
483   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
484      !!----------------------------------------------------------------------
485      !!                  ***  routine mppsend  ***
486      !!
487      !! ** Purpose :   Send messag passing array
488      !!
489      !!----------------------------------------------------------------------
490      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
491      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
492      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
493      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
494      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
495      !!
496      INTEGER ::   iflag
497      !!----------------------------------------------------------------------
498      !
499      SELECT CASE ( cn_mpi_send )
500      CASE ( 'S' )                ! Standard mpi send (blocking)
501         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
502      CASE ( 'B' )                ! Buffer mpi send (blocking)
503         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
504      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
505         ! be carefull, one more argument here : the mpi request identifier..
506         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
507      END SELECT
508      !
509   END SUBROUTINE mppsend
510
511
512   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
513      !!----------------------------------------------------------------------
514      !!                  ***  routine mpprecv  ***
515      !!
516      !! ** Purpose :   Receive messag passing array
517      !!
518      !!----------------------------------------------------------------------
519      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
520      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
521      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
522      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
523      !!
524      INTEGER :: istatus(mpi_status_size)
525      INTEGER :: iflag
526      INTEGER :: use_source
527      !!----------------------------------------------------------------------
528      !
529      ! If a specific process number has been passed to the receive call,
530      ! use that one. Default is to use mpi_any_source
531      use_source = mpi_any_source
532      IF( PRESENT(ksource) )   use_source = ksource
533      !
534      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
535      !
536   END SUBROUTINE mpprecv
537
538
539   SUBROUTINE mppgather( ptab, kp, pio )
540      !!----------------------------------------------------------------------
541      !!                   ***  routine mppgather  ***
542      !!
543      !! ** Purpose :   Transfert between a local subdomain array and a work
544      !!     array which is distributed following the vertical level.
545      !!
546      !!----------------------------------------------------------------------
547      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
548      INTEGER                           , INTENT(in   ) ::   kp     ! record length
549      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
550      !!
551      INTEGER :: itaille, ierror   ! temporary integer
552      !!---------------------------------------------------------------------
553      !
554      itaille = jpi * jpj
555      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
556         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
557      !
558   END SUBROUTINE mppgather
559
560
561   SUBROUTINE mppscatter( pio, kp, ptab )
562      !!----------------------------------------------------------------------
563      !!                  ***  routine mppscatter  ***
564      !!
565      !! ** Purpose :   Transfert between awork array which is distributed
566      !!      following the vertical level and the local subdomain array.
567      !!
568      !!----------------------------------------------------------------------
569      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
570      INTEGER                             ::   kp     ! Tag (not used with MPI
571      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
572      !!
573      INTEGER :: itaille, ierror   ! temporary integer
574      !!---------------------------------------------------------------------
575      !
576      itaille = jpi * jpj
577      !
578      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
579         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
580      !
581   END SUBROUTINE mppscatter
582
583   
584   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
585     !!----------------------------------------------------------------------
586      !!                   ***  routine mpp_delay_sum  ***
587      !!
588      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
589      !!
590      !!----------------------------------------------------------------------
591      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
592      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
593      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
594      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
595      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
596      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
597      !!
598      INTEGER ::   ji, isz
599      INTEGER ::   idvar
600      INTEGER ::   ierr, ilocalcomm
601      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
602      !!----------------------------------------------------------------------
603      ilocalcomm = mpi_comm_oce
604      IF( PRESENT(kcom) )   ilocalcomm = kcom
605
606      isz = SIZE(y_in)
607     
608      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
609
610      idvar = -1
611      DO ji = 1, nbdelay
612         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
613      END DO
614      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
615
616      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
617         !                                       --------------------------
618         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
619            IF(lwp) THEN
620               WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
621               IF(lflush) CALL FLUSH(numout)
622            ENDIF
623            DEALLOCATE(todelay(idvar)%z1d)
624            ndelayid(idvar) = -1                                      ! do as if we had no restart
625         ELSE
626            ALLOCATE(todelay(idvar)%y1d(isz))
627            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
628         END IF
629      ENDIF
630     
631      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
632         !                                       --------------------------
633         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
634         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
635         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
636      ENDIF
637
638      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
639
640      ! send back pout from todelay(idvar)%z1d defined at previous call
641      pout(:) = todelay(idvar)%z1d(:)
642
643      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
644#if defined key_mpi2
645      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
646      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
647      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
648#else
649      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
650#endif
651
652   END SUBROUTINE mpp_delay_sum
653
654   
655   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
656      !!----------------------------------------------------------------------
657      !!                   ***  routine mpp_delay_max  ***
658      !!
659      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
660      !!
661      !!----------------------------------------------------------------------
662      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
663      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
664      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
665      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
666      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
667      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
668      !!
669      INTEGER ::   ji, isz
670      INTEGER ::   idvar
671      INTEGER ::   ierr, ilocalcomm
672      !!----------------------------------------------------------------------
673      ilocalcomm = mpi_comm_oce
674      IF( PRESENT(kcom) )   ilocalcomm = kcom
675
676      isz = SIZE(p_in)
677
678      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
679
680      idvar = -1
681      DO ji = 1, nbdelay
682         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
683      END DO
684      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
685
686      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
687         !                                       --------------------------
688         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
689            IF(lwp) THEN
690               WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
691               IF(lflush) CALL FLUSH(numout)
692            ENDIF
693            DEALLOCATE(todelay(idvar)%z1d)
694            ndelayid(idvar) = -1                                      ! do as if we had no restart
695         END IF
696      ENDIF
697
698      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
699         !                                       --------------------------
700         ALLOCATE(todelay(idvar)%z1d(isz))
701         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
702      ENDIF
703
704      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
705
706      ! send back pout from todelay(idvar)%z1d defined at previous call
707      pout(:) = todelay(idvar)%z1d(:)
708
709      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
710#if defined key_mpi2
711      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
712      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
713      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
714#else
715      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
716#endif
717
718   END SUBROUTINE mpp_delay_max
719
720   
721   SUBROUTINE mpp_delay_rcv( kid )
722      !!----------------------------------------------------------------------
723      !!                   ***  routine mpp_delay_rcv  ***
724      !!
725      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
726      !!
727      !!----------------------------------------------------------------------
728      INTEGER,INTENT(in   )      ::  kid 
729      INTEGER ::   ierr
730      !!----------------------------------------------------------------------
731      IF( ndelayid(kid) /= -2 ) THEN 
732#if ! defined key_mpi2
733         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
734         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
735         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
736#endif
737         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
738         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
739      ENDIF
740   END SUBROUTINE mpp_delay_rcv
741
742   
743   !!----------------------------------------------------------------------
744   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
745   !!   
746   !!----------------------------------------------------------------------
747   !!
748#  define OPERATION_MAX
749#  define INTEGER_TYPE
750#  define DIM_0d
751#     define ROUTINE_ALLREDUCE           mppmax_int
752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_0d
755#  define DIM_1d
756#     define ROUTINE_ALLREDUCE           mppmax_a_int
757#     include "mpp_allreduce_generic.h90"
758#     undef ROUTINE_ALLREDUCE
759#  undef DIM_1d
760#  undef INTEGER_TYPE
761!
762#  define REAL_TYPE
763#  define DIM_0d
764#     define ROUTINE_ALLREDUCE           mppmax_real
765#     include "mpp_allreduce_generic.h90"
766#     undef ROUTINE_ALLREDUCE
767#  undef DIM_0d
768#  define DIM_1d
769#     define ROUTINE_ALLREDUCE           mppmax_a_real
770#     include "mpp_allreduce_generic.h90"
771#     undef ROUTINE_ALLREDUCE
772#  undef DIM_1d
773#  undef REAL_TYPE
774#  undef OPERATION_MAX
775   !!----------------------------------------------------------------------
776   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
777   !!   
778   !!----------------------------------------------------------------------
779   !!
780#  define OPERATION_MIN
781#  define INTEGER_TYPE
782#  define DIM_0d
783#     define ROUTINE_ALLREDUCE           mppmin_int
784#     include "mpp_allreduce_generic.h90"
785#     undef ROUTINE_ALLREDUCE
786#  undef DIM_0d
787#  define DIM_1d
788#     define ROUTINE_ALLREDUCE           mppmin_a_int
789#     include "mpp_allreduce_generic.h90"
790#     undef ROUTINE_ALLREDUCE
791#  undef DIM_1d
792#  undef INTEGER_TYPE
793!
794#  define REAL_TYPE
795#  define DIM_0d
796#     define ROUTINE_ALLREDUCE           mppmin_real
797#     include "mpp_allreduce_generic.h90"
798#     undef ROUTINE_ALLREDUCE
799#  undef DIM_0d
800#  define DIM_1d
801#     define ROUTINE_ALLREDUCE           mppmin_a_real
802#     include "mpp_allreduce_generic.h90"
803#     undef ROUTINE_ALLREDUCE
804#  undef DIM_1d
805#  undef REAL_TYPE
806#  undef OPERATION_MIN
807
808   !!----------------------------------------------------------------------
809   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
810   !!   
811   !!   Global sum of 1D array or a variable (integer, real or complex)
812   !!----------------------------------------------------------------------
813   !!
814#  define OPERATION_SUM
815#  define INTEGER_TYPE
816#  define DIM_0d
817#     define ROUTINE_ALLREDUCE           mppsum_int
818#     include "mpp_allreduce_generic.h90"
819#     undef ROUTINE_ALLREDUCE
820#  undef DIM_0d
821#  define DIM_1d
822#     define ROUTINE_ALLREDUCE           mppsum_a_int
823#     include "mpp_allreduce_generic.h90"
824#     undef ROUTINE_ALLREDUCE
825#  undef DIM_1d
826#  undef INTEGER_TYPE
827!
828#  define REAL_TYPE
829#  define DIM_0d
830#     define ROUTINE_ALLREDUCE           mppsum_real
831#     include "mpp_allreduce_generic.h90"
832#     undef ROUTINE_ALLREDUCE
833#  undef DIM_0d
834#  define DIM_1d
835#     define ROUTINE_ALLREDUCE           mppsum_a_real
836#     include "mpp_allreduce_generic.h90"
837#     undef ROUTINE_ALLREDUCE
838#  undef DIM_1d
839#  undef REAL_TYPE
840#  undef OPERATION_SUM
841
842#  define OPERATION_SUM_DD
843#  define COMPLEX_TYPE
844#  define DIM_0d
845#     define ROUTINE_ALLREDUCE           mppsum_realdd
846#     include "mpp_allreduce_generic.h90"
847#     undef ROUTINE_ALLREDUCE
848#  undef DIM_0d
849#  define DIM_1d
850#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
851#     include "mpp_allreduce_generic.h90"
852#     undef ROUTINE_ALLREDUCE
853#  undef DIM_1d
854#  undef COMPLEX_TYPE
855#  undef OPERATION_SUM_DD
856
857   !!----------------------------------------------------------------------
858   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
859   !!   
860   !!----------------------------------------------------------------------
861   !!
862#  define OPERATION_MINLOC
863#  define DIM_2d
864#     define ROUTINE_LOC           mpp_minloc2d
865#     include "mpp_loc_generic.h90"
866#     undef ROUTINE_LOC
867#  undef DIM_2d
868#  define DIM_3d
869#     define ROUTINE_LOC           mpp_minloc3d
870#     include "mpp_loc_generic.h90"
871#     undef ROUTINE_LOC
872#  undef DIM_3d
873#  undef OPERATION_MINLOC
874
875#  define OPERATION_MAXLOC
876#  define DIM_2d
877#     define ROUTINE_LOC           mpp_maxloc2d
878#     include "mpp_loc_generic.h90"
879#     undef ROUTINE_LOC
880#  undef DIM_2d
881#  define DIM_3d
882#     define ROUTINE_LOC           mpp_maxloc3d
883#     include "mpp_loc_generic.h90"
884#     undef ROUTINE_LOC
885#  undef DIM_3d
886#  undef OPERATION_MAXLOC
887
888   SUBROUTINE mppsync()
889      !!----------------------------------------------------------------------
890      !!                  ***  routine mppsync  ***
891      !!
892      !! ** Purpose :   Massively parallel processors, synchroneous
893      !!
894      !!-----------------------------------------------------------------------
895      INTEGER :: ierror
896      !!-----------------------------------------------------------------------
897      !
898      CALL mpi_barrier( mpi_comm_oce, ierror )
899      !
900   END SUBROUTINE mppsync
901
902
903   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
904      !!----------------------------------------------------------------------
905      !!                  ***  routine mppstop  ***
906      !!
907      !! ** purpose :   Stop massively parallel processors method
908      !!
909      !!----------------------------------------------------------------------
910      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
911      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
912      LOGICAL ::   llfinal, ll_force_abort
913      INTEGER ::   info
914      !!----------------------------------------------------------------------
915      llfinal = .FALSE.
916      IF( PRESENT(ldfinal) ) llfinal = ldfinal
917      ll_force_abort = .FALSE.
918      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
919      !
920      IF(ll_force_abort) THEN
921         CALL mpi_abort( MPI_COMM_WORLD )
922      ELSE
923         CALL mppsync
924         CALL mpi_finalize( info )
925      ENDIF
926      IF( .NOT. llfinal ) STOP 123
927      !
928   END SUBROUTINE mppstop
929
930
931   SUBROUTINE mpp_comm_free( kcom )
932      !!----------------------------------------------------------------------
933      INTEGER, INTENT(in) ::   kcom
934      !!
935      INTEGER :: ierr
936      !!----------------------------------------------------------------------
937      !
938      CALL MPI_COMM_FREE(kcom, ierr)
939      !
940   END SUBROUTINE mpp_comm_free
941
942
943   SUBROUTINE mpp_ini_znl( kumout )
944      !!----------------------------------------------------------------------
945      !!               ***  routine mpp_ini_znl  ***
946      !!
947      !! ** Purpose :   Initialize special communicator for computing zonal sum
948      !!
949      !! ** Method  : - Look for processors in the same row
950      !!              - Put their number in nrank_znl
951      !!              - Create group for the znl processors
952      !!              - Create a communicator for znl processors
953      !!              - Determine if processor should write znl files
954      !!
955      !! ** output
956      !!      ndim_rank_znl = number of processors on the same row
957      !!      ngrp_znl = group ID for the znl processors
958      !!      ncomm_znl = communicator for the ice procs.
959      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
960      !!
961      !!----------------------------------------------------------------------
962      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
963      !
964      INTEGER :: jproc      ! dummy loop integer
965      INTEGER :: ierr, ii   ! local integer
966      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
967      !!----------------------------------------------------------------------
968      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
969      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
970      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
971      !
972      ALLOCATE( kwork(jpnij), STAT=ierr )
973      IF( ierr /= 0 ) THEN
974         WRITE(kumout, cform_err)
975         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
976         CALL mppstop
977      ENDIF
978
979      IF( jpnj == 1 ) THEN
980         ngrp_znl  = ngrp_world
981         ncomm_znl = mpi_comm_oce
982      ELSE
983         !
984         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
985         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
986         !-$$        CALL flush(numout)
987         !
988         ! Count number of processors on the same row
989         ndim_rank_znl = 0
990         DO jproc=1,jpnij
991            IF ( kwork(jproc) == njmpp ) THEN
992               ndim_rank_znl = ndim_rank_znl + 1
993            ENDIF
994         END DO
995         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
996         !-$$        CALL flush(numout)
997         ! Allocate the right size to nrank_znl
998         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
999         ALLOCATE(nrank_znl(ndim_rank_znl))
1000         ii = 0
1001         nrank_znl (:) = 0
1002         DO jproc=1,jpnij
1003            IF ( kwork(jproc) == njmpp) THEN
1004               ii = ii + 1
1005               nrank_znl(ii) = jproc -1
1006            ENDIF
1007         END DO
1008         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1009         !-$$        CALL flush(numout)
1010
1011         ! Create the opa group
1012         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1013         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1014         !-$$        CALL flush(numout)
1015
1016         ! Create the znl group from the opa group
1017         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1018         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1019         !-$$        CALL flush(numout)
1020
1021         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1022         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1023         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1024         !-$$        CALL flush(numout)
1025         !
1026      END IF
1027
1028      ! Determines if processor if the first (starting from i=1) on the row
1029      IF ( jpni == 1 ) THEN
1030         l_znl_root = .TRUE.
1031      ELSE
1032         l_znl_root = .FALSE.
1033         kwork (1) = nimpp
1034         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1035         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1036      END IF
1037
1038      DEALLOCATE(kwork)
1039
1040   END SUBROUTINE mpp_ini_znl
1041
1042
1043   SUBROUTINE mpp_ini_north
1044      !!----------------------------------------------------------------------
1045      !!               ***  routine mpp_ini_north  ***
1046      !!
1047      !! ** Purpose :   Initialize special communicator for north folding
1048      !!      condition together with global variables needed in the mpp folding
1049      !!
1050      !! ** Method  : - Look for northern processors
1051      !!              - Put their number in nrank_north
1052      !!              - Create groups for the world processors and the north processors
1053      !!              - Create a communicator for northern processors
1054      !!
1055      !! ** output
1056      !!      njmppmax = njmpp for northern procs
1057      !!      ndim_rank_north = number of processors in the northern line
1058      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1059      !!      ngrp_world = group ID for the world processors
1060      !!      ngrp_north = group ID for the northern processors
1061      !!      ncomm_north = communicator for the northern procs.
1062      !!      north_root = number (in the world) of proc 0 in the northern comm.
1063      !!
1064      !!----------------------------------------------------------------------
1065      INTEGER ::   ierr
1066      INTEGER ::   jjproc
1067      INTEGER ::   ii, ji
1068      !!----------------------------------------------------------------------
1069      !
1070      njmppmax = MAXVAL( njmppt )
1071      !
1072      ! Look for how many procs on the northern boundary
1073      ndim_rank_north = 0
1074      DO jjproc = 1, jpnij
1075         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1076      END DO
1077      !
1078      ! Allocate the right size to nrank_north
1079      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1080      ALLOCATE( nrank_north(ndim_rank_north) )
1081
1082      ! Fill the nrank_north array with proc. number of northern procs.
1083      ! Note : the rank start at 0 in MPI
1084      ii = 0
1085      DO ji = 1, jpnij
1086         IF ( njmppt(ji) == njmppmax   ) THEN
1087            ii=ii+1
1088            nrank_north(ii)=ji-1
1089         END IF
1090      END DO
1091      !
1092      ! create the world group
1093      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1094      !
1095      ! Create the North group from the world group
1096      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1097      !
1098      ! Create the North communicator , ie the pool of procs in the north group
1099      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1100      !
1101   END SUBROUTINE mpp_ini_north
1102
1103
1104   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1105      !!---------------------------------------------------------------------
1106      !!                   ***  routine mpp_init.opa  ***
1107      !!
1108      !! ** Purpose :: export and attach a MPI buffer for bsend
1109      !!
1110      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1111      !!            but classical mpi_init
1112      !!
1113      !! History :: 01/11 :: IDRIS initial version for IBM only
1114      !!            08/04 :: R. Benshila, generalisation
1115      !!---------------------------------------------------------------------
1116      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1117      INTEGER                      , INTENT(inout) ::   ksft
1118      INTEGER                      , INTENT(  out) ::   code
1119      INTEGER                                      ::   ierr, ji
1120      LOGICAL                                      ::   mpi_was_called
1121      !!---------------------------------------------------------------------
1122      !
1123      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1124      IF ( code /= MPI_SUCCESS ) THEN
1125         DO ji = 1, SIZE(ldtxt)
1126            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1127         END DO
1128         WRITE(*, cform_err)
1129         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1130         CALL mpi_abort( mpi_comm_world, code, ierr )
1131      ENDIF
1132      !
1133      IF( .NOT. mpi_was_called ) THEN
1134         CALL mpi_init( code )
1135         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1136         IF ( code /= MPI_SUCCESS ) THEN
1137            DO ji = 1, SIZE(ldtxt)
1138               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1139            END DO
1140            WRITE(*, cform_err)
1141            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1142            CALL mpi_abort( mpi_comm_world, code, ierr )
1143         ENDIF
1144      ENDIF
1145      !
1146      IF( nn_buffer > 0 ) THEN
1147         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1148         ! Buffer allocation and attachment
1149         ALLOCATE( tampon(nn_buffer), stat = ierr )
1150         IF( ierr /= 0 ) THEN
1151            DO ji = 1, SIZE(ldtxt)
1152               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1153            END DO
1154            WRITE(*, cform_err)
1155            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1156            CALL mpi_abort( mpi_comm_world, code, ierr )
1157         END IF
1158         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1159      ENDIF
1160      !
1161   END SUBROUTINE mpi_init_oce
1162
1163
1164   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1165      !!---------------------------------------------------------------------
1166      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1167      !!
1168      !!   Modification of original codes written by David H. Bailey
1169      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1170      !!---------------------------------------------------------------------
1171      INTEGER                     , INTENT(in)    ::   ilen, itype
1172      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1173      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1174      !
1175      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1176      INTEGER  :: ji, ztmp           ! local scalar
1177      !!---------------------------------------------------------------------
1178      !
1179      ztmp = itype   ! avoid compilation warning
1180      !
1181      DO ji=1,ilen
1182      ! Compute ydda + yddb using Knuth's trick.
1183         zt1  = real(ydda(ji)) + real(yddb(ji))
1184         zerr = zt1 - real(ydda(ji))
1185         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1186                + aimag(ydda(ji)) + aimag(yddb(ji))
1187
1188         ! The result is zt1 + zt2, after normalization.
1189         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1190      END DO
1191      !
1192   END SUBROUTINE DDPDD_MPI
1193
1194
1195   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1196      !!---------------------------------------------------------------------
1197      !!                   ***  routine mpp_lbc_north_icb  ***
1198      !!
1199      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1200      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1201      !!              array with outer extra halo
1202      !!
1203      !! ** Method  :   North fold condition and mpp with more than one proc
1204      !!              in i-direction require a specific treatment. We gather
1205      !!              the 4+kextj northern lines of the global domain on 1
1206      !!              processor and apply lbc north-fold on this sub array.
1207      !!              Then we scatter the north fold array back to the processors.
1208      !!              This routine accounts for an extra halo with icebergs
1209      !!              and assumes ghost rows and columns have been suppressed.
1210      !!
1211      !!----------------------------------------------------------------------
1212      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1213      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1214      !                                                     !   = T ,  U , V , F or W -points
1215      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1216      !!                                                    ! north fold, =  1. otherwise
1217      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1218      !
1219      INTEGER ::   ji, jj, jr
1220      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1221      INTEGER ::   ipj, ij, iproc
1222      !
1223      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1224      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1225      !!----------------------------------------------------------------------
1226      !
1227      ipj=4
1228      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1229     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1230     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1231      !
1232      ztab_e(:,:)      = 0._wp
1233      znorthloc_e(:,:) = 0._wp
1234      !
1235      ij = 1 - kextj
1236      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1237      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1238         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1239         ij = ij + 1
1240      END DO
1241      !
1242      itaille = jpimax * ( ipj + 2*kextj )
1243      !
1244      IF( ln_timing ) CALL tic_tac(.TRUE.)
1245      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1246         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1247         &                ncomm_north, ierr )
1248      !
1249      IF( ln_timing ) CALL tic_tac(.FALSE.)
1250      !
1251      DO jr = 1, ndim_rank_north            ! recover the global north array
1252         iproc = nrank_north(jr) + 1
1253         ildi = nldit (iproc)
1254         ilei = nleit (iproc)
1255         iilb = nimppt(iproc)
1256         DO jj = 1-kextj, ipj+kextj
1257            DO ji = ildi, ilei
1258               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1259            END DO
1260         END DO
1261      END DO
1262
1263      ! 2. North-Fold boundary conditions
1264      ! ----------------------------------
1265      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1266
1267      ij = 1 - kextj
1268      !! Scatter back to pt2d
1269      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1270         DO ji= 1, jpi
1271            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1272         END DO
1273         ij  = ij +1
1274      END DO
1275      !
1276      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1277      !
1278   END SUBROUTINE mpp_lbc_north_icb
1279
1280
1281   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1282      !!----------------------------------------------------------------------
1283      !!                  ***  routine mpp_lnk_2d_icb  ***
1284      !!
1285      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1286      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1287      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1288      !!
1289      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1290      !!      between processors following neighboring subdomains.
1291      !!            domain parameters
1292      !!                    jpi    : first dimension of the local subdomain
1293      !!                    jpj    : second dimension of the local subdomain
1294      !!                    kexti  : number of columns for extra outer halo
1295      !!                    kextj  : number of rows for extra outer halo
1296      !!                    nbondi : mark for "east-west local boundary"
1297      !!                    nbondj : mark for "north-south local boundary"
1298      !!                    noea   : number for local neighboring processors
1299      !!                    nowe   : number for local neighboring processors
1300      !!                    noso   : number for local neighboring processors
1301      !!                    nono   : number for local neighboring processors
1302      !!----------------------------------------------------------------------
1303      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1304      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1305      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1306      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1307      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1308      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1309      !
1310      INTEGER  ::   jl   ! dummy loop indices
1311      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1312      INTEGER  ::   ipreci, iprecj             !   -       -
1313      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1314      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1315      !!
1316      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1317      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1318      !!----------------------------------------------------------------------
1319
1320      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1321      iprecj = nn_hls + kextj
1322
1323      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1324
1325      ! 1. standard boundary treatment
1326      ! ------------------------------
1327      ! Order matters Here !!!!
1328      !
1329      !                                      ! East-West boundaries
1330      !                                           !* Cyclic east-west
1331      IF( l_Iperio ) THEN
1332         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1333         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1334         !
1335      ELSE                                        !* closed
1336         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1337                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1338      ENDIF
1339      !                                      ! North-South boundaries
1340      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1341         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1342         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1343      ELSE                                        !* closed
1344         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1345                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1346      ENDIF
1347      !
1348
1349      ! north fold treatment
1350      ! -----------------------
1351      IF( npolj /= 0 ) THEN
1352         !
1353         SELECT CASE ( jpni )
1354                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1355                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1356         END SELECT
1357         !
1358      ENDIF
1359
1360      ! 2. East and west directions exchange
1361      ! ------------------------------------
1362      ! we play with the neigbours AND the row number because of the periodicity
1363      !
1364      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1365      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1366         iihom = jpi-nreci-kexti
1367         DO jl = 1, ipreci
1368            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1369            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1370         END DO
1371      END SELECT
1372      !
1373      !                           ! Migrations
1374      imigr = ipreci * ( jpj + 2*kextj )
1375      !
1376      IF( ln_timing ) CALL tic_tac(.TRUE.)
1377      !
1378      SELECT CASE ( nbondi )
1379      CASE ( -1 )
1380         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1381         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1382         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1383      CASE ( 0 )
1384         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1385         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1386         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1387         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1388         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1389         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1390      CASE ( 1 )
1391         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1392         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1393         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1394      END SELECT
1395      !
1396      IF( ln_timing ) CALL tic_tac(.FALSE.)
1397      !
1398      !                           ! Write Dirichlet lateral conditions
1399      iihom = jpi - nn_hls
1400      !
1401      SELECT CASE ( nbondi )
1402      CASE ( -1 )
1403         DO jl = 1, ipreci
1404            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1405         END DO
1406      CASE ( 0 )
1407         DO jl = 1, ipreci
1408            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1409            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1410         END DO
1411      CASE ( 1 )
1412         DO jl = 1, ipreci
1413            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1414         END DO
1415      END SELECT
1416
1417
1418      ! 3. North and south directions
1419      ! -----------------------------
1420      ! always closed : we play only with the neigbours
1421      !
1422      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1423         ijhom = jpj-nrecj-kextj
1424         DO jl = 1, iprecj
1425            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1426            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1427         END DO
1428      ENDIF
1429      !
1430      !                           ! Migrations
1431      imigr = iprecj * ( jpi + 2*kexti )
1432      !
1433      IF( ln_timing ) CALL tic_tac(.TRUE.)
1434      !
1435      SELECT CASE ( nbondj )
1436      CASE ( -1 )
1437         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1438         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1439         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1440      CASE ( 0 )
1441         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1442         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1443         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1444         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1445         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1446         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1447      CASE ( 1 )
1448         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1449         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1450         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1451      END SELECT
1452      !
1453      IF( ln_timing ) CALL tic_tac(.FALSE.)
1454      !
1455      !                           ! Write Dirichlet lateral conditions
1456      ijhom = jpj - nn_hls
1457      !
1458      SELECT CASE ( nbondj )
1459      CASE ( -1 )
1460         DO jl = 1, iprecj
1461            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1462         END DO
1463      CASE ( 0 )
1464         DO jl = 1, iprecj
1465            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1466            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1467         END DO
1468      CASE ( 1 )
1469         DO jl = 1, iprecj
1470            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1471         END DO
1472      END SELECT
1473      !
1474   END SUBROUTINE mpp_lnk_2d_icb
1475
1476
1477   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1478      !!----------------------------------------------------------------------
1479      !!                  ***  routine mpp_report  ***
1480      !!
1481      !! ** Purpose :   report use of mpp routines per time-setp
1482      !!
1483      !!----------------------------------------------------------------------
1484      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1485      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1486      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1487      !!
1488      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1489      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1490      !!----------------------------------------------------------------------
1491      !
1492      ll_lbc = .FALSE.
1493      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1494      ll_glb = .FALSE.
1495      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1496      ll_dlg = .FALSE.
1497      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1498      !
1499      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1500      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1501      ncom_freq = ncom_fsbc
1502      !
1503      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1504         IF( ll_lbc ) THEN
1505            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1506            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1507            n_sequence_lbc = n_sequence_lbc + 1
1508            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1509            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1510            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1511            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1512         ENDIF
1513         IF( ll_glb ) THEN
1514            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1515            n_sequence_glb = n_sequence_glb + 1
1516            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1517            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1518         ENDIF
1519         IF( ll_dlg ) THEN
1520            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1521            n_sequence_dlg = n_sequence_dlg + 1
1522            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1523            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1524         ENDIF
1525      ELSE IF ( ncom_stp == nit000+2*ncom_freq .AND. nprint > 1 ) THEN
1526         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1527         WRITE(numcom,*) ' '
1528         WRITE(numcom,*) ' ------------------------------------------------------------'
1529         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1530         WRITE(numcom,*) ' ------------------------------------------------------------'
1531         WRITE(numcom,*) ' '
1532         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1533         jj = 0; jk = 0; jf = 0; jh = 0
1534         DO ji = 1, n_sequence_lbc
1535            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1536            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1537            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1538            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1539         END DO
1540         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1541         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1542         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1543         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1544         WRITE(numcom,*) ' '
1545         WRITE(numcom,*) ' lbc_lnk called'
1546         jj = 1
1547         DO ji = 2, n_sequence_lbc
1548            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1549               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1550               jj = 0
1551            END IF
1552            jj = jj + 1 
1553         END DO
1554         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1555         WRITE(numcom,*) ' '
1556         IF ( n_sequence_glb > 0 ) THEN
1557            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1558            jj = 1
1559            DO ji = 2, n_sequence_glb
1560               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1561                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1562                  jj = 0
1563               END IF
1564               jj = jj + 1 
1565            END DO
1566            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1567            DEALLOCATE(crname_glb)
1568         ELSE
1569            WRITE(numcom,*) ' No MPI global communication '
1570         ENDIF
1571         WRITE(numcom,*) ' '
1572         IF ( n_sequence_dlg > 0 ) THEN
1573            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1574            jj = 1
1575            DO ji = 2, n_sequence_dlg
1576               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1577                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1578                  jj = 0
1579               END IF
1580               jj = jj + 1 
1581            END DO
1582            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1583            DEALLOCATE(crname_dlg)
1584         ELSE
1585            WRITE(numcom,*) ' No MPI delayed global communication '
1586         ENDIF
1587         WRITE(numcom,*) ' '
1588         WRITE(numcom,*) ' -----------------------------------------------'
1589         WRITE(numcom,*) ' '
1590         DEALLOCATE(ncomm_sequence)
1591         DEALLOCATE(crname_lbc)
1592      ENDIF
1593   END SUBROUTINE mpp_report
1594
1595   
1596   SUBROUTINE tic_tac (ld_tic, ld_global)
1597
1598    LOGICAL,           INTENT(IN) :: ld_tic
1599    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1600    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1601    REAL(wp),               SAVE :: tic_ct = 0._wp
1602    INTEGER :: ii
1603
1604    IF( ncom_stp <= nit000 ) RETURN
1605    IF( ncom_stp == nitend ) RETURN
1606    ii = 1
1607    IF( PRESENT( ld_global ) ) THEN
1608       IF( ld_global ) ii = 2
1609    END IF
1610   
1611    IF ( ld_tic ) THEN
1612       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1613       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1614    ELSE
1615       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1616       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1617    ENDIF
1618   
1619   END SUBROUTINE tic_tac
1620
1621   
1622#else
1623   !!----------------------------------------------------------------------
1624   !!   Default case:            Dummy module        share memory computing
1625   !!----------------------------------------------------------------------
1626   USE in_out_manager
1627
1628   INTERFACE mpp_sum
1629      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1630   END INTERFACE
1631   INTERFACE mpp_max
1632      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1633   END INTERFACE
1634   INTERFACE mpp_min
1635      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1636   END INTERFACE
1637   INTERFACE mpp_minloc
1638      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1639   END INTERFACE
1640   INTERFACE mpp_maxloc
1641      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1642   END INTERFACE
1643
1644   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1645   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1646   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1647
1648   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1649   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1650   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1651   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1652   TYPE ::   DELAYARR
1653      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1654      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1655   END TYPE DELAYARR
1656   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1657   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1658   !!----------------------------------------------------------------------
1659CONTAINS
1660
1661   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1662      INTEGER, INTENT(in) ::   kumout
1663      lib_mpp_alloc = 0
1664   END FUNCTION lib_mpp_alloc
1665
1666   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1667      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1668      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1669      CHARACTER(len=*) ::   ldname
1670      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1671      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1672      function_value = 0
1673      IF( .FALSE. )   ldtxt(:) = 'never done'
1674      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1675   END FUNCTION mynode
1676
1677   SUBROUTINE mppsync                       ! Dummy routine
1678   END SUBROUTINE mppsync
1679
1680   !!----------------------------------------------------------------------
1681   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1682   !!   
1683   !!----------------------------------------------------------------------
1684   !!
1685#  define OPERATION_MAX
1686#  define INTEGER_TYPE
1687#  define DIM_0d
1688#     define ROUTINE_ALLREDUCE           mppmax_int
1689#     include "mpp_allreduce_generic.h90"
1690#     undef ROUTINE_ALLREDUCE
1691#  undef DIM_0d
1692#  define DIM_1d
1693#     define ROUTINE_ALLREDUCE           mppmax_a_int
1694#     include "mpp_allreduce_generic.h90"
1695#     undef ROUTINE_ALLREDUCE
1696#  undef DIM_1d
1697#  undef INTEGER_TYPE
1698!
1699#  define REAL_TYPE
1700#  define DIM_0d
1701#     define ROUTINE_ALLREDUCE           mppmax_real
1702#     include "mpp_allreduce_generic.h90"
1703#     undef ROUTINE_ALLREDUCE
1704#  undef DIM_0d
1705#  define DIM_1d
1706#     define ROUTINE_ALLREDUCE           mppmax_a_real
1707#     include "mpp_allreduce_generic.h90"
1708#     undef ROUTINE_ALLREDUCE
1709#  undef DIM_1d
1710#  undef REAL_TYPE
1711#  undef OPERATION_MAX
1712   !!----------------------------------------------------------------------
1713   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1714   !!   
1715   !!----------------------------------------------------------------------
1716   !!
1717#  define OPERATION_MIN
1718#  define INTEGER_TYPE
1719#  define DIM_0d
1720#     define ROUTINE_ALLREDUCE           mppmin_int
1721#     include "mpp_allreduce_generic.h90"
1722#     undef ROUTINE_ALLREDUCE
1723#  undef DIM_0d
1724#  define DIM_1d
1725#     define ROUTINE_ALLREDUCE           mppmin_a_int
1726#     include "mpp_allreduce_generic.h90"
1727#     undef ROUTINE_ALLREDUCE
1728#  undef DIM_1d
1729#  undef INTEGER_TYPE
1730!
1731#  define REAL_TYPE
1732#  define DIM_0d
1733#     define ROUTINE_ALLREDUCE           mppmin_real
1734#     include "mpp_allreduce_generic.h90"
1735#     undef ROUTINE_ALLREDUCE
1736#  undef DIM_0d
1737#  define DIM_1d
1738#     define ROUTINE_ALLREDUCE           mppmin_a_real
1739#     include "mpp_allreduce_generic.h90"
1740#     undef ROUTINE_ALLREDUCE
1741#  undef DIM_1d
1742#  undef REAL_TYPE
1743#  undef OPERATION_MIN
1744
1745   !!----------------------------------------------------------------------
1746   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1747   !!   
1748   !!   Global sum of 1D array or a variable (integer, real or complex)
1749   !!----------------------------------------------------------------------
1750   !!
1751#  define OPERATION_SUM
1752#  define INTEGER_TYPE
1753#  define DIM_0d
1754#     define ROUTINE_ALLREDUCE           mppsum_int
1755#     include "mpp_allreduce_generic.h90"
1756#     undef ROUTINE_ALLREDUCE
1757#  undef DIM_0d
1758#  define DIM_1d
1759#     define ROUTINE_ALLREDUCE           mppsum_a_int
1760#     include "mpp_allreduce_generic.h90"
1761#     undef ROUTINE_ALLREDUCE
1762#  undef DIM_1d
1763#  undef INTEGER_TYPE
1764!
1765#  define REAL_TYPE
1766#  define DIM_0d
1767#     define ROUTINE_ALLREDUCE           mppsum_real
1768#     include "mpp_allreduce_generic.h90"
1769#     undef ROUTINE_ALLREDUCE
1770#  undef DIM_0d
1771#  define DIM_1d
1772#     define ROUTINE_ALLREDUCE           mppsum_a_real
1773#     include "mpp_allreduce_generic.h90"
1774#     undef ROUTINE_ALLREDUCE
1775#  undef DIM_1d
1776#  undef REAL_TYPE
1777#  undef OPERATION_SUM
1778
1779#  define OPERATION_SUM_DD
1780#  define COMPLEX_TYPE
1781#  define DIM_0d
1782#     define ROUTINE_ALLREDUCE           mppsum_realdd
1783#     include "mpp_allreduce_generic.h90"
1784#     undef ROUTINE_ALLREDUCE
1785#  undef DIM_0d
1786#  define DIM_1d
1787#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1788#     include "mpp_allreduce_generic.h90"
1789#     undef ROUTINE_ALLREDUCE
1790#  undef DIM_1d
1791#  undef COMPLEX_TYPE
1792#  undef OPERATION_SUM_DD
1793
1794   !!----------------------------------------------------------------------
1795   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1796   !!   
1797   !!----------------------------------------------------------------------
1798   !!
1799#  define OPERATION_MINLOC
1800#  define DIM_2d
1801#     define ROUTINE_LOC           mpp_minloc2d
1802#     include "mpp_loc_generic.h90"
1803#     undef ROUTINE_LOC
1804#  undef DIM_2d
1805#  define DIM_3d
1806#     define ROUTINE_LOC           mpp_minloc3d
1807#     include "mpp_loc_generic.h90"
1808#     undef ROUTINE_LOC
1809#  undef DIM_3d
1810#  undef OPERATION_MINLOC
1811
1812#  define OPERATION_MAXLOC
1813#  define DIM_2d
1814#     define ROUTINE_LOC           mpp_maxloc2d
1815#     include "mpp_loc_generic.h90"
1816#     undef ROUTINE_LOC
1817#  undef DIM_2d
1818#  define DIM_3d
1819#     define ROUTINE_LOC           mpp_maxloc3d
1820#     include "mpp_loc_generic.h90"
1821#     undef ROUTINE_LOC
1822#  undef DIM_3d
1823#  undef OPERATION_MAXLOC
1824
1825   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1826      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1827      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1828      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1829      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1830      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1831      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1832      !
1833      pout(:) = REAL(y_in(:), wp)
1834   END SUBROUTINE mpp_delay_sum
1835
1836   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1837      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1838      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1839      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1840      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1841      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1842      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1843      !
1844      pout(:) = p_in(:)
1845   END SUBROUTINE mpp_delay_max
1846
1847   SUBROUTINE mpp_delay_rcv( kid )
1848      INTEGER,INTENT(in   )      ::  kid 
1849      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1850   END SUBROUTINE mpp_delay_rcv
1851   
1852   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1853      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1854      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1855      STOP      ! non MPP case, just stop the run
1856   END SUBROUTINE mppstop
1857
1858   SUBROUTINE mpp_ini_znl( knum )
1859      INTEGER :: knum
1860      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1861   END SUBROUTINE mpp_ini_znl
1862
1863   SUBROUTINE mpp_comm_free( kcom )
1864      INTEGER :: kcom
1865      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1866   END SUBROUTINE mpp_comm_free
1867   
1868#endif
1869
1870   !!----------------------------------------------------------------------
1871   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1872   !!----------------------------------------------------------------------
1873
1874   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1875      &                 cd6, cd7, cd8, cd9, cd10 )
1876      !!----------------------------------------------------------------------
1877      !!                  ***  ROUTINE  stop_opa  ***
1878      !!
1879      !! ** Purpose :   print in ocean.outpput file a error message and
1880      !!                increment the error number (nstop) by one.
1881      !!----------------------------------------------------------------------
1882      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1883      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1884      !!----------------------------------------------------------------------
1885      !
1886      nstop = nstop + 1
1887
1888      ! force to open ocean.output file
1889      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1890       
1891      WRITE(numout,cform_err)
1892      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1893      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1894      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1895      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1896      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1897      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1898      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1899      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1900      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1901      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1902
1903                               CALL FLUSH(numout    )
1904      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1905      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1906      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1907      !
1908      IF( cd1 == 'STOP' ) THEN
1909         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1910         CALL mppstop(ld_force_abort = .true.)
1911      ENDIF
1912      !
1913   END SUBROUTINE ctl_stop
1914
1915
1916   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1917      &                 cd6, cd7, cd8, cd9, cd10 )
1918      !!----------------------------------------------------------------------
1919      !!                  ***  ROUTINE  stop_warn  ***
1920      !!
1921      !! ** Purpose :   print in ocean.outpput file a error message and
1922      !!                increment the warning number (nwarn) by one.
1923      !!----------------------------------------------------------------------
1924      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1925      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1926      !!----------------------------------------------------------------------
1927      !
1928      nwarn = nwarn + 1
1929      IF(lwp) THEN
1930         WRITE(numout,cform_war)
1931         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1932         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1933         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1934         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1935         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1936         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1937         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1938         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1939         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1940         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1941      ENDIF
1942      CALL FLUSH(numout)
1943      !
1944   END SUBROUTINE ctl_warn
1945
1946
1947   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1948      !!----------------------------------------------------------------------
1949      !!                  ***  ROUTINE ctl_opn  ***
1950      !!
1951      !! ** Purpose :   Open file and check if required file is available.
1952      !!
1953      !! ** Method  :   Fortan open
1954      !!----------------------------------------------------------------------
1955      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1956      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1957      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1958      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1959      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1960      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1961      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1962      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1963      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1964      !
1965      CHARACTER(len=80) ::   clfile
1966      INTEGER           ::   iost
1967      !!----------------------------------------------------------------------
1968      !
1969      ! adapt filename
1970      ! ----------------
1971      clfile = TRIM(cdfile)
1972      IF( PRESENT( karea ) ) THEN
1973         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1974      ENDIF
1975#if defined key_agrif
1976      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1977      knum=Agrif_Get_Unit()
1978#else
1979      knum=get_unit()
1980#endif
1981      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1982      !
1983      iost=0
1984      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1985         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1986      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1987         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1988      ELSE
1989         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1990      ENDIF
1991      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1992         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1993      IF( iost == 0 ) THEN
1994         IF(ldwp .AND. nprint > 2) THEN
1995            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1996            WRITE(kout,*) '     unit   = ', knum
1997            WRITE(kout,*) '     status = ', cdstat
1998            WRITE(kout,*) '     form   = ', cdform
1999            WRITE(kout,*) '     access = ', cdacce
2000            WRITE(kout,*)
2001         ENDIF
2002      ENDIF
2003100   CONTINUE
2004      IF( iost /= 0 ) THEN
2005         IF(ldwp) THEN
2006            WRITE(kout,*)
2007            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2008            WRITE(kout,*) ' =======   ===  '
2009            WRITE(kout,*) '           unit   = ', knum
2010            WRITE(kout,*) '           status = ', cdstat
2011            WRITE(kout,*) '           form   = ', cdform
2012            WRITE(kout,*) '           access = ', cdacce
2013            WRITE(kout,*) '           iostat = ', iost
2014            WRITE(kout,*) '           we stop. verify the file '
2015            WRITE(kout,*)
2016         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
2017            WRITE(*,*)
2018            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2019            WRITE(*,*) ' =======   ===  '
2020            WRITE(*,*) '           unit   = ', knum
2021            WRITE(*,*) '           status = ', cdstat
2022            WRITE(*,*) '           form   = ', cdform
2023            WRITE(*,*) '           access = ', cdacce
2024            WRITE(*,*) '           iostat = ', iost
2025            WRITE(*,*) '           we stop. verify the file '
2026            WRITE(*,*)
2027         ENDIF
2028         CALL FLUSH( kout ) 
2029         STOP 'ctl_opn bad opening'
2030      ENDIF
2031      !
2032   END SUBROUTINE ctl_opn
2033
2034
2035   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2036      !!----------------------------------------------------------------------
2037      !!                  ***  ROUTINE ctl_nam  ***
2038      !!
2039      !! ** Purpose :   Informations when error while reading a namelist
2040      !!
2041      !! ** Method  :   Fortan open
2042      !!----------------------------------------------------------------------
2043      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2044      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2045      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2046      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2047      !!----------------------------------------------------------------------
2048      !
2049      WRITE (clios, '(I5.0)')   kios
2050      IF( kios < 0 ) THEN         
2051         CALL ctl_warn( 'end of record or file while reading namelist '   &
2052            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2053      ENDIF
2054      !
2055      IF( kios > 0 ) THEN
2056         CALL ctl_stop( 'misspelled variable in namelist '   &
2057            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2058      ENDIF
2059      kios = 0
2060      RETURN
2061      !
2062   END SUBROUTINE ctl_nam
2063
2064
2065   INTEGER FUNCTION get_unit()
2066      !!----------------------------------------------------------------------
2067      !!                  ***  FUNCTION  get_unit  ***
2068      !!
2069      !! ** Purpose :   return the index of an unused logical unit
2070      !!----------------------------------------------------------------------
2071      LOGICAL :: llopn
2072      !!----------------------------------------------------------------------
2073      !
2074      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2075      llopn = .TRUE.
2076      DO WHILE( (get_unit < 998) .AND. llopn )
2077         get_unit = get_unit + 1
2078         INQUIRE( unit = get_unit, opened = llopn )
2079      END DO
2080      IF( (get_unit == 999) .AND. llopn ) THEN
2081         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2082         get_unit = -1
2083      ENDIF
2084      !
2085   END FUNCTION get_unit
2086
2087   !!----------------------------------------------------------------------
2088END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.