渐进式 JSON:200 行代码实现流式传输,让页面加载“快如闪电”

摘要:在深入研究 React 服务端组件(React Server Components)时,偶然发现一篇关于“渐进式 JSON(progressive JSON)”的文章。作者 Dan Abramov 在文中介绍了一种从服务器向客户端分块流式传输 JSON 的技术,允许客户端在尚未接收完整个数据之前,提前开始渲染部分内容

在深入研究 React 服务端组件(React Server Components)时,偶然发现一篇关于“渐进式 JSON(progressive JSON)”的文章。作者 Dan Abramov 在文中介绍了一种从服务器向客户端分块流式传输 JSON 的技术,允许客户端在尚未接收完整个数据之前,提前开始渲染部分内容。对于大型数据集而言,这种方式能显著提升“感知性能”。这引发了笔者的好奇:要实现这样一个功能究竟需要多少代码?事实证明,这是一次颇为有趣的练习,最终诞生了一个约 200 行代码的小型库,名为 Streamson。本文即分享其构建过程。

Streamson 项目地址:https://github.com/krasimir/streamson


核心思路

渐进式 JSON 流式传输的核心思想在于:一旦部分数据准备就绪,立即将其发送至客户端,而非等待整个 JSON 结构完全生成后再统一返回。这在处理大型数据集或实时生成的数据时尤为实用。对于尚未就绪的部分,可先发送占位符,待数据准备好后,再由客户端替换为真实内容。示例如下:

{
  "user": {
    "id": 1,
    "name": "John Doe",
    "posts": [
      { "id": 101, "title": "First Post", "content": "..." },
      { "id": 102, "title": "Second Post", "content": "..." }
    ]
  }
}

假设用户信息可立即获取,而帖子内容需从数据库读取,存在一定延迟。传统做法是等待所有帖子加载完毕后再返回完整对象。而采用渐进式 JSON,可先发送占位符:

{
  "user": {
    "id": 1,
    "name": "John Doe",
    "posts": "_$1"
  }
}

待帖子加载完成后,再单独发送一个分块数据:

{
  "_$1": [
    { "id": 101, "title": "First Post", "content": "..." },
    { "id": 102, "title": "Second Post", "content": "..." }
  ]
}

客户端需具备识别占位符并在对应数据到达时完成替换的能力。


服务端实现

首先编写一个简单的函数,接收服务器响应对象(即通向客户端的通道)和待发送的数据对象:

function serve(res, data) {
  res.setHeader("Content-Type", "application/x-ndjson; charset=utf-8");
  res.setHeader("Transfer-Encoding", "chunked");

  // 向客户端发送分块数据
  res.write(JSON.stringify(...) + "\
");
  res.write(JSON.stringify(...) + "\
");

  // 全部完成后
  res.end();
}

关键点说明:

  • 使用 application/x-ndjson 内容类型:NDJSON(Newline Delimited JSON,换行分隔 JSON)是一种便捷的流式传输格式,每行均为独立的 JSON 对象。这使得在一个响应中发送多个 JSON 对象成为可能,并以换行符进行分隔。

  • 设置 Transfer-Encoding: chunked 头:该头信息告知客户端响应将以分块形式传输,因此不能依赖 Content-Length 判断数据结束。同时,连接将保持开启,直至调用 res.end()。

接下来需要对数据进行“分块化”处理。遍历数据对象,将需要后续发送的部分替换为占位符。当遇到异步数据(如 Promise)时,将其放入队列,待其完成后再作为独立分块发送。

以下是用于处理数据的函数:

function normalize(value) {
  function walk(node) {
    if (isPromise(node)) {
      const id = getId();
      registerPromise(node, id);
      return id;
    }
    if (Array.isArray(node)) {
      return node.map((item) => walk(item));
    }
    if (node && typeof node === "object") {
      const out = {};
      for (const [key, val] of Object.entries(node)) {
        out[key] = walk(val);
      }
      return out;
    }
    return node;
  }
  return walk(value);
}

该函数递归遍历数据对象。遇到 Promise 时,生成唯一占位符 ID 并注册该 Promise,等待其解析。数组和对象递归处理,原始值(如数字、字符串)则直接返回。

registerPromise 函数将 Promise 和占位符 ID 存入队列。当 Promise 解析成功时,将结果作为新分块发送给客户端:

let promises = [];

function registerPromise(promise, id) {
  promises.push({ promise, id });
  promise
    .then((value) => {
      send(id, value);
    })
    .catch((err) => {
      console.error("Error resolving promise for path", err);
      send(id, { error: "promise error", timeoutMs: TIMEOUT });
    });
}

send 函数负责将解析后的数据写入响应:

function send(id, value) {
  res.write(JSON.stringify({ i: id, c: normalize(value) }) + "\
");
  promises = promises.filter((p) => p.id !== id);
  if (promises.length === 0) res.end();
}

它会向客户端写入一个新的 JSON 行,包含占位符 ID 及对应数据。
当 Promise 处理完成后,从队列中移除。若队列中无待处理的 Promise,则调用 res.end() 结束响应。

完整服务端实现可参考:https://github.com/krasimir/streamson/blob/main/packages/streamson/lib/server.js

以下是一个可从服务端发送的对象示例:

const data = {
  user: {
    id: 1,
    name: "John Doe",
    posts: fetchPostsFromDatabase(), // 返回一个 Promise
  },
};

async function fetchPostsFromDatabase() {
  const posts = await database.query("SELECT * FROM posts WHERE userId = 1");
  return posts.map((post) => ({
    id: post.id,
    title: post.title,
    content: post.content,
    comments: fetchCommentsForPost(post.id), // 同样返回 Promise
  }));
}

注意,每个帖子中的 comments 字段也是一个 Promise,这意味着评论数据将在帖子数据发送之后,作为单独分块传送给客户端。


客户端实现

客户端需处理从服务器接收的分块数据,并将占位符替换为真实内容。可使用 Fetch API 发起请求,并将响应作为流读取。遇到占位符时,用 Promise 替代;待实际数据到达时,再解析该 Promise。核心逻辑大致如下:

try {
  const res = await fetch(endpoint);
  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  async function process() {
    let done = false;
    while (!done) {
      const { value, done: readerDone } = await reader.read();
      done = readerDone;
      if (value) {
        try {
          const chunk = JSON.parse(decoder.decode(value, { stream: true }));
          chunk.c = walk(chunk.c);
          if (promises.has(chunk.i)) {
            promises.get(chunk.i)(chunk.c);
            promises.delete(chunk.i);
          }
        } catch (e) {
          console.error(`解析分块数据出错`, e);
        }
      }
    }
  }
  process();
} catch (e) {
  console.error(e);
  throw new Error(`从 Streamson 接口 ${endpoint} 获取数据失败`);
}

process 函数逐个读取响应流的分块。每个分块被解析为 JSON 后,调用 walk 函数将占位符替换为 Promise。若分块包含之前注册过的占位符 ID 对应的数据,则解析该 Promise。关键点在于 reader.read() —— 它允许等待新数据的到来。

以下是 walk 函数的实现,用于将占位符替换为 Promise:

function walk(node) {
  if (isPromisePlaceholder(node)) {
    return new Promise((done) => {
      promises.set(node, done);
    });
  }
  if (Array.isArray(node)) {
    return node.map((item) => walk(item));
  }
  if (node && typeof node === "object") {
    const out = {};
    for (const [key, val] of Object.entries(node)) {
      out[key] = walk(val);
    }
    return out;
  }
  return node;
}

function isPromisePlaceholder(val) {
  return typeof val === "string" && val.match(/^_\$(\d)/);
}

该函数逻辑与服务端的 normalize 函数相似。遇到占位符时返回一个新的 Promise,待实际数据到达时解析。数组和对象递归处理,原始值直接返回。占位符 ID 需与服务器生成的一致。

完整客户端实现可参考:https://github.com/krasimir/streamson/blob/main/packages/streamson/lib/client.js

服务端与客户端的代码合计仅 155 行 😎。


NPM 包:Streamson

是的,这套实现已被封装为 NPM 库 —— Streamson! 👨‍💻

通过占位符分块流式传输 JSON,是一种颇具趣味的技术。它能显著提升 Web 应用的“感知性能”,尤其适用于处理大型数据集或动态生成的数据。通过让服务器在数据就绪时立即发送分块,客户端得以更早开始渲染页面,从而带来更优的用户体验。

只需同时掌控服务端与客户端,约 200 行 JavaScript 代码 即可实现。

现已将这套代码封装为 NPM 包,名为 Streamson

安装命令如下:

npm install streamson

在服务端的使用方式:

import { serve } from "streamson";
import express from "express";

const app = express();
const port = 5009;

app.get("/data", async (req, res) => {
  const myData = {
    title: "My Blog",
    description: "A simple blog example using Streamson",
    posts: getBlogPosts(), // 返回一个 Promise
  };
  serve(res, myData);
});

app.listen(port, () => {
  console.log(`示例应用已启动,监听端口 ${port}`);
});

客户端部分仅需约 1KB 的 JavaScript,可从以下地址获取:https://unpkg.com/streamson@latest/dist/streamson.min.js

引入后会得到一个全局函数 Streamson,使用方式如下:

const request = Streamson("/data");

const data = await request.get();
console.log(data.title); // "My Blog"

const posts = await request.get("posts");
console.log(posts); // 博客文章数组

本文内容仅供个人学习、研究或参考使用,不构成任何形式的决策建议、专业指导或法律依据。未经授权,禁止任何单位或个人以商业售卖、虚假宣传、侵权传播等非学习研究目的使用本文内容。如需分享或转载,请保留原文来源信息,不得篡改、删减内容或侵犯相关权益。感谢您的理解与支持!

链接: https://shenqiku.cn/article/FLY_13463