libertyaces / bitswanpump

BitSwan Pump: A real-time stream processor for Python 3.5+
Home Page: https://bitswanpump.readthedocs.io
License: BSD 3-Clause "New" or "Revised" License
Pipeline 'Pipeline' stopped due to a processing error: 'list_iterator' object is not callable (<class 'TypeError'>)
The idea behind the AnalyzingSource is that the analyze() method of the matrix could be triggered by a source, with the output of the analysis fed into a pipeline that contains the AnalyzingSource. The cycle could be configured by a timer or a PubSub event, but it is also bounded by the duty cycle of the pipeline: analyze() will be called only if the pipeline is not throttled, which gives this system a self-load-balancing property.
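A minimal sketch of the idea, assuming hypothetical method names (is_ready(), analyze() and process() as used here are assumptions, not confirmed BSPump API):

```python
import asyncio

class AnalyzingSourceSketch:
    """
    Illustrative only: trigger the matrix analysis on a timer, but only
    when the downstream pipeline is not throttled.
    """

    def __init__(self, pipeline, matrix, period=5.0):
        self.Pipeline = pipeline
        self.Matrix = matrix
        self.Period = period

    async def main(self):
        while True:
            await asyncio.sleep(self.Period)
            # Self-load-balancing: skip the cycle while the pipeline is
            # throttled, so analyze() never adds to an existing backlog.
            if not self.Pipeline.is_ready():
                continue
            result = self.Matrix.analyze()
            self.Pipeline.process(result)
```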
BitSwanPump/bspump/socket/udp.py
Line 30 in 30b543f
https://stackoverflow.com/questions/28563518/buffer-size-for-reading-udp-packets-in-python
self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, <value-from-config>)
Name of the config item: "receiver_buffer_size"
E.g. for "acks", KafkaProducer expects the value to be one of (0, 1, -1, "all"), but the ASAB file configuration always passes a value of type string (e.g. "-1"), which fails with ValueError("Invalid ACKS parameter").
The same issue applies to all int and boolean parameters: with these, KafkaProducer either raises an error or silently ignores the invalid string value.
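A minimal sketch of a coercion step that could run over the config values before they are handed to KafkaProducer (this helper is hypothetical, not existing BSPump code):

```python
def coerce_kafka_value(value):
    """
    Coerce string values coming from the ASAB file configuration into
    the types kafka-python expects, leaving everything else untouched.
    """
    if not isinstance(value, str):
        return value
    if value in ('true', 'True'):
        return True
    if value in ('false', 'False'):
        return False
    try:
        return int(value)  # "acks" = "-1" -> -1, "0" -> 0, "1" -> 1
    except ValueError:
        return value  # "all" and other legitimate strings pass through
```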
Currently it is not possible to get information about the table name where the modification occurred.
The loop should stay blocked at the end of the bin-log and wait for the next replication events.
Version: 20.03.04 and current master
With the secondary pipeline throttling propagation mechanism it can happen that a pipeline is throttled more than once in a short time. In this case KafkaSource._not_ready_handler runs twice at the same time and breaks the _commit error recovery. If _commit fails with a recoverable error, it starts spawning connections exponentially until "too many open files" and "Device or resource busy" are raised and the application crashes.
This issue was introduced in commit 4a8f24b
Reproducer application:
https://gist.githubusercontent.com/bedlaj/ccc602d8bbccb2abd890eb60dfe92840/raw/a8db22c54ab54edac1f8d90405f7fb914f2e6c05/bspump-kafka-fail.py
A possible fix is to introduce self._commit_ready = asyncio.Event(loop=app.Loop) and use it in the _commit method to ensure the error recovery is not running in parallel (see the sketch below).
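A minimal sketch of the proposed guard (the surrounding class is illustrative, not the actual KafkaSource; _do_commit stands in for the real Kafka commit call):

```python
import asyncio

class CommitGuardSketch:
    """
    Serialize _commit error recovery with an asyncio.Event so that two
    overlapping _not_ready_handler runs cannot start two recoveries.
    """

    def __init__(self):
        self._commit_ready = asyncio.Event()
        self._commit_ready.set()  # set = no recovery in flight

    async def _commit(self):
        # Wait for any recovery already in progress instead of
        # spawning another connection in parallel.
        await self._commit_ready.wait()
        self._commit_ready.clear()
        try:
            await self._do_commit()
        finally:
            self._commit_ready.set()

    async def _do_commit(self):
        pass  # placeholder for the real Kafka commit
```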
BitSwanPump/bspump/abc/matrix.py
Line 99 in 02a19be
Matrix fields should be reset to their default values when calling close_row(), because when we use numpy functions directly on the matrix, we get hits from closed rows until they are reused.
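A minimal sketch of the reset, assuming the matrix is a numpy structured array (the function and its arguments are illustrative, not the actual Matrix API):

```python
import numpy as np

def close_row_sketch(matrix, row_index, closed_rows):
    """
    Illustrative only: mark the row as closed and wipe its fields so
    that vectorized numpy operations no longer pick up stale values.
    """
    closed_rows.add(row_index)
    # np.zeros(1, dtype=...)[0] yields the default (zeroed) record
    # for whatever structured dtype the matrix uses.
    matrix[row_index] = np.zeros(1, dtype=matrix.dtype)[0]
```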
https://github.com/LibertyAces/BitSwanPump/blob/master/bspump/elasticsearch/sink.py#L29 ... remove "time", put "fixed"
There is no time_window_id or sessions_id in SA (SessionAnalyzer).
TimeWindowAnalyzer has the default value analyze_on_clock=False, but TimeWindowMatrix has the default value clock_driven=True.
While the meaning is somewhat different, these should still be unified to prevent confusion when initialized separately (usually, when the analyzer is not wall-clock driven, the matrix won't be either).
The current concept of column_names and column_formats is unfortunately broken and has to be refactored:
The matrix has the following dimensions:
row ... the dimension that is expected to grow and shrink over the lifetime of the matrix
column (and more) ... optional dimensions that are more or less fixed; a change is possible but unlikely
dtype ... the data type of a cell (!!!) of the matrix; this one is given and fixed over the whole lifecycle of the matrix. It is expected that the user will define his/her own type.
So creating dtype from column_* is incorrect.
SessionAnalyzer uses a 1D matrix with an arbitrary dtype.
TimeWindowAnalyzer and GeoAnalyzer use a 2D matrix with an arbitrary dtype.
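For illustration, a user-defined cell dtype expressed as a numpy structured type (assumed usage, not the current BSPump signature):

```python
import numpy as np

# The user defines the cell type once, up front; rows and columns
# are the dimensions that the matrix manages around it.
cell_dtype = np.dtype([
    ('count', 'i8'),
    ('sum', 'f8'),
])

# A 2D matrix (rows x columns) of such cells, as a TimeWindowAnalyzer
# would use; a SessionAnalyzer would use a 1D shape instead.
matrix = np.zeros((16, 8), dtype=cell_dtype)
```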
When max_partition_fetch_bytes is too high, or if there is a problem in processing (like a slow lookup), the Kafka source is unable to commit in time, stalls, and is unable to recover.
This way, any small unexpected network/DB glitch can indirectly kill the whole processing and force a BSPump restart.
Replace None values with real defaults in FileCSVSink.ConfigDefaults.
Python's csv formatting documentation
Related change in asab
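For illustration, real defaults taken from the csv module's dialect defaults (the exact config keys are an assumption about FileCSVSink):

```python
class FileCSVSinkSketch:
    # Mirrors the csv dialect defaults instead of None placeholders,
    # so values can be passed straight to csv.writer().
    ConfigDefaults = {
        'dialect': 'excel',
        'delimiter': ',',
        'doublequote': True,
        'escapechar': '',  # empty meaning "not set" (csv default is None)
        'lineterminator': '\r\n',
        'quotechar': '"',
        'quoting': 'QUOTE_MINIMAL',
        'skipinitialspace': False,
        'strict': False,
    }
```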
column_formats, column_names should be optional arguments, because if I specify matrix_id, I don't have to specify column_formats, column_names (+ it is confusing).
Add a check for providing either column_formats, column_names OR matrix_id ... but in the signature, all the mentioned arguments are optional.
Document this well (say: specify column_formats, column_names to create your own matrix, OR matrix_id to use an existing matrix).
Add column_names = column_names[:] (a defensive copy; see the sketch below).
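A minimal sketch of the argument check (the function stands in for the analyzer constructor; names and error messages are illustrative):

```python
def analyzer_init_sketch(column_formats=None, column_names=None, matrix_id=None):
    """
    Either a matrix is described (column_formats + column_names) or an
    existing one is referenced (matrix_id), never both.
    """
    if matrix_id is not None:
        if column_formats is not None or column_names is not None:
            raise ValueError(
                "Specify either column_formats/column_names OR matrix_id, not both."
            )
    else:
        if column_formats is None or column_names is None:
            raise ValueError(
                "Specify column_formats and column_names to create a matrix, "
                "or matrix_id to use an existing one."
            )
        # Defensive copy, so the caller's list cannot be mutated later.
        column_names = column_names[:]
    return column_formats, column_names, matrix_id
```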
There is unreachable code in
BitSwanPump/bspump/pipeline.py
Line 176 in 372a592
A recent refactoring broke the PyPy support.
It should be relatively easy to fix.
BitSwanPump/bspump/pipeline.py
Line 216 in 5a81ecd
21-Aug-2019 22:49:16.978449 ERROR root Task exception was never retrieved
{'future': <Task finished coro=<DHCPDeviceTracker.throttle() done, defined at /var/pumpy/DHCP/custommodules/DHCPDeviceTracker.py:101> exception=KeyError(DHCPDeviceTracker('DhcpEventInPipeline.DHCPDeviceTracker'))>}
Traceback (most recent call last):
File "/var/pumpy/DHCP/custommodules/DHCPDeviceTracker.py", line 105, in throttle
self.pipeline.throttle(self, enable=False)
File "/usr/local/lib/python3.7/site-packages/bspump/pipeline.py", line 216, in throttle
self._throttles.remove(who)
KeyError: DHCPDeviceTracker('DhcpEventInPipeline.DHCPDeviceTracker')
21-Aug-2019 22:49:17.032356 ERROR root Task exception was never retrieved
The KeyError should clearly say "Object ... is not present among throttlers".
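A minimal sketch of the clearer message (a simplified Pipeline.throttle(); the real method also re-evaluates the pipeline's ready state):

```python
def throttle(self, who, enable=True):
    # Simplified sketch only: raise a descriptive error instead of the
    # bare KeyError from set.remove().
    if enable:
        self._throttles.add(who)
    else:
        if who not in self._throttles:
            raise KeyError(
                "Object '{}' is not present among throttlers of the pipeline '{}'.".format(
                    who, self.Id
                )
            )
        self._throttles.remove(who)
```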
and replace client-side filtering.
BitSwanPump/bspump/analyzer/analyzer.py
Line 22 in 75c3c2e
def predicate(self, context, event):
def evaluate(self, context, event):
It should start streaming from the start, but no event is transmitted if Config['log_file'] = '' (the default).
A bunch of useful metrics has to be added to the Matrix implementation, such as the number of rows (total and closed) and the number of columns.
output_bucket_max_size needs to be cast to int.
Also, the timeout parameter is not used at all.
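A minimal sketch of the cast, assuming the values come from ASAB config as strings (the helper and defaults are illustrative):

```python
def parse_sink_config(config):
    """
    ASAB delivers config values as strings, so cast before use
    instead of comparing strings to numbers.
    """
    output_bucket_max_size = int(config.get('output_bucket_max_size', 1000))
    timeout = float(config.get('timeout', 30))  # and actually use it
    return output_bucket_max_size, timeout
```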
BitSwanPump/bspump/file/filecsvsource.py
Line 15 in b683de3
Add a bspump.ipc module with a Sink and a Source that allow interconnecting pipelines in different processes using UDP, TCP and Unix streams/datagrams.
The select statement should be optional (*, which could be a default value).
self.Table is to be renamed to self.From.
Officially support (that includes documentation) a more complex way to specify what to select ... see the example below:
'table': ' `DHCPSTATUS` JOIN RADIUS_USERDB_O2TV ON AP_SERVICEID = CETINSERVICEID JOIN RADIUSUSERS ON CASEID = TV '
... so that a BitSwan pipeline can read/write files from/to Google Drive.
We will likely start with "write" (Sink) part.
It is a special type of InternalSource, but instead of a Queue, there is a user-provided aggregation function.
It means that incoming events are aggregated by such a function (e.g. count, sum or moving average), and when the secondary pipeline (with the InternalSource) is ready, it picks up that aggregation.
Possible scenario: the secondary pipeline is not saturated, so no aggregation happens. Maybe some minimum (of time and/or events) should be defined that enforces the aggregation if needed.
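A minimal sketch of the aggregation behaviour (the put()/pick_up() hand-off and all names here are illustrative, not the InternalSource API):

```python
class AggregatingSourceSketch:
    """
    Instead of queueing every event, fold incoming events into an
    aggregate; the secondary pipeline picks up the aggregate when ready.
    """

    def __init__(self, aggregate_fn, initial):
        self.AggregateFn = aggregate_fn  # e.g. lambda acc, e: acc + e
        self.Initial = initial
        self.Accumulator = initial
        self.Count = 0

    def put(self, event):
        # Fold the event in; no per-event queueing.
        self.Accumulator = self.AggregateFn(self.Accumulator, event)
        self.Count += 1

    def pick_up(self):
        # Called when the secondary pipeline is ready; resets the fold.
        aggregate, count = self.Accumulator, self.Count
        self.Accumulator, self.Count = self.Initial, 0
        return aggregate, count
```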
The following (valid) configuration breaks the lookup:
```
config = {
    'from': ' `DHCPSTATUS` JOIN RADIUS_USERDB_O2TV ON AP_SERVICEID = CETINSERVICEID JOIN RADIUSUSERS ON CASEID = TV ',
    'key': 'SUBNET',
    'statement': ' SUBNET, KRAJ, UNIX_TIMESTAMP() as lookup_fetch_time ',
}
```
where `_count(self)` fetches a MySQL error ...
The option to specify an SQL template is a questionable solution, since we are unable to modify the arguments...
Another point is what value this function actually has: there are scenarios (like mine) where it will say 250k, but there are actually only 10k unique key-value pairs. The function counts the number of rows, not unique rows, so it is questionable how reliable it is.
BitSwanPump/bspump/analyzer/analyzer.py
Line 22 in 75c3c2e
... currently it is only in TimeWindowAnalyzer but it should be available also for SA and others.
The current lookup convention violates the standard set by other top-level objects (Sinks, Pipelines, etc.).
E.g. here:
Correct example:
def __init__(self, app, connection, id=None, config=None):
Incorrect example:
def __init__(self, app, lookup_id, mysql_connection, config=None):
Spotted deviances:
(missing app argument)
id/lookup_id should be None with a default generated from the class name
mysql_connection ... should be only connection
(the lookup_id argument should be called id)
Every lookup needs to be adjusted across the whole BitSwan BSPump.
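A minimal sketch of a conforming lookup constructor, following the correct example above (the default-id generation shown is an assumption about the convention, not the actual base-class code):

```python
class MySQLLookupSketch:

    def __init__(self, app, connection, id=None, config=None):
        self.App = app
        self.Connection = connection
        # Default id generated from the class name, as the other
        # top-level objects (Sinks, Pipelines, ...) do.
        self.Id = id if id is not None else self.__class__.__name__
        self.Config = config if config is not None else {}
```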
BitSwanPump/bspump/kafka/source.py
Line 38 in 52ee17b
When there is more complex (slow) processing in the pipeline, it may time out on the Kafka side and the source crashes (the first commit gets an error, and the retry fails as well).
The default value should be significantly more defensive (10% of the current one?).
The user can always increase this value if required.
This will allow streaming of events to continue from the last position persisted on the master node.
Top-level objects are:
Members to be added:
.Loop
.time()
.App
BitSwanPump/bspump/mysql/lookup.py
Line 61 in 30b543f
It will fail when multiple fields are used. It should be:
SELECT COUNT(*) FROM {...}
The statement field has no value here and only breaks things when multiple fields are specified.
Currently, the operation "index" is hardcoded.
"_id" cannot be set from user code.
There is no option to set the target index dynamically.
Other metadata (e.g. "doc_as_upsert") cannot be set from user code.
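A minimal sketch of a per-event bulk action where the metadata comes from the event context (the context keys used here are hypothetical, not current BSPump behaviour):

```python
import json

def build_bulk_action(event, context, default_index):
    """
    Illustrative only: derive the Elasticsearch bulk metadata from the
    event context instead of hardcoding the "index" operation.
    """
    operation = context.get('es_operation', 'index')  # e.g. "update"
    meta = {'_index': context.get('es_index', default_index)}
    if 'es_id' in context:
        meta['_id'] = context['es_id']
    # The bulk API takes one metadata line followed by one source line.
    return json.dumps({operation: meta}) + '\n' + json.dumps(event) + '\n'
```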
Currently, storage-backed lookups (such as the MySQL or Mongo lookups) provide a very naive implementation of the cache in the form of a dictionary. E.g.:
BitSwanPump/bspump/mysql/lookup.py
Line 44 in 65b289d
This needs to be extended so that the user can specify the cache replacement policy that suits his purpose best, including a custom caching strategy implemented completely by the user (see https://en.wikipedia.org/wiki/Cache_replacement_policies for the theory).
A cache replacement policy, respectively the cache itself, is implemented as a dictionary. The lookup expects the MutableMapping interface as defined in https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping .
The lookup should accept an optional parameter cache that will be used for the cache attribute (the default is the current dict() until we have a saner implementation).
Additionally, we should implement a bspump.cache package with a few cache replacement policies (e.g. bspump.cache.LRUCacheDict() ...).
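A minimal sketch of what bspump.cache.LRUCacheDict() could look like: a MutableMapping (so it can drop in for the plain dict() cache) that evicts the least recently used entry once a size limit is exceeded. The class name and max_size parameter are assumptions:

```python
import collections
import collections.abc

class LRUCacheDictSketch(collections.abc.MutableMapping):
    """
    Illustrative LRU cache: usable wherever the lookup expects a
    MutableMapping, with bounded memory usage.
    """

    def __init__(self, max_size=10000):
        self.MaxSize = max_size
        self.Data = collections.OrderedDict()

    def __getitem__(self, key):
        value = self.Data[key]
        self.Data.move_to_end(key)  # mark as recently used
        return value

    def __setitem__(self, key, value):
        self.Data[key] = value
        self.Data.move_to_end(key)
        if len(self.Data) > self.MaxSize:
            self.Data.popitem(last=False)  # evict the LRU entry

    def __delitem__(self, key):
        del self.Data[key]

    def __iter__(self):
        return iter(self.Data)

    def __len__(self):
        return len(self.Data)
```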
Some lookups can be huge, so they cannot be fully initialized during the start of the BSPump.
When the MySQL sink receives a value such as an integer (this will be relevant for other non-strings as well), it crashes.