Hello all I am new to this , I have read a Stream and then I want to select some fields and push that strucutred Stream to ScyllaDb please tell me where I am making a mistake.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructType}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.{functions => f, DataFrame}
import org.apache.spark.sql.cassandra._
object StreamProcessor {
def main(args: Array[String]): Unit = {
new StreamProcessor().process()
}
}
class StreamProcessor {
def process(): Unit = {
val spark = SparkSession.builder()
.config("spark.eventLog.enabled", "false") // 3 lines included later
.config("spark.driver.memory", "2g")
.config("spark.executor.memory", "2g")
.config("spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
.config("spark.sql.catalog.cass100.spark.cassandra.connection.host", "192.168.144.23")
.appName("kafka-tutorials")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
// spark.conf.set("spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
// spark.conf.set("spark.sql.catalog.cass100.spark.cassandra.connection.host", "192.168.144.24")
val jsonSchema = new StructType()
.add("@type", DataTypes.StringType).add("1", DataTypes.IntegerType)
.add("2", DataTypes.IntegerType)
.add("8", DataTypes.StringType).add("12", DataTypes.StringType)
.add("7", DataTypes.StringType).add("11", DataTypes.StringType)
.add("4", DataTypes.StringType).add("6", DataTypes.StringType)
.add("27", DataTypes.StringType).add("28", DataTypes.StringType)
.add("152", DataTypes.StringType).add("153", DataTypes.StringType)
.add("0", DataTypes.StringType).add("subTemplateMultiList",DataTypes.StringType)
import spark.implicits._
val outputTopic = "test"
val inputTopic= "ipfix320"
val brokers= "192.168.144.93:9092"
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", inputTopic)
.load()
.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", jsonSchema) as "value") //.where($"value.1">2000 && $"value.2">12)
.select("value.*")
.withColumn("timestamp", lit(current_timestamp()))
val newDf= inputDf.columns.foldLeft(inputDf)((curr, n)=>curr.withColumnRenamed(n, "c"+n.toString()))
newDf.printSchema()
val otherDf= newDf.select(col("c12").as("src"),col("c8").as("dest"),col("c2").as("max"),col("c1").as("sum"),col("c152").as("window"))
otherDf.printSchema()
try {
val query = otherDf
.writeStream
.format("ScyllaSinkProvider")
.outputMode(OutputMode.Append)
.options(
Map(
"cluster" -> "Test Cluster",
"keyspace" -> "tls",
"table" -> "agg",
"checkpointLocation" -> "/tmp/checkpoints"
)
)
.start()
}
catch {
case ex:Exception=>println
}
}
}
class ScyllaSinkProvider extends StreamSinkProvider {
println("########################################HELLO############################")
override def createSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): ScyllaSink =
new ScyllaSink(parameters)
}
class ScyllaSink(parameters: Map[String, String]) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit =
data.write
.cassandraFormat(parameters("agg"),
parameters("tls"),
parameters("Test Cluster"))
.mode(SaveMode.Append)
.save()
}
And my keyspace is tls, with table agg as following on ScyllaDb :
CREATE TABLE tls.agg (
src text,
dest text,
max float,
sum float,
window text,
PRIMARY KEY (src, dest)
) WITH CLUSTERING ORDER BY (dest ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
AND comment = ''
AND compaction = {'class': 'SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
Please tell me why the code is getting exited without giving any error.