summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c11
-rw-r--r--fs/ceph/addr.c273
-rw-r--r--fs/ceph/caps.c132
-rw-r--r--fs/ceph/dir.c27
-rw-r--r--fs/ceph/file.c97
-rw-r--r--fs/ceph/inode.c59
-rw-r--r--fs/ceph/locks.c64
-rw-r--r--fs/ceph/mds_client.c41
-rw-r--r--fs/ceph/mds_client.h10
-rw-r--r--fs/ceph/snap.c37
-rw-r--r--fs/ceph/super.c16
-rw-r--r--fs/ceph/super.h55
-rw-r--r--fs/ceph/super.h.rej10
-rw-r--r--fs/ceph/xattr.c7
-rw-r--r--include/linux/ceph/auth.h26
-rw-r--r--include/linux/ceph/buffer.h3
-rw-r--r--include/linux/ceph/ceph_features.h1
-rw-r--r--include/linux/ceph/ceph_fs.h10
-rw-r--r--include/linux/ceph/libceph.h2
-rw-r--r--include/linux/ceph/messenger.h9
-rw-r--r--include/linux/ceph/msgr.h11
-rw-r--r--include/linux/ceph/osd_client.h13
-rw-r--r--include/linux/ceph/pagelist.h4
-rw-r--r--net/ceph/auth_x.c76
-rw-r--r--net/ceph/auth_x.h1
-rw-r--r--net/ceph/buffer.c4
-rw-r--r--net/ceph/ceph_common.c21
-rw-r--r--net/ceph/messenger.c34
-rw-r--r--net/ceph/osd_client.c118
29 files changed, 992 insertions, 180 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 27b71a0..3ec85df 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2370,8 +2370,12 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
opcode = CEPH_OSD_OP_READ;
}
- osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
- 0, 0);
+ if (opcode == CEPH_OSD_OP_DELETE)
+ osd_req_op_init(osd_request, num_ops, opcode);
+ else
+ osd_req_op_extent_init(osd_request, num_ops, opcode,
+ offset, length, 0, 0);
+
if (obj_request->type == OBJ_REQUEST_BIO)
osd_req_op_extent_osd_data_bio(osd_request, num_ops,
obj_request->bio_list, length);
@@ -3405,8 +3409,7 @@ err_rq:
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
obj_op_name(op_type), length, offset, result);
- if (snapc)
- ceph_put_snap_context(snapc);
+ ceph_put_snap_context(snapc);
blk_end_request_all(rq, result);
}
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 18c06bb..f5013d9 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc;
int err = 0;
+ u64 off = page_offset(page);
u64 len = PAGE_CACHE_SIZE;
- err = ceph_readpage_from_fscache(inode, page);
+ if (off >= i_size_read(inode)) {
+ zero_user_segment(page, err, PAGE_CACHE_SIZE);
+ SetPageUptodate(page);
+ return 0;
+ }
+ /*
+ * Uptodate inline data should have been added into page cache
+ * while getting Fcr caps.
+ */
+ if (ci->i_inline_version != CEPH_INLINE_NONE)
+ return -EINVAL;
+
+ err = ceph_readpage_from_fscache(inode, page);
if (err == 0)
goto out;
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
- (u64) page_offset(page), &len,
+ off, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
if (err == -ENOENT)
@@ -319,7 +332,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
off, len);
vino = ceph_vino(inode);
req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
- 1, CEPH_OSD_OP_READ,
+ 0, 1, CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size,
false);
@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
int rc = 0;
int max = 0;
+ if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
+ return -EINVAL;
+
rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
&nr_pages);
@@ -673,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
int rc = 0;
unsigned wsize = 1 << inode->i_blkbits;
struct ceph_osd_request *req = NULL;
- int do_sync;
+ int do_sync = 0;
u64 truncate_size, snap_size;
u32 truncate_seq;
@@ -750,7 +766,6 @@ retry:
last_snapc = snapc;
while (!done && index <= end) {
- int num_ops = do_sync ? 2 : 1;
unsigned i;
int first;
pgoff_t next;
@@ -850,7 +865,8 @@ get_more_pages:
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
- offset, &len, num_ops,
+ offset, &len, 0,
+ do_sync ? 2 : 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
@@ -862,6 +878,9 @@ get_more_pages:
break;
}
+ if (do_sync)
+ osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
req->r_callback = writepages_finish;
req->r_inode = inode;
@@ -1204,6 +1223,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
+ struct page *pinned_page = NULL;
loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
int want, got, ret;
@@ -1215,7 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_CACHE;
while (1) {
got = 0;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
+ -1, &got, &pinned_page);
if (ret == 0)
break;
if (ret != -ERESTARTSYS) {
@@ -1226,12 +1247,54 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
- ret = filemap_fault(vma, vmf);
+ if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
+ ci->i_inline_version == CEPH_INLINE_NONE)
+ ret = filemap_fault(vma, vmf);
+ else
+ ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
+ if (pinned_page)
+ page_cache_release(pinned_page);
ceph_put_cap_refs(ci, got);
+ if (ret != -EAGAIN)
+ return ret;
+
+ /* read inline data */
+ if (off >= PAGE_CACHE_SIZE) {
+ /* does not support inline data > PAGE_SIZE */
+ ret = VM_FAULT_SIGBUS;
+ } else {
+ int ret1;
+ struct address_space *mapping = inode->i_mapping;
+ struct page *page = find_or_create_page(mapping, 0,
+ mapping_gfp_mask(mapping) &
+ ~__GFP_FS);
+ if (!page) {
+ ret = VM_FAULT_OOM;
+ goto out;
+ }
+ ret1 = __ceph_do_getattr(inode, page,
+ CEPH_STAT_CAP_INLINE_DATA, true);
+ if (ret1 < 0 || off >= i_size_read(inode)) {
+ unlock_page(page);
+ page_cache_release(page);
+ ret = VM_FAULT_SIGBUS;
+ goto out;
+ }
+ if (ret1 < PAGE_CACHE_SIZE)
+ zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
+ else
+ flush_dcache_page(page);
+ SetPageUptodate(page);
+ vmf->page = page;
+ ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
+ }
+out:
+ dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
+ inode, off, (size_t)PAGE_CACHE_SIZE, ret);
return ret;
}
@@ -1250,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
size_t len;
int want, got, ret;
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ struct page *locked_page = NULL;
+ if (off == 0) {
+ lock_page(page);
+ locked_page = page;
+ }
+ ret = ceph_uninline_data(vma->vm_file, locked_page);
+ if (locked_page)
+ unlock_page(locked_page);
+ if (ret < 0)
+ return VM_FAULT_SIGBUS;
+ }
+
if (off + PAGE_CACHE_SIZE <= size)
len = PAGE_CACHE_SIZE;
else
@@ -1263,7 +1339,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_BUFFER;
while (1) {
got = 0;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+ &got, NULL);
if (ret == 0)
break;
if (ret != -ERESTARTSYS) {
@@ -1297,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = VM_FAULT_SIGBUS;
}
out:
- if (ret != VM_FAULT_LOCKED) {
+ if (ret != VM_FAULT_LOCKED)
unlock_page(page);
- } else {
+ if (ret == VM_FAULT_LOCKED ||
+ ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty;
spin_lock(&ci->i_ceph_lock);
+ ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
@@ -1315,6 +1394,178 @@ out:
return ret;
}
+void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+ char *data, size_t len)
+{
+ struct address_space *mapping = inode->i_mapping;
+ struct page *page;
+
+ if (locked_page) {
+ page = locked_page;
+ } else {
+ if (i_size_read(inode) == 0)
+ return;
+ page = find_or_create_page(mapping, 0,
+ mapping_gfp_mask(mapping) & ~__GFP_FS);
+ if (!page)
+ return;
+ if (PageUptodate(page)) {
+ unlock_page(page);
+ page_cache_release(page);
+ return;
+ }
+ }
+
+ dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
+ inode, ceph_vinop(inode), len, locked_page);
+
+ if (len > 0) {
+ void *kaddr = kmap_atomic(page);
+ memcpy(kaddr, data, len);
+ kunmap_atomic(kaddr);
+ }
+
+ if (page != locked_page) {
+ if (len < PAGE_CACHE_SIZE)
+ zero_user_segment(page, len, PAGE_CACHE_SIZE);
+ else
+ flush_dcache_page(page);
+
+ SetPageUptodate(page);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+int ceph_uninline_data(struct file *filp, struct page *locked_page)
+{
+ struct inode *inode = file_inode(filp);
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_request *req;
+ struct page *page = NULL;
+ u64 len, inline_version;
+ int err = 0;
+ bool from_pagecache = false;
+
+ spin_lock(&ci->i_ceph_lock);
+ inline_version = ci->i_inline_version;
+ spin_unlock(&ci->i_ceph_lock);
+
+ dout("uninline_data %p %llx.%llx inline_version %llu\n",
+ inode, ceph_vinop(inode), inline_version);
+
+ if (inline_version == 1 || /* initial version, no data */
+ inline_version == CEPH_INLINE_NONE)
+ goto out;
+
+ if (locked_page) {
+ page = locked_page;
+ WARN_ON(!PageUptodate(page));
+ } else if (ceph_caps_issued(ci) &
+ (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
+ page = find_get_page(inode->i_mapping, 0);
+ if (page) {
+ if (PageUptodate(page)) {
+ from_pagecache = true;
+ lock_page(page);
+ } else {
+ page_cache_release(page);
+ page = NULL;
+ }
+ }
+ }
+
+ if (page) {
+ len = i_size_read(inode);
+ if (len > PAGE_CACHE_SIZE)
+ len = PAGE_CACHE_SIZE;
+ } else {
+ page = __page_cache_alloc(GFP_NOFS);
+ if (!page) {
+ err = -ENOMEM;
+ goto out;
+ }
+ err = __ceph_do_getattr(inode, page,
+ CEPH_STAT_CAP_INLINE_DATA, true);
+ if (err < 0) {
+ /* no inline data */
+ if (err == -ENODATA)
+ err = 0;
+ goto out;
+ }
+ len = err;
+ }
+
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), 0, &len, 0, 1,
+ CEPH_OSD_OP_CREATE,
+ CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+ ci->i_snap_realm->cached_context,
+ 0, 0, false);
+ if (IS_ERR(req)) {
+ err = PTR_ERR(req);
+ goto out;
+ }
+
+ ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+ err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!err)
+ err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ ceph_osdc_put_request(req);
+ if (err < 0)
+ goto out;
+
+ req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+ ceph_vino(inode), 0, &len, 1, 3,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+ ci->i_snap_realm->cached_context,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ false);
+ if (IS_ERR(req)) {
+ err = PTR_ERR(req);
+ goto out;
+ }
+
+ osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+
+ err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
+ "inline_version", &inline_version,
+ sizeof(inline_version),
+ CEPH_OSD_CMPXATTR_OP_GT,
+ CEPH_OSD_CMPXATTR_MODE_U64);
+ if (err)
+ goto out_put;
+
+ err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
+ "inline_version", &inline_version,
+ sizeof(inline_version), 0, 0);
+ if (err)
+ goto out_put;
+
+ ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+ err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+ if (!err)
+ err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+out_put:
+ ceph_osdc_put_request(req);
+ if (err == -ECANCELED)
+ err = 0;
+out:
+ if (page && page != locked_page) {
+ if (from_pagecache) {
+ unlock_page(page);
+ page_cache_release(page);
+ } else
+ __free_pages(page, 0);
+ }
+
+ dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
+ inode, ceph_vinop(inode), inline_version, err);
+ return err;
+}
+
static struct vm_operations_struct ceph_vmops = {
.fault = ceph_filemap_fault,
.page_mkwrite = ceph_page_mkwrite,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cefca66..b93c631 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
- u64 follows)
+ u64 follows, bool inline_data)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
+ void *p;
+ size_t extra_len;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+ /* flock buffer size + inline version + inline data size */
+ extra_len = 4 + 8 + 4;
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
+ GFP_NOFS, false);
if (!msg)
return -ENOMEM;
@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
fc->mode = cpu_to_le32(mode);
+ p = fc + 1;
+ /* flock buffer size */
+ ceph_encode_32(&p, 0);
+ /* inline version */
+ ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+ /* inline data size */
+ ceph_encode_32(&p, 0);
+
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
u64 flush_tid = 0;
int i;
int ret;
+ bool inline_data;
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
xattr_version = ci->i_xattrs.version;
}
+ inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
- follows);
+ follows, inline_data);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1;
@@ -1336,7 +1352,7 @@ retry:
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob,
- capsnap->follows);
+ capsnap->follows, capsnap->inline_data);
next_follows = capsnap->follows + 1;
ceph_put_cap_snap(capsnap);
@@ -2057,15 +2073,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- int *got, loff_t endoff, int *check_max, int *err)
+ loff_t endoff, int *got, struct page **pinned_page,
+ int *check_max, int *err)
{
struct inode *inode = &ci->vfs_inode;
int ret = 0;
- int have, implemented;
+ int have, implemented, _got = 0;
int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
+again:
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
@@ -2075,7 +2093,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF;
ret = 1;
- goto out;
+ goto out_unlock;
}
/* finish pending truncate */
@@ -2095,7 +2113,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*check_max = 1;
ret = 1;
}
- goto out;
+ goto out_unlock;
}
/*
* If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2121,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*/
if (__ceph_have_pending_cap_snap(ci)) {
dout("get_cap_refs %p cap_snap_pending\n", inode);
- goto out;
+ goto out_unlock;
}
}
@@ -2120,18 +2138,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
- *got = need | (have & want);
- __take_cap_refs(ci, *got);
+ _got = need | (have & want);
+ __take_cap_refs(ci, _got);
ret = 1;
}
} else {
dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need));
}
-out:
+out_unlock:
spin_unlock(&ci->i_ceph_lock);
+
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ i_size_read(inode) > 0) {
+ int ret1;
+ struct page *page = find_get_page(inode->i_mapping, 0);
+ if (page) {
+ if (PageUptodate(page)) {
+ *pinned_page = page;
+ goto out;
+ }
+ page_cache_release(page);
+ }
+ /*
+ * drop cap refs first because getattr while holding
+ * caps refs can cause deadlock.
+ */
+ ceph_put_cap_refs(ci, _got);
+ _got = 0;
+
+ /* getattr request will bring inline data into page cache */
+ ret1 = __ceph_do_getattr(inode, NULL,
+ CEPH_STAT_CAP_INLINE_DATA, true);
+ if (ret1 >= 0) {
+ ret = 0;
+ goto again;
+ }
+ *err = ret1;
+ ret = 1;
+ }
+out:
dout("get_cap_refs %p ret %d got %s\n", inode,
- ret, ceph_cap_string(*got));
+ ret, ceph_cap_string(_got));
+ *got = _got;
return ret;
}
@@ -2168,8 +2218,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
* due to a small max_size, make sure we check_max_size (and possibly
* ask the mds) so we don't get hung up indefinitely.
*/
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
- loff_t endoff)
+int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+ loff_t endoff, int *got, struct page **pinned_page)
{
int check_max, ret, err;
@@ -2179,8 +2229,8 @@ retry:
check_max = 0;
err = 0;
ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want,
- got, endoff,
+ try_get_cap_refs(ci, need, want, endoff,
+ got, pinned_page,
&check_max, &err));
if (err)
ret = err;
@@ -2383,6 +2433,8 @@ static void invalidate_aliases(struct inode *inode)
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len,
+ u64 inline_version,
+ void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
@@ -2403,6 +2455,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool queue_invalidate = false;
bool queue_revalidate = false;
bool deleted_inode = false;
+ bool fill_inline = false;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2576,6 +2629,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
BUG_ON(cap->issued & ~cap->implemented);
+ if (inline_version > 0 && inline_version >= ci->i_inline_version) {
+ ci->i_inline_version = inline_version;
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
+ fill_inline = true;
+ }
+
spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2589,6 +2649,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
wake = true;
}
+ if (fill_inline)
+ ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
+
if (queue_trunc) {
ceph_queue_vmtruncate(inode);
ceph_queue_revalidate(inode);
@@ -2996,11 +3059,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 cap_id;
u64 size, max_size;
u64 tid;
+ u64 inline_version = 0;
+ void *inline_data = NULL;
+ u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
- void *flock;
- void *end;
- u32 flock_len;
+ void *p, *end;
dout("handle_caps from mds%d\n", mds);
@@ -3021,30 +3085,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len);
+ p = snaptrace + snaptrace_len;
if (le16_to_cpu(msg->hdr.version) >= 2) {
- void *p = snaptrace + snaptrace_len;
+ u32 flock_len;
ceph_decode_32_safe(&p, end, flock_len, bad);
if (p + flock_len > end)
goto bad;
- flock = p;
- } else {
- flock = NULL;
- flock_len = 0;
+ p += flock_len;
}
if (le16_to_cpu(msg->hdr.version) >= 3) {
if (op == CEPH_CAP_OP_IMPORT) {
- void *p = flock + flock_len;
if (p + sizeof(*peer) > end)
goto bad;
peer = p;
+ p += sizeof(*peer);
} else if (op == CEPH_CAP_OP_EXPORT) {
/* recorded in unused fields */
peer = (void *)&h->size;
}
}
+ if (le16_to_cpu(msg->hdr.version) >= 4) {
+ ceph_decode_64_safe(&p, end, inline_version, bad);
+ ceph_decode_32_safe(&p, end, inline_len, bad);
+ if (p + inline_len > end)
+ goto bad;
+ inline_data = p;
+ p += inline_len;
+ }
+
/* lookup ino */
inode = ceph_find_inode(sb, vino);
ci = ceph_inode(inode);
@@ -3085,6 +3156,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+ inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked;
}
@@ -3105,8 +3177,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
- handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
- session, cap, issued);
+ handle_cap_grant(mdsc, inode, h, NULL, 0,
+ inline_version, inline_data, inline_len,
+ msg->middle, session, cap, issued);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
@@ -3137,8 +3210,7 @@ flush_cap_releases:
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
- if (inode)
- iput(inode);
+ iput(inode);
return;
bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 681a853..c241603 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@ more:
spin_unlock(&parent->d_lock);
/* make sure a dentry wasn't dropped while we didn't have parent lock */
- if (!ceph_dir_is_complete(dir)) {
+ if (!ceph_dir_is_complete_ordered(dir)) {
dout(" lost dir complete on %p; falling back to mds\n", dir);
dput(dentry);
err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* always start with . and .. */
if (ctx->pos == 0) {
- /* note dir version at start of readdir so we can tell
- * if any dentries get dropped */
- fi->dir_release_count = atomic_read(&ci->i_release_count);
-
dout("readdir off 0 -> '.'\n");
if (!dir_emit(ctx, ".", 1,
ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
if ((ctx->pos == 2 || fi->dentry) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
- __ceph_dir_is_complete(ci) &&
+ __ceph_dir_is_complete_ordered(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* proceed with a normal readdir */
+ if (ctx->pos == 2) {
+ /* note dir version at start of readdir so we can tell
+ * if any dentries get dropped */
+ fi->dir_release_count = atomic_read(&ci->i_release_count);
+ fi->dir_ordered_count = ci->i_ordered_count;
+ }
+
more:
/* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
*/
spin_lock(&ci->i_ceph_lock);
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
- dout(" marking %p complete\n", inode);
- __ceph_dir_set_complete(ci, fi->dir_release_count);
+ if (ci->i_ordered_count == fi->dir_ordered_count)
+ dout(" marking %p complete and ordered\n", inode);
+ else
+ dout(" marking %p complete\n", inode);
+ __ceph_dir_set_complete(ci, fi->dir_release_count,
+ fi->dir_ordered_count);
}
spin_unlock(&ci->i_ceph_lock);
@@ -805,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
acls.pagelist = NULL;
}
err = ceph_mdsc_do_request(mdsc, dir, req);
- if (!err && !req->r_reply_info.head->is_dentry)
+ if (!err &&
+ !req->r_reply_info.head->is_target &&
+ !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
ceph_mdsc_put_request(req);
out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e357..ce74b39 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
return 0;
}
+enum {
+ CHECK_EOF = 1,
+ READ_INLINE = 2,
+};
+
/*
* Read a range of bytes striped over one or more objects. Iterate over
* objects we stripe over. (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
ret = read;
/* did we bounce off eof? */
if (pos + left > inode->i_size)
- *checkeof = 1;
+ *checkeof = CHECK_EOF;
}
dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
- vino, pos, &len,
+ vino, pos, &len, 0,
2,/*include a 'startsync' command*/
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
break;
}
+ osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
n = iov_iter_get_pages_alloc(from, &pages, len, &start);
if (unlikely(n < 0)) {
ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
- vino, pos, &len, 1,
+ vino, pos, &len, 0, 1,
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
size_t len = iocb->ki_nbytes;
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct page *pinned_page = NULL;
ssize_t ret;
int want, got = 0;
- int checkeof = 0, read = 0;
+ int retry_op = 0, read = 0;
again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
if (ret < 0)
return ret;
@@ -827,8 +835,12 @@ again:
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
- /* hmm, this isn't really async... */
- ret = ceph_sync_read(iocb, to, &checkeof);
+ if (ci->i_inline_version == CEPH_INLINE_NONE) {
+ /* hmm, this isn't really async... */
+ ret = ceph_sync_read(iocb, to, &retry_op);
+ } else {
+ retry_op = READ_INLINE;
+ }
} else {
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@ again:
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
+ if (pinned_page) {
+ page_cache_release(pinned_page);
+ pinned_page = NULL;
+ }
ceph_put_cap_refs(ci, got);
+ if (retry_op && ret >= 0) {
+ int statret;
+ struct page *page = NULL;
+ loff_t i_size;
+ if (retry_op == READ_INLINE) {
+ page = __page_cache_alloc(GFP_NOFS);
+ if (!page)
+ return -ENOMEM;
+ }
- if (checkeof && ret >= 0) {
- int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+ statret = __ceph_do_getattr(inode, page,
+ CEPH_STAT_CAP_INLINE_DATA, !!page);
+ if (statret < 0) {
+ __free_page(page);
+ if (statret == -ENODATA) {
+ BUG_ON(retry_op != READ_INLINE);
+ goto again;
+ }
+ return statret;
+ }
+
+ i_size = i_size_read(inode);
+ if (retry_op == READ_INLINE) {
+ /* does not support inline data > PAGE_SIZE */
+ if (i_size > PAGE_CACHE_SIZE) {
+ ret = -EIO;
+ } else if (iocb->ki_pos < i_size) {
+ loff_t end = min_t(loff_t, i_size,
+ iocb->ki_pos + len);
+ if (statret < end)
+ zero_user_segment(page, statret, end);
+ ret = copy_page_to_iter(page,
+ iocb->ki_pos & ~PAGE_MASK,
+ end - iocb->ki_pos, to);
+ iocb->ki_pos += ret;
+ } else {
+ ret = 0;
+ }
+ __free_pages(page, 0);
+ return ret;
+ }
/* hit EOF or hole? */
- if (statret == 0 && iocb->ki_pos < inode->i_size &&
+ if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
read += ret;
len -= ret;
- checkeof = 0;
+ retry_op = 0;
goto again;
}
}
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err)
goto out;
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ err = ceph_uninline_data(file, NULL);
+ if (err < 0)
+ goto out;
+ }
+
retry_snap:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
else
want = CEPH_CAP_FILE_BUFFER;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+ err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+ &got, NULL);
if (err < 0)
goto out;
@@ -969,6 +1030,7 @@ retry_snap:
if (written >= 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
+ ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode),
offset, length,
- 1, op,
+ 0, 1, op,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ ret = ceph_uninline_data(file, NULL);
+ if (ret < 0)
+ goto unlock;
+ }
+
size = i_size_read(inode);
if (!(mode & FALLOC_FL_KEEP_SIZE))
endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
else
want = CEPH_CAP_FILE_BUFFER;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0)
goto unlock;
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
if (!ret) {
spin_lock(&ci->i_ceph_lock);
+ ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index a5593d5..f61a741 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -387,8 +387,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
spin_lock_init(&ci->i_ceph_lock);
ci->i_version = 0;
+ ci->i_inline_version = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
+ ci->i_ordered_count = 0;
atomic_set(&ci->i_release_count, 1);
atomic_set(&ci->i_complete_count, 0);
ci->i_symlink = NULL;
@@ -657,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
* Populate an inode based on info from mds. May be called on new or
* existing inodes.
*/
-static int fill_inode(struct inode *inode,
+static int fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
struct ceph_mds_session *session,
@@ -675,6 +677,7 @@ static int fill_inode(struct inode *inode,
bool wake = false;
bool queue_trunc = false;
bool new_version = false;
+ bool fill_inline = false;
dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -845,7 +848,8 @@ static int fill_inode(struct inode *inode,
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
- __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+ __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
+ ci->i_ordered_count);
}
/* were we issued a capability? */
@@ -873,8 +877,23 @@ static int fill_inode(struct inode *inode,
ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode);
}
+
+ if (iinfo->inline_version > 0 &&
+ iinfo->inline_version >= ci->i_inline_version) {
+ int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
+ ci->i_inline_version = iinfo->inline_version;
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (locked_page ||
+ (le32_to_cpu(info->cap.caps) & cache_caps)))
+ fill_inline = true;
+ }
+
spin_unlock(&ci->i_ceph_lock);
+ if (fill_inline)
+ ceph_fill_inline_data(inode, locked_page,
+ iinfo->inline_data, iinfo->inline_len);
+
if (wake)
wake_up_all(&ci->i_cap_wq);
@@ -1062,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct inode *dir = req->r_locked_dir;
if (dir) {
- err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+ err = fill_inode(dir, NULL,
+ &rinfo->diri, rinfo->dirfrag,
session, req->r_request_started, -1,
&req->r_caps_reservation);
if (err < 0)
@@ -1132,7 +1152,7 @@ retry_lookup:
}
req->r_target_inode = in;
- err = fill_inode(in, &rinfo->targeti, NULL,
+ err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
session, req->r_request_started,
(!req->r_aborted && rinfo->head->result == 0) ?
req->r_fmode : -1,
@@ -1204,8 +1224,8 @@ retry_lookup:
ceph_invalidate_dentry_lease(dn);
/* d_move screws up sibling dentries' offsets */
- ceph_dir_clear_complete(dir);
- ceph_dir_clear_complete(olddir);
+ ceph_dir_clear_ordered(dir);
+ ceph_dir_clear_ordered(olddir);
dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset);
@@ -1217,6 +1237,7 @@ retry_lookup:
if (!rinfo->head->is_target) {
dout("fill_trace null dentry\n");
if (dn->d_inode) {
+ ceph_dir_clear_ordered(dir);
dout("d_delete %p\n", dn);
d_delete(dn);
} else {
@@ -1233,7 +1254,7 @@ retry_lookup:
/* attach proper inode */
if (!dn->d_inode) {
- ceph_dir_clear_complete(dir);
+ ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, &have_lease);
if (IS_ERR(dn)) {
@@ -1263,7 +1284,7 @@ retry_lookup:
BUG_ON(!dir);
BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
dout(" linking snapped dir %p to dn %p\n", in, dn);
- ceph_dir_clear_complete(dir);
+ ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, NULL);
if (IS_ERR(dn)) {
@@ -1300,7 +1321,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
dout("new_inode badness got %d\n", err);
continue;
}
- rc = fill_inode(in, &rinfo->dir_in[i], NULL, session,
+ rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation);
if (rc < 0) {
@@ -1416,7 +1437,7 @@ retry_lookup:
}
}
- if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
+ if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation) < 0) {
pr_err("fill_inode badness on %p\n", in);
@@ -1899,7 +1920,8 @@ out_put:
* Verify that we have a lease on the given mask. If not,
* do a getattr against an mds.
*/
-int ceph_do_getattr(struct inode *inode, int mask, bool force)
+int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
+ int mask, bool force)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1911,7 +1933,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
return 0;
}
- dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
+ dout("do_getattr inode %p mask %s mode 0%o\n",
+ inode, ceph_cap_string(mask), inode->i_mode);
if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0;
@@ -1922,7 +1945,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
ihold(inode);
req->r_num_caps = 1;
req->r_args.getattr.mask = cpu_to_le32(mask);
+ req->r_locked_page = locked_page;
err = ceph_mdsc_do_request(mdsc, NULL, req);
+ if (locked_page && err == 0) {
+ u64 inline_version = req->r_reply_info.targeti.inline_version;
+ if (inline_version == 0) {
+ /* the reply is supposed to contain inline data */
+ err = -EINVAL;
+ } else if (inline_version == CEPH_INLINE_NONE) {
+ err = -ENODATA;
+ } else {
+ err = req->r_reply_info.targeti.inline_len;
+ }
+ }
ceph_mdsc_put_request(req);
dout("do_getattr result=%d\n", err);
return err;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index fbc39c4..c35c5c6 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -9,6 +9,8 @@
#include <linux/ceph/pagelist.h>
static u64 lock_secret;
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req);
static inline u64 secure_addr(void *addr)
{
@@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
u64 length = 0;
u64 owner;
+ if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
+ wait = 0;
+
req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
@@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
req->r_args.filelock_change.length = cpu_to_le64(length);
req->r_args.filelock_change.wait = wait;
+ if (wait)
+ req->r_wait_for_completion = ceph_lock_wait_for_completion;
+
err = ceph_mdsc_do_request(mdsc, inode, req);
if (operation == CEPH_MDS_OP_GETFILELOCK) {
@@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
return err;
}
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ struct ceph_mds_request *intr_req;
+ struct inode *inode = req->r_inode;
+ int err, lock_type;
+
+ BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
+ if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
+ lock_type = CEPH_LOCK_FCNTL_INTR;
+ else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
+ lock_type = CEPH_LOCK_FLOCK_INTR;
+ else
+ BUG_ON(1);
+ BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
+
+ err = wait_for_completion_interruptible(&req->r_completion);
+ if (!err)
+ return 0;
+
+ dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
+ req->r_tid);
+
+ intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
+ USE_AUTH_MDS);
+ if (IS_ERR(intr_req))
+ return PTR_ERR(intr_req);
+
+ intr_req->r_inode = inode;
+ ihold(inode);
+ intr_req->r_num_caps = 1;
+
+ intr_req->r_args.filelock_change = req->r_args.filelock_change;
+ intr_req->r_args.filelock_change.rule = lock_type;
+ intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
+
+ err = ceph_mdsc_do_request(mdsc, inode, intr_req);
+ ceph_mdsc_put_request(intr_req);
+
+ if (err && err != -ERESTARTSYS)
+ return err;
+
+ wait_for_completion(&req->r_completion);
+ return 0;
+}
+
/**
* Attempt to set an fcntl lock.
* For now, this just goes away to the server. Later it may be more awesome.
@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
err);
}
}
-
- } else if (err == -ERESTARTSYS) {
- dout("undoing lock\n");
- ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
- CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
@@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
file, CEPH_LOCK_UNLOCK, 0, fl);
dout("got %d on flock_lock_file_wait, undid lock", err);
}
- } else if (err == -ERESTARTSYS) {
- dout("undoing lock\n");
- ceph_lock_message(CEPH_LOCK_FLOCK,
- CEPH_MDS_OP_SETFILELOCK,
- file, CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a92d3f5..d2171f4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
ceph_decode_need(p, end, info->xattr_len, bad);
info->xattr_data = *p;
*p += info->xattr_len;
+
+ if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+ ceph_decode_64_safe(p, end, info->inline_version, bad);
+ ceph_decode_32_safe(p, end, info->inline_len, bad);
+ ceph_decode_need(p, end, info->inline_len, bad);
+ info->inline_data = *p;
+ *p += info->inline_len;
+ } else
+ info->inline_version = CEPH_INLINE_NONE;
+
return 0;
bad:
return err;
@@ -524,8 +534,7 @@ void ceph_mdsc_release_request(struct kref *kref)
}
if (req->r_locked_dir)
ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
- if (req->r_target_inode)
- iput(req->r_target_inode);
+ iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
@@ -861,8 +870,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
/*
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
+ *
+ * ClientSession messages with metadata are v2
*/
- msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */
+ msg->hdr.version = cpu_to_le16(2);
+ msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
p = msg->front.iov_base + sizeof(*h);
@@ -1066,8 +1078,7 @@ out:
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
- if (last_inode)
- iput(last_inode);
+ iput(last_inode);
if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
@@ -1874,7 +1885,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
goto out_free2;
}
- msg->hdr.version = 2;
+ msg->hdr.version = cpu_to_le16(2);
msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base;
@@ -2208,6 +2219,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
&req->r_completion, req->r_timeout);
if (err == 0)
err = -EIO;
+ } else if (req->r_wait_for_completion) {
+ err = req->r_wait_for_completion(mdsc, req);
} else {
err = wait_for_completion_killable(&req->r_completion);
}
@@ -3744,6 +3757,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
return msg;
}
+static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_mds_session *s = con->private;
+ struct ceph_auth_handshake *auth = &s->s_auth;
+ return ceph_auth_sign_message(auth, msg);
+}
+
+static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_mds_session *s = con->private;
+ struct ceph_auth_handshake *auth = &s->s_auth;
+ return ceph_auth_check_message_signature(auth, msg);
+}
+
static const struct ceph_connection_operations mds_con_ops = {
.get = con_get,
.put = con_put,
@@ -3753,6 +3780,8 @@ static const struct ceph_connection_operations mds_con_ops = {
.invalidate_authorizer = invalidate_authorizer,
.peer_reset = peer_reset,
.alloc_msg = mds_alloc_msg,
+ .sign_message = sign_message,
+ .check_message_signature = check_message_signature,
};
/* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3288359..e2817d0 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
char *symlink;
u32 xattr_len;
char *xattr_data;
+ u64 inline_version;
+ u32 inline_len;
+ char *inline_data;
};
/*
@@ -166,6 +169,11 @@ struct ceph_mds_client;
*/
typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request *req);
+/*
+ * wait for request completion callback
+ */
+typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req);
/*
* an in-flight mds request
@@ -215,6 +223,7 @@ struct ceph_mds_request {
int r_request_release_offset;
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
+ struct page *r_locked_page;
int r_err;
bool r_aborted;
@@ -239,6 +248,7 @@ struct ceph_mds_request {
struct completion r_completion;
struct completion r_safe_completion;
ceph_mds_request_callback_t r_callback;
+ ceph_mds_request_wait_callback_t r_wait_for_completion;
struct list_head r_unsafe_item; /* per-session unsafe list item */
bool r_got_unsafe, r_got_safe, r_got_result;
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f01645a..ce35fbd 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b)
return 0;
}
+
+static struct ceph_snap_context *empty_snapc;
+
/*
* build the snap context for a given realm.
*/
@@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm)
return 0;
}
+ if (num == 0 && realm->seq == empty_snapc->seq) {
+ ceph_get_snap_context(empty_snapc);
+ snapc = empty_snapc;
+ goto done;
+ }
+
/* alloc new snap context */
err = -ENOMEM;
if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
@@ -365,8 +374,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
realm->ino, realm, snapc, snapc->seq,
(unsigned int) snapc->num_snaps);
- if (realm->cached_context)
- ceph_put_snap_context(realm->cached_context);
+done:
+ ceph_put_snap_context(realm->cached_context);
realm->cached_context = snapc;
return 0;
@@ -466,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
cap_snap. lucky us. */
dout("queue_cap_snap %p already pending\n", inode);
kfree(capsnap);
+ } else if (ci->i_snap_realm->cached_context == empty_snapc) {
+ dout("queue_cap_snap %p empty snapc\n", inode);
+ kfree(capsnap);
} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
struct ceph_snap_context *snapc = ci->i_head_snapc;
@@ -504,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
capsnap->xattr_version = 0;
}
+ capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
/* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this
snapshot. */
@@ -590,15 +604,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
if (!inode)
continue;
spin_unlock(&realm->inodes_with_caps_lock);
- if (lastinode)
- iput(lastinode);
+ iput(lastinode);
lastinode = inode;
ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock);
}
spin_unlock(&realm->inodes_with_caps_lock);
- if (lastinode)
- iput(lastinode);
+ iput(lastinode);
list_for_each_entry(child, &realm->children, child_item) {
dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
@@ -928,5 +940,16 @@ out:
return;
}
+int __init ceph_snap_init(void)
+{
+ empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
+ if (!empty_snapc)
+ return -ENOMEM;
+ empty_snapc->seq = 1;
+ return 0;
+}
-
+void ceph_snap_exit(void)
+{
+ ceph_put_snap_context(empty_snapc);
+}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f6e1237..50f06cd 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_fs_client *fsc;
const u64 supported_features =
CEPH_FEATURE_FLOCK |
- CEPH_FEATURE_DIRLAYOUTHASH;
+ CEPH_FEATURE_DIRLAYOUTHASH |
+ CEPH_FEATURE_MDS_INLINE_DATA;
const u64 required_features = 0;
int page_count;
size_t size;
@@ -1017,9 +1018,6 @@ static struct file_system_type ceph_fs_type = {
};
MODULE_ALIAS_FS("ceph");
-#define _STRINGIFY(x) #x
-#define STRINGIFY(x) _STRINGIFY(x)
-
static int __init init_ceph(void)
{
int ret = init_caches();
@@ -1028,15 +1026,20 @@ static int __init init_ceph(void)
ceph_flock_init();
ceph_xattr_init();
+ ret = ceph_snap_init();
+ if (ret)
+ goto out_xattr;
ret = register_filesystem(&ceph_fs_type);
if (ret)
- goto out_icache;
+ goto out_snap;
pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
return 0;
-out_icache:
+out_snap:
+ ceph_snap_exit();
+out_xattr:
ceph_xattr_exit();
destroy_caches();
out:
@@ -1047,6 +1050,7 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
+ ceph_snap_exit();
ceph_xattr_exit();
destroy_caches();
}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507..e1aa32d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@ struct ceph_cap_snap {
u64 time_warp_seq;
int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */
+ bool inline_data;
};
static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -253,9 +254,11 @@ struct ceph_inode_info {
spinlock_t i_ceph_lock;
u64 i_version;
+ u64 i_inline_version;
u32 i_time_warp_seq;
unsigned i_ceph_flags;
+ int i_ordered_count;
atomic_t i_release_count;
atomic_t i_complete_count;
@@ -434,14 +437,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
/*
* Ceph inode.
*/
-#define CEPH_I_NODELAY 4 /* do not delay cap release */
-#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
+#define CEPH_I_NODELAY 4 /* do not delay cap release */
+#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
- int release_count)
+ int release_count, int ordered_count)
{
atomic_set(&ci->i_complete_count, release_count);
+ if (ci->i_ordered_count == ordered_count)
+ ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+ else
+ ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
}
static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +463,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
atomic_read(&ci->i_release_count);
}
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+ return __ceph_dir_is_complete(ci) &&
+ (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
static inline void ceph_dir_clear_complete(struct inode *inode)
{
__ceph_dir_clear_complete(ceph_inode(inode));
}
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
{
- return __ceph_dir_is_complete(ceph_inode(inode));
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ spin_lock(&ci->i_ceph_lock);
+ ci->i_ordered_count++;
+ ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+ spin_unlock(&ci->i_ceph_lock);
}
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ bool ret;
+ spin_lock(&ci->i_ceph_lock);
+ ret = __ceph_dir_is_complete_ordered(ci);
+ spin_unlock(&ci->i_ceph_lock);
+ return ret;
+}
/* find a specific frag @f */
extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +607,7 @@ struct ceph_file_info {
char *last_name; /* last entry in previous chunk */
struct dentry *dentry; /* next dentry (for dcache readdir) */
int dir_release_count;
+ int dir_ordered_count;
/* used for -o dirstat read() on directory thing */
char *dir_info;
@@ -673,6 +701,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
+extern int ceph_snap_init(void);
+extern void ceph_snap_exit(void);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress
@@ -715,7 +745,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode);
-extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
+extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
+ int mask, bool force);
+static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
+{
+ return __ceph_do_getattr(inode, NULL, mask, force);
+}
extern int ceph_permission(struct inode *inode, int mask);
extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -830,7 +865,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
int mds, int drop, int unless);
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
- int *got, loff_t endoff);
+ loff_t endoff, int *got, struct page **pinned_page);
/* for counting open files by mode */
static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -852,7 +887,9 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode,
int *opened);
extern int ceph_release(struct inode *inode, struct file *filp);
-
+extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+ char *data, size_t len);
+int ceph_uninline_data(struct file *filp, struct page *locked_page);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct inode_operations ceph_dir_iops;
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644
index 0000000..88fe3df
--- /dev/null
+++ b/fs/ceph/super.h.rej
@@ -0,0 +1,10 @@
+--- fs/ceph/super.h
++++ fs/ceph/super.h
+@@ -254,6 +255,7 @@
+ spinlock_t i_ceph_lock;
+
+ u64 i_version;
++ u64 i_inline_version;
+ u32 i_time_warp_seq;
+
+ unsigned i_ceph_flags;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2..5a492ca 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
struct ceph_pagelist *pagelist = NULL;
int err;
- if (value) {
+ if (size > 0) {
/* copy value into pagelist */
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
err = ceph_pagelist_append(pagelist, value, size);
if (err)
goto out;
- } else {
+ } else if (!value) {
flags |= CEPH_XATTR_REMOVE;
}
@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
return generic_setxattr(dentry, name, value, size, flags);
+ if (size == 0)
+ value = ""; /* empty EA, do not remove */
+
return __ceph_setxattr(dentry, name, value, size, flags);
}
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 5f33868..260d78b 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -13,6 +13,7 @@
struct ceph_auth_client;
struct ceph_authorizer;
+struct ceph_msg;
struct ceph_auth_handshake {
struct ceph_authorizer *authorizer;
@@ -20,6 +21,10 @@ struct ceph_auth_handshake {
size_t authorizer_buf_len;
void *authorizer_reply_buf;
size_t authorizer_reply_buf_len;
+ int (*sign_message)(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg);
+ int (*check_message_signature)(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg);
};
struct ceph_auth_client_ops {
@@ -66,6 +71,11 @@ struct ceph_auth_client_ops {
void (*reset)(struct ceph_auth_client *ac);
void (*destroy)(struct ceph_auth_client *ac);
+
+ int (*sign_message)(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg);
+ int (*check_message_signature)(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg);
};
struct ceph_auth_client {
@@ -113,4 +123,20 @@ extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
int peer_type);
+static inline int ceph_auth_sign_message(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg)
+{
+ if (auth->sign_message)
+ return auth->sign_message(auth, msg);
+ return 0;
+}
+
+static inline
+int ceph_auth_check_message_signature(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg)
+{
+ if (auth->check_message_signature)
+ return auth->check_message_signature(auth, msg);
+ return 0;
+}
#endif
diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h
index 07ad423..07ca15e 100644
--- a/include/linux/ceph/buffer.h
+++ b/include/linux/ceph/buffer.h
@@ -10,8 +10,7 @@
/*
* a simple reference counted buffer.
*
- * use kmalloc for small sizes (<= one page), vmalloc for larger
- * sizes.
+ * use kmalloc for smaller sizes, vmalloc for larger sizes.
*/
struct ceph_buffer {
struct kref kref;
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index d12659c..71e05bb 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -84,6 +84,7 @@ static inline u64 ceph_sanitize_features(u64 features)
CEPH_FEATURE_PGPOOL3 | \
CEPH_FEATURE_OSDENC | \
CEPH_FEATURE_CRUSH_TUNABLES | \
+ CEPH_FEATURE_MSG_AUTH | \
CEPH_FEATURE_CRUSH_TUNABLES2 | \
CEPH_FEATURE_REPLY_CREATE_INODE | \
CEPH_FEATURE_OSDHASHPSPOOL | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 3c97d5e..c0dadaa 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -522,8 +522,11 @@ struct ceph_mds_reply_dirfrag {
__le32 dist[];
} __attribute__ ((packed));
-#define CEPH_LOCK_FCNTL 1
-#define CEPH_LOCK_FLOCK 2
+#define CEPH_LOCK_FCNTL 1
+#define CEPH_LOCK_FLOCK 2
+#define CEPH_LOCK_FCNTL_INTR 3
+#define CEPH_LOCK_FLOCK_INTR 4
+
#define CEPH_LOCK_SHARED 1
#define CEPH_LOCK_EXCL 2
@@ -549,6 +552,7 @@ struct ceph_filelock {
int ceph_flags_to_mode(int flags);
+#define CEPH_INLINE_NONE ((__u64)-1)
/* capability bits */
#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */
@@ -613,6 +617,8 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_LINK_SHARED | \
CEPH_CAP_FILE_SHARED | \
CEPH_CAP_XATTR_SHARED)
+#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
+ CEPH_CAP_FILE_RD)
#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \
CEPH_CAP_LINK_SHARED | \
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 07bc359..8b11a79 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -29,6 +29,7 @@
#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */
#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
+#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
#define CEPH_OPT_DEFAULT (0)
@@ -184,7 +185,6 @@ extern bool libceph_compatible(void *data);
extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
extern void *ceph_kvmalloc(size_t size, gfp_t flags);
-extern void ceph_kvfree(const void *ptr);
extern struct ceph_options *ceph_parse_options(char *options,
const char *dev_name, const char *dev_name_end,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 40ae58e..d9d396c 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -42,6 +42,10 @@ struct ceph_connection_operations {
struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+ int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
+
+ int (*check_message_signature) (struct ceph_connection *con,
+ struct ceph_msg *msg);
};
/* use format string %s%d */
@@ -142,7 +146,10 @@ struct ceph_msg_data_cursor {
*/
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
- struct ceph_msg_footer footer; /* footer */
+ union {
+ struct ceph_msg_footer footer; /* footer */
+ struct ceph_msg_footer_old old_footer; /* old format footer */
+ };
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
diff --git a/include/linux/ceph/msgr.h b/include/linux/ceph/msgr.h
index 3d94a73..1c18872 100644
--- a/include/linux/ceph/msgr.h
+++ b/include/linux/ceph/msgr.h
@@ -152,7 +152,8 @@ struct ceph_msg_header {
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
- __le32 reserved;
+ __le16 compat_version;
+ __le16 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
@@ -164,13 +165,21 @@ struct ceph_msg_header {
/*
* follows data payload
*/
+struct ceph_msg_footer_old {
+ __le32 front_crc, middle_crc, data_crc;
+ __u8 flags;
+} __attribute__ ((packed));
+
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
+ // sig holds the 64 bits of the digital signature for the message PLR
+ __le64 sig;
__u8 flags;
} __attribute__ ((packed));
#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */
#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */
+#define CEPH_MSG_FOOTER_SIGNED (1<<2) /* msg was signed */
#endif
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 03aeb27..5d86416 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -87,6 +87,13 @@ struct ceph_osd_req_op {
struct ceph_osd_data osd_data;
} extent;
struct {
+ __le32 name_len;
+ __le32 value_len;
+ __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
+ __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
+ struct ceph_osd_data osd_data;
+ } xattr;
+ struct {
const char *class_name;
const char *method_name;
struct ceph_osd_data request_info;
@@ -295,6 +302,9 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method);
+extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode, const char *name, const void *value,
+ size_t size, u8 cmp_op, u8 cmp_mode);
extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag);
@@ -318,7 +328,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
struct ceph_vino vino,
u64 offset, u64 *len,
- int num_ops, int opcode, int flags,
+ unsigned int which, int num_ops,
+ int opcode, int flags,
struct ceph_snap_context *snapc,
u32 truncate_seq, u64 truncate_size,
bool use_mempool);
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
index 5f871d8..13d71fe 100644
--- a/include/linux/ceph/pagelist.h
+++ b/include/linux/ceph/pagelist.h
@@ -1,8 +1,10 @@
#ifndef __FS_CEPH_PAGELIST_H
#define __FS_CEPH_PAGELIST_H
-#include <linux/list.h>
+#include <asm/byteorder.h>
#include <linux/atomic.h>
+#include <linux/list.h>
+#include <linux/types.h>
struct ceph_pagelist {
struct list_head head;
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 7e38b72..1584581 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -8,6 +8,7 @@
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
+#include <linux/ceph/messenger.h>
#include "crypto.h"
#include "auth_x.h"
@@ -293,6 +294,11 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
dout("build_authorizer for %s %p\n",
ceph_entity_type_name(th->service), au);
+ ceph_crypto_key_destroy(&au->session_key);
+ ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
+ if (ret)
+ return ret;
+
maxlen = sizeof(*msg_a) + sizeof(msg_b) +
ceph_x_encrypt_buflen(ticket_blob_len);
dout(" need len %d\n", maxlen);
@@ -302,8 +308,10 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
}
if (!au->buf) {
au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
- if (!au->buf)
+ if (!au->buf) {
+ ceph_crypto_key_destroy(&au->session_key);
return -ENOMEM;
+ }
}
au->service = th->service;
au->secret_id = th->secret_id;
@@ -329,7 +337,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
get_random_bytes(&au->nonce, sizeof(au->nonce));
msg_b.struct_v = 1;
msg_b.nonce = cpu_to_le64(au->nonce);
- ret = ceph_x_encrypt(&th->session_key, &msg_b, sizeof(msg_b),
+ ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
p, end - p);
if (ret < 0)
goto out_buf;
@@ -560,6 +568,8 @@ static int ceph_x_create_authorizer(
auth->authorizer_buf_len = au->buf->vec.iov_len;
auth->authorizer_reply_buf = au->reply_buf;
auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+ auth->sign_message = ac->ops->sign_message;
+ auth->check_message_signature = ac->ops->check_message_signature;
return 0;
}
@@ -588,17 +598,13 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len)
{
struct ceph_x_authorizer *au = (void *)a;
- struct ceph_x_ticket_handler *th;
int ret = 0;
struct ceph_x_authorize_reply reply;
void *preply = &reply;
void *p = au->reply_buf;
void *end = p + sizeof(au->reply_buf);
- th = get_ticket_handler(ac, au->service);
- if (IS_ERR(th))
- return PTR_ERR(th);
- ret = ceph_x_decrypt(&th->session_key, &p, end, &preply, sizeof(reply));
+ ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
if (ret < 0)
return ret;
if (ret != sizeof(reply))
@@ -618,6 +624,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
{
struct ceph_x_authorizer *au = (void *)a;
+ ceph_crypto_key_destroy(&au->session_key);
ceph_buffer_put(au->buf);
kfree(au);
}
@@ -663,6 +670,59 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
memset(&th->validity, 0, sizeof(th->validity));
}
+static int calcu_signature(struct ceph_x_authorizer *au,
+ struct ceph_msg *msg, __le64 *sig)
+{
+ int ret;
+ char tmp_enc[40];
+ __le32 tmp[5] = {
+ 16u, msg->hdr.crc, msg->footer.front_crc,
+ msg->footer.middle_crc, msg->footer.data_crc,
+ };
+ ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
+ tmp_enc, sizeof(tmp_enc));
+ if (ret < 0)
+ return ret;
+ *sig = *(__le64*)(tmp_enc + 4);
+ return 0;
+}
+
+static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg)
+{
+ int ret;
+ if (!auth->authorizer)
+ return 0;
+ ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
+ msg, &msg->footer.sig);
+ if (ret < 0)
+ return ret;
+ msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED;
+ return 0;
+}
+
+static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
+ struct ceph_msg *msg)
+{
+ __le64 sig_check;
+ int ret;
+
+ if (!auth->authorizer)
+ return 0;
+ ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
+ msg, &sig_check);
+ if (ret < 0)
+ return ret;
+ if (sig_check == msg->footer.sig)
+ return 0;
+ if (msg->footer.flags & CEPH_MSG_FOOTER_SIGNED)
+ dout("ceph_x_check_message_signature %p has signature %llx "
+ "expect %llx\n", msg, msg->footer.sig, sig_check);
+ else
+ dout("ceph_x_check_message_signature %p sender did not set "
+ "CEPH_MSG_FOOTER_SIGNED\n", msg);
+ return -EBADMSG;
+}
static const struct ceph_auth_client_ops ceph_x_ops = {
.name = "x",
@@ -677,6 +737,8 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
.invalidate_authorizer = ceph_x_invalidate_authorizer,
.reset = ceph_x_reset,
.destroy = ceph_x_destroy,
+ .sign_message = ceph_x_sign_message,
+ .check_message_signature = ceph_x_check_message_signature,
};
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index 65ee720..e8b7c69 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -26,6 +26,7 @@ struct ceph_x_ticket_handler {
struct ceph_x_authorizer {
+ struct ceph_crypto_key session_key;
struct ceph_buffer *buf;
unsigned int service;
u64 nonce;
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index 621b5f6..add5f92 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,7 +6,7 @@
#include <linux/ceph/buffer.h>
#include <linux/ceph/decode.h>
-#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
+#include <linux/ceph/libceph.h> /* for ceph_kvmalloc */
struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
{
@@ -35,7 +35,7 @@ void ceph_buffer_release(struct kref *kref)
struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
dout("buffer_release %p\n", b);
- ceph_kvfree(b->vec.iov_base);
+ kvfree(b->vec.iov_base);
kfree(b);
}
EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 58fbfe1..5d5ab67 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -184,14 +184,6 @@ void *ceph_kvmalloc(size_t size, gfp_t flags)
return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
}
-void ceph_kvfree(const void *ptr)
-{
- if (is_vmalloc_addr(ptr))
- vfree(ptr);
- else
- kfree(ptr);
-}
-
static int parse_fsid(const char *str, struct ceph_fsid *fsid)
{
@@ -245,6 +237,8 @@ enum {
Opt_noshare,
Opt_crc,
Opt_nocrc,
+ Opt_cephx_require_signatures,
+ Opt_nocephx_require_signatures,
};
static match_table_t opt_tokens = {
@@ -263,6 +257,8 @@ static match_table_t opt_tokens = {
{Opt_noshare, "noshare"},
{Opt_crc, "crc"},
{Opt_nocrc, "nocrc"},
+ {Opt_cephx_require_signatures, "cephx_require_signatures"},
+ {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
{-1, NULL}
};
@@ -461,6 +457,12 @@ ceph_parse_options(char *options, const char *dev_name,
case Opt_nocrc:
opt->flags |= CEPH_OPT_NOCRC;
break;
+ case Opt_cephx_require_signatures:
+ opt->flags &= ~CEPH_OPT_NOMSGAUTH;
+ break;
+ case Opt_nocephx_require_signatures:
+ opt->flags |= CEPH_OPT_NOMSGAUTH;
+ break;
default:
BUG_ON(token);
@@ -504,6 +506,9 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
init_waitqueue_head(&client->auth_wq);
client->auth_err = 0;
+ if (!ceph_test_opt(client, NOMSGAUTH))
+ required_features |= CEPH_FEATURE_MSG_AUTH;
+
client->extra_mon_dispatch = NULL;
client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
supported_features;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8d1653c..33a2f20 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1196,8 +1196,18 @@ static void prepare_write_message_footer(struct ceph_connection *con)
dout("prepare_write_message_footer %p\n", con);
con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer;
- con->out_kvec[v].iov_len = sizeof(m->footer);
- con->out_kvec_bytes += sizeof(m->footer);
+ if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
+ if (con->ops->sign_message)
+ con->ops->sign_message(con, m);
+ else
+ m->footer.sig = 0;
+ con->out_kvec[v].iov_len = sizeof(m->footer);
+ con->out_kvec_bytes += sizeof(m->footer);
+ } else {
+ m->old_footer.flags = m->footer.flags;
+ con->out_kvec[v].iov_len = sizeof(m->old_footer);
+ con->out_kvec_bytes += sizeof(m->old_footer);
+ }
con->out_kvec_left++;
con->out_more = m->more_to_follow;
con->out_msg_done = true;
@@ -2249,6 +2259,7 @@ static int read_partial_message(struct ceph_connection *con)
int ret;
unsigned int front_len, middle_len, data_len;
bool do_datacrc = !con->msgr->nocrc;
+ bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
u64 seq;
u32 crc;
@@ -2361,12 +2372,21 @@ static int read_partial_message(struct ceph_connection *con)
}
/* footer */
- size = sizeof (m->footer);
+ if (need_sign)
+ size = sizeof(m->footer);
+ else
+ size = sizeof(m->old_footer);
+
end += size;
ret = read_partial(con, end, size, &m->footer);
if (ret <= 0)
return ret;
+ if (!need_sign) {
+ m->footer.flags = m->old_footer.flags;
+ m->footer.sig = 0;
+ }
+
dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
m, front_len, m->footer.front_crc, middle_len,
m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -2390,6 +2410,12 @@ static int read_partial_message(struct ceph_connection *con)
return -EBADMSG;
}
+ if (need_sign && con->ops->check_message_signature &&
+ con->ops->check_message_signature(con, m)) {
+ pr_err("read_partial_message %p signature check failed\n", m);
+ return -EBADMSG;
+ }
+
return 1; /* done! */
}
@@ -3288,7 +3314,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
static void ceph_msg_free(struct ceph_msg *m)
{
dout("%s %p\n", __func__, m);
- ceph_kvfree(m->front.iov_base);
+ kvfree(m->front.iov_base);
kmem_cache_free(ceph_msg_cache, m);
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6f16428..53299c7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -292,6 +292,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
ceph_osd_data_release(&op->cls.request_data);
ceph_osd_data_release(&op->cls.response_data);
break;
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ ceph_osd_data_release(&op->xattr.osd_data);
+ break;
default:
break;
}
@@ -476,8 +480,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
- opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
- opcode != CEPH_OSD_OP_TRUNCATE);
+ opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
op->extent.offset = offset;
op->extent.length = length;
@@ -545,6 +548,39 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
}
EXPORT_SYMBOL(osd_req_op_cls_init);
+int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode, const char *name, const void *value,
+ size_t size, u8 cmp_op, u8 cmp_mode)
+{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_pagelist *pagelist;
+ size_t payload_len;
+
+ BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
+
+ pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+ if (!pagelist)
+ return -ENOMEM;
+
+ ceph_pagelist_init(pagelist);
+
+ payload_len = strlen(name);
+ op->xattr.name_len = payload_len;
+ ceph_pagelist_append(pagelist, name, payload_len);
+
+ op->xattr.value_len = size;
+ ceph_pagelist_append(pagelist, value, size);
+ payload_len += size;
+
+ op->xattr.cmp_op = cmp_op;
+ op->xattr.cmp_mode = cmp_mode;
+
+ ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
+ op->payload_len = payload_len;
+ return 0;
+}
+EXPORT_SYMBOL(osd_req_op_xattr_init);
+
void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
@@ -626,7 +662,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_ZERO:
- case CEPH_OSD_OP_DELETE:
case CEPH_OSD_OP_TRUNCATE:
if (src->op == CEPH_OSD_OP_WRITE)
request_data_len = src->extent.length;
@@ -676,6 +711,19 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->alloc_hint.expected_write_size =
cpu_to_le64(src->alloc_hint.expected_write_size);
break;
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+ dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+ dst->xattr.cmp_op = src->xattr.cmp_op;
+ dst->xattr.cmp_mode = src->xattr.cmp_mode;
+ osd_data = &src->xattr.osd_data;
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ request_data_len = osd_data->pagelist->length;
+ break;
+ case CEPH_OSD_OP_CREATE:
+ case CEPH_OSD_OP_DELETE:
+ break;
default:
pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op));
@@ -705,7 +753,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
struct ceph_vino vino,
- u64 off, u64 *plen, int num_ops,
+ u64 off, u64 *plen,
+ unsigned int which, int num_ops,
int opcode, int flags,
struct ceph_snap_context *snapc,
u32 truncate_seq,
@@ -716,13 +765,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u64 objnum = 0;
u64 objoff = 0;
u64 objlen = 0;
- u32 object_size;
- u64 object_base;
int r;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
- opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
- opcode != CEPH_OSD_OP_TRUNCATE);
+ opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
+ opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
@@ -738,29 +785,24 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
return ERR_PTR(r);
}
- object_size = le32_to_cpu(layout->fl_object_size);
- object_base = off - objoff;
- if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
- if (truncate_size <= object_base) {
- truncate_size = 0;
- } else {
- truncate_size -= object_base;
- if (truncate_size > object_size)
- truncate_size = object_size;
+ if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
+ osd_req_op_init(req, which, opcode);
+ } else {
+ u32 object_size = le32_to_cpu(layout->fl_object_size);
+ u32 object_base = off - objoff;
+ if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
+ if (truncate_size <= object_base) {
+ truncate_size = 0;
+ } else {
+ truncate_size -= object_base;
+ if (truncate_size > object_size)
+ truncate_size = object_size;
+ }
}
+ osd_req_op_extent_init(req, which, opcode, objoff, objlen,
+ truncate_size, truncate_seq);
}
- osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
- truncate_size, truncate_seq);
-
- /*
- * A second op in the ops array means the caller wants to
- * also issue a include a 'startsync' command so that the
- * osd will flush data quickly.
- */
- if (num_ops > 1)
- osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
-
req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
@@ -2626,7 +2668,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, truncate_seq, truncate_size,
false);
@@ -2669,7 +2711,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
int page_align = off & ~PAGE_MASK;
BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
snapc, truncate_seq, truncate_size,
@@ -2920,6 +2962,20 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}
+static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_auth_handshake *auth = &o->o_auth;
+ return ceph_auth_sign_message(auth, msg);
+}
+
+static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_auth_handshake *auth = &o->o_auth;
+ return ceph_auth_check_message_signature(auth, msg);
+}
+
static const struct ceph_connection_operations osd_con_ops = {
.get = get_osd_con,
.put = put_osd_con,
@@ -2928,5 +2984,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
+ .sign_message = sign_message,
+ .check_message_signature = check_message_signature,
.fault = osd_reset,
};