
Comments (7)

frankmcsherry commented on June 28, 2024

Group is slower than it should be. I have some data now though.

I fired up the degrees.rs example, which starts from a stream of (src, dst) pair updates, and does two count_u operations in a row, computing first the out-degrees of each node and then the number of nodes with each degree. I ran this with 5 nodes and 10 edges, doing batches of 1,000,000 updates.

This exercise is meant to remove costs associated with looking up different keys, and ask "if we pile all of the changes into just a few keys, how long does it take for each update?".
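For reference, the computation itself can be modelled outside of differential dataflow as two counts in a row. This is a non-incremental sketch in plain Rust, not the code in degrees.rs; the function name `degree_distribution` is made up for illustration.

```rust
use std::collections::BTreeMap;

/// Non-incremental model of the two counts: first the out-degree of each
/// source node, then the number of nodes that have each out-degree.
/// Nodes with no outgoing edges never appear as a key, so they are not counted.
fn degree_distribution(edges: &[(u32, u32)]) -> BTreeMap<usize, usize> {
    // First count: out-degree per source node.
    let mut out_degree: BTreeMap<u32, usize> = BTreeMap::new();
    for &(src, _dst) in edges {
        *out_degree.entry(src).or_insert(0) += 1;
    }
    // Second count: number of nodes per out-degree.
    let mut nodes_with_degree: BTreeMap<usize, usize> = BTreeMap::new();
    for &degree in out_degree.values() {
        *nodes_with_degree.entry(degree).or_insert(0) += 1;
    }
    nodes_with_degree
}
```

With only 5 nodes, both maps stay tiny no matter how many edge updates arrive, which is why this setup isolates per-update cost from per-key lookup cost.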

It takes about 2.5us for each update, on average, or 2.5s for a batch of one million updates. These numbers aren't exactly accurate yet, though: each of the million "updates" is two changes, each input change usually results in two changes to the degrees (an addition and a retraction, for the new and old source), and each of those usually results in two changes to the counts for each degree. So we may be working with as many as

1_000_000 * (2 + 4 + 8)

or fourteen million updates across the computation. We can see how many updates survive merging and it seems to be about 3.6 million per round for these parameters, so perhaps the 2.5 seconds for everything should be read more as 700ns per update; it is less embarrassing at that point. That being said, it should be even lower.
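The arithmetic above, written out as a small sketch. The function names are illustrative only; the inputs are the figures quoted in this comment.

```rust
/// Upper bound on updates per round: each input update is two changes,
/// which may cause four degree changes, which may cause eight count changes.
fn max_updates_per_round(input_updates: u64) -> u64 {
    input_updates * (2 + 4 + 8)
}

/// Average cost per update in nanoseconds, given elapsed seconds and update count.
fn nanos_per_update(elapsed_secs: f64, updates: u64) -> f64 {
    elapsed_secs * 1e9 / updates as f64
}
```

Here `max_updates_per_round(1_000_000)` is 14,000,000, `nanos_per_update(2.5, 1_000_000)` is 2,500ns, and `nanos_per_update(2.5, 3_600_000)` is a little under 700ns, matching the figures in the text.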

I got a breakdown of how long each group computation is taking in various stages, and I'll try and talk through them here. This is for batches of one million input changes.

Duration { secs: 0, nanos: 103592447 }	distinctions unmade
Duration { secs: 0, nanos: 639710777 }	keys processed
Duration { secs: 0, nanos: 156307150 }	sent and sealed
Duration { secs: 0, nanos: 16840250 }	traces advanced

The distinctions unmade code is where the group operator unblocks the merging of the traces it uses. This call provokes a merge where possible, and this can apparently take a serious amount of time. This is especially silly for this example because (if things are ideal) the existing history is compacted down to the 5 distinct values; a merge is not what we need at this point, what we need is to collapse the enormous pile of updates first.

The keys processed code is where we do all of the per-key work. This has a lot of room for improvement; in this case there are only a few keys, and we are slogging through the millions of distinct times at which each key experiences some sort of change. It would be great to get a breakdown of where time is spent in this method, but historically it is split between (i) loading the input data (merging from source batches), (ii) loading the output data (should be trivial here, unless we failed to compact history), and (iii) doing painfully general reasoning appropriate for multisets and partially ordered times, neither of which exist here.

The sent and sealed code is where we deposit the output updates for this batch into the output trace. This almost certainly provokes merging, and because of how the code is written we have not yet given the trace permission to collapse the output differences down (and in an operator fusion scenario we would not be so permitted anyhow). Even in an ideal scenario, this is merging a large pile of updates with the small historical data; in a suboptimal scenario, we haven't even collapsed that data yet. In both cases we would want to avoid doing anything significant until we can collapse down both traces (at least, in a totally ordered setting; not so in an iterative computation).

Finally, the traces advanced code is where we give the underlying traces permission to advance their frontiers. Mysteriously, this takes some time, but I can't for the life of me find the code that does anything in this call. The Spine does nothing on such a call (it updates its view of the frontier, but takes no action until we next merge things).
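A per-stage breakdown like the one above can be collected with a small lap timer built on `std::time::Instant`. This is a minimal sketch, not the instrumentation actually used in the group operator; `StageTimer` and its methods are hypothetical names.

```rust
use std::time::{Duration, Instant};

/// Records the time spent in each named stage of a round.
struct StageTimer {
    last: Instant,
    laps: Vec<(&'static str, Duration)>,
}

impl StageTimer {
    fn new() -> Self {
        StageTimer { last: Instant::now(), laps: Vec::new() }
    }

    /// Closes the current stage under `label` and starts timing the next one.
    fn lap(&mut self, label: &'static str) {
        let now = Instant::now();
        self.laps.push((label, now.duration_since(self.last)));
        self.last = now;
    }

    /// Total time across all recorded stages.
    fn total(&self) -> Duration {
        self.laps.iter().map(|&(_, d)| d).sum()
    }

    /// Prints one line per stage: duration, tab, label.
    fn report(&self) {
        for (label, d) in &self.laps {
            println!("{:?}\t{}", d, label);
        }
    }
}
```

Calling `lap("distinctions unmade")`, `lap("keys processed")`, and so on after each stage, then `report()` at the end of the round, gives output in the same shape as above. The exact text printed for a `Duration` with `{:?}` depends on the Rust version.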

from differential-dataflow.

frankmcsherry commented on June 28, 2024

I think one of the problems here is that the collapsing and merging happens in a pessimal manner. Ideally when coming in to a group computation, the history would be in a mostly collapsed state already, so that reading out history would be cheap. When we write a batch into the outputs we should be able to collapse them then and there if no other handle needs access (i.e. no fusion). Even if not, by the time we come around again, we would like the output history to be collapsed before we dump another pile of updates in and merge them, rather than just afterwards.

What we see is somewhat different. Here is the merge accounted for by distinctions unmade:

merging 1598668 and 1599700 -> 3198368
collapsed to: 1598672
Duration { secs: 0, nanos: 96649967 }	distinctions unmade

We merge together two large piles of input data, and then collapse them down to be roughly the former. This actually makes sense, because it is merging the current updates and the historical updates. However, we would rather have had the historical data collapsed down and the current data untouched, as we know it must be while we work on it. We can collapse down the "current" data once it becomes old, which it will at the end of the group execution, at which point it can be much more efficiently merged.

The sent and sealed code has a similar behavior:

merging 2983209 and 2984420 -> 5967629
collapsed to: 2983213
Duration { secs: 0, nanos: 164486774 }		sent and sealed

There are generally more changes in count output, so it is reasonable to see larger numbers here. But we see the same issue as above: We merge two piles of data, one of which could be greatly collapsed first, and the other of which cannot be collapsed and which benefits not at all from being merged with the collapsed historical data.

Both of these cases suggest that the discipline for collapsing and merging should perhaps be reviewed. These examples would prefer to have a more "collapse-driven" approach, where we collapse a batch as soon as we can, and we merge batches only when we must. The current implementation takes the opposite approach, which I think is due to "merge first, then collapse" having simple amortized performance bounds: it can be hard to know how often to collapse, or whether collapsing anything other than the oldest batch makes any sense.
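To make the difference between the two disciplines concrete, here is a toy model under simplifying assumptions: times are totally ordered `u64`s, a batch is a sorted `Vec` of `(key, time, diff)` triples, and "collapse" means advancing times to a frontier and consolidating. None of these functions are the Spine's actual code; they only count how many elements the merge step has to touch.

```rust
type Update = (u32, u64, i64); // (key, time, diff)

/// Merges two sorted batches. As with the merges discussed above, no cancellation happens here.
fn merge(a: &[Update], b: &[Update]) -> Vec<Update> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] { out.push(a[i]); i += 1; } else { out.push(b[j]); j += 1; }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Collapses a batch: advances times to `frontier`, sums diffs of equal (key, time), drops zeros.
fn collapse(batch: &[Update], frontier: u64) -> Vec<Update> {
    let mut advanced: Vec<Update> =
        batch.iter().map(|&(k, t, d)| (k, t.max(frontier), d)).collect();
    advanced.sort();
    let mut out: Vec<Update> = Vec::new();
    for (k, t, d) in advanced {
        if let Some(last) = out.last_mut() {
            if last.0 == k && last.1 == t {
                last.2 += d;
                continue;
            }
        }
        out.push((k, t, d));
    }
    out.retain(|u| u.2 != 0);
    out
}

/// "Merge first, then collapse": returns (elements merged, resulting batch).
fn merge_first(history: &[Update], current: &[Update], frontier: u64) -> (usize, Vec<Update>) {
    let merged = merge(history, current);
    (merged.len(), collapse(&merged, frontier))
}

/// "Collapse first, then merge": only the history is collapsed; current data stays untouched.
fn collapse_first(history: &[Update], current: &[Update], frontier: u64) -> (usize, Vec<Update>) {
    let small = collapse(history, frontier);
    let merged = merge(&small, current);
    (merged.len(), merged)
}
```

With a history of five updates to one key and a current batch of two updates past the frontier, `merge_first` merges seven elements while `collapse_first` merges three, and both end with the same three-element batch. The logged runs above show the same shape at a much larger scale.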

In any case, perhaps we can work around this with better signaling from the group operator that unblocks merges later on, only after we have signaled that collapsing can happen.


frankmcsherry commented on June 28, 2024

Perhaps this should be related to #37 which suggests that descriptions might provide some smart guidance on when we should compact batches. For example, if the lower and upper bounds of a batch are not equal, but would become equal when they themselves are compacted, I suspect we have a good candidate for compaction. Just a gut feel at the moment, though.

Edit: since batches can be compacted already, perhaps I mean if lower and upper are not equal when compacted by since, but would be with the new frontier, go for it.
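A rough sketch of that heuristic, assuming totally ordered `u64` times so that each bound is a single number rather than an antichain. `BatchBounds` and `worth_compacting` are hypothetical names for illustration, not types or functions from the library.

```rust
/// Advances a totally ordered time to a frontier: anything earlier than the
/// frontier becomes indistinguishable from the frontier itself.
fn advance(time: u64, frontier: u64) -> u64 {
    time.max(frontier)
}

/// Hypothetical description of a batch: it covers times in [lower, upper)
/// and has already been compacted by `since`.
struct BatchBounds {
    lower: u64,
    upper: u64,
    since: u64,
}

/// A batch is a compaction candidate if its bounds are still distinct under
/// its current `since`, but would become equal under `new_frontier`.
fn worth_compacting(bounds: &BatchBounds, new_frontier: u64) -> bool {
    let distinct_now =
        advance(bounds.lower, bounds.since) != advance(bounds.upper, bounds.since);
    let equal_after =
        advance(bounds.lower, new_frontier) == advance(bounds.upper, new_frontier);
    distinct_now && equal_after
}
```

For a batch covering [0, 10) with `since` 0, a new frontier of 10 makes it a candidate, while a new frontier of 5 does not, since times in [5, 10) would remain distinguishable.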


frankmcsherry commented on June 28, 2024

I made a few quick changes to try out ideas. They conceal information from the trace, so they aren't great things to want to do, but they have a beneficial impact:

  1. I changed the collapse logic, which fires after merging the largest two batches in a trace, to fire just before the largest two are merged. In essence, we collapse first and then merge, if we were planning on collapsing.

  2. I delayed group's distinction unmaking until after we have advanced the frontier used for collapsing. This prevents premature merging, and should only merge folks that are plausibly collapsible (it does not help if a fused operator or external trace handle holds back the collapse; a pointless merge will still happen in that case, unless that handle follows the same discipline).

The numbers improve a bunch. All of the work is now done at traces advanced, and what was about 250ms now takes about 50ms.

Duration { secs: 0, nanos: 629880697 }	keys processed
Duration { secs: 0, nanos: 50438806 }	traces advanced

The overall time drops from 2.5s per million input changes to about 2.0s, as we had two group operators contributing to the overall time. Barring other contributions elsewhere, it seems that most of the overhead is now focused in the per-key processing, which is a great place to find it.

I'll need to check out the numbers on other applications to see if the merge-collapse strategy is flawed for them (merging two collapsed traces does not necessarily result in minimal output, as merges do not do cancellation; collapsing merged traces does result in minimal output).
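The parenthetical about minimal output can be seen in a tiny example. This is a toy model with totally ordered `u64` times and `(key, time, diff)` triples, where `merge` only interleaves and `collapse` advances times and consolidates; it is not the trace implementation.

```rust
type Update = (u32, u64, i64); // (key, time, diff)

/// Merge without cancellation: combine both batches and keep them sorted.
fn merge(a: &[Update], b: &[Update]) -> Vec<Update> {
    let mut out = a.to_vec();
    out.extend_from_slice(b);
    out.sort();
    out
}

/// Collapse: advance times to `frontier`, sum diffs of equal (key, time), drop zeros.
fn collapse(batch: &[Update], frontier: u64) -> Vec<Update> {
    let mut advanced: Vec<Update> =
        batch.iter().map(|&(k, t, d)| (k, t.max(frontier), d)).collect();
    advanced.sort();
    let mut out: Vec<Update> = Vec::new();
    for (k, t, d) in advanced {
        if let Some(last) = out.last_mut() {
            if last.0 == k && last.1 == t {
                last.2 += d;
                continue;
            }
        }
        out.push((k, t, d));
    }
    out.retain(|u| u.2 != 0);
    out
}
```

Take `a = [(7, 0, +1)]` and `b = [(7, 5, -1)]` with frontier 10. Collapsing each and then merging leaves two updates, `(7, 10, -1)` and `(7, 10, +1)`, that would cancel but don't. Merging and then collapsing leaves nothing.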

Another possible future issue is that collapsing can happen in-place, if the assets are uniquely owned; they are uniquely owned just after a merge, but might not be just before the merge. This could de-activate the hypothetical "collapse-in-place" future optimization.


frankmcsherry commented on June 28, 2024

A bit more data. Query 01 from the tpchlike workload, which is chiefly a count with u16 keys, changes its behavior a bit with the optimization. Here we vary batch size from 1k to 100k:

      1,000  10,000 100,000
old   4.04s   4.24s   5.10s
new   3.17s   3.10s   3.63s

The numbers are much improved (yay!), but also the performance degradation isn't as pronounced. This makes some sense, as pre-optimization, we would be sitting on un-collapsed data proportional to the batch size, merging batches of that size, etc. By doing less of that, we frustrate large-batch configurations less. I think.


frankmcsherry commented on June 28, 2024

Diving a bit deeper on the distinctions unmade, the group operator has previously read in as input the subset of the input trace up to and including the new batches it has to work on. For that reason, it tells the input trace: "feel free to merge up to and including ..".

However, the group operator also has the batches themselves, and there is good reason to only read historical input data up to but not including the new batches. Chiefly, we have to read the batches anyhow (to see when and where there are "new" changes), and some optimizations can make use of the distinction between what is old and what is new.

It is probably a good idea to pivot over to loading strictly historical input data from the trace, and current input data from the received batches. This has the potential to make the historical input reads much smaller, and the batch reads simpler (they needn't be complicated merges like the historical data).

This will probably reduce the time spent loading historical data for high-throughput scenarios, which should be a good thing. It also gives more precise information about which updates are new and which are not, which can allow some cute optimizations (or at least open the doors for them, a precursor of me typing them in).
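A sketch of the proposed split, assuming totally ordered `u64` times and batches that cover half-open intervals [lower, upper). The `Batch` struct and both functions are illustrative stand-ins, not the library's trace or cursor API.

```rust
/// Illustrative batch: updates are (key, time, diff) with times in [lower, upper).
struct Batch {
    lower: u64,
    upper: u64,
    updates: Vec<(u32, u64, i64)>,
}

/// Historical accumulation for `key`, read only from trace batches that end
/// at or before the point where the new batch begins.
fn historical_count(trace: &[Batch], new_batch: &Batch, key: u32) -> i64 {
    trace
        .iter()
        .filter(|b| b.upper <= new_batch.lower)
        .flat_map(|b| b.updates.iter())
        .filter(|u| u.0 == key)
        .map(|u| u.2)
        .sum()
}

/// New changes for `key`, read directly from the received batch; no merge
/// with historical data is involved.
fn new_changes(new_batch: &Batch, key: u32) -> Vec<(u64, i64)> {
    new_batch
        .updates
        .iter()
        .filter(|u| u.0 == key)
        .map(|u| (u.1, u.2))
        .collect()
}
```

The operator then knows exactly which updates are old (everything behind `historical_count`) and which are new (everything in `new_changes`), which is the distinction the optimizations mentioned above would rely on.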


frankmcsherry commented on June 28, 2024

The proposed change, loading historical data strictly prior to the batch, has been committed as #53. The result was some noticeable improvement for group performance in bfs, and no apparent regression in the TPCH suite (some improvements, but hard to credit to this change).

Additionally, we have #54 which supports a totally ordered count operator whose performance is quite solid (the time in degrees.rs is now mostly in arranging data).

I think this means we have a pretty good handle on where the time is going in group. There is a gap, and specialization seems to close this gap.

  1. Totally ordered times allow in-place accumulation, rather than maintaining and consolidating independent update buffers. Such times can also remove the need to maintain an output collection, as the "previous" output can be relatively easily determined from the previous inputs.

  2. Value-free collections allow simpler accumulation logic, in which we may not need to build and consolidate multisets, but rather accumulate a single difference. This is most pronounced in combination with totally ordered times, where we can also excise the consolidated updates, and simply scan updates and directly accumulate into a scalar value. These operators are few, and may be limited to count, distinct, and their ilk.
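The two specializations combine into something like the following per-key loop. This is a sketch of the idea only, assuming totally ordered `u64` times and a key's changes already sorted by time; it is not the operator added in #54, and `count_updates` is a made-up name.

```rust
/// Produces the output changes of a count for one key.
///
/// `prev` is the key's accumulated count before this batch, and `changes`
/// holds the key's new (time, diff) input changes, sorted by time.
/// Each output is (count, time, diff): a retraction of the old count and an
/// addition of the new one, whenever the count actually changes.
fn count_updates(prev: i64, changes: &[(u64, i64)]) -> Vec<(i64, u64, i64)> {
    let mut out = Vec::new();
    let mut count = prev; // scalar accumulator, updated in place
    let mut i = 0;
    while i < changes.len() {
        let time = changes[i].0;
        let old = count;
        // Fold in every change at this time before looking at the result.
        while i < changes.len() && changes[i].0 == time {
            count += changes[i].1;
            i += 1;
        }
        if count != old {
            if old != 0 {
                out.push((old, time, -1));
            }
            if count != 0 {
                out.push((count, time, 1));
            }
        }
    }
    out
}
```

Because times are totally ordered, the previous output at each time is just the running count, so no output trace needs to be consulted, and because there are no values, the accumulator is a single integer rather than a multiset.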

There doesn't seem to be a great mystery about raw group performance, though there are still question marks about performance in the context of partially ordered times. Specifically, we see an increase in the number of times considered as batch size increases, likely due to the inability of group to compact its source trace as it proceeds.

