summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-06 23:39:39 -0600
committerSage Weil <sage@inktank.com>2013-05-01 21:16:59 -0700
commit6aaa4511deb4b0fd776d1153dc63a89cdc024fb8 (patch)
tree455fb4334de3518e411daaa7e3ff1d9aef187cd8
parent7fe1e5e57b84eab98ff352519aa66e86dac5bf61 (diff)
libceph: implement bio message data item cursor
Implement and use cursor routines for bio message data items for outbound message data. (See the previous commit for reasoning in support of the changes in out_msg_pos_next().) Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
-rw-r--r--include/linux/ceph/messenger.h7
-rw-r--r--net/ceph/messenger.c137
2 files changed, 123 insertions, 21 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 716c3fdeb257..76b4645e2dff 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -98,6 +98,13 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
struct ceph_msg_data_cursor {
bool last_piece; /* now at last piece of data item */
union {
+#ifdef CONFIG_BLOCK
+ struct { /* bio */
+ struct bio *bio; /* bio from list */
+ unsigned int vector_index; /* vector from bio */
+ unsigned int vector_offset; /* bytes from vector */
+ };
+#endif /* CONFIG_BLOCK */
struct { /* pagelist */
struct page *page; /* page from list */
size_t offset; /* bytes from list */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 30c8792be180..209990a853e5 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -739,6 +739,95 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
if (*seg == (*bio_iter)->bi_vcnt)
init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = data->bio;
+ BUG_ON(!bio);
+ BUG_ON(!bio->bi_vcnt);
+ /* resid = bio->bi_size */
+
+ cursor->bio = bio;
+ cursor->vector_index = 0;
+ cursor->vector_offset = 0;
+ cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+ size_t *page_offset,
+ size_t *length)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+ bio_vec = &bio->bi_io_vec[index];
+ BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+ *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+ BUG_ON(*page_offset >= PAGE_SIZE);
+ *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+ BUG_ON(*length > PAGE_SIZE);
+
+ return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+ bio_vec = &bio->bi_io_vec[index];
+ BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
+
+ /* Advance the cursor offset */
+
+ cursor->vector_offset += bytes;
+ if (cursor->vector_offset < bio_vec->bv_len)
+ return false; /* more bytes to process in this segment */
+
+ /* Move on to the next segment, and possibly the next bio */
+
+ if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+ bio = bio->bi_next;
+ cursor->bio = bio;
+ cursor->vector_index = 0;
+ }
+ cursor->vector_offset = 0;
+
+ if (!cursor->last_piece && bio && !bio->bi_next)
+ if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+ cursor->last_piece = true;
+
+ return true;
+}
#endif
/*
@@ -843,11 +932,13 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(data);
break;
- case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
+ ceph_msg_data_bio_cursor_init(data);
+ break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ case CEPH_MSG_DATA_PAGES:
default:
/* BUG(); */
break;
@@ -870,11 +961,13 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(data, page_offset, length);
break;
- case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
+ page = ceph_msg_data_bio_next(data, page_offset, length);
+ break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ case CEPH_MSG_DATA_PAGES:
default:
page = NULL;
break;
@@ -900,11 +993,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes);
break;
- case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
+ new_piece = ceph_msg_data_bio_advance(data, bytes);
+ break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ case CEPH_MSG_DATA_PAGES:
default:
BUG();
break;
@@ -933,6 +1028,10 @@ static void prepare_message_data(struct ceph_msg *msg,
/* Initialize data cursors */
+#ifdef CONFIG_BLOCK
+ if (ceph_msg_has_bio(msg))
+ ceph_msg_data_cursor_init(&msg->b);
+#endif /* CONFIG_BLOCK */
if (ceph_msg_has_pagelist(msg))
ceph_msg_data_cursor_init(&msg->l);
if (ceph_msg_has_trail(msg))
@@ -1233,6 +1332,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
need_crc = ceph_msg_data_advance(&msg->t, sent);
else if (ceph_msg_has_pagelist(msg))
need_crc = ceph_msg_data_advance(&msg->l, sent);
+#ifdef CONFIG_BLOCK
+ else if (ceph_msg_has_bio(msg))
+ need_crc = ceph_msg_data_advance(&msg->b, sent);
+#endif /* CONFIG_BLOCK */
BUG_ON(need_crc && sent != len);
if (sent < len)
@@ -1242,10 +1345,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
-#ifdef CONFIG_BLOCK
- if (ceph_msg_has_bio(msg))
- iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
-#endif
}
static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1323,8 +1422,6 @@ static int write_partial_message_data(struct ceph_connection *con)
struct page *page = NULL;
size_t page_offset;
size_t length;
- int max_write = PAGE_SIZE;
- int bio_offset = 0;
bool use_cursor = false;
bool last_piece = true; /* preserve existing behavior */
@@ -1345,21 +1442,19 @@ static int write_partial_message_data(struct ceph_connection *con)
&length, &last_piece);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
- struct bio_vec *bv;
-
- bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
- page = bv->bv_page;
- bio_offset = bv->bv_offset;
- max_write = bv->bv_len;
+ use_cursor = true;
+ page = ceph_msg_data_next(&msg->b, &page_offset,
+ &length, &last_piece);
#endif
} else {
page = zero_page;
}
- if (!use_cursor)
- length = min_t(int, max_write - msg_pos->page_pos,
+ if (!use_cursor) {
+ length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
total_max_write);
- page_offset = msg_pos->page_pos + bio_offset;
+ page_offset = msg_pos->page_pos;
+ }
if (do_datacrc && !msg_pos->did_page_crc) {
u32 crc = le32_to_cpu(msg->footer.data_crc);