
dask-mongo's Introduction

Dask-Mongo


Read and write data to MongoDB with Dask

Installation

dask-mongo can be installed with pip:

pip install dask-mongo

or with conda:

conda install -c conda-forge dask-mongo

Example

import dask.bag as db
import dask_mongo

# Create Dask Bag
records = [
    {"name": "Alice", "fruit": "apricots"},
    {"name": "Bob", "fruit": ["apricots", "cherries"]},
    {"name": "John", "age": 17, "sports": "cycling"},
]

b = db.from_sequence(records)

# Write to a Mongo database
dask_mongo.to_mongo(
    b,
    database="your_database",
    collection="your_collection",
    connection_kwargs={"host": "localhost", "port": 27017},
)

# Read Dask Bag from Mongo database
b = dask_mongo.read_mongo(
    database="your_database",
    collection="your_collection",
    connection_kwargs={"host": "localhost", "port": 27017},
    chunksize=2,
)

# Perform normal operations with Dask
names = b.pluck("name").compute()
assert names == ["Alice", "Bob", "John"]

License

BSD-3

dask-mongo's People

Contributors

crusaderky, jrbourbeau, juliusgeo, ncclementi, scharlottej13

dask-mongo's Issues

Release?

Once #2 is merged, dask-mongo will have initial support for reading from / writing to mongo databases. This might be a good time for an initial dask-mongo release on PyPI.

cc @ncclementi

Replace name of `read_partition` to include mongo

When reading from Mongo with read_mongo, the low-level function that shows up in the task-stream plot on the dashboard is read_partition. We want the name to make clear that it is reading from a Mongo database, so we should change it.

One option could be fetch_mongo_partitions, but it seems long. Any suggestions?

Add project parameter

Very frequently, a user will want to drop some of the keys in the MongoDB documents, server-side, before they are loaded.

Please add a project: dict[str, Any] | None = None parameter to read_mongo.
The projection must be applied after the two $match stages. Please include a test that would fail if it were the other way around.
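The ordering requirement can be sketched in Python. build_pipeline is a hypothetical helper. It assumes the per-partition query is the two-stage aggregate visible in the fetch_mongo traceback further down this page (`[{"$match": match}, {"$match": match2}]`) and appends `$project` last. apply_pipeline is also hypothetical: an in-memory stand-in that understands only equality `$match` and inclusion `$project`. It is not MongoDB, but it is enough to show a check that fails when the projection runs first.

```python
from typing import Any, Optional


def build_pipeline(
    match: dict[str, Any],
    match2: dict[str, Any],
    project: Optional[dict[str, Any]] = None,
) -> list[dict[str, Any]]:
    """Hypothetical: build the aggregation pipeline for one partition."""
    pipeline: list[dict[str, Any]] = [{"$match": match}, {"$match": match2}]
    if project is not None:
        # Project last, so both $match stages can still filter on fields
        # that the projection drops.
        pipeline.append({"$project": project})
    return pipeline


def apply_pipeline(docs: list[dict], pipeline: list[dict]) -> list[dict]:
    """Tiny in-memory stand-in: equality $match and inclusion $project only."""
    for stage in pipeline:
        if "$match" in stage:
            cond = stage["$match"]
            docs = [d for d in docs if all(d.get(k) == v for k, v in cond.items())]
        elif "$project" in stage:
            spec = stage["$project"]
            keep = {k for k, v in spec.items() if v}
            if spec.get("_id", 1):  # _id is kept unless explicitly excluded
                keep.add("_id")
            docs = [{k: v for k, v in d.items() if k in keep} for d in docs]
    return docs


docs = [
    {"_id": 1, "name": "Alice", "fruit": "apricots"},
    {"_id": 2, "name": "John", "age": 17},
]
pipeline = build_pipeline({"fruit": "apricots"}, {}, {"name": 1})
print(apply_pipeline(docs, pipeline))  # [{'_id': 1, 'name': 'Alice'}]

# Projecting first drops "fruit", so the $match can no longer see it.
print(apply_pipeline(docs, [pipeline[-1]] + pipeline[:-1]))  # []
```

A test along these lines fails if the stages are swapped, because the match filters on a field that the projection removes.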

Coiled runtime that includes dask-mongo?

I am seeing the following exception when using the dask-mongo package in my code. I can use the pymongo client directly to query the collection, so I know my arguments for read_mongo are correct. When I call take(1) on the returned bag, I get the following exception:

Traceback (most recent call last):
  File "C:\Users\antho\repos\py-compute-poc\py-compute-poc\main.py", line 48, in <module>
    do_work()
  File "C:\Users\antho\repos\py-compute-poc\py-compute-poc\main.py", line 37, in do_work
    b.take(1)
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\dask\bag\core.py", line 1456, in take       
    return tuple(b.compute())
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\dask\base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\dask\base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\client.py", line 3036, in get   
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\client.py", line 2210, in gather
    return self.sync(
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\utils.py", line 338, in sync    
    return sync(
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\utils.py", line 405, in sync    
    raise exc.with_traceback(tb)
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\utils.py", line 378, in f       
    result = yield future
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Users\antho\repos\py-compute-poc\venv\lib\site-packages\distributed\client.py", line 2073, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('read_mongo-take-dee43a0b2ec2cd6605c74cebf61e8dcc', 0)", <WorkerState 'tls://10.0.15.194:38225', name: ghg-production-worker-de2008fcdc, status: closed, memory: 0, processing: 1>) 

I am wondering if my workers need a software environment that includes dask-mongo. Any ideas as to what might be happening?

Thanks in advance for your help!

-Tony
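One way to test the hypothesis that the workers lack dask-mongo is to ask each worker whether the package is importable. The sketch below assumes a connected distributed Client named client. has_module is a hypothetical helper, and Client.run executes a function on every worker and returns a dict keyed by worker address. A KilledWorker error can have other causes as well, for example a worker that cannot reach the database (see the connection issue below), so treat this as one check rather than a diagnosis.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if `name` is importable in the current Python process."""
    return importlib.util.find_spec(name) is not None


# Local check in the client process:
print(has_module("json"))  # True

# With a connected distributed Client (assumed to exist as `client`):
# client.run(has_module, "dask_mongo")
# -> {worker_address: True/False, ...}
```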

Upsert to mongo

In some cases, we would like to not only add new documents to a Mongo collection but also upsert documents.
It would be great if dask-mongo could support this too :)
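A minimal sketch of what a per-partition upsert could look like, using pymongo's real `ReplaceOne(..., upsert=True)` and `Collection.bulk_write`. build_upserts and upsert_partition are hypothetical names and are not part of dask-mongo. The sketch also assumes each record has a unique key field and that replacing the whole stored document (rather than merging fields into it) is the desired behaviour. pymongo is imported inside the writer, so the pure helper can be used without it.

```python
from typing import Any, Iterable


def build_upserts(records: Iterable[dict], key: str) -> list[tuple[dict, dict]]:
    """Hypothetical: pair each record with a filter on `key` for a replace-upsert."""
    return [({key: rec[key]}, dict(rec)) for rec in records]


def upsert_partition(
    records: Iterable[dict],
    key: str,
    database: str,
    collection: str,
    connection_kwargs: dict[str, Any],
) -> list[int]:
    """Hypothetical per-partition writer: upsert every record into MongoDB."""
    import pymongo  # imported lazily so build_upserts works without pymongo

    ops = [
        pymongo.ReplaceOne(filt, doc, upsert=True)
        for filt, doc in build_upserts(records, key)
    ]
    if not ops:
        return [0]  # bulk_write refuses an empty list of operations
    with pymongo.MongoClient(**connection_kwargs) as client:
        result = client[database][collection].bulk_write(ops, ordered=False)
    # Count of documents inserted or actually changed by this partition
    return [result.upserted_count + result.modified_count]
```

With a Dask Bag b, each partition could then be written with `b.map_partitions(upsert_partition, key="name", database="your_database", collection="your_collection", connection_kwargs={"host": "localhost", "port": 27017}).compute()`. Using `ordered=False` lets MongoDB continue past individual failures within a partition.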

Track appname in MongoClient

It'll be useful for the Atlas folks to see how many people are connecting to Atlas Clusters via our dask-mongo connector. To do that we need to include the appname when creating the client.

I believe it would look something like

import dask_mongo
import pymongo

app_info = f"dask-mongo/{dask_mongo.__version__}"

with pymongo.MongoClient(**connection_kwargs, appname=app_info) as mongo_client:
    ...  # do things

We should include this in write_mongo, fetch_mongo, and read_mongo in https://github.com/coiled/dask-mongo/blob/main/dask_mongo/core.py

For reference, see the appname option in https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html
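Because the same client setup is needed in write_mongo, fetch_mongo and read_mongo, the appname could be added in one shared place. with_appname below is a hypothetical helper, sketched under the assumption that a user-supplied appname should win. Passing `appname=...` next to `**connection_kwargs` raises `TypeError: ... got multiple values for keyword argument 'appname'` when connection_kwargs already contains that key. setdefault avoids that, and it copies the dict so the caller's kwargs are not mutated.

```python
from typing import Any, Optional


def with_appname(
    connection_kwargs: Optional[dict[str, Any]], version: str
) -> dict[str, Any]:
    """Hypothetical: copy connection_kwargs and add a default dask-mongo appname."""
    kwargs = dict(connection_kwargs or {})
    # Keep an appname the user already set instead of clashing with it.
    kwargs.setdefault("appname", f"dask-mongo/{version}")
    return kwargs


print(with_appname({"host": "localhost", "port": 27017}, "0.1.0"))
# {'host': 'localhost', 'port': 27017, 'appname': 'dask-mongo/0.1.0'}
```

Each call site would then become `pymongo.MongoClient(**with_appname(connection_kwargs, dask_mongo.__version__))`.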

cc: @jrbourbeau @rrpelgrim @baf509

Better error when unable to connect

@scharlottej13 was using an Atlas Mongo instance and forgot to open the firewall to connections from her Coiled cluster.

The "can't connect to mongo" error is not very clear. Can this be better?

File "/opt/conda/lib/python3.9/site-packages/dask_mongo/core.py", line 86, in fetch_mongo
    return list(coll.aggregate([{"$match": match}, {"$match": match2}]))
  File "/opt/conda/lib/python3.9/site-packages/pymongo/collection.py", line 2428, in aggregate
    with self.__database.client._tmp_session(session, close=False) as s:
  File "/opt/conda/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1729, in _tmp_session
    s = self._ensure_session(session)
  File "/opt/conda/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1712, in _ensure_session
    return self.__start_session(True, causal_consistency=False)
  File "/opt/conda/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1657, in __start_session
    self._topology._check_implicit_session_support()
  File "/opt/conda/lib/python3.9/site-packages/pymongo/topology.py", line 538, in _check_implicit_session_support
    self._check_session_support()
  File "/opt/conda/lib/python3.9/site-packages/pymongo/topology.py", line 554, in _check_session_support
    self._select_servers_loop(
  File "/opt/conda/lib/python3.9/site-packages/pymongo/topology.py", line 238, in _select_servers_loop
    raise ServerSelectionTimeoutError(

ServerSelectionTimeoutError("ac-dzkgf0a-shard-00-01.rnleese.mongodb.net:27017: connection closed,ac-dzkgf0a-shard-00-02.rnleese.mongodb.net:27017: connection closed,ac-dzkgf0a-shard-00-00.rnleese.mongodb.net:27017: connection closed, Timeout: 30s, Topology Description: <TopologyDescription id: 6306a6adfd07072a117b1355, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('ac-dzkgf0a-shard-00-00.rnleese.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('ac-dzkgf0a-shard-00-00.rnleese.mongodb.net:27017: connection closed')>, <ServerDescription ('ac-dzkgf0a-shard-00-01.rnleese.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('ac-dzkgf0a-shard-00-01.rnleese.mongodb.net:27017: connection closed')>, <ServerDescription ('ac-dzkgf0a-shard-00-02.rnleese.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('ac-dzkgf0a-shard-00-02.rnleese.mongodb.net:27017: connection closed')>]>")
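One possible shape for a clearer error is to wrap the query in fetch_mongo and re-raise pymongo's `ServerSelectionTimeoutError` with an actionable message. The original exception is chained with `from exc`, so the full topology description stays available. MongoConnectionError, explain_connection_errors and the message wording are hypothetical. The `errors` parameter exists only so the sketch can be exercised without pymongo; by default it resolves to pymongo's exception type.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class MongoConnectionError(ConnectionError):
    """Hypothetical exception raised when workers cannot reach MongoDB."""


@contextmanager
def explain_connection_errors(
    host_hint: str, errors: Optional[tuple[type[BaseException], ...]] = None
) -> Iterator[None]:
    """Re-raise low-level connection failures with an actionable message."""
    if errors is None:
        # pymongo raises this when no server is reachable within the timeout
        from pymongo.errors import ServerSelectionTimeoutError

        errors = (ServerSelectionTimeoutError,)
    try:
        yield
    except errors as exc:
        raise MongoConnectionError(
            f"Could not connect to MongoDB at {host_hint!r} from this Dask worker. "
            "Check that the server is running and that its firewall or IP access "
            "list allows connections from the workers' network."
        ) from exc
```

Inside fetch_mongo, the aggregate call shown in the traceback would sit in a `with explain_connection_errors(...)` block, passing whatever host information is available from connection_kwargs. Other exceptions pass through unchanged.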

Example in readme should not use chunksize=2

The example in readme should not use chunksize=2:

# Read Dask Bag from Mongo database
b = dask_mongo.read_mongo(
    database="your_database",
    collection="your_collection",
    connection_kwargs={"host": "localhost", "port": 27017},
    chunksize=2,
)

IIUC, a chunksize of 2 will cause read_mongo to execute one query for every 2 documents in the result set, so with 1,000 documents there will be 500 queries. Let's use a realistic chunksize (maybe 10,000?) so users don't copy and paste a poor default value.
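The arithmetic in the issue is a ceiling division. This sketch follows the issue's own reasoning (one query per chunk) and does not count any extra queries read_mongo may issue to plan its partitions. num_queries is a hypothetical name.

```python
def num_queries(n_docs: int, chunksize: int) -> int:
    """Ceiling of n_docs / chunksize: one query per chunk of documents."""
    if chunksize <= 0:
        raise ValueError("chunksize must be positive")
    return -(-n_docs // chunksize)  # integer ceiling division


print(num_queries(1000, 2))       # 500
print(num_queries(1000, 10_000))  # 1
```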
