发布于: -/最后更新: -/6 分钟/

使用Cloudflare Durable Objects构建一个分布式限流器

摘要

Cloudflare Durable Objects(DO)是一种有状态的计算资源,与Cloudflare Workers不同。DO提供了独立的存储空间和唯一的ID访问,适合处理特定场景,如分布式限流器。DO的特性包括数据隔离和强一致性,非常适合WebSocket服务、多人协作和定时任务。通过构建分布式Token Bucket限流器的例子,可以看到DO的强大之处,包括简单的状态管理、低延迟和直观的开发体验。DO的潜力远不止于此,可以用来实现更复杂的应用,如实时游戏逻辑和轻量级数据库应用。

Cloudflare Durable Objects(简称 DO)是一个这非常有意思的产品。初看文档时,很多人可能和我一样,对其概念感到一头雾水。但真正上手使用后,你会有一种豁然开朗的感觉。

简单来说,DO 也是一种计算资源,但它与 Cloudflare Workers 有着本质的区别:Workers 是无状态的 (Stateless),而 Durable Objects 是有状态的 (Stateful)。

为什么我们需要 Durable Objects?

通常我们使用 Workers 时,脚本跑完就销毁了,内存中不保存任何状态。虽然我们可以引入 KV、D1 数据库等外部存储来实现持久化,但 DO 提供了另一维度的解决方案。

DO 的每一个实例都有自己独立的存储空间(支持 KV 键值对或 SQLite),而且通过唯一的 ID 访问。这意味着:

  1. 数据隔离:每个 DO 实例的数据是独立的,不共享。

  2. 强一致性:同一个 DO 实例的请求是排队处理的,这为处理并发和状态同步提供了天然的优势。

这就让 DO 非常适合处理一些特定场景,比如我们今天要实现的——分布式限流器

典型的应用场景

除了限流器,DO 的特性还非常适合以下场景:

  • WebSocket 服务:维持长连接状态。

  • 多人协作:例如 Google Docs 这里的实时编辑,每个人都能实时看到别人的动作。

  • 定时任务:利用 DO 的 setAlarm 实现精准的定时触发。

这就好比你拥有了一个不仅能计算,还能"记住"事情的小型服务器,但它又是完全 Serverless 的,无需关心扩展性和运维。

实战:构建分布式 Token Bucket 限流器

分布式限流的核心在于:每个用户(或 IP)都有一个独立的计数器。这个数据虽非核心业务数据,但对实时性和一致性要求很高。DO 的独立存储特性完美契合这一需求。

我们将使用经典的 令牌桶算法 (Token Bucket) 来实现。

核心代码实现

下面是一个完整的 RateLimiter 类实现。这段代码利用了 Cloudflare Workers 的 RPC 特性,让调用 DO 就像调用本地方法一样简单。

TypeScript
import { DurableObject } from "cloudflare:workers";
import type { Duration } from "@/lib/duration";
import { ms } from "@/lib/duration";

// 定义存储在 DO 中的状态接口
interface BucketState {
  tokens: number; // 当前剩余令牌数
  lastRefill: number; // 上次填充令牌的时间戳
}

// 限流配置项
export type RateLimitOptions = {
  capacity: number; // 桶的容量(最大令牌数)
  interval: Duration; // 填充周期
  cost?: number; // 本次请求消耗的令牌数,默认为 1
};

export class RateLimiter extends DurableObject {
  private state: BucketState;

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    // 初始化内存状态
    this.state = {
      tokens: 0,
      lastRefill: 0,
    };

    // 使用 blockConcurrencyWhile 确保在处理请求前,先从存储中加载状态
    // 这是保证数据一致性的关键
    ctx.blockConcurrencyWhile(async () => {
      const stored = await ctx.storage.get<BucketState>("bucket");
      if (stored) {
        this.state = stored;
      }
    });
  }

  // 检查是否允许请求的核心逻辑 (RPC 方法)
  checkLimit({ capacity, interval, cost = 1 }: RateLimitOptions): {
    allowed: boolean;
    remaining: number;
    retryAfterMs: number;
  } {
    // 边界检查:如果单次消耗超过总容量,直接拒绝
    if (cost > capacity) {
      return {
        allowed: false,
        remaining: 0,
        retryAfterMs: -1,
      };
    }

    const now = Date.now();
    const intervalMs = ms(interval); // 将时长转换为毫秒
    const rate = capacity / intervalMs; // 计算填充速率(令牌/毫秒)

    // 首次运行时的初始化
    if (this.state.lastRefill === 0) {
      this.state.lastRefill = now;
      this.state.tokens = capacity;
    }

    // 1. 计算自上次填充以来流逝的时间
    const timeSinceLastRefill = now - this.state.lastRefill;

    // 2. 根据速率计算应补充的令牌数
    const tokensToAdd = timeSinceLastRefill * rate;

    // 3. 更新当前令牌数,但不能超过桶的容量
    const currentTokens = Math.min(capacity, this.state.tokens + tokensToAdd);

    // 4. 检查是否有足够的令牌支付本次消耗
    if (currentTokens < cost) {
      // 令牌不足,拒绝请求
      return {
        allowed: false,
        remaining: Math.floor(currentTokens),
        // 计算还需要等待多久才有足够的令牌
        retryAfterMs: Math.ceil((cost - currentTokens) / rate),
      };
    }

    // 5. 令牌充足,扣除消耗并更新状态
    this.state.tokens = currentTokens - cost;
    this.state.lastRefill = now;

    // 异步将新状态写入持久化存储,不阻塞当前请求的返回
    this.ctx.waitUntil(this.ctx.storage.put("bucket", this.state));

    return {
      allowed: true,
      remaining: Math.floor(this.state.tokens),
      retryAfterMs: 0,
    };
  }
}

如何调用这个限流器?

使用起来非常简单。我们通常会根据 userIdIP 来生成唯一的 DO ID,这样每个用户就拥有了独立的限流器实例。

TypeScript
export default {
  async fetch(request: Request, env: Env) {
    const userId = "user-123"; // 实际场景中应从 Auth Header 获取

    // 1. 根据 userId 获取唯一的 Durable Object ID
    // idFromName 保证同一个名字总是返回相同的 ID
    const id = env.RATE_LIMITER.idFromName(userId);

    // 2. 获取该 ID 对应的 Stub (客户端存根)
    const rateLimiter = env.RATE_LIMITER.get(id);

    // 3. 像调用本地函数一样调用 DO 中的方法 (RPC)
    const result = await rateLimiter.checkLimit({
      capacity: 60, // 一分钟最多 60 次
      interval: "1m",
      // key 参数在这里其实是不需要的,因为 DO 实例本身就是按用户隔离的
    });

    if (!result.allowed) {
      return Response.json(
        {
          message: "Too Many Requests",
          retryAfterSeconds: Math.ceil(result.retryAfterMs / 1000),
        },
        {
          status: 429,
          headers: {
            "Retry-After": String(Math.ceil(result.retryAfterMs / 1000)),
          },
        },
      );
    }

    // 业务逻辑...
    // doSomething();

    return new Response("OK");
  },
};

总结

通过这个简单的例子,我们可以看到 Cloudflare Durable Objects 的强大之处:

  1. 极其简单的状态管理:不需要额外部署 Redis,状态就存在代码旁边。

  2. 低延迟:DO 实例通常会在靠近用户的边缘节点运行(尽管 DO 目前有地域限制,但配合 Workers 依然很快)。

  3. 开发体验:基于 RPC 的调用方式,让分布式系统的开发变得像写单机程序一样直观。

当然,DO 的潜力远不止于此。我们可以用它来实现更复杂的 WebSocket 消息广播、实时游戏逻辑,甚至是轻量级的数据库应用。如果你对 Serverless 架构感兴趣,Durable Objects 绝对是一个值得深入挖掘的宝藏.

正文结束