Root/fs/ocfs2/dlmglue.c

1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/kthread.h>
31#include <linux/pagemap.h>
32#include <linux/debugfs.h>
33#include <linux/seq_file.h>
34#include <linux/time.h>
35#include <linux/quotaops.h>
36
37#define MLOG_MASK_PREFIX ML_DLM_GLUE
38#include <cluster/masklog.h>
39
40#include "ocfs2.h"
41#include "ocfs2_lockingver.h"
42
43#include "alloc.h"
44#include "dcache.h"
45#include "dlmglue.h"
46#include "extent_map.h"
47#include "file.h"
48#include "heartbeat.h"
49#include "inode.h"
50#include "journal.h"
51#include "stackglue.h"
52#include "slot_map.h"
53#include "super.h"
54#include "uptodate.h"
55#include "quota.h"
56#include "refcounttree.h"
57
58#include "buffer_head_io.h"
59
60struct ocfs2_mask_waiter {
61    struct list_head mw_item;
62    int mw_status;
63    struct completion mw_complete;
64    unsigned long mw_mask;
65    unsigned long mw_goal;
66#ifdef CONFIG_OCFS2_FS_STATS
67    unsigned long long mw_lock_start;
68#endif
69};
70
/* ->get_osb() callbacks used by the lock ops tables; defined further down. */
static struct ocfs2_super *
ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
75
76/*
77 * Return value from ->downconvert_worker functions.
78 *
79 * These control the precise actions of ocfs2_unblock_lock()
80 * and ocfs2_process_blocked_lock()
81 *
82 */
enum ocfs2_unblock_action {
	/* Continue downconvert */
	UNBLOCK_CONTINUE	= 0,
	/* Continue downconvert, fire ->post_unlock callback */
	UNBLOCK_CONTINUE_POST	= 1,
	/* Do not downconvert, fire ->post_unlock() callback. */
	UNBLOCK_STOP_POST	= 2,
};
90
91struct ocfs2_unblock_ctl {
92    int requeue;
93    enum ocfs2_unblock_action unblock_action;
94};
95
96/* Lockdep class keys */
97struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
98
/* Callbacks referenced by the lock ops tables below. */

/* Inode (meta) lock callbacks. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

/* Dentry lock callbacks. */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

/* Quota info lock callback. */
static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

/* Refcount block lock callbacks. */
static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);
118
/*
 * Dump the meta LVB of __lockres at mlog level __level, tagging the
 * output with the calling function and line. __func__ is used instead
 * of the GCC-specific __PRETTY_FUNCTION__; in C both expand to the same
 * string.
 */
#define mlog_meta_lvb(__level, __lockres)				\
	ocfs2_dump_meta_lvb_info(__level, __func__, __LINE__, __lockres)
120
121/* This aids in debugging situations where a bad LVB might be involved. */
122static void ocfs2_dump_meta_lvb_info(u64 level,
123                     const char *function,
124                     unsigned int line,
125                     struct ocfs2_lock_res *lockres)
126{
127    struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
128
129    mlog(level, "LVB information for %s (called from %s:%u):\n",
130         lockres->l_name, function, line);
131    mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
132         lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
133         be32_to_cpu(lvb->lvb_igeneration));
134    mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
135         (unsigned long long)be64_to_cpu(lvb->lvb_isize),
136         be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
137         be16_to_cpu(lvb->lvb_imode));
138    mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
139         "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
140         (long long)be64_to_cpu(lvb->lvb_iatime_packed),
141         (long long)be64_to_cpu(lvb->lvb_ictime_packed),
142         (long long)be64_to_cpu(lvb->lvb_imtime_packed),
143         be32_to_cpu(lvb->lvb_iattr));
144}
145
146
147/*
148 * OCFS2 Lock Resource Operations
149 *
150 * These fine tune the behavior of the generic dlmglue locking infrastructure.
151 *
152 * The most basic of lock types can point ->l_priv to their respective
153 * struct ocfs2_super and allow the default actions to manage things.
154 *
155 * Right now, each lock type also needs to implement an init function,
156 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
157 * should be called when the lock is no longer needed (i.e., object
158 * destruction time).
159 */
struct ocfs2_lock_res_ops {
	/*
	 * Map a lockres to its ocfs2_super. Only needed when ->l_priv
	 * does not already point at the ocfs2_super.
	 */
	struct ocfs2_super *(*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optional. Invoked from the downconvert thread once a
	 * downconvert has succeeded. The lockres is not touched again
	 * afterwards, so the callback is free to release memory, etc.
	 *
	 * Exactly when it fires is decided by ->downconvert_worker().
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Extra, type specific test for whether a downconvert is safe.
	 * Return 0 to have the downconvert re-queued for later, nonzero
	 * to let it proceed.
	 *
	 * Most lock types can rely on the default check that no
	 * incompatible holders remain.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Fill in the lock value block. Called on downconvert and when
	 * a lock is dropped.
	 *
	 * Lock types using this should set LOCK_TYPE_USES_LVB in the
	 * flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Invoked from the downconvert thread once it has been decided
	 * that the lock will be downconverted. No locks are held, so
	 * the function may do work that schedules (syncing out data,
	 * etc).
	 *
	 * Must return one of the ocfs2_unblock_action values to tell
	 * the thread what to do next.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags describing the requirements of this lock
	 * type. Each flag is described at its definition.
	 */
	int flags;
};
218
/*
 * The lock type wants to "refresh" potentially stale data the first
 * time a meaningful (PRMODE or EXMODE) level is obtained. With this
 * flag set, the ast function sets OCFS2_LOCK_NEEDS_REFRESH in the
 * lockres l_flags; the locking wrapper is expected to clear
 * OCFS2_LOCK_NEEDS_REFRESH again once it has refreshed.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * The lock type makes use of the lock value block. The ->set_lvb
 * lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2
234
235static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
236    .get_osb = ocfs2_get_inode_osb,
237    .flags = 0,
238};
239
240static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
241    .get_osb = ocfs2_get_inode_osb,
242    .check_downconvert = ocfs2_check_meta_downconvert,
243    .set_lvb = ocfs2_set_meta_lvb,
244    .downconvert_worker = ocfs2_data_convert_worker,
245    .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
246};
247
248static struct ocfs2_lock_res_ops ocfs2_super_lops = {
249    .flags = LOCK_TYPE_REQUIRES_REFRESH,
250};
251
252static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
253    .flags = 0,
254};
255
256static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
257    .flags = 0,
258};
259
260static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
261    .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
262};
263
264static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
265    .get_osb = ocfs2_get_dentry_osb,
266    .post_unlock = ocfs2_dentry_post_unlock,
267    .downconvert_worker = ocfs2_dentry_convert_worker,
268    .flags = 0,
269};
270
271static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
272    .get_osb = ocfs2_get_inode_osb,
273    .flags = 0,
274};
275
276static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
277    .get_osb = ocfs2_get_file_osb,
278    .flags = 0,
279};
280
281static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
282    .set_lvb = ocfs2_set_qinfo_lvb,
283    .get_osb = ocfs2_get_qinfo_osb,
284    .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
285};
286
287static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
288    .check_downconvert = ocfs2_check_refcount_downconvert,
289    .downconvert_worker = ocfs2_refcount_convert_worker,
290    .flags = 0,
291};
292
293static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
294{
295    return lockres->l_type == OCFS2_LOCK_TYPE_META ||
296        lockres->l_type == OCFS2_LOCK_TYPE_RW ||
297        lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
298}
299
300static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
301{
302    return container_of(lksb, struct ocfs2_lock_res, l_lksb);
303}
304
305static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
306{
307    BUG_ON(!ocfs2_is_inode_lock(lockres));
308
309    return (struct inode *) lockres->l_priv;
310}
311
312static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
313{
314    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
315
316    return (struct ocfs2_dentry_lock *)lockres->l_priv;
317}
318
319static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
320{
321    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
322
323    return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
324}
325
326static inline struct ocfs2_refcount_tree *
327ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
328{
329    return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
330}
331
332static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
333{
334    if (lockres->l_ops->get_osb)
335        return lockres->l_ops->get_osb(lockres);
336
337    return (struct ocfs2_super *)lockres->l_priv;
338}
339
340static int ocfs2_lock_create(struct ocfs2_super *osb,
341                 struct ocfs2_lock_res *lockres,
342                 int level,
343                 u32 dlm_flags);
344static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
345                             int wanted);
346static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
347                   struct ocfs2_lock_res *lockres,
348                   int level, unsigned long caller_ip);
349static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
350                    struct ocfs2_lock_res *lockres,
351                    int level)
352{
353    __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
354}
355
/* Forward declarations of the generic action handlers and helpers. */
static inline void
ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void
ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert);
/*
 * Log a DLM error for a lock resource at ML_ERROR.
 *
 * _func:    name (string) of the DLM call that failed
 * _err:     error value it returned
 * _lockres: lock resource the call was made on
 *
 * Dentry lock names carry a binary inode number after the printable
 * prefix, so for those only the prefix is printed as a string and the
 * inode number is appended in hex.
 *
 * Every argument is parenthesized so that arbitrary expressions can be
 * passed safely. Note that _lockres is still evaluated more than once.
 */
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {			\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)		\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
		     (_err), (_func), (_lockres)->l_name);		\
	else								\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \
		     (_err), (_func), OCFS2_DENTRY_LOCK_INO_START - 1,	\
		     (_lockres)->l_name,				\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \
} while (0)
/* Forward declarations; the definitions are not in this part of the file. */
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level, int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);

391
392
393static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
394                  u64 blkno,
395                  u32 generation,
396                  char *name)
397{
398    int len;
399
400    mlog_entry_void();
401
402    BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
403
404    len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
405               ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
406               (long long)blkno, generation);
407
408    BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
409
410    mlog(0, "built lock resource with name: %s\n", name);
411
412    mlog_exit_void();
413}
414
415static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
416
417static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
418                       struct ocfs2_dlm_debug *dlm_debug)
419{
420    mlog(0, "Add tracking for lockres %s\n", res->l_name);
421
422    spin_lock(&ocfs2_dlm_tracking_lock);
423    list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
424    spin_unlock(&ocfs2_dlm_tracking_lock);
425}
426
427static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
428{
429    spin_lock(&ocfs2_dlm_tracking_lock);
430    if (!list_empty(&res->l_debug_list))
431        list_del_init(&res->l_debug_list);
432    spin_unlock(&ocfs2_dlm_tracking_lock);
433}
434
435#ifdef CONFIG_OCFS2_FS_STATS
436static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
437{
438    res->l_lock_num_prmode = 0;
439    res->l_lock_num_prmode_failed = 0;
440    res->l_lock_total_prmode = 0;
441    res->l_lock_max_prmode = 0;
442    res->l_lock_num_exmode = 0;
443    res->l_lock_num_exmode_failed = 0;
444    res->l_lock_total_exmode = 0;
445    res->l_lock_max_exmode = 0;
446    res->l_lock_refresh = 0;
447}
448
449static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
450                    struct ocfs2_mask_waiter *mw, int ret)
451{
452    unsigned long long *num, *sum;
453    unsigned int *max, *failed;
454    struct timespec ts = current_kernel_time();
455    unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
456
457    if (level == LKM_PRMODE) {
458        num = &res->l_lock_num_prmode;
459        sum = &res->l_lock_total_prmode;
460        max = &res->l_lock_max_prmode;
461        failed = &res->l_lock_num_prmode_failed;
462    } else if (level == LKM_EXMODE) {
463        num = &res->l_lock_num_exmode;
464        sum = &res->l_lock_total_exmode;
465        max = &res->l_lock_max_exmode;
466        failed = &res->l_lock_num_exmode_failed;
467    } else
468        return;
469
470    (*num)++;
471    (*sum) += time;
472    if (time > *max)
473        *max = time;
474    if (ret)
475        (*failed)++;
476}
477
478static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
479{
480    lockres->l_lock_refresh++;
481}
482
483static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
484{
485    struct timespec ts = current_kernel_time();
486    mw->mw_lock_start = timespec_to_ns(&ts);
487}
488#else
/* Statistics are compiled out: every hook is an empty inline. */
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}

static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level,
					   struct ocfs2_mask_waiter *mw,
					   int ret)
{
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
502#endif
503
504static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
505                       struct ocfs2_lock_res *res,
506                       enum ocfs2_lock_type type,
507                       struct ocfs2_lock_res_ops *ops,
508                       void *priv)
509{
510    res->l_type = type;
511    res->l_ops = ops;
512    res->l_priv = priv;
513
514    res->l_level = DLM_LOCK_IV;
515    res->l_requested = DLM_LOCK_IV;
516    res->l_blocking = DLM_LOCK_IV;
517    res->l_action = OCFS2_AST_INVALID;
518    res->l_unlock_action = OCFS2_UNLOCK_INVALID;
519
520    res->l_flags = OCFS2_LOCK_INITIALIZED;
521
522    ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
523
524    ocfs2_init_lock_stats(res);
525#ifdef CONFIG_DEBUG_LOCK_ALLOC
526    if (type != OCFS2_LOCK_TYPE_OPEN)
527        lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
528                 &lockdep_keys[type], 0);
529    else
530        res->l_lockdep_map.key = NULL;
531#endif
532}
533
534void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
535{
536    /* This also clears out the lock status block */
537    memset(res, 0, sizeof(struct ocfs2_lock_res));
538    spin_lock_init(&res->l_lock);
539    init_waitqueue_head(&res->l_event);
540    INIT_LIST_HEAD(&res->l_blocked_list);
541    INIT_LIST_HEAD(&res->l_mask_waiters);
542}
543
544void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
545                   enum ocfs2_lock_type type,
546                   unsigned int generation,
547                   struct inode *inode)
548{
549    struct ocfs2_lock_res_ops *ops;
550
551    switch(type) {
552        case OCFS2_LOCK_TYPE_RW:
553            ops = &ocfs2_inode_rw_lops;
554            break;
555        case OCFS2_LOCK_TYPE_META:
556            ops = &ocfs2_inode_inode_lops;
557            break;
558        case OCFS2_LOCK_TYPE_OPEN:
559            ops = &ocfs2_inode_open_lops;
560            break;
561        default:
562            mlog_bug_on_msg(1, "type: %d\n", type);
563            ops = NULL; /* thanks, gcc */
564            break;
565    };
566
567    ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
568                  generation, res->l_name);
569    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
570}
571
572static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
573{
574    struct inode *inode = ocfs2_lock_res_inode(lockres);
575
576    return OCFS2_SB(inode->i_sb);
577}
578
579static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
580{
581    struct ocfs2_mem_dqinfo *info = lockres->l_priv;
582
583    return OCFS2_SB(info->dqi_gi.dqi_sb);
584}
585
586static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
587{
588    struct ocfs2_file_private *fp = lockres->l_priv;
589
590    return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
591}
592
593static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
594{
595    __be64 inode_blkno_be;
596
597    memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
598           sizeof(__be64));
599
600    return be64_to_cpu(inode_blkno_be);
601}
602
603static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
604{
605    struct ocfs2_dentry_lock *dl = lockres->l_priv;
606
607    return OCFS2_SB(dl->dl_inode->i_sb);
608}
609
610void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
611                u64 parent, struct inode *inode)
612{
613    int len;
614    u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
615    __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
616    struct ocfs2_lock_res *lockres = &dl->dl_lockres;
617
618    ocfs2_lock_res_init_once(lockres);
619
620    /*
621     * Unfortunately, the standard lock naming scheme won't work
622     * here because we have two 16 byte values to use. Instead,
623     * we'll stuff the inode number as a binary value. We still
624     * want error prints to show something without garbling the
625     * display, so drop a null byte in there before the inode
626     * number. A future version of OCFS2 will likely use all
627     * binary lock names. The stringified names have been a
628     * tremendous aid in debugging, but now that the debugfs
629     * interface exists, we can mangle things there if need be.
630     *
631     * NOTE: We also drop the standard "pad" value (the total lock
632     * name size stays the same though - the last part is all
633     * zeros due to the memset in ocfs2_lock_res_init_once()
634     */
635    len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
636               "%c%016llx",
637               ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
638               (long long)parent);
639
640    BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
641
642    memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
643           sizeof(__be64));
644
645    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
646                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
647                   dl);
648}
649
650static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
651                      struct ocfs2_super *osb)
652{
653    /* Superblock lockres doesn't come from a slab so we call init
654     * once on it manually. */
655    ocfs2_lock_res_init_once(res);
656    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
657                  0, res->l_name);
658    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
659                   &ocfs2_super_lops, osb);
660}
661
662static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
663                       struct ocfs2_super *osb)
664{
665    /* Rename lockres doesn't come from a slab so we call init
666     * once on it manually. */
667    ocfs2_lock_res_init_once(res);
668    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
669    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
670                   &ocfs2_rename_lops, osb);
671}
672
673static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
674                     struct ocfs2_super *osb)
675{
676    /* nfs_sync lockres doesn't come from a slab so we call init
677     * once on it manually. */
678    ocfs2_lock_res_init_once(res);
679    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
680    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
681                   &ocfs2_nfs_sync_lops, osb);
682}
683
684static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
685                        struct ocfs2_super *osb)
686{
687    ocfs2_lock_res_init_once(res);
688    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
689    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
690                   &ocfs2_orphan_scan_lops, osb);
691}
692
693void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
694                  struct ocfs2_file_private *fp)
695{
696    struct inode *inode = fp->fp_file->f_mapping->host;
697    struct ocfs2_inode_info *oi = OCFS2_I(inode);
698
699    ocfs2_lock_res_init_once(lockres);
700    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
701                  inode->i_generation, lockres->l_name);
702    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
703                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
704                   fp);
705    lockres->l_flags |= OCFS2_LOCK_NOCACHE;
706}
707
708void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
709                   struct ocfs2_mem_dqinfo *info)
710{
711    ocfs2_lock_res_init_once(lockres);
712    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
713                  0, lockres->l_name);
714    ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
715                   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
716                   info);
717}
718
719void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
720                  struct ocfs2_super *osb, u64 ref_blkno,
721                  unsigned int generation)
722{
723    ocfs2_lock_res_init_once(lockres);
724    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
725                  generation, lockres->l_name);
726    ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
727                   &ocfs2_refcount_block_lops, osb);
728}
729
730void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
731{
732    mlog_entry_void();
733
734    if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
735        return;
736
737    ocfs2_remove_lockres_tracking(res);
738
739    mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
740            "Lockres %s is on the blocked list\n",
741            res->l_name);
742    mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
743            "Lockres %s has mask waiters pending\n",
744            res->l_name);
745    mlog_bug_on_msg(spin_is_locked(&res->l_lock),
746            "Lockres %s is locked\n",
747            res->l_name);
748    mlog_bug_on_msg(res->l_ro_holders,
749            "Lockres %s has %u ro holders\n",
750            res->l_name, res->l_ro_holders);
751    mlog_bug_on_msg(res->l_ex_holders,
752            "Lockres %s has %u ex holders\n",
753            res->l_name, res->l_ex_holders);
754
755    /* Need to clear out the lock status block for the dlm */
756    memset(&res->l_lksb, 0, sizeof(res->l_lksb));
757
758    res->l_flags = 0UL;
759    mlog_exit_void();
760}
761
762static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
763                     int level)
764{
765    mlog_entry_void();
766
767    BUG_ON(!lockres);
768
769    switch(level) {
770    case DLM_LOCK_EX:
771        lockres->l_ex_holders++;
772        break;
773    case DLM_LOCK_PR:
774        lockres->l_ro_holders++;
775        break;
776    default:
777        BUG();
778    }
779
780    mlog_exit_void();
781}
782
783static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
784                     int level)
785{
786    mlog_entry_void();
787
788    BUG_ON(!lockres);
789
790    switch(level) {
791    case DLM_LOCK_EX:
792        BUG_ON(!lockres->l_ex_holders);
793        lockres->l_ex_holders--;
794        break;
795    case DLM_LOCK_PR:
796        BUG_ON(!lockres->l_ro_holders);
797        lockres->l_ro_holders--;
798        break;
799    default:
800        BUG();
801    }
802    mlog_exit_void();
803}
804
805/* WARNING: This function lives in a world where the only three lock
806 * levels are EX, PR, and NL. It *will* have to be adjusted when more
807 * lock types are added. */
808static inline int ocfs2_highest_compat_lock_level(int level)
809{
810    int new_level = DLM_LOCK_EX;
811
812    if (level == DLM_LOCK_EX)
813        new_level = DLM_LOCK_NL;
814    else if (level == DLM_LOCK_PR)
815        new_level = DLM_LOCK_PR;
816    return new_level;
817}
818
819static void lockres_set_flags(struct ocfs2_lock_res *lockres,
820                  unsigned long newflags)
821{
822    struct ocfs2_mask_waiter *mw, *tmp;
823
824     assert_spin_locked(&lockres->l_lock);
825
826    lockres->l_flags = newflags;
827
828    list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
829        if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
830            continue;
831
832        list_del_init(&mw->mw_item);
833        mw->mw_status = 0;
834        complete(&mw->mw_complete);
835    }
836}
837static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
838{
839    lockres_set_flags(lockres, lockres->l_flags | or);
840}
841static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
842                unsigned long clear)
843{
844    lockres_set_flags(lockres, lockres->l_flags & ~clear);
845}
846
847static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
848{
849    mlog_entry_void();
850
851    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
852    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
853    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
854    BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
855
856    lockres->l_level = lockres->l_requested;
857    if (lockres->l_level <=
858        ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
859        lockres->l_blocking = DLM_LOCK_NL;
860        lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
861    }
862    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
863
864    mlog_exit_void();
865}
866
867static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
868{
869    mlog_entry_void();
870
871    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
872    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
873
874    /* Convert from RO to EX doesn't really need anything as our
875     * information is already up to data. Convert from NL to
876     * *anything* however should mark ourselves as needing an
877     * update */
878    if (lockres->l_level == DLM_LOCK_NL &&
879        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
880        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
881
882    lockres->l_level = lockres->l_requested;
883
884    /*
885     * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
886     * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
887     * downconverting the lock before the upconvert has fully completed.
888     */
889    lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
890
891    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
892
893    mlog_exit_void();
894}
895
896static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
897{
898    mlog_entry_void();
899
900    BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
901    BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
902
903    if (lockres->l_requested > DLM_LOCK_NL &&
904        !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
905        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
906        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
907
908    lockres->l_level = lockres->l_requested;
909    lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
910    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
911
912    mlog_exit_void();
913}
914
915static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
916                     int level)
917{
918    int needs_downconvert = 0;
919    mlog_entry_void();
920
921    assert_spin_locked(&lockres->l_lock);
922
923    if (level > lockres->l_blocking) {
924        /* only schedule a downconvert if we haven't already scheduled
925         * one that goes low enough to satisfy the level we're
926         * blocking. this also catches the case where we get
927         * duplicate BASTs */
928        if (ocfs2_highest_compat_lock_level(level) <
929            ocfs2_highest_compat_lock_level(lockres->l_blocking))
930            needs_downconvert = 1;
931
932        lockres->l_blocking = level;
933    }
934
935    mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
936         lockres->l_name, level, lockres->l_level, lockres->l_blocking,
937         needs_downconvert);
938
939    if (needs_downconvert)
940        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
941
942    mlog_exit(needs_downconvert);
943    return needs_downconvert;
944}
945
946/*
947 * OCFS2_LOCK_PENDING and l_pending_gen.
948 *
949 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
950 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
951 * for more details on the race.
952 *
953 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
954 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
955 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
956 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
957 * the caller is going to try to clear PENDING again. If nothing else is
958 * happening, __lockres_clear_pending() sees PENDING is unset and does
959 * nothing.
960 *
961 * But what if another path (eg downconvert thread) has just started a
962 * new locking action? The other path has re-set PENDING. Our path
963 * cannot clear PENDING, because that will re-open the original race
964 * window.
965 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()              ocfs2_downconvert_thread()
 *     clear PENDING                    ocfs2_unblock_lock()
 *                                       take_l_lock
 *                                       !BUSY
 *                                       ocfs2_prepare_downconvert()
 *                                        set BUSY
 *                                        set PENDING
 *                                       drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                       ocfs2_dlm_lock()
987 *
988 * So as you can see, we now have a window where l_lock is not held,
989 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
990 *
991 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
992 * set by ocfs2_prepare_downconvert(). That wasn't nice.
993 *
994 * To solve this we introduce l_pending_gen. A call to
995 * lockres_clear_pending() will only do so when it is passed a generation
996 * number that matches the lockres. lockres_set_pending() will return the
997 * current generation number. When ocfs2_cluster_lock() goes to clear
998 * PENDING, it passes the generation it got from set_pending(). In our
999 * example above, the generation numbers will *not* match. Thus,
1000 * ocfs2_cluster_lock() will not clear the PENDING set by
1001 * ocfs2_prepare_downconvert().
1002 */
1003
1004/* Unlocked version for ocfs2_locking_ast() */
1005static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
1006                    unsigned int generation,
1007                    struct ocfs2_super *osb)
1008{
1009    assert_spin_locked(&lockres->l_lock);
1010
1011    /*
1012     * The ast and locking functions can race us here. The winner
1013     * will clear pending, the loser will not.
1014     */
1015    if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
1016        (lockres->l_pending_gen != generation))
1017        return;
1018
1019    lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
1020    lockres->l_pending_gen++;
1021
1022    /*
1023     * The downconvert thread may have skipped us because we
1024     * were PENDING. Wake it up.
1025     */
1026    if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1027        ocfs2_wake_downconvert_thread(osb);
1028}
1029
1030/* Locked version for callers of ocfs2_dlm_lock() */
1031static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
1032                  unsigned int generation,
1033                  struct ocfs2_super *osb)
1034{
1035    unsigned long flags;
1036
1037    spin_lock_irqsave(&lockres->l_lock, flags);
1038    __lockres_clear_pending(lockres, generation, osb);
1039    spin_unlock_irqrestore(&lockres->l_lock, flags);
1040}
1041
1042static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
1043{
1044    assert_spin_locked(&lockres->l_lock);
1045    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
1046
1047    lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
1048
1049    return lockres->l_pending_gen;
1050}
1051
1052static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
1053{
1054    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1055    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1056    int needs_downconvert;
1057    unsigned long flags;
1058
1059    BUG_ON(level <= DLM_LOCK_NL);
1060
1061    mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
1062         "type %s\n", lockres->l_name, level, lockres->l_level,
1063         ocfs2_lock_type_string(lockres->l_type));
1064
1065    /*
1066     * We can skip the bast for locks which don't enable caching -
1067     * they'll be dropped at the earliest possible time anyway.
1068     */
1069    if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
1070        return;
1071
1072    spin_lock_irqsave(&lockres->l_lock, flags);
1073    needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
1074    if (needs_downconvert)
1075        ocfs2_schedule_blocked_lock(osb, lockres);
1076    spin_unlock_irqrestore(&lockres->l_lock, flags);
1077
1078    wake_up(&lockres->l_event);
1079
1080    ocfs2_wake_downconvert_thread(osb);
1081}
1082
1083static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
1084{
1085    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1086    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1087    unsigned long flags;
1088    int status;
1089
1090    spin_lock_irqsave(&lockres->l_lock, flags);
1091
1092    status = ocfs2_dlm_lock_status(&lockres->l_lksb);
1093
1094    if (status == -EAGAIN) {
1095        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1096        goto out;
1097    }
1098
1099    if (status) {
1100        mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
1101             lockres->l_name, status);
1102        spin_unlock_irqrestore(&lockres->l_lock, flags);
1103        return;
1104    }
1105
1106    mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
1107         "level %d => %d\n", lockres->l_name, lockres->l_action,
1108         lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
1109
1110    switch(lockres->l_action) {
1111    case OCFS2_AST_ATTACH:
1112        ocfs2_generic_handle_attach_action(lockres);
1113        lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
1114        break;
1115    case OCFS2_AST_CONVERT:
1116        ocfs2_generic_handle_convert_action(lockres);
1117        break;
1118    case OCFS2_AST_DOWNCONVERT:
1119        ocfs2_generic_handle_downconvert_action(lockres);
1120        break;
1121    default:
1122        mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
1123             "flags 0x%lx, unlock: %u\n",
1124             lockres->l_name, lockres->l_action, lockres->l_flags,
1125             lockres->l_unlock_action);
1126        BUG();
1127    }
1128out:
1129    /* set it to something invalid so if we get called again we
1130     * can catch it. */
1131    lockres->l_action = OCFS2_AST_INVALID;
1132
1133    /* Did we try to cancel this lock? Clear that state */
1134    if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
1135        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1136
1137    /*
1138     * We may have beaten the locking functions here. We certainly
1139     * know that dlm_lock() has been called :-)
1140     * Because we can't have two lock calls in flight at once, we
1141     * can use lockres->l_pending_gen.
1142     */
1143    __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
1144
1145    wake_up(&lockres->l_event);
1146    spin_unlock_irqrestore(&lockres->l_lock, flags);
1147}
1148
1149static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
1150{
1151    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1152    unsigned long flags;
1153
1154    mlog_entry_void();
1155
1156    mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
1157         lockres->l_name, lockres->l_unlock_action);
1158
1159    spin_lock_irqsave(&lockres->l_lock, flags);
1160    if (error) {
1161        mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
1162             "unlock_action %d\n", error, lockres->l_name,
1163             lockres->l_unlock_action);
1164        spin_unlock_irqrestore(&lockres->l_lock, flags);
1165        mlog_exit_void();
1166        return;
1167    }
1168
1169    switch(lockres->l_unlock_action) {
1170    case OCFS2_UNLOCK_CANCEL_CONVERT:
1171        mlog(0, "Cancel convert success for %s\n", lockres->l_name);
1172        lockres->l_action = OCFS2_AST_INVALID;
1173        /* Downconvert thread may have requeued this lock, we
1174         * need to wake it. */
1175        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1176            ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
1177        break;
1178    case OCFS2_UNLOCK_DROP_LOCK:
1179        lockres->l_level = DLM_LOCK_IV;
1180        break;
1181    default:
1182        BUG();
1183    }
1184
1185    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1186    lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1187    wake_up(&lockres->l_event);
1188    spin_unlock_irqrestore(&lockres->l_lock, flags);
1189
1190    mlog_exit_void();
1191}
1192
1193/*
1194 * This is the filesystem locking protocol. It provides the lock handling
1195 * hooks for the underlying DLM. It has a maximum version number.
1196 * The version number allows interoperability with systems running at
1197 * the same major number and an equal or smaller minor number.
1198 *
1199 * Whenever the filesystem does new things with locks (adds or removes a
1200 * lock, orders them differently, does different things underneath a lock),
1201 * the version must be changed. The protocol is negotiated when joining
1202 * the dlm domain. A node may join the domain if its major version is
1203 * identical to all other nodes and its minor version is greater than
1204 * or equal to all other nodes. When its minor version is greater than
1205 * the other nodes, it will run at the minor version specified by the
1206 * other nodes.
1207 *
1208 * If a locking change is made that will not be compatible with older
1209 * versions, the major number must be increased and the minor version set
1210 * to zero. If a change merely adds a behavior that can be disabled when
1211 * speaking to older versions, the minor version must be increased. If a
1212 * change adds a fully backwards compatible change (eg, LVB changes that
1213 * are just ignored by older versions), the version does not need to be
1214 * updated.
1215 */
1216static struct ocfs2_locking_protocol lproto = {
1217    .lp_max_version = {
1218        .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
1219        .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
1220    },
1221    .lp_lock_ast = ocfs2_locking_ast,
1222    .lp_blocking_ast = ocfs2_blocking_ast,
1223    .lp_unlock_ast = ocfs2_unlock_ast,
1224};
1225
1226void ocfs2_set_locking_protocol(void)
1227{
1228    ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
1229}
1230
1231static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1232                        int convert)
1233{
1234    unsigned long flags;
1235
1236    mlog_entry_void();
1237    spin_lock_irqsave(&lockres->l_lock, flags);
1238    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1239    lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1240    if (convert)
1241        lockres->l_action = OCFS2_AST_INVALID;
1242    else
1243        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1244    spin_unlock_irqrestore(&lockres->l_lock, flags);
1245
1246    wake_up(&lockres->l_event);
1247    mlog_exit_void();
1248}
1249
1250/* Note: If we detect another process working on the lock (i.e.,
1251 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
1252 * to do the right thing in that case.
1253 */
1254static int ocfs2_lock_create(struct ocfs2_super *osb,
1255                 struct ocfs2_lock_res *lockres,
1256                 int level,
1257                 u32 dlm_flags)
1258{
1259    int ret = 0;
1260    unsigned long flags;
1261    unsigned int gen;
1262
1263    mlog_entry_void();
1264
1265    mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
1266         dlm_flags);
1267
1268    spin_lock_irqsave(&lockres->l_lock, flags);
1269    if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
1270        (lockres->l_flags & OCFS2_LOCK_BUSY)) {
1271        spin_unlock_irqrestore(&lockres->l_lock, flags);
1272        goto bail;
1273    }
1274
1275    lockres->l_action = OCFS2_AST_ATTACH;
1276    lockres->l_requested = level;
1277    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1278    gen = lockres_set_pending(lockres);
1279    spin_unlock_irqrestore(&lockres->l_lock, flags);
1280
1281    ret = ocfs2_dlm_lock(osb->cconn,
1282                 level,
1283                 &lockres->l_lksb,
1284                 dlm_flags,
1285                 lockres->l_name,
1286                 OCFS2_LOCK_ID_MAX_LEN - 1);
1287    lockres_clear_pending(lockres, gen, osb);
1288    if (ret) {
1289        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1290        ocfs2_recover_from_dlm_error(lockres, 1);
1291    }
1292
1293    mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
1294
1295bail:
1296    mlog_exit(ret);
1297    return ret;
1298}
1299
1300static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
1301                    int flag)
1302{
1303    unsigned long flags;
1304    int ret;
1305
1306    spin_lock_irqsave(&lockres->l_lock, flags);
1307    ret = lockres->l_flags & flag;
1308    spin_unlock_irqrestore(&lockres->l_lock, flags);
1309
1310    return ret;
1311}
1312
1313static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
1314
1315{
1316    wait_event(lockres->l_event,
1317           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
1318}
1319
1320static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
1321
1322{
1323    wait_event(lockres->l_event,
1324           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
1325}
1326
1327/* predict what lock level we'll be dropping down to on behalf
1328 * of another node, and return true if the currently wanted
1329 * level will be compatible with it. */
1330static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
1331                             int wanted)
1332{
1333    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
1334
1335    return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
1336}
1337
1338static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1339{
1340    INIT_LIST_HEAD(&mw->mw_item);
1341    init_completion(&mw->mw_complete);
1342    ocfs2_init_start_time(mw);
1343}
1344
1345static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1346{
1347    wait_for_completion(&mw->mw_complete);
1348    /* Re-arm the completion in case we want to wait on it again */
1349    INIT_COMPLETION(mw->mw_complete);
1350    return mw->mw_status;
1351}
1352
1353static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1354                    struct ocfs2_mask_waiter *mw,
1355                    unsigned long mask,
1356                    unsigned long goal)
1357{
1358    BUG_ON(!list_empty(&mw->mw_item));
1359
1360    assert_spin_locked(&lockres->l_lock);
1361
1362    list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1363    mw->mw_mask = mask;
1364    mw->mw_goal = goal;
1365}
1366
1367/* returns 0 if the mw that was removed was already satisfied, -EBUSY
1368 * if the mask still hadn't reached its goal */
1369static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1370                      struct ocfs2_mask_waiter *mw)
1371{
1372    unsigned long flags;
1373    int ret = 0;
1374
1375    spin_lock_irqsave(&lockres->l_lock, flags);
1376    if (!list_empty(&mw->mw_item)) {
1377        if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1378            ret = -EBUSY;
1379
1380        list_del_init(&mw->mw_item);
1381        init_completion(&mw->mw_complete);
1382    }
1383    spin_unlock_irqrestore(&lockres->l_lock, flags);
1384
1385    return ret;
1386
1387}
1388
1389static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1390                         struct ocfs2_lock_res *lockres)
1391{
1392    int ret;
1393
1394    ret = wait_for_completion_interruptible(&mw->mw_complete);
1395    if (ret)
1396        lockres_remove_mask_waiter(lockres, mw);
1397    else
1398        ret = mw->mw_status;
1399    /* Re-arm the completion in case we want to wait on it again */
1400    INIT_COMPLETION(mw->mw_complete);
1401    return ret;
1402}
1403
1404static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1405                struct ocfs2_lock_res *lockres,
1406                int level,
1407                u32 lkm_flags,
1408                int arg_flags,
1409                int l_subclass,
1410                unsigned long caller_ip)
1411{
1412    struct ocfs2_mask_waiter mw;
1413    int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1414    int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1415    unsigned long flags;
1416    unsigned int gen;
1417    int noqueue_attempted = 0;
1418
1419    mlog_entry_void();
1420
1421    ocfs2_init_mask_waiter(&mw);
1422
1423    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1424        lkm_flags |= DLM_LKF_VALBLK;
1425
1426again:
1427    wait = 0;
1428
1429    spin_lock_irqsave(&lockres->l_lock, flags);
1430
1431    if (catch_signals && signal_pending(current)) {
1432        ret = -ERESTARTSYS;
1433        goto unlock;
1434    }
1435
1436    mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1437            "Cluster lock called on freeing lockres %s! flags "
1438            "0x%lx\n", lockres->l_name, lockres->l_flags);
1439
1440    /* We only compare against the currently granted level
1441     * here. If the lock is blocked waiting on a downconvert,
1442     * we'll get caught below. */
1443    if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1444        level > lockres->l_level) {
1445        /* is someone sitting in dlm_lock? If so, wait on
1446         * them. */
1447        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1448        wait = 1;
1449        goto unlock;
1450    }
1451
1452    if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1453        /*
1454         * We've upconverted. If the lock now has a level we can
1455         * work with, we take it. If, however, the lock is not at the
1456         * required level, we go thru the full cycle. One way this could
1457         * happen is if a process requesting an upconvert to PR is
1458         * closely followed by another requesting upconvert to an EX.
1459         * If the process requesting EX lands here, we want it to
1460         * continue attempting to upconvert and let the process
1461         * requesting PR take the lock.
1462         * If multiple processes request upconvert to PR, the first one
1463         * here will take the lock. The others will have to go thru the
1464         * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1465         * downconvert request.
1466         */
1467        if (level <= lockres->l_level)
1468            goto update_holders;
1469    }
1470
1471    if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1472        !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1473        /* is the lock is currently blocked on behalf of
1474         * another node */
1475        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1476        wait = 1;
1477        goto unlock;
1478    }
1479
1480    if (level > lockres->l_level) {
1481        if (noqueue_attempted > 0) {
1482            ret = -EAGAIN;
1483            goto unlock;
1484        }
1485        if (lkm_flags & DLM_LKF_NOQUEUE)
1486            noqueue_attempted = 1;
1487
1488        if (lockres->l_action != OCFS2_AST_INVALID)
1489            mlog(ML_ERROR, "lockres %s has action %u pending\n",
1490                 lockres->l_name, lockres->l_action);
1491
1492        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1493            lockres->l_action = OCFS2_AST_ATTACH;
1494            lkm_flags &= ~DLM_LKF_CONVERT;
1495        } else {
1496            lockres->l_action = OCFS2_AST_CONVERT;
1497            lkm_flags |= DLM_LKF_CONVERT;
1498        }
1499
1500        lockres->l_requested = level;
1501        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1502        gen = lockres_set_pending(lockres);
1503        spin_unlock_irqrestore(&lockres->l_lock, flags);
1504
1505        BUG_ON(level == DLM_LOCK_IV);
1506        BUG_ON(level == DLM_LOCK_NL);
1507
1508        mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1509             lockres->l_name, lockres->l_level, level);
1510
1511        /* call dlm_lock to upgrade lock now */
1512        ret = ocfs2_dlm_lock(osb->cconn,
1513                     level,
1514                     &lockres->l_lksb,
1515                     lkm_flags,
1516                     lockres->l_name,
1517                     OCFS2_LOCK_ID_MAX_LEN - 1);
1518        lockres_clear_pending(lockres, gen, osb);
1519        if (ret) {
1520            if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1521                (ret != -EAGAIN)) {
1522                ocfs2_log_dlm_error("ocfs2_dlm_lock",
1523                            ret, lockres);
1524            }
1525            ocfs2_recover_from_dlm_error(lockres, 1);
1526            goto out;
1527        }
1528
1529        mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1530             lockres->l_name);
1531
1532        /* At this point we've gone inside the dlm and need to
1533         * complete our work regardless. */
1534        catch_signals = 0;
1535
1536        /* wait for busy to clear and carry on */
1537        goto again;
1538    }
1539
1540update_holders:
1541    /* Ok, if we get here then we're good to go. */
1542    ocfs2_inc_holders(lockres, level);
1543
1544    ret = 0;
1545unlock:
1546    lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1547
1548    spin_unlock_irqrestore(&lockres->l_lock, flags);
1549out:
1550    /*
1551     * This is helping work around a lock inversion between the page lock
1552     * and dlm locks. One path holds the page lock while calling aops
1553     * which block acquiring dlm locks. The voting thread holds dlm
1554     * locks while acquiring page locks while down converting data locks.
1555     * This block is helping an aop path notice the inversion and back
1556     * off to unlock its page lock before trying the dlm lock again.
1557     */
1558    if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1559        mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1560        wait = 0;
1561        if (lockres_remove_mask_waiter(lockres, &mw))
1562            ret = -EAGAIN;
1563        else
1564            goto again;
1565    }
1566    if (wait) {
1567        ret = ocfs2_wait_for_mask(&mw);
1568        if (ret == 0)
1569            goto again;
1570        mlog_errno(ret);
1571    }
1572    ocfs2_update_lock_stats(lockres, level, &mw, ret);
1573
1574#ifdef CONFIG_DEBUG_LOCK_ALLOC
1575    if (!ret && lockres->l_lockdep_map.key != NULL) {
1576        if (level == DLM_LOCK_PR)
1577            rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1578                !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1579                caller_ip);
1580        else
1581            rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1582                !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1583                caller_ip);
1584    }
1585#endif
1586    mlog_exit(ret);
1587    return ret;
1588}
1589
1590static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1591                     struct ocfs2_lock_res *lockres,
1592                     int level,
1593                     u32 lkm_flags,
1594                     int arg_flags)
1595{
1596    return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1597                    0, _RET_IP_);
1598}
1599
1600
1601static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1602                   struct ocfs2_lock_res *lockres,
1603                   int level,
1604                   unsigned long caller_ip)
1605{
1606    unsigned long flags;
1607
1608    mlog_entry_void();
1609    spin_lock_irqsave(&lockres->l_lock, flags);
1610    ocfs2_dec_holders(lockres, level);
1611    ocfs2_downconvert_on_unlock(osb, lockres);
1612    spin_unlock_irqrestore(&lockres->l_lock, flags);
1613#ifdef CONFIG_DEBUG_LOCK_ALLOC
1614    if (lockres->l_lockdep_map.key != NULL)
1615        rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1616#endif
1617    mlog_exit_void();
1618}
1619
1620static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1621                 struct ocfs2_lock_res *lockres,
1622                 int ex,
1623                 int local)
1624{
1625    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1626    unsigned long flags;
1627    u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1628
1629    spin_lock_irqsave(&lockres->l_lock, flags);
1630    BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1631    lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1632    spin_unlock_irqrestore(&lockres->l_lock, flags);
1633
1634    return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1635}
1636
1637/* Grants us an EX lock on the data and metadata resources, skipping
1638 * the normal cluster directory lookup. Use this ONLY on newly created
1639 * inodes which other nodes can't possibly see, and which haven't been
1640 * hashed in the inode hash yet. This can give us a good performance
1641 * increase as it'll skip the network broadcast normally associated
1642 * with creating a new lock resource. */
1643int ocfs2_create_new_inode_locks(struct inode *inode)
1644{
1645    int ret;
1646    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1647
1648    BUG_ON(!inode);
1649    BUG_ON(!ocfs2_inode_is_new(inode));
1650
1651    mlog_entry_void();
1652
1653    mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1654
1655    /* NOTE: That we don't increment any of the holder counts, nor
1656     * do we add anything to a journal handle. Since this is
1657     * supposed to be a new inode which the cluster doesn't know
1658     * about yet, there is no need to. As far as the LVB handling
1659     * is concerned, this is basically like acquiring an EX lock
1660     * on a resource which has an invalid one -- we'll set it
1661     * valid when we release the EX. */
1662
1663    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1664    if (ret) {
1665        mlog_errno(ret);
1666        goto bail;
1667    }
1668
1669    /*
1670     * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1671     * don't use a generation in their lock names.
1672     */
1673    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1674    if (ret) {
1675        mlog_errno(ret);
1676        goto bail;
1677    }
1678
1679    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1680    if (ret) {
1681        mlog_errno(ret);
1682        goto bail;
1683    }
1684
1685bail:
1686    mlog_exit(ret);
1687    return ret;
1688}
1689
1690int ocfs2_rw_lock(struct inode *inode, int write)
1691{
1692    int status, level;
1693    struct ocfs2_lock_res *lockres;
1694    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1695
1696    BUG_ON(!inode);
1697
1698    mlog_entry_void();
1699
1700    mlog(0, "inode %llu take %s RW lock\n",
1701         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1702         write ? "EXMODE" : "PRMODE");
1703
1704    if (ocfs2_mount_local(osb)) {
1705        mlog_exit(0);
1706        return 0;
1707    }
1708
1709    lockres = &OCFS2_I(inode)->ip_rw_lockres;
1710
1711    level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1712
1713    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1714                    0);
1715    if (status < 0)
1716        mlog_errno(status);
1717
1718    mlog_exit(status);
1719    return status;
1720}
1721
1722void ocfs2_rw_unlock(struct inode *inode, int write)
1723{
1724    int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1725    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1726    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1727
1728    mlog_entry_void();
1729
1730    mlog(0, "inode %llu drop %s RW lock\n",
1731         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1732         write ? "EXMODE" : "PRMODE");
1733
1734    if (!ocfs2_mount_local(osb))
1735        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1736
1737    mlog_exit_void();
1738}
1739
1740/*
1741 * ocfs2_open_lock always get PR mode lock.
1742 */
1743int ocfs2_open_lock(struct inode *inode)
1744{
1745    int status = 0;
1746    struct ocfs2_lock_res *lockres;
1747    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1748
1749    BUG_ON(!inode);
1750
1751    mlog_entry_void();
1752
1753    mlog(0, "inode %llu take PRMODE open lock\n",
1754         (unsigned long long)OCFS2_I(inode)->ip_blkno);
1755
1756    if (ocfs2_mount_local(osb))
1757        goto out;
1758
1759    lockres = &OCFS2_I(inode)->ip_open_lockres;
1760
1761    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1762                    DLM_LOCK_PR, 0, 0);
1763    if (status < 0)
1764        mlog_errno(status);
1765
1766out:
1767    mlog_exit(status);
1768    return status;
1769}
1770
1771int ocfs2_try_open_lock(struct inode *inode, int write)
1772{
1773    int status = 0, level;
1774    struct ocfs2_lock_res *lockres;
1775    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1776
1777    BUG_ON(!inode);
1778
1779    mlog_entry_void();
1780
1781    mlog(0, "inode %llu try to take %s open lock\n",
1782         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1783         write ? "EXMODE" : "PRMODE");
1784
1785    if (ocfs2_mount_local(osb))
1786        goto out;
1787
1788    lockres = &OCFS2_I(inode)->ip_open_lockres;
1789
1790    level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1791
1792    /*
1793     * The file system may already holding a PRMODE/EXMODE open lock.
1794     * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1795     * other nodes and the -EAGAIN will indicate to the caller that
1796     * this inode is still in use.
1797     */
1798    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1799                    level, DLM_LKF_NOQUEUE, 0);
1800
1801out:
1802    mlog_exit(status);
1803    return status;
1804}
1805
1806/*
1807 * ocfs2_open_unlock unlock PR and EX mode open locks.
1808 */
1809void ocfs2_open_unlock(struct inode *inode)
1810{
1811    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1812    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1813
1814    mlog_entry_void();
1815
1816    mlog(0, "inode %llu drop open lock\n",
1817         (unsigned long long)OCFS2_I(inode)->ip_blkno);
1818
1819    if (ocfs2_mount_local(osb))
1820        goto out;
1821
1822    if(lockres->l_ro_holders)
1823        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1824                     DLM_LOCK_PR);
1825    if(lockres->l_ex_holders)
1826        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1827                     DLM_LOCK_EX);
1828
1829out:
1830    mlog_exit_void();
1831}
1832
1833static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1834                     int level)
1835{
1836    int ret;
1837    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1838    unsigned long flags;
1839    struct ocfs2_mask_waiter mw;
1840
1841    ocfs2_init_mask_waiter(&mw);
1842
1843retry_cancel:
1844    spin_lock_irqsave(&lockres->l_lock, flags);
1845    if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1846        ret = ocfs2_prepare_cancel_convert(osb, lockres);
1847        if (ret) {
1848            spin_unlock_irqrestore(&lockres->l_lock, flags);
1849            ret = ocfs2_cancel_convert(osb, lockres);
1850            if (ret < 0) {
1851                mlog_errno(ret);
1852                goto out;
1853            }
1854            goto retry_cancel;
1855        }
1856        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1857        spin_unlock_irqrestore(&lockres->l_lock, flags);
1858
1859        ocfs2_wait_for_mask(&mw);
1860        goto retry_cancel;
1861    }
1862
1863    ret = -ERESTARTSYS;
1864    /*
1865     * We may still have gotten the lock, in which case there's no
1866     * point to restarting the syscall.
1867     */
1868    if (lockres->l_level == level)
1869        ret = 0;
1870
1871    mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1872         lockres->l_flags, lockres->l_level, lockres->l_action);
1873
1874    spin_unlock_irqrestore(&lockres->l_lock, flags);
1875
1876out:
1877    return ret;
1878}
1879
1880/*
1881 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1882 * flock() calls. The locking approach this requires is sufficiently
1883 * different from all other cluster lock types that we implement a
1884 * separate path to the "low-level" dlm calls. In particular:
1885 *
1886 * - No optimization of lock levels is done - we take at exactly
1887 * what's been requested.
1888 *
1889 * - No lock caching is employed. We immediately downconvert to
1890 * no-lock at unlock time. This also means flock locks never go on
1891 * the blocking list).
1892 *
1893 * - Since userspace can trivially deadlock itself with flock, we make
1894 * sure to allow cancellation of a misbehaving applications flock()
1895 * request.
1896 *
1897 * - Access to any flock lockres doesn't require concurrency, so we
1898 * can simplify the code by requiring the caller to guarantee
1899 * serialization of dlmglue flock calls.
1900 */
1901int ocfs2_file_lock(struct file *file, int ex, int trylock)
1902{
1903    int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1904    unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1905    unsigned long flags;
1906    struct ocfs2_file_private *fp = file->private_data;
1907    struct ocfs2_lock_res *lockres = &fp->fp_flock;
1908    struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1909    struct ocfs2_mask_waiter mw;
1910
1911    ocfs2_init_mask_waiter(&mw);
1912
1913    if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1914        (lockres->l_level > DLM_LOCK_NL)) {
1915        mlog(ML_ERROR,
1916             "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1917             "level: %u\n", lockres->l_name, lockres->l_flags,
1918             lockres->l_level);
1919        return -EINVAL;
1920    }
1921
1922    spin_lock_irqsave(&lockres->l_lock, flags);
1923    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1924        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1925        spin_unlock_irqrestore(&lockres->l_lock, flags);
1926
1927        /*
1928         * Get the lock at NLMODE to start - that way we
1929         * can cancel the upconvert request if need be.
1930         */
1931        ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1932        if (ret < 0) {
1933            mlog_errno(ret);
1934            goto out;
1935        }
1936
1937        ret = ocfs2_wait_for_mask(&mw);
1938        if (ret) {
1939            mlog_errno(ret);
1940            goto out;
1941        }
1942        spin_lock_irqsave(&lockres->l_lock, flags);
1943    }
1944
1945    lockres->l_action = OCFS2_AST_CONVERT;
1946    lkm_flags |= DLM_LKF_CONVERT;
1947    lockres->l_requested = level;
1948    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1949
1950    lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1951    spin_unlock_irqrestore(&lockres->l_lock, flags);
1952
1953    ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1954                 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1955    if (ret) {
1956        if (!trylock || (ret != -EAGAIN)) {
1957            ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1958            ret = -EINVAL;
1959        }
1960
1961        ocfs2_recover_from_dlm_error(lockres, 1);
1962        lockres_remove_mask_waiter(lockres, &mw);
1963        goto out;
1964    }
1965
1966    ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1967    if (ret == -ERESTARTSYS) {
1968        /*
1969         * Userspace can cause deadlock itself with
1970         * flock(). Current behavior locally is to allow the
1971         * deadlock, but abort the system call if a signal is
1972         * received. We follow this example, otherwise a
1973         * poorly written program could sit in kernel until
1974         * reboot.
1975         *
1976         * Handling this is a bit more complicated for Ocfs2
1977         * though. We can't exit this function with an
1978         * outstanding lock request, so a cancel convert is
1979         * required. We intentionally overwrite 'ret' - if the
1980         * cancel fails and the lock was granted, it's easier
1981         * to just bubble success back up to the user.
1982         */
1983        ret = ocfs2_flock_handle_signal(lockres, level);
1984    } else if (!ret && (level > lockres->l_level)) {
1985        /* Trylock failed asynchronously */
1986        BUG_ON(!trylock);
1987        ret = -EAGAIN;
1988    }
1989
1990out:
1991
1992    mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1993         lockres->l_name, ex, trylock, ret);
1994    return ret;
1995}
1996
1997void ocfs2_file_unlock(struct file *file)
1998{
1999    int ret;
2000    unsigned int gen;
2001    unsigned long flags;
2002    struct ocfs2_file_private *fp = file->private_data;
2003    struct ocfs2_lock_res *lockres = &fp->fp_flock;
2004    struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2005    struct ocfs2_mask_waiter mw;
2006
2007    ocfs2_init_mask_waiter(&mw);
2008
2009    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2010        return;
2011
2012    if (lockres->l_level == DLM_LOCK_NL)
2013        return;
2014
2015    mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2016         lockres->l_name, lockres->l_flags, lockres->l_level,
2017         lockres->l_action);
2018
2019    spin_lock_irqsave(&lockres->l_lock, flags);
2020    /*
2021     * Fake a blocking ast for the downconvert code.
2022     */
2023    lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2024    lockres->l_blocking = DLM_LOCK_EX;
2025
2026    gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2027    lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2028    spin_unlock_irqrestore(&lockres->l_lock, flags);
2029
2030    ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2031    if (ret) {
2032        mlog_errno(ret);
2033        return;
2034    }
2035
2036    ret = ocfs2_wait_for_mask(&mw);
2037    if (ret)
2038        mlog_errno(ret);
2039}
2040
2041static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2042                    struct ocfs2_lock_res *lockres)
2043{
2044    int kick = 0;
2045
2046    mlog_entry_void();
2047
2048    /* If we know that another node is waiting on our lock, kick
2049     * the downconvert thread * pre-emptively when we reach a release
2050     * condition. */
2051    if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2052        switch(lockres->l_blocking) {
2053        case DLM_LOCK_EX:
2054            if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2055                kick = 1;
2056            break;
2057        case DLM_LOCK_PR:
2058            if (!lockres->l_ex_holders)
2059                kick = 1;
2060            break;
2061        default:
2062            BUG();
2063        }
2064    }
2065
2066    if (kick)
2067        ocfs2_wake_downconvert_thread(osb);
2068
2069    mlog_exit_void();
2070}
2071
/*
 * Layout of a packed 64-bit timestamp: the upper OCFS2_SEC_BITS bits hold
 * seconds, the remaining low OCFS2_SEC_SHIFT bits hold nanoseconds.
 */
#define OCFS2_SEC_BITS		34
#define OCFS2_SEC_SHIFT		(64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK		((1ULL << OCFS2_SEC_SHIFT) - 1)
2075
2076/* LVB only has room for 64 bits of time here so we pack it for
2077 * now. */
2078static u64 ocfs2_pack_timespec(struct timespec *spec)
2079{
2080    u64 res;
2081    u64 sec = spec->tv_sec;
2082    u32 nsec = spec->tv_nsec;
2083
2084    res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2085
2086    return res;
2087}
2088
2089/* Call this with the lockres locked. I am reasonably sure we don't
2090 * need ip_lock in this function as anyone who would be changing those
2091 * values is supposed to be blocked in ocfs2_inode_lock right now. */
2092static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2093{
2094    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2095    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2096    struct ocfs2_meta_lvb *lvb;
2097
2098    mlog_entry_void();
2099
2100    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2101
2102    /*
2103     * Invalidate the LVB of a deleted inode - this way other
2104     * nodes are forced to go to disk and discover the new inode
2105     * status.
2106     */
2107    if (oi->ip_flags & OCFS2_INODE_DELETED) {
2108        lvb->lvb_version = 0;
2109        goto out;
2110    }
2111
2112    lvb->lvb_version = OCFS2_LVB_VERSION;
2113    lvb->lvb_isize = cpu_to_be64(i_size_read(inode));
2114    lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2115    lvb->lvb_iuid = cpu_to_be32(inode->i_uid);
2116    lvb->lvb_igid = cpu_to_be32(inode->i_gid);
2117    lvb->lvb_imode = cpu_to_be16(inode->i_mode);
2118    lvb->lvb_inlink = cpu_to_be16(inode->i_nlink);
2119    lvb->lvb_iatime_packed =
2120        cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2121    lvb->lvb_ictime_packed =
2122        cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2123    lvb->lvb_imtime_packed =
2124        cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2125    lvb->lvb_iattr = cpu_to_be32(oi->ip_attr);
2126    lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2127    lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2128
2129out:
2130    mlog_meta_lvb(0, lockres);
2131
2132    mlog_exit_void();
2133}
2134
2135static void ocfs2_unpack_timespec(struct timespec *spec,
2136                  u64 packed_time)
2137{
2138    spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2139    spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2140}
2141
2142static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2143{
2144    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2145    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2146    struct ocfs2_meta_lvb *lvb;
2147
2148    mlog_entry_void();
2149
2150    mlog_meta_lvb(0, lockres);
2151
2152    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2153
2154    /* We're safe here without the lockres lock... */
2155    spin_lock(&oi->ip_lock);
2156    oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2157    i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2158
2159    oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2160    oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2161    ocfs2_set_inode_flags(inode);
2162
2163    /* fast-symlinks are a special case */
2164    if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2165        inode->i_blocks = 0;
2166    else
2167        inode->i_blocks = ocfs2_inode_sector_count(inode);
2168
2169    inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
2170    inode->i_gid = be32_to_cpu(lvb->lvb_igid);
2171    inode->i_mode = be16_to_cpu(lvb->lvb_imode);
2172    inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
2173    ocfs2_unpack_timespec(&inode->i_atime,
2174                  be64_to_cpu(lvb->lvb_iatime_packed));
2175    ocfs2_unpack_timespec(&inode->i_mtime,
2176                  be64_to_cpu(lvb->lvb_imtime_packed));
2177    ocfs2_unpack_timespec(&inode->i_ctime,
2178                  be64_to_cpu(lvb->lvb_ictime_packed));
2179    spin_unlock(&oi->ip_lock);
2180
2181    mlog_exit_void();
2182}
2183
2184static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2185                          struct ocfs2_lock_res *lockres)
2186{
2187    struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2188
2189    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2190        && lvb->lvb_version == OCFS2_LVB_VERSION
2191        && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2192        return 1;
2193    return 0;
2194}
2195
2196/* Determine whether a lock resource needs to be refreshed, and
2197 * arbitrate who gets to refresh it.
2198 *
2199 * 0 means no refresh needed.
2200 *
2201 * > 0 means you need to refresh this and you MUST call
2202 * ocfs2_complete_lock_res_refresh afterwards. */
2203static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2204{
2205    unsigned long flags;
2206    int status = 0;
2207
2208    mlog_entry_void();
2209
2210refresh_check:
2211    spin_lock_irqsave(&lockres->l_lock, flags);
2212    if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2213        spin_unlock_irqrestore(&lockres->l_lock, flags);
2214        goto bail;
2215    }
2216
2217    if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2218        spin_unlock_irqrestore(&lockres->l_lock, flags);
2219
2220        ocfs2_wait_on_refreshing_lock(lockres);
2221        goto refresh_check;
2222    }
2223
2224    /* Ok, I'll be the one to refresh this lock. */
2225    lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2226    spin_unlock_irqrestore(&lockres->l_lock, flags);
2227
2228    status = 1;
2229bail:
2230    mlog_exit(status);
2231    return status;
2232}
2233
2234/* If status is non zero, I'll mark it as not being in refresh
2235 * anymroe, but i won't clear the needs refresh flag. */
2236static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2237                           int status)
2238{
2239    unsigned long flags;
2240    mlog_entry_void();
2241
2242    spin_lock_irqsave(&lockres->l_lock, flags);
2243    lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2244    if (!status)
2245        lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2246    spin_unlock_irqrestore(&lockres->l_lock, flags);
2247
2248    wake_up(&lockres->l_event);
2249
2250    mlog_exit_void();
2251}
2252
2253/* may or may not return a bh if it went to disk. */
2254static int ocfs2_inode_lock_update(struct inode *inode,
2255                  struct buffer_head **bh)
2256{
2257    int status = 0;
2258    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2259    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2260    struct ocfs2_dinode *fe;
2261    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2262
2263    mlog_entry_void();
2264
2265    if (ocfs2_mount_local(osb))
2266        goto bail;
2267
2268    spin_lock(&oi->ip_lock);
2269    if (oi->ip_flags & OCFS2_INODE_DELETED) {
2270        mlog(0, "Orphaned inode %llu was deleted while we "
2271             "were waiting on a lock. ip_flags = 0x%x\n",
2272             (unsigned long long)oi->ip_blkno, oi->ip_flags);
2273        spin_unlock(&oi->ip_lock);
2274        status = -ENOENT;
2275        goto bail;
2276    }
2277    spin_unlock(&oi->ip_lock);
2278
2279    if (!ocfs2_should_refresh_lock_res(lockres))
2280        goto bail;
2281
2282    /* This will discard any caching information we might have had
2283     * for the inode metadata. */
2284    ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2285
2286    ocfs2_extent_map_trunc(inode, 0);
2287
2288    if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2289        mlog(0, "Trusting LVB on inode %llu\n",
2290             (unsigned long long)oi->ip_blkno);
2291        ocfs2_refresh_inode_from_lvb(inode);
2292    } else {
2293        /* Boo, we have to go to disk. */
2294        /* read bh, cast, ocfs2_refresh_inode */
2295        status = ocfs2_read_inode_block(inode, bh);
2296        if (status < 0) {
2297            mlog_errno(status);
2298            goto bail_refresh;
2299        }
2300        fe = (struct ocfs2_dinode *) (*bh)->b_data;
2301
2302        /* This is a good chance to make sure we're not
2303         * locking an invalid object. ocfs2_read_inode_block()
2304         * already checked that the inode block is sane.
2305         *
2306         * We bug on a stale inode here because we checked
2307         * above whether it was wiped from disk. The wiping
2308         * node provides a guarantee that we receive that
2309         * message and can mark the inode before dropping any
2310         * locks associated with it. */
2311        mlog_bug_on_msg(inode->i_generation !=
2312                le32_to_cpu(fe->i_generation),
2313                "Invalid dinode %llu disk generation: %u "
2314                "inode->i_generation: %u\n",
2315                (unsigned long long)oi->ip_blkno,
2316                le32_to_cpu(fe->i_generation),
2317                inode->i_generation);
2318        mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2319                !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2320                "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2321                (unsigned long long)oi->ip_blkno,
2322                (unsigned long long)le64_to_cpu(fe->i_dtime),
2323                le32_to_cpu(fe->i_flags));
2324
2325        ocfs2_refresh_inode(inode, fe);
2326        ocfs2_track_lock_refresh(lockres);
2327    }
2328
2329    status = 0;
2330bail_refresh:
2331    ocfs2_complete_lock_res_refresh(lockres, status);
2332bail:
2333    mlog_exit(status);
2334    return status;
2335}
2336
/*
 * Hand an inode buffer head back through 'ret_bh'.
 *
 * If 'passed_bh' is set it is reused and an extra reference is taken on
 * it; otherwise the inode block is read. Returns 0 on success or the
 * negative error from ocfs2_read_inode_block().
 */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (!passed_bh) {
		status = ocfs2_read_inode_block(inode, ret_bh);
		if (status < 0)
			mlog_errno(status);

		return status;
	}

	/* Ok, the update went to disk for us, use the returned bh. */
	get_bh(passed_bh);
	*ret_bh = passed_bh;

	return 0;
}
2358
2359/*
2360 * returns < 0 error if the callback will never be called, otherwise
2361 * the result of the lock will be communicated via the callback.
2362 */
2363int ocfs2_inode_lock_full_nested(struct inode *inode,
2364                 struct buffer_head **ret_bh,
2365                 int ex,
2366                 int arg_flags,
2367                 int subclass)
2368{
2369    int status, level, acquired;
2370    u32 dlm_flags;
2371    struct ocfs2_lock_res *lockres = NULL;
2372    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2373    struct buffer_head *local_bh = NULL;
2374
2375    BUG_ON(!inode);
2376
2377    mlog_entry_void();
2378
2379    mlog(0, "inode %llu, take %s META lock\n",
2380         (unsigned long long)OCFS2_I(inode)->ip_blkno,
2381         ex ? "EXMODE" : "PRMODE");
2382
2383    status = 0;
2384    acquired = 0;
2385    /* We'll allow faking a readonly metadata lock for
2386     * rodevices. */
2387    if (ocfs2_is_hard_readonly(osb)) {
2388        if (ex)
2389            status = -EROFS;
2390        goto bail;
2391    }
2392
2393    if (ocfs2_mount_local(osb))
2394        goto local;
2395
2396    if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2397        ocfs2_wait_for_recovery(osb);
2398
2399    lockres = &OCFS2_I(inode)->ip_inode_lockres;
2400    level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2401    dlm_flags = 0;
2402    if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2403        dlm_flags |= DLM_LKF_NOQUEUE;
2404
2405    status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2406                      arg_flags, subclass, _RET_IP_);
2407    if (status < 0) {
2408        if (status != -EAGAIN && status != -EIOCBRETRY)
2409            mlog_errno(status);
2410        goto bail;
2411    }
2412
2413    /* Notify the error cleanup path to drop the cluster lock. */
2414    acquired = 1;
2415
2416    /* We wait twice because a node may have died while we were in
2417     * the lower dlm layers. The second time though, we've
2418     * committed to owning this lock so we don't allow signals to
2419     * abort the operation. */
2420    if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2421        ocfs2_wait_for_recovery(osb);
2422
2423local:
2424    /*
2425     * We only see this flag if we're being called from
2426     * ocfs2_read_locked_inode(). It means we're locking an inode
2427     * which hasn't been populated yet, so clear the refresh flag
2428     * and let the caller handle it.
2429     */
2430    if (inode->i_state & I_NEW) {
2431        status = 0;
2432        if (lockres)
2433            ocfs2_complete_lock_res_refresh(lockres, 0);
2434        goto bail;
2435    }
2436
2437    /* This is fun. The caller may want a bh back, or it may
2438     * not. ocfs2_inode_lock_update definitely wants one in, but
2439     * may or may not read one, depending on what's in the
2440     * LVB. The result of all of this is that we've *only* gone to
2441     * disk if we have to, so the complexity is worthwhile. */
2442    status = ocfs2_inode_lock_update(inode, &local_bh);
2443    if (status < 0) {
2444        if (status != -ENOENT)
2445            mlog_errno(status);
2446        goto bail;
2447    }
2448
2449    if (ret_bh) {
2450        status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2451        if (status < 0) {
2452            mlog_errno(status);
2453            goto bail;
2454        }
2455    }
2456
2457bail:
2458    if (status < 0) {
2459        if (ret_bh && (*ret_bh)) {
2460            brelse(*ret_bh);
2461            *ret_bh = NULL;
2462        }
2463        if (acquired)
2464            ocfs2_inode_unlock(inode, ex);
2465    }
2466
2467    if (local_bh)
2468        brelse(local_bh);
2469
2470    mlog_exit(status);
2471    return status;
2472}
2473
2474/*
2475 * This is working around a lock inversion between tasks acquiring DLM
2476 * locks while holding a page lock and the downconvert thread which
2477 * blocks dlm lock acquiry while acquiring page locks.
2478 *
2479 * ** These _with_page variantes are only intended to be called from aop
2480 * methods that hold page locks and return a very specific *positive* error
2481 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2482 *
2483 * The DLM is called such that it returns -EAGAIN if it would have
2484 * blocked waiting for the downconvert thread. In that case we unlock
2485 * our page so the downconvert thread can make progress. Once we've
2486 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2487 * that called us can bubble that back up into the VFS who will then
2488 * immediately retry the aop call.
2489 *
2490 * We do a blocking lock and immediate unlock before returning, though, so that
2491 * the lock has a great chance of being cached on this node by the time the VFS
2492 * calls back to retry the aop. This has a potential to livelock as nodes
2493 * ping locks back and forth, but that's a risk we're willing to take to avoid
2494 * the lock inversion simply.
2495 */
2496int ocfs2_inode_lock_with_page(struct inode *inode,
2497                  struct buffer_head **ret_bh,
2498                  int ex,
2499                  struct page *page)
2500{
2501    int ret;
2502
2503    ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2504    if (ret == -EAGAIN) {
2505        unlock_page(page);
2506        if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2507            ocfs2_inode_unlock(inode, ex);
2508        ret = AOP_TRUNCATED_PAGE;
2509    }
2510
2511    return ret;
2512}
2513
2514int ocfs2_inode_lock_atime(struct inode *inode,
2515              struct vfsmount *vfsmnt,
2516              int *level)
2517{
2518    int ret;
2519
2520    mlog_entry_void();
2521    ret = ocfs2_inode_lock(inode, NULL, 0);
2522    if (ret < 0) {
2523        mlog_errno(ret);
2524        return ret;
2525    }
2526
2527    /*
2528     * If we should update atime, we will get EX lock,
2529     * otherwise we just get PR lock.
2530     */
2531    if (ocfs2_should_update_atime(inode, vfsmnt)) {
2532        struct buffer_head *bh = NULL;
2533
2534        ocfs2_inode_unlock(inode, 0);
2535        ret = ocfs2_inode_lock(inode, &bh, 1);
2536        if (ret < 0) {
2537            mlog_errno(ret);
2538            return ret;
2539        }
2540        *level = 1;
2541        if (ocfs2_should_update_atime(inode, vfsmnt))
2542            ocfs2_update_inode_atime(inode, bh);
2543        if (bh)
2544            brelse(bh);
2545    } else
2546        *level = 0;
2547
2548    mlog_exit(ret);
2549    return ret;
2550}
2551
2552void ocfs2_inode_unlock(struct inode *inode,
2553               int ex)
2554{
2555    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2556    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2557    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2558
2559    mlog_entry_void();
2560
2561    mlog(0, "inode %llu drop %s META lock\n",
2562         (unsigned long long)OCFS2_I(inode)->ip_blkno,
2563         ex ? "EXMODE" : "PRMODE");
2564
2565    if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2566        !ocfs2_mount_local(osb))
2567        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2568
2569    mlog_exit_void();
2570}
2571
2572int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2573{
2574    struct ocfs2_lock_res *lockres;
2575    struct ocfs2_orphan_scan_lvb *lvb;
2576    int status = 0;
2577
2578    if (ocfs2_is_hard_readonly(osb))
2579        return -EROFS;
2580
2581    if (ocfs2_mount_local(osb))
2582        return 0;
2583
2584    lockres = &osb->osb_orphan_scan.os_lockres;
2585    status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2586    if (status < 0)
2587        return status;
2588
2589    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2590    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2591        lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2592        *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2593    else
2594        *seqno = osb->osb_orphan_scan.os_seqno + 1;
2595
2596    return status;
2597}
2598
2599void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2600{
2601    struct ocfs2_lock_res *lockres;
2602    struct ocfs2_orphan_scan_lvb *lvb;
2603
2604    if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2605        lockres = &osb->osb_orphan_scan.os_lockres;
2606        lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2607        lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2608        lvb->lvb_os_seqno = cpu_to_be32(seqno);
2609        ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2610    }
2611}
2612
2613int ocfs2_super_lock(struct ocfs2_super *osb,
2614             int ex)
2615{
2616    int status = 0;
2617    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2618    struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2619
2620    mlog_entry_void();
2621
2622    if (ocfs2_is_hard_readonly(osb))
2623        return -EROFS;
2624
2625    if (ocfs2_mount_local(osb))
2626        goto bail;
2627
2628    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2629    if (status < 0) {
2630        mlog_errno(status);
2631        goto bail;
2632    }
2633
2634    /* The super block lock path is really in the best position to
2635     * know when resources covered by the lock need to be
2636     * refreshed, so we do it here. Of course, making sense of
2637     * everything is up to the caller :) */
2638    status = ocfs2_should_refresh_lock_res(lockres);
2639    if (status < 0) {
2640        mlog_errno(status);
2641        goto bail;
2642    }
2643    if (status) {
2644        status = ocfs2_refresh_slot_info(osb);
2645
2646        ocfs2_complete_lock_res_refresh(lockres, status);
2647
2648        if (status < 0)
2649            mlog_errno(status);
2650        ocfs2_track_lock_refresh(lockres);
2651    }
2652bail:
2653    mlog_exit(status);
2654    return status;
2655}
2656
2657void ocfs2_super_unlock(struct ocfs2_super *osb,
2658            int ex)
2659{
2660    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2661    struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2662
2663    if (!ocfs2_mount_local(osb))
2664        ocfs2_cluster_unlock(osb, lockres, level);
2665}
2666
2667int ocfs2_rename_lock(struct ocfs2_super *osb)
2668{
2669    int status;
2670    struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2671
2672    if (ocfs2_is_hard_readonly(osb))
2673        return -EROFS;
2674
2675    if (ocfs2_mount_local(osb))
2676        return 0;
2677
2678    status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2679    if (status < 0)
2680        mlog_errno(status);
2681
2682    return status;
2683}
2684
2685void ocfs2_rename_unlock(struct ocfs2_super *osb)
2686{
2687    struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2688
2689    if (!ocfs2_mount_local(osb))
2690        ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2691}
2692
2693int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2694{
2695    int status;
2696    struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2697
2698    if (ocfs2_is_hard_readonly(osb))
2699        return -EROFS;
2700
2701    if (ocfs2_mount_local(osb))
2702        return 0;
2703
2704    status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2705                    0, 0);
2706    if (status < 0)
2707        mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2708
2709    return status;
2710}
2711
2712void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2713{
2714    struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2715
2716    if (!ocfs2_mount_local(osb))
2717        ocfs2_cluster_unlock(osb, lockres,
2718                     ex ? LKM_EXMODE : LKM_PRMODE);
2719}
2720
2721int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2722{
2723    int ret;
2724    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2725    struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2726    struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2727
2728    BUG_ON(!dl);
2729
2730    if (ocfs2_is_hard_readonly(osb))
2731        return -EROFS;
2732
2733    if (ocfs2_mount_local(osb))
2734        return 0;
2735
2736    ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2737    if (ret < 0)
2738        mlog_errno(ret);
2739
2740    return ret;
2741}
2742
2743void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2744{
2745    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2746    struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2747    struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2748
2749    if (!ocfs2_mount_local(osb))
2750        ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2751}
2752
2753/* Reference counting of the dlm debug structure. We want this because
2754 * open references on the debug inodes can live on after a mount, so
2755 * we can't rely on the ocfs2_super to always exist. */
2756static void ocfs2_dlm_debug_free(struct kref *kref)
2757{
2758    struct ocfs2_dlm_debug *dlm_debug;
2759
2760    dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2761
2762    kfree(dlm_debug);
2763}
2764
2765void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2766{
2767    if (dlm_debug)
2768        kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2769}
2770
2771static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2772{
2773    kref_get(&debug->d_refcnt);
2774}
2775
2776struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2777{
2778    struct ocfs2_dlm_debug *dlm_debug;
2779
2780    dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2781    if (!dlm_debug) {
2782        mlog_errno(-ENOMEM);
2783        goto out;
2784    }
2785
2786    kref_init(&dlm_debug->d_refcnt);
2787    INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2788    dlm_debug->d_locking_state = NULL;
2789out:
2790    return dlm_debug;
2791}
2792
/*
 * Per-open iteration state for the "locking_state" seq_file.
 *
 * Access to this is arbitrated for us via seq_file->sem.
 */
struct ocfs2_dlm_seq_priv {
    /* Debug structure whose d_lockres_tracking list is being walked;
     * a reference is held from open until release. */
    struct ocfs2_dlm_debug *p_dlm_debug;
    /* "Dummy" lockres linked into the tracking list to remember the
     * iteration position; the iterator recognises it by its NULL l_ops. */
    struct ocfs2_lock_res p_iter_res;
    /* Copy of the current lockres, taken under ocfs2_dlm_tracking_lock,
     * which is what gets handed to the show callback. */
    struct ocfs2_lock_res p_tmp_res;
};
2799
2800static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2801                         struct ocfs2_dlm_seq_priv *priv)
2802{
2803    struct ocfs2_lock_res *iter, *ret = NULL;
2804    struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2805
2806    assert_spin_locked(&ocfs2_dlm_tracking_lock);
2807
2808    list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2809        /* discover the head of the list */
2810        if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2811            mlog(0, "End of list found, %p\n", ret);
2812            break;
2813        }
2814
2815        /* We track our "dummy" iteration lockres' by a NULL
2816         * l_ops field. */
2817        if (iter->l_ops != NULL) {
2818            ret = iter;
2819            break;
2820        }
2821    }
2822
2823    return ret;
2824}
2825
2826static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2827{
2828    struct ocfs2_dlm_seq_priv *priv = m->private;
2829    struct ocfs2_lock_res *iter;
2830
2831    spin_lock(&ocfs2_dlm_tracking_lock);
2832    iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2833    if (iter) {
2834        /* Since lockres' have the lifetime of their container
2835         * (which can be inodes, ocfs2_supers, etc) we want to
2836         * copy this out to a temporary lockres while still
2837         * under the spinlock. Obviously after this we can't
2838         * trust any pointers on the copy returned, but that's
2839         * ok as the information we want isn't typically held
2840         * in them. */
2841        priv->p_tmp_res = *iter;
2842        iter = &priv->p_tmp_res;
2843    }
2844    spin_unlock(&ocfs2_dlm_tracking_lock);
2845
2846    return iter;
2847}
2848
/*
 * seq_file ->stop: intentionally empty.  The start and next callbacks
 * release ocfs2_dlm_tracking_lock before returning, so there is nothing
 * to undo here.
 */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
2852
2853static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2854{
2855    struct ocfs2_dlm_seq_priv *priv = m->private;
2856    struct ocfs2_lock_res *iter = v;
2857    struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2858
2859    spin_lock(&ocfs2_dlm_tracking_lock);
2860    iter = ocfs2_dlm_next_res(iter, priv);
2861    list_del_init(&dummy->l_debug_list);
2862    if (iter) {
2863        list_add(&dummy->l_debug_list, &iter->l_debug_list);
2864        priv->p_tmp_res = *iter;
2865        iter = &priv->p_tmp_res;
2866    }
2867    spin_unlock(&ocfs2_dlm_tracking_lock);
2868
2869    return iter;
2870}
2871
2872/* So that debugfs.ocfs2 can determine which format is being used */
2873#define OCFS2_DLM_DEBUG_STR_VERSION 2
2874static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2875{
2876    int i;
2877    char *lvb;
2878    struct ocfs2_lock_res *lockres = v;
2879
2880    if (!lockres)
2881        return -EINVAL;
2882
2883    seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2884
2885    if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2886        seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2887               lockres->l_name,
2888               (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2889    else
2890        seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2891
2892    seq_printf(m, "%d\t"
2893           "0x%lx\t"
2894           "0x%x\t"
2895           "0x%x\t"
2896           "%u\t"
2897           "%u\t"
2898           "%d\t"
2899           "%d\t",
2900           lockres->l_level,
2901           lockres->l_flags,
2902           lockres->l_action,
2903           lockres->l_unlock_action,
2904           lockres->l_ro_holders,
2905           lockres->l_ex_holders,
2906           lockres->l_requested,
2907           lockres->l_blocking);
2908
2909    /* Dump the raw LVB */
2910    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2911    for(i = 0; i < DLM_LVB_LEN; i++)
2912        seq_printf(m, "0x%x\t", lvb[i]);
2913
2914#ifdef CONFIG_OCFS2_FS_STATS
2915# define lock_num_prmode(_l) (_l)->l_lock_num_prmode
2916# define lock_num_exmode(_l) (_l)->l_lock_num_exmode
2917# define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed
2918# define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed
2919# define lock_total_prmode(_l) (_l)->l_lock_total_prmode
2920# define lock_total_exmode(_l) (_l)->l_lock_total_exmode
2921# define lock_max_prmode(_l) (_l)->l_lock_max_prmode
2922# define lock_max_exmode(_l) (_l)->l_lock_max_exmode
2923# define lock_refresh(_l) (_l)->l_lock_refresh
2924#else
2925# define lock_num_prmode(_l) (0ULL)
2926# define lock_num_exmode(_l) (0ULL)
2927# define lock_num_prmode_failed(_l) (0)
2928# define lock_num_exmode_failed(_l) (0)
2929# define lock_total_prmode(_l) (0ULL)
2930# define lock_total_exmode(_l) (0ULL)
2931# define lock_max_prmode(_l) (0)
2932# define lock_max_exmode(_l) (0)
2933# define lock_refresh(_l) (0)
2934#endif
2935    /* The following seq_print was added in version 2 of this output */
2936    seq_printf(m, "%llu\t"
2937           "%llu\t"
2938           "%u\t"
2939           "%u\t"
2940           "%llu\t"
2941           "%llu\t"
2942           "%u\t"
2943           "%u\t"
2944           "%u\t",
2945           lock_num_prmode(lockres),
2946           lock_num_exmode(lockres),
2947           lock_num_prmode_failed(lockres),
2948           lock_num_exmode_failed(lockres),
2949           lock_total_prmode(lockres),
2950           lock_total_exmode(lockres),
2951           lock_max_prmode(lockres),
2952           lock_max_exmode(lockres),
2953           lock_refresh(lockres));
2954
2955    /* End the line */
2956    seq_printf(m, "\n");
2957    return 0;
2958}
2959
/* seq_file iterator callbacks, installed by ocfs2_dlm_debug_open(). */
static const struct seq_operations ocfs2_dlm_seq_ops = {
    .start = ocfs2_dlm_seq_start,
    .stop = ocfs2_dlm_seq_stop,
    .next = ocfs2_dlm_seq_next,
    .show = ocfs2_dlm_seq_show,
};
2966
2967static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2968{
2969    struct seq_file *seq = (struct seq_file *) file->private_data;
2970    struct ocfs2_dlm_seq_priv *priv = seq->private;
2971    struct ocfs2_lock_res *res = &priv->p_iter_res;
2972
2973    ocfs2_remove_lockres_tracking(res);
2974    ocfs2_put_dlm_debug(priv->p_dlm_debug);
2975    return seq_release_private(inode, file);
2976}
2977
2978static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2979{
2980    int ret;
2981    struct ocfs2_dlm_seq_priv *priv;
2982    struct seq_file *seq;
2983    struct ocfs2_super *osb;
2984
2985    priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2986    if (!priv) {
2987        ret = -ENOMEM;
2988        mlog_errno(ret);
2989        goto out;
2990    }
2991    osb = inode->i_private;
2992    ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2993    priv->p_dlm_debug = osb->osb_dlm_debug;
2994    INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2995
2996    ret = seq_open(file, &ocfs2_dlm_seq_ops);
2997    if (ret) {
2998        kfree(priv);
2999        mlog_errno(ret);
3000        goto out;
3001    }
3002
3003    seq = (struct seq_file *) file->private_data;
3004    seq->private = priv;
3005
3006    ocfs2_add_lockres_tracking(&priv->p_iter_res,
3007                   priv->p_dlm_debug);
3008
3009out:
3010    return ret;
3011}
3012
/*
 * File operations for the "locking_state" debugfs file: open/release
 * manage the per-reader iteration state, reads and seeks are handled by
 * the generic seq_file helpers.
 */
static const struct file_operations ocfs2_dlm_debug_fops = {
    .open = ocfs2_dlm_debug_open,
    .release = ocfs2_dlm_debug_release,
    .read = seq_read,
    .llseek = seq_lseek,
};
3019
3020static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3021{
3022    int ret = 0;
3023    struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3024
3025    dlm_debug->d_locking_state = debugfs_create_file("locking_state",
3026                             S_IFREG|S_IRUSR,
3027                             osb->osb_debug_root,
3028                             osb,
3029                             &ocfs2_dlm_debug_fops);
3030    if (!dlm_debug->d_locking_state) {
3031        ret = -EINVAL;
3032        mlog(ML_ERROR,
3033             "Unable to create locking state debugfs file.\n");
3034        goto out;
3035    }
3036
3037    ocfs2_get_dlm_debug(dlm_debug);
3038out:
3039    return ret;
3040}
3041
3042static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3043{
3044    struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3045
3046    if (dlm_debug) {
3047        debugfs_remove(dlm_debug->d_locking_state);
3048        ocfs2_put_dlm_debug(dlm_debug);
3049    }
3050}
3051
3052int ocfs2_dlm_init(struct ocfs2_super *osb)
3053{
3054    int status = 0;
3055    struct ocfs2_cluster_connection *conn = NULL;
3056
3057    mlog_entry_void();
3058
3059    if (ocfs2_mount_local(osb)) {
3060        osb->node_num = 0;
3061        goto local;
3062    }
3063
3064    status = ocfs2_dlm_init_debug(osb);
3065    if (status < 0) {
3066        mlog_errno(status);
3067        goto bail;
3068    }
3069
3070    /* launch downconvert thread */
3071    osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
3072    if (IS_ERR(osb->dc_task)) {
3073        status = PTR_ERR(osb->dc_task);
3074        osb->dc_task = NULL;
3075        mlog_errno(status);
3076        goto bail;
3077    }
3078
3079    /* for now, uuid == domain */
3080    status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3081                       osb->uuid_str,
3082                       strlen(osb->uuid_str),
3083                       &lproto, ocfs2_do_node_down, osb,
3084                       &conn);
3085    if (status) {
3086        mlog_errno(status);
3087        goto bail;
3088    }
3089
3090    status = ocfs2_cluster_this_node(&osb->node_num);
3091    if (status < 0) {
3092        mlog_errno(status);
3093        mlog(ML_ERROR,
3094             "could not find this host's node number\n");
3095        ocfs2_cluster_disconnect(conn, 0);
3096        goto bail;
3097    }
3098
3099local:
3100    ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3101    ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3102    ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
3103    ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3104
3105    osb->cconn = conn;
3106
3107    status = 0;
3108bail:
3109    if (status < 0) {
3110        ocfs2_dlm_shutdown_debug(osb);
3111        if (osb->dc_task)
3112            kthread_stop(osb->dc_task);
3113    }
3114
3115    mlog_exit(status);
3116    return status;
3117}
3118
3119void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3120            int hangup_pending)
3121{
3122    mlog_entry_void();
3123
3124    ocfs2_drop_osb_locks(osb);
3125
3126    /*
3127     * Now that we have dropped all locks and ocfs2_dismount_volume()
3128     * has disabled recovery, the DLM won't be talking to us. It's
3129     * safe to tear things down before disconnecting the cluster.
3130     */
3131
3132    if (osb->dc_task) {
3133        kthread_stop(osb->dc_task);
3134        osb->dc_task = NULL;
3135    }
3136
3137    ocfs2_lock_res_free(&osb->osb_super_lockres);
3138    ocfs2_lock_res_free(&osb->osb_rename_lockres);
3139    ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3140    ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3141
3142    ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3143    osb->cconn = NULL;
3144
3145    ocfs2_dlm_shutdown_debug(osb);
3146
3147    mlog_exit_void();
3148}
3149
/*
 * Release the DLM lock backing @lockres, if there is one.
 *
 * The lockres must already be marked OCFS2_LOCK_FREEING (this is
 * asserted).  The function waits for any in-flight operation to finish,
 * lets LVB-using lock types stuff their LVB one last time, then issues
 * the unlock and waits for the lockres to stop being busy.
 *
 * A lockres that was never initialised, or that is not attached, is
 * left alone.  A failing ocfs2_dlm_unlock() is treated as fatal (BUG).
 *
 * Always returns 0.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
               struct ocfs2_lock_res *lockres)
{
    int ret;
    unsigned long flags;
    u32 lkm_flags = 0;

    /* We didn't get anywhere near actually using this lockres. */
    if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
        goto out;

    /* Lock types that carry an LVB ask the DLM to take the value block
     * along with the unlock. */
    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
        lkm_flags |= DLM_LKF_VALBLK;

    spin_lock_irqsave(&lockres->l_lock, flags);

    mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
            "lockres %s, flags 0x%lx\n",
            lockres->l_name, lockres->l_flags);

    /* The spinlock is dropped around each wait and BUSY is re-checked
     * after it has been re-taken. */
    while (lockres->l_flags & OCFS2_LOCK_BUSY) {
        mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
             "%u, unlock_action = %u\n",
             lockres->l_name, lockres->l_flags, lockres->l_action,
             lockres->l_unlock_action);

        spin_unlock_irqrestore(&lockres->l_lock, flags);

        /* XXX: Today we just wait on any busy
         * locks... Perhaps we need to cancel converts in the
         * future? */
        ocfs2_wait_on_busy_lock(lockres);

        spin_lock_irqsave(&lockres->l_lock, flags);
    }

    /* Only write the LVB when we hold the lock at EX and our view of
     * it does not need a refresh. */
    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
        if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
            lockres->l_level == DLM_LOCK_EX &&
            !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
            lockres->l_ops->set_lvb(lockres);
    }

    if (lockres->l_flags & OCFS2_LOCK_BUSY)
        mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
             lockres->l_name);
    if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
        mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

    /* Nothing is held in the DLM for this lockres, so there is nothing
     * to unlock. */
    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        goto out;
    }

    lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

    /* make sure we never get here while waiting for an ast to
     * fire. */
    BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

    /* is this necessary? */
    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
    lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    mlog(0, "lock %s\n", lockres->l_name);

    ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
    if (ret) {
        ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
        mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
        ocfs2_dlm_dump_lksb(&lockres->l_lksb);
        BUG();
    }
    mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
         lockres->l_name);

    /* BUSY was set above; wait for it to be cleared again. */
    ocfs2_wait_on_busy_lock(lockres);
out:
    mlog_exit(0);
    return 0;
}
3232
3233/* Mark the lockres as being dropped. It will no longer be
3234 * queued if blocking, but we still may have to wait on it
3235 * being dequeued from the downconvert thread before we can consider
3236 * it safe to drop.
3237 *
3238 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3239void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3240{
3241    int status;
3242    struct ocfs2_mask_waiter mw;
3243    unsigned long flags;
3244
3245    ocfs2_init_mask_waiter(&mw);
3246
3247    spin_lock_irqsave(&lockres->l_lock, flags);
3248    lockres->l_flags |= OCFS2_LOCK_FREEING;
3249    while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3250        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3251        spin_unlock_irqrestore(&lockres->l_lock, flags);
3252
3253        mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3254
3255        status = ocfs2_wait_for_mask(&mw);
3256        if (status)
3257            mlog_errno(status);
3258
3259        spin_lock_irqsave(&lockres->l_lock, flags);
3260    }
3261    spin_unlock_irqrestore(&lockres->l_lock, flags);
3262}
3263
/*
 * Convenience wrapper: mark @lockres as freeing and then drop its DLM
 * lock, logging any error reported by ocfs2_drop_lock().
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
                               struct ocfs2_lock_res *lockres)
{
    int status;

    ocfs2_mark_lockres_freeing(lockres);

    status = ocfs2_drop_lock(osb, lockres);
    if (status != 0)
        mlog_errno(status);
}
3274
3275static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3276{
3277    ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3278    ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3279    ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3280    ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3281}
3282
3283int ocfs2_drop_inode_locks(struct inode *inode)
3284{
3285    int status, err;
3286
3287    mlog_entry_void();
3288
3289    /* No need to call ocfs2_mark_lockres_freeing here -
3290     * ocfs2_clear_inode has done it for us. */
3291
3292    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3293                  &OCFS2_I(inode)->ip_open_lockres);
3294    if (err < 0)
3295        mlog_errno(err);
3296
3297    status = err;
3298
3299    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3300                  &OCFS2_I(inode)->ip_inode_lockres);
3301    if (err < 0)
3302        mlog_errno(err);
3303    if (err < 0 && !status)
3304        status = err;
3305
3306    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3307                  &OCFS2_I(inode)->ip_rw_lockres);
3308    if (err < 0)
3309        mlog_errno(err);
3310    if (err < 0 && !status)
3311        status = err;
3312
3313    mlog_exit(status);
3314    return status;
3315}
3316
3317static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3318                          int new_level)
3319{
3320    assert_spin_locked(&lockres->l_lock);
3321
3322    BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3323
3324    if (lockres->l_level <= new_level) {
3325        mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3326             "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3327             "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3328             new_level, list_empty(&lockres->l_blocked_list),
3329             list_empty(&lockres->l_mask_waiters), lockres->l_type,
3330             lockres->l_flags, lockres->l_ro_holders,
3331             lockres->l_ex_holders, lockres->l_action,
3332             lockres->l_unlock_action, lockres->l_requested,
3333             lockres->l_blocking, lockres->l_pending_gen);
3334        BUG();
3335    }
3336
3337    mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3338         lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3339
3340    lockres->l_action = OCFS2_AST_DOWNCONVERT;
3341    lockres->l_requested = new_level;
3342    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3343    return lockres_set_pending(lockres);
3344}
3345
3346static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3347                  struct ocfs2_lock_res *lockres,
3348                  int new_level,
3349                  int lvb,
3350                  unsigned int generation)
3351{
3352    int ret;
3353    u32 dlm_flags = DLM_LKF_CONVERT;
3354
3355    mlog_entry_void();
3356
3357    mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3358         lockres->l_level, new_level);
3359
3360    if (lvb)
3361        dlm_flags |= DLM_LKF_VALBLK;
3362
3363    ret = ocfs2_dlm_lock(osb->cconn,
3364                 new_level,
3365                 &lockres->l_lksb,
3366                 dlm_flags,
3367                 lockres->l_name,
3368                 OCFS2_LOCK_ID_MAX_LEN - 1);
3369    lockres_clear_pending(lockres, generation, osb);
3370    if (ret) {
3371        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3372        ocfs2_recover_from_dlm_error(lockres, 1);
3373        goto bail;
3374    }
3375
3376    ret = 0;
3377bail:
3378    mlog_exit(ret);
3379    return ret;
3380}
3381
3382/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3383static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3384                        struct ocfs2_lock_res *lockres)
3385{
3386    assert_spin_locked(&lockres->l_lock);
3387
3388    mlog_entry_void();
3389
3390    if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3391        /* If we're already trying to cancel a lock conversion
3392         * then just drop the spinlock and allow the caller to
3393         * requeue this lock. */
3394        mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3395        return 0;
3396    }
3397
3398    /* were we in a convert when we got the bast fire? */
3399    BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3400           lockres->l_action != OCFS2_AST_DOWNCONVERT);
3401    /* set things up for the unlockast to know to just
3402     * clear out the ast_action and unset busy, etc. */
3403    lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3404
3405    mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3406            "lock %s, invalid flags: 0x%lx\n",
3407            lockres->l_name, lockres->l_flags);
3408
3409    mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3410
3411    return 1;
3412}
3413
3414static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3415                struct ocfs2_lock_res *lockres)
3416{
3417    int ret;
3418
3419    mlog_entry_void();
3420
3421    ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3422                   DLM_LKF_CANCEL);
3423    if (ret) {
3424        ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3425        ocfs2_recover_from_dlm_error(lockres, 0);
3426    }
3427
3428    mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3429
3430    mlog_exit(ret);
3431    return ret;
3432}
3433
/*
 * Try to stop @lockres from blocking whatever level is recorded in
 * lockres->l_blocking.
 *
 * Depending on the state found under lockres->l_lock this either
 *  - returns at once because the lockres is no longer BLOCKED,
 *  - cancels an in-flight convert (or just asks for a requeue while the
 *    other thread's request is still PENDING),
 *  - asks for a requeue because holders, a refresh, an upconvert that is
 *    just finishing or the lock type's check_downconvert() callback
 *    prevent a downconvert right now, or
 *  - runs the lock type's optional downconvert_worker() and then
 *    downconverts the lock.
 *
 * Results are reported through @ctl: ctl->requeue is set to 1 when the
 * caller should queue the lockres again and to 0 once a downconvert is
 * being issued; ctl->unblock_action receives the return value of
 * downconvert_worker() when that callback is run.
 *
 * Returns 0, or the error from ocfs2_cancel_convert() /
 * ocfs2_downconvert_lock().
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
                  struct ocfs2_lock_res *lockres,
                  struct ocfs2_unblock_ctl *ctl)
{
    unsigned long flags;
    int blocking;
    int new_level;
    int level;
    int ret = 0;
    int set_lvb = 0;
    unsigned int gen;

    mlog_entry_void();

    spin_lock_irqsave(&lockres->l_lock, flags);

    /* Every jump back to this label is made with l_lock held. */
recheck:
    /*
     * Is it still blocking? If not, we have no more work to do.
     */
    if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
        BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        ret = 0;
        goto leave;
    }

    if (lockres->l_flags & OCFS2_LOCK_BUSY) {
        /* XXX
         * This is a *big* race. The OCFS2_LOCK_PENDING flag
         * exists entirely for one reason - another thread has set
         * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
         *
         * If we do ocfs2_cancel_convert() before the other thread
         * calls dlm_lock(), our cancel will do nothing. We will
         * get no ast, and we will have no way of knowing the
         * cancel failed. Meanwhile, the other thread will call
         * into dlm_lock() and wait...forever.
         *
         * Why forever? Because another node has asked for the
         * lock first; that's why we're here in unblock_lock().
         *
         * The solution is OCFS2_LOCK_PENDING. When PENDING is
         * set, we just requeue the unblock. Only when the other
         * thread has called dlm_lock() and cleared PENDING will
         * we then cancel their request.
         *
         * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
         * at the same time they set OCFS2_LOCK_BUSY. They must
         * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
         */
        if (lockres->l_flags & OCFS2_LOCK_PENDING) {
            mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
                 lockres->l_name);
            goto leave_requeue;
        }

        /* Whether or not a cancel is actually issued, the lockres
         * has to be looked at again later. */
        ctl->requeue = 1;
        ret = ocfs2_prepare_cancel_convert(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        if (ret) {
            ret = ocfs2_cancel_convert(osb, lockres);
            if (ret < 0)
                mlog_errno(ret);
        }
        goto leave;
    }

    /*
     * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
     * set when the ast is received for an upconvert just before the
     * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
     * on the heels of the ast, we want to delay the downconvert just
     * enough to allow the up requestor to do its task. Because this
     * lock is in the blocked queue, the lock will be downconverted
     * as soon as the requestor is done with the lock.
     */
    if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
        goto leave_requeue;

    /*
     * How can we block and yet be at NL? We were trying to upconvert
     * from NL and got canceled. The code comes back here, and now
     * we notice and clear BLOCKING.
     */
    if (lockres->l_level == DLM_LOCK_NL) {
        BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
        mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
        lockres->l_blocking = DLM_LOCK_NL;
        lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        goto leave;
    }

    /* if we're blocking an exclusive and we have *any* holders,
     * then requeue. */
    if ((lockres->l_blocking == DLM_LOCK_EX)
        && (lockres->l_ex_holders || lockres->l_ro_holders)) {
        mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
             lockres->l_name, lockres->l_ex_holders,
             lockres->l_ro_holders);
        goto leave_requeue;
    }

    /* If it's a PR we're blocking, then only
     * requeue if we've got any EX holders */
    if (lockres->l_blocking == DLM_LOCK_PR &&
        lockres->l_ex_holders) {
        mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
             lockres->l_name, lockres->l_ex_holders);
        goto leave_requeue;
    }

    /*
     * Can we get a lock in this state if the holder counts are
     * zero? The meta data unblock code used to check this.
     */
    if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
        && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
        mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
             lockres->l_name);
        goto leave_requeue;
    }

    new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

    /* The lock type may veto the downconvert for now (the callback is
     * optional). */
    if (lockres->l_ops->check_downconvert
        && !lockres->l_ops->check_downconvert(lockres, new_level)) {
        mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
             lockres->l_name);
        goto leave_requeue;
    }

    /* If we get here, then we know that there are no more
     * incompatible holders (and anyone asking for an incompatible
     * lock is blocked). We can now downconvert the lock */
    if (!lockres->l_ops->downconvert_worker)
        goto downconvert;

    /* Some lockres types want to do a bit of work before
     * downconverting a lock. Allow that here. The worker function
     * may sleep, so we save off a copy of what we're blocking as
     * it may change while we're not holding the spin lock. */
    blocking = lockres->l_blocking;
    level = lockres->l_level;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

    /* The spinlock is not held here, so leave without unlocking. */
    if (ctl->unblock_action == UNBLOCK_STOP_POST) {
        mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
             lockres->l_name);
        goto leave;
    }

    spin_lock_irqsave(&lockres->l_lock, flags);
    if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
        /* If this changed underneath us, then we can't drop
         * it just yet. */
        mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
             "Recheck\n", lockres->l_name, blocking,
             lockres->l_blocking, level, lockres->l_level);
        goto recheck;
    }

    /* Reached with l_lock held and new_level already computed. */
downconvert:
    ctl->requeue = 0;

    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
        if (lockres->l_level == DLM_LOCK_EX)
            set_lvb = 1;

        /*
         * We only set the lvb if the lock has been fully
         * refreshed - otherwise we risk setting stale
         * data. Otherwise, there's no need to actually clear
         * out the lvb here as it's value is still valid.
         */
        if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
            lockres->l_ops->set_lvb(lockres);
    }

    gen = ocfs2_prepare_downconvert(lockres, new_level);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
    ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
                     gen);

    /* Common exit for every path that has already released l_lock. */
leave:
    mlog_exit(ret);
    return ret;

    /* Exit for paths that still hold l_lock and want to be retried. */
leave_requeue:
    spin_unlock_irqrestore(&lockres->l_lock, flags);
    ctl->requeue = 1;

    mlog_exit(0);
    return 0;
}
3632
3633static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3634                     int blocking)
3635{
3636    struct inode *inode;
3637    struct address_space *mapping;
3638
3639           inode = ocfs2_lock_res_inode(lockres);
3640    mapping = inode->i_mapping;
3641
3642    if (!S_ISREG(inode->i_mode))
3643        goto out;
3644
3645    /*
3646     * We need this before the filemap_fdatawrite() so that it can
3647     * transfer the dirty bit from the PTE to the
3648     * page. Unfortunately this means that even for EX->PR
3649     * downconverts, we'll lose our mappings and have to build
3650     * them up again.
3651     */
3652    unmap_mapping_range(mapping, 0, 0, 0);
3653
3654    if (filemap_fdatawrite(mapping)) {
3655        mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3656             (unsigned long long)OCFS2_I(inode)->ip_blkno);
3657    }
3658    sync_mapping_buffers(mapping);
3659    if (blocking == DLM_LOCK_EX) {
3660        truncate_inode_pages(mapping, 0);
3661    } else {
3662        /* We only need to wait on the I/O if we're not also
3663         * truncating pages because truncate_inode_pages waits
3664         * for us above. We don't truncate pages if we're
3665         * blocking anything < EXMODE because we want to keep
3666         * them around in that case. */
3667        filemap_fdatawait(mapping);
3668    }
3669
3670out:
3671    return UNBLOCK_CONTINUE;
3672}
3673
3674static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3675                 struct ocfs2_lock_res *lockres,
3676                 int new_level)
3677{
3678    int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3679
3680    BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3681    BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3682
3683    if (checkpointed)
3684        return 1;
3685
3686    ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3687    return 0;
3688}
3689
/*
 * Downconvert check for inode metadata locks: defers to
 * ocfs2_ci_checkpointed() using the metadata cache of the inode that
 * owns @lockres, and returns its result.
 */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	return ocfs2_ci_checkpointed(INODE_CACHE(ocfs2_lock_res_inode(lockres)),
				     lockres, new_level);
}
3697
/*
 * Fill in the metadata LVB for the inode that owns @lockres by handing
 * that inode to __ocfs2_stuff_meta_lvb().
 */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
3704
/*
 * Drop the final reference on our dentry lock. For now this runs in the
 * downconvert thread; the dlmglue API could be simplified in the future
 * by pushing these drops off to the ocfs2_wq instead.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
3716
3717/*
3718 * d_delete() matching dentries before the lock downconvert.
3719 *
3720 * At this point, any process waiting to destroy the
3721 * dentry_lock due to last ref count is stopped by the
3722 * OCFS2_LOCK_QUEUED flag.
3723 *
3724 * We have two potential problems
3725 *
3726 * 1) If we do the last reference drop on our dentry_lock (via dput)
3727 * we'll wind up in ocfs2_release_dentry_lock(), waiting on
3728 * the downconvert to finish. Instead we take an elevated
3729 * reference and push the drop until after we've completed our
3730 * unblock processing.
3731 *
3732 * 2) There might be another process with a final reference,
3733 * waiting on us to finish processing. If this is the case, we
3734 * detect it and exit out - there's no more dentries anyway.
3735 */
3736static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3737                       int blocking)
3738{
3739    struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3740    struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3741    struct dentry *dentry;
3742    unsigned long flags;
3743    int extra_ref = 0;
3744
3745    /*
3746     * This node is blocking another node from getting a read
3747     * lock. This happens when we've renamed within a
3748     * directory. We've forced the other nodes to d_delete(), but
3749     * we never actually dropped our lock because it's still
3750     * valid. The downconvert code will retain a PR for this node,
3751     * so there's no further work to do.
3752     */
3753    if (blocking == DLM_LOCK_PR)
3754        return UNBLOCK_CONTINUE;
3755
3756    /*
3757     * Mark this inode as potentially orphaned. The code in
3758     * ocfs2_delete_inode() will figure out whether it actually
3759     * needs to be freed or not.
3760     */
3761    spin_lock(&oi->ip_lock);
3762    oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3763    spin_unlock(&oi->ip_lock);
3764
3765    /*
3766     * Yuck. We need to make sure however that the check of
3767     * OCFS2_LOCK_FREEING and the extra reference are atomic with
3768     * respect to a reference decrement or the setting of that
3769     * flag.
3770     */
3771    spin_lock_irqsave(&lockres->l_lock, flags);
3772    spin_lock(&dentry_attach_lock);
3773    if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3774        && dl->dl_count) {
3775        dl->dl_count++;
3776        extra_ref = 1;
3777    }
3778    spin_unlock(&dentry_attach_lock);
3779    spin_unlock_irqrestore(&lockres->l_lock, flags);
3780
3781    mlog(0, "extra_ref = %d\n", extra_ref);
3782
3783    /*
3784     * We have a process waiting on us in ocfs2_dentry_iput(),
3785     * which means we can't have any more outstanding
3786     * aliases. There's no need to do any more work.
3787     */
3788    if (!extra_ref)
3789        return UNBLOCK_CONTINUE;
3790
3791    spin_lock(&dentry_attach_lock);
3792    while (1) {
3793        dentry = ocfs2_find_local_alias(dl->dl_inode,
3794                        dl->dl_parent_blkno, 1);
3795        if (!dentry)
3796            break;
3797        spin_unlock(&dentry_attach_lock);
3798
3799        mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3800             dentry->d_name.name);
3801
3802        /*
3803         * The following dcache calls may do an
3804         * iput(). Normally we don't want that from the
3805         * downconverting thread, but in this case it's ok
3806         * because the requesting node already has an
3807         * exclusive lock on the inode, so it can't be queued
3808         * for a downconvert.
3809         */
3810        d_delete(dentry);
3811        dput(dentry);
3812
3813        spin_lock(&dentry_attach_lock);
3814    }
3815    spin_unlock(&dentry_attach_lock);
3816
3817    /*
3818     * If we are the last holder of this dentry lock, there is no
3819     * reason to downconvert so skip straight to the unlock.
3820     */
3821    if (dl->dl_count == 1)
3822        return UNBLOCK_STOP_POST;
3823
3824    return UNBLOCK_CONTINUE_POST;
3825}
3826
3827static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
3828                        int new_level)
3829{
3830    struct ocfs2_refcount_tree *tree =
3831                ocfs2_lock_res_refcount_tree(lockres);
3832
3833    return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
3834}
3835
3836static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
3837                     int blocking)
3838{
3839    struct ocfs2_refcount_tree *tree =
3840                ocfs2_lock_res_refcount_tree(lockres);
3841
3842    ocfs2_metadata_cache_purge(&tree->rf_ci);
3843
3844    return UNBLOCK_CONTINUE;
3845}
3846
3847static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3848{
3849    struct ocfs2_qinfo_lvb *lvb;
3850    struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3851    struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3852                        oinfo->dqi_gi.dqi_type);
3853
3854    mlog_entry_void();
3855
3856    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3857    lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3858    lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3859    lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3860    lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3861    lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3862    lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3863    lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3864
3865    mlog_exit_void();
3866}
3867
3868void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3869{
3870    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3871    struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3872    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3873
3874    mlog_entry_void();
3875    if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3876        ocfs2_cluster_unlock(osb, lockres, level);
3877    mlog_exit_void();
3878}
3879
3880static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3881{
3882    struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3883                        oinfo->dqi_gi.dqi_type);
3884    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3885    struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3886    struct buffer_head *bh = NULL;
3887    struct ocfs2_global_disk_dqinfo *gdinfo;
3888    int status = 0;
3889
3890    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3891        lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3892        info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3893        info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3894        oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3895        oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3896        oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3897        oinfo->dqi_gi.dqi_free_entry =
3898                    be32_to_cpu(lvb->lvb_free_entry);
3899    } else {
3900        status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
3901        if (status) {
3902            mlog_errno(status);
3903            goto bail;
3904        }
3905        gdinfo = (struct ocfs2_global_disk_dqinfo *)
3906                    (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3907        info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3908        info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3909        oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3910        oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3911        oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3912        oinfo->dqi_gi.dqi_free_entry =
3913                    le32_to_cpu(gdinfo->dqi_free_entry);
3914        brelse(bh);
3915        ocfs2_track_lock_refresh(lockres);
3916    }
3917
3918bail:
3919    return status;
3920}
3921
3922/* Lock quota info, this function expects at least shared lock on the quota file
3923 * so that we can safely refresh quota info from disk. */
3924int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3925{
3926    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3927    struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3928    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3929    int status = 0;
3930
3931    mlog_entry_void();
3932
3933    /* On RO devices, locking really isn't needed... */
3934    if (ocfs2_is_hard_readonly(osb)) {
3935        if (ex)
3936            status = -EROFS;
3937        goto bail;
3938    }
3939    if (ocfs2_mount_local(osb))
3940        goto bail;
3941
3942    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3943    if (status < 0) {
3944        mlog_errno(status);
3945        goto bail;
3946    }
3947    if (!ocfs2_should_refresh_lock_res(lockres))
3948        goto bail;
3949    /* OK, we have the lock but we need to refresh the quota info */
3950    status = ocfs2_refresh_qinfo(oinfo);
3951    if (status)
3952        ocfs2_qinfo_unlock(oinfo, ex);
3953    ocfs2_complete_lock_res_refresh(lockres, status);
3954bail:
3955    mlog_exit(status);
3956    return status;
3957}
3958
3959int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
3960{
3961    int status;
3962    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3963    struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3964    struct ocfs2_super *osb = lockres->l_priv;
3965
3966
3967    if (ocfs2_is_hard_readonly(osb))
3968        return -EROFS;
3969
3970    if (ocfs2_mount_local(osb))
3971        return 0;
3972
3973    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3974    if (status < 0)
3975        mlog_errno(status);
3976
3977    return status;
3978}
3979
3980void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3981{
3982    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3983    struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3984    struct ocfs2_super *osb = lockres->l_priv;
3985
3986    if (!ocfs2_mount_local(osb))
3987        ocfs2_cluster_unlock(osb, lockres, level);
3988}
3989
3990static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3991                       struct ocfs2_lock_res *lockres)
3992{
3993    int status;
3994    struct ocfs2_unblock_ctl ctl = {0, 0,};
3995    unsigned long flags;
3996
3997    /* Our reference to the lockres in this function can be
3998     * considered valid until we remove the OCFS2_LOCK_QUEUED
3999     * flag. */
4000
4001    mlog_entry_void();
4002
4003    BUG_ON(!lockres);
4004    BUG_ON(!lockres->l_ops);
4005
4006    mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4007
4008    /* Detect whether a lock has been marked as going away while
4009     * the downconvert thread was processing other things. A lock can
4010     * still be marked with OCFS2_LOCK_FREEING after this check,
4011     * but short circuiting here will still save us some
4012     * performance. */
4013    spin_lock_irqsave(&lockres->l_lock, flags);
4014    if (lockres->l_flags & OCFS2_LOCK_FREEING)
4015        goto unqueue;
4016    spin_unlock_irqrestore(&lockres->l_lock, flags);
4017
4018    status = ocfs2_unblock_lock(osb, lockres, &ctl);
4019    if (status < 0)
4020        mlog_errno(status);
4021
4022    spin_lock_irqsave(&lockres->l_lock, flags);
4023unqueue:
4024    if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4025        lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4026    } else
4027        ocfs2_schedule_blocked_lock(osb, lockres);
4028
4029    mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4030         ctl.requeue ? "yes" : "no");
4031    spin_unlock_irqrestore(&lockres->l_lock, flags);
4032
4033    if (ctl.unblock_action != UNBLOCK_CONTINUE
4034        && lockres->l_ops->post_unlock)
4035        lockres->l_ops->post_unlock(osb, lockres);
4036
4037    mlog_exit_void();
4038}
4039
4040static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4041                    struct ocfs2_lock_res *lockres)
4042{
4043    mlog_entry_void();
4044
4045    assert_spin_locked(&lockres->l_lock);
4046
4047    if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4048        /* Do not schedule a lock for downconvert when it's on
4049         * the way to destruction - any nodes wanting access
4050         * to the resource will get it soon. */
4051        mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4052             lockres->l_name, lockres->l_flags);
4053        return;
4054    }
4055
4056    lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4057
4058    spin_lock(&osb->dc_task_lock);
4059    if (list_empty(&lockres->l_blocked_list)) {
4060        list_add_tail(&lockres->l_blocked_list,
4061                  &osb->blocked_lock_list);
4062        osb->blocked_lock_count++;
4063    }
4064    spin_unlock(&osb->dc_task_lock);
4065
4066    mlog_exit_void();
4067}
4068
4069static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4070{
4071    unsigned long processed;
4072    struct ocfs2_lock_res *lockres;
4073
4074    mlog_entry_void();
4075
4076    spin_lock(&osb->dc_task_lock);
4077    /* grab this early so we know to try again if a state change and
4078     * wake happens part-way through our work */
4079    osb->dc_work_sequence = osb->dc_wake_sequence;
4080
4081    processed = osb->blocked_lock_count;
4082    while (processed) {
4083        BUG_ON(list_empty(&osb->blocked_lock_list));
4084
4085        lockres = list_entry(osb->blocked_lock_list.next,
4086                     struct ocfs2_lock_res, l_blocked_list);
4087        list_del_init(&lockres->l_blocked_list);
4088        osb->blocked_lock_count--;
4089        spin_unlock(&osb->dc_task_lock);
4090
4091        BUG_ON(!processed);
4092        processed--;
4093
4094        ocfs2_process_blocked_lock(osb, lockres);
4095
4096        spin_lock(&osb->dc_task_lock);
4097    }
4098    spin_unlock(&osb->dc_task_lock);
4099
4100    mlog_exit_void();
4101}
4102
4103static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4104{
4105    int empty = 0;
4106
4107    spin_lock(&osb->dc_task_lock);
4108    if (list_empty(&osb->blocked_lock_list))
4109        empty = 1;
4110
4111    spin_unlock(&osb->dc_task_lock);
4112    return empty;
4113}
4114
4115static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4116{
4117    int should_wake = 0;
4118
4119    spin_lock(&osb->dc_task_lock);
4120    if (osb->dc_work_sequence != osb->dc_wake_sequence)
4121        should_wake = 1;
4122    spin_unlock(&osb->dc_task_lock);
4123
4124    return should_wake;
4125}
4126
4127static int ocfs2_downconvert_thread(void *arg)
4128{
4129    int status = 0;
4130    struct ocfs2_super *osb = arg;
4131
4132    /* only quit once we've been asked to stop and there is no more
4133     * work available */
4134    while (!(kthread_should_stop() &&
4135        ocfs2_downconvert_thread_lists_empty(osb))) {
4136
4137        wait_event_interruptible(osb->dc_event,
4138                     ocfs2_downconvert_thread_should_wake(osb) ||
4139                     kthread_should_stop());
4140
4141        mlog(0, "downconvert_thread: awoken\n");
4142
4143        ocfs2_downconvert_thread_do_work(osb);
4144    }
4145
4146    osb->dc_task = NULL;
4147    return status;
4148}
4149
4150void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4151{
4152    spin_lock(&osb->dc_task_lock);
4153    /* make sure the voting thread gets a swipe at whatever changes
4154     * the caller may have made to the voting state */
4155    osb->dc_wake_sequence++;
4156    spin_unlock(&osb->dc_task_lock);
4157    wake_up(&osb->dc_event);
4158}
4159

Archive Download this file



interactive