diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f3496db4bb3e80d24adec6413168b20299321439..94c026bba2c226a4b034ca0db2011aa411726179 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -657,6 +657,9 @@ void ceph_add_cap(struct inode *inode,
 		session->s_nr_caps++;
 		spin_unlock(&session->s_cap_lock);
 	} else {
+		if (cap->cap_gen < session->s_cap_gen)
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+
 		/*
 		 * auth mds of the inode changed. we received the cap export
 		 * message, but still haven't received the cap import message.
@@ -1855,14 +1858,17 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 			retain |= CEPH_CAP_ANY;       /* be greedy */
 		} else if (S_ISDIR(inode->i_mode) &&
 			   (issued & CEPH_CAP_FILE_SHARED) &&
-			    __ceph_dir_is_complete(ci)) {
+			   __ceph_dir_is_complete(ci)) {
 			/*
 			 * If a directory is complete, we want to keep
 			 * the exclusive cap. So that MDS does not end up
 			 * revoking the shared cap on every create/unlink
 			 * operation.
 			 */
-			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			if (IS_RDONLY(inode))
+				want = CEPH_CAP_ANY_SHARED;
+			else
+				want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
 			retain |= want;
 		} else {
 
@@ -1970,8 +1976,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 			goto ack;
 
 		/* things we might delay */
-		if ((cap->issued & ~retain) == 0 &&
-		    cap->mds_wanted == want)
+		if ((cap->issued & ~retain) == 0)
 			continue;     /* nope, all good */
 
 		if (no_delay)
@@ -3048,7 +3053,8 @@ static void handle_cap_grant(struct inode *inode,
 	int used, wanted, dirty;
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
-	int check_caps = 0;
+	unsigned char check_caps = 0;
+	bool was_stale = cap->cap_gen < session->s_cap_gen;
 	bool wake = false;
 	bool writeback = false;
 	bool queue_trunc = false;
@@ -3062,21 +3068,6 @@ static void handle_cap_grant(struct inode *inode,
 		inode->i_size);
 
 
-	/*
-	 * auth mds of the inode changed. we received the cap export message,
-	 * but still haven't received the cap import message. handle_cap_export
-	 * updated the new auth MDS' cap.
-	 *
-	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
-	 * that was sent before the cap import message. So don't remove caps.
-	 */
-	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
-		WARN_ON(cap != ci->i_auth_cap);
-		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
-		seq = cap->seq;
-		newcaps |= cap->issued;
-	}
-
 	/*
 	 * If CACHE is being revoked, and we have no dirty buffers,
 	 * try to invalidate (once).  (If there are dirty buffers, we
@@ -3096,6 +3087,24 @@ static void handle_cap_grant(struct inode *inode,
 		}
 	}
 
+	if (was_stale)
+		cap->issued = cap->implemented = CEPH_CAP_PIN;
+
+	/*
+	 * auth mds of the inode changed. we received the cap export message,
+	 * but still haven't received the cap import message. handle_cap_export
+	 * updated the new auth MDS' cap.
+	 *
+	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
+	 * that was sent before the cap import message. So don't remove caps.
+	 */
+	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+		WARN_ON(cap != ci->i_auth_cap);
+		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
+		seq = cap->seq;
+		newcaps |= cap->issued;
+	}
+
 	/* side effects now are allowed */
 	cap->cap_gen = session->s_cap_gen;
 	cap->seq = seq;
@@ -3200,13 +3209,20 @@ static void handle_cap_grant(struct inode *inode,
 	     ceph_cap_string(wanted),
 	     ceph_cap_string(used),
 	     ceph_cap_string(dirty));
-	if (wanted != le32_to_cpu(grant->wanted)) {
-		dout("mds wanted %s -> %s\n",
-		     ceph_cap_string(le32_to_cpu(grant->wanted)),
-		     ceph_cap_string(wanted));
-		/* imported cap may not have correct mds_wanted */
-		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
-			check_caps = 1;
+
+	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
+	    (wanted & ~(cap->mds_wanted | newcaps))) {
+		/*
+		 * If mds is importing cap, prior cap messages that update
+		 * 'wanted' may get dropped by mds (migrate seq mismatch).
+		 *
+		 * We don't send cap message to update 'wanted' if what we
+		 * want are already issued. If mds revokes caps, cap message
+		 * that releases caps also tells mds what we want. But if
+		 * caps got revoked by mds forcedly (session stale). We may
+		 * haven't told mds what we want.
+		 */
+		check_caps = 1;
 	}
 
 	/* revocation, grant, or no-op? */
@@ -3539,9 +3555,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 		goto out_unlock;
 
 	if (target < 0) {
-		__ceph_remove_cap(cap, false);
-		if (!ci->i_auth_cap)
+		if (cap->mds_wanted | cap->issued)
 			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+		__ceph_remove_cap(cap, false);
 		goto out_unlock;
 	}
 
@@ -3569,7 +3585,6 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 			tcap->cap_id = t_cap_id;
 			tcap->seq = t_seq - 1;
 			tcap->issue_seq = t_seq - 1;
-			tcap->mseq = t_mseq;
 			tcap->issued |= issued;
 			tcap->implemented |= issued;
 			if (cap == ci->i_auth_cap)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 79dd5e6ed7559568b666784cae7b728dee72ba1a..9d1f34d4662702c8dc970861da7817d2d9d801d9 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1098,8 +1098,9 @@ static void update_dentry_lease(struct dentry *dentry,
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
  */
-static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
+static int splice_dentry(struct dentry **pdn, struct inode *in)
 {
+	struct dentry *dn = *pdn;
 	struct dentry *realdn;
 
 	BUG_ON(d_inode(dn));
@@ -1132,28 +1133,23 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
 	if (IS_ERR(realdn)) {
 		pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
 		       PTR_ERR(realdn), dn, in, ceph_vinop(in));
-		dn = realdn;
-		/*
-		 * Caller should release 'dn' in the case of error.
-		 * If 'req->r_dentry' is passed to this function,
-		 * caller should leave 'req->r_dentry' untouched.
-		 */
-		goto out;
-	} else if (realdn) {
+		return PTR_ERR(realdn);
+	}
+
+	if (realdn) {
 		dout("dn %p (%d) spliced with %p (%d) "
 		     "inode %p ino %llx.%llx\n",
 		     dn, d_count(dn),
 		     realdn, d_count(realdn),
 		     d_inode(realdn), ceph_vinop(d_inode(realdn)));
 		dput(dn);
-		dn = realdn;
+		*pdn = realdn;
 	} else {
 		BUG_ON(!ceph_dentry(dn));
 		dout("dn %p attached to %p ino %llx.%llx\n",
 		     dn, d_inode(dn), ceph_vinop(d_inode(dn)));
 	}
-out:
-	return dn;
+	return 0;
 }
 
 /*
@@ -1340,7 +1336,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
-			dn = req->r_old_dentry;  /* use old_dentry */
+			/* swap r_dentry and r_old_dentry in case that
+			 * splice_dentry() gets called later. This is safe
+			 * because no other place will use them */
+			req->r_dentry = req->r_old_dentry;
+			req->r_old_dentry = dn;
+			dn = req->r_dentry;
 		}
 
 		/* null dentry? */
@@ -1365,12 +1366,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 		if (d_really_is_negative(dn)) {
 			ceph_dir_clear_ordered(dir);
 			ihold(in);
-			dn = splice_dentry(dn, in);
-			if (IS_ERR(dn)) {
-				err = PTR_ERR(dn);
+			err = splice_dentry(&req->r_dentry, in);
+			if (err < 0)
 				goto done;
-			}
-			req->r_dentry = dn;  /* may have spliced */
+			dn = req->r_dentry;  /* may have spliced */
 		} else if (d_really_is_positive(dn) && d_inode(dn) != in) {
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
@@ -1390,22 +1389,18 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 	} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
 		    req->r_op == CEPH_MDS_OP_MKSNAP) &&
 		   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
-		struct dentry *dn = req->r_dentry;
 		struct inode *dir = req->r_parent;
 
 		/* fill out a snapdir LOOKUPSNAP dentry */
-		BUG_ON(!dn);
 		BUG_ON(!dir);
 		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
-		dout(" linking snapped dir %p to dn %p\n", in, dn);
+		BUG_ON(!req->r_dentry);
+		dout(" linking snapped dir %p to dn %p\n", in, req->r_dentry);
 		ceph_dir_clear_ordered(dir);
 		ihold(in);
-		dn = splice_dentry(dn, in);
-		if (IS_ERR(dn)) {
-			err = PTR_ERR(dn);
+		err = splice_dentry(&req->r_dentry, in);
+		if (err < 0)
 			goto done;
-		}
-		req->r_dentry = dn;  /* may have spliced */
 	} else if (rinfo->head->is_dentry) {
 		struct ceph_vino *ptvino = NULL;
 
@@ -1669,8 +1664,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		}
 
 		if (d_really_is_negative(dn)) {
-			struct dentry *realdn;
-
 			if (ceph_security_xattr_deadlock(in)) {
 				dout(" skip splicing dn %p to inode %p"
 				     " (security xattr deadlock)\n", dn, in);
@@ -1679,13 +1672,9 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 				goto next_item;
 			}
 
-			realdn = splice_dentry(dn, in);
-			if (IS_ERR(realdn)) {
-				err = PTR_ERR(realdn);
-				d_drop(dn);
+			err = splice_dentry(&dn, in);
+			if (err < 0)
 				goto next_item;
-			}
-			dn = realdn;
 		}
 
 		ceph_dentry(dn)->offset = rde->offset;
@@ -1701,8 +1690,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 				err = ret;
 		}
 next_item:
-		if (dn)
-			dput(dn);
+		dput(dn);
 	}
 out:
 	if (err == 0 && skipped == 0) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bd13a3267ae03c401d7b0dd0c1f37626bbc42b0a..163fc74bf22174be678f7a826a6eaddc953872c0 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1232,13 +1232,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	dout("removing cap %p, ci is %p, inode is %p\n",
 	     cap, ci, &ci->vfs_inode);
 	spin_lock(&ci->i_ceph_lock);
+	if (cap->mds_wanted | cap->issued)
+		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
 	__ceph_remove_cap(cap, false);
 	if (!ci->i_auth_cap) {
 		struct ceph_cap_flush *cf;
 		struct ceph_mds_client *mdsc = fsc->mdsc;
 
-		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
-
 		if (ci->i_wrbuffer_ref > 0 &&
 		    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
 			invalidate = true;
@@ -1355,6 +1355,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
 	dispose_cap_releases(session->s_mdsc, &dispose);
 }
 
+enum {
+	RECONNECT,
+	RENEWCAPS,
+	FORCE_RO,
+};
+
 /*
  * wake up any threads waiting on this session's caps.  if the cap is
  * old (didn't get renewed on the client reconnect), remove it now.
@@ -1365,23 +1371,34 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
 			      void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	unsigned long ev = (unsigned long)arg;
 
-	if (arg) {
+	if (ev == RECONNECT) {
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_wanted_max_size = 0;
 		ci->i_requested_max_size = 0;
 		spin_unlock(&ci->i_ceph_lock);
+	} else if (ev == RENEWCAPS) {
+		if (cap->cap_gen < cap->session->s_cap_gen) {
+			/* mds did not re-issue stale cap */
+			spin_lock(&ci->i_ceph_lock);
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+			/* make sure mds knows what we want */
+			if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
+				ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+			spin_unlock(&ci->i_ceph_lock);
+		}
+	} else if (ev == FORCE_RO) {
 	}
 	wake_up_all(&ci->i_cap_wq);
 	return 0;
 }
 
-static void wake_up_session_caps(struct ceph_mds_session *session,
-				 int reconnect)
+static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
 {
 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
 	iterate_session_caps(session, wake_up_session_cb,
-			     (void *)(unsigned long)reconnect);
+			     (void *)(unsigned long)ev);
 }
 
 /*
@@ -1466,7 +1483,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
 	spin_unlock(&session->s_cap_lock);
 
 	if (wake)
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, RENEWCAPS);
 }
 
 /*
@@ -2847,7 +2864,7 @@ static void handle_session(struct ceph_mds_session *session,
 		spin_lock(&session->s_cap_lock);
 		session->s_readonly = true;
 		spin_unlock(&session->s_cap_lock);
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, FORCE_RO);
 		break;
 
 	case CEPH_SESSION_REJECT:
@@ -2943,11 +2960,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	struct ceph_inode_info *ci = cap->ci;
 	struct ceph_reconnect_state *recon_state = arg;
 	struct ceph_pagelist *pagelist = recon_state->pagelist;
-	char *path;
-	int pathlen, err;
-	u64 pathbase;
+	int err;
 	u64 snap_follows;
-	struct dentry *dentry;
 
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
@@ -2956,19 +2970,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	if (err)
 		return err;
 
-	dentry = d_find_alias(inode);
-	if (dentry) {
-		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
-		if (IS_ERR(path)) {
-			err = PTR_ERR(path);
-			goto out_dput;
-		}
-	} else {
-		path = NULL;
-		pathlen = 0;
-		pathbase = 0;
-	}
-
 	spin_lock(&ci->i_ceph_lock);
 	cap->seq = 0;        /* reset cap seq */
 	cap->issue_seq = 0;  /* and issue_seq */
@@ -2980,7 +2981,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
 		rec.v2.issued = cpu_to_le32(cap->issued);
 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
-		rec.v2.pathbase = cpu_to_le64(pathbase);
+		rec.v2.pathbase = 0;
 		rec.v2.flock_len = (__force __le32)
 			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
 	} else {
@@ -2991,7 +2992,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
 		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
-		rec.v1.pathbase = cpu_to_le64(pathbase);
+		rec.v1.pathbase = 0;
 	}
 
 	if (list_empty(&ci->i_cap_snaps)) {
@@ -3023,7 +3024,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 					       GFP_NOFS);
 			if (!flocks) {
 				err = -ENOMEM;
-				goto out_free;
+				goto out_err;
 			}
 			err = ceph_encode_locks_to_buffer(inode, flocks,
 							  num_fcntl_locks,
@@ -3033,7 +3034,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 				flocks = NULL;
 				if (err == -ENOSPC)
 					goto encode_again;
-				goto out_free;
+				goto out_err;
 			}
 		} else {
 			kfree(flocks);
@@ -3053,44 +3054,64 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			    sizeof(struct ceph_filelock);
 		rec.v2.flock_len = cpu_to_le32(struct_len);
 
-		struct_len += sizeof(rec.v2);
-		struct_len += sizeof(u32) + pathlen;
+		struct_len += sizeof(u32) + sizeof(rec.v2);
 
 		if (struct_v >= 2)
 			struct_len += sizeof(u64); /* snap_follows */
 
 		total_len += struct_len;
 		err = ceph_pagelist_reserve(pagelist, total_len);
+		if (err) {
+			kfree(flocks);
+			goto out_err;
+		}
 
-		if (!err) {
-			if (recon_state->msg_version >= 3) {
-				ceph_pagelist_encode_8(pagelist, struct_v);
-				ceph_pagelist_encode_8(pagelist, 1);
-				ceph_pagelist_encode_32(pagelist, struct_len);
-			}
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
-			ceph_locks_to_pagelist(flocks, pagelist,
-					       num_fcntl_locks,
-					       num_flock_locks);
-			if (struct_v >= 2)
-				ceph_pagelist_encode_64(pagelist, snap_follows);
+		if (recon_state->msg_version >= 3) {
+			ceph_pagelist_encode_8(pagelist, struct_v);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, struct_len);
 		}
+		ceph_pagelist_encode_string(pagelist, NULL, 0);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+		ceph_locks_to_pagelist(flocks, pagelist,
+				       num_fcntl_locks, num_flock_locks);
+		if (struct_v >= 2)
+			ceph_pagelist_encode_64(pagelist, snap_follows);
+
 		kfree(flocks);
 	} else {
-		size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
-		err = ceph_pagelist_reserve(pagelist, size);
-		if (!err) {
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+		u64 pathbase = 0;
+		int pathlen = 0;
+		char *path = NULL;
+		struct dentry *dentry;
+
+		dentry = d_find_alias(inode);
+		if (dentry) {
+			path = ceph_mdsc_build_path(dentry,
+						&pathlen, &pathbase, 0);
+			dput(dentry);
+			if (IS_ERR(path)) {
+				err = PTR_ERR(path);
+				goto out_err;
+			}
+			rec.v1.pathbase = cpu_to_le64(pathbase);
 		}
+
+		err = ceph_pagelist_reserve(pagelist,
+				pathlen + sizeof(u32) + sizeof(rec.v1));
+		if (err) {
+			kfree(path);
+			goto out_err;
+		}
+
+		ceph_pagelist_encode_string(pagelist, path, pathlen);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+
+		kfree(path);
 	}
 
 	recon_state->nr_caps++;
-out_free:
-	kfree(path);
-out_dput:
-	dput(dentry);
+out_err:
 	return err;
 }
 
@@ -3339,7 +3360,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 				pr_info("mds%d recovery completed\n", s->s_mds);
 			kick_requests(mdsc, i);
 			ceph_kick_flushing_caps(mdsc, s);
-			wake_up_session_caps(s, 1);
+			wake_up_session_caps(s, RECONNECT);
 		}
 	}
 
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 32fcce0d4d3cbbc47a106b0297a61aa39f6c6e84..729da155ebf04761fd2883448b26779e4307f050 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -17,14 +17,16 @@
 #include <linux/ceph/auth.h>
 
 /* The first 8 bits are reserved for old ceph releases */
-#define CEPHFS_FEATURE_MIMIC    8
-
-#define CEPHFS_FEATURES_ALL {           \
-  0, 1, 2, 3, 4, 5, 6, 7,		\
-  CEPHFS_FEATURE_MIMIC,                 \
+#define CEPHFS_FEATURE_MIMIC		8
+#define CEPHFS_FEATURE_REPLY_ENCODING	9
+#define CEPHFS_FEATURE_RECLAIM_CLIENT	10
+#define CEPHFS_FEATURE_LAZY_CAP_WANTED	11
+
+#define CEPHFS_FEATURES_CLIENT_SUPPORTED { 	\
+	0, 1, 2, 3, 4, 5, 6, 7,			\
+	CEPHFS_FEATURE_MIMIC,			\
+	CEPHFS_FEATURE_LAZY_CAP_WANTED,		\
 }
-
-#define CEPHFS_FEATURES_CLIENT_SUPPORTED CEPHFS_FEATURES_ALL
 #define CEPHFS_FEATURES_CLIENT_REQUIRED {}
 
 
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 44e53abeb32ae6495663afd1131564f33fea3188..1a2c5d390f7f184705b3bde1f68f36074b6bf39e 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -35,7 +35,6 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
 
 	/* pick */
 	n = prandom_u32() % n;
-	i = 0;
 	for (i = 0; n > 0; i++, n--)
 		while (m->m_info[i].state <= 0)
 			i++;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 2f126eff275d58417d2397b15e7fcef2351cdde5..d5718284db5721dfe96554b48cca5d38bdc40942 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -544,7 +544,7 @@ static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  * shortly.
  */
 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
-		     size_t kvlen, size_t len, int more)
+			    size_t kvlen, size_t len, bool more)
 {
 	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 	int r;
@@ -560,24 +560,15 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 	return r;
 }
 
-static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
-		     int offset, size_t size, bool more)
-{
-	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
-	int ret;
-
-	ret = kernel_sendpage(sock, page, offset, size, flags);
-	if (ret == -EAGAIN)
-		ret = 0;
-
-	return ret;
-}
-
+/*
+ * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
+ */
 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
-		     int offset, size_t size, bool more)
+			     int offset, size_t size, int more)
 {
-	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-	struct bio_vec bvec;
+	ssize_t (*sendpage)(struct socket *sock, struct page *page,
+			    int offset, size_t size, int flags);
+	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
 	int ret;
 
 	/*
@@ -589,19 +580,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
 	 * triggers one of hardened usercopy checks.
 	 */
 	if (page_count(page) >= 1 && !PageSlab(page))
-		return __ceph_tcp_sendpage(sock, page, offset, size, more);
-
-	bvec.bv_page = page;
-	bvec.bv_offset = offset;
-	bvec.bv_len = size;
-
-	if (more)
-		msg.msg_flags |= MSG_MORE;
+		sendpage = sock->ops->sendpage;
 	else
-		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
+		sendpage = sock_no_sendpage;
 
-	iov_iter_bvec(&msg.msg_iter, WRITE, &bvec, 1, size);
-	ret = sock_sendmsg(sock, &msg);
+	ret = sendpage(sock, page, offset, size, flags);
 	if (ret == -EAGAIN)
 		ret = 0;
 
@@ -1572,6 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 	u32 crc;
 
 	dout("%s %p msg %p\n", __func__, con, msg);
@@ -1592,7 +1576,6 @@ static int write_partial_message_data(struct ceph_connection *con)
 		struct page *page;
 		size_t page_offset;
 		size_t length;
-		bool last_piece;
 		int ret;
 
 		if (!cursor->resid) {
@@ -1600,10 +1583,11 @@ static int write_partial_message_data(struct ceph_connection *con)
 			continue;
 		}
 
-		page = ceph_msg_data_next(cursor, &page_offset, &length,
-					  &last_piece);
-		ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-					length, !last_piece);
+		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
+		if (length == cursor->total_resid)
+			more = MSG_MORE;
+		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
+					more);
 		if (ret <= 0) {
 			if (do_datacrc)
 				msg->footer.data_crc = cpu_to_le32(crc);
@@ -1633,13 +1617,16 @@ static int write_partial_message_data(struct ceph_connection *con)
  */
 static int write_partial_skip(struct ceph_connection *con)
 {
+	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 	int ret;
 
 	dout("%s %p %d left\n", __func__, con, con->out_skip);
 	while (con->out_skip > 0) {
 		size_t size = min(con->out_skip, (int) PAGE_SIZE);
 
-		ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
+		if (size == con->out_skip)
+			more = MSG_MORE;
+		ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more);
 		if (ret <= 0)
 			goto out;
 		con->out_skip -= ret;