New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r9759_HPC09_ESIWACE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r9759_HPC09_ESIWACE/src/OCE/LBC/lib_mpp.F90 @ 10136

Last change on this file since 10136 was 10136, checked in by dguibert, 6 years ago

bull: async/datatype

Experimental changes to enable/study/bench various mpi "optimisations":

  • BULL_ASYNC
  • BULL_DATATYPE_VECTOR/SUBARRAY

this has been applied to the nonosc subroutine (only for now).

  • Property svn:keywords set to Id
File size: 91.7 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74#ifdef BULL_ASYNC
75   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
76   PUBLIC   mpp_lnk_2d_async    , mpp_lnk_3d_async    , mpp_lnk_4d_async
77   PUBLIC   mpp_lnk_2d_ptr_async, mpp_lnk_3d_ptr_async, mpp_lnk_4d_ptr_async
78#endif
79!!gm  this should be useless
80   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
81   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
82!!gm end
83   !
84   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
85   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
86   PUBLIC   mpp_ini_north
87   PUBLIC   mpp_lnk_2d_icb
88   PUBLIC   mpp_lbc_north_icb
89   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
90   PUBLIC   mpp_max_multiple
91   PUBLIC   mppscatter, mppgather
92   PUBLIC   mpp_ini_ice, mpp_ini_znl
93   PUBLIC   mppsize
94   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
95   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d
96   PUBLIC   mpprank
97   
98   !! * Interfaces
99   !! define generic interface for these routine as they are called sometimes
100   !! with scalar arguments instead of array arguments, which causes problems
101   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
102   INTERFACE mpp_min
103      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
104   END INTERFACE
105   INTERFACE mpp_max
106      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
107   END INTERFACE
108   INTERFACE mpp_sum
109      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
110         &             mppsum_realdd, mppsum_a_realdd
111   END INTERFACE
112   INTERFACE mpp_minloc
113      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
114   END INTERFACE
115   INTERFACE mpp_maxloc
116      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
117   END INTERFACE
118   INTERFACE mpp_max_multiple
119      MODULE PROCEDURE mppmax_real_multiple
120   END INTERFACE
121
122   !! ========================= !!
123   !!  MPI  variable definition !!
124   !! ========================= !!
125!$AGRIF_DO_NOT_TREAT
126   INCLUDE 'mpif.h'
127!$AGRIF_END_DO_NOT_TREAT
128
129   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
130
131   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
132
133   INTEGER ::   mppsize        ! number of process
134   INTEGER ::   mpprank        ! process number  [ 0 - size-1 ]
135!$AGRIF_DO_NOT_TREAT
136   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
137!$AGRIF_END_DO_NOT_TREAT
138
139   INTEGER :: MPI_SUMDD
140
141   ! variables used in case of sea-ice
142   INTEGER, PUBLIC ::   ncomm_ice       !: communicator made by the processors with sea-ice (public so that it can be freed in icethd)
143   INTEGER         ::   ngrp_iworld     !  group ID for the world processors (for rheology)
144   INTEGER         ::   ngrp_ice        !  group ID for the ice processors (for rheology)
145   INTEGER         ::   ndim_rank_ice   !  number of 'ice' processors
146   INTEGER         ::   n_ice_root      !  number (in the comm_ice) of proc 0 in the ice comm
147   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_ice     ! dimension ndim_rank_ice
148
149   ! variables used for zonal integration
150   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
151   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
152   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
153   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
154   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
155
156   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
157   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
158   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
159   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
160   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
161   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
162   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
163   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
164   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
165
166   ! Type of send : standard, buffered, immediate
167   CHARACTER(len=1), PUBLIC ::   cn_mpi_send            !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
168   LOGICAL         , PUBLIC ::   l_isend = .FALSE.      !: isend use indicator (T if cn_mpi_send='I')
169   INTEGER         , PUBLIC ::   nn_buffer              !: size of the buffer in case of mpi_bsend
170   INTEGER         , PUBLIC ::   nn_comm_mod = 1         !: frequency of communication calls (1 = every time step)
171   LOGICAL         , PUBLIC ::   ln_comm_only = .FALSE.  !: replace step routine by realistic communications only
172 
173   INTEGER         , PUBLIC ::   ncom_stp = 0                 !: copy of time step # istp
174   INTEGER         , PUBLIC ::   icomm_sequence(500,2) = 0    !: size of communicated arrays (halos)
175   INTEGER         , PUBLIC ::   n_sequence = 0               !: # of communicated arrays
176 
177
178   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
179
180   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
181   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
182
183   LOGICAL ::           l_print_comm_report = .TRUE. 
184
185   !!----------------------------------------------------------------------
186   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
187   !! $Id$
188   !! Software governed by the CeCILL licence     (./LICENSE)
189   !!----------------------------------------------------------------------
190CONTAINS
191
192   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
193      !!----------------------------------------------------------------------
194      !!                  ***  routine mynode  ***
195      !!
196      !! ** Purpose :   Find processor unit
197      !!----------------------------------------------------------------------
198      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
199      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
200      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
201      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
202      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
203      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
204      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
205      !
206      INTEGER ::   mynode, ierr, code, ji, ii, ios
207      LOGICAL ::   mpi_was_called
208      !
209      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather, nn_comm_mod, ln_comm_only
210      !!----------------------------------------------------------------------
211      !
212      ii = 1
213      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
214      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
215      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
216      !
217      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
218      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
219901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
220      !
221      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
222      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
223902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
224      !
225      !                              ! control print
226      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
227      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
228      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
229      !
230#if defined key_agrif
231      IF( .NOT. Agrif_Root() ) THEN
232         jpni  = Agrif_Parent(jpni )
233         jpnj  = Agrif_Parent(jpnj )
234         jpnij = Agrif_Parent(jpnij)
235      ENDIF
236#endif
237      !
238      IF( jpnij < 1 ) THEN         ! If jpnij is not specified in namelist then we calculate it
239         jpnij = jpni * jpnj       ! this means there will be no land cutting out.
240      ENDIF
241
242      IF( jpni < 1 .OR. jpnj < 1  ) THEN
243         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
244      ELSE
245         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
246         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
247         WRITE(ldtxt(ii),*) '      number of local domains           jpnij = ',jpnij      ;   ii = ii + 1
248      ENDIF
249
250      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
251
252      CALL mpi_initialized ( mpi_was_called, code )
253      IF( code /= MPI_SUCCESS ) THEN
254         DO ji = 1, SIZE(ldtxt)
255            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
256         END DO
257         WRITE(*, cform_err)
258         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
259         CALL mpi_abort( mpi_comm_world, code, ierr )
260      ENDIF
261
262      IF( mpi_was_called ) THEN
263         !
264         SELECT CASE ( cn_mpi_send )
265         CASE ( 'S' )                ! Standard mpi send (blocking)
266            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
267         CASE ( 'B' )                ! Buffer mpi send (blocking)
268            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
269            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
270         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
271            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
272            l_isend = .TRUE.
273         CASE DEFAULT
274            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
275            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
276            kstop = kstop + 1
277            WRITE(*,*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
278         END SELECT
279         !
280      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
281         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
282         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
283         kstop = kstop + 1
284         WRITE(*,*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
285      ELSE
286         SELECT CASE ( cn_mpi_send )
287         CASE ( 'S' )                ! Standard mpi send (blocking)
288            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
289            CALL mpi_init( ierr )
290         CASE ( 'B' )                ! Buffer mpi send (blocking)
291            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
292            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
293         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
294            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
295            l_isend = .TRUE.
296            CALL mpi_init( ierr )
297         CASE DEFAULT
298            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
299            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
300            kstop = kstop + 1
301            WRITE(*,*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
302         END SELECT
303         !
304      ENDIF
305
306      IF( PRESENT(localComm) ) THEN
307         IF( Agrif_Root() ) THEN
308            mpi_comm_oce = localComm
309         ENDIF
310      ELSE
311         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
312         IF( code /= MPI_SUCCESS ) THEN
313            DO ji = 1, SIZE(ldtxt)
314               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
315            END DO
316            WRITE(*, cform_err)
317            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
318            CALL mpi_abort( mpi_comm_world, code, ierr )
319         ENDIF
320      ENDIF
321
322#if defined key_agrif
323      IF( Agrif_Root() ) THEN
324         CALL Agrif_MPI_Init(mpi_comm_oce)
325      ELSE
326         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
327      ENDIF
328#endif
329
330      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
331      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
332      mynode = mpprank
333
334      IF( mynode == 0 ) THEN
335         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
336         WRITE(kumond, nammpp)     
337      ENDIF
338      !
339      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
340      !
341   END FUNCTION mynode
342
343   !!----------------------------------------------------------------------
344   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
345   !!
346   !!   * Argument : dummy argument use in mpp_lnk_... routines
347   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
348   !!                cd_nat :   nature of array grid-points
349   !!                psgn   :   sign used across the north fold boundary
350   !!                kfld   :   optional, number of pt3d arrays
351   !!                cd_mpp :   optional, fill the overlap area only
352   !!                pval   :   optional, background value (used at closed boundaries)
353   !!----------------------------------------------------------------------
354   !
355   !                       !==  2D array and array of 2D pointer  ==!
356   !
357#  define DIM_2d
358#     define ROUTINE_LNK           mpp_lnk_2d
359#     include "mpp_lnk_generic.h90"
360#     undef ROUTINE_LNK
361#     define MULTI
362#     define ROUTINE_LNK           mpp_lnk_2d_ptr
363#     include "mpp_lnk_generic.h90"
364#     undef ROUTINE_LNK
365#     undef MULTI
366#  undef DIM_2d
367   !
368   !                       !==  3D array and array of 3D pointer  ==!
369   !
370#  define DIM_3d
371#     define ROUTINE_LNK           mpp_lnk_3d
372#     include "mpp_lnk_generic.h90"
373#     undef ROUTINE_LNK
374#     define MULTI
375#     define ROUTINE_LNK           mpp_lnk_3d_ptr
376#     include "mpp_lnk_generic.h90"
377#     undef ROUTINE_LNK
378#     undef MULTI
379#  undef DIM_3d
380   !
381   !                       !==  4D array and array of 4D pointer  ==!
382   !
383#  define DIM_4d
384#     define ROUTINE_LNK           mpp_lnk_4d
385#     include "mpp_lnk_generic.h90"
386#     undef ROUTINE_LNK
387#     define MULTI
388#     define ROUTINE_LNK           mpp_lnk_4d_ptr
389#     include "mpp_lnk_generic.h90"
390#     undef ROUTINE_LNK
391#     undef MULTI
392#  undef DIM_4d
393
394#ifdef BULL_ASYNC
395# define ASYNC
396
397#if (defined BULL_MPI_DATATYPE || defined BULL_MPI_DATATYPE_SUBARRAY)
398# define MPI_DATATYPE_SUBARRAY
399#warning "MPI_DATATYPE_SUBARRAY"
400#ifdef BULL_MPI_DATATYPE_VECTOR
401#undef MPI_DATATYPE_VECTOR
402#warning "BULL_MPI_DATATYPE_SUBARRAY and BULL_MPI_DATATYPE_VECTOR are defined: undef BULL_MPI_DATATYPE_VECTOR"
403#endif
404#endif
405#ifdef BULL_MPI_DATATYPE_VECTOR
406# define MPI_DATATYPE_VECTOR
407#warning "MPI_DATATYPE_VECTOR"
408#endif
409   !!----------------------------------------------------------------------
410   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
411   !!
412   !!   * Argument : dummy argument use in mpp_lnk_... routines
413   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
414   !!                cd_nat :   nature of array grid-points
415   !!                psgn   :   sign used across the north fold boundary
416   !!                kfld   :   optional, number of pt3d arrays
417   !!                cd_mpp :   optional, fill the overlap area only
418   !!                pval   :   optional, background value (used at closed boundaries)
419   !!----------------------------------------------------------------------
420   !
421   !                       !==  2D array and array of 2D pointer  ==!
422   !
423#  define DIM_2d
424#     define ROUTINE_LNK           mpp_lnk_2d_async
425#     include "mpp_lnk_generic.h90"
426#     undef ROUTINE_LNK
427#     define MULTI
428#     define ROUTINE_LNK           mpp_lnk_2d_ptr_async
429#     include "mpp_lnk_generic.h90"
430#     undef ROUTINE_LNK
431#     undef MULTI
432#  undef DIM_2d
433   !
434   !                       !==  3D array and array of 3D pointer  ==!
435   !
436#  define DIM_3d
437#     define ROUTINE_LNK           mpp_lnk_3d_async
438#     include "mpp_lnk_generic.h90"
439#     undef ROUTINE_LNK
440#     define MULTI
441#     define ROUTINE_LNK           mpp_lnk_3d_ptr_async
442#     include "mpp_lnk_generic.h90"
443#     undef ROUTINE_LNK
444#     undef MULTI
445#  undef DIM_3d
446   !
447   !                       !==  4D array and array of 4D pointer  ==!
448   !
449#  define DIM_4d
450#     define ROUTINE_LNK           mpp_lnk_4d_async
451#     include "mpp_lnk_generic.h90"
452#     undef ROUTINE_LNK
453#     define MULTI
454#     define ROUTINE_LNK           mpp_lnk_4d_ptr_async
455#     include "mpp_lnk_generic.h90"
456#     undef ROUTINE_LNK
457#     undef MULTI
458#  undef DIM_4d
459
460#undef ASYNC
461#undef MPI_DATATYPE
462#endif
463
464   !!----------------------------------------------------------------------
465   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
466   !!
467   !!   * Argument : dummy argument use in mpp_nfd_... routines
468   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
469   !!                cd_nat :   nature of array grid-points
470   !!                psgn   :   sign used across the north fold boundary
471   !!                kfld   :   optional, number of pt3d arrays
472   !!                cd_mpp :   optional, fill the overlap area only
473   !!                pval   :   optional, background value (used at closed boundaries)
474   !!----------------------------------------------------------------------
475   !
476   !                       !==  2D array and array of 2D pointer  ==!
477   !
478#  define DIM_2d
479#     define ROUTINE_NFD           mpp_nfd_2d
480#     include "mpp_nfd_generic.h90"
481#     undef ROUTINE_NFD
482#     define MULTI
483#     define ROUTINE_NFD           mpp_nfd_2d_ptr
484#     include "mpp_nfd_generic.h90"
485#     undef ROUTINE_NFD
486#     undef MULTI
487#  undef DIM_2d
488   !
489   !                       !==  3D array and array of 3D pointer  ==!
490   !
491#  define DIM_3d
492#     define ROUTINE_NFD           mpp_nfd_3d
493#     include "mpp_nfd_generic.h90"
494#     undef ROUTINE_NFD
495#     define MULTI
496#     define ROUTINE_NFD           mpp_nfd_3d_ptr
497#     include "mpp_nfd_generic.h90"
498#     undef ROUTINE_NFD
499#     undef MULTI
500#  undef DIM_3d
501   !
502   !                       !==  4D array and array of 4D pointer  ==!
503   !
504#  define DIM_4d
505#     define ROUTINE_NFD           mpp_nfd_4d
506#     include "mpp_nfd_generic.h90"
507#     undef ROUTINE_NFD
508#     define MULTI
509#     define ROUTINE_NFD           mpp_nfd_4d_ptr
510#     include "mpp_nfd_generic.h90"
511#     undef ROUTINE_NFD
512#     undef MULTI
513#  undef DIM_4d
514
515
516   !!----------------------------------------------------------------------
517   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
518   !!
519   !!   * Argument : dummy argument use in mpp_lnk_... routines
520   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
521   !!                cd_nat :   nature of array grid-points
522   !!                psgn   :   sign used across the north fold boundary
523   !!                kb_bdy :   BDY boundary set
524   !!                kfld   :   optional, number of pt3d arrays
525   !!----------------------------------------------------------------------
526   !
527   !                       !==  2D array and array of 2D pointer  ==!
528   !
529#  define DIM_2d
530#     define ROUTINE_BDY           mpp_lnk_bdy_2d
531#     include "mpp_bdy_generic.h90"
532#     undef ROUTINE_BDY
533#  undef DIM_2d
534   !
535   !                       !==  3D array and array of 3D pointer  ==!
536   !
537#  define DIM_3d
538#     define ROUTINE_BDY           mpp_lnk_bdy_3d
539#     include "mpp_bdy_generic.h90"
540#     undef ROUTINE_BDY
541#  undef DIM_3d
542   !
543   !                       !==  4D array and array of 4D pointer  ==!
544   !
545!!#  define DIM_4d
546!!#     define ROUTINE_BDY           mpp_lnk_bdy_4d
547!!#     include "mpp_bdy_generic.h90"
548!!#     undef ROUTINE_BDY
549!!#  undef DIM_4d
550
551   !!----------------------------------------------------------------------
552   !!
553   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
554   
555   
556   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
557   
558   
559   !!----------------------------------------------------------------------
560
561
562
563   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
564      !!----------------------------------------------------------------------
565      !!                  ***  routine mppsend  ***
566      !!
567      !! ** Purpose :   Send messag passing array
568      !!
569      !!----------------------------------------------------------------------
570      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
571      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
572      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
573      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
574      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
575      !!
576      INTEGER ::   iflag
577      !!----------------------------------------------------------------------
578      !
579      SELECT CASE ( cn_mpi_send )
580      CASE ( 'S' )                ! Standard mpi send (blocking)
581         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
582      CASE ( 'B' )                ! Buffer mpi send (blocking)
583         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
584      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
585         ! be carefull, one more argument here : the mpi request identifier..
586         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
587      END SELECT
588      !
589   END SUBROUTINE mppsend
590
591
592   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
593      !!----------------------------------------------------------------------
594      !!                  ***  routine mpprecv  ***
595      !!
596      !! ** Purpose :   Receive messag passing array
597      !!
598      !!----------------------------------------------------------------------
599      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
600      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
601      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
602      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
603      !!
604      INTEGER :: istatus(mpi_status_size)
605      INTEGER :: iflag
606      INTEGER :: use_source
607      !!----------------------------------------------------------------------
608      !
609      ! If a specific process number has been passed to the receive call,
610      ! use that one. Default is to use mpi_any_source
611      use_source = mpi_any_source
612      IF( PRESENT(ksource) )   use_source = ksource
613      !
614      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
615      !
616   END SUBROUTINE mpprecv
617
618
619   SUBROUTINE mppgather( ptab, kp, pio )
620      !!----------------------------------------------------------------------
621      !!                   ***  routine mppgather  ***
622      !!
623      !! ** Purpose :   Transfert between a local subdomain array and a work
624      !!     array which is distributed following the vertical level.
625      !!
626      !!----------------------------------------------------------------------
627      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
628      INTEGER                           , INTENT(in   ) ::   kp     ! record length
629      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
630      !!
631      INTEGER :: itaille, ierror   ! temporary integer
632      !!---------------------------------------------------------------------
633      !
634      itaille = jpi * jpj
635      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
636         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
637      !
638   END SUBROUTINE mppgather
639
640
641   SUBROUTINE mppscatter( pio, kp, ptab )
642      !!----------------------------------------------------------------------
643      !!                  ***  routine mppscatter  ***
644      !!
645      !! ** Purpose :   Transfert between awork array which is distributed
646      !!      following the vertical level and the local subdomain array.
647      !!
648      !!----------------------------------------------------------------------
649      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
650      INTEGER                             ::   kp     ! Tag (not used with MPI
651      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
652      !!
653      INTEGER :: itaille, ierror   ! temporary integer
654      !!---------------------------------------------------------------------
655      !
656      itaille = jpi * jpj
657      !
658      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
659         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
660      !
661   END SUBROUTINE mppscatter
662
663   !!----------------------------------------------------------------------
664   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
665   !!   
666   !!----------------------------------------------------------------------
667   !!
668   SUBROUTINE mppmax_a_int( ktab, kdim, kcom )
669      !!----------------------------------------------------------------------
670      INTEGER , INTENT(in   )                  ::   kdim   ! size of array
671      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
672      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom   !
673      INTEGER :: ierror, ilocalcomm   ! temporary integer
674      INTEGER, DIMENSION(kdim) ::   iwork
675      !!----------------------------------------------------------------------
676      ilocalcomm = mpi_comm_oce
677      IF( PRESENT(kcom) )   ilocalcomm = kcom
678      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_max, ilocalcomm, ierror )
679      ktab(:) = iwork(:)
680   END SUBROUTINE mppmax_a_int
681   !!
682   SUBROUTINE mppmax_int( ktab, kcom )
683      !!----------------------------------------------------------------------
684      INTEGER, INTENT(inout)           ::   ktab   ! ???
685      INTEGER, INTENT(in   ), OPTIONAL ::   kcom   ! ???
686      INTEGER ::   ierror, iwork, ilocalcomm   ! temporary integer
687      !!----------------------------------------------------------------------
688      ilocalcomm = mpi_comm_oce
689      IF( PRESENT(kcom) )   ilocalcomm = kcom
690      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_max, ilocalcomm, ierror )
691      ktab = iwork
692   END SUBROUTINE mppmax_int
693   !!
694   SUBROUTINE mppmax_a_real( ptab, kdim, kcom )
695      !!----------------------------------------------------------------------
696      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab
697      INTEGER                  , INTENT(in   ) ::   kdim
698      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom
699      INTEGER :: ierror, ilocalcomm
700      REAL(wp), DIMENSION(kdim) ::  zwork
701      !!----------------------------------------------------------------------
702      ilocalcomm = mpi_comm_oce
703      IF( PRESENT(kcom) )   ilocalcomm = kcom
704      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
705      ptab(:) = zwork(:)
706   END SUBROUTINE mppmax_a_real
707   !!
708   SUBROUTINE mppmax_real( ptab, kcom )
709      !!----------------------------------------------------------------------
710      REAL(wp), INTENT(inout)           ::   ptab   ! ???
711      INTEGER , INTENT(in   ), OPTIONAL ::   kcom   ! ???
712      INTEGER  ::   ierror, ilocalcomm
713      REAL(wp) ::   zwork
714      !!----------------------------------------------------------------------
715      ilocalcomm = mpi_comm_oce
716      IF( PRESENT(kcom) )   ilocalcomm = kcom!
717      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_max, ilocalcomm, ierror )
718      ptab = zwork
719   END SUBROUTINE mppmax_real
720
721
722   !!----------------------------------------------------------------------
723   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
724   !!   
725   !!----------------------------------------------------------------------
726   !!
727   SUBROUTINE mppmin_a_int( ktab, kdim, kcom )
728      !!----------------------------------------------------------------------
729      INTEGER , INTENT( in  )                  ::   kdim   ! size of array
730      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
731      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom   ! input array
732      !!
733      INTEGER ::   ierror, ilocalcomm   ! temporary integer
734      INTEGER, DIMENSION(kdim) ::   iwork
735      !!----------------------------------------------------------------------
736      ilocalcomm = mpi_comm_oce
737      IF( PRESENT(kcom) )   ilocalcomm = kcom
738      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_min, ilocalcomm, ierror )
739      ktab(:) = iwork(:)
740   END SUBROUTINE mppmin_a_int
741   !!
742   SUBROUTINE mppmin_int( ktab, kcom )
743      !!----------------------------------------------------------------------
744      INTEGER, INTENT(inout) ::   ktab      ! ???
745      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom        ! input array
746      !!
747      INTEGER ::  ierror, iwork, ilocalcomm
748      !!----------------------------------------------------------------------
749      ilocalcomm = mpi_comm_oce
750      IF( PRESENT(kcom) )   ilocalcomm = kcom
751      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_min, ilocalcomm, ierror )
752      ktab = iwork
753   END SUBROUTINE mppmin_int
754   !!
755   SUBROUTINE mppmin_a_real( ptab, kdim, kcom )
756      !!----------------------------------------------------------------------
757      INTEGER , INTENT(in   )                  ::   kdim
758      REAL(wp), INTENT(inout), DIMENSION(kdim) ::   ptab
759      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom
760      INTEGER :: ierror, ilocalcomm
761      REAL(wp), DIMENSION(kdim) ::   zwork
762      !!-----------------------------------------------------------------------
763      ilocalcomm = mpi_comm_oce
764      IF( PRESENT(kcom) )   ilocalcomm = kcom
765      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_min, ilocalcomm, ierror )
766      ptab(:) = zwork(:)
767   END SUBROUTINE mppmin_a_real
768   !!
769   SUBROUTINE mppmin_real( ptab, kcom )
770      !!-----------------------------------------------------------------------
771      REAL(wp), INTENT(inout)           ::   ptab        !
772      INTEGER , INTENT(in   ), OPTIONAL :: kcom
773      INTEGER  ::   ierror, ilocalcomm
774      REAL(wp) ::   zwork
775      !!-----------------------------------------------------------------------
776      ilocalcomm = mpi_comm_oce
777      IF( PRESENT(kcom) )   ilocalcomm = kcom
778      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_min, ilocalcomm, ierror )
779      ptab = zwork
780   END SUBROUTINE mppmin_real
781
782
783   !!----------------------------------------------------------------------
784   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
785   !!   
786   !!   Global sum of 1D array or a variable (integer, real or complex)
787   !!----------------------------------------------------------------------
788   !!
789   SUBROUTINE mppsum_a_int( ktab, kdim )
790      !!----------------------------------------------------------------------
791      INTEGER, INTENT(in   )                   ::   kdim   ! ???
792      INTEGER, INTENT(inout), DIMENSION (kdim) ::   ktab   ! ???
793      INTEGER :: ierror
794      INTEGER, DIMENSION (kdim) ::  iwork
795      !!----------------------------------------------------------------------
796      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_sum, mpi_comm_oce, ierror )
797      ktab(:) = iwork(:)
798   END SUBROUTINE mppsum_a_int
799   !!
800   SUBROUTINE mppsum_int( ktab )
801      !!----------------------------------------------------------------------
802      INTEGER, INTENT(inout) ::   ktab
803      INTEGER :: ierror, iwork
804      !!----------------------------------------------------------------------
805      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_sum, mpi_comm_oce, ierror )
806      ktab = iwork
807   END SUBROUTINE mppsum_int
808   !!
809   SUBROUTINE mppsum_a_real( ptab, kdim, kcom )
810      !!-----------------------------------------------------------------------
811      INTEGER                  , INTENT(in   ) ::   kdim   ! size of ptab
812      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab   ! input array
813      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! specific communicator
814      INTEGER  ::   ierror, ilocalcomm    ! local integer
815      REAL(wp) ::   zwork(kdim)           ! local workspace
816      !!-----------------------------------------------------------------------
817      ilocalcomm = mpi_comm_oce
818      IF( PRESENT(kcom) )   ilocalcomm = kcom
819      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
820      ptab(:) = zwork(:)
821   END SUBROUTINE mppsum_a_real
822   !!
823   SUBROUTINE mppsum_real( ptab, kcom )
824      !!-----------------------------------------------------------------------
825      REAL(wp)          , INTENT(inout)           ::   ptab   ! input scalar
826      INTEGER , OPTIONAL, INTENT(in   ) ::   kcom
827      INTEGER  ::   ierror, ilocalcomm
828      REAL(wp) ::   zwork
829      !!-----------------------------------------------------------------------
830      ilocalcomm = mpi_comm_oce
831      IF( PRESENT(kcom) )   ilocalcomm = kcom
832      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
833      ptab = zwork
834   END SUBROUTINE mppsum_real
835   !!
836   SUBROUTINE mppsum_realdd( ytab, kcom )
837      !!-----------------------------------------------------------------------
838      COMPLEX(wp)          , INTENT(inout) ::   ytab    ! input scalar
839      INTEGER    , OPTIONAL, INTENT(in   ) ::   kcom
840      INTEGER     ::   ierror, ilocalcomm
841      COMPLEX(wp) ::   zwork
842      !!-----------------------------------------------------------------------
843      ilocalcomm = mpi_comm_oce
844      IF( PRESENT(kcom) )   ilocalcomm = kcom
845      CALL MPI_ALLREDUCE( ytab, zwork, 1, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
846      ytab = zwork
847   END SUBROUTINE mppsum_realdd
848   !!
849   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
850      !!----------------------------------------------------------------------
851      INTEGER                     , INTENT(in   ) ::   kdim   ! size of ytab
852      COMPLEX(wp), DIMENSION(kdim), INTENT(inout) ::   ytab   ! input array
853      INTEGER    , OPTIONAL       , INTENT(in   ) ::   kcom
854      INTEGER:: ierror, ilocalcomm    ! local integer
855      COMPLEX(wp), DIMENSION(kdim) :: zwork     ! temporary workspace
856      !!-----------------------------------------------------------------------
857      ilocalcomm = mpi_comm_oce
858      IF( PRESENT(kcom) )   ilocalcomm = kcom
859      CALL MPI_ALLREDUCE( ytab, zwork, kdim, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
860      ytab(:) = zwork(:)
861   END SUBROUTINE mppsum_a_realdd
862   
863
864   SUBROUTINE mppmax_real_multiple( pt1d, kdim, kcom  )
865      !!----------------------------------------------------------------------
866      !!                  ***  routine mppmax_real  ***
867      !!
868      !! ** Purpose :   Maximum across processor of each element of a 1D arrays
869      !!
870      !!----------------------------------------------------------------------
871      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   pt1d   ! 1D arrays
872      INTEGER                  , INTENT(in   ) ::   kdim
873      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! local communicator
874      !!
875      INTEGER  ::   ierror, ilocalcomm
876      REAL(wp), DIMENSION(kdim) ::  zwork
877      !!----------------------------------------------------------------------
878      ilocalcomm = mpi_comm_oce
879      IF( PRESENT(kcom) )   ilocalcomm = kcom
880      !
881      CALL mpi_allreduce( pt1d, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
882      pt1d(:) = zwork(:)
883      !
884   END SUBROUTINE mppmax_real_multiple
885
886
887   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki,kj )
888      !!------------------------------------------------------------------------
889      !!             ***  routine mpp_minloc  ***
890      !!
891      !! ** Purpose :   Compute the global minimum of an array ptab
892      !!              and also give its global position
893      !!
894      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
895      !!
896      !!--------------------------------------------------------------------------
897      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab    ! Local 2D array
898      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask   ! Local mask
899      REAL(wp)                     , INTENT(  out) ::   pmin    ! Global minimum of ptab
900      INTEGER                      , INTENT(  out) ::   ki, kj  ! index of minimum in global frame
901      !
902      INTEGER :: ierror
903      INTEGER , DIMENSION(2)   ::   ilocs
904      REAL(wp) ::   zmin   ! local minimum
905      REAL(wp), DIMENSION(2,1) ::   zain, zaout
906      !!-----------------------------------------------------------------------
907      !
908      zmin  = MINVAL( ptab(:,:) , mask= pmask == 1._wp )
909      ilocs = MINLOC( ptab(:,:) , mask= pmask == 1._wp )
910      !
911      ki = ilocs(1) + nimpp - 1
912      kj = ilocs(2) + njmpp - 1
913      !
914      zain(1,:)=zmin
915      zain(2,:)=ki+10000.*kj
916      !
917      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OCE,ierror)
918      !
919      pmin = zaout(1,1)
920      kj = INT(zaout(2,1)/10000.)
921      ki = INT(zaout(2,1) - 10000.*kj )
922      !
923   END SUBROUTINE mpp_minloc2d
924
925
926   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj ,kk)
927      !!------------------------------------------------------------------------
928      !!             ***  routine mpp_minloc  ***
929      !!
930      !! ** Purpose :   Compute the global minimum of an array ptab
931      !!              and also give its global position
932      !!
933      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
934      !!
935      !!--------------------------------------------------------------------------
936      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
937      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   pmask        ! Local mask
938      REAL(wp)                  , INTENT(  out) ::   pmin         ! Global minimum of ptab
939      INTEGER                   , INTENT(  out) ::   ki, kj, kk   ! index of minimum in global frame
940      !
941      INTEGER  ::   ierror
942      REAL(wp) ::   zmin     ! local minimum
943      INTEGER , DIMENSION(3)   ::   ilocs
944      REAL(wp), DIMENSION(2,1) ::   zain, zaout
945      !!-----------------------------------------------------------------------
946      !
947      zmin  = MINVAL( ptab(:,:,:) , mask= pmask == 1._wp )
948      ilocs = MINLOC( ptab(:,:,:) , mask= pmask == 1._wp )
949      !
950      ki = ilocs(1) + nimpp - 1
951      kj = ilocs(2) + njmpp - 1
952      kk = ilocs(3)
953      !
954      zain(1,:) = zmin
955      zain(2,:) = ki + 10000.*kj + 100000000.*kk
956      !
957      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OCE,ierror)
958      !
959      pmin = zaout(1,1)
960      kk   = INT( zaout(2,1) / 100000000. )
961      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
962      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
963      !
964   END SUBROUTINE mpp_minloc3d
965
966
967   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
968      !!------------------------------------------------------------------------
969      !!             ***  routine mpp_maxloc  ***
970      !!
971      !! ** Purpose :   Compute the global maximum of an array ptab
972      !!              and also give its global position
973      !!
974      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
975      !!
976      !!--------------------------------------------------------------------------
977      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab     ! Local 2D array
978      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask    ! Local mask
979      REAL(wp)                     , INTENT(  out) ::   pmax     ! Global maximum of ptab
980      INTEGER                      , INTENT(  out) ::   ki, kj   ! index of maximum in global frame
981      !!
982      INTEGER  :: ierror
983      INTEGER, DIMENSION (2)   ::   ilocs
984      REAL(wp) :: zmax   ! local maximum
985      REAL(wp), DIMENSION(2,1) ::   zain, zaout
986      !!-----------------------------------------------------------------------
987      !
988      zmax  = MAXVAL( ptab(:,:) , mask= pmask == 1._wp )
989      ilocs = MAXLOC( ptab(:,:) , mask= pmask == 1._wp )
990      !
991      ki = ilocs(1) + nimpp - 1
992      kj = ilocs(2) + njmpp - 1
993      !
994      zain(1,:) = zmax
995      zain(2,:) = ki + 10000. * kj
996      !
997      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OCE,ierror)
998      !
999      pmax = zaout(1,1)
1000      kj   = INT( zaout(2,1) / 10000.     )
1001      ki   = INT( zaout(2,1) - 10000.* kj )
1002      !
1003   END SUBROUTINE mpp_maxloc2d
1004
1005
1006   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
1007      !!------------------------------------------------------------------------
1008      !!             ***  routine mpp_maxloc  ***
1009      !!
1010      !! ** Purpose :  Compute the global maximum of an array ptab
1011      !!              and also give its global position
1012      !!
1013      !! ** Method : Use MPI_ALLREDUCE with MPI_MINLOC
1014      !!
1015      !!--------------------------------------------------------------------------
1016      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
1017      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   pmask        ! Local mask
1018      REAL(wp)                   , INTENT(  out) ::   pmax         ! Global maximum of ptab
1019      INTEGER                    , INTENT(  out) ::   ki, kj, kk   ! index of maximum in global frame
1020      !
1021      INTEGER  ::   ierror   ! local integer
1022      REAL(wp) ::   zmax     ! local maximum
1023      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1024      INTEGER , DIMENSION(3)   ::   ilocs
1025      !!-----------------------------------------------------------------------
1026      !
1027      zmax  = MAXVAL( ptab(:,:,:) , mask= pmask == 1._wp )
1028      ilocs = MAXLOC( ptab(:,:,:) , mask= pmask == 1._wp )
1029      !
1030      ki = ilocs(1) + nimpp - 1
1031      kj = ilocs(2) + njmpp - 1
1032      kk = ilocs(3)
1033      !
1034      zain(1,:) = zmax
1035      zain(2,:) = ki + 10000.*kj + 100000000.*kk
1036      !
1037      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OCE,ierror)
1038      !
1039      pmax = zaout(1,1)
1040      kk   = INT( zaout(2,1) / 100000000. )
1041      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
1042      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
1043      !
1044   END SUBROUTINE mpp_maxloc3d
1045
1046
1047   SUBROUTINE mppsync()
1048      !!----------------------------------------------------------------------
1049      !!                  ***  routine mppsync  ***
1050      !!
1051      !! ** Purpose :   Massively parallel processors, synchroneous
1052      !!
1053      !!-----------------------------------------------------------------------
1054      INTEGER :: ierror
1055      !!-----------------------------------------------------------------------
1056      !
1057      CALL mpi_barrier( mpi_comm_oce, ierror )
1058      !
1059   END SUBROUTINE mppsync
1060
1061
1062   SUBROUTINE mppstop
1063      !!----------------------------------------------------------------------
1064      !!                  ***  routine mppstop  ***
1065      !!
1066      !! ** purpose :   Stop massively parallel processors method
1067      !!
1068      !!----------------------------------------------------------------------
1069      INTEGER ::   info
1070      !!----------------------------------------------------------------------
1071      !
1072      CALL mppsync
1073      CALL mpi_finalize( info )
1074      !
1075   END SUBROUTINE mppstop
1076
1077
1078   SUBROUTINE mpp_comm_free( kcom )
1079      !!----------------------------------------------------------------------
1080      INTEGER, INTENT(in) ::   kcom
1081      !!
1082      INTEGER :: ierr
1083      !!----------------------------------------------------------------------
1084      !
1085      CALL MPI_COMM_FREE(kcom, ierr)
1086      !
1087   END SUBROUTINE mpp_comm_free
1088
1089
1090   SUBROUTINE mpp_ini_ice( pindic, kumout )
1091      !!----------------------------------------------------------------------
1092      !!               ***  routine mpp_ini_ice  ***
1093      !!
1094      !! ** Purpose :   Initialize special communicator for ice areas
1095      !!      condition together with global variables needed in the ddmpp folding
1096      !!
1097      !! ** Method  : - Look for ice processors in ice routines
1098      !!              - Put their number in nrank_ice
1099      !!              - Create groups for the world processors and the ice processors
1100      !!              - Create a communicator for ice processors
1101      !!
1102      !! ** output
1103      !!      njmppmax = njmpp for northern procs
1104      !!      ndim_rank_ice = number of processors with ice
1105      !!      nrank_ice (ndim_rank_ice) = ice processors
1106      !!      ngrp_iworld = group ID for the world processors
1107      !!      ngrp_ice = group ID for the ice processors
1108      !!      ncomm_ice = communicator for the ice procs.
1109      !!      n_ice_root = number (in the world) of proc 0 in the ice comm.
1110      !!
1111      !!----------------------------------------------------------------------
1112      INTEGER, INTENT(in) ::   pindic
1113      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical unit
1114      !!
1115      INTEGER :: jjproc
1116      INTEGER :: ii, ierr
1117      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kice
1118      INTEGER, ALLOCATABLE, DIMENSION(:) ::   zwork
1119      !!----------------------------------------------------------------------
1120      !
1121      ALLOCATE( kice(jpnij), zwork(jpnij), STAT=ierr )
1122      IF( ierr /= 0 ) THEN
1123         WRITE(kumout, cform_err)
1124         WRITE(kumout,*) 'mpp_ini_ice : failed to allocate 2, 1D arrays (jpnij in length)'
1125         CALL mppstop
1126      ENDIF
1127
1128      ! Look for how many procs with sea-ice
1129      !
1130      kice = 0
1131      DO jjproc = 1, jpnij
1132         IF( jjproc == narea .AND. pindic .GT. 0 )   kice(jjproc) = 1
1133      END DO
1134      !
1135      zwork = 0
1136      CALL MPI_ALLREDUCE( kice, zwork, jpnij, mpi_integer, mpi_sum, mpi_comm_oce, ierr )
1137      ndim_rank_ice = SUM( zwork )
1138
1139      ! Allocate the right size to nrank_north
1140      IF( ALLOCATED ( nrank_ice ) )   DEALLOCATE( nrank_ice )
1141      ALLOCATE( nrank_ice(ndim_rank_ice) )
1142      !
1143      ii = 0
1144      nrank_ice = 0
1145      DO jjproc = 1, jpnij
1146         IF( zwork(jjproc) == 1) THEN
1147            ii = ii + 1
1148            nrank_ice(ii) = jjproc -1
1149         ENDIF
1150      END DO
1151
1152      ! Create the world group
1153      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_iworld, ierr )
1154
1155      ! Create the ice group from the world group
1156      CALL MPI_GROUP_INCL( ngrp_iworld, ndim_rank_ice, nrank_ice, ngrp_ice, ierr )
1157
1158      ! Create the ice communicator , ie the pool of procs with sea-ice
1159      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_ice, ncomm_ice, ierr )
1160
1161      ! Find proc number in the world of proc 0 in the north
1162      ! The following line seems to be useless, we just comment & keep it as reminder
1163      ! CALL MPI_GROUP_TRANSLATE_RANKS(ngrp_ice,1,0,ngrp_iworld,n_ice_root,ierr)
1164      !
1165      CALL MPI_GROUP_FREE(ngrp_ice, ierr)
1166      CALL MPI_GROUP_FREE(ngrp_iworld, ierr)
1167
1168      DEALLOCATE(kice, zwork)
1169      !
1170   END SUBROUTINE mpp_ini_ice
1171
1172
1173   SUBROUTINE mpp_ini_znl( kumout )
1174      !!----------------------------------------------------------------------
1175      !!               ***  routine mpp_ini_znl  ***
1176      !!
1177      !! ** Purpose :   Initialize special communicator for computing zonal sum
1178      !!
1179      !! ** Method  : - Look for processors in the same row
1180      !!              - Put their number in nrank_znl
1181      !!              - Create group for the znl processors
1182      !!              - Create a communicator for znl processors
1183      !!              - Determine if processor should write znl files
1184      !!
1185      !! ** output
1186      !!      ndim_rank_znl = number of processors on the same row
1187      !!      ngrp_znl = group ID for the znl processors
1188      !!      ncomm_znl = communicator for the ice procs.
1189      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1190      !!
1191      !!----------------------------------------------------------------------
1192      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1193      !
1194      INTEGER :: jproc      ! dummy loop integer
1195      INTEGER :: ierr, ii   ! local integer
1196      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1197      !!----------------------------------------------------------------------
1198      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1199      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1200      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1201      !
1202      ALLOCATE( kwork(jpnij), STAT=ierr )
1203      IF( ierr /= 0 ) THEN
1204         WRITE(kumout, cform_err)
1205         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
1206         CALL mppstop
1207      ENDIF
1208
1209      IF( jpnj == 1 ) THEN
1210         ngrp_znl  = ngrp_world
1211         ncomm_znl = mpi_comm_oce
1212      ELSE
1213         !
1214         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1215         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1216         !-$$        CALL flush(numout)
1217         !
1218         ! Count number of processors on the same row
1219         ndim_rank_znl = 0
1220         DO jproc=1,jpnij
1221            IF ( kwork(jproc) == njmpp ) THEN
1222               ndim_rank_znl = ndim_rank_znl + 1
1223            ENDIF
1224         END DO
1225         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1226         !-$$        CALL flush(numout)
1227         ! Allocate the right size to nrank_znl
1228         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1229         ALLOCATE(nrank_znl(ndim_rank_znl))
1230         ii = 0
1231         nrank_znl (:) = 0
1232         DO jproc=1,jpnij
1233            IF ( kwork(jproc) == njmpp) THEN
1234               ii = ii + 1
1235               nrank_znl(ii) = jproc -1
1236            ENDIF
1237         END DO
1238         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1239         !-$$        CALL flush(numout)
1240
1241         ! Create the opa group
1242         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1243         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1244         !-$$        CALL flush(numout)
1245
1246         ! Create the znl group from the opa group
1247         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1248         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1249         !-$$        CALL flush(numout)
1250
1251         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1252         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1253         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1254         !-$$        CALL flush(numout)
1255         !
1256      END IF
1257
1258      ! Determines if processor if the first (starting from i=1) on the row
1259      IF ( jpni == 1 ) THEN
1260         l_znl_root = .TRUE.
1261      ELSE
1262         l_znl_root = .FALSE.
1263         kwork (1) = nimpp
1264         CALL mpp_min ( kwork(1), kcom = ncomm_znl)
1265         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1266      END IF
1267
1268      DEALLOCATE(kwork)
1269
1270   END SUBROUTINE mpp_ini_znl
1271
1272
1273   SUBROUTINE mpp_ini_north
1274      !!----------------------------------------------------------------------
1275      !!               ***  routine mpp_ini_north  ***
1276      !!
1277      !! ** Purpose :   Initialize special communicator for north folding
1278      !!      condition together with global variables needed in the mpp folding
1279      !!
1280      !! ** Method  : - Look for northern processors
1281      !!              - Put their number in nrank_north
1282      !!              - Create groups for the world processors and the north processors
1283      !!              - Create a communicator for northern processors
1284      !!
1285      !! ** output
1286      !!      njmppmax = njmpp for northern procs
1287      !!      ndim_rank_north = number of processors in the northern line
1288      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1289      !!      ngrp_world = group ID for the world processors
1290      !!      ngrp_north = group ID for the northern processors
1291      !!      ncomm_north = communicator for the northern procs.
1292      !!      north_root = number (in the world) of proc 0 in the northern comm.
1293      !!
1294      !!----------------------------------------------------------------------
1295      INTEGER ::   ierr
1296      INTEGER ::   jjproc
1297      INTEGER ::   ii, ji
1298      !!----------------------------------------------------------------------
1299      !
1300      njmppmax = MAXVAL( njmppt )
1301      !
1302      ! Look for how many procs on the northern boundary
1303      ndim_rank_north = 0
1304      DO jjproc = 1, jpnij
1305         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1306      END DO
1307      !
1308      ! Allocate the right size to nrank_north
1309      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1310      ALLOCATE( nrank_north(ndim_rank_north) )
1311
1312      ! Fill the nrank_north array with proc. number of northern procs.
1313      ! Note : the rank start at 0 in MPI
1314      ii = 0
1315      DO ji = 1, jpnij
1316         IF ( njmppt(ji) == njmppmax   ) THEN
1317            ii=ii+1
1318            nrank_north(ii)=ji-1
1319         END IF
1320      END DO
1321      !
1322      ! create the world group
1323      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1324      !
1325      ! Create the North group from the world group
1326      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1327      !
1328      ! Create the North communicator , ie the pool of procs in the north group
1329      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1330      !
1331   END SUBROUTINE mpp_ini_north
1332
1333
1334   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1335      !!---------------------------------------------------------------------
1336      !!                   ***  routine mpp_init.opa  ***
1337      !!
1338      !! ** Purpose :: export and attach a MPI buffer for bsend
1339      !!
1340      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1341      !!            but classical mpi_init
1342      !!
1343      !! History :: 01/11 :: IDRIS initial version for IBM only
1344      !!            08/04 :: R. Benshila, generalisation
1345      !!---------------------------------------------------------------------
1346      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1347      INTEGER                      , INTENT(inout) ::   ksft
1348      INTEGER                      , INTENT(  out) ::   code
1349      INTEGER                                      ::   ierr, ji
1350      LOGICAL                                      ::   mpi_was_called
1351      !!---------------------------------------------------------------------
1352      !
1353      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1354      IF ( code /= MPI_SUCCESS ) THEN
1355         DO ji = 1, SIZE(ldtxt)
1356            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1357         END DO
1358         WRITE(*, cform_err)
1359         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1360         CALL mpi_abort( mpi_comm_world, code, ierr )
1361      ENDIF
1362      !
1363      IF( .NOT. mpi_was_called ) THEN
1364         CALL mpi_init( code )
1365         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1366         IF ( code /= MPI_SUCCESS ) THEN
1367            DO ji = 1, SIZE(ldtxt)
1368               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1369            END DO
1370            WRITE(*, cform_err)
1371            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1372            CALL mpi_abort( mpi_comm_world, code, ierr )
1373         ENDIF
1374      ENDIF
1375      !
1376      IF( nn_buffer > 0 ) THEN
1377         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1378         ! Buffer allocation and attachment
1379         ALLOCATE( tampon(nn_buffer), stat = ierr )
1380         IF( ierr /= 0 ) THEN
1381            DO ji = 1, SIZE(ldtxt)
1382               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1383            END DO
1384            WRITE(*, cform_err)
1385            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1386            CALL mpi_abort( mpi_comm_world, code, ierr )
1387         END IF
1388         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1389      ENDIF
1390      !
1391   END SUBROUTINE mpi_init_oce
1392
1393
1394   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1395      !!---------------------------------------------------------------------
1396      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1397      !!
1398      !!   Modification of original codes written by David H. Bailey
1399      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1400      !!---------------------------------------------------------------------
1401      INTEGER                     , INTENT(in)    ::   ilen, itype
1402      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1403      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1404      !
1405      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1406      INTEGER  :: ji, ztmp           ! local scalar
1407      !!---------------------------------------------------------------------
1408      !
1409      ztmp = itype   ! avoid compilation warning
1410      !
1411      DO ji=1,ilen
1412      ! Compute ydda + yddb using Knuth's trick.
1413         zt1  = real(ydda(ji)) + real(yddb(ji))
1414         zerr = zt1 - real(ydda(ji))
1415         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1416                + aimag(ydda(ji)) + aimag(yddb(ji))
1417
1418         ! The result is zt1 + zt2, after normalization.
1419         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1420      END DO
1421      !
1422   END SUBROUTINE DDPDD_MPI
1423
1424
1425   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1426      !!---------------------------------------------------------------------
1427      !!                   ***  routine mpp_lbc_north_icb  ***
1428      !!
1429      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1430      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1431      !!              array with outer extra halo
1432      !!
1433      !! ** Method  :   North fold condition and mpp with more than one proc
1434      !!              in i-direction require a specific treatment. We gather
1435      !!              the 4+kextj northern lines of the global domain on 1
1436      !!              processor and apply lbc north-fold on this sub array.
1437      !!              Then we scatter the north fold array back to the processors.
1438      !!              This routine accounts for an extra halo with icebergs
1439      !!              and assumes ghost rows and columns have been suppressed.
1440      !!
1441      !!----------------------------------------------------------------------
1442      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1443      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1444      !                                                     !   = T ,  U , V , F or W -points
1445      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1446      !!                                                    ! north fold, =  1. otherwise
1447      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1448      !
1449      INTEGER ::   ji, jj, jr
1450      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1451      INTEGER ::   ipj, ij, iproc
1452      !
1453      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1454      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1455      !!----------------------------------------------------------------------
1456      !
1457      ipj=4
1458      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1459     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1460     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1461      !
1462      ztab_e(:,:)      = 0._wp
1463      znorthloc_e(:,:) = 0._wp
1464      !
1465      ij = 1 - kextj
1466      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1467      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1468         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1469         ij = ij + 1
1470      END DO
1471      !
1472      itaille = jpimax * ( ipj + 2*kextj )
1473      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1474         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1475         &                ncomm_north, ierr )
1476      !
1477      DO jr = 1, ndim_rank_north            ! recover the global north array
1478         iproc = nrank_north(jr) + 1
1479         ildi = nldit (iproc)
1480         ilei = nleit (iproc)
1481         iilb = nimppt(iproc)
1482         DO jj = 1-kextj, ipj+kextj
1483            DO ji = ildi, ilei
1484               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1485            END DO
1486         END DO
1487      END DO
1488
1489      ! 2. North-Fold boundary conditions
1490      ! ----------------------------------
1491      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1492
1493      ij = 1 - kextj
1494      !! Scatter back to pt2d
1495      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1496         DO ji= 1, jpi
1497            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1498         END DO
1499         ij  = ij +1
1500      END DO
1501      !
1502      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1503      !
1504   END SUBROUTINE mpp_lbc_north_icb
1505
1506
1507   SUBROUTINE mpp_lnk_2d_icb( pt2d, cd_type, psgn, kexti, kextj )
1508      !!----------------------------------------------------------------------
1509      !!                  ***  routine mpp_lnk_2d_icb  ***
1510      !!
1511      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1512      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1513      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1514      !!
1515      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1516      !!      between processors following neighboring subdomains.
1517      !!            domain parameters
1518      !!                    jpi    : first dimension of the local subdomain
1519      !!                    jpj    : second dimension of the local subdomain
1520      !!                    kexti  : number of columns for extra outer halo
1521      !!                    kextj  : number of rows for extra outer halo
1522      !!                    nbondi : mark for "east-west local boundary"
1523      !!                    nbondj : mark for "north-south local boundary"
1524      !!                    noea   : number for local neighboring processors
1525      !!                    nowe   : number for local neighboring processors
1526      !!                    noso   : number for local neighboring processors
1527      !!                    nono   : number for local neighboring processors
1528      !!----------------------------------------------------------------------
1529      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1530      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1531      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1532      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1533      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1534      !
1535      INTEGER  ::   jl   ! dummy loop indices
1536      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1537      INTEGER  ::   ipreci, iprecj             !   -       -
1538      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1539      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1540      !!
1541      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1542      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1543      !!----------------------------------------------------------------------
1544
1545      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1546      iprecj = nn_hls + kextj
1547
1548
1549      ! 1. standard boundary treatment
1550      ! ------------------------------
1551      ! Order matters Here !!!!
1552      !
1553      !                                      ! East-West boundaries
1554      !                                           !* Cyclic east-west
1555      IF( l_Iperio ) THEN
1556         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1557         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1558         !
1559      ELSE                                        !* closed
1560         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1561                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1562      ENDIF
1563      !                                      ! North-South boundaries
1564      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1565         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1566         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1567      ELSE                                        !* closed
1568         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1569                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1570      ENDIF
1571      !
1572
1573      ! north fold treatment
1574      ! -----------------------
1575      IF( npolj /= 0 ) THEN
1576         !
1577         SELECT CASE ( jpni )
1578                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1579                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1580         END SELECT
1581         !
1582      ENDIF
1583
1584      ! 2. East and west directions exchange
1585      ! ------------------------------------
1586      ! we play with the neigbours AND the row number because of the periodicity
1587      !
1588      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1589      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1590         iihom = jpi-nreci-kexti
1591         DO jl = 1, ipreci
1592            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1593            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1594         END DO
1595      END SELECT
1596      !
1597      !                           ! Migrations
1598      imigr = ipreci * ( jpj + 2*kextj )
1599      !
1600      SELECT CASE ( nbondi )
1601      CASE ( -1 )
1602         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1603         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1604         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1605      CASE ( 0 )
1606         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1607         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1608         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1609         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1610         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1611         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1612      CASE ( 1 )
1613         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1614         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1615         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1616      END SELECT
1617      !
1618      !                           ! Write Dirichlet lateral conditions
1619      iihom = jpi - nn_hls
1620      !
1621      SELECT CASE ( nbondi )
1622      CASE ( -1 )
1623         DO jl = 1, ipreci
1624            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1625         END DO
1626      CASE ( 0 )
1627         DO jl = 1, ipreci
1628            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1629            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1630         END DO
1631      CASE ( 1 )
1632         DO jl = 1, ipreci
1633            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1634         END DO
1635      END SELECT
1636
1637
1638      ! 3. North and south directions
1639      ! -----------------------------
1640      ! always closed : we play only with the neigbours
1641      !
1642      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1643         ijhom = jpj-nrecj-kextj
1644         DO jl = 1, iprecj
1645            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1646            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1647         END DO
1648      ENDIF
1649      !
1650      !                           ! Migrations
1651      imigr = iprecj * ( jpi + 2*kexti )
1652      !
1653      SELECT CASE ( nbondj )
1654      CASE ( -1 )
1655         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1656         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1657         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1658      CASE ( 0 )
1659         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1660         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1661         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1662         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1663         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1664         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1665      CASE ( 1 )
1666         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1667         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1668         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1669      END SELECT
1670      !
1671      !                           ! Write Dirichlet lateral conditions
1672      ijhom = jpj - nn_hls
1673      !
1674      SELECT CASE ( nbondj )
1675      CASE ( -1 )
1676         DO jl = 1, iprecj
1677            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1678         END DO
1679      CASE ( 0 )
1680         DO jl = 1, iprecj
1681            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1682            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1683         END DO
1684      CASE ( 1 )
1685         DO jl = 1, iprecj
1686            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1687         END DO
1688      END SELECT
1689      !
1690   END SUBROUTINE mpp_lnk_2d_icb
1691
1692   SUBROUTINE tic_tac (l_tic)
1693
1694    LOGICAL, INTENT(IN) :: l_tic
1695    LOGICAL, SAVE :: l_write_wt = .TRUE.
1696    REAL(wp),SAVE  ::   tic_wt=0., tac_wt=0., tic_ct=0., tac_ct=0.
1697
1698    ! start count
1699    IF ( l_tic ) THEN
1700       IF ( ncom_stp > nit000 ) tic_wt = MPI_Wtime()
1701       IF ( tic_ct > 0.0_wp ) tac_ct = tac_ct + MPI_Wtime() - tic_ct
1702    ! stop count and sum up
1703    ELSE
1704       IF ( ncom_stp > nit000 ) tac_wt = tac_wt + MPI_Wtime() - tic_wt
1705       IF ( ( ncom_stp == ( nitend - nn_comm_mod ) ) .AND. l_write_wt ) then
1706          WRITE(6,'(A20,F11.6,A15,I8)') 'Computing time : ',tac_ct,' on MPI rank : ', narea
1707          WRITE(6,'(A20,F11.6,A15,I8)') 'Waiting time : ',tac_wt,' on MPI rank : ', narea
1708          l_write_wt = .FALSE.
1709       END IF
1710       IF ( ncom_stp > nit000 ) tic_ct = MPI_Wtime()
1711    ENDIF
1712   END SUBROUTINE tic_tac
1713
1714   
1715#else
1716   !!----------------------------------------------------------------------
1717   !!   Default case:            Dummy module        share memory computing
1718   !!----------------------------------------------------------------------
1719   USE in_out_manager
1720
1721   INTERFACE mpp_sum
1722      MODULE PROCEDURE mpp_sum_a2s, mpp_sum_as, mpp_sum_ai, mpp_sum_s, mpp_sum_i, mppsum_realdd, mppsum_a_realdd
1723   END INTERFACE
1724   INTERFACE mpp_max
1725      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1726   END INTERFACE
1727   INTERFACE mpp_min
1728      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1729   END INTERFACE
1730   INTERFACE mpp_minloc
1731      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1732   END INTERFACE
1733   INTERFACE mpp_maxloc
1734      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1735   END INTERFACE
1736   INTERFACE mpp_max_multiple
1737      MODULE PROCEDURE mppmax_real_multiple
1738   END INTERFACE
1739
1740   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1741   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1742   INTEGER :: ncomm_ice
1743   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1744   !!----------------------------------------------------------------------
1745CONTAINS
1746
1747   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1748      INTEGER, INTENT(in) ::   kumout
1749      lib_mpp_alloc = 0
1750   END FUNCTION lib_mpp_alloc
1751
1752   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1753      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1754      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1755      CHARACTER(len=*) ::   ldname
1756      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1757      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1758      function_value = 0
1759      IF( .FALSE. )   ldtxt(:) = 'never done'
1760      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1761   END FUNCTION mynode
1762
1763   SUBROUTINE mppsync                       ! Dummy routine
1764   END SUBROUTINE mppsync
1765
1766   SUBROUTINE mpp_sum_as( parr, kdim, kcom )      ! Dummy routine
1767      REAL   , DIMENSION(:) :: parr
1768      INTEGER               :: kdim
1769      INTEGER, OPTIONAL     :: kcom
1770      WRITE(*,*) 'mpp_sum_as: You should not have seen this print! error?', kdim, parr(1), kcom
1771   END SUBROUTINE mpp_sum_as
1772
1773   SUBROUTINE mpp_sum_a2s( parr, kdim, kcom )      ! Dummy routine
1774      REAL   , DIMENSION(:,:) :: parr
1775      INTEGER               :: kdim
1776      INTEGER, OPTIONAL     :: kcom
1777      WRITE(*,*) 'mpp_sum_a2s: You should not have seen this print! error?', kdim, parr(1,1), kcom
1778   END SUBROUTINE mpp_sum_a2s
1779
1780   SUBROUTINE mpp_sum_ai( karr, kdim, kcom )      ! Dummy routine
1781      INTEGER, DIMENSION(:) :: karr
1782      INTEGER               :: kdim
1783      INTEGER, OPTIONAL     :: kcom
1784      WRITE(*,*) 'mpp_sum_ai: You should not have seen this print! error?', kdim, karr(1), kcom
1785   END SUBROUTINE mpp_sum_ai
1786
1787   SUBROUTINE mpp_sum_s( psca, kcom )            ! Dummy routine
1788      REAL                  :: psca
1789      INTEGER, OPTIONAL     :: kcom
1790      WRITE(*,*) 'mpp_sum_s: You should not have seen this print! error?', psca, kcom
1791   END SUBROUTINE mpp_sum_s
1792
1793   SUBROUTINE mpp_sum_i( kint, kcom )            ! Dummy routine
1794      integer               :: kint
1795      INTEGER, OPTIONAL     :: kcom
1796      WRITE(*,*) 'mpp_sum_i: You should not have seen this print! error?', kint, kcom
1797   END SUBROUTINE mpp_sum_i
1798
1799   SUBROUTINE mppsum_realdd( ytab, kcom )
1800      COMPLEX(wp), INTENT(inout)         :: ytab    ! input scalar
1801      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1802      WRITE(*,*) 'mppsum_realdd: You should not have seen this print! error?', ytab
1803   END SUBROUTINE mppsum_realdd
1804
1805   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
1806      INTEGER , INTENT( in )                        ::   kdim      ! size of ytab
1807      COMPLEX(wp), DIMENSION(kdim), INTENT( inout ) ::   ytab      ! input array
1808      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1809      WRITE(*,*) 'mppsum_a_realdd: You should not have seen this print! error?', kdim, ytab(1), kcom
1810   END SUBROUTINE mppsum_a_realdd
1811
1812   SUBROUTINE mppmax_a_real( parr, kdim, kcom )
1813      REAL   , DIMENSION(:) :: parr
1814      INTEGER               :: kdim
1815      INTEGER, OPTIONAL     :: kcom
1816      WRITE(*,*) 'mppmax_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
1817   END SUBROUTINE mppmax_a_real
1818
1819   SUBROUTINE mppmax_real( psca, kcom )
1820      REAL                  :: psca
1821      INTEGER, OPTIONAL     :: kcom
1822      WRITE(*,*) 'mppmax_real: You should not have seen this print! error?', psca, kcom
1823   END SUBROUTINE mppmax_real
1824
1825   SUBROUTINE mppmin_a_real( parr, kdim, kcom )
1826      REAL   , DIMENSION(:) :: parr
1827      INTEGER               :: kdim
1828      INTEGER, OPTIONAL     :: kcom
1829      WRITE(*,*) 'mppmin_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
1830   END SUBROUTINE mppmin_a_real
1831
1832   SUBROUTINE mppmin_real( psca, kcom )
1833      REAL                  :: psca
1834      INTEGER, OPTIONAL     :: kcom
1835      WRITE(*,*) 'mppmin_real: You should not have seen this print! error?', psca, kcom
1836   END SUBROUTINE mppmin_real
1837
1838   SUBROUTINE mppmax_a_int( karr, kdim ,kcom)
1839      INTEGER, DIMENSION(:) :: karr
1840      INTEGER               :: kdim
1841      INTEGER, OPTIONAL     :: kcom
1842      WRITE(*,*) 'mppmax_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
1843   END SUBROUTINE mppmax_a_int
1844
1845   SUBROUTINE mppmax_int( kint, kcom)
1846      INTEGER               :: kint
1847      INTEGER, OPTIONAL     :: kcom
1848      WRITE(*,*) 'mppmax_int: You should not have seen this print! error?', kint, kcom
1849   END SUBROUTINE mppmax_int
1850
1851   SUBROUTINE mppmin_a_int( karr, kdim, kcom )
1852      INTEGER, DIMENSION(:) :: karr
1853      INTEGER               :: kdim
1854      INTEGER, OPTIONAL     :: kcom
1855      WRITE(*,*) 'mppmin_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
1856   END SUBROUTINE mppmin_a_int
1857
1858   SUBROUTINE mppmin_int( kint, kcom )
1859      INTEGER               :: kint
1860      INTEGER, OPTIONAL     :: kcom
1861      WRITE(*,*) 'mppmin_int: You should not have seen this print! error?', kint, kcom
1862   END SUBROUTINE mppmin_int
1863
1864   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki, kj )
1865      REAL                   :: pmin
1866      REAL , DIMENSION (:,:) :: ptab, pmask
1867      INTEGER :: ki, kj
1868      WRITE(*,*) 'mpp_minloc2d: You should not have seen this print! error?', pmin, ki, kj, ptab(1,1), pmask(1,1)
1869   END SUBROUTINE mpp_minloc2d
1870
1871   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj, kk )
1872      REAL                     :: pmin
1873      REAL , DIMENSION (:,:,:) :: ptab, pmask
1874      INTEGER :: ki, kj, kk
1875      WRITE(*,*) 'mpp_minloc3d: You should not have seen this print! error?', pmin, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
1876   END SUBROUTINE mpp_minloc3d
1877
1878   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
1879      REAL                   :: pmax
1880      REAL , DIMENSION (:,:) :: ptab, pmask
1881      INTEGER :: ki, kj
1882      WRITE(*,*) 'mpp_maxloc2d: You should not have seen this print! error?', pmax, ki, kj, ptab(1,1), pmask(1,1)
1883   END SUBROUTINE mpp_maxloc2d
1884
1885   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
1886      REAL                     :: pmax
1887      REAL , DIMENSION (:,:,:) :: ptab, pmask
1888      INTEGER :: ki, kj, kk
1889      WRITE(*,*) 'mpp_maxloc3d: You should not have seen this print! error?', pmax, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
1890   END SUBROUTINE mpp_maxloc3d
1891
1892   SUBROUTINE mppstop
1893      STOP      ! non MPP case, just stop the run
1894   END SUBROUTINE mppstop
1895
1896   SUBROUTINE mpp_ini_ice( kcom, knum )
1897      INTEGER :: kcom, knum
1898      WRITE(*,*) 'mpp_ini_ice: You should not have seen this print! error?', kcom, knum
1899   END SUBROUTINE mpp_ini_ice
1900
1901   SUBROUTINE mpp_ini_znl( knum )
1902      INTEGER :: knum
1903      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1904   END SUBROUTINE mpp_ini_znl
1905
1906   SUBROUTINE mpp_comm_free( kcom )
1907      INTEGER :: kcom
1908      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1909   END SUBROUTINE mpp_comm_free
1910   
1911   SUBROUTINE mppmax_real_multiple( ptab, kdim , kcom  )
1912      REAL, DIMENSION(:) ::   ptab   !
1913      INTEGER            ::   kdim   !
1914      INTEGER, OPTIONAL  ::   kcom   !
1915      WRITE(*,*) 'mppmax_real_multiple: You should not have seen this print! error?', ptab(1), kdim
1916   END SUBROUTINE mppmax_real_multiple
1917
1918   SUBROUTINE simulated_lbc_lnk ()
1919      WRITE(*,*) 'simulated_lbc_lnk: You should not have seen this print! error?'
1920   END SUBROUTINE simulated_lbc_lnk ()
1921#endif
1922
1923   !!----------------------------------------------------------------------
1924   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1925   !!----------------------------------------------------------------------
1926
1927   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1928      &                 cd6, cd7, cd8, cd9, cd10 )
1929      !!----------------------------------------------------------------------
1930      !!                  ***  ROUTINE  stop_opa  ***
1931      !!
1932      !! ** Purpose :   print in ocean.outpput file a error message and
1933      !!                increment the error number (nstop) by one.
1934      !!----------------------------------------------------------------------
1935      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1936      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1937      !!----------------------------------------------------------------------
1938      !
1939      nstop = nstop + 1
1940      IF(lwp) THEN
1941         WRITE(numout,cform_err)
1942         IF( PRESENT(cd1 ) )   WRITE(numout,*) cd1
1943         IF( PRESENT(cd2 ) )   WRITE(numout,*) cd2
1944         IF( PRESENT(cd3 ) )   WRITE(numout,*) cd3
1945         IF( PRESENT(cd4 ) )   WRITE(numout,*) cd4
1946         IF( PRESENT(cd5 ) )   WRITE(numout,*) cd5
1947         IF( PRESENT(cd6 ) )   WRITE(numout,*) cd6
1948         IF( PRESENT(cd7 ) )   WRITE(numout,*) cd7
1949         IF( PRESENT(cd8 ) )   WRITE(numout,*) cd8
1950         IF( PRESENT(cd9 ) )   WRITE(numout,*) cd9
1951         IF( PRESENT(cd10) )   WRITE(numout,*) cd10
1952      ENDIF
1953                               CALL FLUSH(numout    )
1954      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1955      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1956      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1957      !
1958      IF( cd1 == 'STOP' ) THEN
1959         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1960         CALL mppstop()
1961      ENDIF
1962      !
1963   END SUBROUTINE ctl_stop
1964
1965
1966   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1967      &                 cd6, cd7, cd8, cd9, cd10 )
1968      !!----------------------------------------------------------------------
1969      !!                  ***  ROUTINE  stop_warn  ***
1970      !!
1971      !! ** Purpose :   print in ocean.outpput file a error message and
1972      !!                increment the warning number (nwarn) by one.
1973      !!----------------------------------------------------------------------
1974      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1975      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1976      !!----------------------------------------------------------------------
1977      !
1978      nwarn = nwarn + 1
1979      IF(lwp) THEN
1980         WRITE(numout,cform_war)
1981         IF( PRESENT(cd1 ) ) WRITE(numout,*) cd1
1982         IF( PRESENT(cd2 ) ) WRITE(numout,*) cd2
1983         IF( PRESENT(cd3 ) ) WRITE(numout,*) cd3
1984         IF( PRESENT(cd4 ) ) WRITE(numout,*) cd4
1985         IF( PRESENT(cd5 ) ) WRITE(numout,*) cd5
1986         IF( PRESENT(cd6 ) ) WRITE(numout,*) cd6
1987         IF( PRESENT(cd7 ) ) WRITE(numout,*) cd7
1988         IF( PRESENT(cd8 ) ) WRITE(numout,*) cd8
1989         IF( PRESENT(cd9 ) ) WRITE(numout,*) cd9
1990         IF( PRESENT(cd10) ) WRITE(numout,*) cd10
1991      ENDIF
1992      CALL FLUSH(numout)
1993      !
1994   END SUBROUTINE ctl_warn
1995
1996
1997   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1998      !!----------------------------------------------------------------------
1999      !!                  ***  ROUTINE ctl_opn  ***
2000      !!
2001      !! ** Purpose :   Open file and check if required file is available.
2002      !!
2003      !! ** Method  :   Fortan open
2004      !!----------------------------------------------------------------------
2005      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
2006      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
2007      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
2008      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
2009      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
2010      INTEGER          , INTENT(in   ) ::   klengh    ! record length
2011      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
2012      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
2013      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
2014      !
2015      CHARACTER(len=80) ::   clfile
2016      INTEGER           ::   iost
2017      !!----------------------------------------------------------------------
2018      !
2019      ! adapt filename
2020      ! ----------------
2021      clfile = TRIM(cdfile)
2022      IF( PRESENT( karea ) ) THEN
2023         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
2024      ENDIF
2025#if defined key_agrif
2026      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
2027      knum=Agrif_Get_Unit()
2028#else
2029      knum=get_unit()
2030#endif
2031      !
2032      iost=0
2033      IF( cdacce(1:6) == 'DIRECT' )  THEN
2034         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
2035      ELSE
2036         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
2037      ENDIF
2038      IF( iost == 0 ) THEN
2039         IF(ldwp) THEN
2040            WRITE(kout,*) '     file   : ', clfile,' open ok'
2041            WRITE(kout,*) '     unit   = ', knum
2042            WRITE(kout,*) '     status = ', cdstat
2043            WRITE(kout,*) '     form   = ', cdform
2044            WRITE(kout,*) '     access = ', cdacce
2045            WRITE(kout,*)
2046         ENDIF
2047      ENDIF
2048100   CONTINUE
2049      IF( iost /= 0 ) THEN
2050         IF(ldwp) THEN
2051            WRITE(kout,*)
2052            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
2053            WRITE(kout,*) ' =======   ===  '
2054            WRITE(kout,*) '           unit   = ', knum
2055            WRITE(kout,*) '           status = ', cdstat
2056            WRITE(kout,*) '           form   = ', cdform
2057            WRITE(kout,*) '           access = ', cdacce
2058            WRITE(kout,*) '           iostat = ', iost
2059            WRITE(kout,*) '           we stop. verify the file '
2060            WRITE(kout,*)
2061         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
2062            WRITE(*,*)
2063            WRITE(*,*) ' ===>>>> : bad opening file: ', clfile
2064            WRITE(*,*) ' =======   ===  '
2065            WRITE(*,*) '           unit   = ', knum
2066            WRITE(*,*) '           status = ', cdstat
2067            WRITE(*,*) '           form   = ', cdform
2068            WRITE(*,*) '           access = ', cdacce
2069            WRITE(*,*) '           iostat = ', iost
2070            WRITE(*,*) '           we stop. verify the file '
2071            WRITE(*,*)
2072         ENDIF
2073         CALL FLUSH( kout ) 
2074         STOP 'ctl_opn bad opening'
2075      ENDIF
2076      !
2077   END SUBROUTINE ctl_opn
2078
2079
2080   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2081      !!----------------------------------------------------------------------
2082      !!                  ***  ROUTINE ctl_nam  ***
2083      !!
2084      !! ** Purpose :   Informations when error while reading a namelist
2085      !!
2086      !! ** Method  :   Fortan open
2087      !!----------------------------------------------------------------------
2088      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2089      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2090      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2091      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2092      !!----------------------------------------------------------------------
2093      !
2094      WRITE (clios, '(I5.0)')   kios
2095      IF( kios < 0 ) THEN         
2096         CALL ctl_warn( 'end of record or file while reading namelist '   &
2097            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2098      ENDIF
2099      !
2100      IF( kios > 0 ) THEN
2101         CALL ctl_stop( 'misspelled variable in namelist '   &
2102            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2103      ENDIF
2104      kios = 0
2105      RETURN
2106      !
2107   END SUBROUTINE ctl_nam
2108
2109
2110   INTEGER FUNCTION get_unit()
2111      !!----------------------------------------------------------------------
2112      !!                  ***  FUNCTION  get_unit  ***
2113      !!
2114      !! ** Purpose :   return the index of an unused logical unit
2115      !!----------------------------------------------------------------------
2116      LOGICAL :: llopn
2117      !!----------------------------------------------------------------------
2118      !
2119      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2120      llopn = .TRUE.
2121      DO WHILE( (get_unit < 998) .AND. llopn )
2122         get_unit = get_unit + 1
2123         INQUIRE( unit = get_unit, opened = llopn )
2124      END DO
2125      IF( (get_unit == 999) .AND. llopn ) THEN
2126         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2127         get_unit = -1
2128      ENDIF
2129      !
2130   END FUNCTION get_unit
2131
2132   !!----------------------------------------------------------------------
2133END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.