Modules:
The Akka Streams extension for ReactiveMongo can be built from this source repository.
sbt publish-local
To run the tests, use:
sbt test
:leaves: Streaming libraries for ReactiveMongo
Home Page: http://reactivemongo.org
License: Apache License 2.0
Modules:
The Akka Streams extension for ReactiveMongo can be built from this source repository.
sbt publish-local
To run the tests, use:
sbt test
Making it incompatible with ReactiveMongo, which correctly pulls akka-actor_3
.
sbt
++ 3.1.3-RC2
dependencyTree
streaming> ++ 3.1.3-RC2
[info] Setting Scala version to 3.1.3-RC2 on 3 projects.
[info] Reapplying settings...
[info] set current project to streaming (in build file:/home/thib/ReactiveMongo-Streaming/)
streaming> dependencyTree
[info] org.reactivemongo:reactivemongo-akkastream_3:1.1.0-RC3-SNAPSHOT
[info] +-com.typesafe.akka:akka-stream_2.13:2.6.18 [S]
[info] | +-com.typesafe.akka:akka-actor_2.13:2.6.18 [S]
[info] | | +-com.typesafe:config:1.4.0
[info] | | +-org.scala-lang.modules:scala-java8-compat_2.13:1.0.0 [S]
[info] | |
[info] | +-com.typesafe.akka:akka-protobuf-v3_2.13:2.6.18
[...]
Would there be any interest in producing a pekko-stream module?
Apache Pekko 1.0.0 has just been released. It is an OSS fork of Akka.
Just a marker so others are aware, combined with a TODO/Feature request.
After akka/akka#25234
0.12.0
3.2.10
MacOS 10.11
Oracle 1.8.0_72
reactivemongo-akkastream % 0.12.0
When consuming a capped collection with the tailable
option and the method bulkSource
, the source should not be closed.
The source created using bulkSource
is closed when there is no more documents to consume. It works fine when using documentSource
val sourceBulk: Source[Iterator[String], _] = collection.find(query)
.options(QueryOpts().tailable)
.cursor[String]()
.bulkSource()
// ko: sourceBulk is closed when there is no more documents to consume
val sourceDocs: Source[String, _] = collection.find(query)
.options(QueryOpts().tailable)
.cursor[String]()
.documentSource()
// ok: sourceDocs is not closed
There may lack a check to see if the cursor is tailable before completing the stage here ?
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
I want to tail mongodb's oplog using akka stream:
implicit val system = akka.actor.ActorSystem("reactivemongo-akkastream")
implicit val materializer = akka.stream.ActorMaterializer.create(system)
oplogColFuture.map{ oplogCol =>
val source =
oplogCol
.find(BSONDocument(
"ns" -> BSONDocument("$in" -> Set(s"play-community.common-doc")),
"ts" -> BSONDocument("$gt" -> BSONTimestamp(1503046219, 1))))
.options(QueryOpts().tailable.awaitData.noCursorTimeout)
.cursor[BSONDocument]().documentSource()
source.runForeach{ doc =>
println("fold " + doc.get("ns").get)
}
}
It should work normally regardless of the initial query result is empty or not.
Before I run the tailing codes, the output of mongostat command is as follows:
insert query update delete getmore command dirty used flushes vsize res qrw arw net_in net_out conn set repl time
*0 *0 *0 *0 0 1|0 0.5% 1.5% 0 1.54G 285M 0|0 0|0 157b 46.3k 17 rs0 PRI Aug 18 17:20:11.172
*0 *0 *0 *0 0 2|0 0.5% 1.5% 0 1.54G 285M 0|0 0|0 158b 46.3k 17 rs0 PRI Aug 18 17:20:12.172
*0 *0 *0 *0 0 2|0 0.0% 1.5% 1 1.54G 285M 0|0 0|0 215b 46.7k 17 rs0 PRI Aug 18 17:20:13.172
*0 *0 *0 *0 0 1|0 0.0% 1.5% 0 1.54G 285M 0|0 0|0 310b 46.3k 17 rs0 PRI Aug 18 17:20:14.173
*0 *0 *0 *0 0 1|0 0.0% 1.5% 0 1.54G 285M 0|0 0|0 157b 46.2k 17 rs0 PRI Aug 18 17:20:15.174
After I run the tailing codes, if no initial query results returned, the codes will keep sending the query request to mongo server. the output of mongostat command is:
insert query update delete getmore command dirty used flushes vsize res qrw arw net_in net_out conn set repl time
*0 22 *0 *0 0 1|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.18k 47.1k 27 rs0 PRI Aug 18 17:24:21.172
*0 22 *0 *0 0 2|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.01k 47.1k 27 rs0 PRI Aug 18 17:24:22.172
*0 21 *0 *0 0 2|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.07k 47.5k 27 rs0 PRI Aug 18 17:24:23.172
*0 22 *0 *0 0 1|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.18k 47.1k 27 rs0 PRI Aug 18 17:24:24.173
*0 22 *0 *0 0 2|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.01k 47.1k 27 rs0 PRI Aug 18 17:24:25.172
*0 22 *0 *0 0 1|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.17k 46.9k 27 rs0 PRI Aug 18 17:24:26.176
*0 23 *0 *0 0 2|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.20k 47.3k 27 rs0 PRI Aug 18 17:24:27.172
*0 22 *0 *0 0 1|0 0.5% 1.5% 0 1.54G 282M 0|0 0|0 4.18k 47.1k 27 rs0 PRI Aug 18 17:24:28.172
replication:
replSetName: rs0
rs.initiate(
{
_id: "rs0",
version: 1,
members: [
{ _id: 0, host : "127.0.0.1:27017" }
]
}
)
import reactivemongo.akkastream.cursorProducer
import reactivemongo.api._
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONTimestamp, _}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Test extends App {
val mongoUri = "mongodb://127.0.0.1:27017/admin?authMode=scram-sha1"
val driver = MongoDriver()
val parsedUri = MongoConnection.parseURI(mongoUri)
val connection = parsedUri.map(driver.connection(_))
val futureConnection = Future.fromTry(connection)
def db: Future[DefaultDB] = futureConnection.flatMap(_.database("local"))
def oplogColFuture: Future[BSONCollection] = db.map(_.collection("oplog.rs"))
implicit val system = akka.actor.ActorSystem("reactivemongo-akkastream")
implicit val materializer = akka.stream.ActorMaterializer.create(system)
oplogColFuture.map{ oplogCol =>
val source =
oplogCol
.find(BSONDocument(
"ns" -> BSONDocument("$in" -> Set(s"play-community.common-doc")),
"ts" -> BSONDocument("$gt" -> BSONTimestamp(1503046219, 1))))
.options(QueryOpts().tailable.awaitData.noCursorTimeout)
.cursor[BSONDocument]().documentSource()
source.runForeach{ doc =>
println("fold " + doc.get("ns").get)
}
}
Thread.sleep(10000000)
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.