

bitswanpump's Issues

Add an AnalyzingSource


The idea behind the AnalyzingSource is that the analyze() method of the matrix could be triggered by a source, with the output of the analysis being fed into the pipeline that contains the AnalyzingSource. The cycle could be driven by a timer or a PubSub event, but it would also be bound by the duty cycle of the pipeline: analyze() would be called only when the pipeline is not throttled, which gives the system a self-load-balancing property.
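A rough sketch of how such a source could look, assuming a TriggerSource-style base class with a cycle() coroutine and a matrix object exposing analyze(); the class name AnalyzingSource, the Matrix attribute and the exact readiness call are illustrative assumptions, not existing BSPump API:

```
import bspump


class AnalyzingSource(bspump.TriggerSource):
	"""
	Sketch only: this class does not exist in BSPump. It assumes a matrix
	object whose analyze() returns the event to be injected into the pipeline.
	"""

	def __init__(self, app, pipeline, matrix, id=None, config=None):
		super().__init__(app, pipeline, id=id, config=config)
		self.Matrix = matrix

	async def cycle(self):
		# The trigger (timer or PubSub) fires the cycle, but the analysis runs
		# only when the pipeline is not throttled, which yields the
		# self-load-balancing property described above.
		await self.Pipeline.ready()
		result = self.Matrix.analyze()
		if result is not None:
			await self.process(result)
```

The cycle could then be attached to a periodic or PubSub trigger from bspump.trigger, so that either a timer or a PubSub event drives the analysis.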

[KafkaSink] Some producer params cannot be used with ASAB file configuration

Eg. for "acks" KafkaProducer expects value to be one of (0, 1, -1, "all"), but ASAB file configuration always passes value of type string (eg "-1"), which fails with ValueError("Invalid ACKS parameter").

The same issue applies to all int and boolean parameters: with these, KafkaProducer either raises an error or silently ignores the invalid string value.
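A possible workaround sketch (not the BSPump fix): coerce the string values coming from the ASAB configuration into the types the Kafka producer expects before constructing it. The parameter sets below are illustrative, not exhaustive.

```
def coerce_producer_params(config):
	"""Coerce selected KafkaProducer parameters from strings to proper types."""
	out = dict(config)

	# "acks" accepts 0, 1, -1 or the literal string "all"
	if "acks" in out and out["acks"] != "all":
		out["acks"] = int(out["acks"])

	for key in ("linger_ms", "max_batch_size", "request_timeout_ms"):
		if key in out:
			out[key] = int(out[key])

	for key in ("enable_idempotence",):
		if key in out:
			out[key] = str(out[key]).lower() in ("1", "true", "yes")

	return out
```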

[KafkaSource] Application may crash if pipeline is throttled during kafka error recovery

Version: 20.03.04 and current master

With the secondary pipeline throttling propagation mechanism, the pipeline can be throttled more than once within a short time. In that case KafkaSource#_not_ready_handler runs twice at the same time and breaks the _commit error recovery. If _commit fails with a recoverable error, it starts spawning connections exponentially until "too many open files" and "Device or resource busy" are raised and the application crashes.

This issue was introduced in commit 4a8f24b

Reproducer application:
https://gist.githubusercontent.com/bedlaj/ccc602d8bbccb2abd890eb60dfe92840/raw/a8db22c54ab54edac1f8d90405f7fb914f2e6c05/bspump-kafka-fail.py

Log:
https://gist.githubusercontent.com/bedlaj/ccc602d8bbccb2abd890eb60dfe92840/raw/a8db22c54ab54edac1f8d90405f7fb914f2e6c05/bspump-kafka-fail.log

A possible fix is to introduce self._commit_ready = asyncio.Event(loop=app.Loop) and use it in the _commit method to ensure that the error recovery does not run in parallel.
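A rough sketch of that guard (the surrounding KafkaSource internals are only assumed here; attribute and method names follow the issue text):

```
import asyncio


class KafkaSourceSketch(object):
	# The issue proposes asyncio.Event(loop=app.Loop); the loop argument is
	# omitted here because newer Python versions no longer accept it.

	def __init__(self):
		self._commit_ready = asyncio.Event()
		self._commit_ready.set()  # allow the first _commit() to proceed

	async def _commit(self):
		# If another _commit() is already running its error recovery, wait for
		# it to finish instead of starting a second, parallel recovery.
		await self._commit_ready.wait()
		self._commit_ready.clear()
		try:
			pass  # existing commit and error-recovery logic goes here
		finally:
			self._commit_ready.set()
```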

Default value for analyze_on_clock inconsistent between TimeWindowMatrix and TimeWindowAnalyzer

def __init__(self, app, dtype='float_', start_time=None, resolution=60, columns=15, clock_driven=True, id=None, config=None):

def __init__(self, app, pipeline, matrix_id=None, dtype='float_', columns=15, analyze_on_clock=False, resolution=60,

TimeWindowAnalyzer has the default value analyze_on_clock=False, but TimeWindowMatrix has the default value clock_driven=True.
While the meaning is somewhat different, the defaults should still be unified to prevent confusion when the two are initialized separately (usually, when the analyzer is not wall-clock driven, the matrix won't be either).

Matrix should expect dtype instead of column_names and column_formats

The current concept of column_names and column_formats is unfortunately broken and has to be refactored:

The matrix has the following dimensions:

  • 1st (row) ... the dimension that is expected to grow and shrink over the lifetime of the matrix
  • 2nd (column) and more ... optional dimensions that are more or less fixed; a change is possible but unlikely
  • dtype ... the data type of a cell (!!!) of the matrix; it is given and fixed over the whole lifecycle of the matrix. It is expected that the user defines their own type.

So creating dtype from column_* is incorrect.

SessionAnalyzer uses a 1D matrix with an arbitrary dtype.
TimeWindowAnalyzer and GeoAnalyzer use a 2D matrix with an arbitrary dtype.
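For illustration, a user-defined cell dtype could be a NumPy structured dtype passed to the matrix directly (the field names below are made up):

```
import numpy as np

# A user-defined cell dtype, independent of any column_names/column_formats
# machinery.
cell_dtype = np.dtype([
	("count", np.int64),
	("sum", np.float64),
	("last_seen", np.int64),
])

# 1D matrix (SessionAnalyzer-style): rows only, one structured cell per row.
session_matrix = np.zeros(1000, dtype=cell_dtype)

# 2D matrix (TimeWindowAnalyzer-style): rows x columns of the same cell dtype.
time_window_matrix = np.zeros((1000, 15), dtype=cell_dtype)
```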

Kafka Source stalls and event processing becomes non-interactive

#30

When max_partition_fetch_bytes is too high, or if there is a problem in processing (like a slow lookup), the Kafka source is unable to commit in time, stalls and is unable to recover.
This way any small, unexpected network/DB glitch can indirectly kill the whole processing and force a BSPump restart.

Matrix configuration should be optional in the *analyzer constructors

column_formats and column_names should be optional arguments, because if I specify matrix_id, I don't have to specify column_formats and column_names (plus it is confusing).

Add a check that either column_formats/column_names OR matrix_id is provided ... but in the signature all mentioned arguments should be optional (a sketch of such a check follows the signature below).

Document this well (i.e. specify column_formats and column_names to create your own matrix, OR matrix_id to use an existing matrix).

def __init__(self, app, pipeline, column_formats, column_names, analyze_on_clock=False, matrix_id=None, id=None, config=None):
... and other analyzers.
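A sketch of the proposed check, with all mentioned arguments optional (constructor body shortened; this is not quoted from BSPump):

```
def __init__(self, app, pipeline, column_formats=None, column_names=None,
		analyze_on_clock=False, matrix_id=None, id=None, config=None):
	# Either a new matrix is described (column_formats + column_names),
	# or an existing one is referenced (matrix_id) -- never both, never neither.
	if matrix_id is None and (column_formats is None or column_names is None):
		raise ValueError(
			"Specify column_formats and column_names to create your own matrix, "
			"OR matrix_id to use an existing matrix."
		)
	if matrix_id is not None and (column_formats is not None or column_names is not None):
		raise ValueError("matrix_id cannot be combined with column_formats/column_names.")
	...
```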

Improve the error message in the exception when throttling/unthrottling

self._throttles.remove(who)

  1. When unthrottling a pipeline with an object that is not present among throttles:
21-Aug-2019 22:49:16.978449 ERROR root Task exception was never retrieved
{'future': <Task finished coro=<DHCPDeviceTracker.throttle() done, defined at /var/pumpy/DHCP/custommodules/DHCPDeviceTracker.py:101> exception=KeyError(DHCPDeviceTracker('DhcpEventInPipeline.DHCPDeviceTracker'))>}
Traceback (most recent call last):
 File "/var/pumpy/DHCP/custommodules/DHCPDeviceTracker.py", line 105, in throttle
   self.pipeline.throttle(self, enable=False)
 File "/usr/local/lib/python3.7/site-packages/bspump/pipeline.py", line 216, in throttle
   self._throttles.remove(who)
KeyError: DHCPDeviceTracker('DhcpEventInPipeline.DHCPDeviceTracker')
21-Aug-2019 22:49:17.032356 ERROR root Task exception was never retrieved

The KeyError should clearly say "Object ... is not present among throttles".

  2. A similar message (and check) should appear when throttling and the object IS already present.
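A sketch of how the messages could look in Pipeline.throttle() (only the error reporting is illustrated; the rest of the method and the _evaluate_ready() call are assumed):

```
def throttle(self, who, enable=True):
	if enable:
		if who in self._throttles:
			raise RuntimeError(
				"Cannot throttle: object {!r} is already present among throttles".format(who)
			)
		self._throttles.add(who)
	else:
		if who not in self._throttles:
			raise RuntimeError(
				"Cannot unthrottle: object {!r} is not present among throttles".format(who)
			)
		self._throttles.remove(who)
	self._evaluate_ready()  # assumed: re-evaluate pipeline readiness as before
```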

Improvements in the Kafka sink/source/connection

  • Add support for Snappy, LZ4 and GSSAPI (especially in the official Docker container)
  • Add an API to the Kafka Source to set the offset for easier debugging
  • Add key-based consuming to the Kafka Source

MySQL lookup improvements

  • Allow specifying the fields in the SELECT (currently * is hardcoded; it could stay as the default value)
  • Rename self.Table to self.From and officially support (including documentation) a more complex way to specify what to select ... see the example below:
'table': ' `DHCPSTATUS` JOIN RADIUS_USERDB_O2TV ON AP_SERVICEID = CETINSERVICEID JOIN RADIUSUSERS ON CASEID = TV '

Add support for Google Drive

... so that a BitSwan pipeline can read/write files from/to Google Drive.
We will likely start with the "write" (Sink) part.

  • Connection
  • Source
  • Sink

Add an "ReducingSource"

It is a special type of InternalSource, but instead of a Queue there is a user-provided aggregation function.
This means that incoming events are aggregated by that function (e.g. count, sum or moving average), and when the secondary pipeline (with the InternalSource) is ready, it picks up the aggregation.

Possible scenario: the secondary pipeline is not saturated, so no aggregation is happening. Perhaps a minimum threshold (of time and/or events) should be definable that enforces the aggregation when needed.
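A standalone sketch of the aggregation behaviour (plain asyncio, not tied to actual BSPump classes): put() stands for what the primary pipeline would call per event, drain() for what the ReducingSource would await when the secondary pipeline is ready.

```
import asyncio


class ReducingBuffer:

	def __init__(self, reduce_fn, initial):
		self.ReduceFn = reduce_fn   # e.g. lambda acc, event: acc + 1 for a count
		self.Initial = initial
		self.Aggregate = initial
		self.HasData = asyncio.Event()

	def put(self, event):
		# Aggregate instead of queueing: memory use stays constant even if
		# the secondary pipeline is throttled for a long time.
		self.Aggregate = self.ReduceFn(self.Aggregate, event)
		self.HasData.set()

	async def drain(self):
		# Wait until at least one event has been aggregated, then hand over
		# the aggregate and reset.
		await self.HasData.wait()
		aggregate, self.Aggregate = self.Aggregate, self.Initial
		self.HasData.clear()
		return aggregate
```

With reduce_fn=lambda acc, event: acc + 1 and initial=0 this behaves as a counter; the queue-free buffer keeps memory bounded no matter how long the secondary pipeline stays throttled.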

MySQL lookup _count(self) is broken

The following (valid) configuration breaks the lookup:

```
config = {
	'from': ' `DHCPSTATUS` JOIN RADIUS_USERDB_O2TV ON AP_SERVICEID = CETINSERVICEID JOIN RADIUSUSERS ON CASEID = TV ',
	'key': 'SUBNET',
	'statement': ' SUBNET, KRAJ, UNIX_TIMESTAMP() as lookup_fetch_time ',
}
```

With this configuration, `_count(self)` runs into a MySQL error. The option to specify an SQL template is a questionable solution, since we are unable to modify the arguments.

Another question is what value this function actually has: there are scenarios (like mine) where it reports 250k rows while there are actually only 10k unique key-value pairs, because the function counts rows rather than unique keys, so it is questionable how reliable it is.
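A sketch of a _count() that counts unique keys instead of raw rows (the aiomysql-style connection/cursor usage and the self.Connection/self.Key/self.From attributes are assumptions, not the actual MySQLLookup code):

```
async def _count(self):
	# Count distinct key values rather than raw rows, so that joined tables
	# do not inflate the reported size of the lookup.
	query = "SELECT COUNT(DISTINCT {}) FROM {};".format(self.Key, self.From)
	async with self.Connection.acquire() as connection:
		async with connection.cursor() as cursor:
			await cursor.execute(query)
			row = await cursor.fetchone()
			return row[0]
```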

Correct a lookup constructor code convention

The current lookup convention violates the standard set by other top-level objects (Sinks, Pipelines, etc.).

E.g. here:

def __init__(self, app, lookup_id, es_connection, config=None):

Correct example:
def __init__(self, app, connection, id=None, config=None):

Incorrect example:
def __init__(self, app, lookup_id, mysql_connection, config=None):

Spotted deviations:

  • The argument order is wrong (connection should come right after the app argument)
  • id/lookup_id should default to None, with the value generated from the class name
  • No qualifying prefixes (mysql_connection should be just connection, and the lookup_id argument should be called id)

Every lookup across the whole BitSwan BSPump needs to be adjusted.

Allow multiple keys in ElasticSearch lookup

The ES lookup should allow multiple keys, plus add functionality to filter on a range of times.

Example: for the input {'epgTime': '2019-09-02 01:35:22', 'channelId': 2}, the lookup should return 'epgId': 105.
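An illustrative Elasticsearch query body for that example, combining a term match on channelId with a time-range filter (the epgStart/epgEnd field names and the exact time format are assumptions):

```
query = {
	"query": {
		"bool": {
			"must": [
				{"term": {"channelId": 2}},
				{"range": {"epgStart": {"lte": "2019-09-02 01:35:22"}}},
				{"range": {"epgEnd": {"gte": "2019-09-02 01:35:22"}}},
			]
		}
	}
}
```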

Provide a possibility to specify the cache replacement policy for storage-backed lookups

Currently, storage-backed lookups (such as the MySQL or MongoDB lookups) provide a very naive implementation of the cache in the form of a dictionary, e.g.:

self.Cache = {}

This needs to be extended so that the user can specify the cache replacement policy that best suits their purpose, including a completely custom caching strategy implemented by the user (see https://en.wikipedia.org/wiki/Cache_replacement_policies for the theory).

A cache replacement policy, or rather the cache itself, is implemented as a dictionary. The lookup expects the MutableMapping interface as defined in https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping .
The lookup should accept an optional parameter cache that will be used for the cache attribute (the default remains the current dict() until we have a saner implementation).

Additionally, we should implement a bspump.cache package with a few cache replacement policies (e.g. bspump.cache.LRUCacheDict() ...).
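A minimal sketch of such an LRU policy behind the proposed MutableMapping interface (the name LRUCacheDict follows the proposal above; the bspump.cache package does not exist yet):

```
import collections
import collections.abc


class LRUCacheDict(collections.abc.MutableMapping):

	def __init__(self, max_size=10000):
		self.MaxSize = max_size
		self.Data = collections.OrderedDict()

	def __getitem__(self, key):
		value = self.Data[key]
		self.Data.move_to_end(key)   # mark as most recently used
		return value

	def __setitem__(self, key, value):
		self.Data[key] = value
		self.Data.move_to_end(key)
		if len(self.Data) > self.MaxSize:
			self.Data.popitem(last=False)   # evict the least recently used item

	def __delitem__(self, key):
		del self.Data[key]

	def __iter__(self):
		return iter(self.Data)

	def __len__(self):
		return len(self.Data)
```

A lookup could then simply do self.Cache = LRUCacheDict(max_size=100000) (or accept it via the proposed cache parameter) without any other change, since it relies only on the dict-like interface.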
