diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a97e7b506612..d9bf7a1d0a58 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, ceph_osd_data_release(&op->notify.request_data); ceph_osd_data_release(&op->notify.response_data); break; + case CEPH_OSD_OP_LIST_WATCHERS: + ceph_osd_data_release(&op->list_watchers.response_data); + break; default: break; } @@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_NOTIFY: dst->notify.cookie = cpu_to_le64(src->notify.cookie); break; + case CEPH_OSD_OP_LIST_WATCHERS: + break; case CEPH_OSD_OP_SETALLOCHINT: dst->alloc_hint.expected_object_size = cpu_to_le64(src->alloc_hint.expected_object_size); @@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req, ceph_osdc_msg_data_add(req->r_reply, &op->extent.osd_data); break; + case CEPH_OSD_OP_LIST_WATCHERS: + ceph_osdc_msg_data_add(req->r_reply, + &op->list_watchers.response_data); + break; /* both */ case CEPH_OSD_OP_CALL: @@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc, return ret; } +static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) +{ + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 2, "watch_item_t", + &struct_v, &struct_len); + if (ret) + return ret; + + ceph_decode_copy(p, &item->name, sizeof(item->name)); + item->cookie = ceph_decode_64(p); + *p += 4; /* skip timeout_seconds */ + if (struct_v >= 2) { + ceph_decode_copy(p, &item->addr, sizeof(item->addr)); + ceph_decode_addr(&item->addr); + } + + dout("%s %s%llu cookie %llu addr %s\n", __func__, + ENTITY_NAME(item->name), item->cookie, + ceph_pr_addr(&item->addr.in_addr)); + return 0; +} + +static int decode_watchers(void **p, void *end, + struct ceph_watch_item **watchers, + u32 *num_watchers) +{ + u8 struct_v; + u32 struct_len; + int i; + int ret; + + ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", + &struct_v, &struct_len); + if (ret) + return ret; + + *num_watchers = ceph_decode_32(p); + *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); + if (!*watchers) + return -ENOMEM; + + for (i = 0; i < *num_watchers; i++) { + ret = decode_watcher(p, end, *watchers + i); + if (ret) { + kfree(*watchers); + return ret; + } + } + + return 0; +} + +/* + * On success, the caller is responsible for: + * + * kfree(watchers); + */ +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_watch_item **watchers, + u32 *num_watchers) +{ + struct ceph_osd_request *req; + struct page **pages; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out_put_req; + } + + osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); + ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, + response_data), + pages, PAGE_SIZE, 0, false, true); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) { + void *p = page_address(pages[0]); + void *const end = p + req->r_ops[0].outdata_len; + + ret = decode_watchers(&p, end, watchers, num_watchers); + } + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_list_watchers); + /* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked */ void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) { + dout("%s osdc %p\n", __func__, osdc); flush_workqueue(osdc->notify_wq); } EXPORT_SYMBOL(ceph_osdc_flush_notifies); @@ -3910,6 +4028,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) EXPORT_SYMBOL(ceph_osdc_maybe_request_map); /* + * Execute an OSD class method on an object. + * + * @flags: CEPH_OSD_FLAG_* + * @resp_len: out param for reply length + */ +int ceph_osdc_call(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + const char *class, const char *method, + unsigned int flags, + struct page *req_page, size_t req_len, + struct page *resp_page, size_t *resp_len) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = flags; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + if (req_page) + osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, + 0, false, false); + if (resp_page) + osd_req_op_cls_response_data_pages(req, 0, &resp_page, + PAGE_SIZE, 0, false, false); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) { + ret = req->r_ops[0].rval; + if (resp_page) + *resp_len = req->r_ops[0].outdata_len; + } + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_call); + +/* * init, shutdown */ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) |