Code Monkey home page Code Monkey logo

airbnb-spark-thrift's Introduction

Spark Thrift Loader

Build Status

A library for loadling Thrift data into Spark SQL.

Features

It supports conversions from Thrift records to Spark SQL, making Thrift a first-class citizen in Spark. It automatically derives Spark SQL schema from Thrift struct and convert Thrift object to Spark Row in runtime. Any nested-structs are all support except Map key field needs to be primitive.

It is especially useful when running spark streaming job to consume thrift events from different streaming sources.

Supported types for Thrift -> Spark SQL conversion

This library supports reading following types. It uses the following mapping from convert Thrift types to Spark SQL types:

Thrift Type Spark SQL type
bool BooleanType
i16 ShortType
i32 IntegerType
i64 LongType
double DoubleType
binary StringType
string StringType
enum String
list ArrayType
set ArrayType
map MapType
struct StructType

Examples

Convert Thrift Schema to StructType in Spark

import com.airbnb.spark.thrift.ThriftSchemaConverter

// this will return a StructType for the thrift class
val thriftStructType = ThriftSchemaConverter.convert(ThriftExampleClass.getClass)

Convert Thrift Object to Row in Spark

import com.airbnb.spark.thrift.ThriftSchemaConverter
import com.airbnb.spark.thrift.ThriftParser

// this will return a StructType for the thrift class
val thriftStructType = ThriftSchemaConverter.convert(ThriftExampleClass.getClass)
val row =  ThriftParser.convertObject(
                thriftObject,
                thriftStructType)

Use cases: consume Kafka Streaming, where each event is a thrift object

import com.airbnb.spark.thrift.ThriftSchemaConverter
import com.airbnb.spark.thrift.ThriftParser


 directKafkaStream.foreachRDD(rdd => {
    val schema = ThriftSchemaConverter.convert(ThriftExampleClass.getClass)

     val deserializedEvents = rdd
       .map(_.message)
       .filter(_ != null)
       .flatMap(eventBytes => {
           try Some(MessageSerializer.getInstance().fromBytes(eventBytes))
             .asInstanceOf[Option[Message[_]]]
           catch {
               case e: Exception => {
                   LOG.warn(s"Failed to deserialize  thrift event ${e.toString}")
                   None
               }
           }
       }).map(_.getEvent.asInstanceOf[TBaseType])

       val rows: RDD[Row] = ThriftParser(
           ThriftExampleClass.getClass,
           deserializedEvents,
           schema)

       val df = sqlContext.createDataFrame(rows, schema)

       // Process the dataframe on this micrao batch
    })
 }

How to get started

Clone the project and mvn package to get the artifact.

How to contribute

Please send the PR here and cc @liyintang or @jingweilu1974 for reviewing

airbnb-spark-thrift's People

Contributors

jingweilu1974 avatar liyintang avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

airbnb-spark-thrift's Issues

type mismatch error

Error:(59, 118) type mismatch;
found : Class[?0(in value fieldMeta)] where type ?0(in value fieldMeta) <: org.apache.thrift.TBase[T,F]
required: Class[_ <: org.apache.thrift.TBase[?0(in trait TBase),?1] forSome { type ?0(in trait TBase) <: org.apache.thrift.TBase[?0(in trait TBase),?1]; type ?1 <: org.apache.thrift.TFieldIdEnum }]
val fieldMeta: util.Map[_ <: TFieldIdEnum, FieldMetaData] = FieldMetaData.getStructMetaDataMap(tBaseInstance.getClass)

Error:(26, 100) type mismatch;
found : Class[$1] where type $1 <: org.apache.thrift.TBase[ <: org.apache.thrift.TBase[, ], _ <: org.apache.thrift.TFieldIdEnum]
required: Class[
<: org.apache.thrift.TBase[?0(in trait TBase),?1] forSome { type ?0(in trait TBase) <: org.apache.thrift.TBase[?0(in trait TBase),?1]; type ?1 <: org.apache.thrift.TFieldIdEnum }]
val fieldMeta: util.Map[_ <: TFieldIdEnum, FieldMetaData] = FieldMetaData.getStructMetaDataMap(tbaseClass)

Supporting More Types / Fixing Bugs/ Etc

Hey @liyintang or @jingweilu1974,

Can you please reach out to me? I have done an entire re-write of the code here, but took inspiration from your code.
The version I have fixes a few bugs, supports all Thrift types & recursive structures. It also adds the functionality to convert both ways along with complete unit testing.

It'd be easier to chat offline and figure things out. If things work, I can contribute here. Otherwise I'll create a new repo. Please let me know!

[BUG] ThriftParser.convertObject convert set field default value bug, it will be set java base type Default value.

Bug example:
MusicResource is a thrift class include many java base type field, Assuming that some java base type not set value, but converObject method will set java base default value to the output Row field value.

val oneThriftClassObject = new MusicResource().setId(4413874251774771629L).setDeviceStatus(3)
println("origin thrift object: " + oneThriftClassObject)

val thriftStructType = ThriftSchemaConverter.convert(classOf[MusicResource])
val newRowRDD =
  spark.sparkContext.parallelize(Array(oneThriftClassObject))
    .map(thriftObject => ThriftParser.convertObject(thriftObject, thriftStructType))

spark.createDataFrame(newRowRDD, thriftStructType)
  .rdd
  .mapPartitions(rows => rows.map(row => ConvertRowToThriftDataV2.convertRow(row, classOf[MusicResource])))
  .foreach(x => println("cast after thrift object: " + x))

OutPut Println:

origin thrift object: MusicResource(id:4413874251774771629, deviceStatus:3)

cast after thrift object: MusicResource(id:4413874251774771629, playCount:0, duration:0, metaId:0, createTime:0, op:0, quality:0.0, scale:-1.0, miPlayCount:0, miRank:0.0, cpPlayable:0, isFeatured:0, finishedRatio:0.0, mvStatus:0, deviceStatus:3, globalResourceId:0, vipType:0, qqRank:0.0, qqSongRawRank:0, qqArtistRawRank:0, commentCount:0, isOriginal:0, digitAlbum:0, mvAudioId:0, hasTryPlayUrl:0)

i guess, this bug will be fixed , change the method convertObject through TBase. isSet(TFieldEnum f) judge field Whether or not set value? if not , value will return null. The following

image

if community agree this advice, i would be happy to provide fixed PR.

What do you think? @liyintang @jingweilu1974

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.