
dp3's Introduction

dp3: multimodal log database

dp3 is an experimental database for management of multimodal log data, such as logs produced by sensors and internal processing logic on robots.

It is under active development and not suitable for production use. For details on the motivation and architecture see the paper.

There is a basic web application backed by dp3 at https://demo.dp3.dev.

Quickstart

Requirements

  • Go 1.22

The following instructions will start dp3 with a data directory on local disk.

  1. Build the dp3 binary.
    make build
  2. Start the server.
    mkdir data
    ./dp3 server --data-dir data
  3. Start the interpreter and type help to get started.
$ ./dp3 client

__        _      _____ 
\ \    __| |_ __|___ / 
 \ \  / _` | '_ \ |_ \ 
 / / | (_| | |_) |__) |
/_/   \__,_| .__/____/ 
           |_|         
Type "help" for help.

dp3:[default] # help
The dp3 client is an interactive interpreter for dp3.  dp3 is a
multimodal log database for low-latency playback and analytics.

The client supports interaction via either queries or dot commands. The
supported dot commands are:
  .h [topic] to print help text. If topic is blank, prints this text.
  .connect [database] to connect to a database
  .statrange to run a statrange query
  .import to import data to the database
  .delete to delete data from the database
  .truncate to truncate data from the database
  .tables to inspect tables available in the database

Available help topics are:
  query: Show examples of query syntax.
  statrange: Explain the .statrange command.
  import: Explain the .import command.
  delete: Explain the .delete command.
  truncate: Explain the .truncate command.
  tables: Explain the .tables command.

Any input aside from "help" that does not start with a dot is interpreted as
a query. Queries are terminated with a semicolon.
dp3:[default] #
dp3:[default] # .import my-robot example-data/fix.mcap
dp3:[default] #
dp3:[default] # from my-robot /fix limit 1;
{"topic":"/fix","sequence":13,"log_time":1479512770.309617340,"publish_time":1479512770.309617340,"data":{"header":{"seq":1,"stamp":1479512770.308916091,"frame_id":"/imu"},"status":{"status":0,"service":0},"latitude":37.39973449707031,"longitude":-122.108154296875,"altitude":-8.731653213500977,"position_covariance":[0,0,0,0,0,0,0,0,0],"position_covariance_type":0}}

Background

Multimodal log data may be characterized by,

  • High frequencies
  • Large volumes
  • Highly variable message sizes and schemas (images, pointclouds, text logs, numeric measurements, compressed video, ...)
  • Various message encodings (protobuf, ros1msg, cdr, flatbuffers, ...)
  • Timeseries orientation

Common workloads on the data are,

  • Stream data in time order at a point in time, for a particular device or simulation, on a selection of "topics", into some sort of visualization application such as webviz or Foxglove Studio, or export to a local file and view with rviz or otherwise locally analyze. The selection of topics in this kind of query can be wide - frequently in the dozens or hundreds, and infrequently in the thousands depending on the architecture of the producer.
  • Execute "as-of join"-style temporal queries across different topics to identify scenes of interest. For example: "Show me the last 100 instances when the car was turning left in the rain with more than 10 pedestrians in the intersection".
  • Run heavy computational workloads on a "narrow" selection of topics. For instance, run distributed Spark jobs over hundreds of terabytes of images. This workload may care less or not at all about ordering, but cares a lot about throughput, and that the available throughput, availability, and cost scalability effectively matches that of the underlying storage and networking, either on-premise or in the cloud.
  • Summarize individual message field values at multiple granularities, at low-enough latencies to drive responsive web applications. Consider for instance the plotting facilities available in datadog or cloud monitoring systems like Cloudwatch or GCP stackdriver, which can (sometimes) plot multigranular statistical aggregates spanning many weeks or years and trillions of points in under a second.

dp3 attempts to address all of these workloads in a single easy-to-administer solution.

Architecture

The architecture of dp3 is inspired by btrdb. It differs in that it supports multimodal data and multiplexed playback, and in drawing a slightly different contract with its consumers -- one based on "topics" and "producer IDs" rather than "stream IDs".

In large deployments, dp3 is envisioned as a component within a greater domain-specific data infrastructure. However, in smaller deployments the hope of dp3 is that incorporation of topics and producers in the core data model will enable orgs to make use of dp3 "right off the bat" without secondary indices.

Glossary

  • Producer ID: a unique identifier assigned by the user to the producer of some data. For instance, a device identifier or a simulation run ID.
  • Topic: a string identifying a logical channel in the customer's data stream. For instance, "/images" or "/imu". See http://wiki.ros.org/Topics for more information on how topics relate to robot architecture.
  • MCAP: a heterogeneously-schematized binary log container format. See https://mcap.dev/.

Multigranular summarization

dp3's underlying storage is a time-partitioned tree spanning a range of time from the epoch to a future date. The typical height of the tree is 5 but it can vary based on parameter selection. Data is stored in the leaf nodes, and the inner nodes contain pointers to children as well as statistical summaries of children. Data consists of nanosecond-timestamped messages.

In the service, nodes are cached on read in an LRU cache of configurable byte capacity. In production deployments, this cache will be sized such that most important inner nodes will fit within it at all times. Multigranular summarization requires traversing the tree down to a sufficiently granular height, and then scanning the statistical summaries at that height for the requested range of time. If the cache is performing well this operation can be done in RAM.
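A byte-bounded LRU of this kind can be sketched as follows. This is a minimal illustration only: the `ByteLRU` type, its methods, and the string node IDs are hypothetical names, not dp3's actual cache.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is one cached node: its ID and its bytes.
type entry struct {
	id   string
	data []byte
}

// ByteLRU is a least-recently-used cache bounded by the total number of
// bytes stored rather than by entry count. Hypothetical type, not dp3's.
type ByteLRU struct {
	capacity int
	size     int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // id -> element in order
}

func NewByteLRU(capacity int) *ByteLRU {
	return &ByteLRU{capacity: capacity, order: list.New(), items: map[string]*list.Element{}}
}

// Get returns the cached bytes and marks the entry as recently used.
func (c *ByteLRU) Get(id string) ([]byte, bool) {
	el, ok := c.items[id]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).data, true
}

// Put inserts or replaces an entry, then evicts from the back until the
// cache fits within capacity. Values larger than capacity are not cached.
func (c *ByteLRU) Put(id string, data []byte) {
	if el, ok := c.items[id]; ok {
		c.size -= len(el.Value.(*entry).data)
		c.order.Remove(el)
		delete(c.items, id)
	}
	if len(data) > c.capacity {
		return
	}
	c.items[id] = c.order.PushFront(&entry{id: id, data: data})
	c.size += len(data)
	for c.size > c.capacity {
		back := c.order.Back()
		e := back.Value.(*entry)
		c.order.Remove(back)
		delete(c.items, e.id)
		c.size -= len(e.data)
	}
}

func main() {
	c := NewByteLRU(10)
	c.Put("a", make([]byte, 4))
	c.Put("b", make([]byte, 4))
	c.Get("a")                  // "a" becomes the most recently used entry
	c.Put("c", make([]byte, 4)) // 12 bytes > 10, so the oldest entry "b" is evicted
	_, okA := c.Get("a")
	_, okB := c.Get("b")
	fmt.Println(okA, okB) // true false
}
```

Bounding by bytes rather than entry count matters here because node sizes vary widely; a count-based limit would give no control over memory use.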

Low-latency playback

Input files are associated with a producer ID by the user. During ingestion they are split by topic and messages are routed to a tree associated with that topic and the producer ID. Merged playback on a selection of topics requires doing simultaneous scans of one tree per topic, feeding into a streaming merge.
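The streaming merge can be illustrated with a k-way merge over per-topic cursors, using a min-heap keyed on log time. This is a sketch under simplifying assumptions: each topic's messages are held in an in-memory slice already sorted by time, and the `Message`, `cursor`, and `Merge` names are hypothetical rather than dp3's own types. A real implementation would pull from tree iterators instead of slices.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Message is a simplified stand-in for a stored record.
type Message struct {
	Topic   string
	LogTime uint64 // nanoseconds
}

// cursor tracks the read position within one topic's sorted messages.
type cursor struct {
	msgs []Message
	pos  int
}

// cursorHeap orders cursors by the log time of their current message.
type cursorHeap []*cursor

func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Less(i, j int) bool {
	return h[i].msgs[h[i].pos].LogTime < h[j].msgs[h[j].pos].LogTime
}
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x any)   { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// Merge emits messages from all streams in log-time order. Each input
// slice must already be sorted by LogTime.
func Merge(streams [][]Message, emit func(Message)) {
	h := &cursorHeap{}
	for _, s := range streams {
		if len(s) > 0 {
			*h = append(*h, &cursor{msgs: s})
		}
	}
	heap.Init(h)
	for h.Len() > 0 {
		c := (*h)[0] // cursor with the earliest current message
		emit(c.msgs[c.pos])
		c.pos++
		if c.pos < len(c.msgs) {
			heap.Fix(h, 0) // reposition the cursor under its new head
		} else {
			heap.Pop(h) // stream exhausted
		}
	}
}

func main() {
	imu := []Message{{"/imu", 1}, {"/imu", 4}}
	fix := []Message{{"/fix", 2}, {"/fix", 3}, {"/fix", 5}}
	Merge([][]Message{imu, fix}, func(m Message) {
		fmt.Println(m.LogTime, m.Topic)
	})
}
```

Only one message per topic is resident in the heap at a time, so memory use grows with the number of topics selected rather than with the size of the result.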

Read scalability for ML jobs

The query execution logic of dp3 can be coded in fat client libraries in other languages like python or scala. Large heavy read jobs can use one of these clients to execute their business. The ML cluster simply needs to query for the current root location, which can be done once and then passed to the job.

With the dp3 server out of the way, all the heavy reading goes straight to S3 and can scale accordingly. This mode of operation does come with some compromises - clients are accessing data directly which complicates your ACL management - but these complexities may be preferable to running an expensive and dynamically scaling service that, for many of these workloads, might as well be doing S3 passthrough.

MCAP-based

Data in the leaf nodes is stored in MCAP format. Initial focus is on ros1msg-serialized messages, but this should be extensible to other formats in use. The format of the playback datastream is also MCAP.

Users who are already producing MCAP files, such as ROS 2 users, will have automatic compatibility between dp3 and all of their internal data tooling. The message bytes logged by the device are exactly the ones stored in the database.

Users of ROS 1 bag files can try dp3 by converting their bags to mcap with the mcap CLI tool.

Developers

We use golangci-lint for linting. To install it, follow the directions here: https://golangci-lint.run/usage/install.

Run the tests

To run the tests:

    make test

To run the linter:

    make lint

Build the binary

    make build

Profiling the server

Profiles can be generated with the pprof webserver on port 6060. For example,

  • Heap snapshot
    go tool pprof http://localhost:6060/debug/pprof/heap
  • CPU profile
    go tool pprof 'http://localhost:6060/debug/pprof/profile?seconds=30'
  • Goroutine blocking
    go tool pprof http://localhost:6060/debug/pprof/block
  • Mutex contention
    go tool pprof http://localhost:6060/debug/pprof/mutex
  • Function tracing
    curl -o trace.out 'http://localhost:6060/debug/pprof/trace?seconds=5'
    go tool trace trace.out

See https://pkg.go.dev/net/http/pprof for additional information about pprof.

dp3's People

Contributors

wkalt, jainilajmera


dp3's Issues

monster issue list

bugs

  • bug: versionstore is not hooked up properly. See nodestore.nextVersion or something - needs to come from versionstore.
  • queries for topics/producers that are not stored produce 500 errors, including multi-topic requests when some topics are present but not others
  • duplicate uploads don't duplicate. If I upload the same file twice, I don't get duplicate messages in GetMessages. On one hand this is desired, on the other hand I don't remember implementing it... #5
  • output MCAP writer is misassociating schemas when merging across different files, resulting in nil-valued schemas for one of the files #6
  • Tree merge is erroneously merging areas of the tree that did not change 3e16a2e
  • Statistics mishandles NaN and probably infinite valued floats. Probably the thing to do is skip these for sum/min/max/mean accounting, and maintain nan/inf counts as a separate statistic. 1883231
  • Tree iterator is building a full list of leaves up front, which is extremely slow on huge resultsets. Needs to be incremental. 60685f5
  • nondeterministic import failure: #11 #18
  • service does not accept read requests during recovery (in startup), but it seems like it could
  • service does not currently crash on a port conflict c0440db
  • semicolon termination should be in the grammar not enforced in client - to enable batched queries. cfc95d8
  • local disk storage implementation should write to a tmpfile + rename #14
  • statistics storage on inner nodes is currently stored per-schema, but there are some kinds of statistics that are difficult for us to get per-schema, such as the compressed size of leaf data. We need to restructure the statistics representation a bit to handle this.
  • the executor defers initialization of the output writer until a message is successfully pulled, to allow schema conflicts to be surfaced as 400 errors. It needs to write an empty file if the resultset is simply empty. 82dbf9c
  • tables response format needs structural cleanup
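The NaN/infinity item above proposes skipping non-finite values in sum/min/max/mean accounting and counting them separately. A minimal sketch of that idea follows; the `FloatStats` type and its field names are hypothetical and do not reflect dp3's statistics representation.

```go
package main

import (
	"fmt"
	"math"
)

// FloatStats accumulates summary statistics over finite values only and
// counts NaN and infinite observations separately. Hypothetical type.
type FloatStats struct {
	Count    int64 // finite values observed
	NaNCount int64
	InfCount int64
	Sum      float64
	Min      float64
	Max      float64
}

// NewFloatStats starts Min/Max at +Inf/-Inf so the first finite value
// observed replaces both.
func NewFloatStats() *FloatStats {
	return &FloatStats{Min: math.Inf(1), Max: math.Inf(-1)}
}

// Observe folds one value into the statistics.
func (s *FloatStats) Observe(v float64) {
	switch {
	case math.IsNaN(v):
		s.NaNCount++
	case math.IsInf(v, 0):
		s.InfCount++
	default:
		s.Count++
		s.Sum += v
		s.Min = math.Min(s.Min, v)
		s.Max = math.Max(s.Max, v)
	}
}

// Mean returns the mean of the finite values, or false if there were none.
func (s *FloatStats) Mean() (float64, bool) {
	if s.Count == 0 {
		return 0, false
	}
	return s.Sum / float64(s.Count), true
}

func main() {
	s := NewFloatStats()
	for _, v := range []float64{1, math.NaN(), 3, math.Inf(1)} {
		s.Observe(v)
	}
	mean, _ := s.Mean()
	fmt.Println(s.Count, s.NaNCount, s.InfCount, s.Min, s.Max, mean) // 2 1 1 1 3 2
}
```

Without the guard, a single NaN would poison the sum and mean for every ancestor node that aggregates the affected leaf.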

tree methods

  • delete
  • return message diff between versions
  • get statistics since (version) maybe? does that make sense?

testing

  • service-level integration tests. Test service restarts don't corrupt database.
  • tests for concurrent inserts into tree
  • duplicate data must be deduplicated (on timestamp and message byte)
  • testing for storage with minio
  • test a huge rootmap

performance analysis

  • establish metrics of interest: memory usage during querying and ingestion, tree node sizes, records per second @ various input file dimensions, read throughput, time to first record. These need to be easy to observe through prometheus or something. When we get to evaluation it should focus on minio-backed deployments and be run against both local and remote minio deployments.
  • effectiveness of write batching

design questions

  • leaf nodes are sized by time, not byte size. There is probably an ideal write size for our storage writes. If we get a sample of messages we can attempt to size a tree to hit that, but if the sample is bad we will produce sub-ideal writes, through no fault of the user. We need to implement that sampling mechanism and also think harder about the general problem. It would be ideal if user write patterns influenced physical storage layout as little as possible (it won't be totally avoidable).
  • We are starting with ros1msg support because there are a lot of bag files available on the internet that we can use to test with, but ros1msg is not the only recording format used in ros2, or possibly even the most common. To support ros2 we will need to add parsing/statistics support for protobuf, CDR, and flatbuffers. We will need to survey the mcap community to figure out what's highest priority - my guess is CDR will be.
  • Multiple schemas may be used for a single topic name, particularly over long periods of time as schemas evolve. Is it OK to have multiple schemas in one tree, or do we really need to make trees unique by schema? Nothing in playback breaks due to multiple schemas, but search/statistics features could be complicated (they will be complicated whether there are multiple trees or one). 70905e5
  • bidirectional playback protocol - currently the user makes a request and gets a dump of MCAP response data back. If frontend tools could define a contract, would they choose this or something with finer-grained bidirectional controls? If a spec were defined would FE tooling implement support? This issue seems worth a read/consideration: foxglove/ws-protocol#261
  • It would similarly be useful to spike out a variant with parquet (or a columnar format of some kind) in the leaves. We would need to transcode both on the way in and out, which would be a pain, but having some sense of how much would be lost or gained in row-oriented throughput would be useful.
  • If we want to support the ability to dump messages on a topic irrespective of producer, we are going to have a problem with the current arch doing an in-memory merge join if the number of producers is very large. Very large numbers of producers can happen easily in simulation, if each run ID is treated as a different producer. For heavy analytics usecases we have an answer, which is use spark or something, but for usecases like viewing logs in a webpage that doesn't work. Probably to support this we will need to spill to disk in the executor. I think as long as we can see a path we can defer this for a while.
  • we use data files numbered with unpadded decimal numbers, but padded numbers could be helpful for lexicographical sorting. Maybe we should be padding our object IDs.
  • Today our data files are a concatenation of node serializations from leaf to root. If this could itself be packaged as a single MCAP file (maybe with attachments for the inner nodes) this would be a big usability win.
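On the padding question above, the effect is easy to demonstrate: unpadded decimal names sort lexicographically as 1, 10, 2, while zero-padded names sort in numeric order. The sketch below assumes 64-bit object IDs and pads to 20 digits, the width of the largest uint64; the `PaddedName` helper is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// PaddedName formats an object ID as a fixed-width, zero-padded decimal
// string. 20 digits is enough for any uint64.
func PaddedName(id uint64) string {
	return fmt.Sprintf("%020d", id)
}

func main() {
	ids := []uint64{2, 10, 1}

	unpadded := make([]string, len(ids))
	padded := make([]string, len(ids))
	for i, id := range ids {
		unpadded[i] = strconv.FormatUint(id, 10)
		padded[i] = PaddedName(id)
	}
	sort.Strings(unpadded)
	sort.Strings(padded)

	fmt.Println(unpadded) // [1 10 2]
	fmt.Println(padded)   // numeric order: ...01, ...02, ...10
}
```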

features needed

  • Currently we flush WAL synchronously with inserts. WAL flushing needs to get moved to a background thread that intelligently flushes after periods of inactivity on a topic or when size limits get reached. #5
  • can we ditch the nodestore staging map if inserts flush to WAL? #5
  • data files should be segregated by tree in storage, with a meaningful name 20c80fe
  • warm node cache prior to accepting requests?
  • statrange command should not require start/end #7
  • export command should not require topics. When no topics are supplied it should return all topics. be61a9b
  • inner node serialization should change from JSON to compact binary format. Waiting on full statistics support to gather a better picture of what we will need.
  • inner nodes should be cached in serialized form, not deserialized
  • statrange queries currently have a minimum granularity of 60s (I think); requests below that produce a 400. We need to extend it to actually look at the message data and produce a correct result.
  • Switch to 64-bit offsets and lengths in IDs. This costs 8 bytes per ID but will insulate us against gigantic messages. d6ad718
  • dump command: it should be possible to dump the database to a hive-partitioned directory of MCAP files.
  • WAL doesn't garbage collect yet #8
  • export command shouldn't require a producer. Should be able to return data across all producers.
  • statistics must be extended to variable-length arrays
  • currently we route topics to ingestion workers based on a hash of producer/topic to ensure two workers don't process the same topic, but this causes underutilization when a worker gets assigned multiple slow-to-process topics. We should switch to a semaphore based strategy to allow other workers to pick up the extra work in this case.
  • track original import request ID through WAL, and log completion
  • need API for looking up message definitions by hash, which implies storing them somewhere by hash.
  • multiple database support. it should be possible to have sim and real-world data segregated on one instance. 54707ef
  • Statistics should degrade gracefully for encoding formats we don't understand, i.e still keep message count, bytecount, just not fieldlevel. This will let non ros1msg MCAP users get some benefit before we implement full parsers.
  • Die immediately on second sigint 3e35173
  • export query results into parquet files
  • can we detect if a CLI user is in vim mode and have vim mode in the client?
  • make number of concurrent wal files configurable. Today we use just one. We don't want one per producer/topic. But one probably isn't optimal.
  • playback needs to support a mode where the first message returned on each merged stream is the last prior to the requested time, within an adjustable time bound, to allow visualizations to avoid data gradually filtering in.
  • it should be possible for a database to span different storage buckets. This enables the user to configure retention policies at the bucket level instead of based on an object prefix, which is usually frowned on. It will not be possible for individual trees to span buckets, without storing a bit more state in the node IDs (one byte for 128 allowed buckets within a database seems like it would be sufficient and leave us plenty of range for length).
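One possible shape for the semaphore-based ingestion strategy mentioned above: a counting semaphore caps total concurrency, and a per-key mutex ensures no two workers process the same producer/topic at once. This is only a sketch of the general technique; `KeyedLimiter`, `Do`, and `RunDemo` are hypothetical names, and the design dp3 eventually adopts may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// KeyedLimiter caps total concurrency with a counting semaphore and
// serializes work that shares a key (for example "producer/topic").
type KeyedLimiter struct {
	sem   chan struct{} // buffered channel used as a counting semaphore
	mu    sync.Mutex    // guards locks
	locks map[string]*sync.Mutex
}

func NewKeyedLimiter(slots int) *KeyedLimiter {
	return &KeyedLimiter{sem: make(chan struct{}, slots), locks: map[string]*sync.Mutex{}}
}

// Do runs fn once the key is free and a slot is available. The key lock is
// taken first so callers waiting on a busy key do not occupy a slot.
func (l *KeyedLimiter) Do(key string, fn func()) {
	l.mu.Lock()
	m, ok := l.locks[key]
	if !ok {
		m = &sync.Mutex{}
		l.locks[key] = m
	}
	l.mu.Unlock()

	m.Lock()
	defer m.Unlock()
	l.sem <- struct{}{}
	defer func() { <-l.sem }()
	fn()
}

// RunDemo submits jobsPerKey jobs for each of keys keys and reports the peak
// concurrency observed for any single key and across all keys.
func RunDemo(slots, keys, jobsPerKey int) (maxPerKey, maxTotal int) {
	l := NewKeyedLimiter(slots)
	var wg sync.WaitGroup
	var mu sync.Mutex
	perKey := map[string]int{}
	total := 0
	for k := 0; k < keys; k++ {
		key := fmt.Sprintf("producer-%d/topic", k)
		for j := 0; j < jobsPerKey; j++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				l.Do(key, func() {
					mu.Lock()
					perKey[key]++
					total++
					maxPerKey = max(maxPerKey, perKey[key])
					maxTotal = max(maxTotal, total)
					mu.Unlock()
					time.Sleep(time.Millisecond) // simulated work
					mu.Lock()
					perKey[key]--
					total--
					mu.Unlock()
				})
			}()
		}
	}
	wg.Wait()
	return maxPerKey, maxTotal
}

func main() {
	perKey, total := RunDemo(2, 4, 3)
	fmt.Println(perKey, total <= 2)
}
```

Compared with hash routing, any free slot can pick up any key, so one slow topic no longer strands the other topics that happened to hash to the same worker. The lock map in this sketch grows without bound; a real implementation would need to reclaim idle entries.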

catalog introspection

from within the client,

  • what producers do I have?
  • what topics exist for a producer?
  • what are the message-level stats for each table's root nodes?
  • what previous versions of a table do I have, dated and numbered?
  • what schema(s) are associated with a topic?
  • what fields on a topic can be queried?
  • eventually - what databases do I have?

community

  • present @ foxglove community meetup
  • present @ foxglove community meetup 1 mo followup
  • project logo

performance evaluation

  • is there MCAP or bag data at berkeley we could load up for an evaluation at the end?
  • establish benchmark metrics

client

  • interfaces - stick with REST? Use gRPC? Yes: gRPC. Maybe keep REST, but if the CLI tool is good we don't need REST.
  • switch to string-format time params in APIs, or js clients will struggle
  • fun CLI features. Like psql "session" interface, plotting of statistical ranges, displaying images? playing video? f688894
  • web interface - just to display functionality. Maybe coverage (ranges of data coverage at a given granularity).
  • autocomplete based on producer/table listing
  • autocomplete grammar
  • MCAP library in Java or Scala
  • Python and java iterator implementations that access a root directly.

clustering

  • versionstore, wal, rootmap are currently sqlite-based. Both versionstore and rootmap need to move out of sqlite because multiple nodes need to hit them. WAL can stay sqlite for now. Let's go with postgres for now.
  • storage needs an S3-compatible implementation. Use minio libraries.
  • inserts need to shard across replicas based on producer + topic. What manages the shards? Probably goes in postgres.
  • on the read side, it would be best if we could merge reads with WAL. The "problem" is this would require distributed WAL storage IF we also want any node to be able to serve reads. We can solve this with distributed WAL storage but that's more complicated and slower.
  • Expanded in #10

monitoring

  • metrics instrumentation & scraping support
  • function tracing
  • pprof debugging endpoint

deployment

  • kubernetes manifests (probably helm?)
  • guidance for how to deploy

retention policies

  • I think the way retention will work is to store a retention policy on the root's record in the rootmap, and guard readers against reading data older than the policy dictates. Once that is in place retention can be managed with regular object lifecycle policies supported by the cloud provider.
  • Targeted exemption from GC is still outstanding
  • We will probably need to stick insertion times on inner nodes (in the children probably?) in order to implement the guard.

search & query language

  • statistics: field-level
  • SQL or not SQL?
  • SQL: better 3rd party compatibility, maybe chatgpt can answer queries for us
  • Not SQL: SQL is crappy for expressing complex as-of joins, which are a common kind of query. Maybe we can do a lot better. Ideally end users would be able to express queries themselves. Queries might be something like "show me all times in last 6 months when it was raining and we were taking an unprotected left and there were dogs in the intersection". That is hard to write in SQL if you aren't a SQL expert. We don't want customers to need to hire teams of SQL experts to translate. Also chatgpt is far from writing good english to SQL for arbitrary business contexts - not clear it will ever work.
  • Expanded in #9.
  • descending keyword to reverse sort order
  • variable-length array support for where clauses
  • statistics acceleration for scans #25
  • support within (bidirectional precedes/succeeds)
  • we should be able to accelerate as-of joins using the MCAP message index. Prior to decompressing anything, consult the indexes to see if messages on the relevant topic are within the threshold of each other.
  • Once we have UDF support, it would be really useful to have materialized views
  • "neighbors" remains unimplemented in the query language
  • statistics acceleration can be applied at a higher level than the scan level, so that scans on different tables can restrict otherwise unrestricted scans on other tables by time. This would be helpful to improve performance in join scenarios but will require more sophistication.

maintenance

  • Custom golangci-lint rule enforcing capitalization of log lines
  • Custom assert/require lib with better pretty-printing and representation of unsigned numerics
  • tree pretty printer for better test diffs

weirdnesses

  • versions are assigned unnecessarily while staging writes to WAL. Each write to WAL gets a version, then we merge them and create one big commit with a final version. I think the version assignment can just be deferred until the big commit. #5
  • tree insert over existing data currently clones all nodes down to the leaf. Pretty sure it only needs to clone the root for tree dimensions, and then all the other copying happens at time of merge from WAL. No indication so far that this is a bottleneck but it probably will be if it isn't yet. #5
  • cgo sqlite stuff is hard to inspect with pprof. Need a solution or perhaps switch to golang embedded db. #5
  • Usage of the word granularity is weird and we may want to revise. Our granularity is an interval in seconds that the stats bucket width must be at least as small as, but this means low "granularity" is "highly granular". Maybe we are misusing the word or should pick a better one.
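To make the granularity definition above concrete, here is a sketch of how a requested granularity could map to a level of the tree. It uses a simplified model that is an assumption, not dp3's actual layout: every leaf covers a fixed `leafWidthSecs`, every inner node has `branching` children, and a bucket at height h summarizes one node at that height. The 60-second leaf width and branching factor of 64 in `main` are illustrative values only.

```go
package main

import "fmt"

// BucketWidth returns the seconds covered by one summary bucket at the
// given height, where height 0 buckets are individual leaves.
func BucketWidth(leafWidthSecs, branching uint64, height int) uint64 {
	w := leafWidthSecs
	for i := 0; i < height; i++ {
		w *= branching
	}
	return w
}

// HeightFor returns the greatest height in [0, maxHeight] whose bucket width
// is at most granularitySecs. It returns false when even leaf-level buckets
// are wider than the requested granularity.
func HeightFor(leafWidthSecs, branching uint64, maxHeight int, granularitySecs uint64) (int, bool) {
	if granularitySecs < leafWidthSecs {
		return 0, false
	}
	h := 0
	for h < maxHeight && BucketWidth(leafWidthSecs, branching, h+1) <= granularitySecs {
		h++
	}
	return h, true
}

func main() {
	const leaf, branching, maxHeight = 60, 64, 4
	for _, g := range []uint64{30, 3600, 86400} {
		h, ok := HeightFor(leaf, branching, maxHeight, g)
		fmt.Println(g, h, ok)
	}
}
```

Under these assumptions a smaller granularity value selects a lower level with narrower buckets, which is the "low granularity is highly granular" inversion the item describes.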

beta release blockers

  • document versioning strategy
  • document versioning strategy for physical tree nodes
  • graceful statistics degradation for non-ros1msg format messages
  • document data deletion strategy (based on object lifecycle policies) and implement feature support in the server to mask deleted data.
  • whole-tree delete command
  • swagger API docs

query language

Background

Today dp3 executes only one kind of query, which you could consider as an unrestricted as-of join on timestamp, or a timestamp-sorted union.

We will want to expand this to more flexibly defined as-of joins on timestamp, and more generally provide a query language interface with some "psql-like" functionality in the CLI to the user to give them a more interactive and databasey experience.

SQL is a poor language for this because supporting any SQL sets up the expectation that we will support all of it -- which we are not prepared to do efficiently -- and even if we did there are no SQL implementations that express as-of joins in a concise and user-friendly way, that SQL non-users will be excited to pick up. People who like SQL will be annoyed that we only support a subset and people that hate it will be annoyed that our query language resembles it.

Finally SQL is going to limit our eventual prospects for flexible autocompletion support in the client, due to the way it lists columns prior to tables.

So, we are not going to implement SQL.

The concern about only implementing part of SQL also applies to the other languages people are trying to standardize like kusto or prql. We are at best going to get inspired by them - we aren't going to implement them or advertise support for them.

We do need a few similar features to what all of these provide though:

  • Filtering i.e where clauses with the usual binary operators
  • N-way as-of join on timestamp only, with support for either "succeeds" or "precedes" relationships since the user may want to think either way.
  • A modifier on the as-of join like "by less than X/by more than X", allowing you to specify you only want hits within a particular amount of time.
  • Composable
  • Limit/offset
  • Filtering on timestamp, but elevated to a part of the syntax.
  • Reverse the order of timestamp iteration, to enable reverse search as typically presented in log search interfaces.
  • Restriction on producer elevated to part of the syntax. Initially we will require it but not forever.
  • Complex array support. What we need here actually goes beyond SQL’s capabilities.
  • UDF support - a bonus but something we should plan for/think about, as users may wish to do some custom computations like linear algebra or whatever that we will never be fast enough to generalize in the product.

The intent of the language will be to support easy interactive searching, primarily on small numbers of topics, and to enable the expression of multi-topic conditions like "show me 50 times in the last month when we braked hard while it was raining" (considering in that example separate topics for "hard brake" and "is raining"). These conditions can get complex and incorporate several topics. I would expect that as-of joins bigger than 12-way will be rare and that most will be (well) under 6-way.

We will likely never support other kinds of joins, or sorting on fields other than log time, or heavy analytics (at least in this engine). We need to play to the strengths of our system and the query engine that supports this will only be single-node. A closer analog for what we are targeting would be the query languages of sumologic, stackdriver, or any of the cloud log search tools, except with a lot more focus on as-of joins. For heavy analytics work users can use spark.

For purposes of autocomplete, we can assume that we can get both a fast listing of available tables (i.e topics), and a fast schema listing for a particular table. Both of those are true, we just don't have APIs for them. So if a query leads with a table and follows with the column restrictions, we will be able to autocomplete it (probably glossing over some nuances about the grammar).

Proposal

Get single topic:

/topic

Get two topics joined

Current behavior of topics param - comma operator

/topic1, /topic2

Restrict a single topic with scalar subfield

Conventional comparison operators supported i.e =, <, <=, ~, *~, <>, etc.

/topic where field.subfield.subfield2 = 'foo'

Restrict on a fixed-size array element.

/topic where field.subfield.subfield2[1] = 'foo'

Restrict on variable-sized array element.

We probably do need the ability to address a variable-sized element by index, but other queries on variable-sized arrays are going to require some kind of "any", "all", or "none" semantics I think. For access by index we will use the same syntax as above for fixed-size array. For the others we need something special

;; access by index
/topic where field.subfield.subfield2[1] = 'foo'

;; any/all/none - element alias. This can be anything the user wants but conventionally “.” will be used unless
;; disambiguation is required (e.g nested arrays). Note that we should be able to autocomplete child fields.

;; scalar arrays
/topic where field.subfield.subfield2 has any . > 10
/topic where field.subfield has no . > 10
/topic where field.subfield has all . > 10

;; complex element with scalar subfield
/topic where field.subfield.subfield2 has any .x.y > 10

;; complex element with scalar array-valued subfield – note use of alias “element” - could be anything.
/topic where field.subfield has any (.x.y has any element > 10)

;; complex element with complex array-valued subfield
/topic where field.subfield has any (.x.y has any element.x > 10)

;; double nested
/topic where field.subfield has any (.x.y has any (element.x.y has no child.x > 10))

It’s possible we won’t need the parentheses, but if we don’t it would be good to support them anyway since they make the structure clearer. In all cases above the argument to “any” could have been parenthesized.
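The intended "any", "all", and "no" semantics can be expressed as predicates over array elements. The sketch below is one interpretation, not an evaluator for the proposed grammar: the generic helpers and the `Element` type are hypothetical, and it assumes "all" is vacuously true on an empty array, which the proposal does not specify.

```go
package main

import "fmt"

// Any reports whether pred holds for at least one element.
func Any[T any](xs []T, pred func(T) bool) bool {
	for _, x := range xs {
		if pred(x) {
			return true
		}
	}
	return false
}

// All reports whether pred holds for every element (true for an empty slice).
func All[T any](xs []T, pred func(T) bool) bool {
	for _, x := range xs {
		if !pred(x) {
			return false
		}
	}
	return true
}

// None reports whether pred holds for no element.
func None[T any](xs []T, pred func(T) bool) bool {
	return !Any(xs, pred)
}

// Element stands in for a complex array element with an array-valued
// subfield, as in "has any (.y has any element > 10)".
type Element struct {
	Y []float64
}

func main() {
	gt10 := func(v float64) bool { return v > 10 }

	// Scalar array: "has any . > 10", "has all . > 10", "has no . > 10".
	scalars := []float64{3, 12, 7}
	fmt.Println(Any(scalars, gt10), All(scalars, gt10), None(scalars, gt10)) // true false false

	// Nested array: the inner predicate is itself an Any over the subfield.
	elems := []Element{{Y: []float64{1, 2}}, {Y: []float64{5, 11}}}
	fmt.Println(Any(elems, func(e Element) bool { return Any(e.Y, gt10) })) // true
}
```

Nesting the predicates mirrors the parenthesized form of the queries, which is one argument for keeping parentheses in the grammar.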

Join two restrictions on timestamp

(/topic where ...), (/topic2 where ...)

Precedes/succeeds/neighbors operators.

Supports keywords nanoseconds, microseconds, milliseconds, seconds, minutes. We will probably cap at 1 or 5 minutes for now since something is going to buffer that data until/unless we spill queries to disk, which we can defer until requested.

(/topic where …) precedes (/topic2 where …) by less than 5 seconds
(/topic where …) succeeds (/topic2 where …) by less than or equal to 5 minutes
(/topic where …) neighbors (/topic2 where …) by less than 5 seconds

;; unrestricted
/topic precedes /topic2 by less than 5 seconds

;; Composition
((/topic where …) precedes (/topic2 where …) by less than 5 seconds) neighbors (/topic3 where…) by less than 5 seconds
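The proposal does not pin down exact matching semantics for "precedes ... by less than", so the following sketch adopts one plausible reading as an assumption: each right-hand message is paired with the most recent left-hand message that is strictly earlier, and the pair is kept only if the gap is below the bound. Timestamps are plain nanosecond integers and the `Pair` and `Precedes` names are hypothetical.

```go
package main

import "fmt"

// Pair is one as-of join hit: a left timestamp and the right timestamp
// it precedes.
type Pair struct {
	Left, Right int64
}

// Precedes joins two ascending timestamp slices. For each right timestamp it
// finds the latest strictly earlier left timestamp and emits the pair when
// the gap is less than bound.
func Precedes(left, right []int64, bound int64) []Pair {
	var out []Pair
	i := 0 // count of left timestamps strictly earlier than the current right one
	for _, r := range right {
		for i < len(left) && left[i] < r {
			i++
		}
		if i == 0 {
			continue // nothing on the left precedes r
		}
		l := left[i-1]
		if r-l < bound {
			out = append(out, Pair{Left: l, Right: r})
		}
	}
	return out
}

func main() {
	left := []int64{1, 10, 20}
	right := []int64{3, 12, 40}
	fmt.Println(Precedes(left, right, 5)) // [{1 3} {10 12}]
}
```

Because both inputs are consumed in time order with a single forward pass, this shape of join only needs to buffer data within the time bound, which is why capping the bound keeps memory use predictable.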

As-of semijoin

^ AKA suppression operator

;; unrestricted semijoin of /topic and /topic2. Valid but pointless.
/topic, ^/topic2
;; useful example of semijoin, for instance if /topic is the only one required.
(/topic where …) precedes ^(/topic2 where …) by less than 5 seconds

Timestamp restriction

From/to keywords

;; dates
/topic from '2020-01-01' to '2020-01-02'

;; nanoseconds
/topic from 123 to 345

;; on the result of a join
/topic, /topic2 from '2020-01-01' to '2020-01-02'
(/topic where ...) precedes (/topic2 where ...) by less than 5 seconds from '2020-01-01' to '2020-01-02'

;; applied to subquery - mostly pointless but valid
(/topic from '2020-01-01' to '2020-01-02') precedes (/topic2 from '2020-01-02' to '2020-01-02')
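Since from/to accept either a quoted date or a bare nanosecond count, the client or server needs to normalize both to a single representation. The sketch below shows one way to do that; `ParseBound` is a hypothetical helper, and treating dates as midnight UTC in YYYY-MM-DD form is an assumption the proposal does not state.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// ParseBound converts a from/to operand to nanoseconds since the Unix epoch.
// Single-quoted operands are parsed as YYYY-MM-DD dates at midnight UTC
// (an assumption). Bare operands are parsed as integer nanoseconds.
func ParseBound(s string) (int64, error) {
	s = strings.TrimSpace(s)
	if len(s) >= 2 && strings.HasPrefix(s, "'") && strings.HasSuffix(s, "'") {
		t, err := time.Parse("2006-01-02", s[1:len(s)-1])
		if err != nil {
			return 0, fmt.Errorf("invalid date %s: %w", s, err)
		}
		return t.UnixNano(), nil
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid timestamp %q: %w", s, err)
	}
	return n, nil
}

func main() {
	for _, operand := range []string{"'2020-01-01'", "123", "yesterday"} {
		ns, err := ParseBound(operand)
		fmt.Println(operand, ns, err)
	}
}
```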

Descending keyword

Reverse the sort order. This should be used at the end of queries (before limit/offset) but is also valid in subqueries. Most likely when used in subqueries the effect on precedes/succeeds behavior will become confusing.

/topic descending

/topic where ... descending

/topic where ... from a to b descending

;; legal but don't do this
(/topic where ... descending) precedes (/topic2 where ...) by less than 5 seconds

;; better
(/topic where ...) precedes (/topic2 where ...) by less than 5 seconds descending

Limit/offset

Valid at the end of any query/subquery only.

;; get 5 records, e.g to inspect structure
/topic limit 5

;; equivalent
/topic limit 5 offset 5
/topic offset 5 limit 5

;; on a join
(/topic where ...) precedes (/topic2 where ...) by less than 5 seconds descending limit 5 offset 5
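The interaction of descending, offset, and limit in the last query can be sketched as follows. The sketch assumes the sort order is settled first, then offset is applied, then limit, regardless of the order in which the two keywords are written (which is what makes the two forms above equivalent). Page is a hypothetical helper operating on bare timestamps, not a dp3 function.

```go
package main

import "fmt"

// Page applies an optional descending sort order, then offset, then limit,
// to a slice of timestamps that is sorted ascending on input.
// A negative offset is treated as 0 and a negative limit as "no limit".
func Page(ts []int64, descending bool, limit, offset int) []int64 {
	out := make([]int64, len(ts))
	copy(out, ts) // leave the caller's slice untouched
	if descending {
		for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
			out[i], out[j] = out[j], out[i]
		}
	}
	if offset < 0 {
		offset = 0
	}
	if offset > len(out) {
		offset = len(out)
	}
	out = out[offset:]
	if limit >= 0 && limit < len(out) {
		out = out[:limit]
	}
	return out
}

func main() {
	var ts []int64
	for i := int64(1); i <= 12; i++ {
		ts = append(ts, i)
	}
	// descending limit 5 offset 5: skip the 5 newest, return the next 5.
	fmt.Println(Page(ts, true, 5, 5)) // prints [7 6 5 4 3]
}
```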

segfault in gc

what is this?

2024/06/19 20:08:26 INFO tables request database=default producer="" topic="" historical=false request_id=ee4cff6f-5e30-4a54-9b9c-e5a769e083ec
SIGSEGV: segmentation violation
PC=0x79d421 m=9 sigcode=1 addr=0xe100002aa3

goroutine 0 gp=0xc00057c000 m=9 mp=0xc000566808 [idle]:
runtime.scanblock(0x40008, 0x40000, 0xe100002aa3, 0xc000057268, 0x0)
        /usr/local/go/src/runtime/mgcmark.go:1366 +0x41 fp=0x7f3e373ffcb0 sp=0x7f3e373ffc50 pc=0x79d421
runtime.markrootBlock(0xc0067a2148?, 0x0?, 0xc00057c000?, 0x7f3e373ffd58?, 0x7bf989?)
        /usr/local/go/src/runtime/mgcmark.go:286 +0x54 fp=0x7f3e373ffcf0 sp=0x7f3e373ffcb0 pc=0x79b3d4
runtime.markroot(0xc000057268, 0x3, 0x1)
        /usr/local/go/src/runtime/mgcmark.go:172 +0x3e5 fp=0x7f3e373ffd98 sp=0x7f3e373ffcf0 pc=0x79b1c5
runtime.gcDrain(0xc000057268, 0x3)
        /usr/local/go/src/runtime/mgcmark.go:1200 +0x3d4 fp=0x7f3e373ffe00 sp=0x7f3e373ffd98 pc=0x79d154
runtime.gcDrainMarkWorkerDedicated(...)
        /usr/local/go/src/runtime/mgcmark.go:1124
runtime.gcBgMarkWorker.func2()
        /usr/local/go/src/runtime/mgc.go:1387 +0xa5 fp=0x7f3e373ffe50 sp=0x7f3e373ffe00 pc=0x7995a5
runtime.systemstack(0x800000)
        /usr/local/go/src/runtime/asm_amd64.s:509 +0x4a fp=0x7f3e373ffe60 sp=0x7f3e373ffe50 pc=0x7f218a
Segmentation fault

nondeterministic failure when querying concurrently with big import

I am able to cause a failure that logs this:

2024/04/05 19:45:08 ERROR Internal server error msg="error getting messages: failed to get messages: failed to load iterators: failed to get iterators: failed to get next message: failed to get next leaf: failed to get node 17858078595780745765:15437249478555840127:10845002079556675049: node 17858078595780745765:15437249478555840127:10845002079556675049 not found" request_id=5d4ebdbb-2fca-498a-8137-b20b2c5e359c

That node ID looks random, so either the node serialization logic is not overwriting all temporary addresses, or the bytes we are interpreting as an ID are misaligned. Better logging would also show which version we are working on here, which should allow us to inspect the correct tree state.

This seems to happen rarely, and subsequent requests (concurrent with newer inserts) succeed.

Edit: I forgot to provide instructions. To reproduce this, I run a concurrent import of all of my MCAP data:

./dp3 import --producer my-robot ~/data/**/*.mcap --workers 16

while repeatedly starting and killing requests to export all topics:

./dp3 export --producer my-robot --json

clustering

dp3 deployments will need to scale dynamically. There are some complications:

  • A particular (producer, topic) combo must be routed to only one node at a time. Zero (failure or blocking) is ok; multiple is unacceptable. Writes to a single tree must be serialized or data may be lost. After a node stops processing a (producer, topic) it must never do so again until it restarts and gains a higher range of versions than any other node has claimed.
  • To make things as easy as possible for the user, we want to minimize the number of deployed components. We will already have at least two in distributed environments (dp3 + rootmap). Adding more for cluster management or cluster state management will be unpleasant. Components written in Go that we can run in-process are attractive.
  • Since we don't want to put too many assumptions on the load balancer, we should assume any node can receive a write request and must then forward it to the node that can actually process it. So each node needs an up-to-date shard mapping. If the mapping is out of date, the request needs to be rejected or fail.
  • Shard re-balancing must consider the write ahead log.

Some factors in our favor:

  • dp3 is targeting a deployment shape with a relatively static number of beefy nodes. Scaling is not expected to be highly dynamic. Limited blocking during scaling events (while shards are rebalanced) may be acceptable.
  • Reads can be served by any node at any time, which limits disruption to writes; writes are probably queue-fed.
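The write-routing constraint above (any node may receive a write, exactly one node may apply it, and a stale mapping must cause rejection) could be sketched like this. Everything here is hypothetical: ShardKey, ShardMap, Node, HandleWrite, and the idea of tagging forwarded requests with a map version are illustrative assumptions, not an existing dp3 design. The sketch also ignores the write-ahead log and version-range claims.

```go
package main

import (
	"errors"
	"fmt"
)

// ShardKey identifies one tree: a (producer, topic) combination.
type ShardKey struct{ Producer, Topic string }

// ShardMap assigns each key to exactly one owning node. Version increases
// whenever shards are rebalanced.
type ShardMap struct {
	Version uint64
	Owners  map[ShardKey]string
}

var (
	ErrStaleMap = errors.New("stale shard map")
	ErrNoOwner  = errors.New("no owner for shard")
)

// Node is one cluster member holding its own copy of the shard map.
type Node struct {
	ID  string
	Map ShardMap
}

// HandleWrite returns the ID of the node that should apply the write.
// forwardedVersion is the map version the forwarding node used, or 0 when
// the request came straight from the load balancer.
func (n *Node) HandleWrite(key ShardKey, forwardedVersion uint64) (string, error) {
	// A forwarded request routed with a different map version is rejected
	// rather than guessed at, so two nodes never write to the same tree.
	if forwardedVersion != 0 && forwardedVersion != n.Map.Version {
		return "", ErrStaleMap
	}
	owner, ok := n.Map.Owners[key]
	if !ok {
		return "", ErrNoOwner // zero owners: fail instead of picking one
	}
	// A forwarded request must land on the owner; never forward twice.
	if forwardedVersion != 0 && owner != n.ID {
		return "", ErrStaleMap
	}
	return owner, nil
}

func main() {
	key := ShardKey{"my-robot", "/fix"}
	owners := map[ShardKey]string{key: "b"}
	a := &Node{ID: "a", Map: ShardMap{Version: 2, Owners: owners}}
	b := &Node{ID: "b", Map: ShardMap{Version: 2, Owners: owners}}

	owner, err := a.HandleWrite(key, 0) // node a decides to forward to b
	fmt.Println(owner, err)
	owner, err = b.HandleWrite(key, a.Map.Version) // b applies the write
	fmt.Println(owner, err)
	_, err = b.HandleWrite(key, 1) // forwarded with an outdated map
	fmt.Println(err)
}
```

Rejecting on any version mismatch is the conservative choice: it trades some failed writes during rebalancing for the guarantee that at most one node processes a given (producer, topic), which matches the "zero is ok, multiple is unacceptable" requirement.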
