Code Monkey home page Code Monkey logo

broker's Introduction

Broker: Zeek's Messaging Library

The Broker library implements Zeek's high-level communication patterns:

  • remote logging
  • remote events
  • distributed data stores

Remote logging and events all follow a pub/sub communication model between Broker endpoints that are directly peered with each other. An endpoint also has the option of subscribing to its own messages. Subscriptions are matched prefix-wise and endpoints have the capability of fine-tuning the subscription topic strings they wish to advertise to peers as well as the messages they wish to send to them.

The distributed data store functionality allows a master data store associated with one Broker endpoint to be cloned at peer endpoints which may then perform lightweight, local queries against the clone, which automatically stays synchronized with the master store. Clones cannot modify their content directly, instead they send modifications to the centralized master store which applies them and then broadcasts them to all clones.

Applications which integrate the Broker library may communicate with each other using the above-mentioned patterns which are common to Zeek.

See the User Manual for more information. For offline reading, it's also available in the doc/ directory of the source tree.

See the NEWS file for the most important release notes and the CHANGES file for the complete history of changes.

Dependencies

Compiling Broker requires the following libraries/tools to already be installed:

The optional Python bindings also require Python 3.5 or greater along with Python development packages.

By default, Broker will use an integrated version of the C++ Actor Framework (CAF; https://actor-framework.org), though there's still the option to specify an exernal CAF version via the --with-caf= configure script option.

Compiling/Installing

On UNIX, we provide a configure script to automate the CMake setup. To compile and install into /usr/local:

./configure
make
make install

See ./configure --help for more advanced configuration options.

On Windows, we support MSVC natively. We do not support builds via MinGW or Cygwin. For Windows builds, please use CMake directly and use the CMake generator for your Visual Studio version. Afterwards, you can either open the project file with Visual Studio to build Broker, or you can build directly from the command line using CMake:

cmake --build <build-dir> --target install --config release

Please note that Broker currently only supports static builds on MSVC.

broker's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

broker's Issues

Segfault on core unit test with CAF 0.17

+----------------------------------------------------------------------+
                                  core
+----------------------------------------------------------------------+

- local_peers
  -> connect a consumer (leaf) to core2 [line 132]
  -> core1: 5@invalid-node [line 134]
  -> core2: 8@invalid-node [line 135]
  -> leaf: 9@invalid-node [line 136]
  -> expect(atom_value, filter_type).from(leaf).to(core2).with(join_atom::value, filter_type{"b"}) [line 139]
  -> expect(atom::peer, actor).from(self).to(core1).with(_, core2) [line 143]
  -> query peer information from core1 [line 145]
  -> run handshake between peers [line 156]
AddressSanitizer:DEADLYSIGNAL
=================================================================
==6273==ERROR: AddressSanitizer: SEGV on unknown address 0x000000000000 (pc 0x000115054871 bp 0x7ffee13eac70 sp 0x7ffee13eac50 T0)
==6273==The signal is caused by a READ memory access.
==6273==Hint: address points to the zero page.
    #0 0x115054870 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const string:1406
    #1 0x115054eb8 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__get_pointer() const string:1500
    #2 0x115054e74 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::data() const string:1228
    #3 0x11503c284 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::c_str() const string:1226
    #4 0x11503c253 in caf::logger::line_builder::operator<<(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) logger.cpp:271
    #5 0x114fb2dd4 in caf::inbound_path::inbound_path(caf::intrusive_ptr<caf::stream_manager>, caf::stream_slots, caf::intrusive_ptr<caf::actor_control_block>, std::__1::pair<unsigned short, std::type_info const*>) inbound_path.cpp:80
    #6 0x114fb3d14 in caf::inbound_path::inbound_path(caf::intrusive_ptr<caf::stream_manager>, caf::stream_slots, caf::intrusive_ptr<caf::actor_control_block>, std::__1::pair<unsigned short, std::type_info const*>) inbound_path.cpp:77
    #7 0x115205c25 in caf::scheduled_actor::make_inbound_path(caf::intrusive_ptr<caf::stream_manager>, caf::stream_slots, caf::intrusive_ptr<caf::actor_control_block>, std::__1::pair<unsigned short, std::type_info const*>) scheduled_actor.cpp:892
    #8 0x11536da5d in caf::stream_manager::add_unchecked_inbound_path_impl(std::__1::pair<unsigned short, std::type_info const*>) stream_manager.cpp:315
    #9 0x110be9823 in unsigned short caf::stream_manager::add_unchecked_inbound_path<broker::node_message>(caf::stream<broker::node_message> const&) stream_manager.hpp:250
    #10 0x110be9191 in broker::detail::core_policy::ack_peering(caf::stream<broker::node_message> const&, caf::actor const&) core_policy.cc:284
    #11 0x1106d5727 in broker::core_actor(caf::stateful_actor<broker::core_state, caf::event_based_actor>*, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, broker::broker_options, broker::endpoint::clock*)::$_18::operator()(caf::stream<broker::node_message> const&, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >&, caf::actor&) const core_actor.cc:319
    #12 0x1106d38a7 in caf::unit_t caf::lfinvoker<true, broker::core_actor(caf::stateful_actor<broker::core_state, caf::event_based_actor>*, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, broker::broker_options, broker::endpoint::clock*)::$_18>::operator()<caf::stream<broker::node_message>&, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >&, caf::actor&>(caf::stream<broker::node_message>&&&, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >&&&, caf::actor&&&) match_case.hpp:92
    #13 0x1106d3060 in decltype(fp(get<0l>(fp1), get<1l>(fp1), get<2l>(fp1))) caf::detail::apply_args<caf::lfinvoker<true, broker::core_actor(caf::stateful_actor<broker::core_state, caf::event_based_actor>*, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, broker::broker_options, broker::endpoint::clock*)::$_18>, 0l, 1l, 2l, caf::detail::pseudo_tuple<caf::stream<broker::node_message>, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, caf::actor> >(caf::lfinvoker<true, broker::core_actor(caf::stateful_actor<broker::core_state, caf::event_based_actor>*, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, broker::broker_options, broker::endpoint::clock*)::$_18>&, caf::detail::int_list<0l, 1l, 2l>, caf::detail::pseudo_tuple<caf::stream<broker::node_message>, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, caf::actor>&) apply_args.hpp:38
    #14 0x1106d2d1f in caf::trivial_match_case<broker::core_actor(caf::stateful_actor<broker::core_state, caf::event_based_actor>*, std::__1::vector<broker::topic, std::__1::allocator<broker::topic> >, broker::broker_options, broker::endpoint::clock*)::$_18>::invoke(caf::detail::invoke_result_visitor&, caf::type_erased_tuple&) match_case.hpp:164
    #15 0x114d064c9 in caf::detail::behavior_impl::invoke(caf::detail::invoke_result_visitor&, caf::type_erased_tuple&) behavior_impl.cpp:102
    #16 0x114d06de9 in caf::detail::behavior_impl::invoke(caf::detail::invoke_result_visitor&, caf::message&) behavior_impl.cpp:135
    #17 0x1152177a9 in caf::behavior::operator()(caf::detail::invoke_result_visitor&, caf::message&) behavior.hpp:115
    #18 0x1151fdb58 in caf::scheduled_actor::handle_open_stream_msg(caf::mailbox_element&) scheduled_actor.cpp:1120
    #19 0x1151fae76 in caf::scheduled_actor::categorize(caf::mailbox_element&) scheduled_actor.cpp:619
    #20 0x115200ea4 in caf::scheduled_actor::consume(caf::mailbox_element&) scheduled_actor.cpp:676
    #21 0x1151df25b in caf::scheduled_actor::reactivate(caf::mailbox_element&) scheduled_actor.cpp:786
    #22 0x1151ddfe3 in caf::scheduled_actor::mailbox_visitor::operator()(caf::mailbox_element&) scheduled_actor.cpp:358
    #23 0x11526dd24 in caf::intrusive::task_result caf::scheduled_actor::mailbox_visitor::operator()<caf::intrusive::drr_cached_queue<caf::policy::normal_messages> >(unsigned long, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>&, caf::mailbox_element&) scheduled_actor.hpp:234
    #24 0x11526dc7e in decltype(decltype(std::__1::__declval<caf::scheduled_actor::mailbox_visitor&>(0)) std::__1::declval<caf::scheduled_actor::mailbox_visitor&>()()(decltype(std::__1::__declval<caf::scheduled_actor::mailbox_visitor&>(0)) std::__1::declval<std::__1::integral_constant<unsigned long, 1ul> >()(), decltype(std::__1::__declval<caf::scheduled_actor::mailbox_visitor&>(0)) std::__1::declval<caf::intrusive::drr_cached_queue<caf::policy::normal_messages>&>()(), std::forward<caf::intrusive::drr_cached_queue<caf::policy::normal_messages>&>(fp)...)) caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round_recursion_helper<1ul, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::scheduled_actor::mailbox_visitor>::operator()<caf::mailbox_element&>(caf::intrusive::drr_cached_queue<caf::policy::normal_messages>&&&...) wdrr_fixed_multiplexed_queue.hpp:170
    #25 0x11526cc85 in caf::intrusive::new_round_result caf::intrusive::drr_cached_queue<caf::policy::normal_messages>::new_round<caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round_recursion_helper<1ul, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::scheduled_actor::mailbox_visitor> >(unsigned long, caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round_recursion_helper<1ul, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::scheduled_actor::mailbox_visitor>&) drr_cached_queue.hpp:190
    #26 0x11526c256 in std::__1::enable_if<(1ul) != (caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::num_queues), caf::intrusive::new_round_result>::type caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round_recursion<1ul, caf::scheduled_actor::mailbox_visitor>(unsigned long, caf::scheduled_actor::mailbox_visitor&) wdrr_fixed_multiplexed_queue.hpp:200
    #27 0x11526b10f in std::__1::enable_if<(0ul) != (caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::num_queues), caf::intrusive::new_round_result>::type caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round_recursion<0ul, caf::scheduled_actor::mailbox_visitor>(unsigned long, caf::scheduled_actor::mailbox_visitor&) wdrr_fixed_multiplexed_queue.hpp:207
    #28 0x11526ac68 in caf::intrusive::new_round_result caf::intrusive::wdrr_fixed_multiplexed_queue<caf::policy::categorized, caf::intrusive::drr_cached_queue<caf::policy::urgent_messages>, caf::intrusive::drr_cached_queue<caf::policy::normal_messages>, caf::intrusive::drr_queue<caf::policy::upstream_messages>, caf::intrusive::wdrr_dynamic_multiplexed_queue<caf::policy::downstream_messages> >::new_round<caf::scheduled_actor::mailbox_visitor>(unsigned long, caf::scheduled_actor::mailbox_visitor&) wdrr_fixed_multiplexed_queue.hpp:93
    #29 0x1151e6d7f in caf::intrusive::new_round_result caf::intrusive::fifo_inbox<caf::scheduled_actor::mailbox_policy>::new_round<caf::scheduled_actor::mailbox_visitor>(unsigned long, caf::scheduled_actor::mailbox_visitor&) fifo_inbox.hpp:174
    #30 0x1151e272c in caf::scheduled_actor::resume(caf::execution_unit*, unsigned long) scheduled_actor.cpp:399
    #31 0x1153b8812 in caf::scheduler::test_coordinator::try_run_once() test_coordinator.cpp:110
    #32 0x10e8f1a6e in test_coordinator_fixture<(anonymous namespace)::config>::consume_message() dsl.hpp:580
    #33 0x10e94fd67 in unsigned long test_coordinator_fixture<(anonymous namespace)::config>::run_until<test_coordinator_fixture<(anonymous namespace)::config>::run()::'lambda'()>(test_coordinator_fixture<(anonymous namespace)::config>::run()::'lambda'()) dsl.hpp:643
    #34 0x10e94f094 in test_coordinator_fixture<(anonymous namespace)::config>::run() dsl.hpp:628
    #35 0x10e8e6873 in local_tests::(anonymous namespace)::test123::run_test_impl() core.cc:157
    #36 0x10e8dd27a in caf::test::test_impl<local_tests::(anonymous namespace)::test123>::run_test_impl() unit_test.hpp:277
    #37 0x10f020550 in caf::test::engine::run(bool, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) unit_test_impl.hpp:372
    #38 0x10f02ac58 in caf::test::main(int, char**) unit_test_impl.hpp:571
    #39 0x10f02dac1 in main test.cpp:63
    #40 0x7fff65d2c3d4 in start (libdyld.dylib:x86_64+0x163d4)

Python Bindings: attach_clone does not block and results in a race condition

Creating and immediately referencing a clone data store within the Python bindings will result in a RuntimeError. This appears to be because the attach_clone call returns before the store is ready for use. From my experience, this is a pretty unusual pattern in Python. Is there a way to determine if the store is ready for use before actually referencing it?

Here, we can see this behavior in action:

(.env) devel:~/pybroker# cat test-client.py 
import broker

e = broker.Endpoint()
e.peer('127.0.0.1', 47762)

stor = e.attach_clone('bro/discovery/nodes')
print(stor.keys())
(.env) devel:~/pybroker# python test-client.py 
Traceback (most recent call last):
  File "test-client.py", line 7, in <module>
    print(stor.keys())
  File "/root/pybroker/.env/lib/python3.6/site-packages/broker/__init__.py", line 223, in keys
    return Data.to_py(keys.get())
RuntimeError: invalid type found

Adding a simple sleep call fixes this:

(.env) devel:~/pybroker# cat test-client.py 
import broker
import time

e = broker.Endpoint()
e.peer('127.0.0.1', 47762)

stor = e.attach_clone('bro/discovery/nodes')
time.sleep(3)
print(stor.keys())
(.env) devel:~/pybroker# python test-client.py 
{'control', 'proxy-1', 'worker-1', 'logger', 'manager', 'worker-2'}

Python3 bindings don't get built correctly with --with-python

I think this is the underlying issue to zeek/zeekctl#24

Since Python2 is now end-of-lifed, I built Zeek 3.0.2 with --with-python=/bin/python3. However, trying to import the bindings gives:

[zeek@zeek-box ~]$ /bin/python3
Python 3.6.8 (default, Aug  7 2019, 17:28:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path.insert(0, "/usr/local/zeek/lib/zeekctl")
>>> import broker
Traceback (most recent call last):
  File "/usr/local/zeek/lib/zeekctl/broker/__init__.py", line 3, in <module>
    from . import _broker
ImportError: dynamic module does not define module export function (PyInit__broker)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/zeek/lib/zeekctl/broker/__init__.py", line 5, in <module>
    import _broker
ModuleNotFoundError: No module named '_broker'

It looks like Python3 changed the module format a bit?

Missing documentation of enum_value

I noticed in the documentation that the types section of the broker lacks a section about the enum_value type.

The header broker/data.hh clearly supports enum_value as a data_variant so it should be included in the docs.

I'd update the file doc/data.rst with a sentence or two that describes the type like so:

An enum_value wraps enum types defined by zeek and is a type alias for std::string.

However, I am not sure that this accurate, given that I just learned about its existence.

Python User Experience (make a PyPI package for bindings)

The easiest way I could figure out to get python bindings up and running (I didn't even figure it out, thanks Justin!) was to install Bro and set my PYTHONPATH to /usr/local/bro/lib/broctl.

Is there a better way to install the bindings that I just failed to realize?

It would be immensely convenient to have something like a pypi package, so that broker bindings could be installed in a virtualenv.

Python Bindings: Malformed Zeek events cause segmentation fault

Bug description:

When using the python bindings, malformed broker messages can lead to a segfault of the receiving python process.

Zeek Version: v3.0.1
Broker Version: v1.2.4

Minimal example:

  1. Sender sends an invalid event
import broker
ep = broker.Endpoint()
ep.peer("localhost", 9999)
ep.publish("topic", None)
  1. Receiver parses messages
import broker
ep = broker.Endpoint()
ep.listen('localhost', 9999)
sub = ep.make_subscriber('topic')
msg = sub.get()    # wait for sender (1)
parsed = broker.zeek.Event(msg)
parsed.args()      # <-- this segfaults
# parsed.name()    # <-- that would also segfault

I tested the following variations and all do lead to a segfault on the receiving side:

ep.publish("topic", None)
ep.publish("topic", "hi")
ep.publish("topic", 0)
ev = broker.zeek.Event(None)
ep.publish("topic", ev)

Expected behavior:

Python receiver does not segfault.

Comparison -- Zeek behavior:

Interestingly, Zeek does not segfault when it receives malformed messages. Using the same python sender and the following receiver code, Zeek simply prints a warning.

event zeek_init() {
    Broker::listen("localhost", 9999/tcp);
    Broker::subscribe("topic");
}

warning: received invalid broker message: nil

Passing a Record as Event data

Hello
Is there a way we could pass a Record as Event data?

I mean, the Python 'dict' is mapped to Bro Table type, which restrict the dict having all values of the same type ('string' for example).

But a Record could contain different values types (a timestamp, an int counter, a string description).

Or maybe there is a workaround to pass 'Record' between Bro and Python through Broker ?

Thanks :-)

Debugging output not working as expected

Not sure if I'm missing something, but I'm not getting the debug output I'd expect.

According to an old commit, we have 3 environment vars:

        BROKER_DEBUG_LEVEL=<level>
            Set the CAF logging level explicitly.

        BROKER_DEBUG_COMPONENT_FILTER
            Set the CAF component filter explicitly.

        BROKER_DEBUG_VERBOSE=1
            Switch back to old settings: logging at level DEBUG, no
            component filtering.

However, no matter how I set these, I don't get any Broker-level messages, but only caf.flow DEBUG level messages. Would be good to review how debug logging is working currently and document how to activate it.

Note I noticed this outside of Zeek with the new zeek-agent. That one hardcodes the debug level to -1 currently, which I changed to 2 when trying this. See also zeek/zeek-agent#2

Differentiate error from status messages (Python bindings)

So I have a status subscriber here in my pybroker which receives both status and error messages. Now I just received an error. Unfortunately I was unable to reproduce the error in my more complicated, dynamic setup. But I managed to recreate it with a simple script:

import broker

ep = broker.Endpoint()
status = ep.make_status_subscriber(True)
ep.peer("127.0.0.1", 9999, retry=0)
# this peer does not exist so it will produce an error on our status
# subscriber

s = status.get()

if s.code() == broker.SC.PeerAdded:
    print("Hurra")
else:
    print("Nope")

This produces the error I received in my original setup as well:

Traceback (most recent call last):
  File (path removed for privacy reasons), line 11, in <module>
    if s.code() == broker.SC.PeerAdded:
TypeError: __eq__(): incompatible function arguments. The following argument types are supported:
    1. (self: broker._broker.SC, arg0: broker._broker.SC) -> bool

Invoked with: SC.PeerAdded, 4

The problem obviously is that the SC codes only go up to 3 while the error message (which is not part of the SC enum) has the number 4. Of course, I should first check if this really is a status message, but here is the problem: I can't. There is almost (see next paragraph) no way to check if the value I receive with status.get() is an error or a status. In the _broker.cpp for the Python Bindings I can see that there is in fact a ValueType class that supports checks like isError but it appears to be unusable in the Python files.

It discovered that is possible to use if isinstance(s, broker._broker.Error): in order to find out if it is an error, but I assume that the _broker class is supposed to be hidden and therefore not to be accessed by the users, so I think it might be a good idea to implement a check for error messages for the status_subscriber output in order to avoid dirty workarounds.

make fails if log level is not declared?

Hey,

I really appreciate the option to set a log level in the configure command now. But I believe there might be an error if the log level option is NOT set. It looks like there is a fallback that should set it to quiet, but it seems to fail when make is executed. I might have made a mistake there, so I'll just leave this here for you to check:

jens@iss-lab3:~/broker$ make
make -C build all
make[1]: Entering directory '/home/jens/broker/build'
make[2]: Entering directory '/home/jens/broker/build'
make[3]: Entering directory '/home/jens/broker/build'
Scanning dependencies of target project_caf
make[3]: Leaving directory '/home/jens/broker/build'
make[3]: Entering directory '/home/jens/broker/build'
[  1%] Building CAF
make[4]: Entering directory '/home/jens/broker/build/caf-build'
make[5]: Entering directory '/home/jens/broker/build/caf-build'
make[6]: Entering directory '/home/jens/broker/build/caf-build'
make[6]: Leaving directory '/home/jens/broker/build/caf-build'
[  0%] Built target libcaf_core
make[6]: Entering directory '/home/jens/broker/build/caf-build'
make[6]: Leaving directory '/home/jens/broker/build/caf-build'
make[6]: Entering directory '/home/jens/broker/build/caf-build'
[  0%] Building CXX object libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/logger.cpp.o
In file included from /home/jens/broker/3rdparty/caf/libcaf_core/caf/config.hpp:22:0,
                 from /home/jens/broker/3rdparty/caf/libcaf_core/caf/logger.hpp:31,
                 from /home/jens/broker/3rdparty/caf/libcaf_core/src/logger.cpp:19:
/home/jens/broker/3rdparty/caf/libcaf_core/src/logger.cpp: In constructor โ€˜caf::logger::logger(caf::actor_system&)โ€™:
/home/jens/broker/3rdparty/caf/libcaf_core/caf/detail/build_config.hpp:24:23: error: โ€˜QUIETโ€™ was not declared in this scope
 #define CAF_LOG_LEVEL QUIET
                       ^
/home/jens/broker/3rdparty/caf/libcaf_core/src/logger.cpp:236:14: note: in expansion of macro โ€˜CAF_LOG_LEVELโ€™
       level_(CAF_LOG_LEVEL),
              ^
libcaf_core/CMakeFiles/libcaf_core_shared.dir/build.make:1478: recipe for target 'libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/logger.cpp.o' failed
make[6]: *** [libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/logger.cpp.o] Error 1
make[6]: Leaving directory '/home/jens/broker/build/caf-build'
CMakeFiles/Makefile2:214: recipe for target 'libcaf_core/CMakeFiles/libcaf_core_shared.dir/all' failed
make[5]: *** [libcaf_core/CMakeFiles/libcaf_core_shared.dir/all] Error 2
make[5]: Leaving directory '/home/jens/broker/build/caf-build'
Makefile:127: recipe for target 'all' failed
make[4]: *** [all] Error 2
make[4]: Leaving directory '/home/jens/broker/build/caf-build'
CMakeFiles/project_caf.dir/build.make:116: recipe for target 'caf-ep/src/project_caf-stamp/project_caf-project_caf_build_step' failed
make[3]: *** [caf-ep/src/project_caf-stamp/project_caf-project_caf_build_step] Error 2
make[3]: Leaving directory '/home/jens/broker/build'
CMakeFiles/Makefile2:152: recipe for target 'CMakeFiles/project_caf.dir/all' failed
make[2]: *** [CMakeFiles/project_caf.dir/all] Error 2
make[2]: Leaving directory '/home/jens/broker/build'
Makefile:127: recipe for target 'all' failed
make[1]: *** [all] Error 2
make[1]: Leaving directory '/home/jens/broker/build'
Makefile:8: recipe for target 'all' failed
make: *** [all] Error 2

My configure command was jens@iss-lab3:~/broker$ ./configure --with-python=/usr/bin/python3.5. I had just pulled the recent version of the repository before that.

Compiler warnings

Seeing the following compiler warnings in broker/master now that are a bit distracting, anything that can be done to fix them @Neverlord ?

E.g. should be able to repro w/ GCC 7.4.0, Ubuntu 18

[67/274] Building CXX object 3rdparty/caf/libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/actor_system_config.cpp.o
In file included from /usr/include/c++/7/tuple:39:0,
                 from /usr/include/c++/7/bits/unique_ptr.h:37,
                 from /usr/include/c++/7/memory:80,
                 from ../3rdparty/caf/libcaf_core/caf/actor_system_config.hpp:23,
                 from ../3rdparty/caf/libcaf_core/src/actor_system_config.cpp:19:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:196:7:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:190:41:   [ skipping 4 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:243:5:   required from โ€˜void caf::detail::parser::read_ini_section(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::ini_category_consumer]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:314:5:   required from โ€˜void caf::detail::parser::read_ini(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::ini_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/actor_system_config.cpp:507:41:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 16> {aka struct std::array<unsigned char, 16>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 16>::_M_elems [16]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:219:28:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:216:8:   [ skipping 4 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:243:5:   required from โ€˜void caf::detail::parser::read_ini_section(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::ini_category_consumer]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:314:5:   required from โ€˜void caf::detail::parser::read_ini(State&, Consumer&&) [with State = caf::parser_state<caf::{anonymous}::ini_iter, caf::{anonymous}::ini_sentinel>; Consumer = caf::detail::ini_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/actor_system_config.cpp:507:41:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 4> {aka struct std::array<unsigned char, 4>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 4>::_M_elems [4]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
[85/274] Building CXX object 3rdparty/caf/libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/config_value.cpp.o
In file included from /usr/include/c++/7/tuple:39:0,
                 from /usr/include/c++/7/bits/stl_map.h:63,
                 from /usr/include/c++/7/map:61,
                 from ../3rdparty/caf/libcaf_core/caf/config_value.hpp:25,
                 from ../3rdparty/caf/libcaf_core/src/config_value.cpp:20:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:196:7:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:190:41:   [ skipping 2 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:171:5:   required from โ€˜void caf::detail::parser::read_ini_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::ini_value_consumer&]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:192:5:   required from โ€˜void caf::detail::parser::read_ini_value(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::ini_value_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/config_value.cpp:73:32:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 16> {aka struct std::array<unsigned char, 16>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 16>::_M_elems [16]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:219:28:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:216:8:   [ skipping 2 instantiation contexts, use -ftemplate-backtrace-limit=0 to disable ]
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:171:5:   required from โ€˜void caf::detail::parser::read_ini_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::ini_value_consumer&]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ini.hpp:192:5:   required from โ€˜void caf::detail::parser::read_ini_value(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::ini_value_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/config_value.cpp:73:32:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 4> {aka struct std::array<unsigned char, 4>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 4>::_M_elems [4]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
[103/274] Building CXX object 3rdparty/caf/libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/detail/parse.cpp.o
In file included from /usr/include/c++/7/tuple:39:0,
                 from ../3rdparty/caf/libcaf_core/caf/detail/type_traits.hpp:21,
                 from ../3rdparty/caf/libcaf_core/caf/detail/parse.hpp:29,
                 from ../3rdparty/caf/libcaf_core/src/detail/parse.cpp:19:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:196:7:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:190:41:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]โ€™
../3rdparty/caf/libcaf_core/src/detail/parse.cpp:158:1:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 16> {aka struct std::array<unsigned char, 16>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 16>::_M_elems [16]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:219:28:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:216:8:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::consumer<caf::ipv6_address>]โ€™
../3rdparty/caf/libcaf_core/src/detail/parse.cpp:158:1:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 4> {aka struct std::array<unsigned char, 4>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 4>::_M_elems [4]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:196:7:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:190:41:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_uri.hpp:169:5:   required from โ€˜void caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]โ€™
../3rdparty/caf/libcaf_core/src/detail/parse.cpp:121:33:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 16> {aka struct std::array<unsigned char, 16>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 16>::_M_elems [16]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:219:28:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:216:8:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]::<unnamed struct>&]โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_uri.hpp:169:5:   required from โ€˜void caf::detail::parser::read_uri(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::uri_builder&]โ€™
../3rdparty/caf/libcaf_core/src/detail/parse.cpp:121:33:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 4> {aka struct std::array<unsigned char, 4>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 4>::_M_elems [4]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
[148/274] Building CXX object 3rdparty/caf/libcaf_core/CMakeFiles/libcaf_core_shared.dir/src/ipv6_address.cpp.o
In file included from ../3rdparty/caf/libcaf_core/caf/ipv6_address.hpp:21:0,
                 from ../3rdparty/caf/libcaf_core/src/ipv6_address.cpp:19:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:196:7:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:190:41:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/ipv6_address.cpp:218:35:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 16> {aka struct std::array<unsigned char, 16>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 16>::_M_elems [16]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp: In instantiation of โ€˜caf::detail::parser::read_ipv6_address(State&, Consumer&&)::<lambda()> [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]โ€™:
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:219:28:   required from โ€˜struct caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]::<lambda()>โ€™
../3rdparty/caf/libcaf_core/caf/detail/parser/read_ipv6_address.hpp:216:8:   required from โ€˜void caf::detail::parser::read_ipv6_address(State&, Consumer&&) [with State = caf::parser_state<const char*>; Consumer = caf::{anonymous}::ipv6_address_consumer&]โ€™
../3rdparty/caf/libcaf_core/src/ipv6_address.cpp:218:35:   required from here
/usr/include/c++/7/array:94:12: note: โ€˜using array_type = struct std::array<unsigned char, 4> {aka struct std::array<unsigned char, 4>}โ€™ has no user-provided default constructor
     struct array
            ^~~~~
/usr/include/c++/7/array:110:56: note: and the implicitly-defined constructor does not initialize โ€˜unsigned char std::array<unsigned char, 4>::_M_elems [4]โ€™
       typename _AT_Type::_Type                         _M_elems;
                                                        ^~~~~~~~

Test failure

The python-data fails for me because of a type mismatch for integers:

======================================================================
FAIL: test_integer (__main__.TestDataConstruction)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "data.py", line 112, in test_integer
    self.check_to_broker_and_back(42, '42', broker.Data.Type.Integer)
  File "data.py", line 104, in check_to_broker_and_back
    self.check_to_py(b, p)
  File "data.py", line 58, in check_to_py
    self.assertIsInstance(b2p, type(p))
AssertionError: 42 is not an instance of <type 'long'>

type(42) says it's a <type 'int'> (not long).

Anybody else seeing that?

[Python bindings] Unable to convert function return value to a Python type! when accessing caf::node_id

When trying to access a field of type caf::node_id e.g. through a status update (status.context().node) Python exits with the error in the title.
As the error says the type from libcaf cannot be converted to a Python object, which makes sense, as broker does not ship with libcaf Python bindings.

Is this intended behaviour? If so then the caf types should probably not be exposed to Python at all (see: _broker.cpp).

We would really like to use the node id in our project though, is there any supported way of doing so?
For the moment we discovered that context.__repr__ looks like it contains the node id, which lets an endpoint identify its peers (on status updates) but as Endpoint::node_id is not bound at all the endpoint cannot get its own id.

This is just one example of unconvertible objects. endpoint.peers() is unable to convert std::vector<broker::peer_info, std::allocator<broker::peer_info>>. Unfortunately we are not sure if this happens because of the vector itself, broker::peer_info or the std::allocator.

If we can provide any more information, please let us know :)

Consider sending only put messages to clones

Currently, the master forwards messages such as add_command, put_unique_command, etc. This means the clones perform unnecessary processing steps, since the master already knows the new value and instead could send put_command messages exclusively.

Re-visit API for Creating Endpoints

Currently, users create an endpoint directly by calling its constructor. However, creating an endpoint can fail for several reasons. The same is true for configuration.

In #81 (comment), we've discussed this straw-man proposal to improve error signaling to the user by going through factory functions:

int main(int argc, char** argv) {
  if (auto ep = endpoint::make(argc, argv); // passing (argc, argv) is optional
      not ep) {
    // oops, configuration error
    auto& err = ep.error();
    // ...
  } else {
    // ok, endpoint up and running
  }
}

// With explicitly using a configuration.
int main(int argc, char** argv) {
  configuration conf;
  // ... tweak default settings ...
  if (auto err = conf.init(argc, argv)) {
    // deal with error explicitly, this step is optional and otherwise performed by endoint::run
  }
  if (auto ep = endpoint::make(std::move(conf);
      not ep) {
    // ...
  } else {
    // ...
  }
}

Support serialization of uint8 into broker::data

It would be nice to support serialization of uint8s into broker::data. Several of our data structures (at least HLL & paraglob) make quite extensive use of uint8s for internal data storage.

At the moment these get blown up into uint64s for serialization - which is a tad inefficient. It would theoretically be possible to pack several uint8s into an uint64 - but that would make the serialization code for each datastructure more complex - and also would mean quite a bit of repeated work each time one needs this.

Use shared memory for sharing data store content between local processes

When workers keep shared state in a table, they currently each need to have full copy of all the data, even when running on the same host, as in most cluster setups. Broker offers an opportunity to avoid that: it could use shared memory to provide access to a single copy for all workers running on the same system.

[Python] Subscriber.poll() is unusable in Python

The subscriber.poll() function is currently not usable via Python, as pybind complains about unconvertable types. The following excerpt from a Python REPL illustrates the issue.

>>> q = ep.make_subscriber("foo")
>>> q
<broker.Subscriber object at 0x7fbf65886430>
>>> q.poll()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.8/site-packages/broker/__init__.py", line 145, in poll
    msgs = self._subscriber.poll()
TypeError: Unable to convert function return value to a Python type! The signature was
	(self: broker._broker.SubscriberBase) -> std::vector<caf::cow_tuple<broker::topic, broker::data>, std::allocator<caf::cow_tuple<broker::topic, broker::data> > >

Did you forget to `#include <pybind11/stl.h>`? Or <pybind11/complex.h>,
<pybind11/functional.h>, <pybind11/chrono.h>, etc. Some automatic
conversions are optional and require extra headers to be included
when compiling your pybind11 module.

I vaguely recall this issue appearing some time ago for other functions which needed additional annotations to work w/ pybind. Also I am 99% sure, that I used poll() successfully in the past. Did this happen because of the switch to caf::cow_tuple?

Python Bindings: "unpeer_nosync" missing

Hi,

first of all, thanks for all the quick fixes during the past few weeks. I have another request now, as I can not find the unpeer_nosync() function from the endpoint class in the python bindings (it does, however, already exist in the C++ code). I would be very glad if you could add that as I really need non-blocking operations in my code.

Thanks!

Broker apparently crashes when another endpoint with CAF TRACE logs enabled tries to peer with it

So I ran into just another problem trying to analyze the connection problems. When I attempted to peer one of my Broker endpoints with the CAF version with loglevel set to TRACE with another endpoint (this one using the integrated CAF version from Broker), the other endpoint crashed with an unfortunately unmeaningful message:

terminate called after throwing an instance of 'std::runtime_error'
  what():  invalid type found

Do you have an idea what might have caused this? I used the 0.16.3 release for cloning the CAF version and the most recent Broker version (which states that it supports CAF in 0.16.3). Curiously, if I peer two endpoints with the integrated version, no crashes occur - but I do not get the CAF logs this way and I need those because I still have this problem that the other side sometimes does not notice that someone attempted to peer with it. The problem now occurs a bit more regularly, so I really need it fixed.

The container running the compiled CAF version is setup like this in case you need to know:

FROM python as broker

RUN git clone https://github.com/bro/broker --recursive
RUN apt-get update && apt-get install -y openssl \
    gcc \
    cmake

RUN git clone https://github.com/actor-framework/actor-framework.git && cd actor-framework && git checkout ec6a892
RUN cd actor-framework && ./configure --with-log-level=TRACE && cd build && make && make install

RUN cd broker && ./configure --with-caf=/usr/local/actor-framework && make && make test && make install

Status subscriber timeout doesn't work in Python

Consider this example:

#!/usr/bin/env python

import broker

endpoint = broker.Endpoint()
status_subscriber = endpoint.make_status_subscriber(True)
msg = status_subscriber.get(0.5)

The output:

Traceback (most recent call last):
  File "test.py", line 7, in <module>
    msg = status_subscriber.get(0.5)
  File ".../env/lib/python/broker/__init__.py", line 128, in get
    x = self._subscriber.get(*args, **kwargs)
TypeError: Unable to convert function return value to a Python type! The signature was
        (self: broker._broker.StatusSubscriberBase, arg0: float) -> caf::optional<caf::variant<broker::none, caf::error, broker::status> >

It looks like we need to dispatch the variant to different Python types.

OpenSSL Unit Test on FreeBSD Always Times Out

Broker runs fine on our Fedora and macOS machines. However, the OpenSSL unit test on FreeBSD build always times out. Did you observe similar behavior in your CI?

Here's the full log:

prepare build steps on stage [2:0] FreeBSD && clang: debug
get latest CAF master build for FreeBSD && clang && debug
Copied 1 artifact from "CAF: C++ Actor Framework ยป actor-framework ยป master" build number 139
Extracting from /home/jenkins/data/workspace/Broker/caf-import/FreeBSD && clang && debug.zip
Extracted: 467 files
get sources from previous stage and run CMake
[build] $ cmake -D CMAKE_BUILD_TYPE=debug -DDISABLE_PYTHON_BINDINGS:BOOL=yes -DENABLE_ADDRESS_SANITIZER:BOOL=yes -DCAF_ROOT_DIR=/home/jenkins/data/workspace/Broker/package -DCMAKE_INSTALL_PREFIX=/home/jenkins/data/workspace/Broker/package /home/jenkins/data/workspace/Broker/broker-sources
-- The C compiler identification is Clang 6.0.0
-- The CXX compiler identification is Clang 6.0.0
-- Check for working C compiler: /usr/bin/cc
-- Check for working C compiler: /usr/bin/cc -- works
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Detecting C compile features
-- Detecting C compile features - done
-- Check for working CXX compiler: /usr/bin/c++
-- Check for working CXX compiler: /usr/bin/c++ -- works
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Detecting CXX compile features
-- Detecting CXX compile features - done
-- Found CAF: /home/jenkins/data/workspace/Broker/package/lib/libcaf_core.so;/home/jenkins/data/workspace/Broker/package/lib/libcaf_io.so;/home/jenkins/data/workspace/Broker/package/lib/libcaf_openssl.so  found components:  core io test openssl 
-- Found OpenSSL: /usr/local/lib/libcrypto.so (found version "1.1.0i")  
-- Could NOT find RocksDB (missing: ROCKSDB_LIBRARIES ROCKSDB_INCLUDE_DIRS) 
-- Performing Test cxx11_header_works
-- Performing Test cxx11_header_works - Success
-- Check if the system is big endian
-- Searching 16 bit integer
-- Looking for sys/types.h
-- Looking for sys/types.h - found
-- Looking for stdint.h
-- Looking for stdint.h - found
-- Looking for stddef.h
-- Looking for stddef.h - found
-- Check size of unsigned short
-- Check size of unsigned short - done
-- Using unsigned short
-- Check if the system is big endian - little endian
-- Looking for include file emmintrin.h
-- Looking for include file emmintrin.h - found

==================|  Broker Config Summary  |====================
Version:         1.1.0-479
SO version:      0

Build Type:      debug
Install prefix:  /home/jenkins/data/workspace/Broker/package
Library prefix:  /home/jenkins/data/workspace/Broker/package/lib
Shared libs:     yes
Static libs:     no

CC:              /usr/bin/cc
CFLAGS:           -Wall -Wno-unused -g -DDEBUG -DBRO_DEBUG -g
CXX:             /usr/bin/c++
CXXFLAGS:         -Wall -Wno-unused -g -DDEBUG -DBRO_DEBUG -std=c++11  -pthread -Wall -Wno-unused -pedantic -ftemplate-depth=512 -ftemplate-backtrace-limit=3 -fsanitize=address -fno-omit-frame-pointer -g

CAF:             /home/jenkins/data/workspace/Broker/package/include (0.16.2)
RocksDB:         no
Python bindings: no
Bro:             no
=================================================================

-- Configuring done
-- Generating done
-- Build files have been written to: /usr/home/jenkins/data/workspace/Broker/broker-sources/build
[build] $ cmake --build /home/jenkins/data/workspace/Broker/broker-sources/build --target install
-- Could NOT find RocksDB (missing: ROCKSDB_LIBRARIES ROCKSDB_INCLUDE_DIRS) 

==================|  Broker Config Summary  |====================
Version:         1.1.0-479
SO version:      0

Build Type:      debug
Install prefix:  /home/jenkins/data/workspace/Broker/package
Library prefix:  /home/jenkins/data/workspace/Broker/package/lib
Shared libs:     yes
Static libs:     no

CC:              /usr/bin/cc
CFLAGS:           -Wall -Wno-unused -g -DDEBUG -DBRO_DEBUG -g
CXX:             /usr/bin/c++
CXXFLAGS:         -Wall -Wno-unused -g -DDEBUG -DBRO_DEBUG -std=c++11  -pthread -Wall -Wno-unused -pedantic -ftemplate-depth=512 -ftemplate-backtrace-limit=3 -fsanitize=address -fno-omit-frame-pointer -g

CAF:             /home/jenkins/data/workspace/Broker/package/include (0.16.2)
RocksDB:         no
Python bindings: no
Bro:             no
=================================================================

-- Configuring done
-- Generating done
-- Build files have been written to: /usr/home/jenkins/data/workspace/Broker/broker-sources/build
Scanning dependencies of target broker
[  1%] Building CXX object CMakeFiles/broker.dir/src/address.cc.o
[  3%] Building CXX object CMakeFiles/broker.dir/src/configuration.cc.o
[  4%] Building CXX object CMakeFiles/broker.dir/src/core_actor.cc.o
[  6%] Building CXX object CMakeFiles/broker.dir/src/data.cc.o
[  7%] Building CXX object CMakeFiles/broker.dir/src/endpoint.cc.o
[  9%] Building CXX object CMakeFiles/broker.dir/src/error.cc.o
[ 10%] Building CXX object CMakeFiles/broker.dir/src/status_subscriber.cc.o
[ 12%] Building CXX object CMakeFiles/broker.dir/src/internal_command.cc.o
[ 13%] Building CXX object CMakeFiles/broker.dir/src/mailbox.cc.o
[ 15%] Building CXX object CMakeFiles/broker.dir/src/network_info.cc.o
[ 16%] Building CXX object CMakeFiles/broker.dir/src/peer_status.cc.o
[ 18%] Building CXX object CMakeFiles/broker.dir/src/port.cc.o
[ 19%] Building CXX object CMakeFiles/broker.dir/src/publisher.cc.o
[ 21%] Building CXX object CMakeFiles/broker.dir/src/status.cc.o
[ 22%] Building CXX object CMakeFiles/broker.dir/src/store.cc.o
[ 24%] Building CXX object CMakeFiles/broker.dir/src/subnet.cc.o
[ 25%] Building CXX object CMakeFiles/broker.dir/src/subscriber.cc.o
[ 27%] Building CXX object CMakeFiles/broker.dir/src/time.cc.o
[ 28%] Building CXX object CMakeFiles/broker.dir/src/topic.cc.o
[ 30%] Building CXX object CMakeFiles/broker.dir/src/version.cc.o
[ 31%] Building CXX object CMakeFiles/broker.dir/src/detail/abstract_backend.cc.o
[ 33%] Building CXX object CMakeFiles/broker.dir/src/detail/clone_actor.cc.o
[ 34%] Building CXX object CMakeFiles/broker.dir/src/detail/core_policy.cc.o
[ 36%] Building CXX object CMakeFiles/broker.dir/src/detail/filesystem.cc.o
[ 37%] Building CXX object CMakeFiles/broker.dir/src/detail/flare.cc.o
[ 39%] Building CXX object CMakeFiles/broker.dir/src/detail/flare_actor.cc.o
[ 40%] Building CXX object CMakeFiles/broker.dir/src/detail/make_backend.cc.o
[ 42%] Building CXX object CMakeFiles/broker.dir/src/detail/master_actor.cc.o
[ 43%] Building CXX object CMakeFiles/broker.dir/src/detail/master_resolver.cc.o
[ 45%] Building CXX object CMakeFiles/broker.dir/src/detail/memory_backend.cc.o
[ 46%] Building CXX object CMakeFiles/broker.dir/src/detail/network_cache.cc.o
[ 48%] Building CXX object CMakeFiles/broker.dir/src/detail/prefix_matcher.cc.o
[ 50%] Building CXX object CMakeFiles/broker.dir/src/detail/sqlite_backend.cc.o
[ 51%] Building C object CMakeFiles/broker.dir/3rdparty/sqlite3.c.o
[ 53%] Linking CXX shared library lib/libbroker.so
[ 53%] Built target broker
Scanning dependencies of target broker-pipe
[ 54%] Building CXX object CMakeFiles/broker-pipe.dir/src/broker-pipe.cc.o
[ 56%] Linking CXX executable bin/broker-pipe
[ 56%] Built target broker-pipe
Scanning dependencies of target broker-node
[ 57%] Building CXX object CMakeFiles/broker-node.dir/src/broker-node.cc.o
[ 59%] Linking CXX executable bin/broker-node
[ 59%] Built target broker-node
Scanning dependencies of target broker-stream-benchmark
[ 60%] Building CXX object tests/CMakeFiles/broker-stream-benchmark.dir/benchmark/broker-stream-benchmark.cc.o
[ 62%] Linking CXX executable ../bin/broker-stream-benchmark
[ 62%] Built target broker-stream-benchmark
Scanning dependencies of target broker-benchmark
[ 63%] Building CXX object tests/CMakeFiles/broker-benchmark.dir/benchmark/broker-benchmark.cc.o
[ 65%] Linking CXX executable ../bin/broker-benchmark
[ 65%] Built target broker-benchmark
Scanning dependencies of target broker-test
[ 66%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/backend.cc.o
[ 68%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/bro.cc.o
[ 69%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/core.cc.o
[ 71%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/data.cc.o
[ 72%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/status_subscriber.cc.o
[ 74%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/integration.cc.o
[ 75%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/master.cc.o
[ 77%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/publisher.cc.o
[ 78%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/radix_tree.cc.o
[ 80%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/ssl.cc.o
[ 81%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/store.cc.o
[ 83%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/subscriber.cc.o
[ 84%] Building CXX object tests/CMakeFiles/broker-test.dir/cpp/topic.cc.o
[ 86%] Building CXX object tests/CMakeFiles/broker-test.dir/test.cpp.o
[ 87%] Linking CXX executable ../bin/broker-test
[ 87%] Built target broker-test
Scanning dependencies of target stores
[ 89%] Building CXX object doc/_examples/CMakeFiles/stores.dir/stores.cc.o
[ 90%] Linking CXX executable ../../bin/stores
[ 90%] Built target stores
Scanning dependencies of target ping
[ 92%] Building CXX object doc/_examples/CMakeFiles/ping.dir/ping.cc.o
[ 93%] Linking CXX executable ../../bin/ping
[ 93%] Built target ping
Scanning dependencies of target comm
[ 95%] Building CXX object doc/_examples/CMakeFiles/comm.dir/comm.cc.o
[ 96%] Linking CXX executable ../../bin/comm
[ 96%] Built target comm
Scanning dependencies of target synopsis
[ 98%] Building CXX object doc/_examples/CMakeFiles/synopsis.dir/synopsis.cc.o
[100%] Linking CXX executable ../../bin/synopsis
[100%] Built target synopsis
Install the project...
-- Install configuration: "debug"
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/address.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/api_flags.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/atoms.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/backend_options.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/bad_variant_access.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/bro.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/broker.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/configuration.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/convert.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/core_actor.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/data.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/abstract_backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/appliers.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/assert.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/blob.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/clone_actor.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/core_policy.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/core_scatterer.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/die.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/filesystem.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/flare.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/flare_actor.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/hash.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/make_backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/make_unique.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/master_actor.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/master_resolver.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/memory_backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/network_cache.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/operators.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/prefix_matcher.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/radix_tree.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/rocksdb_backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/scoped_flare_actor.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/shared_publisher_queue.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/shared_queue.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/shared_subscriber_queue.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/sqlite_backend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/subscription.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/detail/type_traits.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/endpoint.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/endpoint_info.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/enum_value.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/error.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/expected.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/filter_type.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/frontend.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/fwd.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/internal_command.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/logger.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/mailbox.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/network_info.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/none.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/optional.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/peer_filter.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/peer_flags.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/peer_info.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/peer_status.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/port.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/publisher.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/snapshot.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/status.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/status_subscriber.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/store.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/subnet.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/subscriber.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/subscriber_base.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/time.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/timeout.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/topic.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/version.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/include/broker/config.hh
-- Installing: /home/jenkins/data/workspace/Broker/package/lib/libbroker.so..
-- Installing: /home/jenkins/data/workspace/Broker/package/lib/libbroker.so.0
-- Set runtime path of "/home/jenkins/data/workspace/Broker/package/lib/libbroker.so.." to "/home/jenkins/data/workspace/Broker/package/lib:/usr/local/lib"
-- Installing: /home/jenkins/data/workspace/Broker/package/lib/libbroker.so
[build] $ ctest --output-on-failure
Test project /usr/home/jenkins/data/workspace/Broker/broker-sources/build
      Start  1: backend
 1/13 Test  #1: backend ..........................   Passed    0.19 sec
      Start  2: bro
 2/13 Test  #2: bro ..............................   Passed    0.05 sec
      Start  3: core
 3/13 Test  #3: core .............................   Passed    1.59 sec
      Start  4: data
 4/13 Test  #4: data .............................   Passed    0.06 sec
      Start  5: status_subscriber
 5/13 Test  #5: status_subscriber ................   Passed    0.10 sec
      Start  6: integration
 6/13 Test  #6: integration ......................   Passed    3.23 sec
      Start  7: master
 7/13 Test  #7: master ...........................   Passed    0.56 sec
      Start  8: publisher
 8/13 Test  #8: publisher ........................   Passed    0.38 sec
      Start  9: radix_tree
 9/13 Test  #9: radix_tree .......................   Passed    7.56 sec
      Start 10: ssl
10/13 Test #10: ssl ..............................***Timeout  20.01 sec
+----------------------------------------------------------------------+
                                  ssl
+----------------------------------------------------------------------+

- authenticated_session
** /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/cert.1.pem, key /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/key.1.pem [line 35]
** /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/cert.2.pem, key /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/key.2.pem [line 35]
** /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/cert.self-signed.pem, key /home/jenkins/data/workspace/Broker/broker-sources/tests/cpp/certs/key.self-signed.pem [line 35]
  -> prepare authenticated connection [line 74]
  -> mercury_auth listen [line 78]
  -> venus_auth peer with mecury_auth on port 19713 [line 80]

      Start 11: store
11/13 Test #11: store ............................   Passed    1.43 sec
      Start 12: subscriber
12/13 Test #12: subscriber .......................   Passed    0.32 sec
      Start 13: topic
13/13 Test #13: topic ............................   Passed    0.06 sec

92% tests passed, 1 tests failed out of 13

Total Test time (real) =  35.55 sec

The following tests FAILED:
	 10 - ssl (Timeout)
Errors while running CTest
ctest exited with failure code 8
[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] done

store_events currently give no way to determine the store an event belongs to

This is kind of an addition that was already discussed in #100, but then got forgotten (I think).

It is now possible to receive store events. However there seems to be no way to determine which store an event actually was sent from. Since we will have several of them simultaneously this is kind of important :)

As John noted in #100 , having the store id in the event as well as the topic seems like the best way.

Invalid / corrupt batches

As reported here, broker-pipe runs into received unexpected batch type errors. Tweaking the log output to print the unexpected batch even results in a segfault.

Consider using UTC timezone in python bindings

Currently time formatter for the Time type is specified like this

 Data.Type.Timestamp: lambda: datetime.datetime.fromtimestamp(d.as_timestamp())

This can creates a naive datetime object with no timezone attached to it, which makes it very inconvinient to user, since he is forced to transform it to timezone aware object and look current timezone up.

It would be much more convinient to use if converter was something like this
lambda: datetime.datetime.utcfromtimestamp(d.as_timestamp()).replace(tz_info=pytz.utc())

TZ aware time objects are better than TZ unaware. Always.

Segfault when creating single-argument events

I'm experimenting with Bro to Python communication and run into a scenario where I get segfault. Here are the Bro and Python scripts:

redef exit_only_after_terminate = T;

event foo()
  {
  # nop
  }

event bar(x: int)
  {
  print x;
  }

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
  {
  local e = Broker::make_event(foo);
  Broker::publish("/test", e);
  }

event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
  {
  terminate();
  }

event bro_init()
  {
  Broker::subscribe("/test");
  Broker::peer("localhost", 55555/tcp);
  }
#!/usr/bin/env python

import broker
import broker.bro

# Setup endpoint and subscribers.
endpoint = broker.Endpoint()
endpoint.listen("localhost", 55555)
subscriber = endpoint.make_subscriber("/test")
while True:
    print("waiting for remote signal...")
    topic, data = subscriber.get()
    print("got message:", data)
    event = broker.bro.Event("bar", 42)    # <-------------------- crash
    endpoint.publish("/test", event)

When I run first the Python (python -X faulthandler test.py) and then the Bro script, I get a backtrace that points to this function:

class Event(_broker.bro.Event):
    def __init__(self, *args):
        if len(args) == 1 and isinstance(args[0], list):
            # Parse raw broker message as event.
            _broker.bro.Event.__init__(self, broker.Data.from_py(args[0]))
        elif len(args) == 2 and isinstance(args[0], str) and not isinstance(args[0], broker.Data):
            # (name, (arg1, arg2, ...))
            _broker.bro.Event.__init__(self, args[0], broker.Data.from_py(args[1])) # <-------------- crash
        else:
            # (name, arg1, arg2, ...)
            _broker.bro.Event.__init__(self, args[0], broker.Data.from_py(args[1:]))

    def args(self):
        return [broker.Data.to_py(a) for a in _broker.bro.Event.args(self)]

The segfault doesn't show up when I add a second argument (e.g.l broker.bro.Event("bar", 42, 42)).

I'm not sure what's going on here under the hood yet and how to use the machinery. For example, why do we need broker.Data.from_py(x) when we can write broker.Data(x)?

Extend Broker data stores for subnet lookup

Bro's tables special-case tables of subnets for address lookups. We should do the same for Broker's data stores so that they can support functionality that needs this, such as the intel framework. (Potential challenge: do the backends support network lookups?)

broker-cluster-benchmark sometimes fails to terminate

Sometimes broker-cluster-benchmark doesn't terminate. E.g. running with the following recordings:

zeek-broker-recordings.tar.gz

It sometimes terminates cleanly and other times (maybe every 5-ish times for me) hangs for several minutes so I assume it's not going to terminate. Here's the verbose output from one of those times:

broker-cluster-bench-hang.txt

I was able to run the benchmark 40 times without hanging by adding a 2 second sleep() after this line:

verbose::println("all nodes are up and running, run benchmark");

So guessing something in the node setup for the benchmark isn't correctly waiting for peerings/subscriptions to be fully established before nodes start sending messages?

Integrate Streams

Integrating CAF's new (experimental) streaming feature allows Broker to benefit from back-pressure. At the same time, this simplifies the implementation of Broker due to the more powerful building blocks provided by CAF.

The following list tracks open questions (๐Ÿ”) and programming tasks (โš”๏ธ). It is updated as needed.

  • Context
    • Use a single core actor instead of one core per endpoint. โš”๏ธ
    • Merge with endpoint. โš”๏ธ
  • Core Actor
    • Multiplex streams using the new proof-of-concept governor. โš”๏ธ
    • Base the new implementation on CAF's manual_stream_management unit test. โš”๏ธ
    • Is a down handler still needed or can we leave peer management to the governor? ๐Ÿ”
    • What is the best way to integrate master/replica traffic into the new governor model? ๐Ÿ”
    • Re-implement unpeer. โš”๏ธ
    • Re-integrate storage functionality. โš”๏ธ
      • master attach. โš”๏ธ
      • clone attach. โš”๏ธ
    • Move all network_info message handlers to a new "network manager" of sorts. โš”๏ธ
  • Stores
    • Re-implement clear. โš”๏ธ
  • Endpoints
    • Redesign Broker's public API as discussed at the last February Meeting at ICSI. โš”๏ธ
      • Unify nonblocking and blocking endpoints. โš”๏ธ
      • Implement publish function family. โš”๏ธ
      • Implement subscribe function family. โš”๏ธ
    • Allow one-to-one "publishing" between nodes. โš”๏ธ
    • Calculate send rate on publisher and subscriber. โš”๏ธ
  • Unit testing.
    • Peering of two or more core actors. โš”๏ธ
    • Stream passing through two or more cores. โš”๏ธ
    • Stream connected with a publisher on one end as producer. โš”๏ธ
    • Stream connected with a subscriber on one end as consumer. โš”๏ธ
    • Attach a master to a core and store & retrieve data via put/get API . โš”๏ธ
    • Attach a master to one core and a clone to a second (connected) core . โš”๏ธ

Trouble with installing into right Python home

I'm trying to setup a Python virtual env with Broker. It seems that the install step installs packages in the wrong destination. This is what I did:

Setup a Python virtual environment:

export PREFIX=$(pwd)/env
python3 -m venv $PREFIX
source $PREFIX/bin/activate

Then, install Broker:

git clone [email protected]:bro/broker.git
cd broker
git submodule update --recursive --init
./configure
  --generator=Ninja \
  --prefix=$PREFIX \
  --with-python=$PREFIX/bin/python
cd build
ninja
ninja install

The resulting directory layout:

ls $PREFIX/lib
python             libbroker.dylib           libcaf_io.dylib
python3.7          libcaf_core.0.15.7.dylib  libcaf_openssl.0.15.7.dylib
libbroker...dylib  libcaf_core.dylib         libcaf_openssl.dylib
libbroker.0.dylib  libcaf_io.0.15.7.dylib

Here, the problem is that the bindings are installed into python and not python3.7. The problem lies in the CMake script:

if ( NOT PY_MOD_INSTALL_DIR )
  # Figure out Python module install directory.
  if (BROKER_PYTHON_PREFIX)
    set(pyver ${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR})
    set(PY_MOD_INSTALL_DIR
        ${BROKER_PYTHON_PREFIX}/lib/python${pyver}/site-packages)
  elseif (BROKER_PYTHON_HOME)
    set(PY_MOD_INSTALL_DIR ${BROKER_PYTHON_HOME}/lib/python)    <-------
  else ()
    execute_process(COMMAND ${PYTHON_EXECUTABLE} -c
      "from distutils.sysconfig import get_python_lib; print(get_python_lib())"
      OUTPUT_VARIABLE python_site_packages
      OUTPUT_STRIP_TRAILING_WHITESPACE)
    set(PY_MOD_INSTALL_DIR ${python_site_packages})
  endif ()
endif ()

I'm not sure what the right fix is, though, but wanted to bring this issue to your attention.

Python Tests break on Python 3.8

Python 3.8 introduces async test cases which are imported by default in unittest. This results in an import of asyncio which in turn imports ssl. As one test is also called ssl.py the test is imported instead of the standard library package and due to the use of unittest in ssl.py the test break. This affects all tests since at least 1.2.0, but I think it might go back later then this. Below is an example traceback. The obvious fix would be to rename ssl.py to avoid the nameclash.

Traceback (most recent call last):
  File "/home/florian/src/cpp/broker/tests/python/communication.py", line 5, in <module>
    import unittest
  File "/usr/lib/python3.8/unittest/__init__.py", line 60, in <module>
    from .async_case import IsolatedAsyncioTestCase
  File "/usr/lib/python3.8/unittest/async_case.py", line 1, in <module>
    import asyncio
  File "/usr/lib/python3.8/asyncio/__init__.py", line 8, in <module>
    from .base_events import *
  File "/usr/lib/python3.8/asyncio/base_events.py", line 34, in <module>
    import ssl
  File "/home/florian/src/cpp/broker/tests/python/ssl.py", line 13, in <module>
    class TestSSL(unittest.TestCase):
AttributeError: partially initialized module 'unittest' has no attribute 'TestCase' (most likely due to a circular import)

No guaranteed status on `peer_nosync` failure? (Python Interface)

Hey,

I ran into another problem today. Apparently it happens very rarely (like once every 1000 peering attempts or so), but I can not explain it with a problem in my code right now so I'm asking it here. I use the Python interface for broker.

I have a node that is told to unpeer one node and peer with another. I use endpoint.peer_nosync(self.tree[number].connection_ip, self.NODE_PORT + number, retry=0) (basically the param for IP is a normal IPv4 IP and the port is a normal integer) for the peering. Normally, this all works just fine. I checked the params and they all check out. But I only get this message 07 Jan 14:39:56 Tree 2: Peering with node '885B6' on IP '172.17.0.29' and Port 9992. and that's all. Normally I should then soon get another message like 07 Jan 14:39:56 Tree 2: Peer added. Node ID: '885B6' IP: '172.17.0.29'. Port: 9992. or at least an error/status that said that the node can not be reached or the peering timed out. But this never happens, even after half an hour of waiting. I only get the status confirmation that the unpeering worked:

07 Jan 14:39:56 Tree 2: Handed over to new potential parent '885B6' on '172.17.0.29'. Unpeering with previous parent, connecting with that one now... 
07 Jan 14:39:56 Tree 2: Peering with node '885B6' on IP '172.17.0.29' and Port 9992. 
07 Jan 14:39:56 Tree 2: Peer removed. Node ID: '19007' IP: '172.17.0.3'. Port: 9992. 

My program catches all errors and status messages that the endpoints throw and every error/status produces logging entries - even if I get an unknown error or status, I still produce a log entry for that. But nothing happens here. Neither the peering endpoint itself nor the endpoint that is the target of the peering make any further mentions in their logs of this attempt to peer. As the .hh file (https://github.com/zeek/broker/blob/master/broker/endpoint.hh) for the endpoint in C++ states (and I hope this is also the case for the python interface), there should always be "a status message indicating the result of the peering operation." I do not get this status message, though. This is in fact a serious problem for me because the program will not make further steps until it learns of the success or failure of its peering attempt.

To me it looks like the peering started after the peer_nosync call (or maybe even didn't, as the other node never says anything) but at some point it fails and the failure is not caught. In case you need further information, please let me know.

Seemingly rare deadlock in (debug) communication unit test

Very rarely, I see a deadlock in the communication.py unit test. So far, I could reproduce it only in debug mode with logging enabled. Here's the stack trace I managed to obtain:

(lldb) process attach -n Python
Process 76505 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
    frame #0: 0x00007fff6e723882 libsystem_kernel.dylib`__psynch_cvwait + 10
libsystem_kernel.dylib`__psynch_cvwait:
->  0x7fff6e723882 <+10>: jae    0x7fff6e72388c            ; <+20>
    0x7fff6e723884 <+12>: movq   %rax, %rdi
    0x7fff6e723887 <+15>: jmp    0x7fff6e721629            ; cerror_nocancel
    0x7fff6e72388c <+20>: retq
Target 0: (Python) stopped.

Executable module set to "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python".
Architecture set to: x86_64-apple-macosx-.
(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  * frame #0: 0x00007fff6e723882 libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007fff6e7e8425 libsystem_pthread.dylib`_pthread_cond_wait + 698
    frame #2: 0x00007fff6b8bb592 libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) + 18
    frame #3: 0x000000010edd3fcb libcaf_core.0.17.4.dylib`caf::detail::ringbuffer<caf::logger::event, 128ul>::push_back(caf::logger::event&&) + 91
    frame #4: 0x000000010edb0985 libcaf_core.0.17.4.dylib`caf::group_manager::start() + 613
    frame #5: 0x000000010ed0d722 libcaf_core.0.17.4.dylib`caf::actor_system::actor_system(caf::actor_system_config&) + 6434
    frame #6: 0x000000010e4f598e libbroker.1.4.dylib`broker::endpoint::endpoint(broker::configuration) + 1582
    frame #7: 0x000000010e24869d _broker.so`___lldb_unnamed_symbol288$$_broker.so + 61
    frame #8: 0x000000010e276af7 _broker.so`___lldb_unnamed_symbol677$$_broker.so + 3623
    frame #9: 0x000000010d976aed Python`_PyMethodDef_RawFastCallDict + 516
    frame #10: 0x000000010d97611f Python`_PyCFunction_FastCallDict + 41
    frame #11: 0x000000010d9771a3 Python`_PyObject_Call_Prepend + 131
    frame #12: 0x000000010d976569 Python`PyObject_Call + 136
    frame #13: 0x000000010d9b4642 Python`slot_tp_init + 96
    frame #14: 0x000000010d9b135e Python`type_call + 172
    frame #15: 0x000000010d9762d2 Python`_PyObject_FastCallKeywords + 358
    frame #16: 0x000000010da0bad2 Python`call_function + 730

This could be an issue with running two actor systems in the same process. At a first glance, it seems like it could be a CAF bug. However, I didn't dig deep enough yet.

Need help regarding Broker Installation

I tried to install broker on centos, fedora and security-onion but get the same error:

"""""CMake Error at CMakeLists.txt:3 (include):
include could not find load file:

cmake/CommonCMakeConfig.cmake

CMake Error at CMakeLists.txt:5 (find_package):
By not providing "FindLibcaf.cmake" in CMAKE_MODULE_PATH this project has
asked CMake to find a package configuration file provided by "Libcaf", but
CMake did not find one.

Could not find a package configuration file provided by "Libcaf" with any
of the following names:

LibcafConfig.cmake
libcaf-config.cmake

Add the installation prefix of "Libcaf" to CMAKE_PREFIX_PATH or set
"Libcaf_DIR" to a directory containing one of the above files. If "Libcaf"
provides a separate development package or SDK, be sure it has been
installed.

-- Configuring incomplete, errors occurred!"""""

Please guide me. Thanks ahead.

Clean arch-dependent guessing, allow cross-compile

I have been struggling with getting bro (i.e. zeek) package updated in Gentoo and a random issue caught my eye: use of -msse2 CFLAG. I looked around and it seems part of broker starting at around https://github.com/zeek/broker/blob/master/CMakeLists.txt#L378 :

include(TestBigEndian)
test_big_endian(BROKER_BIG_ENDIAN)

include(CheckIncludeFiles)
set(CMAKE_REQUIRED_FLAGS -msse2)
check_include_files(emmintrin.h HAVE_SSE2)
set(CMAKE_REQUIRED_FLAGS)

if (HAVE_SSE2)
  add_definitions(-DBROKER_USE_SSE2 -msse2)
endif ()

Especially when part of another build (zeek) that makes quite a headache....
It was mentioned in zeek/zeek#249 (comment) that "that code isn't actually used at run-time anymore in latest versions of Bro 2.6.x.", may be this can just be purged?

Packaging software, e.g. in Gentoo, a source-based bistro, means that often flexibility such as choosing the target hardware is needed and used.

Aging: More verbose logging

After implementing the time-based aging of routing table entries, @rsmmr also asked for verbose logging output if receiving updates for a blacklisted entry after half of the expiration time has already passed.

Receiving update after 50% of the expiration time has passed may indicate that the configured timeout may be too short.

CAF Dependencies

In the README.rst, under dependenciese, the CAF version is as follows.

"CAF (C++ Actor Framework) version 0.14+"

Making the assumption that the "+" means "this version or later" is incorrect. The Cmake fails with the following error on the current release of CAF.

"CMake Error at CMakeLists.txt:21 (message):
Broker requires CAF version 0.14, older and newer versions are not supported."

I suggest removing the "+" from the dependency requirement.

"Peer added" status message sometimes triggered AFTER the first topic message

Hey,

it's me again. I'm having a little problem here with the timing of status messages. I am simulating a network here on my computer that obviously doesn't really have delays and I believe this is part of the problem, but still I think the problem should not exist.

My Python program has several endpoints that communicate. When an endpoint peers with another endpoint and gets its "Peer added" status, it sends a "HELLO" message to transmit some basic info. Now, my endpoints are programmed not to accept messages from peers they are not connected to, and they update their connection lists through these status messages. If a message arrives from someone that is not in their list, they discard it. This leads to the problem that the "HELLO" message gets lost in some rare cases, because the message subscriber is ready for reading BEFORE the status subscriber (which I check via select() and file descriptors for the subscribers). See this example here:

18 Oct 12:12:44 Tree 1: Message received. Topic: '/connect/78830909B4E33CE60E3E435C005F360572365E1'. Content: '{"sender_id": "DF572413EFA22C7E4F34966686B32C57132F541", "receiver_id": "78830909B4E33CE60E3E435C005F360572365E1", "sender_IP": "172.17.0.7", "descendants": 0, "number": 1, "leaves": 3, "replacement_id": null, "code": "200"}' 
18 Oct 12:12:44 Tree 1: WARNING! Received message from sender not in my connected_nodes list.  
18 Oct 12:12:44 Tree 1: Peer added. Node ID: 'DF572' IP: '::ffff:172.17.0.7'. Port: 41442. 

I do not know how this is handled internally in broker, but is it possible to guarantee that message subscribers will not be ready until the status subscribers say that the peer sending the message has been added? (If not, what are the internal mechanics for that so I can try handling it differently?). I can deal with simultaneousness; my program always prioritizes status subscribers if they are ready at the same time that a message subscriber is, but if the message subscriber actually is ready BEFORE the status subscriber, the program fails. And I do not really want to buffer messages from unknown peers until they are connected as it might lead to even more problems.

Explicit --with-caf root during ./configure does not seem to work

When I try to provide a manual path to ./configure via --with-caf, I get an error:

./configure --generator=Ninja --with-caf=$HOME/code/caf/build
Build Directory : build
Source Directory: .../broker
-- The C compiler identification is AppleClang 9.1.0.9020039
-- The CXX compiler identification is AppleClang 9.1.0.9020039
-- Check for working C compiler: /Library/Developer/CommandLineTools/usr/bin/cc
-- Check for working C compiler: /Library/Developer/CommandLineTools/usr/bin/cc -- works
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Detecting C compile features
-- Detecting C compile features - done
-- Check for working CXX compiler: /Library/Developer/CommandLineTools/usr/bin/c++
-- Check for working CXX compiler: /Library/Developer/CommandLineTools/usr/bin/c++ -- works
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Detecting CXX compile features
-- Detecting CXX compile features - done
CMake Error at /usr/local/Cellar/cmake/3.12.1/share/cmake/Modules/FindPackageHandleStandardArgs.cmake:137 (message):
  Could NOT find CAF (missing: CAF_LIBRARIES)
Call Stack (most recent call first):
  /usr/local/Cellar/cmake/3.12.1/share/cmake/Modules/FindPackageHandleStandardArgs.cmake:378 (_FPHSA_FAILURE_MESSAGE)
  cmake/FindCAF.cmake:91 (find_package_handle_standard_args)
  CMakeLists.txt:26 (find_package)


-- Configuring incomplete, errors occurred!
See also ".../broker/build/CMakeFiles/CMakeOutput.log".

Can you reproduce this?

inefficient store_event behavior for sets/tables

Hi,

After playing around with it a bunch, I realized that the current behavior of store_events does not really mesh that greatly when combined with storing sets or tables in the store.

Consider the following example. Given an existing brokerstore called store, when executing the following three commands:

	Broker::insert_into_table(tablestore, "table", "5", 5);
	Broker::insert_into_table(tablestore, "table", "6", 5);
	Broker::insert_into_table(tablestore, "table", "7", 5);

This leads to three events. One insert for key table, containing a table {5 -> 5}. And two updates, containing (old_value)->(new_value):
{5 -> 5}->{5 -> 5, 6 -> 5} for the first update and
{5 -> 5, 6 -> 5}->{5 -> 5, 6 -> 5, 7 -> 5} for the second update

I admittedly did not think about this case before - but this is not really optimal behavior. Assuming updates are made to big tables, the events that we are sent get huge - and don't actually reflect the changes that are happening. From the broker debug output I assume that the brokerstore internally actually only propagates the small diff.

So - I think we should change this. I admittedly don't know the internals quite good enough - but if possible it might be neat for table and set updates to only contain information about the indexes that actually changed.

ping.py - Vector passed instead of int

When testing the broker example I got the following error.

warning: failed to convert remote event 'ping' arg #0, got vector, expected int

Here's the fix that worked for me:

 23     ping = broker.bro.Event("ping", [n]);

Had to remove the brackets around "n". Replaced with this.
23 ping = broker.bro.Event("ping", n);

Connection failure using Python bindings

Hi there

I downloaded last Broker (7dab576) based on CAF 0.16.3, and installed it using virtualenv exactly as in tutorial.

I'm working on last SecurityOnion using Bro version 2.6.1.

I tried the ping example, to check it was working:

$ bro ping.bro

Then:

$ . ./venv/bin/activate
(venv)$ python ping.py

But then it stays stuck in call to ep.peer("127.0.0.1", 9999, 10) !

I checked the connection was possible:

$ nc -zv 127.0.0.1 9999
Connection to 127.0.0.1 9999 port [tcp/*] succeeded!

In Wireshark, the TLS hanshake seems to restart continuously, from Client Hello to Application Data.

At least, having both listener and subscriber in same Python script (as in doc) seems to work!

Any idea about what could possibly go wrong?

Thanks!

Use Radix Tree to optimize filter access

We can most likely increase the performance of forwarding decisions by using a radix tree for storing peer filters. Currently, the core_actor/stream_governor simply stores all filters as vector<topic> and iterates them with O(n) on each forwarding decision.

Build fails with libc++-8.0 on case-insensitive file systems

Steps to reproduce:

  1. Configure broker with clang-8 and an accompanying libc++ on darwin. The toolchain can be obtained from Homebrew for example: brew install --with-toolchain llvm
  2. run make in the build dir.

Result:

In file included from /Users/tobim/tenzir/vast/aux/broker/broker/src/data.cc:1:
In file included from /Users/tobim/tenzir/vast/aux/broker/broker/broker/data.hh:4:
In file included from /nix/store/9fykw9zr14n7palmkcd74x6x2x5yraf4-libc++-8.0.0/include/c++/v1/utility:200:
In file included from /nix/store/9fykw9zr14n7palmkcd74x6x2x5yraf4-libc++-8.0.0/include/c++/v1/__tuple:15:
In file included from /nix/store/9fykw9zr14n7palmkcd74x6x2x5yraf4-libc++-8.0.0/include/c++/v1/cstddef:38:
/Users/tobim/tenzir/vast/aux/broker/broker/version:1:1: error: expected unqualified-id
1.2.0-4

This happens because a header is going to be added to the standard library in C++20, and libc++ already implements it. Technically this is a bug in libc++, as the header should probably not be included with -std=c++17 and lower, however I consider the chances to get it "fixed" upstream as low.

I suggest to move the broker directory to include/broker so the projects root directory does not need to be added to the include directories.

Broker event queues behaviour

I've got some questions on broker events and how they behave in edge cases as i haven't found any details covering it in documentation.

  • Will it cause a memory leak in zeek when i publish events in some script.bro to some topic with no alive subscriber? As i see that does not happen, store.bro documentation does not actually figures it out

  • How to investigate a size of published to some topic events queue using broctl/programmatically?

Thanks in advance for the clarification

Making Broker fails test cases twice with TRACE logs for caf

Hey,

so I did try making Broker in my Docker container with logging level TRACE enabled for CAF in order to find the source of the peering problems. Unfortunately the make failed on two test cases. I will remake this with --no-cache in order to see if I maybe did something wrong, but if I didn't, then there's another problem to be fixed.

I used a docker container with these settings:

FROM python as broker

RUN git clone https://github.com/bro/broker --recursive
RUN apt-get update && apt-get install -y openssl \
    gcc \
    cmake

RUN git clone https://github.com/actor-framework/actor-framework.git
RUN cd actor-framework && ./configure --with-log-level=TRACE && cd build && make && make install

RUN cd broker && ./configure --with-caf=/usr/local/actor-framework && make && make test && make install

The error that came was this (I only included the last part before the test):

make[1]: Entering directory '/broker/build'
Running tests...
Test project /broker/build
      Start  1: backend
 1/19 Test  #1: backend ..........................   Passed    0.13 sec
      Start  2: bro
 2/19 Test  #2: bro ..............................   Passed    0.02 sec
      Start  3: core
 3/19 Test  #3: core .............................   Passed    0.22 sec
      Start  4: data
 4/19 Test  #4: data .............................   Passed    0.02 sec
      Start  5: status_subscriber
 5/19 Test  #5: status_subscriber ................   Passed    0.02 sec
      Start  6: integration
 6/19 Test  #6: integration ......................***Timeout  20.00 sec
+----------------------------------------------------------------------+
                              integration
+----------------------------------------------------------------------+

- topic_prefix_matching_async_subscribe
  -> prepare connections [line 273]
  -> start listening on mercury:4040 [line 283]
  -> peer venus to mercury:4040 [line 288]
  -> peer earth to mercury:4040 [line 291]
  -> assume two peers for mercury [line 310]
** /broker/tests/cpp/integration.cc:313  ::caf::test::equal_to(mercury_peers.size(), 2u)
** /broker/tests/cpp/integration.cc:314  ::caf::test::equal_to(mercury_peers.front().status, peer_status::peered)
** /broker/tests/cpp/integration.cc:315  ::caf::test::equal_to(mercury_peers.back().status, peer_status::peered)
  -> assume one peer for venus [line 316]
** /broker/tests/cpp/integration.cc:319  ::caf::test::equal_to(venus_peers.size(), 1u)
** /broker/tests/cpp/integration.cc:320  ::caf::test::equal_to(venus_peers.front().status, peer_status::peered)
  -> assume one peer for earth [line 321]
** /broker/tests/cpp/integration.cc:324  ::caf::test::equal_to(earth_peers.size(), 1u)
** /broker/tests/cpp/integration.cc:325  ::caf::test::equal_to(earth_peers.front().status, peer_status::peered)
  -> subscribe to 'bro/events' on venus [line 326]
  -> subscribe to 'bro/events/failures' on earth [line 328]
  -> verify subscriptions [line 330]
** /broker/tests/cpp/integration.cc:336  ::caf::test::equal_to(mercury.ep.peer_subscriptions(), filter({"bro/events", "bro/events/failures"}))
** /broker/tests/cpp/integration.cc:338  ::caf::test::equal_to(venus.ep.peer_subscriptions(), filter({}))
** /broker/tests/cpp/integration.cc:340  ::caf::test::equal_to(earth.ep.peer_subscriptions(), filter({}))
  -> publish to 'bro/events/(logging|failures)' on mercury [line 341]
  -> push2values downstream [line 202]
  -> push2values downstream [line 202]
  -> verify published data [line 344]
** /broker/tests/cpp/integration.cc:348  ::caf::test::equal_to(mercury.data, data({}))
** /broker/tests/cpp/integration.cc:352  ::caf::test::equal_to(venus.data, data({{"bro/events/failures", "oops"}, {"bro/events/failures", "sorry!"}, {"bro/events/logging", 123}, {"bro/events/logging", 456}}))
** /broker/tests/cpp/integration.cc:354  ::caf::test::equal_to(earth.data, data({{"bro/events/failures", "oops"}, {"bro/events/failures", "sorry!"}}))
  -> shut down earth [line 151]

      Start  7: master
 7/19 Test  #7: master ...........................   Passed    0.11 sec
      Start  8: publisher
 8/19 Test  #8: publisher ........................   Passed    0.05 sec
      Start  9: radix_tree
 9/19 Test  #9: radix_tree .......................   Passed    1.08 sec
      Start 10: ssl
10/19 Test #10: ssl ..............................***Exception: Other  0.23 sec
+----------------------------------------------------------------------+
                                  ssl
+----------------------------------------------------------------------+

- authenticated_session
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
  -> prepare authenticated connection [line 74]
  -> mercury_auth listen [line 78]
  -> venus_auth peer with mecury_auth on port 40807 [line 80]
** /broker/tests/cpp/ssl.cc:82   b
  -> mercury_auth sending ping [line 88]
  -> venus_auth waiting for ping [line 90]
** /broker/tests/cpp/ssl.cc:91   ::caf::test::equal_to(venus_auth_es.get(), ping)
** /broker/tests/cpp/ssl.cc:92   mercury_auth_es.poll().empty()
** /broker/tests/cpp/ssl.cc:93   venus_auth_es.poll().empty()
  -> venus_auth sending pong [line 95]
  -> mercury_auth waiting for pong [line 97]
** /broker/tests/cpp/ssl.cc:98   ::caf::test::equal_to(mercury_auth_es.get(), pong)
** /broker/tests/cpp/ssl.cc:99   mercury_auth_es.poll().empty()
** /broker/tests/cpp/ssl.cc:100  venus_auth_es.poll().empty()
  -> disconnect venus_auth from mercury_auth [line 102]
  -> venus_auth to shutdown [line 104]
  -> mercury_auth to shutdown [line 106]
  -> all done [line 108]
  -> 10 checks took 76 ms

- authenticated_failure_no_ssl_peer
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
** /broker/tests/cpp/ssl.cc:30   test_dir
  -> using certififcate , key  [line 35]
  -> prepare authenticated connection expected to fail [line 112]
  -> earth_no_auth listen [line 113]
  -> venus_auth peer with earth_no_auth on port 39727 [line 116]
!! /broker/tests/cpp/ssl.cc:118  not b
     REQUIRED: not b
     /broker/tests/cpp/ssl.cc:30   had last successful check

      Start 11: store
11/19 Test #11: store ............................   Passed    1.22 sec
      Start 12: subscriber
12/19 Test #12: subscriber .......................   Passed    0.04 sec
      Start 13: topic
13/19 Test #13: topic ............................   Passed    0.01 sec
      Start 14: python-communication
14/19 Test #14: python-communication .............   Passed    1.62 sec
      Start 15: python-data
15/19 Test #15: python-data ......................   Passed    0.09 sec
      Start 16: python-forwarding
16/19 Test #16: python-forwarding ................   Passed    5.12 sec
      Start 17: python-ssl
17/19 Test #17: python-ssl .......................   Passed    0.49 sec
      Start 18: python-store
18/19 Test #18: python-store .....................   Passed    4.97 sec
      Start 19: python-topic
19/19 Test #19: python-topic .....................   Passed    0.08 sec

89% tests passed, 2 tests failed out of 19

Total Test time (real) =  35.53 sec

The following tests FAILED:
	  6 - integration (Timeout)
	 10 - ssl (OTHER_FAULT)
Errors while running CTest
make[1]: *** [test] Error 8
Makefile:127: recipe for target 'test' failed
make[1]: Leaving directory '/broker/build'
make: *** [test] Error 2
Makefile:14: recipe for target 'test' failed

Broker fails to serialize fa_file record

Tried to use Broker to extract files and pass them to python script without saving it to disk, so I added Files::ANALYZER_DATA_EVENT analyzer with stream event, in which I publish extracted data and fa_file to some topic /files.

event manager_data_event(f: fa_file, data: string)
    {
        Broker::publish("/files", manager_data_event, f, data);
    }

event worker_data_event(f: fa_file, data: string)
    {
        Broker::publish(Cluster::manager_topic, manager_data_event, f, data);
    }

event file_sniff(f: fa_file, meta: fa_metadata)
    {
        Files::add_analyzer(f, Files::ANALYZER_DATA_EVENT, [$stream_event=worker_data_event]);
    }

And my python subscriber script:

ep = broker.Endpoint()
sub = ep.make_subscriber("/files")
ss = ep.make_status_subscriber(True)
ep.peer(BC_ADDR, BC_PORT)
st = ss.get()

while True:
    (t, d) = sub.get()
     .....

fails with the following:

Traceback (most recent call last):
  File "./script.py", line 203, in <module>
    (t, d) = sub.get()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 100, in get
    return (msg[0].string(), Data.to_py(msg[1]))
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 456, in to_py
    return converters[d.get_type()]()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 452, in <lambda>
    Data.Type.Vector: lambda: to_vector(d.as_vector())
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in to_vector
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in <listcomp>
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 456, in to_py
    return converters[d.get_type()]()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 452, in <lambda>
    Data.Type.Vector: lambda: to_vector(d.as_vector())
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in to_vector
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in <listcomp>
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 456, in to_py
    return converters[d.get_type()]()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 452, in <lambda>
    Data.Type.Vector: lambda: to_vector(d.as_vector())
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in to_vector
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in <listcomp>
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 456, in to_py
    return converters[d.get_type()]()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 452, in <lambda>
    Data.Type.Vector: lambda: to_vector(d.as_vector())
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in to_vector
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 429, in <listcomp>
    return [Data.to_py(i) for i in v]
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 456, in to_py
    return converters[d.get_type()]()
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 449, in <lambda>
    Data.Type.Table: lambda: to_table(d.as_table()),
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 426, in to_table
    return {Data.to_py(k): Data.to_py(v) for (k, v) in t.items()}
  File "/usr/lib/python3/dist-packages/broker/__init__.py", line 426, in <dictcomp>
    return {Data.to_py(k): Data.to_py(v) for (k, v) in t.items()}
TypeError: unhashable type: 'list'

Passing any other arguments works correct, including passing only data without fa_file.

Also probably any struct and record should be converted to Python dict?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.