统一缓存系统(Redis / Memory)
大约 6 分钟
缓存
一个通过统一 Trait 封装的缓存系统,提供 Redis 与内存 Memory 两种实现。通过全局缓存器 CacheManager 暴露单例实例,业务层只关心一套 API,即可在开发、测试与生产间无缝切换。
- 统一接口:CacheProvider(Send + Sync + Clone,async 友好)
- 两种实现:Redis 与 Memory 均实现同一接口
- 全局单例:CacheManager::init + CacheManager::instance
- 命名空间:支持 namespace 与直达 direct 操作
- 丰富能力:KV、对象存取、队列/阻塞弹出、集合、有序集合、分布式锁、分页检索、一次性读取
flowchart LR
A[业务代码] -->|CacheManager::instance()| B[(Arc<Cache>)]
B -->|统一调用| C{Cache}
C -->|redis| D[RedisCache]
C -->|memory| E[MemoryCache]
一、快速上手
1) 在应用启动时初始化全局缓存
use crate::cache::CacheManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 先加载 APPCOFIG(包含 cache.type/namespace/url/pool_size 等)
// ...
// 初始化全局缓存(根据配置选择 Redis 或 Memory)
CacheManager::init().await?;
// 启动你的服务...
Ok(())
}
未调用 init 会导致后续 instance() 直接 panic(设计即强制在启动时完成初始化)。
2) 在任意业务代码中获取实例并使用
use crate::cache::CacheManager;
#[derive(serde::Serialize, serde::Deserialize, Clone)]
struct User { id: i64, name: String }
async fn save_user(u: &User) -> anyhow::Result<()> {
let cache = CacheManager::instance().await; // Arc<Cache>
// 保存结构化对象,带过期时间(秒)
let _ = cache.set_value_ex(&format!("user:{}", u.id), u, 3600).await?;
Ok(())
}
二、核心能力与接口说明
以下方法在 Redis 与 Memory 上具有一致语义(返回 Result<T>,便于统一错误处理)。
1) 字符串 KV
- set_string(k, v) -> String
- get_string(k) -> String
- set_string_ex(k, v, ttl_secs) -> bool
- remove(k) -> usize
- contains_key(k) -> bool
- ttl(k) -> i64(秒;不存在/无 TTL 由实现决定返回值)
- get_one_use(k) -> String(一次性读取,通常读取后删除)
示例:
let cache = CacheManager::instance().await;
cache.set_string("k1", "v1").await?;
let v = cache.get_string("k1").await?;
cache.set_string_ex("otp:abc", "654321", 300).await?;
let once = cache.get_one_use("otp:abc").await?; // 只读一次,之后即失效
2) 结构化对象(JSON 序列化)
- set_value<T>(k, &value) -> String
- get_value<T>(k) -> T
- set_value_ex<T>(k, &value, ttl_secs) -> bool
- get_oneuse_value<T>(k) -> T(一次性对象读取,对应 get_one_use 的对象版)
示例:
#[derive(serde::Serialize, serde::Deserialize, Clone)]
struct Profile { name: String, age: u8 }
let cache = CacheManager::instance().await;
cache.set_value_ex("u:42", &Profile { name: "Alice".into(), age: 20 }, 3600).await?;
let p: Profile = cache.get_value("u:42").await?;
3) 命名空间与直达 direct
- with_namespace(namespace) -> Self(推荐:复制出带命名空间的新实例)
- set_namespace(namespace)(原地修改当前实例)
- namespaced_key(key) / namespaced_keys(keys)
- 直达 direct:绕过命名空间的操作(set/get/remove/ttl 等的 direct 版本)
示例:
let cache = CacheManager::instance().await;
let user_cache = cache.with_namespace("user:".to_string()).await;
user_cache.set_string("42", "Alice").await?; // 实际写入 key 为 user:42
// 直达(忽略 namespace)
cache.set_string_direct("user:42", "Bob").await?;
4) 列表队列与阻塞消费
- lpush(key, value) -> i64
- brpop(keys, timeout_secs) -> Option<(key, value)>
示例(简易队列):
let cache = CacheManager::instance().await;
// 生产
cache.lpush("q:email", r#"{"to":"a@b.com","subj":"Hi"}"#).await?;
// 消费(阻塞最多 10 秒)
if let Some((_k, msg)) = cache.brpop(vec!["q:email".into()], 10).await? {
handle_email(msg).await?;
}
5) 集合与幂等
- sadd(key, &[members]) -> i64(返回新增成员数量)
let added = cache.sadd("set:processed", &["task-1", "task-2"]).await?;
if added > 0 { /* 有新任务进入集合 */ }
6) 有序集合与延迟任务
- zadd(key, value, score) -> i64
- zadd_ch(key, value, score) -> i64(仅统计变更)
- zrange(key, start, stop) -> Vec<String>
- zrangebyscore_limit(key, min, max, offset, count) -> Vec<String>
- zrem(key, value) -> bool
示例(30 秒后执行的延迟任务):
use chrono::{Utc, Duration};
let when = Utc::now() + Duration::seconds(30);
cache.zadd("z:jobs", "job-123", when.timestamp()).await?;
// 扫描到期
let now = Utc::now().timestamp() as f64;
let due = cache.zrangebyscore_limit("z:jobs", f64::NEG_INFINITY, now, 0, 100).await?;
for job in due {
cache.zrem("z:jobs", &job).await?;
run_job(job).await?;
}
7) 分布式锁/原子写
- set_nx_ex(key, value, ttl_secs) -> bool(NX + EX 语义)
let ok = cache.set_nx_ex("lock:rebuild_index", "1", 30).await?;
if ok {
// 获得锁 → 执行独占逻辑
// ...
let _ = cache.remove("lock:rebuild_index").await;
}
8) 全量/分页检索(后台/运维)
- get_all() -> Vec<(String, String)>
- get_all_paginated(page_num, page_size, search_key) -> ListData<CacheItem>
let page = cache.get_all_paginated(1, 20, Some("user:".into())).await?;
// page.list: Vec<CacheItem>
9) 过期清理(Memory)
- recycling():回收过期键(Memory 实现需定期调用;Redis 通常无需)
use tokio::time::{sleep, Duration};
let mem = CacheManager::instance().await.clone();
tokio::spawn(async move {
loop { mem.recycling().await; sleep(Duration::from_secs(60)).await; }
});
三、配置与初始化
系统从 APPCOFIG.cache 读取配置并选择实现,默认 namespace 为 "qiluo"。
- cache_type: "redis" | "memory"
- url: Redis 连接串(当 cache_type=redis 时必填)
- namespace: 命名空间前缀(可选)
- pool_size: Memory 最大容量(当 cache_type=memory 时必须指定,代码会校验)
示例:
[cache]
cache_type = "redis"
url = "redis://127.0.0.1:6379/0"
namespace = "myapp"
# pool_size = 100000 # memory 模式下需要
错误场景:
- 缺少 redis.url 或 memory.pool_size 会导致 Cache::new() 返回错误,init 失败。
- init 未调用则 instance() 会 panic(设计即防止未初始化即使用)。
四、最佳实践
- 缓存旁路 Cache-Aside:MISS 则回源 DB,成功后写入缓存,并设置合理 TTL
- 命名空间隔离:不同业务/模块使用 with_namespace 前缀隔离键空间
- 直达 direct:仅用于跨命名空间或框架内部操作,避免与业务 namespace 混用
- JSON 对象:类型需实现 Serialize + Deserialize + Clone;反序列化失败应上报警告
- TTL 单位统一为秒:set_*_ex 与 ttl 的单位均为秒
- 内存实现:务必配合 recycling() 与容量限制,避免 OOM;生产建议优先 Redis
- 并发安全:接口均为 Send + Sync,单例通过 Arc 共享,Clone 成本应保持轻量
- 观测性:对缓存命中率/错误率/延迟做指标上报;关键路径增加熔断与降级
五、常见问题(FAQ)
- Q: 为什么 brpop 在内存实现里可能不如 Redis 阻塞精准?
- A: 内存实现通常通过轮询/条件变量模拟阻塞,时效性略逊于 Redis 原生阻塞。
- Q: get_value 未命中返回什么?
- A: 返回 Err。可先 contains_key 判断,或直接在 Err 分支回源 DB 并写回缓存。
- Q: 为什么既有 namespace 又有 direct 方法?
- A: direct 用于需要绕过命名空间的系统级场景(如跨业务运维或框架内部),业务层尽量使用带命名空间的实例。
- Q: 是否支持一次性对象读取?
- A: 支持,通过 get_oneuse_value<T>(k)(与 get_one_use 相对应)。
六、简易范式代码片段
- 缓存旁路读取
async fn get_user(id: i64) -> anyhow::Result<User> {
let cache = CacheManager::instance().await;
let key = format!("user:{id}");
if cache.contains_key(&key).await {
if let Ok(u) = cache.get_value::<User>(&key).await { return Ok(u); }
}
let u = load_user_from_db(id).await?;
let _ = cache.set_value_ex(&key, &u, 3600).await;
Ok(u)
}
- OTP 一次性校验
cache.set_string_ex("otp:abc", "654321", 300).await?;
let code = cache.get_one_use("otp:abc").await?; // 读取后即失效
- 使用全局缓存器的典型调用
let cache = CacheManager::instance().await;
let _ = cache.set_value_ex("session:token:xyz", &serde_json::json!({"uid": 42}), 7200).await?;