handle_cap_grant

handle_cap_grant 是 Ceph 分布式一致性机制的核心实现,负责:

  • 处理 MDS 发送的能力授权/撤销消息
  • 维护客户端缓存一致性
  • 管理文件元数据同步
  • 协调分布式锁机制

完整的流程图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
MDS 撤销 CEPH_CAP_FILE_BUFFER 权限
handle_cap_grant() 检测到权限撤销
设置 writeback = true
ceph_queue_writeback(inode)
ceph_queue_inode_work(inode, CEPH_I_WORK_WRITEBACK)
工作队列调度 ceph_inode_work()
filemap_fdatawrite(&inode->i_data)  ←── 这里连接内核 writeback
内核 writeback 子系统处理
do_writepages() → ceph_writepages()
页面写入到 OSD
写入完成回调
ceph_put_wrbuffer_cap_refs()  ←── 减少引用计数
当 i_wrbuffer_ref == 0 时
ceph_check_caps(ci, 0)  ←── 通知 MDS 权限已释放
发送 CEPH_CAP_OP_UPDATE 消息给 MDS

函数详细介绍

为函数添加详细的架构分析注释:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
/*
 * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
 * actually be a revocation if it specifies a smaller cap set.)
 *
 * 主要架构组件:
 * 1. 能力管理系统 (Capability Management System)
 *    - 处理MDS发送的能力授权/撤销消息
 *    - 维护客户端的文件访问权限状态
 * 
 * 2. 锁机制 (Lock Management)
 *    - i_ceph_lock: 保护inode的ceph特定字段
 *    - snap_rwsem: 快照读写信号量
 *    - s_mutex: 会话互斥锁
 *
 * 3. 缓存一致性 (Cache Coherency)
 *    - 处理缓存失效和回写操作
 *    - 管理文件数据的一致性状态
 *
 * 4. 文件属性管理 (File Attribute Management)
 *    - 更新文件大小、权限、时间戳等元数据
 *    - 处理加密文件的特殊逻辑
 *
 * caller holds s_mutex and i_ceph_lock, we drop both.
 */
static void handle_cap_grant(struct inode *inode,
			     struct ceph_mds_session *session,
			     struct ceph_cap *cap,
			     struct ceph_mds_caps *grant,
			     struct ceph_buffer *xattr_buf,
			     struct cap_extra_info *extra_info)
	__releases(ci->i_ceph_lock)
	__releases(session->s_mdsc->snap_rwsem)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	
	/* 
	 * 工作原理概述:
	 * 1. 解析MDS发送的能力授权消息
	 * 2. 处理能力状态变化(授权/撤销/无变化)
	 * 3. 更新文件元数据和属性
	 * 4. 处理缓存一致性操作
	 * 5. 触发相应的后续操作(回写、失效等)
	 */
	int seq = le32_to_cpu(grant->seq);
	int newcaps = le32_to_cpu(grant->caps);
	int used, wanted, dirty;
	u64 size = le64_to_cpu(grant->size);
	u64 max_size = le64_to_cpu(grant->max_size);
	
	/* 状态标志位 - 用于控制后续操作 */
	unsigned char check_caps = 0;
	bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
	bool wake = false;           // 是否唤醒等待线程
	bool writeback = false;      // 是否需要回写数据
	bool queue_trunc = false;    // 是否需要截断文件
	bool queue_invalidate = false; // 是否需要失效缓存
	bool deleted_inode = false;  // inode是否被删除
	bool fill_inline = false;    // 是否填充内联数据
	bool revoke_wait = false;    // 是否等待撤销完成
	int flags = 0;

	/*
	 * 加密文件大小处理 - Ceph的加密文件系统支持
	 * 如果文件是加密的且有内容,使用fscrypt_file_size
	 */
	if (IS_ENCRYPTED(inode) && size)
		size = extra_info->fscrypt_file_size;

	/* 调试输出 - 记录能力变化 */
	doutc(cl, "%p %llx.%llx cap %p mds%d seq %d %s\n", inode,
	      ceph_vinop(inode), cap, session->s_mds, seq,
	      ceph_cap_string(newcaps));
	doutc(cl, " size %llu max_size %llu, i_size %llu\n", size,
	      max_size, i_size_read(inode));

	/*
	 * 缓存失效处理 - 一致性管理的核心
	 * 当CACHE能力被撤销且没有脏缓冲区时,尝试失效缓存
	 */
	if (S_ISREG(inode->i_mode) && /* 仅对常规文件 */
	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
		if (try_nonblocking_invalidate(inode)) {
			/* 页面被锁定,稍后在单独线程中失效 */
			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
				queue_invalidate = true;
				ci->i_rdcache_revoking = ci->i_rdcache_gen;
			}
		}
	}

	/* 处理过期能力 - 重置为基本PIN能力 */
	if (was_stale)
		cap->issued = cap->implemented = CEPH_CAP_PIN;

	/*
	 * 能力迁移处理 - 处理MDS之间的能力转移
	 * 确保消息顺序的正确性
	 */
	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
		WARN_ON(cap != ci->i_auth_cap);
		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
		seq = cap->seq;
		newcaps |= cap->issued;
	}

	/* 更新能力状态 */
	cap->cap_gen = atomic_read(&session->s_cap_gen);
	cap->seq = seq;

	/* 检查能力授权的副作用 */
	__check_cap_issue(ci, cap, newcaps);

	/* 更新inode版本号 - 用于缓存一致性 */
	inode_set_max_iversion_raw(inode, extra_info->change_attr);

	/*
	 * 文件属性更新 - AUTH_SHARED能力允许读取基本属性
	 */
	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
		umode_t mode = le32_to_cpu(grant->mode);

		/* 检查inode类型是否发生变化 */
		if (inode_wrong_type(inode, mode))
			pr_warn_once("inode type changed! (ino %llx.%llx is 0%o, mds says 0%o)\n",
				     ceph_vinop(inode), inode->i_mode, mode);
		else
			inode->i_mode = mode;
		
		/* 更新用户ID和组ID */
		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
		ci->i_btime = extra_info->btime;
		
		doutc(cl, "%p %llx.%llx mode 0%o uid.gid %d.%d\n", inode,
		      ceph_vinop(inode), inode->i_mode,
		      from_kuid(&init_user_ns, inode->i_uid),
		      from_kgid(&init_user_ns, inode->i_gid));

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
		/* 加密认证信息检查 - 防止未授权修改 */
		if (ci->fscrypt_auth_len != extra_info->fscrypt_auth_len ||
		    memcmp(ci->fscrypt_auth, extra_info->fscrypt_auth,
			   ci->fscrypt_auth_len))
			pr_warn_ratelimited_client(cl,
				"cap grant attempt to change fscrypt_auth on non-I_NEW inode (old len %d new len %d)\n",
				ci->fscrypt_auth_len,
				extra_info->fscrypt_auth_len);
#endif
	}

	/*
	 * 链接计数更新 - LINK_SHARED能力允许读取链接信息
	 */
	if ((newcaps & CEPH_CAP_LINK_SHARED) &&
	    (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
		set_nlink(inode, le32_to_cpu(grant->nlink));
		if (inode->i_nlink == 0)
			deleted_inode = true; // 标记为已删除
	}

	/*
	 * 扩展属性更新 - 处理xattr数据
	 */
	if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
	    grant->xattr_len) {
		int len = le32_to_cpu(grant->xattr_len);
		u64 version = le64_to_cpu(grant->xattr_version);

		if (version > ci->i_xattrs.version) {
			doutc(cl, " got new xattrs v%llu on %p %llx.%llx len %d\n",
			      version, inode, ceph_vinop(inode), len);
			if (ci->i_xattrs.blob)
				ceph_buffer_put(ci->i_xattrs.blob);
			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
			ci->i_xattrs.version = version;
			ceph_forget_all_cached_acls(inode);
			ceph_security_invalidate_secctx(inode);
		}
	}

	/*
	 * 时间戳更新 - 当有读权限时更新文件时间
	 */
	if (newcaps & CEPH_CAP_ANY_RD) {
		struct timespec64 mtime, atime, ctime;
		ceph_decode_timespec64(&mtime, &grant->mtime);
		ceph_decode_timespec64(&atime, &grant->atime);
		ceph_decode_timespec64(&ctime, &grant->ctime);
		ceph_fill_file_time(inode, extra_info->issued,
				    le32_to_cpu(grant->time_warp_seq),
				    &ctime, &mtime, &atime);
	}

	/*
	 * 目录统计信息更新 - 用于目录的文件和子目录计数
	 */
	if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
		ci->i_files = extra_info->nfiles;
		ci->i_subdirs = extra_info->nsubdirs;
	}

	/*
	 * 文件布局和大小处理 - 核心的文件元数据管理
	 */
	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
		/* 文件布局可能已更改 */
		s64 old_pool = ci->i_layout.pool_id;
		struct ceph_string *old_ns;

		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
					lockdep_is_held(&ci->i_ceph_lock));
		rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);

		/* 如果存储池发生变化,清除权限缓存 */
		if (ci->i_layout.pool_id != old_pool ||
		    extra_info->pool_ns != old_ns)
			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

		extra_info->pool_ns = old_ns;

		/* 处理文件大小和截断 */
		queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
					le32_to_cpu(grant->truncate_seq),
					le64_to_cpu(grant->truncate_size),
					size);
	}

	/*
	 * 最大文件大小处理 - 仅对授权MDS有效
	 */
	if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
		if (max_size != ci->i_max_size) {
			doutc(cl, "max_size %lld -> %llu\n", ci->i_max_size,
			      max_size);
			ci->i_max_size = max_size;
			if (max_size >= ci->i_wanted_max_size) {
				ci->i_wanted_max_size = 0;  /* 重置 */
				ci->i_requested_max_size = 0;
			}
			wake = true;
		}
	}

	/* 能力状态分析 */
	wanted = __ceph_caps_wanted(ci);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);
	doutc(cl, " my wanted = %s, used = %s, dirty %s\n",
	      ceph_cap_string(wanted), ceph_cap_string(used),
	      ceph_cap_string(dirty));

	/*
	 * 检查是否需要能力检查 - 处理导入或过期的情况
	 */
	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
	    (wanted & ~(cap->mds_wanted | newcaps))) {
		check_caps = 1;
	}

	/*
	 * 能力变化处理 - 这是函数的核心逻辑
	 * 分为三种情况:撤销、无变化、授权
	 */
	if (cap->issued & ~newcaps) {
		/* 能力撤销情况 */
		int revoking = cap->issued & ~newcaps;

		doutc(cl, "revocation: %s -> %s (revoking %s)\n",
		      ceph_cap_string(cap->issued), ceph_cap_string(newcaps),
		      ceph_cap_string(revoking));
		
		if (S_ISREG(inode->i_mode) &&
		    (revoking & used & CEPH_CAP_FILE_BUFFER)) {
			writeback = true;  /* 启动回写,延迟确认 */
			revoke_wait = true;
		} else if (queue_invalidate &&
			 revoking == CEPH_CAP_FILE_CACHE &&
			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0) {
			revoke_wait = true; /* 等待失效完成 */
		} else if (cap == ci->i_auth_cap) {
			check_caps = 1; /* 仅检查授权能力 */
		} else {
			check_caps = 2; /* 检查所有能力 */
		}
		
		/* 如果有新能力,尝试唤醒等待者 */
		if (~cap->issued & newcaps)
			wake = true;
		cap->issued = newcaps;
		cap->implemented |= newcaps;
	} else if (cap->issued == newcaps) {
		/* 能力无变化 */
		doutc(cl, "caps unchanged: %s -> %s\n",
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
	} else {
		/* 能力授权情况 */
		doutc(cl, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
		
		/* 检查是否有其他MDS在撤销新授权的能力 */
		if (cap == ci->i_auth_cap &&
		    __ceph_caps_revoking_other(ci, cap, newcaps))
		    check_caps = 2;

		cap->issued = newcaps;
		cap->implemented |= newcaps; /* 仅添加位,避免覆盖待撤销的 */
		wake = true;
	}
	BUG_ON(cap->issued & ~cap->implemented);

	/*
	 * 撤销消息的特殊处理 - 强制发送响应给MDS
	 */
	if (!revoke_wait && le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) {
		cap->mds_wanted = 0;
		flags |= CHECK_CAPS_FLUSH_FORCE;
		if (cap == ci->i_auth_cap)
			check_caps = 1;
		else
			check_caps = 2;
	}

	/*
	 * 内联数据处理 - Ceph的小文件优化
	 */
	if (extra_info->inline_version > 0 &&
	    extra_info->inline_version >= ci->i_inline_version) {
		ci->i_inline_version = extra_info->inline_version;
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
			fill_inline = true;
	}

	/*
	 * 导入操作的特殊处理 - 处理能力从其他MDS迁移过来的情况
	 */
	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
		if (ci->i_auth_cap == cap) {
			if (newcaps & ~extra_info->issued)
				wake = true;

			if (ci->i_requested_max_size > max_size ||
			    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
				/* 如有必要,重新请求max_size */
				ci->i_requested_max_size = 0;
				wake = true;
			}

			ceph_kick_flushing_inode_caps(session, ci);
		}
		up_read(&session->s_mdsc->snap_rwsem);
	}
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * 后续操作处理 - 在释放锁后执行的异步操作
	 */
	if (fill_inline)
		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
				      extra_info->inline_len);

	if (queue_trunc)
		ceph_queue_vmtruncate(inode);

	if (writeback)
		ceph_queue_writeback(inode);
	if (queue_invalidate)
		ceph_queue_invalidate(inode);
	if (deleted_inode)
		invalidate_aliases(inode);
	if (wake)
		wake_up_all(&ci->i_cap_wq);

	mutex_unlock(&session->s_mutex);
	
	/*
	 * 能力检查 - 根据前面的分析结果执行相应的能力检查
	 */
	if (check_caps == 1)
		ceph_check_caps(ci, flags | CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL);
	else if (check_caps == 2)
		ceph_check_caps(ci, flags | CHECK_CAPS_NOINVAL);
}

主要流程分析

第一阶段:初始化与预处理

1
2
3
4
5
6
// 1. 加密文件大小处理
if (IS_ENCRYPTED(inode) && size)
    size = extra_info->fscrypt_file_size;

// 2. 调试日志输出
doutc(cl, "%p %llx.%llx cap %p mds%d seq %d %s\n", ...);

第二阶段:缓存一致性检查

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 关键的缓存失效逻辑
if (S_ISREG(inode->i_mode) && 
    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
    
    if (try_nonblocking_invalidate(inode)) {
        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
            queue_invalidate = true;
            ci->i_rdcache_revoking = ci->i_rdcache_gen;
        }
    }
}

缓存失效策略

  • 仅对常规文件执行
  • 当 FILE_CACHE 能力被撤销时触发
  • 没有脏缓冲区时尝试立即失效
  • 失效失败时安排异步失效

第三阶段:能力迁移处理

1
2
3
4
5
6
7
// 处理 MDS 间的能力迁移
if (ceph_seq_cmp(seq, cap->seq) <= 0) {
    WARN_ON(cap != ci->i_auth_cap);
    WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
    seq = cap->seq;
    newcaps |= cap->issued;
}

第四阶段:元数据更新

认证信息更新

1
2
3
4
5
6
7
8
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
    
    inode->i_mode = le32_to_cpu(grant->mode);
    inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
    inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
    ci->i_btime = extra_info->btime;
}

文件布局与大小处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
    // 文件布局可能变化
    ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
    
    // 处理存储池变化
    if (ci->i_layout.pool_id != old_pool ||
        extra_info->pool_ns != old_ns)
        ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
    
    // 文件大小和截断处理
    queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
                    le32_to_cpu(grant->truncate_seq),
                    le64_to_cpu(grant->truncate_size), size);
}

第五阶段:能力状态分析与处理

这是函数的核心逻辑,分为三种情况:

能力撤销 (Revocation)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
if (cap->issued & ~newcaps) {
    int revoking = cap->issued & ~newcaps;
    
    if (S_ISREG(inode->i_mode) &&
        (revoking & used & CEPH_CAP_FILE_BUFFER)) {
        writeback = true;   // 启动回写,延迟确认
        revoke_wait = true;
    }
    
    cap->issued = newcaps;
    cap->implemented |= newcaps;
}

能力授权 (Grant)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
else {
    // 检查是否有其他MDS在撤销新授权的能力
    if (cap == ci->i_auth_cap &&
        __ceph_caps_revoking_other(ci, cap, newcaps))
        check_caps = 2;
    
    cap->issued = newcaps;
    cap->implemented |= newcaps; // 仅添加位,避免覆盖待撤销的
    wake = true;
}

第六阶段:特殊操作处理

内联数据处理

1
2
3
4
5
6
7
if (extra_info->inline_version > 0 &&
    extra_info->inline_version >= ci->i_inline_version) {
    ci->i_inline_version = extra_info->inline_version;
    if (ci->i_inline_version != CEPH_INLINE_NONE &&
        (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
        fill_inline = true;
}

导入操作处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
    if (ci->i_auth_cap == cap) {
        if (newcaps & ~extra_info->issued)
            wake = true;
        
        // 重新请求max_size如果必要
        if (ci->i_requested_max_size > max_size ||
            !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
            ci->i_requested_max_size = 0;
            wake = true;
        }
        
        ceph_kick_flushing_inode_caps(session, ci);
    }
}

第七阶段:后续操作调度

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 释放锁后的异步操作
if (fill_inline)
    ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
                          extra_info->inline_len);

if (queue_trunc)
    ceph_queue_vmtruncate(inode);

if (writeback)
    ceph_queue_writeback(inode);

if (queue_invalidate)
    ceph_queue_invalidate(inode);

if (deleted_inode)
    invalidate_aliases(inode);

if (wake)
    wake_up_all(&ci->i_cap_wq);

主要子函数

__check_cap_issue(ci, cap, newcaps)

功能:检查新授予能力的副作用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 增加读缓存生成号
if (S_ISREG(ci->netfs.inode.i_mode) &&
    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
    ci->i_rdcache_gen++;
}

// 处理FILE_SHARED能力变化
if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
    if (issued & CEPH_CAP_FILE_SHARED)
        atomic_inc(&ci->i_shared_gen);
    if (S_ISDIR(ci->netfs.inode.i_mode)) {
        __ceph_dir_clear_complete(ci);
    }
}

ceph_fill_file_size()

功能:更新文件大小和截断信息

  • 处理截断序列号验证
  • 更新 i_size
  • 检测文件截断操作
  • 返回是否需要队列截断操作

try_nonblocking_invalidate()

功能:非阻塞缓存失效

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
static int try_nonblocking_invalidate(struct inode *inode) {
    u32 invalidating_gen = ci->i_rdcache_gen;
    
    spin_unlock(&ci->i_ceph_lock);
    ceph_fscache_invalidate(inode, false);
    invalidate_mapping_pages(&inode->i_data, 0, -1);
    spin_lock(&ci->i_ceph_lock);
    
    if (inode->i_data.nrpages == 0 &&
        invalidating_gen == ci->i_rdcache_gen) {
        ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
        return 0;  // 成功
    }
    return -1;  // 失败,页面被锁定
}

核心数据结构

能力状态变量

1
2
3
4
int newcaps = le32_to_cpu(grant->caps);      // MDS新授权的能力
int used = __ceph_caps_used(ci);             // 当前使用的能力
int wanted = __ceph_caps_wanted(ci);         // 客户端期望的能力  
int dirty = __ceph_caps_dirty(ci);          // 脏数据相关能力

状态控制标志

1
2
3
4
5
6
bool was_stale;           // 能力是否过期
bool wake = false;        // 是否唤醒等待线程
bool writeback = false;   // 是否启动回写
bool queue_trunc = false; // 是否队列截断操作
bool queue_invalidate = false; // 是否队列失效操作
bool revoke_wait = false; // 是否等待撤销完成

错误处理与边界情况

会话状态检查

1
2
3
bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
if (was_stale)
    cap->issued = cap->implemented = CEPH_CAP_PIN;

序列号验证

1
2
3
4
5
6
7
// 处理消息乱序
if (ceph_seq_cmp(seq, cap->seq) <= 0) {
    WARN_ON(cap != ci->i_auth_cap);
    WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
    seq = cap->seq;
    newcaps |= cap->issued;
}

性能优化策略

延迟处理机制

  • 能力检查通过 check_caps 标志延迟执行
  • 异步操作通过工作队列调度
  • 批量处理减少锁竞争

缓存策略

  • 非阻塞失效减少延迟
  • 生成号机制避免重复操作
  • 分层锁设计提高并发性

handle_cap_grant的异步回写

1. 工作队列机制

从 super.h 中可以看到关键的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
struct ceph_fs_client {
    struct workqueue_struct *inode_wq;  // inode工作队列
    struct workqueue_struct *cap_wq;    // capability工作队列
    // ...
};

struct ceph_inode_info {
    struct work_struct i_work;          // 工作项
    unsigned long i_work_mask;          // 工作掩码
    // ...
};

// 工作类型定义
#define CEPH_I_WORK_WRITEBACK      0
#define CEPH_I_WORK_INVALIDATE_PAGES 1  
#define CEPH_I_WORK_VMTRUNCATE     2
#define CEPH_I_WORK_CHECK_CAPS     3
#define CEPH_I_WORK_FLUSH_SNAPS    4

2. 异步回写的实际流程

步骤1:启动异步回写

1
2
3
4
5
6
7
8
9
// 在 handle_cap_grant 中
if (writeback)
    ceph_queue_writeback(inode);

// ceph_queue_writeback 的实现
static inline void ceph_queue_writeback(struct inode *inode)
{
    ceph_queue_inode_work(inode, CEPH_I_WORK_WRITEBACK);
}

步骤2:工作队列调度

ceph_queue_inode_work 函数会:

  1. 设置 CEPH_I_WORK_WRITEBACK 位到 i_work_mask
  2. i_work 提交到 inode_wq 工作队列

步骤3:工作函数处理

工作队列会调用 ceph_inode_work 函数(在 inode.c 中实现):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static void ceph_inode_work(struct work_struct *work)
{
    struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, i_work);
    struct inode *inode = &ci->netfs.inode;

    if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
        doutc(cl, "writeback %p %llx.%llx\n", inode, ceph_vinop(inode));
        filemap_fdatawrite(&inode->i_data);  // 启动页面回写
    }
    
    // 其他工作类型处理...
    
    if (test_and_clear_bit(CEPH_I_WORK_CHECK_CAPS, &ci->i_work_mask)) {
        ceph_check_caps(ci, 0);  // 检查能力状态
    }
}

3. 回写完成的回调机制

关键:引用计数驱动

真正的回调机制是通过 引用计数 实现的,而不是传统的回调函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 在页面回写完成时调用
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                               struct ceph_snap_context *snapc)
{
    struct inode *inode = &ci->netfs.inode;
    bool last = false;
    bool complete_capsnap = false;
    bool wake_ci = false;

    spin_lock(&ci->i_ceph_lock);
    
    // 减少写缓冲区引用计数
    ci->i_wrbuffer_ref -= nr;
    if (ci->i_wrbuffer_ref == 0) {
        last = true;  // 最后一个写引用
    }
    
    // 处理 capsnap 相关逻辑
    if (ci->i_head_snapc == snapc) {
        ci->i_wrbuffer_ref_head -= nr;
        if (ci->i_wrbuffer_ref_head == 0) {
            // 触发快照相关处理
        }
    }
    
    // 减少总的写引用计数  
    if (ci->i_wb_ref && (--ci->i_wb_ref == 0)) {
        wake_ci = true;
    }
    
    spin_unlock(&ci->i_ceph_lock);

    // 关键:当所有写引用都释放完成时,触发能力检查
    if (last) {
        ceph_check_caps(ci, 0);  // 这里会检查并通知MDS
    }
    
    if (wake_ci)
        wake_up_all(&ci->i_cap_wq);  // 唤醒等待的线程
}

4. 触发后续能力检查的两种方式

方式1:直接调用(同步)

当引用计数归零时直接调用 ceph_check_caps(ci, 0)

方式2:工作队列(异步)

有时会通过工作队列异步调用:

1
2
3
4
static inline void ceph_queue_check_caps(struct inode *inode)
{
    ceph_queue_inode_work(inode, CEPH_I_WORK_CHECK_CAPS);
}

5. 页面回写完成时的调用路径

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
页面回写完成 (内核writeback机制)
end_page_writeback()  (内核函数)
ceph_writepages_end() 或类似的Ceph回写完成函数
ceph_put_wrbuffer_cap_refs()  (减少引用计数)
当 i_wrbuffer_ref == 0 && i_wb_ref == 0 时
ceph_check_caps(ci, 0)  (检查能力状态)
发现可以安全释放 CEPH_CAP_FILE_BUFFER
__send_cap() 发送消息给MDS

CEPH_I_WORK_WRITEBACK是怎么和内核的writeback函数联系起来的?

1. 工作队列机制

异步工作调度

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void ceph_queue_inode_work(struct inode *inode, int work_bit)
{
    struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
    struct ceph_inode_info *ci = ceph_inode(inode);
    
    set_bit(work_bit, &ci->i_work_mask);  // 设置工作位
    ihold(inode);                         // 增加引用计数
    
    if (queue_work(fsc->inode_wq, &ci->i_work)) {
        // 成功提交到工作队列
    } else {
        // 已经在队列中,释放引用
        iput(inode);
    }
}

工作函数处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static void ceph_inode_work(struct work_struct *work)
{
    struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, i_work);
    struct inode *inode = &ci->netfs.inode;

    if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
        doutc(cl, "writeback %p %llx.%llx\n", inode, ceph_vinop(inode));
        filemap_fdatawrite(&inode->i_data);  // 这里是关键!
    }
    
    // 处理其他工作类型...
    
    iput(inode);  // 释放引用计数
}

2. 与内核 writeback 的连接点

直接调用内核函数

CEPH_I_WORK_WRITEBACK 工作通过 filemap_fdatawrite() 直接调用内核的 writeback 机制:

1
2
3
4
5
6
7
8
// 内核函数调用链
filemap_fdatawrite(&inode->i_data)
    
do_writepages()
    
address_space->a_ops->writepages()  // 调用 Ceph 的 writepages 实现
    
ceph_writepages()  // Ceph 自定义的 writepages 函数

3. 页面回写完成的回调机制

Ceph 的 writepages 实现

1
2
3
4
5
6
7
// 在 Ceph 的 address_space_operations 中
static const struct address_space_operations ceph_aops = {
    .writepages = ceph_writepages,
    .write_begin = ceph_write_begin,
    .write_end = ceph_write_end,
    // ...
};

页面回写完成处理

当页面写入完成时,内核会调用 Ceph 的回调函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 在 ceph_writepages 的实现中(通常在 addr.c 中)
static int ceph_writepages(struct address_space *mapping,
                          struct writeback_control *wbc)
{
    // 设置回写完成回调
    // 当写入完成时会调用相应的完成函数
    
    // 最终会调用类似这样的函数:
    ceph_put_wrbuffer_cap_refs(ci, nr_pages, snapc);
}

4. 引用计数驱动的后续处理

回写完成后的处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                               struct ceph_snap_context *snapc)
{
    bool last = false;
    bool wake_ci = false;

    spin_lock(&ci->i_ceph_lock);
    
    // 减少写缓冲区引用计数
    ci->i_wrbuffer_ref -= nr;
    if (ci->i_wrbuffer_ref == 0) {
        last = true;  // 所有写操作完成
    }
    
    // 减少总的写引用计数
    if (ci->i_wb_ref && (--ci->i_wb_ref == 0)) {
        wake_ci = true;
    }
    
    spin_unlock(&ci->i_ceph_lock);

    // 关键:当所有写引用都释放完成时
    if (last) {
        ceph_check_caps(ci, 0);  // 检查并通知 MDS
    }
    
    if (wake_ci)
        wake_up_all(&ci->i_cap_wq);  // 唤醒等待的线程
}

6. 关键连接点总结

  1. 工作队列桥接CEPH_I_WORK_WRITEBACK 通过工作队列机制异步执行
  2. 内核函数调用:直接调用 filemap_fdatawrite() 启动内核 writeback
  3. 地址空间操作:通过 address_space_operations 连接到 Ceph 的实现
  4. 引用计数回调:写入完成后通过引用计数机制触发后续处理
  5. 能力通知:最终通过 ceph_check_caps() 通知 MDS

ceph_put_wrbuffer_cap_refs 函数的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                               struct ceph_snap_context *snapc)
{
    struct inode *inode = &ci->netfs.inode;
    bool last = false;
    bool wake_ci = false;

    spin_lock(&ci->i_ceph_lock);
    
    // 减少总的写缓冲区引用计数
    ci->i_wrbuffer_ref -= nr;
    if (ci->i_wrbuffer_ref == 0) {
        last = true;  // 标记为最后一个引用
    }
    
    // 处理快照相关的引用计数
    if (ci->i_head_snapc == snapc) {
        ci->i_wrbuffer_ref_head -= nr;
    } else {
        // 处理 capsnap 的引用计数
        struct ceph_cap_snap *capsnap;
        list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
            if (capsnap->context == snapc) {
                capsnap->dirty_pages -= nr;
                break;
            }
        }
    }
    
    // 减少总的写引用计数
    if (ci->i_wb_ref && (--ci->i_wb_ref == 0)) {
        wake_ci = true;
    }
    
    spin_unlock(&ci->i_ceph_lock);

    // 关键:当所有写引用都释放完成时,触发能力检查
    if (last) {
        ceph_check_caps(ci, 0);  // 通知 MDS 权限可以释放
    }
    
    if (wake_ci)
        wake_up_all(&ci->i_cap_wq);
        
    // 如果是最后一个引用,释放 inode 引用
    if (last)
        iput(inode);
}

i_wrbuffer_ref 的增加标准

基本单位:按页面计数

i_wrbuffer_ref 的增加是以页面为单位的,每个脏页面对应一个引用计数。

具体增加规则

ceph_dirty_folio 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
{
    struct inode *inode = mapping->host;
    struct ceph_inode_info *ci = ceph_inode(inode);
    
    spin_lock(&ci->i_ceph_lock);
    
    // 关键:每个 folio 增加 1 个引用计数
    if (ci->i_wrbuffer_ref == 0)
        ihold(inode);  // 第一个脏页时持有 inode 引用
    
    ++ci->i_wrbuffer_ref;  // 每个脏页增加 1
    
    // 根据快照上下文分别处理
    if (__ceph_have_pending_cap_snap(ci)) {
        // 如果有待处理的快照,增加快照的脏页计数
        struct ceph_cap_snap *capsnap = 
            list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap, ci_item);
        capsnap->dirty_pages++;  // 快照脏页计数 +1
    } else {
        // 否则增加 head 快照的脏页计数
        ++ci->i_wrbuffer_ref_head;  // head 快照脏页计数 +1
    }
    
    spin_unlock(&ci->i_ceph_lock);
}

ceph_write_begin 中(如果页面变脏):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
static int ceph_write_begin(struct file *file, struct address_space *mapping,
                           loff_t pos, unsigned len, 
                           struct folio **foliop, void **fsdata)
{
    // ...
    if (!folio_test_uptodate(folio)) {
        // 如果页面不是最新的,需要读取
        // 然后标记为脏页面,会触发 ceph_dirty_folio
    }
    
    // 当页面被标记为脏时,会调用 ceph_dirty_folio
    // 每个新的脏页面都会使 i_wrbuffer_ref 增加 1
}

2. 具体的计数标准

标准1:一个 folio = 一个引用计数

1
2
3
4
5
6
7
8
9
// 每当一个 folio 变为脏页面时
ci->i_wrbuffer_ref++;  // 总引用计数 +1

// 根据快照状态分别计数:
if (在快照中) {
    capsnap->dirty_pages++;     // 快照脏页 +1
} else {
    ci->i_wrbuffer_ref_head++;  // head快照脏页 +1
}

标准2:批量操作时按实际页面数计算

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 在 writepages_finish 中,批量减少引用计数
static void writepages_finish(struct ceph_osd_request *req)
{
    int total_pages = 0;
    
    // 统计实际写入的页面数
    for (i = 0; i < req->r_num_ops; i++) {
        total_pages += osd_data->length >> PAGE_SHIFT;
    }
    
    // 按实际页面数减少引用计数
    ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
}

标准3:单页操作时固定为 1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 单页写入完成
static int write_folio_nounlock(struct folio *folio, struct writeback_control *wbc)
{
    // ...
    folio_end_writeback(folio);
    ceph_put_wrbuffer_cap_refs(ci, 1, snapc);  // 固定减少 1
}

// 页面失效时
static void ceph_invalidate_folio(struct folio *folio, size_t offset, size_t length)
{
    // ...
    ceph_put_wrbuffer_cap_refs(ci, 1, snapc);  // 固定减少 1
}

4. 引用计数的生命周期追踪

创建阶段(增加)

1
页面变脏  ceph_dirty_folio()  i_wrbuffer_ref += 1

维护阶段(保持)

  • 每个脏页都有对应的引用计数
  • i_wrbuffer_ref 跟踪所有脏页的总数
  • i_wrbuffer_ref_head 跟踪 head snapshot 的脏页数
  • 快照页面通过 capsnap->dirty_pages 跟踪
1
2
3
// 脏页面在内存中等待回写期间,引用计数保持不变
// 可能的状态转换:
脏页面  正在回写  引用计数仍然存在

释放阶段(减少)

1
2
3
4
// 三种减少方式:
1. 写入完成:writepages_finish()  i_wrbuffer_ref -= pages_count
2. 页面失效:ceph_invalidate_folio()  i_wrbuffer_ref -= 1  
3. 错误处理:error_cleanup()  i_wrbuffer_ref -= failed_pages

5. 与 MDS 权限释放的关系

1
2
3
4
5
6
7
8
9
页面写入完成或失效
ceph_put_wrbuffer_cap_refs(ci, nr, snapc)
ci->i_wrbuffer_ref -= nr
if (ci->i_wrbuffer_ref == 0 && ci->i_wb_ref == 0):
    ├── ceph_check_caps(ci, 0)  // // 只有当所有脏页面都处理完成时,才能释放 FILE_BUFFER 权限
    └── iput(inode)             // 释放 inode 引用

总结

i_wrbuffer_ref 的计数标准是:

  • 基本单位:以页面(folio)为单位
  • 增加规则:每个脏页面增加 1 个引用计数
  • 减少规则:页面写入完成或失效时,按实际处理的页面数减少
  • 同步保证:只有当引用计数归零时,才表示所有脏数据已处理完毕
  • 权限释放:MDS 只有在 i_wrbuffer_ref == 0 时才能安全回收 CEPH_CAP_FILE_BUFFER 权限

ceph_add_cap 函数详细分析

1. 主要架构作用

ceph_add_cap 函数在 Ceph 客户端文件系统中扮演着核心角色:

  • Capability 管理: 负责在客户端添加或更新由 MDS (Metadata Server) 授予的能力(capabilities)
  • 权限控制: 实现分布式文件系统的细粒度访问控制
  • 缓存一致性: 维护客户端缓存与服务器状态的一致性
  • 会话管理: 将 capability 与特定的 MDS 会话关联

2. 工作原理

核心概念

  • Capability: 类似于传统文件系统的权限,但更细粒度,包括读、写、缓存、执行等多种权限
  • MDS Session: 客户端与元数据服务器的会话连接
  • Cap Generation: 用于检测会话重连和失效状态

权限位分析

1
2
3
4
5
6
7
8
// 主要的 capability 位包括:
CEPH_CAP_PIN      // 基础引用
CEPH_CAP_GSHARED  // 共享读权限  
CEPH_CAP_GEXCL    // 独占权限
CEPH_CAP_GCACHE   // 缓存权限
CEPH_CAP_GRD      // 读权限
CEPH_CAP_GWR      // 写权限
CEPH_CAP_GBUFFER  // 缓冲区权限

3. 主要流程分析

阶段一:权限验证和准备

  1. 锁检查: 通过 lockdep_assert_held(&ci->i_ceph_lock) 确保持有必要的锁
  2. 参数验证: 检查传入的 inode、session、capability 参数
  3. 生成检查: 获取当前会话的 generation 号码用于状态同步

阶段二:Capability 查找或创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
    // 创建新的 capability
    cap = *new_cap;
    *new_cap = NULL;
    // 初始化新 capability
    cap->issued = 0;
    cap->implemented = 0;
    cap->mds = mds;
    // 插入到数据结构中
    __insert_cap_node(ci, cap);
} else {
    // 更新现有 capability
    list_move_tail(&cap->session_caps, &session->s_caps);
}

阶段三:状态同步处理

  • Generation 检查: 处理会话重连导致的状态过期
  • 序列号处理: 通过 ceph_seq_cmp 处理消息乱序问题
  • Auth Cap 迁移: 处理认证 capability 在 MDS 间的迁移

阶段四:Snap Realm 管理

1
2
3
4
5
6
7
if (!ci->i_snap_realm || 
    ((flags & CEPH_CAP_FLAG_AUTH) && realmino != (u64)-1)) {
    // 查找并切换到正确的快照域
    struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, realmino);
    if (realm)
        ceph_change_snap_realm(inode, realm);
}

阶段五:权限检查和调度

  • 权限分析: 调用 __check_cap_issue 检查新授予的权限
  • 需求匹配: 比较实际需要的权限与授予的权限
  • 延迟处理: 如果权限不匹配,通过 __cap_delay_requeue 安排后续处理

4. 主要子函数分析

__get_cap_for_mds(ci, mds)

  • 功能: 从红黑树中查找特定 MDS 的 capability
  • 数据结构: 使用红黑树 ci->i_caps 进行高效查找
  • 时间复杂度: O(log n)

__insert_cap_node(ci, cap)

  • 功能: 将新的 capability 插入到 inode 的红黑树中
  • 排序: 按 MDS ID 排序维护
  • 并发: 在 i_ceph_lock 保护下执行

__check_cap_issue(ci, cap, issued)

  • 功能: 检查新授予权限的副作用
  • 缓存管理: 处理 FILE_CACHE 权限变化时的缓存失效
  • 目录完整性: 处理 FILE_SHARED 权限对目录缓存的影响

__cap_delay_requeue(mdsc, ci)

  • 功能: 将 inode 加入延迟处理队列
  • 调度策略: 实现权限检查的延迟批处理
  • 性能优化: 避免频繁的权限协商

5. 数据结构关系

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct ceph_inode_info
├── i_caps (红黑树) - 按 MDS ID 索引的 capabilities
├── i_auth_cap - 指向认证 capability
├── i_snap_realm - 快照域信息
└── i_cap_delay_list - 延迟处理链表

struct ceph_cap  
├── ci_node - 在 inode 红黑树中的节点
├── session_caps - 在会话链表中的节点
├── issued/implemented - 权限位掩码
└── mds_wanted - MDS 期望的权限

6. 错误处理和边界情况

  • 会话重连: 通过 generation 机制检测并处理
  • 权限迁移: 处理 capability 在不同 MDS 间的迁移
  • 快照域变更: 安全地切换 inode 的快照域
  • 内存分配失败: 通过 new_cap 参数的预分配机制处理

7. 性能考量

  • 锁粒度: 使用 inode 级别的细粒度锁
  • 延迟处理: 批量处理权限检查以减少网络开销
  • 数据结构: 红黑树提供 O(log n) 的查找性能
  • 内存管理: 预分配机制避免在锁内进行内存分配