
flink-adt's Issues

TypeMapper is not serializable, causing errors when used in Broadcast State

Hi, I am using scala.math.BigDecimal in broadcast state, and it is not working.

I believe this is caused by TypeMapper, which is used to handle the java-to-scala BigDecimal mapping; broadcast state requires the TypeInformation to be serializable.

I think this can be fixed by making TypeMapper serializable. Do you think this is the right fix? A sketch of the change is below.
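A minimal sketch of the proposed fix, assuming TypeMapper has the map/contramap shape it shows in other issues on this page (this is not the library's actual source):

// Hedged sketch of the proposed fix: extending Serializable lets Flink
// ship the mapper (and any TypeInformation built on top of it) into
// broadcast state.
trait TypeMapper[A, B] extends Serializable {
  def map(a: A): B
  def contramap(b: B): A
}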

State schema evolution question

Hey @shuttie. Thank you for this project!
I'm currently considering using your library, but there is one detail that is important to me and not well covered in the README: how well do the generated serializers handle schema evolution for product types? Is it more like Flink's CaseClassSerializer (no schema evolution support), or like the PojoSerializer?
To give you more context: I care only about field removal and addition (no type changes), i.e. the kind of change sketched below.
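A hypothetical before/after to make that concrete (names are made up, not from the library):

// State written with EventV1 should still be restorable after the
// job is redeployed with EventV2.
case class EventV1(id: String, count: Long)
case class EventV2(id: String, count: Long, label: Option[String]) // field added, no type changes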
Thank you!

Option fields with None values cannot be serialized

The coproduct serializer is used for Option fields, and it fails on None values.

A test case to reproduce:

  it should "derive serializer for option field" in {
    val ser = implicitly[TypeInformation[SimpleOption]].createSerializer(null)
    all(ser, SimpleOption(Some("bar")))
    all(ser, SimpleOption(None))
  }
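For completeness, SimpleOption above is assumed to be a plain wrapper along these lines (the exact fixture in the test suite may differ):

// Assumed shape of the fixture referenced above, not copied from the repo.
case class SimpleOption(a: Option[String])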

Are Scala Enumeration types supported?

Hi, I'm testing out this library, and it seems that Scala Enumeration types aren't supported.

Test code:

// imports added to make the repro self-contained
import io.findify.flinkadt.api._
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

import AdtSerializerTestAssets._
import AdtSerializerTestAssets.Event._

class AdtSerializerTest {
  lazy val cluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(1).setNumberTaskManagers(1).build()
  )

  lazy val env: StreamExecutionEnvironment = {
    cluster.getTestEnvironment.setAsContext()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.enableCheckpointing(1000)
    env.setRestartStrategy(RestartStrategies.noRestart())
    env.getConfig.disableGenericTypes()
    env
  }

  @BeforeEach
  def beforeEach(): Unit = {
    cluster.before()
  }

  @AfterEach
  def afterEach(): Unit = {
    cluster.after()
  }

  @Test
  def serializerTest(): Unit = {
    val result = env
      .fromScalaCollection(
        List(A("abc", 0.2022f, SomeEnum.BAR), B(-22, SomeCaseClass(-0.2022f, SomeEnum.FOO)))
      ).executeAndCollect(10)

    assertEquals(2, result.size)
  }
}

object AdtSerializerTestAssets {
  object SomeEnum extends Enumeration {
    type SomeEnum = Value

    val FOO: SomeEnum = Value("FOO")
    val BAR: SomeEnum = Value("BAR")
  }

  sealed trait Event extends Product with Serializable
  object Event {
    case class A(someStr: String, someFloat: Float, someEnum: SomeEnum.SomeEnum) extends Event
    case class B(someLong: Long, someCaseClass: SomeCaseClass) extends Event

    case class SomeCaseClass(someFloat: Float, someEnum: SomeEnum.SomeEnum)

    implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation
  }

  implicit final class FlinkEnvOps(private val env: StreamExecutionEnvironment) extends AnyVal {
    import scala.collection.JavaConverters._

    def fromScalaCollection[A](data: Seq[A])(implicit typeInformation: TypeInformation[A]): DataStreamSource[A] =
      env.fromCollection(data.asJava, typeInformation)
  }
}

Output:

[Error].../AdtSerializerTest.scala:49: could not find implicit value for parameter typeInformation: org.apache.flink.api.common.typeinfo.TypeInformation[...AdtSerializerTestAssets.Event with Serializable]
[Error]...misc/AdtSerializerTest.scala:71: magnolia: could not find TypeInformation.Typeclass for type ....AdtSerializerTestAssets.SomeEnum.SomeEnum
    in parameter 'someEnum' of product type....AdtSerializerTestAssets.Event.A
    in coproduct type ...AdtSerializerTestAssets.Event

Thank you.
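For what it's worth, one possible workaround (a hedged sketch on my side, reusing the library's TypeMapper mechanism that appears in other issues on this page, not a built-in feature; the exact import path for TypeMapper is omitted here) is to serialize the Enumeration value through its stable integer id:

// Hedged sketch: map the Enumeration value to its Int id and back.
// Enumeration#apply(id) recovers the value, so round-trips are stable
// as long as the ids themselves do not change between deployments.
implicit val someEnumMapper: TypeMapper[SomeEnum.SomeEnum, Int] =
  new TypeMapper[SomeEnum.SomeEnum, Int] {
    override def map(a: SomeEnum.SomeEnum): Int = a.id
    override def contramap(b: Int): SomeEnum.SomeEnum = SomeEnum(b)
  }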

higherKinds warning

If we build with scalacOptions ++= Seq("-feature", "-deprecation"), then there is a warning:

higher-kinded type should be enabled
by making the implicit value scala.language.higherKinds visible.

The warning is present because the Magnolia macro expands F[_]-shaped types into the caller's context.
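A common way to make the warning go away (a standard scalac option, nothing specific to this library) is to enable the feature for the whole build:

// build.sbt: project-wide equivalent of `import scala.language.higherKinds`
scalacOptions += "-language:higherKinds"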

release new version

I've run into the serialization-of-serializers issue that's fixed in master; could you cut a release with the fix?

Refined type support

We are using some refined types in our ADTs together with this lib, and have written instances for them in our project. Would you be interested in supporting refined in your library? A sketch of what such an instance can look like is below.
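One shape this could take (a hedged sketch, assuming the TypeMapper mechanism from the other issues on this page and refined's standard RefType API; the mapper name is illustrative):

import eu.timepit.refined.api.RefType
import eu.timepit.refined.types.string.NonEmptyString

// Hedged sketch: route a refined type through its base type.
// Throwing in contramap on invalid stored data is a simplification;
// real code may want to surface the error differently.
implicit val nonEmptyStringMapper: TypeMapper[NonEmptyString, String] =
  new TypeMapper[NonEmptyString, String] {
    override def map(a: NonEmptyString): String = a.value
    override def contramap(b: String): NonEmptyString =
      RefType.applyRef[NonEmptyString](b)
        .fold(err => throw new IllegalArgumentException(err), identity)
  }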

Serializing circe.Json in Flink

I have problems using flink-adt to serialize JSON values in circe's Json representation.

Approach 1

  implicit val jsonInfo: TypeInformation[Json] =
    deriveTypeInformation[Json]

Here is the error message:

magnolia: child class BiggerDecimalJsonNumber of class JsonNumber is neither final nor a case class

Approach 2

  implicit val jsonInfo: TypeInformation[Json] =
    TypeInformation.of(classOf[Json])

It compiles, but crashes when I run the unit tests.

Approach 3

Based on the MappedTypeInfoTest.scala unit test, I created my own version:

import io.circe.Json
import io.circe.parser.parse

object CirceMappedTypeInfoTest {
  class WrappedMapper extends TypeMapper[Json, String] {
    override def map(json: Json): String = json.noSpaces

    override def contramap(jsonString: String): Json =
      parse(jsonString) match {
        case Left(_)      => Json.Null
        case Right(value) => value
      }
  }

  implicit val mapper: TypeMapper[Json, String] = new WrappedMapper()
}

class CirceMappedTypeInfoTest extends AnyFlatSpec with Matchers with TestUtils {
  import CirceMappedTypeInfoTest._

  it should "derive TI for non-serializeable classes" in {
    drop(implicitly[TypeInformation[Json]])
    val ti  = implicitly[TypeInformation[Json]]
    val ser = ti.createSerializer(null)
    assert(ser != null)
  }
}

And it failed with this error message:

[info] - should derive TI for non-serializeable classes *** FAILED ***
[info]   java.lang.NullPointerException:
[info]   at org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton.equals(TypeSerializerSingleton.java:43)
[info]   at io.findify.flinkadt.api.serializer.MappedSerializer.equals(MappedSerializer.scala:17)
[info]   at io.findify.flinkadt.CirceMappedTypeInfoTest.$anonfun$new$1(CirceMappedTypeInfoTest.scala:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)

Question

I would have thought that the circe.Json format is very common among flink-adt users.

Am I just using flink-adt badly with circe.Json, or is this a genuinely hard class to serialize?
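For context on what I'm ultimately after: my understanding (an assumption on my side, based on MappedTypeInfoTest) is that once a TypeMapper[Json, String] like the one above is in implicit scope, derivation for enclosing case classes should route Json fields through it:

// Hypothetical usage, assuming the WrappedMapper above is in implicit scope;
// Payload is a made-up example type.
case class Payload(id: String, body: Json)
implicit val payloadInfo: TypeInformation[Payload] = deriveTypeInformation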

error serializing generic case class

The previous custom implementation (0.3.x) could serialize this both with and without a ClassTag.
When upgrading from 0.3.x I ran into a weird issue where Flink uses a serializer with a different type parameter than the one appropriate for that part of the pipeline. I tried specifying the TypeInformation explicitly for every step of the pipeline, but it still used the wrong one. Adding the ClassTag made it pick the correct serializer, but the implicit parameter seems to break calling the constructor during deserialization.

A repro case:

  case class Generic[T: ClassTag](a: T, b: ADT)
  
  it should "serialize generic case class" in {
    val ser = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(null)
    all(ser, Generic(SimpleOption(None), Bar(0)))
  }
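For reference, the context bound is just sugar for a second, implicit parameter list, which is presumably why the generated deserialization code no longer matches the two-argument constructor:

// What the compiler generates for Generic[T: ClassTag]: the ClassTag
// becomes an extra implicit constructor parameter.
case class Generic[T](a: T, b: ADT)(implicit evidence: ClassTag[T])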

Consider replacing Flink Scala dependencies with Java equivalents

Flink 1.15 will let users bring their own Scala version, rather than being forced to 2.11 or 2.12 at the cluster level. This opens up the opportunity for developers to use Scala 2.13 and even Scala 3.

As a first step, the flink-scala and flink-streaming-scala dependencies would need to be replaced with their Java equivalents. This will likely require porting over some Scala-specific classes, such as the common serializers for Option / Either / etc.

I am currently trying this out on my own local fork; if there's interest in supporting newer versions of Scala, I can submit a PR once it's done.
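In build terms the swap would look roughly like this (a sketch only; flinkVersion is a placeholder and the exact module split may differ):

// build.sbt sketch: drop the Scala-suffixed Flink APIs, use the Java ones.
val flinkVersion = "1.15.0" // placeholder

libraryDependencies --= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
libraryDependencies += "org.apache.flink" % "flink-streaming-java" % flinkVersion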

Runtime error when using a List in a case class

I'm trying to use a List in my model; however, when I do, I get a runtime exception during my tests. I seem to end up with a broken model whose List throws no matter what I do with it: I get a java.util.NoSuchElementException: head of empty list from inside the Scala standard library, so the deserialized list is corrupted rather than simply empty or holding valid values.

I created a repro as a new test in this branch: https://github.com/erwan/flink-adt/tree/list-minicluster

Is there anything I should be doing differently here? Or could this be a bug in flink-adt?

Some of the classes are compiled with Java 8 and the rest with Java 11

If we take the latest flink-adt_2.12-0.6.1.jar and look inside:

file io/findify/flinkadt/api/serializer/ScalaCaseClassSerializerSnapshot.class
io/findify/flinkadt/api/serializer/ScalaCaseClassSerializerSnapshot.class: compiled Java class data, version 55.0 (Java SE 11)
file io/findify/flinkadt/api/typeinfo/CaseClassComparator.class
io/findify/flinkadt/api/typeinfo/CaseClassComparator.class: compiled Java class data, version 52.0 (Java 1.8)

Any chance we could compile it all with Java 8?
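If the build is sbt-based, one way to pin everything to a Java 8 target might be (an assumption about the build setup, not a verified fix):

// build.sbt sketch: emit Java 8 bytecode from both javac and scalac.
javacOptions  ++= Seq("--release", "8")
scalacOptions ++= Seq("-release", "8") // -release is supported since Scala 2.12.5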

Compiler crashes when deriving TypeInformation for case class with @DataTypeHint

Scala version: 3.1.3

The compiler crashes with "Error: Unexpected error when compiling project_ea109e5296_ea109e5296-b3409dd64e: 'class Class'" when trying to derive an instance of TypeInformation[?] for a case class that uses org.apache.flink.table.annotation.DataTypeHint.

I minimized the issue to the following, runnable with scala-cli:

//> using scala "3.1.3"
//> using lib "org.apache.flink:flink-table-runtime:1.15.2"
//> using lib "io.findify::flink-adt:0.6.1"

package com.stuff

import io.findify.flinkadt.api.*
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation


case class Test(@DataTypeHint("BIGINT") timestamp: Long)

@main def run() =
  summon[TypeInformation[Test]]
