flink-pb

A protobuf (pb) serialization format for Flink.

Environment

  1. Java 8
  2. Maven
  3. protoc installed at /usr/local/bin/protoc. Tested successfully with protoc version >= 3.12.2.
  4. protobuf-java. Tested successfully with protobuf-java >= 3.12.2.

Run

Build the project with mvn clean install.

For a complete example, run org.apache.flink.pb.starter.Main.

Connector params

  • pb.ignore-parse-errors: default is false. When set to true, the deserialization task keeps running instead of failing when a pb parse error occurs.

  • pb.ignore-default-values: default is false. During deserialization, when this option is true, a row field is null exactly when the corresponding pb field is absent from the protobuf stream. When this option is false, an absent field is filled with its default value, so the row field value is never null.

    For example, with proto2 syntax:

      message SimpleTest {
          optional int32 a = 1 [default=10];
          optional int64 b = 2 [default=100];
      }
    

    pb.ignore-default-values=true

    SimpleTest.newBuilder().setA(88).build() generates row(88, null).

    pb.ignore-default-values=false

    SimpleTest.newBuilder().setA(88).build() generates row(88, 100), where 100 is the default value of b.

    If the message class uses proto3 syntax, pb.ignore-default-values is always treated as false.

  • pb.message-class-name: the fully qualified Java class name of the generated proto class.
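The pb.ignore-default-values rule can be modeled roughly as shown below. This is a minimal sketch, not the connector's code. The class and method names are hypothetical. The arguments hasField and getterValue stand in for a generated proto2 class's hasXXX() and getXXX(); in protobuf-java the getter returns the declared default when the field is absent.

```java
/**
 * Hypothetical model of pb.ignore-default-values; not the connector's actual code.
 */
public class IgnoreDefaultValuesSketch {

    /**
     * @param hasField            stand-in for proto2 hasXXX(): was the field in the stream?
     * @param getterValue         stand-in for getXXX(): returns the default when absent
     * @param ignoreDefaultValues value of pb.ignore-default-values
     * @return the value placed in the Flink row field
     */
    static Object rowFieldValue(boolean hasField, Object getterValue, boolean ignoreDefaultValues) {
        if (hasField) {
            // Field present in the stream: always use its real value.
            return getterValue;
        }
        // Field absent: expose null, or fall back to the getter's default value.
        return ignoreDefaultValues ? null : getterValue;
    }

    public static void main(String[] args) {
        // SimpleTest.newBuilder().setA(88).build(): a is set, b is absent (default 100)
        for (boolean ignore : new boolean[] {true, false}) {
            Object a = rowFieldValue(true, 88, ignore);
            Object b = rowFieldValue(false, 100L, ignore);
            System.out.println("pb.ignore-default-values=" + ignore + " -> row(" + a + ", " + b + ")");
        }
    }
}
```

For a proto3 message, plain scalar fields carry no presence information. The sketch would therefore always behave as if ignoreDefaultValues were false, which matches the rule stated above.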

Notice

  • The data type in the table schema must exactly match the pb Java type. The mapping is:

    Pb JavaType    LogicalType
    STRING         VARCHAR or CHAR
    ENUM           VARCHAR or CHAR
    BOOLEAN        BOOLEAN
    BYTE_STRING    BINARY
    INT            INT
    LONG           BIGINT
    FLOAT          FLOAT
    DOUBLE         DOUBLE
    MESSAGE        ROW
    REPEATED       ARRAY
    MAP            MAP

  • The pb table factory maps table columns to pb fields by field name, so users can create a table with any subset of the pb fields.

  • If the output pb message has a oneof field, the member that gets set is the non-null candidate field with the largest position in the result row.

  • In proto3, the default protobuf serializer does not write a field whose value equals the type's default value, for example int -> 0, long -> 0L, string -> "". This serializer, however, writes every non-null value to the pb bytes, whether or not it equals the default.

  • This serializer does not sort map keys when serializing a map to bytes, so two equal maps may produce different byte arrays depending on key order. This does not affect pb consumers in normal use.
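The oneof rule in the notice list can be sketched as follows. This is a hypothetical illustration, not the serializer's code. It assumes the row is an Object[] indexed by field position, and that candidatePositions lists the row positions of the fields belonging to one oneof group.

```java
/**
 * Hypothetical sketch of the oneof rule: among the row fields that belong to one
 * oneof group, the non-null field with the largest row position is the one written
 * to the pb message. Names are illustrative only.
 */
public class OneofChoiceSketch {

    /**
     * @param row                the Flink row values, indexed by position
     * @param candidatePositions row positions of the fields in one oneof group
     * @return the chosen position, or -1 if every candidate is null
     */
    static int chooseOneofPosition(Object[] row, int[] candidatePositions) {
        int chosen = -1;
        for (int pos : candidatePositions) {
            // Keep the largest position seen so far whose value is non-null.
            if (row[pos] != null && pos > chosen) {
                chosen = pos;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Positions 1, 2 and 3 form a oneof group; positions 1 and 3 are non-null.
        Object[] row = {"id-1", 42, null, "text"};
        int[] oneofGroup = {1, 2, 3};
        System.out.println(chooseOneofPosition(row, oneofGroup)); // prints 3
    }
}
```

When several oneof members are non-null in the row, all of them except the one at the largest position are silently dropped on output.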

Null Value

During serialization, a Flink row may contain null values in a row, map, or array. Protobuf has no null value, so these nulls need special handling.

If a Flink row contains a null field, this serializer does not write that field to the protobuf stream. When a downstream user reads this stream:

  1. With a proto2 class, the user can call proto.hasXXX() to check whether the field exists in the stream.

  2. With a proto3 class, proto.hasXXX() is not available for scalar fields (unless they are declared optional), so there is no way to tell whether a field is absent or happens to be set to its default value.
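The difference between the two readers can be modeled with a plain map standing in for the fields written to the stream. This is a hypothetical sketch, not a real protobuf API. The helpers has() and getInt() only imitate a proto2 hasXXX() and a proto3 int getter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of how a null row field surfaces downstream. A Map stands in for
 * the fields actually written to the protobuf stream; it is not a real protobuf API.
 */
public class NullFieldSketch {

    /** Writes every non-null row field; null fields are skipped entirely. */
    static Map<String, Object> serialize(String[] fieldNames, Object[] row) {
        Map<String, Object> written = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            if (row[i] != null) {
                written.put(fieldNames[i], row[i]);
            }
        }
        return written;
    }

    /** Stand-in for proto2 hasXXX(): reports whether the field was written. */
    static boolean has(Map<String, Object> message, String field) {
        return message.containsKey(field);
    }

    /** Stand-in for a proto3 int getter: an absent field reads as the default 0. */
    static int getInt(Map<String, Object> message, String field) {
        return (Integer) message.getOrDefault(field, 0);
    }

    public static void main(String[] args) {
        // a is explicitly 0 (still written, since this serializer keeps non-null values); b is null
        Map<String, Object> msg = serialize(new String[] {"a", "b"}, new Object[] {0, null});
        System.out.println(has(msg, "a") + " " + has(msg, "b"));       // true false
        System.out.println(getInt(msg, "a") + " " + getInt(msg, "b")); // 0 0
    }
}
```

The proto2-style check distinguishes the two fields. The proto3-style getter returns 0 for both.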

The default value of each type is defined below:

pb type    default value
boolean    false
int        0
long       0L
float      0.0f
double     0.0
string     ""
enum       first enum value
binary     ByteString.EMPTY
list       empty list
map        empty map
message    message class's default instance

Example null conversions:

row value                            pb value
map<string, string>("a", null)       map<string, string>("a", "")
map<string, string>(null, "a")       map<string, string>("", "a")
map<int, int>(null, 1)               map<int, int>(0, 1)
map<int, int>(1, null)               map<int, int>(1, 0)
map<long, long>(null, 1)             map<long, long>(0, 1)
map<long, long>(1, null)             map<long, long>(1, 0)
map<bool, bool>(null, true)          map<bool, bool>(false, true)
map<bool, bool>(true, null)          map<bool, bool>(true, false)
map<string, float>("key", null)      map<string, float>("key", 0)
map<string, double>("key", null)     map<string, double>("key", 0)
map<string, enum>("key", null)       map<string, enum>("key", first enum value)
map<string, binary>("key", null)     map<string, binary>("key", ByteString.EMPTY)
map<string, MESSAGE>("key", null)    map<string, MESSAGE>("key", MESSAGE.getDefaultInstance())
array<string>(null)                  array<string>("")
array<int>(null)                     array<int>(0)
array<long>(null)                    array<long>(0)
array<bool>(null)                    array<bool>(false)
array<float>(null)                   array<float>(0)
array<double>(null)                  array<double>(0)
array<enum>(null)                    array<enum>(first enum value)
array<binary>(null)                  array<binary>(ByteString.EMPTY)
array<MESSAGE>(null)                 array<MESSAGE>(MESSAGE.getDefaultInstance())
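The conversions in this table amount to replacing each null array element, map key, or map value with the default value of its pb type, since protobuf-java does not accept nulls in repeated or map fields. The sketch below is a hypothetical illustration, not the serializer's code. The caller passes in the default (for example "", 0, false, ByteString.EMPTY, the first enum value, or a message's default instance).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the null conversions: null array elements and null map
 * keys/values are replaced by the pb default value of their type.
 */
public class NullConversionSketch {

    /** Returns a copy of the list with every null element replaced by defaultValue. */
    static <T> List<T> replaceNulls(List<T> values, T defaultValue) {
        List<T> out = new ArrayList<>(values.size());
        for (T v : values) {
            out.add(v == null ? defaultValue : v);
        }
        return out;
    }

    /** Returns a copy of the map with null keys and null values replaced by defaults. */
    static <K, V> Map<K, V> replaceNulls(Map<K, V> map, K defaultKey, V defaultValue) {
        Map<K, V> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            K key = e.getKey() == null ? defaultKey : e.getKey();
            V value = e.getValue() == null ? defaultValue : e.getValue();
            // If the default key already exists, put() overwrites it in this sketch;
            // the real serializer may handle such collisions differently.
            out.put(key, value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(replaceNulls(Arrays.asList(null, 1), 0)); // [0, 1]

        Map<String, String> m = new LinkedHashMap<>();
        m.put("a", null);
        m.put(null, "b");
        System.out.println(replaceNulls(m, "", "")); // {a=, =b}
    }
}
```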

Contributors

maosuhan