当前 eKuiper 支持在 Create Stream 的时候指定数据结构类型等。然而该方式有几个问题:

  • 额外性能消耗。当前的 schema 没有与数据原本的格式 schema 关联,因此在数据解码之后,需要再额外进行一次 validation/转换;而且该过程基于反射动态完成,性能较差。例如,使用 protobuf 等强schema时,经 protobuf 解码之后的数据应当已经符合格式,不应再进行转换。

  • Schema 定义繁琐。同样无法利用数据本身格式的 schema,而是需要额外配置。

  • Schemaless 时,没有 schema 信息提供给 Flow editor.

本 PIP 旨在梳理 Schema 定义方式,在兼容原有配置的基础上,支持高效而且便捷的 Schema 配置方式,并力求提供 Schema API,方便 UI 使用。


Schema 包含两个部分,一个是数据编解码的 Schema,强类型的编解码方式都有自带 Schema,例如 Protobuf,Avro 等;另一种是 eKuiper source 层的 Schema。

整体上,我们将支持3种递进的 Schema 方式:

  1. Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,数据结构经常变化的情况。目前运行时已支持。可添加 test schema API,完成运行时 Schema 推断,自动生成 schema 定义。使用该方式,Flow Editor 将无法做 schema 推断。

  2. Schema hint,用户在 eKuiper source 层定义 schema。多用于弱类型的编码方式,例如最常用的 JSON。用户的数据有固定或大致固定的格式,通过 source 定义出来。与当前实现的区别主要是运行时不再做数据的验证转换。该 schema 定义可用于 SQL validation;Flow Editor 字段选择框。

  3. Dynamic schema,用户使用强类型编解码方式,并通过 schemaId 提供其 Schema。例如 Protobuf 方式下,动态解析 pb 文件。该方法较为灵活但无法完全发挥强类型的性能优势。需要支持自动 merge schema,供 Flow Editor 使用。

  4. Static schema,用户将Schema 生成静态代码,作为插件添加到系统中。该方法性能最好但较为繁琐,适合性能要求高的场景。需要用户自行编写实现了编解码接口和 schema descriptor 接口的代码。针对特定的格式,我们可提供自动代码生成和编译打包工具/API。

Source 如何定义数据结构

当前,创建流或者表时,有3个部分与 Schema 相关:

create stream protoDemo (
  id int, 
  name string
  ) WITH (FORMAT="protobuf", DATASOURCE="protoDemo",SCHEMAID="helloworld.HelloReply"
  1. 流的数据结构定义,即()部分。该部分为可选,若放空则为 schemaless

  2. FORMAT: 即 serilization format,定义数据的解码方式。当前支持 JSON,BINARY,PROTOBUF。这个格式实际上跟 schema 是相对独立,不一定要强绑定。但是,有些格式例如 protobuf,本身是需要有个 schema的,即数据结构描述。

  3. SCHEMAID: 即数据本身的结构描述,区别于 1 流的结构描述。理想情况下,若用户使用强类型带 schema 的解码方式如 protobuf,则数据的 schema 应当能自动应用到流中,而无需用户再定义。


Stream schema

定义方法保持不变。实现上,schema 主要作为元数据使用,可辅助实现 SQL 语法验证,Flow editor 中的节点数据结构推断从而可在节点属性中做字段选择。数据校验为可选且建议关闭,因为校验性能损耗大。

仍然支持不定义 schema;若后续定义了 schemaId,则stream schema 可自动推断。



  1. 添加更多解码类型,如 custom (自定义),delemited,avro

  2. 每种解码类型均支持静态方式。用户需实现编解码接口,并编译为 go 插件 so,提前通过 schema registry 注册。

Serilization Format

Schema Keys

required, provided by the extended code

optional, provided by the extended code

delimited (TBD)

avro (TBD)



Schema Registry

添加功能,允许注册静态 schema。即添加可选的 soFile 属性,用于指定静态解析代码。若存在 soFile,则使用插件系统载入 so 文件,进行解析;否则动态解析 proto 文件。

POST http://{{host}}/schemas/protobuf
Content-Type: application/json

  "name": "schema2",
  "file": "file:///C:/repos/go/src/",
  "soFile": "file:///C:/repos/go/src/"


Use static protobuf

已有文件 helloworld.proto

syntax = "proto3";

// The request message containing the user's name.
message HelloRequest {
  string name = 1;

// The response message containing the greetings
message HelloReply {
  string message = 1;


  1. Generate go code for the pb file, check <> for detail.

    protoc --go_opt=Mhelloworld.proto=com.main --go_out=. helloworld.proto
  2. Move the generated file helloworld.pb.go into the go project test. Rename the package to main.

  3. Create the wrapper file. For each message in the proto, implement 3 functions: Encode, Decode, GetXXX. Example:

    package main
    func (x *HelloRequest) Encode(d interface{}) ([]byte, error) {
      switch r := d.(type) {
      case map[string]interface{}:
          t, ok := r["name"]
          if ok {
              if v, ok := t.(string); ok {
                  x.Name = v
              } else {
                  return nil, fmt.Errorf("name is not string")
          } else {
              // if required, return error
              fmt.Println("message is not found")
          return proto.Marshal(x)
          return nil, fmt.Errorf("unsupported type %v, must be a map", d)
    func (x *HelloRequest) Decode(b []byte) (interface{}, error) {
      err := proto.Unmarshal(b, x)
      if err != nil {
      return nil, err
      result := make(map[string]interface{}, 1)
      result["name"] = x.Name
      return result, nil
    func GetHelloRequest() interface{} {
      return &HelloRequest{}
  4. Build the project into a plugin so file. go build --buildmode=plugin -o ..

  5. Create schema with the .so file.

    POST http://{{host}}/schemas/protobuf
    Content-Type: application/json
      "name": "helloworld",
      "file": "file:///tmp/helloworld.proto",
      "soFile": "file:///tmp/"

  6. Create the stream to use the static schema.

    create stream protoDemo () WITH (FORMAT="protobuf", DATASOURCE="protoDemo",SCHEMAID="helloworld.HelloReply"

Use custom format

若数据的编码类型不在已支持类型之中,用户可实现编解码接口,并注册为 schema,调用时使用 custom 格式即可。

  1. 实现编解码。实现接口方法 Encode,Decode,GetXXX

  2. 实现数据结构描述接口(可选)。

  3. 按照插件编译为 so

  4. 注册到 schema registry

    POST http://{{host}}/schemas/custom
    Content-Type: application/json
      "name": "custom1",
      "soFile": "file:///tmp/"
  5. 在source中使用。

    create stream customDemo () WITH (FORMAT="custom", DATASOURCE="protoDemo",SCHEMAID="custom1.Message"

Sink static schema

与 source 类似,需要支持高性能场景下 sink 的 static schema 插件方式。与 source 不同的是,在 sink 中,我们主要关心编码方法,不关心结构描述。使用方法同样是配置 formatschemaId

POST http://{{host}}/rules
Content-Type: application/json

  "id": "rule1",
  "sql": "SELECT * FROM demo",
  "actions": [{
    "mqtt": {
      "server": "tcp://syno.home:1883",
      "topic": "result/protobuf",
      "format": "custom",
      "schemaId": "custom1.Message",
      "sendSingle": true

