
cassandra-migrator's Introduction

Migrate Cassandra data with Azure Databricks

This sample migrates data between Apache Cassandra tables using Spark on Azure Databricks, while preserving the original writetime of each cell. This is useful when performing historic data loads during a live migration.

Set up Azure Databricks

Prerequisites

  • Provision an Azure Databricks cluster. Ensure it also has network access to your source and target Cassandra clusters.

  • Ensure you've already migrated the keyspace/table schema from your source Cassandra database to your target Cassandra database.
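
If you still need to copy the schema, one common approach (a sketch, assuming you have cqlsh access to both clusters; hosts, credentials, and keyspace are placeholders) is to export the keyspace definition from the source and replay it against the target:

# Export the keyspace schema from the source cluster
cqlsh <source host> -u <username> -p <password> -e "DESCRIBE KEYSPACE <keyspace>" > schema.cql

# Review schema.cql, then apply it to the target cluster
cqlsh <target host> -u <username> -p <password> -f schema.cql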

Provision a Spark cluster

Select an Azure Databricks runtime version which supports Spark 3.0 or higher.

[Screenshot: selecting a Databricks runtime version]

Add Cassandra Migrator Spark dependencies

  • Download the dependency jar here *
  • Upload and install the jar on your Databricks cluster:

[Screenshot: installing the dependency jar on the cluster]

Select Install, and then restart the cluster when installation is complete.

* You can also build the dependency jar using SBT by running ./build.sh in the /build_files directory of this repo.
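
For example, from the root of this repo (SBT must be available on the build machine):

cd build_files
./build.sh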

Note

Make sure that you restart the Databricks cluster after the dependency jar has been installed.

Configure Spark Connector throughput

To maximize throughput for large migrations, you may need to change Spark Cassandra Connector parameters at the cluster level. You can apply these settings under Advanced options in the cluster configuration, as shown below. You may also want to increase the number of workers in your Spark cluster.

spark.cassandra.output.batch.size.rows 1
spark.cassandra.output.concurrent.writes 500
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000

[Screenshot: Spark config under cluster advanced options]
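
If you prefer to keep these settings alongside the notebook code, here is a minimal sketch of passing the same values through the SparkSession builder. Note that on Databricks a session usually already exists when a notebook runs, so builder-time config may be ignored; cluster-level configuration is the more reliable option:

import org.apache.spark.sql.SparkSession

// Same connector settings as above, applied when the session is created
val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.cassandra.output.batch.size.rows", "1")
      .config("spark.cassandra.output.concurrent.writes", "500")
      .config("spark.cassandra.concurrent.reads", "512")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1000")
      .config("spark.cassandra.connection.keep_alive_ms", "600000000")
      .getOrCreate()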

Migrate Cassandra tables

Create a new Scala notebook in Databricks with two separate cells:

Read Cassandra source table

In this example, we migrate from a source cluster that does not use SSL to a target cluster that does. You can adjust sslOptions for your source and target tables accordingly.

import org.apache.spark.sql._

val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60") 
      .getOrCreate

import com.cassandra.migrator.readers.Cassandra
import com.cassandra.migrator.config._
import com.datastax.spark.connector.cql.CassandraConnector

val cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=false,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),  
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  splitCount = Some(1), // Number of splits to use. This should be at least the number of cores available in the Spark cluster, and ideally more; a higher split count allows more fine-grained resumes. Aim for 8 * (Spark cores).
  connections = Some(1), // Number of connections to use to Cassandra when copying
  fetchSize = 1000, // Number of rows to fetch in each read
  preserveTimestamps = true, // Preserve TTLs and WRITETIMEs of cells in the source database. Note that this option is *incompatible* with copying tables that contain collections (lists, maps, sets).
  where = None // Optional condition to filter source table data that will be migrated, e.g. where: race_start_date = '2015-05-27' AND race_end_date = '2015-05-27'
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)
sourceDF.dataFrame.printSchema()
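
Before moving on, you can sanity-check what was read using standard Spark APIs. When preserveTimestamps is enabled, the printed schema should also show the TTL/WRITETIME bookkeeping columns that the writer later consumes via sourceDF.timestampColumns:

// Preview a few rows without truncating long values
sourceDF.dataFrame.show(5, truncate = false)

// The TTL/WRITETIME columns tracked for the write step
println(sourceDF.timestampColumns)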

Migrate to Cassandra target table

import com.cassandra.migrator.writers

implicit val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate

val target = new TargetSettings.Cassandra(
  host = "<target Cassandra host name/IP>",
  port = 9042,
  localDC = None,
  credentials = Some(com.cassandra.migrator.config.Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=true,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = Some(Set("TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA")),
    protocol = Some("TLS")
  )),   
  keyspace = "<target keyspace name>",
  table = "<target table name>",
  connections = Some(1),
  stripTrailingZerosForDecimals = false
)

writers.Cassandra.writeDataframe(
            target,
            List(),
            sourceDF.dataFrame,
            sourceDF.timestampColumns
)
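
For a quick smoke test before running the full validator below, you can count the rows that arrived in the target through the connector. This is a sketch: a full-table count can be expensive on large tables, and an SSL-enabled target also needs the relevant spark.cassandra.connection.ssl.* options:

// Read the target table back and count the migrated rows
val targetCount = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "<target Cassandra host name/IP>")
      .option("spark.cassandra.auth.username", "<username here>")
      .option("spark.cassandra.auth.password", "<password here>")
      .options(Map("keyspace" -> "<target keyspace name>", "table" -> "<target table name>"))
      .load()
      .count()

println(s"Rows in target table: $targetCount")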

Validate Migration

To validate the migration using row comparison, create a third cell with the following, adjusting the parameters to your preferred tolerance:

import com.cassandra.migrator.Validator
import com.cassandra.migrator.config._

val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60") 
      .getOrCreate

val validatorConfig = new Validation(
  compareTimestamps = true,      // also compare WRITETIME/TTL values between source and target
  ttlToleranceMillis = 1,        // allowed drift when comparing TTL values
  writetimeToleranceMillis = 1,  // allowed drift when comparing WRITETIME values
  failuresToFetch = 10,          // maximum number of mismatched rows to fetch and report
  floatingPointTolerance = 1.0   // allowed difference when comparing floating point values
)

val migratorConfig = new MigratorConfig(
  cassandraSource,          // source settings from the first cell
  target,                   // target settings from the second cell
  List(),                   // no column renames
  savepoints = null,        // savepoints are not used in this notebook flow
  skipTokenRanges = Set(),  // validate all token ranges
  validatorConfig
)

Validator.runValidation(migratorConfig)(spark)

If rows do not match, this will return something like the following output:

[Screenshot: validation output listing mismatched rows]

Retry transient failures

The row comparison in validation may return an error for rows that are missing in the target table, for example:

[Screenshot: missing records reported by the Validator]

This may indicate that a transient failure occurred during the overall migration process. If this happens, you can use the Validator to extract the missing records and re-run the migration, inserting only those records, as long as you can specify the primary key for filtering.

Add the below sample cell after your existing cells in the same notebook. It constructs the values required in the "where" parameter of SourceSettings from the row-comparison failures, and then writes only those filtered records to the target table. Be sure to change the value of primaryKey to the name of your primary key field, and replace the credentials and source keyspace/table as before. Note that the sample assumes a single, text-typed primary key column:

implicit val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate

// Construct a CQL IN clause to filter only the missing rows - ***CHANGE primaryKey
// (the getString call below assumes a single, text-typed primary key column)
val primaryKey = "<primary key of source/target table>"
val failures = Validator.runValidation(migratorConfig)(spark)
val whereValues = failures
  .map(failure => s"'${failure.row.getString(primaryKey)}'")
  .mkString(s"$primaryKey IN (", ",", ")")

// Re-define cassandraSource, this time with a where filter for the missing rows
val cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=false,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),  
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  splitCount = Some(1),
  connections = Some(1), 
  fetchSize = 1000, 
  preserveTimestamps = true,
  //specifying where values extracted from validation above to filter only missing records for migration
  where = Some(whereValues)
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)
// Re-use the existing target config to re-migrate only the failed records
writers.Cassandra.writeDataframe(
            target,
            List(),
            sourceDF.dataFrame,
            sourceDF.timestampColumns
)
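
After the re-run completes, you can execute the validation again to confirm that no failures remain. This sketch re-uses migratorConfig from the validation cell, which still points at the full source table:

// Expect an empty failure list once the missing rows have been re-migrated
val remainingFailures = Validator.runValidation(migratorConfig)(spark)
println(s"Remaining failures: ${remainingFailures.size}")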

SSLOptions Parameters

Parameter           Description                                           Default value
enabled             Enable secure connection to Cassandra cluster         false
trustStorePath      Path for the trust store being used                   None
trustStorePassword  Trust store password                                  None
trustStoreType      Trust store type                                      JKS
protocol            SSL protocol                                          TLS
enabledAlgorithms   SSL cipher suites                                     Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")
clientAuthEnabled   Enable 2-way secure connection to Cassandra cluster   false
keyStorePath        Path for the key store being used                     None
keyStorePassword    Key store password                                    None
keyStoreType        Key store type                                        JKS
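
As a worked example of these parameters, here is a minimal sketch of a two-way (mutual) TLS configuration; the store paths and passwords are hypothetical placeholders, not values from this repo:

// Hypothetical two-way TLS configuration; adjust paths/passwords for your environment
val mutualTlsOptions = SSLOptions(
  clientAuthEnabled = true,  // present a client certificate as well
  enabled = true,
  trustStorePassword = Some("<truststore password>"),
  trustStorePath = Some("/dbfs/FileStore/certs/truststore.jks"),  // hypothetical DBFS path
  trustStoreType = Some("JKS"),
  keyStorePassword = Some("<keystore password>"),
  keyStorePath = Some("/dbfs/FileStore/certs/keystore.jks"),      // hypothetical DBFS path
  keyStoreType = Some("JKS"),
  enabledAlgorithms = None,  // fall back to the default cipher suites above
  protocol = Some("TLS")
)

// Pass it to your source/target settings as: sslOptions = Some(mutualTlsOptions)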

cassandra-migrator's Issues

Support adding whole keyspace instead of only individual tables (feature request)

Right now the code supports migration only at the table level; support for migrating a whole keyspace at once is requested.

auto-sync missing rows after validate (feature request)

Request to automatically re-sync missing rows after validating/comparing the source and target tables.
