Lines Matching refs:osd
41 * are described by the osd map.
49 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51 static void link_linger(struct ceph_osd *osd,
53 static void unlink_linger(struct ceph_osd *osd,
55 static void clear_backoffs(struct ceph_osd *osd);
77 static inline void verify_osd_locked(struct ceph_osd *osd)
79 struct ceph_osd_client *osdc = osd->o_osdc;
81 WARN_ON(!(mutex_is_locked(&osd->lock) &&
92 static inline void verify_osd_locked(struct ceph_osd *osd) { }
101 * fill osd op in request message.
428 t->osd = CEPH_HOMELESS_OSD;
457 dest->osd = src->osd;
708 * This is an osd op init function for opcodes that have no data or
1021 pr_err("unsupported osd opcode %s\n",
1124 * We keep osd requests in an rbtree, sorted by ->r_tid.
1139 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1141 for (p = rb_first(&osd->o_requests); p; ) {
1161 static bool osd_homeless(struct ceph_osd *osd)
1163 return osd->o_osd == CEPH_HOMELESS_OSD;
1166 static bool osd_registered(struct ceph_osd *osd)
1168 verify_osdc_locked(osd->o_osdc);
1170 return !RB_EMPTY_NODE(&osd->o_node);
1174 * Assumes @osd is zero-initialized.
1176 static void osd_init(struct ceph_osd *osd)
1178 refcount_set(&osd->o_ref, 1);
1179 RB_CLEAR_NODE(&osd->o_node);
1180 osd->o_requests = RB_ROOT;
1181 osd->o_linger_requests = RB_ROOT;
1182 osd->o_backoff_mappings = RB_ROOT;
1183 osd->o_backoffs_by_id = RB_ROOT;
1184 INIT_LIST_HEAD(&osd->o_osd_lru);
1185 INIT_LIST_HEAD(&osd->o_keepalive_item);
1186 osd->o_incarnation = 1;
1187 mutex_init(&osd->lock);
1190 static void osd_cleanup(struct ceph_osd *osd)
1192 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1193 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1194 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1195 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1196 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1197 WARN_ON(!list_empty(&osd->o_osd_lru));
1198 WARN_ON(!list_empty(&osd->o_keepalive_item));
1200 if (osd->o_auth.authorizer) {
1201 WARN_ON(osd_homeless(osd));
1202 ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1211 struct ceph_osd *osd;
1215 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1216 osd_init(osd);
1217 osd->o_osdc = osdc;
1218 osd->o_osd = onum;
1220 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1222 return osd;
1225 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1227 if (refcount_inc_not_zero(&osd->o_ref)) {
1228 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1229 refcount_read(&osd->o_ref));
1230 return osd;
1232 dout("get_osd %p FAIL\n", osd);
1237 static void put_osd(struct ceph_osd *osd)
1239 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1240 refcount_read(&osd->o_ref) - 1);
1241 if (refcount_dec_and_test(&osd->o_ref)) {
1242 osd_cleanup(osd);
1243 kfree(osd);
1247 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1249 static void __move_osd_to_lru(struct ceph_osd *osd)
1251 struct ceph_osd_client *osdc = osd->o_osdc;
1253 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1254 BUG_ON(!list_empty(&osd->o_osd_lru));
1257 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1260 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1263 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1265 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1266 RB_EMPTY_ROOT(&osd->o_linger_requests))
1267 __move_osd_to_lru(osd);
1270 static void __remove_osd_from_lru(struct ceph_osd *osd)
1272 struct ceph_osd_client *osdc = osd->o_osdc;
1274 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1277 if (!list_empty(&osd->o_osd_lru))
1278 list_del_init(&osd->o_osd_lru);
1286 static void close_osd(struct ceph_osd *osd)
1288 struct ceph_osd_client *osdc = osd->o_osdc;
1292 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1294 ceph_con_close(&osd->o_con);
1296 for (n = rb_first(&osd->o_requests); n; ) {
1303 unlink_request(osd, req);
1306 for (n = rb_first(&osd->o_linger_requests); n; ) {
1314 unlink_linger(osd, lreq);
1317 clear_backoffs(osd);
1319 __remove_osd_from_lru(osd);
1320 erase_osd(&osdc->osds, osd);
1321 put_osd(osd);
1325 * reset osd connect
1327 static int reopen_osd(struct ceph_osd *osd)
1331 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1333 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1334 RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1335 close_osd(osd);
1339 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1340 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1341 !ceph_con_opened(&osd->o_con)) {
1344 dout("osd addr hasn't changed and connection never opened, "
1347 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1356 ceph_con_close(&osd->o_con);
1357 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1358 osd->o_incarnation++;
1366 struct ceph_osd *osd;
1374 osd = lookup_osd(&osdc->osds, o);
1376 osd = &osdc->homeless_osd;
1377 if (!osd) {
1381 osd = create_osd(osdc, o);
1382 insert_osd(&osdc->osds, osd);
1383 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1384 &osdc->osdmap->osd_addr[osd->o_osd]);
1387 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1388 return osd;
1394 * @req has to be assigned a tid, @osd may be homeless.
1396 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1398 verify_osd_locked(osd);
1400 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1403 if (!osd_homeless(osd))
1404 __remove_osd_from_lru(osd);
1406 atomic_inc(&osd->o_osdc->num_homeless);
1408 get_osd(osd);
1409 insert_request(&osd->o_requests, req);
1410 req->r_osd = osd;
1413 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1415 verify_osd_locked(osd);
1416 WARN_ON(req->r_osd != osd);
1417 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1421 erase_request(&osd->o_requests, req);
1422 put_osd(osd);
1424 if (!osd_homeless(osd))
1425 maybe_move_osd_to_lru(osd);
1427 atomic_dec(&osd->o_osdc->num_homeless);
1484 dout("%s picked osd%d, primary osd%d\n", __func__,
1514 dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1546 t->osd = CEPH_HOMELESS_OSD;
1571 t->osd = CEPH_HOMELESS_OSD;
1634 t->osd = acting.osds[pos];
1637 t->osd = acting.primary;
1648 dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1649 legacy_change, force_resend, split, ct_res, t->osd);
1908 static void clear_backoffs(struct ceph_osd *osd)
1910 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1912 rb_entry(rb_first(&osd->o_backoff_mappings),
1921 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1924 erase_spg_mapping(&osd->o_backoff_mappings, spg);
1955 struct ceph_osd *osd = req->r_osd;
1960 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1969 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1970 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2254 struct ceph_osd *osd = req->r_osd;
2256 verify_osd_locked(osd);
2257 WARN_ON(osd->o_osd != req->r_t.osd);
2278 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2281 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2288 req->r_sent = osd->o_incarnation;
2290 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2320 struct ceph_osd *osd;
2334 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2335 if (IS_ERR(osd)) {
2336 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2371 } else if (!osd_homeless(osd)) {
2377 mutex_lock(&osd->lock);
2384 link_request(osd, req);
2389 mutex_unlock(&osd->lock);
2707 WARN_ON(lreq->osd);
2766 * @lreq has to be registered, @osd may be homeless.
2768 static void link_linger(struct ceph_osd *osd,
2771 verify_osd_locked(osd);
2772 WARN_ON(!lreq->linger_id || lreq->osd);
2773 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2774 osd->o_osd, lreq, lreq->linger_id);
2776 if (!osd_homeless(osd))
2777 __remove_osd_from_lru(osd);
2779 atomic_inc(&osd->o_osdc->num_homeless);
2781 get_osd(osd);
2782 insert_linger(&osd->o_linger_requests, lreq);
2783 lreq->osd = osd;
2786 static void unlink_linger(struct ceph_osd *osd,
2789 verify_osd_locked(osd);
2790 WARN_ON(lreq->osd != osd);
2791 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2792 osd->o_osd, lreq, lreq->linger_id);
2794 lreq->osd = NULL;
2795 erase_linger(&osd->o_linger_requests, lreq);
2796 put_osd(osd);
2798 if (!osd_homeless(osd))
2799 maybe_move_osd_to_lru(osd);
2801 atomic_dec(&osd->o_osdc->num_homeless);
3184 link_request(lreq->osd, req);
3191 struct ceph_osd *osd;
3197 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3198 link_linger(osd, lreq);
3231 unlink_linger(lreq->osd, lreq);
3375 * a connection with that osd (from the fault callback).
3378 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3381 for (p = rb_first(&osd->o_requests); p; ) {
3388 dout(" req %p tid %llu on osd%d is laggy\n",
3389 req, req->r_tid, osd->o_osd);
3394 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3395 req->r_tid, osd->o_osd);
3399 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3403 dout(" lreq %p linger_id %llu is served by osd%d\n",
3404 lreq, lreq->linger_id, osd->o_osd);
3414 list_move_tail(&osd->o_keepalive_item, &slow_osds);
3425 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3436 struct ceph_osd *osd = list_first_entry(&slow_osds,
3439 list_del_init(&osd->o_keepalive_item);
3440 ceph_con_keepalive(&osd->o_con);
3454 struct ceph_osd *osd, *nosd;
3458 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3459 if (time_before(jiffies, osd->lru_ttl))
3462 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3463 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3464 close_osd(osd);
3679 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3681 struct ceph_osd_client *osdc = osd->o_osdc;
3692 if (!osd_registered(osd)) {
3693 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3696 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3698 mutex_lock(&osd->lock);
3699 req = lookup_request(&osd->o_requests, tid);
3701 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3733 unlink_request(osd, req);
3734 mutex_unlock(&osd->lock);
3751 unlink_request(osd, req);
3752 mutex_unlock(&osd->lock);
3796 mutex_unlock(&osd->lock);
3805 mutex_unlock(&osd->lock);
3841 struct ceph_osd *osd;
3843 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3844 if (osd != lreq->osd) {
3845 unlink_linger(lreq->osd, lreq);
3846 link_linger(osd, lreq);
3856 static void scan_requests(struct ceph_osd *osd,
3863 struct ceph_osd_client *osdc = osd->o_osdc;
3867 for (n = rb_first(&osd->o_linger_requests); n; ) {
3903 for (n = rb_first(&osd->o_requests); n; ) {
3925 unlink_request(osd, req);
3988 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3992 scan_requests(osd, skipped_map, was_full, true, need_resend,
3994 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3995 memcmp(&osd->o_con.peer_addr,
3996 ceph_osd_addr(osdc->osdmap, osd->o_osd),
3998 close_osd(osd);
4031 struct ceph_osd *osd;
4036 osd = lookup_create_osd(osdc, req->r_t.osd, true);
4037 link_request(osd, req);
4039 if (!osd_homeless(osd) && !req->r_t.paused)
4047 if (!osd_homeless(lreq->osd))
4055 * Process updated osd map.
4172 * Resubmit requests pending on the given osd.
4174 static void kick_osd_requests(struct ceph_osd *osd)
4178 clear_backoffs(osd);
4180 for (n = rb_first(&osd->o_requests); n; ) {
4193 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4202 * If the osd connection drops, we need to resubmit all requests.
4206 struct ceph_osd *osd = con->private;
4207 struct ceph_osd_client *osdc = osd->o_osdc;
4209 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4212 if (!osd_registered(osd)) {
4213 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4217 if (!reopen_osd(osd))
4218 kick_osd_requests(osd);
4322 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4328 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4331 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4339 insert_spg_mapping(&osd->o_backoff_mappings, spg);
4355 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4366 ceph_con_send(&osd->o_con, msg);
4381 static void handle_backoff_unblock(struct ceph_osd *osd,
4388 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4391 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4393 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4394 __func__, osd->o_osd, m->spgid.pgid.pool,
4401 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4402 __func__, osd->o_osd, m->spgid.pgid.pool,
4407 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4411 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4415 erase_spg_mapping(&osd->o_backoff_mappings, spg);
4419 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4439 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4441 struct ceph_osd_client *osdc = osd->o_osdc;
4446 if (!osd_registered(osd)) {
4447 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4451 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4453 mutex_lock(&osd->lock);
4463 handle_backoff_block(osd, &m);
4466 handle_backoff_unblock(osd, &m);
4469 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4476 mutex_unlock(&osd->lock);
4481 * Process osd watch notifications
4651 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4653 mutex_lock(&osd->lock);
4654 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4665 mutex_unlock(&osd->lock);
4674 mutex_unlock(&osd->lock);
5135 * reset all osd connections
5143 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5146 if (!reopen_osd(osd))
5147 kick_osd_requests(osd);
5234 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5236 close_osd(osd);
5359 struct ceph_osd *osd = con->private;
5360 struct ceph_osd_client *osdc = osd->o_osdc;
5368 handle_reply(osd, msg);
5371 handle_backoff(osd, msg);
5394 struct ceph_osd *osd = con->private;
5395 struct ceph_osd_client *osdc = osd->o_osdc;
5403 if (!osd_registered(osd)) {
5404 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5408 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5410 mutex_lock(&osd->lock);
5411 req = lookup_request(&osd->o_requests, tid);
5413 dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5414 osd->o_osd, tid);
5422 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5423 __func__, osd->o_osd, req->r_tid, front_len,
5434 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5435 __func__, osd->o_osd, req->r_tid, data_len,
5446 mutex_unlock(&osd->lock);
5483 struct ceph_osd *osd = con->private;
5495 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5496 osd->o_osd, type);
5507 struct ceph_osd *osd = con->private;
5508 if (get_osd(osd))
5515 struct ceph_osd *osd = con->private;
5516 put_osd(osd);