distwalk's Issues

allow for using UDP as transport

Currently, only TCP/IP is used as transport; for distributed real-time workloads it may also be useful to have an (unreliable) UDP-based messaging implementation in distwalk.

Too many "Broken pipe" from dw_node

Launching

./dw_node &
./dw_client -C 6000 -n 1000 -nt 2

then killing the client with Ctrl+C results in a flood of Broken pipe errors, as visible below.

tommaso@laptom:~/work/distwalk$ ./dw_node 
RECV Unexpected error: Connection reset by peer
SEND Unexpected error: Broken pipe
SEND Unexpected error: Broken pipe
SEND Unexpected error: Broken pipe
SEND Unexpected error: Broken pipe
...

Under these conditions the server is "lagging behind": it processes requests on a single CPU, and each one takes longer than the inter-arrival time. When the connection is abruptly closed, it is normal for the server to still be replying to earlier requests and to have a send() fail. However, at that point we close_and_forget() the client connection, so subsequent requests should NOT keep trying to send anything, as there is no longer a connection to send to.
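A minimal sketch of the intended behavior, assuming a hypothetical per-connection record (struct conn, conn_forget() and conn_send() are illustrative names, not the actual dw_node code): once a send() fails with EPIPE or ECONNRESET, the descriptor is closed and marked as gone, and any later reply for that connection is dropped without touching the socket.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical per-connection record; not the actual dw_node structure. */
struct conn {
    int fd;            /* socket descriptor, -1 once forgotten */
    int send_attempts; /* counts real send() calls, for illustration only */
};

/* Close the socket once and mark the record so later replies are dropped. */
void conn_forget(struct conn *c) {
    if (c->fd >= 0) {
        close(c->fd);
        c->fd = -1;
    }
}

/* Send a reply unless the connection was already forgotten.
 * Returns the bytes sent, or -1 if the connection is (or just became) dead. */
ssize_t conn_send(struct conn *c, const void *buf, size_t len) {
    assert(c != NULL && buf != NULL);
    if (c->fd < 0)
        return -1; /* peer already gone: skip silently, no error flood */
    c->send_attempts++;
    /* MSG_NOSIGNAL reports a closed peer as EPIPE instead of raising SIGPIPE. */
    ssize_t n = send(c->fd, buf, len, MSG_NOSIGNAL);
    if (n < 0 && (errno == EPIPE || errno == ECONNRESET))
        conn_forget(c);
    return n;
}
```

With this shape, only the first failed reply reaches send(); every queued request for the same connection then returns early.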

Handling arbitrary command chains

Make dw_client able to build an arbitrary chain of commands in its messages sent to the node server, which can handle them locally one after the other, and/or forward them to other node servers.

Kubernetes support (aka KubeWalk)

One of the main goals of DistWalk is to provide a distributed tool for emulating workloads on popular orchestration tools, such as Kubernetes. We need:

  1. A set of scripts and dockerfiles to automatically "install", configure and run DistWalk. Modifications to Kubernetes itself should be kept to a minimum (ideally, zero).

  2. Integration with the monitoring component envisioned in Issue #22

FORK/JOIN with arbitrary branch contents

Currently, the MULTI_FORWARD allows us to emulate topologies where parallel branches are an exact replication of the same commands. Indeed, each IP is forwarded the same command-sequence excerpt from the original request containing the MULTI_FORWARD.
We might add a more general mechanism allowing for arbitrary FORK points, where individual parallel branches may be instructed to execute different command sets, and JOIN points, where these individual asynchronous branches re-synchronize.
In terms of behavior, a FORK as just mentioned might resemble a FORWARD in the sense described in #40, with the difference that only a part of the request is FORWARDed, while the rest of the request continues to be executed by the current node (asynchronously with the forwarded sequence); a JOIN should resemble/reuse what we already implemented in the MULTI_FORWARD.

Practically, we might have:

COMPUTE 10ms
FORWARD_ASYNC next 3 commands (a)
  COMPUTE 5ms
  LOAD 256B
  REPLY 256B
FORWARD_ASYNC next 2 commands (b)
  COMPUTE 15ms
  REPLY 16B
JOIN a, b
REPLY 24B // final reply to the client

However, the JOIN in the above example might also need to be executed on a different node:

COMPUTE 10ms
FORWARD_ASYNC next 4 commands (a)
  COMPUTE 5ms
  LOAD 256B
  FORWARD_JOIN IP:port 256B
  REPLY 24B
FORWARD_F&F next 3 commands (b)
  COMPUTE 15ms
  FORWARD_JOIN IP:port 16B
  REPLY 24B

Here, the final "REPLY 24B" is the part to be executed on the joining node, IP:port, and it is replicated in both branches in the original request, however we might have a different syntax to denote this case. Moreover, the first FORWARD_ASYNC is meant to be non-blocking for the executing thread, that will continue executing the FORWARD_F&F in parallel; this latter command would be a Fire & Forget FORWARD, in the sense also discussed in #40.

Categorization of debug messages in dw_log()

The output of dw_log() is full of clutter that may not be useful for a given debugging session. We need a way to categorize debug information and let the user specify the subset they are interested in, making the simple print-and-pray approach more manageable.

Something along the lines of

./dw_client -LOG [CONNECTION|STORAGE|ERROR] 

data sync options for STORE

The STORE command has been sketched out with the idea of a per-request sync, with the ODIRECT option specifiable for the whole node as command-line option to dw_node.
We'd need to add more options in this regard, to mimic common syncing behavior of a multitude of systems (ideally, still under control of the client):

  • no syncing at all
  • per-request data sync
  • periodic syncing of written changes with a customizable period (common in DBs, e.g., MongoDB/WiredTiger, Cassandra, ...)

add probabilistic SKIP

In addition to #8, which supports forwarding the same request to multiple nodes, it would be nice to support forwarding to one out of a number of nodes, with some pre-specified probability.
Actually, generalizing the idea, we could have probabilistic execution of arbitrary segments of the specified command/script.

This might be done introducing a probabilistic skip construct, e.g.:

  SKIP(n[, p=1.0])

would skip the next n commands with probability p, and execute them with probability (1-p) instead.

For example, to emulate a random load-balancer:
SKIP(2, 0.66) FORWARD(ip1) ... REPLY SKIP(4) SKIP(2, 0.5) FWD(ip2)...REPLY SKIP(1) FWD(ip3)...REPLY

Here, the first SKIP skips 66% of the time, so the first FWD and the subsequent deterministic SKIP are executed only 34% of the time. In the 66% of skipped instances, execution lands on another probabilistic SKIP, which forwards to ip2 50% of the time and to ip3 the other 50%. Overall, it's a 34%, 33%, 33% distribution among ip1, ip2 and ip3. When skipping, a FORWARD is considered here as a single construct, for ease of specification, and also because I can't see the sense in allowing a jump into the middle of a FORWARD body.

Another example: restore the (now lost) ability to have a random workload of LOAD, STORE:
SKIP(2,0.7) SAVE() SKIP(1) LOAD()

We're doing a SAVE 30% of the time, and a LOAD 70% of the time.

In terms of command-line syntax, AFAICS, this can be as straightforward as:

--skip n[,prob=1.0]

AFAICS, this might be realized entirely client-side, i.e., dw_node would never see the SKIP command; rather, dw_client would send requests with various probabilistically distributed contents according to the specified probabilistic skips. That said, supporting it server-side as well doesn't seem difficult at all.
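A minimal sketch of the client-side variant, under stated assumptions: the struct cmd encoding, next_uniform() and resolve_skips() are hypothetical names, each non-SKIP entry stands for one command or one whole FORWARD...REPLY unit, and a small LCG stands in for whatever random source dw_client uses. A SKIP(n, p) is resolved by drawing u in [0, 1) and jumping over the next n entries when u < p.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical command encoding, for illustration only. */
enum cmd_kind { CMD_SKIP, CMD_OTHER };

struct cmd {
    enum cmd_kind kind;
    int id;    /* identifies a non-SKIP entry (one command or one FORWARD unit) */
    size_t n;  /* SKIP only: how many following entries to skip */
    double p;  /* SKIP only: probability of skipping */
};

/* Small LCG returning a uniform double in [0, 1); a stand-in random source. */
double next_uniform(uint64_t *state) {
    *state = *state * 6364136223846793005ULL + 1442695040888963407ULL;
    return (double)(*state >> 11) * (1.0 / 9007199254740992.0);
}

/* Resolve every SKIP client-side: copy the ids of the surviving entries
 * into out[] (capacity n) and return how many there are. */
size_t resolve_skips(const struct cmd *in, size_t n, uint64_t *rng, int *out) {
    assert(in != NULL && rng != NULL && out != NULL);
    size_t n_out = 0;
    for (size_t i = 0; i < n; i++) {
        if (in[i].kind != CMD_SKIP) {
            out[n_out++] = in[i].id;
            continue;
        }
        if (next_uniform(rng) < in[i].p)
            i += in[i].n; /* jump over the next n entries */
    }
    return n_out;
}
```

Encoding the load-balancer example as seven entries (SKIP, ip1, SKIP(4), SKIP, ip2, SKIP(1), ip3) leaves exactly one forward unit in the resolved request, whichever way the draws go.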

allow for using DPDK as transport

This might be useful for ultra-low latency and/or high-performance services, for which absolute performance is more important than CPU cycles (with DPDK we'd have threads spinning at 100% CPU on the DPDK queues even when there are no packets to process).
The core changes might be the same as those needed in #4, as with both UDP and DPDK we lose the reliability and streaming of TCP.

thread affinity & NUMA-aware memory allocation

A problem in the current code base under --threads > 1 is that buf_alloc() is always called from the same thread, while the allocated memory buffers are later used from the worker threads. Ideally, we should call buf_alloc() from the various worker threads, so as to gain some memory locality on NUMA servers (ideally, with worker threads pinned to physical cores, yet another cmd-line option to add).
The memory allocation part should be automagically handled by PR #26, but in order to work as expected, this needs to be coupled with an option to set a per-thread core affinity.
Could be smth. like:

dw_node --threads 4 --cpu-affinity 0-3

(or also the classical affinity mask specifiers like 0,1,2,3, or 0-1,2-3, etc...). In case --threads specifies more threads than the affinity mask contains, one could either return an error, or simply distribute the threads round-robin over the specified CPUs.

With this option, to run dw_node on 4 arbitrary CPUs without forcing core pinning, one would still be able to use

taskset -c 0-3 dw_node --threads 4
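A minimal sketch of how the proposed --cpu-affinity argument could be parsed and mapped to threads. The names parse_cpu_list() and cpu_for_thread() are hypothetical, and the round-robin policy is just one of the two options mentioned above. The actual pinning step is left out; on Linux it could be done per thread with pthread_setaffinity_np().

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Parse a CPU list such as "0-3", "0,1,2,3" or "0-1,2-3" into cpus[].
 * Returns the number of CPUs parsed, or -1 on a malformed or oversized list. */
int parse_cpu_list(const char *spec, int *cpus, size_t max) {
    assert(spec != NULL && cpus != NULL);
    size_t n = 0;
    const char *s = spec;
    while (*s != '\0') {
        char *end;
        long lo = strtol(s, &end, 10);
        if (end == s || lo < 0)
            return -1;
        long hi = lo;
        if (*end == '-') { /* range "lo-hi" */
            s = end + 1;
            hi = strtol(s, &end, 10);
            if (end == s || hi < lo)
                return -1;
        }
        for (long c = lo; c <= hi; c++) {
            if (n == max)
                return -1;
            cpus[n++] = (int)c;
        }
        if (*end == ',')
            end++;
        else if (*end != '\0')
            return -1;
        s = end;
    }
    return (int)n;
}

/* Round-robin placement when there are more threads than listed CPUs. */
int cpu_for_thread(const int *cpus, int n_cpus, int thread_idx) {
    assert(cpus != NULL && n_cpus > 0 && thread_idx >= 0);
    return cpus[thread_idx % n_cpus];
}
```

For instance, with the list 0-3 and six threads, threads 4 and 5 would wrap around to CPUs 0 and 1.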

threading model in dw_node

Currently, the --per-client-thread command-line option does not do what it seems from its name: it spawns a predefined number of threads (it was equal to MAX_BUFFERS, but now it's specified independently as MAX_THREADS, since commit 6fcca0b). These are used in round-robin fashion as the main thread keeps accept()ing new client connections.

This option should be renamed to smth. like --threads, where one can specify the desired parallelism level, up to a maximum of MAX_THREADS.

Second, the current implementation forces a context switch at every accept(), needlessly increasing the first-request latency. A better approach would be to have multiple threads:

  • either accept()ing directly on the same socket;
  • or binding their own socket to the same IP:port, using SO_REUSEPORT.

The second one should be better, according to man 7 socket:
SO_REUSEPORT [...] For TCP sockets, this option allows accept(2) load distribution in a multi-threaded server to be improved by using a distinct listener socket for each thread. This provides improved load distribution as compared to traditional techniques such as using a single accept(2)ing thread that distributes connections, or having multiple threads that compete to accept(2) from the same socket.
We could try to measure these differences, though....

Further note from man accept:
There may not always be a connection waiting after a SIGIO is delivered or select(2), poll(2), or epoll(7) return a readability event because the connection might have been removed by an asynchronous network error or another thread before accept() is called. If this happens, then the call will block waiting for the next connection to arrive. To ensure that accept() never blocks, the passed socket sockfd needs to have the O_NONBLOCK flag set (see socket(7)).

add a fork/join commands in protocol

This is useful to model replication protocols: a node would receive a message containing a special FORK message that requests forwarding a subsequence of commands that follow in the same request to multiple node servers; those servers would simply execute processing and/or load/store workload as required, then execute a JOIN to reply back to the forking node; the forking node would be waiting for either all or a quorum of the replies, then execute a REPLY back to the client.

So, the workflow could be something like:

Request
1 COMPUTE(time)
2 FORK(dest1:port1, dest2:port2, dest3:port3; num_cmds=3)
3 COMPUTE(time)
4 STORE(bytes, sync=t/f)
5 REPLY
6 JOIN(type=all/quorum)
7 REPLY

The forking node would only forward to the "slaves" the 3 commands from 3 to 5, then wait for the REPLYs to come in the JOIN command, then REPLY to the client.

FORK might also be realized as an asynchronous variant of the current FORWARD cmd, i.e.:
1 COMPUTE(time)
2 FORWARD(dest1:port1, async=t, cmds=5..7)
3 FORWARD(dest2:port2, async=t, cmds=5..7)
4 FORWARD(dest3:port3, async=t, cmds=5..7, skip=3)
5 COMPUTE(time)
6 STORE(bytes, sync=t/f)
7 REPLY
8 JOIN(type=all/quorum)
9 REPLY

Here, the forking node would asynchronously forward only the commands from 5 to 7, then skip them (only in the last FORWARD) so as to jump to the JOIN command, then REPLY to the client.

As in both examples, the JOIN command might have a type option, set to all if we need to wait for all the replies before continuing, or quorum if we need to wait for just a majority of them. Similarly to the MongoDB r/w "concern", this might actually be an integer telling the JOIN command how many of the replies are expected before moving forward.
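A minimal sketch of the JOIN bookkeeping, assuming a hypothetical struct join_state kept by the forking node (all names here are illustrative): both the all and quorum types reduce to an integer number of replies to wait for.

```c
#include <assert.h>

/* Hypothetical JOIN bookkeeping kept by the forking node. */
struct join_state {
    int forked;   /* number of branches the FORK forwarded to */
    int needed;   /* replies required before the JOIN completes */
    int received; /* replies seen so far */
};

/* type=all: wait for every branch. */
void join_init_all(struct join_state *j, int forked) {
    assert(j != 0 && forked > 0);
    j->forked = forked;
    j->needed = forked;
    j->received = 0;
}

/* type=quorum: wait for a strict majority of the branches. */
void join_init_quorum(struct join_state *j, int forked) {
    assert(j != 0 && forked > 0);
    j->forked = forked;
    j->needed = forked / 2 + 1;
    j->received = 0;
}

/* Record one reply; returns 1 once enough replies have arrived. */
int join_on_reply(struct join_state *j) {
    assert(j != 0);
    j->received++;
    return j->received >= j->needed;
}
```

With 3 branches, a quorum JOIN completes on the second reply, while an all JOIN completes on the third.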

Need to use data.u64 in epoll loop

While dealing with d0e7774, I just noticed that we need to switch to a better use of the data field in epoll event loop(s).

       struct epoll_event {
           uint32_t      events;  /* Epoll events */
           epoll_data_t  data;    /* User data variable */
       };

       union epoll_data {
           void     *ptr;
           int       fd;
           uint32_t  u32;
           uint64_t  u64;
       };

Currently, we use data.fd, which is compared with a few special file descriptors; otherwise we start exec_request(), which uses data.u32. Unfortunately, there's quite some likelihood that the 4th, 5th, etc... request would clash with one of the existing file descriptors. So, we need to use the full 64 bits we have there, e.g., by OR-ing one of these

#define DATA_FD      (UINT64_C(0) << 32)  /* 64-bit constant: 0ul may be only 32 bits wide */
#define DATA_CONN_ID (UINT64_C(1) << 32)

like this:

data.u64 = DATA_FD | fd;
data.u64 = DATA_CONN_ID | conn_id;
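A minimal sketch of the tagging scheme with matching decode helpers. The helper names and DATA_TAG_MASK are assumptions added for illustration. The tag lives in the upper 32 bits of data.u64 and the fd or connection id in the lower 32 bits, so the two can never clash.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/epoll.h>

/* Tag stored in the upper 32 bits of epoll_data.u64. */
#define DATA_FD       (UINT64_C(0) << 32)
#define DATA_CONN_ID  (UINT64_C(1) << 32)
#define DATA_TAG_MASK (UINT64_C(0xffffffff) << 32) /* hypothetical helper mask */

/* Build an event whose user data carries a plain file descriptor. */
struct epoll_event make_fd_event(int fd, uint32_t events) {
    assert(fd >= 0);
    struct epoll_event ev;
    ev.events = events;
    ev.data.u64 = DATA_FD | (uint32_t)fd;
    return ev;
}

/* Build an event whose user data carries a connection id. */
struct epoll_event make_conn_event(uint32_t conn_id, uint32_t events) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.u64 = DATA_CONN_ID | conn_id;
    return ev;
}

/* 1 if the event carries a connection id, 0 if it carries an fd. */
int event_is_conn(const struct epoll_event *ev) {
    return (ev->data.u64 & DATA_TAG_MASK) == DATA_CONN_ID;
}

/* Lower 32 bits: the fd or the connection id, depending on the tag. */
uint32_t event_value(const struct epoll_event *ev) {
    return (uint32_t)(ev->data.u64 & UINT64_C(0xffffffff));
}
```

In the event loop, event_is_conn() would pick between the special-descriptor handling and exec_request().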

Automated tests

We need some automated tests to check possible regression bugs as development goes on.

Timeout & Retransmit for FORWARD

Currently, a FORWARD is actually an RPC to another dw_node, waiting for a reply. Real services would not wait forever, but implement a timeout within which to receive a reply. Once the timeout expires, the node should be able to either fail the single request towards the client (no need to close the connection in this case), or retry up to a specified number of times.
This could be added as an optional argument to the FORWARD command, or as an independent command that might be more general (e.g., being applicable also to other commands, or a sequence of them, e.g., having a timeout firing if all computations up to some point in the request do not complete on time).
What happens exactly to the workflow in-progress that didn't complete on time, is to be defined (e.g., aborting it might not be trivial).

Example 1:

TIMEOUT 50us, num_retries=0
FORWARD....
...
REPLY

If the reply to the FORWARD does not come within 50us, the request is failed to the client (a REPLY arriving after the timeout has expired will be ignored by dw_node).

Example 2:

TIMEOUT 50us, num_retries=1
FORWARD....
...
REPLY

If the reply does not come within 50us, the FORWARD is sent again; this time the request fails if the reply does not come within a further 50us.
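A minimal blocking sketch of the timeout-and-retry semantics from the two examples, with a hypothetical helper name (forward_with_retries() is not part of dw_node). It uses poll(), whose timeout has millisecond granularity, so the 50us of the examples would need a finer-grained mechanism; an epoll-integrated version would also not block the thread.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Send a request and wait up to timeout_ms for reply bytes, resending the
 * request up to num_retries more times when the timeout expires.
 * Returns the number of reply bytes read (0 if the peer closed), or -1 if
 * every attempt timed out or an I/O error occurred. */
ssize_t forward_with_retries(int fd, const void *req, size_t req_len,
                             void *reply, size_t reply_len,
                             int timeout_ms, int num_retries) {
    assert(req != NULL && reply != NULL && num_retries >= 0);
    for (int attempt = 0; attempt <= num_retries; attempt++) {
        if (send(fd, req, req_len, MSG_NOSIGNAL) < 0)
            return -1;
        struct pollfd pfd = { .fd = fd, .events = POLLIN };
        int rc = poll(&pfd, 1, timeout_ms);
        if (rc < 0)
            return -1;
        if (rc > 0)
            return recv(fd, reply, reply_len, 0);
        /* rc == 0: the timeout expired, loop and send the request again */
    }
    return -1; /* the request is failed towards the client */
}
```

Here num_retries=0 corresponds to Example 1 and num_retries=1 to Example 2.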

Not sure if, for the retry, we should allow for specifying an alternate endpoint to the FORWARD. Perhaps a way might be to realize the retry as a conditional SKIP:

1 TIMEOUT 50us, skip_on_fail=2
2 FORWARD ip1....
    ...
   REPLY
3 SKIP 2
4 TIMEOUT 50us
5 FORWARD ip2....
    ...
   REPLY

This would be executed as an RPC to ip1. If it succeeds, the subsequent 2 commands (4 TIMEOUT and 5 FORWARD) are skipped. Otherwise, if the timeout fires, the 2 FORWARD...REPLY and the 3 SKIP are skipped, jumping to 4 TIMEOUT, which retries for another 50us with a FORWARD to a different endpoint ip2, this time failing if the REPLY doesn't come on time.

This feature might be mixed with the multi-forward fork/join from #8; in this case the abort policy seems easy, whilst the retransmit might need some design clarifications.

dw_client: ability to read scenario(s) from script(s)

In order to specify a workload scenario in dw_client, in addition to providing command-line arguments, it might be useful to have the possibility to specify a "script" file, e.g.:

  dw_client -f script.dw

where script.dw could have the same exact syntax as the command-line arguments, perhaps with some more comfortable "dashless" way of specifying commands, e.g., the command

  -C 10000 -L 16384 -rs 512

might be specified with a script file like

#!dw_client -f

COMPUTE 10000us
LOAD 16KB
REPLY 512B

where adding the "shebang" first line would allow launching the script directly from the shell, once it is made executable.

The sharp symbol "#" could be used throughout the script to add comments that would be completely ignored by dw_client.
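A minimal sketch of a line parser for such a script, under stated assumptions: struct script_cmd and parse_script_line() are hypothetical names, the only units handled are the ones appearing in the example (us, B, KB) plus ms, and KB means 1024 bytes, consistent with -L 16384 being written as LOAD 16KB.

```c
#include <assert.h>
#include <ctype.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical parsed form of one script line. */
struct script_cmd {
    char name[16]; /* e.g. "COMPUTE", "LOAD", "REPLY" */
    long value;    /* microseconds for times, bytes for sizes */
};

/* Parse one script line. Returns 1 if a command was parsed, 0 for blank
 * lines and '#' comments (including the shebang line), -1 on a syntax error. */
int parse_script_line(const char *line, struct script_cmd *out) {
    assert(line != NULL && out != NULL);
    while (isspace((unsigned char)*line))
        line++;
    if (*line == '\0' || *line == '#')
        return 0;
    char unit[8] = "";
    long value;
    int n = sscanf(line, "%15s %ld%7s", out->name, &value, unit);
    if (n < 2 || value < 0)
        return -1;
    if (unit[0] == '\0' || unit[0] == '#' || /* no unit, or trailing comment */
        strcmp(unit, "us") == 0 || strcmp(unit, "B") == 0)
        out->value = value;
    else if (strcmp(unit, "ms") == 0)
        out->value = value * 1000;
    else if (strcmp(unit, "KB") == 0)
        out->value = value * 1024;
    else
        return -1;
    return 1;
}
```

dw_client could then feed the file through this line by line and build the same command chain it builds from the command-line arguments.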

Restore epoll() based interactions

setnonblock() has been commented out for quite some time, and dw_node is only used in --per-client-thread mode, basically not using epoll()-based interactions. Those should be restored. One critical point at the moment is the FORWARD, which needs 3 different substates:

  • CONNECTING to the forwarded IP:port
  • SENDING data to the forwarded socket
  • RECEIVING a reply from the forwarded socket

Further attention is needed by LOAD and particularly STORE, that need further substates as well:

  • LOADING or STORING data
  • syncing writes on disk (for STORE)

Furthermore, process_messages() needs to keep track of the specific command in m->cmds[] that is being processed in the current state, so to be able to keep going from where it left off.
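A minimal sketch of the per-request progress record this implies, limited to the three FORWARD substates. The enum, struct and function names are hypothetical, and the real code would also perform the actual connect()/send()/recv() work at each step.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical substates of an in-flight FORWARD. */
enum fwd_substate { FWD_IDLE, FWD_CONNECTING, FWD_SENDING, FWD_RECEIVING };

/* Hypothetical per-request progress: which command in m->cmds[] is being
 * processed, and where its FORWARD stands. */
struct req_progress {
    size_t cur_cmd;
    enum fwd_substate sub;
};

/* Called when epoll reports the forwarded socket ready for the current step.
 * Returns 1 when the FORWARD completed and cur_cmd moved to the next command,
 * so that processing can resume from where it left off; 0 otherwise. */
int fwd_on_ready(struct req_progress *r) {
    assert(r != NULL);
    switch (r->sub) {
    case FWD_CONNECTING: /* connection established: start sending */
        r->sub = FWD_SENDING;
        return 0;
    case FWD_SENDING:    /* request fully sent: wait for the reply */
        r->sub = FWD_RECEIVING;
        return 0;
    case FWD_RECEIVING:  /* reply received: the FORWARD is done */
        r->sub = FWD_IDLE;
        r->cur_cmd++;
        return 1;
    case FWD_IDLE:
    default:
        return 0;
    }
}
```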

Use properly htonx() to make network protocol portable

Numeric arguments supplied in the client/node and node/node protocol should make use of the appropriate htonx() and ntohx() macros, to support different architectures (this was not a priority till now, but it has to be fixed now).
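A minimal sketch of byte-order-safe field encoding, assuming hypothetical helper names (put_u32() and friends are not the actual distwalk message code). 32-bit fields go through htonl()/ntohl(), and 64-bit fields are written as two 32-bit halves, high half first, which avoids relying on non-standard 64-bit conversion macros.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Write a 32-bit value in network byte order; returns the bytes written. */
size_t put_u32(unsigned char *buf, uint32_t v) {
    assert(buf != NULL);
    uint32_t be = htonl(v);
    memcpy(buf, &be, sizeof(be)); /* memcpy avoids unaligned access */
    return sizeof(be);
}

/* Read a 32-bit value stored in network byte order. */
uint32_t get_u32(const unsigned char *buf) {
    assert(buf != NULL);
    uint32_t be;
    memcpy(&be, buf, sizeof(be));
    return ntohl(be);
}

/* Write a 64-bit value as two big-endian 32-bit halves, high half first. */
size_t put_u64(unsigned char *buf, uint64_t v) {
    put_u32(buf, (uint32_t)(v >> 32));
    put_u32(buf + 4, (uint32_t)(v & UINT64_C(0xffffffff)));
    return 8;
}

/* Read a 64-bit value written by put_u64(). */
uint64_t get_u64(const unsigned char *buf) {
    return ((uint64_t)get_u32(buf) << 32) | get_u32(buf + 4);
}
```

The encoded bytes are the same on little-endian and big-endian hosts, which is the property the protocol needs.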

Generic ASYNC block

Just as a brainstorming around the recent #40 and #41, we might also think of a generic ASYNC block, applicable to any command, including COMPUTE, FORWARD/LOAD/STORE, CALL/RPC, etc..., e.g.:

ASYNC 2
COMPUTE 50ms
SKIP 1
LOAD 20B

would mean: emulate computations for 50ms while in parallel read 20B from the local storage;

ASYNC 2
COMPUTE 50ms
SKIP 1
COMPUTE 50ms

This would be a request for a parallel computation on 2 threads, if available;
and, a FORK might actually be implemented with an ASYNC + FORWARD_F&F (fire & forget) like this:

ASYNC n1 (n. commands in 1st branch)
  FORWARD_FF IP1:port1
  COMPUTE
  ...
ASYNC n2 (n. commands in 2nd branch)
  FORWARD_FF IP2:port2
  STORE
  ...
// 3rd branch
FORWARD_FF IP3:port3
LOAD
...

complete FORWARD

FORWARD was never finished in node.c, and cannot be used at the moment;
it needs to be completed with connect() to the forwarding nodes, to be added to sock_info[];
this needs to be done going through epoll_wait(), to avoid blocking the thread waiting for connection establishment

REPLY with flexible destination

-) Optionally change the destination of a REPLY. This is useful for DAG topologies with no-reply-back. It will be the last node of the chain to reply to the client
-) Optionally exclude a final REPLY for the fire-and-forget use-case, even in the case of FORWARD messages. In the latter case, a mechanism is needed to join a forked stream without relying on the node that called FORWARD
(however, we probably want to reply to the client nonetheless).

A first step may be converting REPLY to a "special" FORWARD case.

Secure exchanges via SSL/TLS

It might be good to support SSL/TLS in TCP-based communications between dw_client and dw_node, and among dw_node instances. For example, this might be supported through a 'https://x.y.w.z' syntax if using http-based communications, as per #10. Or, this might just be some additional option on the client command-line that prescribes use of SSL/TLS-based communications with some given parameters (certificate file(s), key file(s), required encryption algorithms, ...).

use store_opts to convey offset and load/store data size for disk operations

The store_opts (currently unused) structure might be useful to emulate sequential vs random data access patterns.
Also, it might be useful to tell the server what storage device to use, e.g., through a simple dev_id identifier (0, 1, 2, ...), to exploit multiple disk devices if available on the server, for emulating disk-related workloads taking advantage of multiple disks (e.g., data stores).

Related issue: #20

improve packet format with variable-size command payloads

Currently, a request leaves the client with a single request-size, then an array of embedded equal-sized commands.
This is about adding a proper request format supporting variable-sized commands, each with possibly variable-sized workload.

Docs update

After the recent developments, DistWalk deserves an End-of-Year documentation update, with a full review of the project README, the help message, and probably the creation of a man page for dw_client (and dw_node).

add support for the HTTP protocol in request/response exchanges

Currently, the client and server communicate using a binary custom protocol over a TCP connection. In order to emulate what happens with typical web servers and SOA traffic, and exploit special features of, e.g., load-balancers with http special handling, it is useful to add as an option the ability to use the HTTP protocol

check how to reuse cached sockets in socks[]

If I have a client asking its server node s to forward the request to a node n, and then another client asking the same server node s to forward the request to the same node n, then we'd try to reuse the socket already established between s and n, for the second client request. However, as n will reply onto the same socket, when the result comes back from the socket, we need to differentiate which client the answer is for. This is a case that cannot be handled in the current forward() code sketch, because a client sends a forward, expecting its reply back from that socket, but the reply could actually be for someone else.
In the shortest term, to recover consistency, we should instantiate the socks[] cache to be per-client (thread).
However, this case could be handled properly with a forward integrated properly within the epoll() machinery.

compute: add multiple CPU stressing abilities

Currently, the COMPUTE() loop is a pure time-wasting loop, designed to be insensitive to the DVFS/frequency settings of the underlying CPU, as well as to the underlying CPU architecture/type. This spares us the need to lock the CPU frequency for each and every test, but it is far from representative of how real workloads behave. Furthermore, it makes it impossible to investigate smart DVFS/performance trade-off management policies, esp. in heterogeneous distributed environments (including GPU/CUDA, cloud/edge, ...).

We'd need to have more involved stressing loops, that can be realized as:

  • simple loops that perform a pre-fixed amount of work (i.e., -C 10000 would mean 10ms at max frequency, but it will take longer for lower frequencies)
  • loops with more involved data processing, to exercise more elements of the CPU (adders, multipliers, floating point unit(s), ...)
  • loops sweeping a predefined amount of data, to emulate workloads with varying working-set sizes, e.g., data compression/decompression or encryption/decryption, A/V media encoding/decoding, algebraic computations (vectors, matrices and the likes), big-data workloads (very big vectors and matrices), ML/AI workloads (DNNs);

For example, we might be willing to specify, e.g., the number of times we gzip/encrypt/etc. with what operation parameters a data chunk of a given size; or, how many blocks 16x16 (or with different sizes if supported) of an image we process with a mpeg or different encoder/decoder, or what image of what size/resolution/depth we process with what image filter (e.g., using the ImageMagick library...).

Stressing different features of the CPU, its L1/L2/LLC cache, the memory datapath and memory controller(s), is useful to emulate phenomena of noisy neighbor due to cache-level interference or memory-access interference/saturation, as typically happens with cloud workloads, esp. in presence of big-data processing pipelines.

Perhaps some of the above might be realized by spawning stress-ng, or any command-line utility the client decides (ok, security concerns...), albeit that would imply forking and waiting for a process to complete per request. Web servers might still do that through CGI-BIN.

Probably adding CUDA/GPU workloads is also a must...

improve parallelism for independent requests picked up by same worker thread

In the current master, a connection worker thread can receive multiple messages from a socket, but then it stops processing them at the first blocking command (disk operation or FORWARD). This is fine if messages are from the same client, if we assume a strictly sequential semantics of messages sent by the same client over the same session (TCP connection). This is a natural and safe assumption at the moment, albeit in the future we might allow for parallelism also in that regard, if required by the client (e.g., multiple stores submitted after one another to a distributed data store made with different keys might be executed in parallel), i.e., using a command-level specification of sequentiality/serializability dependency w.r.t. operations issued previously. This could be achieved with a variant of the current MULTI_FORWARD at the protocol level, though.

However, when a connection was established by a different dw_node, then messages might belong to different sessions from potentially different and independent clients performing FORWARD operations. In this case, we should be able to bring forward processing of messages received after a message requiring a blocking interaction, if they're independent (i.e., NOT from the same client/session).

At the moment, messages are only distinguished by their request Id, assigned independently by each client. From a logical viewpoint, we would need to distinguish whether messages belong to the same session or not. The message format might be enhanced with a (session Id, request Id) pair, but the problem is that assigning a unique session Id might be challenging, as clients might have different entry points in the dw_node processing graph.

Possible ways to assign a session Id:

  • UUID (random and quite long session Id -> might introduce slowness to process/hash it all the times)
  • unique increasing session Id computed by a "master" node, among those in the dw_node cluster -> what if we're going to let this "master" node fail, in some scenarios?
  • unique increasing session Id computed with a quorum-based peer-to-peer protocol by dw_node instances

Considering a connectionless transport like UDP, a session Id might anyway be established via a special message to be sent to (any node in?) the dw_node cluster at the beginning of a set of interactions. Such initial interaction might also be the one that in the current TCP-based transport establishes a connection.

add command to store/sync a given data size onto disk

In addition to the process, forward and reply commands, distwalk should also support a disk write operation that is sync()ed to disk before the next command (e.g., reply or forward) is executed; similarly, we should have a disk read operation. For both, we should be able to specify, as a parameter of the command, the amount of data to write (possibly alongside the actual data to write) or to read (causing that amount of data to be returned with a reply).

add multiple probability distribution options to dw_client

The client currently only supports specification of fixed or exponentially distributed processing times, inter-arrival times, and send or reply packet sizes. However, there is very little ability to specify different combinations of these parameters at the moment, especially if using chains of multiple commands to be executed on different hosts using the FORWARD.
We should allow for a richer set of distributions, e.g.:

  • uniformly distributed within a (min,max) range;
  • exp-distributed with explicit (min,max) caps;
  • normally distributed with specified average and standard deviation;
  • Gamma distribution with parameters k and theta;
  • generic PMF distribution specified as vector of equally-sized bins;
  • generic distribution specified as samples of the CDF;
  • file-based samples, i.e., values read from client-side file containing experimental samples that are simply replayed.
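A minimal sketch of two of the listed options, the uniform (min,max) range and the PMF over equally-sized bins, written as inverse-transform samplers. The function names are hypothetical, and the uniform draw u in [0, 1) is passed in by the caller, so any random source can be plugged in.

```c
#include <assert.h>
#include <stddef.h>

/* Map a uniform draw u in [0, 1) onto the range [min, max). */
double sample_uniform(double u, double min, double max) {
    assert(u >= 0.0 && u < 1.0 && min <= max);
    return min + u * (max - min);
}

/* Sample from a PMF given as n_bins equally-sized bins covering [min, max]:
 * pmf[i] is the probability of bin i, and values are taken as uniformly
 * distributed within the selected bin. */
double sample_pmf(double u, const double *pmf, size_t n_bins,
                  double min, double max) {
    assert(pmf != NULL && n_bins > 0);
    assert(u >= 0.0 && u < 1.0 && min <= max);
    double width = (max - min) / (double)n_bins;
    double cum = 0.0; /* probability mass of the bins already passed */
    for (size_t i = 0; i < n_bins; i++) {
        if (pmf[i] > 0.0 && u < cum + pmf[i]) {
            double frac = (u - cum) / pmf[i]; /* position inside bin i */
            return min + ((double)i + frac) * width;
        }
        cum += pmf[i];
    }
    return max; /* only reached if the probabilities sum to less than 1 */
}
```

The same pattern extends to a distribution given as samples of the CDF: locate the first sample whose CDF value exceeds u and interpolate within that interval.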
