From: Sunil Mushran
Signed-off-by: Sunil Mushran <sunil.mushran(a)oracle.com>


Joel Becker wrote:
> Inside the stackglue, the locking protocol structure is hanging off of
> the ocfs2_cluster_connection. This takes it one step further; the locking
> protocol is passed into ocfs2_cluster_connect(). Now different cluster
> connections can have different locking protocols with distinct asts.
> Note that all locking protocols have to keep their maximum protocol
> version in lock-step.
>
> With the protocol structure set in ocfs2_cluster_connect(), there is no
> need for the stackglue to have a static pointer to a specific protocol
> structure. We can change initialization to only pass in the maximum
> protocol version.
>
> Signed-off-by: Joel Becker <joel.becker(a)oracle.com>
> ---
> fs/ocfs2/dlmglue.c | 168 +++++++++++++++++++++++++-------------------------
> fs/ocfs2/stackglue.c | 43 ++++++++-----
> fs/ocfs2/stackglue.h | 3 +-
> 3 files changed, 110 insertions(+), 104 deletions(-)
>
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index 4cb3ac2..dde55c4 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -1036,7 +1036,6 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
> return lockres->l_pending_gen;
> }
>
> -
> static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
> {
> struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> @@ -1130,6 +1129,88 @@ out:
> spin_unlock_irqrestore(&lockres->l_lock, flags);
> }
>
> +static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
> +{
> + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> + unsigned long flags;
> +
> + mlog_entry_void();
> +
> + mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
> + lockres->l_unlock_action);
> +
> + spin_lock_irqsave(&lockres->l_lock, flags);
> + if (error) {
> + mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
> + "unlock_action %d\n", error, lockres->l_name,
> + lockres->l_unlock_action);
> + spin_unlock_irqrestore(&lockres->l_lock, flags);
> + mlog_exit_void();
> + return;
> + }
> +
> + switch(lockres->l_unlock_action) {
> + case OCFS2_UNLOCK_CANCEL_CONVERT:
> + mlog(0, "Cancel convert success for %s\n", lockres->l_name);
> + lockres->l_action = OCFS2_AST_INVALID;
> + /* Downconvert thread may have requeued this lock, we
> + * need to wake it. */
> + if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
> + ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
> + break;
> + case OCFS2_UNLOCK_DROP_LOCK:
> + lockres->l_level = DLM_LOCK_IV;
> + break;
> + default:
> + BUG();
> + }
> +
> + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
> + lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
> + wake_up(&lockres->l_event);
> + spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> + mlog_exit_void();
> +}
> +
> +/*
> + * This is the filesystem locking protocol. It provides the lock handling
> + * hooks for the underlying DLM. It has a maximum version number.
> + * The version number allows interoperability with systems running at
> + * the same major number and an equal or smaller minor number.
> + *
> + * Whenever the filesystem does new things with locks (adds or removes a
> + * lock, orders them differently, does different things underneath a lock),
> + * the version must be changed. The protocol is negotiated when joining
> + * the dlm domain. A node may join the domain if its major version is
> + * identical to all other nodes and its minor version is greater than
> + * or equal to all other nodes. When its minor version is greater than
> + * the other nodes, it will run at the minor version specified by the
> + * other nodes.
> + *
> + * If a locking change is made that will not be compatible with older
> + * versions, the major number must be increased and the minor version set
> + * to zero. If a change merely adds a behavior that can be disabled when
> + * speaking to older versions, the minor version must be increased. If a
> + * change adds a fully backwards compatible change (eg, LVB changes that
> + * are just ignored by older versions), the version does not need to be
> + * updated.
> + */
> +static struct ocfs2_locking_protocol lproto = {
> + .lp_max_version = {
> + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> + },
> + .lp_lock_ast = ocfs2_locking_ast,
> + .lp_blocking_ast = ocfs2_blocking_ast,
> + .lp_unlock_ast = ocfs2_unlock_ast,
> +};
> +
> +void ocfs2_set_locking_protocol(void)
> +{
> + ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
> +}
> +
> static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
> int convert)
> {
> @@ -2959,7 +3040,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
> status = ocfs2_cluster_connect(osb->osb_cluster_stack,
> osb->uuid_str,
> strlen(osb->uuid_str),
> - ocfs2_do_node_down, osb,
> + &lproto, ocfs2_do_node_down, osb,
> &conn);
> if (status) {
> mlog_errno(status);
> @@ -3026,50 +3107,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
> mlog_exit_void();
> }
>
> -static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
> -{
> - struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
> - unsigned long flags;
> -
> - mlog_entry_void();
> -
> - mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
> - lockres->l_unlock_action);
> -
> - spin_lock_irqsave(&lockres->l_lock, flags);
> - if (error) {
> - mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
> - "unlock_action %d\n", error, lockres->l_name,
> - lockres->l_unlock_action);
> - spin_unlock_irqrestore(&lockres->l_lock, flags);
> - mlog_exit_void();
> - return;
> - }
> -
> - switch(lockres->l_unlock_action) {
> - case OCFS2_UNLOCK_CANCEL_CONVERT:
> - mlog(0, "Cancel convert success for %s\n", lockres->l_name);
> - lockres->l_action = OCFS2_AST_INVALID;
> - /* Downconvert thread may have requeued this lock, we
> - * need to wake it. */
> - if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
> - ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
> - break;
> - case OCFS2_UNLOCK_DROP_LOCK:
> - lockres->l_level = DLM_LOCK_IV;
> - break;
> - default:
> - BUG();
> - }
> -
> - lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
> - lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
> - wake_up(&lockres->l_event);
> - spin_unlock_irqrestore(&lockres->l_lock, flags);
> -
> - mlog_exit_void();
> -}
> -
> static int ocfs2_drop_lock(struct ocfs2_super *osb,
> struct ocfs2_lock_res *lockres)
> {
> @@ -3843,45 +3880,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
> ocfs2_cluster_unlock(osb, lockres, level);
> }
>
> -/*
> - * This is the filesystem locking protocol. It provides the lock handling
> - * hooks for the underlying DLM. It has a maximum version number.
> - * The version number allows interoperability with systems running at
> - * the same major number and an equal or smaller minor number.
> - *
> - * Whenever the filesystem does new things with locks (adds or removes a
> - * lock, orders them differently, does different things underneath a lock),
> - * the version must be changed. The protocol is negotiated when joining
> - * the dlm domain. A node may join the domain if its major version is
> - * identical to all other nodes and its minor version is greater than
> - * or equal to all other nodes. When its minor version is greater than
> - * the other nodes, it will run at the minor version specified by the
> - * other nodes.
> - *
> - * If a locking change is made that will not be compatible with older
> - * versions, the major number must be increased and the minor version set
> - * to zero. If a change merely adds a behavior that can be disabled when
> - * speaking to older versions, the minor version must be increased. If a
> - * change adds a fully backwards compatible change (eg, LVB changes that
> - * are just ignored by older versions), the version does not need to be
> - * updated.
> - */
> -static struct ocfs2_locking_protocol lproto = {
> - .lp_max_version = {
> - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
> - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
> - },
> - .lp_lock_ast = ocfs2_locking_ast,
> - .lp_blocking_ast = ocfs2_blocking_ast,
> - .lp_unlock_ast = ocfs2_unlock_ast,
> -};
> -
> -void ocfs2_set_locking_protocol(void)
> -{
> - ocfs2_stack_glue_set_locking_protocol(&lproto);
> -}
> -
> -
> static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
> struct ocfs2_lock_res *lockres)
> {
> diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
> index fc184c7..31db2e8 100644
> --- a/fs/ocfs2/stackglue.c
> +++ b/fs/ocfs2/stackglue.c
> @@ -36,7 +36,7 @@
> #define OCFS2_STACK_PLUGIN_USER "user"
> #define OCFS2_MAX_HB_CTL_PATH 256
>
> -static struct ocfs2_locking_protocol *lproto;
> +static struct ocfs2_protocol_version locking_max_version;
> static DEFINE_SPINLOCK(ocfs2_stack_lock);
> static LIST_HEAD(ocfs2_stack_list);
> static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];
> @@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
> spin_lock(&ocfs2_stack_lock);
> if (!ocfs2_stack_lookup(plugin->sp_name)) {
> plugin->sp_count = 0;
> - plugin->sp_max_proto = lproto->lp_max_version;
> + plugin->sp_max_proto = locking_max_version;
> list_add(&plugin->sp_list, &ocfs2_stack_list);
> printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
> plugin->sp_name);
> @@ -213,23 +213,23 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
> }
> EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);
>
> -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
> +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto)
> {
> struct ocfs2_stack_plugin *p;
>
> - BUG_ON(proto == NULL);
> -
> spin_lock(&ocfs2_stack_lock);
> - BUG_ON(active_stack != NULL);
> + if (memcmp(max_proto, &locking_max_version,
> + sizeof(struct ocfs2_protocol_version))) {
> + BUG_ON(locking_max_version.pv_major != 0);
>
> - lproto = proto;
> - list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
> - p->sp_max_proto = lproto->lp_max_version;
> + locking_max_version = *max_proto;
> + list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
> + p->sp_max_proto = locking_max_version;
> + }
> }
> -
> spin_unlock(&ocfs2_stack_lock);
> }
> -EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
> +EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version);
>
>
> /*
> @@ -245,8 +245,6 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
> void *name,
> unsigned int namelen)
> {
> - BUG_ON(lproto == NULL);
> -
> if (!lksb->lksb_conn)
> lksb->lksb_conn = conn;
> else
> @@ -260,7 +258,6 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
> struct ocfs2_dlm_lksb *lksb,
> u32 flags)
> {
> - BUG_ON(lproto == NULL);
> BUG_ON(lksb->lksb_conn == NULL);
>
> return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
> @@ -314,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock);
> int ocfs2_cluster_connect(const char *stack_name,
> const char *group,
> int grouplen,
> + struct ocfs2_locking_protocol *lproto,
> void (*recovery_handler)(int node_num,
> void *recovery_data),
> void *recovery_data,
> @@ -331,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name,
> goto out;
> }
>
> + if (memcmp(&lproto->lp_max_version, &locking_max_version,
> + sizeof(struct ocfs2_protocol_version))) {
> + rc = -EINVAL;
> + goto out;
> + }
> +
> new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
> GFP_KERNEL);
> if (!new_conn) {
> @@ -456,10 +460,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
> ssize_t ret = 0;
>
> spin_lock(&ocfs2_stack_lock);
> - if (lproto)
> + if (locking_max_version.pv_major)
> ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
> - lproto->lp_max_version.pv_major,
> - lproto->lp_max_version.pv_minor);
> + locking_max_version.pv_major,
> + locking_max_version.pv_minor);
> spin_unlock(&ocfs2_stack_lock);
>
> return ret;
> @@ -688,7 +692,10 @@ static int __init ocfs2_stack_glue_init(void)
>
> static void __exit ocfs2_stack_glue_exit(void)
> {
> - lproto = NULL;
> + memset(&locking_max_version, 0,
> + sizeof(struct ocfs2_protocol_version));
> + locking_max_version.pv_major = 0;
> + locking_max_version.pv_minor = 0;
> ocfs2_sysfs_exit();
> if (ocfs2_table_header)
> unregister_sysctl_table(ocfs2_table_header);
> diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
> index 77a7a9a..b1981ba 100644
> --- a/fs/ocfs2/stackglue.h
> +++ b/fs/ocfs2/stackglue.h
> @@ -241,6 +241,7 @@ struct ocfs2_stack_plugin {
> int ocfs2_cluster_connect(const char *stack_name,
> const char *group,
> int grouplen,
> + struct ocfs2_locking_protocol *lproto,
> void (*recovery_handler)(int node_num,
> void *recovery_data),
> void *recovery_data,
> @@ -270,7 +271,7 @@ int ocfs2_stack_supports_plocks(void);
> int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
> struct file *file, int cmd, struct file_lock *fl);
>
> -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);
> +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto);
>
>
> /* Used by stack plugins */
>

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/