reactivemongo-streaming's Introduction

ReactiveMongo Streaming

Modules:

Build manually

The Akka Streams extension for ReactiveMongo can be built from this source repository.

sbt publishLocal

To run the tests, use:

sbt test

reactivemongo-streaming's People

Contributors

cchantep, ornicar, pjfanning, scala-steward

reactivemongo-streaming's Issues

The Scala 3 build pulls akka-stream_2.13 and akka-actor_2.13

This makes the build incompatible with ReactiveMongo, which correctly pulls akka-actor_3.

sbt
++ 3.1.3-RC2
dependencyTree

streaming> ++ 3.1.3-RC2
[info] Setting Scala version to 3.1.3-RC2 on 3 projects.
[info] Reapplying settings...
[info] set current project to streaming (in build file:/home/thib/ReactiveMongo-Streaming/)
streaming> dependencyTree
[info] org.reactivemongo:reactivemongo-akkastream_3:1.1.0-RC3-SNAPSHOT
[info]   +-com.typesafe.akka:akka-stream_2.13:2.6.18 [S]
[info]   | +-com.typesafe.akka:akka-actor_2.13:2.6.18 [S]
[info]   | | +-com.typesafe:config:1.4.0
[info]   | | +-org.scala-lang.modules:scala-java8-compat_2.13:1.0.0 [S]
[info]   | |
[info]   | +-com.typesafe.akka:akka-protobuf-v3_2.13:2.6.18

[...]
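
One possible workaround on the consumer side, sketched here as a build.sbt fragment, is to exclude the transitive Akka artifacts and declare Akka Streams again for the current Scala version. This is an assumption, not the project's own fix: it presumes that akka-stream 2.6.18 is published for Scala 3 and that the module works against those artifacts at runtime. The version numbers are copied from the dependency tree above.

```scala
// build.sbt fragment (a sketch of a workaround, not the project's fix)
libraryDependencies ++= Seq(
  // Drop every transitive com.typesafe.akka artifact,
  // i.e. the _2.13 ones shown in the dependency tree above.
  ("org.reactivemongo" %% "reactivemongo-akkastream" % "1.1.0-RC3-SNAPSHOT")
    .excludeAll(ExclusionRule(organization = "com.typesafe.akka")),
  // Declare Akka Streams again; %% resolves to the _3 artifact
  // when the build runs with a Scala 3 version.
  // Assumption: akka-stream_3 exists at this version.
  "com.typesafe.akka" %% "akka-stream" % "2.6.18"
)
```

Running dependencyTree again after such a change would show whether any _2.13 Akka artifact is still resolved.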

Pekko support?

Would there be any interest in producing a pekko-stream module?

Apache Pekko 1.0.0 has just been released. It is an OSS fork of Akka.

Akka-stream: source is closed on tailable cursor when using ResponseStage

ReactiveMongo Version

0.12.0

MongoDB version

3.2.10

Operating System

MacOS 10.11

JDK

Oracle 1.8.0_72

Library Dependencies

reactivemongo-akkastream % 0.12.0

Expected Behavior

When consuming a capped collection with the tailable option and the method bulkSource, the source should not be closed.

Actual Behavior

The source created using bulkSource is closed when there are no more documents to consume. It works fine when using documentSource.

Reproducible Test Case

val sourceBulk: Source[Iterator[String], _] = collection.find(query)
  .options(QueryOpts().tailable)
  .cursor[String]()
  .bulkSource()
// ko: sourceBulk is closed when there are no more documents to consume

val sourceDocs: Source[String, _] = collection.find(query)
  .options(QueryOpts().tailable)
  .cursor[String]()
  .documentSource()
// ok: sourceDocs is not closed

The stage may be missing a check for whether the cursor is tailable before it completes.
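
The kind of check suggested above can be illustrated with a small, deliberately simplified model. It is not ReactiveMongo's stage implementation: the names `Decision`, `BulkStageModel` and `onResponse` are hypothetical, and a real cursor would also track the cursor ID returned by the server. The sketch only shows that an empty batch should complete the stage when the cursor is not tailable, and should otherwise lead to another poll.

```scala
// Hypothetical model of the completion decision; not the library's code.
sealed trait Decision
case object Emit extends Decision      // push the batch downstream
case object PollAgain extends Decision // tailable cursor: wait for new documents
case object Complete extends Decision  // nothing more will arrive: complete the stage

object BulkStageModel {
  /** Decides what a bulk stage should do with the latest response.
    *
    * @param batchIsEmpty whether the last response carried no documents
    * @param tailable whether the cursor was opened with the tailable option
    */
  def onResponse(batchIsEmpty: Boolean, tailable: Boolean): Decision =
    if (!batchIsEmpty) Emit
    else if (tailable) PollAgain // the check the report suspects is missing
    else Complete
}
```

With this model, an empty batch on a tailable cursor yields `PollAgain` rather than `Complete`, which matches the expected behavior described in the report.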

Keep sending queries while the initial query result of a tailable cursor is empty.

ReactiveMongo Version (0.12.6)

MongoDB version (3.4.7)

Operating System (Windows 10)

JDK (Oracle 1.8.0_111)

java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

Expected Behavior

I want to tail MongoDB's oplog using Akka Streams:

implicit val system = akka.actor.ActorSystem("reactivemongo-akkastream")
implicit val materializer = akka.stream.ActorMaterializer.create(system)
oplogColFuture.map{ oplogCol =>
  val source =
    oplogCol
      .find(BSONDocument(
        "ns" -> BSONDocument("$in" -> Set(s"play-community.common-doc")),
        "ts" -> BSONDocument("$gt" -> BSONTimestamp(1503046219, 1))))
      .options(QueryOpts().tailable.awaitData.noCursorTimeout)
      .cursor[BSONDocument]().documentSource()
  source.runForeach{ doc =>
    println("fold " + doc.get("ns").get)
  }
}

It should work normally whether or not the initial query result is empty.

Actual Behavior

Before I run the tailing code, the output of the mongostat command is as follows:

insert query update delete getmore command dirty used flushes vsize  res qrw arw net_in net_out conn set repl                time
    *0    *0     *0     *0       0     1|0  0.5% 1.5%       0 1.54G 285M 0|0 0|0   157b   46.3k   17 rs0  PRI Aug 18 17:20:11.172
    *0    *0     *0     *0       0     2|0  0.5% 1.5%       0 1.54G 285M 0|0 0|0   158b   46.3k   17 rs0  PRI Aug 18 17:20:12.172
    *0    *0     *0     *0       0     2|0  0.0% 1.5%       1 1.54G 285M 0|0 0|0   215b   46.7k   17 rs0  PRI Aug 18 17:20:13.172
    *0    *0     *0     *0       0     1|0  0.0% 1.5%       0 1.54G 285M 0|0 0|0   310b   46.3k   17 rs0  PRI Aug 18 17:20:14.173
    *0    *0     *0     *0       0     1|0  0.0% 1.5%       0 1.54G 285M 0|0 0|0   157b   46.2k   17 rs0  PRI Aug 18 17:20:15.174

After I run the tailing code, if the initial query returns no results, the code keeps sending the query request to the MongoDB server. The output of the mongostat command is:

insert query update delete getmore command dirty used flushes vsize  res qrw arw net_in net_out conn set repl                time
    *0    22     *0     *0       0     1|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.18k   47.1k   27 rs0  PRI Aug 18 17:24:21.172
    *0    22     *0     *0       0     2|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.01k   47.1k   27 rs0  PRI Aug 18 17:24:22.172
    *0    21     *0     *0       0     2|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.07k   47.5k   27 rs0  PRI Aug 18 17:24:23.172
    *0    22     *0     *0       0     1|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.18k   47.1k   27 rs0  PRI Aug 18 17:24:24.173
    *0    22     *0     *0       0     2|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.01k   47.1k   27 rs0  PRI Aug 18 17:24:25.172
    *0    22     *0     *0       0     1|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.17k   46.9k   27 rs0  PRI Aug 18 17:24:26.176
    *0    23     *0     *0       0     2|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.20k   47.3k   27 rs0  PRI Aug 18 17:24:27.172
    *0    22     *0     *0       0     1|0  0.5% 1.5%       0 1.54G 282M 0|0 0|0  4.18k   47.1k   27 rs0  PRI Aug 18 17:24:28.172

Reproducible Test Case

  1. Install MongoDB 3.4.7.
  2. Modify /etc/mongod.conf:
replication:
  replSetName: rs0
  3. Restart MongoDB and log in to the mongo shell:
rs.initiate(
   {
      _id: "rs0",
      version: 1,
      members: [
         { _id: 0, host : "127.0.0.1:27017" }
      ]
   }
)
  4. Run the following code:
import reactivemongo.akkastream.cursorProducer
import reactivemongo.api._
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONTimestamp, _}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global 

object Test extends App {
  val mongoUri = "mongodb://127.0.0.1:27017/admin?authMode=scram-sha1"
  val driver = MongoDriver()
  val parsedUri = MongoConnection.parseURI(mongoUri)
  val connection = parsedUri.map(driver.connection(_))
  val futureConnection = Future.fromTry(connection)
  def db: Future[DefaultDB] = futureConnection.flatMap(_.database("local"))
  def oplogColFuture: Future[BSONCollection] = db.map(_.collection("oplog.rs"))

  implicit val system = akka.actor.ActorSystem("reactivemongo-akkastream")
  implicit val materializer = akka.stream.ActorMaterializer.create(system)
  
  oplogColFuture.map{ oplogCol =>
    val source =
      oplogCol
        .find(BSONDocument(
          "ns" -> BSONDocument("$in" -> Set(s"play-community.common-doc")),
          "ts" -> BSONDocument("$gt" -> BSONTimestamp(1503046219, 1))))
        .options(QueryOpts().tailable.awaitData.noCursorTimeout)
        .cursor[BSONDocument]().documentSource()
    source.runForeach{ doc =>
      println("fold " + doc.get("ns").get)
    }
  }

  Thread.sleep(10000000)
}
