justtrackio / gosoline
Gosoline is our framework which fuels all of our Golang applications
License: MIT License
So far, only the fetching of input data of any pipeline or consumer is parallelized, e.g. multiple goroutines can read from the same SQS queue. The processing of the read messages is still performed within a single thread. I would like to do that in parallel as well, so the resources can be better utilized within one application. This also enables better balancing of input / processing / output.
Perhaps the batch consumer and the pipeline can be unified, as they share quite a few similarities.
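A minimal sketch of what parallel processing could look like, assuming the messages read from an input arrive on a channel; the Message type, worker count and function name below are illustrative, not existing gosoline API:

package stream

import (
	"context"
	"sync"
)

// Message is a stand-in for the message type read from an input.
type Message struct {
	Body string
}

// processInParallel fans messages out to a fixed number of worker goroutines
// instead of handling them one by one in a single loop.
func processInParallel(ctx context.Context, messages <-chan *Message, workers int, handle func(*Message) error) {
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-messages:
					if !ok {
						return
					}

					_ = handle(msg) // error handling / writing to an output would happen here
				}
			}
		}()
	}

	wg.Wait()
}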
Add test-component for sns.
Based on the implementation of #169:
- use test.config.yaml
- use .localhost as the default hostname
- create a uuid service
- the sns topic functions partially don't use the executor yet
Replace the redis client with a configurable kvstore in the currency service in pkg/currency/service.go:20.
The config could look like this:
kvstore:
  currency:
    type: chain
    application: example
    elements: [redis, ddb]
Modify the mysql fixture writer to insert an item if it doesn't already exist and update it otherwise.
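A minimal sketch of the intended upsert behavior using MySQL's INSERT ... ON DUPLICATE KEY UPDATE; the table and columns are made up for illustration:

package fixtures

import "database/sql"

// upsertFixture inserts the row if it does not exist yet and updates it otherwise.
func upsertFixture(db *sql.DB, id int, name string) error {
	_, err := db.Exec(
		"INSERT INTO examples (id, name) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)",
		id, name,
	)

	return err
}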
The ports for the integration tests are static.
This prevents us from running multiple tests at the same time on one CI instance.
We should instead randomize the docker assigned port, so we can run as many tests as we have resources for.
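One possible approach, sketched under the assumption that the test setup can pass an explicit host port to docker: let the OS pick a free port and bind the container to that.

package test

import "net"

// getFreePort asks the OS for an unused TCP port which can then be used as
// the host port for a test container.
func getFreePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()

	return l.Addr().(*net.TCPAddr).Port, nil
}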
When the kernel is starting up, all modules are ranged over and the Run function of those modules is called.
However, due to the current nature of the modules there is no reporting back when all modules are in a usable state.
This can easily be seen when doing integration tests against the api server (repeat them multiple times and see how your client suddenly fails to get a response from the given port).
It can also be seen in the logs, where the kernel reports that it is up and running while no module has yet reported to be running.
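One hedged idea for the missing feedback: let every module signal readiness back to the kernel and only report "up and running" once all of them have done so. A rough sketch, not existing gosoline API:

package kernel

import "sync"

// readinessGroup lets the kernel wait until every started module has
// reported that it reached a usable state.
type readinessGroup struct {
	wg sync.WaitGroup
}

func (r *readinessGroup) expect(modules int) {
	r.wg.Add(modules)
}

// ready is called by a module once it can actually serve requests.
func (r *readinessGroup) ready() {
	r.wg.Done()
}

// waitUntilRunning blocks until all modules have reported readiness.
func (r *readinessGroup) waitUntilRunning() {
	r.wg.Wait()
}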
Currently this is supported for sqs via the config flag aws_sqs_autoCreate, but not for sns.
It would be nice to be able to disable the auto-create functionality for sns too.
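The config could then mirror the existing sqs flag; the sns key name below is only a suggestion, not an existing setting:

aws_sqs_autoCreate: false
aws_sns_autoCreate: false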
If we write metrics to cloudwatch, we do not retry beyond the normal retries by the AWS sdk. As we already know from other places that these are sometimes not enough, we could add our own retry and backoff logic there, too.
See for example gosoline/pkg/mon/metric_writer_cw.go:144
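A rough sketch of what wrapping the call in our own backoff could look like, using github.com/cenkalti/backoff as an example library; this is an illustration, not the current implementation in metric_writer_cw.go:

package mon

import (
	"github.com/aws/aws-sdk-go/service/cloudwatch"
	"github.com/cenkalti/backoff"
)

// putMetricDataWithRetry retries the cloudwatch call with exponential backoff
// on top of the retries already performed by the AWS SDK.
func putMetricDataWithRetry(client *cloudwatch.CloudWatch, input *cloudwatch.PutMetricDataInput) error {
	op := func() error {
		_, err := client.PutMetricData(input)

		return err
	}

	return backoff.Retry(op, backoff.NewExponentialBackOff())
}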
We already have #196 for mysql.
Now we also want to purge dynamodb.
We do need a partitioner for writing parquet with AWS Firehose.
This is required because we might not always want to partition by the CreatedAt of a struct.
Defining a struct like this will fail at the moment:
type Example struct {
	Id      uint `ddb:"key=hash"`
	Enabled bool `ddb:"key=range"`
}
It fails because we don't allow ddb tags on bool fields.
I think it could be fixed here, but haven't had the time to look further yet:
https://github.com/applike/gosoline/blob/master/pkg/ddb/metadata_factory.go#L288
Check for panics and recover properly, e.g. nil pointer panics.
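A minimal sketch of how a module's Run function could be wrapped so a panic is turned into an error instead of crashing the kernel:

package kernel

import "fmt"

// recoverable converts panics (e.g. nil pointer dereferences) happening
// inside run into errors instead of letting them crash the whole kernel.
func recoverable(run func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()

	return run()
}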
I tried to add the following test, but it fails sometimes:
package kernel_test

import (
	"context"
	"testing"

	"github.com/applike/gosoline/pkg/cfg"
	"github.com/applike/gosoline/pkg/kernel"
	"github.com/applike/gosoline/pkg/mon"
	"github.com/stretchr/testify/assert"
)

type fastExitModule struct{}

func (f fastExitModule) Boot(config cfg.Config, logger mon.Logger) error {
	return nil
}

func (f fastExitModule) Run(ctx context.Context) error {
	return nil
}

type slowExitModule struct {
	kernel kernel.Kernel
}

func (s slowExitModule) Boot(config cfg.Config, logger mon.Logger) error {
	return nil
}

func (s slowExitModule) Run(ctx context.Context) error {
	s.kernel.Stop("slowly")

	return nil
}

func TestModuleFastShutdown(t *testing.T) {
	config, logger, _ := createMocks()

	assert.NotPanics(t, func() {
		k := kernel.New(config, logger)

		for s := 5; s < 10; s++ {
			k.Add("exit-fast", &fastExitModule{}, kernel.ModuleStage(s))
			k.Add("exit-slow", &slowExitModule{
				kernel: k,
			}, kernel.ModuleStage(s))
		}

		k.Run()
	})
}
The fast-exit module should not be needed, but it increases your chances of observing the bug. It does not happen every time.
Scenario:
Given an application which is running and initializes the currencies in its redis store at startup via the currency module (gosoline/pkg/currency/module.go:17).
After application startup, the redis container is restarted / recreated / flushed and thus loses all stored keys.
Current:
Any subsequent calls to the currency service fail with "Currency not found", as the redis content is empty.
Expected:
The empty redis content is detected, and the currencies are refreshed.
The current implementation of acknowledging an sqs input message does not properly separate the domains of sqs and application.
Introduce a type wrapping the sqs related information and the message content, and feed it into the pipeline(s).
gosoline/pkg/stream/pipeline.go:158
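A rough sketch of such a wrapper type; the exact fields are an assumption and would need to match whatever sqs metadata is required for acknowledging:

package stream

// sqsMessage keeps the sqs specific data needed for acknowledging separate
// from the application facing message content handed to the pipeline.
type sqsMessage struct {
	ReceiptHandle string            // required to delete/acknowledge the message on sqs
	Attributes    map[string]string // raw sqs message attributes
	Body          string            // the application payload
}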
Instead of providing only the name to NewRedisClient, there should be a settings struct:
type Settings struct {
	Name    string
	Backoff SettingsBackoff
}

type SettingsBackoff struct {
	InitialInterval     time.Duration
	RandomizationFactor float64
	Multiplier          float64
	MaxInterval         time.Duration
	MaxElapsedTime      time.Duration
}

func NewRedisClient(logger mon.Logger, client baseRedis.Cmdable, settings *Settings) Client {
Use the current values as defaults for the backoff if they are not set via the settings.
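A sketch of the defaulting, using the cenkalti/backoff defaults purely as an illustration of which values could be filled in:

package redis

import "time"

// defaultSettings fills in default values for every backoff field that was
// not set via the settings struct.
func defaultSettings(settings *Settings) {
	if settings.Backoff.InitialInterval == 0 {
		settings.Backoff.InitialInterval = 500 * time.Millisecond
	}
	if settings.Backoff.RandomizationFactor == 0 {
		settings.Backoff.RandomizationFactor = 0.5
	}
	if settings.Backoff.Multiplier == 0 {
		settings.Backoff.Multiplier = 1.5
	}
	if settings.Backoff.MaxInterval == 0 {
		settings.Backoff.MaxInterval = 60 * time.Second
	}
	if settings.Backoff.MaxElapsedTime == 0 {
		settings.Backoff.MaxElapsedTime = 15 * time.Minute
	}
}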
Given I do a go test -tags integration ./..., I get failures.
I believe these are due to the docker containers still being removed while the next integration test is already running, which then complains about that:
go test -tags integration ./...
go: finding github.com/applike/gosoline v0.0.24
...
dynamodb component of type dynamodb is ready
could not start gosoline_test_mysql container: container already exists
FAIL test1 44.724s
ok test2 59.378s
could not remove existing gosoline_test_dynamoDb container
Error while removing container with name gosoline_test_dynamoDb: API error (409): removal of container 6fc236688d90bc79d58a632dc6b1bcd7d2e7dfbd09ddc25f1e67181073751c2d is already in progress
FAIL test3 1.067s
Instead of residing in output_configurable_multiple.go, the implementation should be in output_multiple.go to be in line with the other outputs.
The signature of func NewConfigurableMultiOutput(config cfg.Config, logger mon.Logger, base string) Output
should be func NewConfigurableMultiOutput(config cfg.Config, logger mon.Logger, outputs []Output) Output,
and there should be a func NewConfigurableMultiOutputWithInterfaces(logger mon.Logger, outputs []Output)
to be able to create this output type without a configuration file.
To configure this output in output_configurable.go, a structure of
type multipleOutputConfiguration struct {
	Outputs []string `cfg:"outputs"`
}
should be used in conjunction with a 'builder' function newMultipleOutputFromConfig, which reads the config, creates the outputs and provides them to the NewConfigurableMultiOutput function.
In the end, the multiple output should be configured like this:
stream:
  output:
    pipeline:
      type: multiple
      outputs: [outputA, outputB]
    outputA:
      type: kinesis
      stream_name: kinesis-stream-a
    outputB:
      type: kinesis
      stream_name: kinesis-stream-b
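A rough sketch of the builder; config.UnmarshalKey, NewConfigurableOutput and the config key layout are assumptions about how the surrounding code in output_configurable.go works:

package stream

import (
	"fmt"

	"github.com/applike/gosoline/pkg/cfg"
	"github.com/applike/gosoline/pkg/mon"
)

// newMultipleOutputFromConfig reads the list of output names from the config,
// builds each configured output and hands them to NewConfigurableMultiOutput.
func newMultipleOutputFromConfig(config cfg.Config, logger mon.Logger, name string) Output {
	key := fmt.Sprintf("stream.output.%s", name)

	configuration := multipleOutputConfiguration{}
	config.UnmarshalKey(key, &configuration)

	outputs := make([]Output, len(configuration.Outputs))
	for i, outputName := range configuration.Outputs {
		// assumed to be the existing factory for a single configurable output
		outputs[i] = NewConfigurableOutput(config, logger, outputName)
	}

	return NewConfigurableMultiOutput(config, logger, outputs)
}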
sns
Based on the MVP implemented in #166.
Since there are not yet integration tests for all test components, the missing ones should be added.
See example: https://github.com/applike/gosoline/runs/453035977
Investigate why this is happening and fix it.
Add test-component for sqs
At the moment Redis will attempt to reconnect to the old address when it receives a "no such host" error, but we should find a way to fetch a new address via the service address, because it might have changed when redis actually died. From what I can see it will require initializing a new redis client.
Integrate this project into a CI environment and start running unit tests for every pull-request and for every commit in master.
We should add another cache layer into the kvstore which would be in-memory to reduce network usage whenever possible.
This is most useful for applications that hold a small, frequently used data set in their kvstore.
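Hypothetically, the in-memory layer could then simply become the first element of a chain, along the lines of the kvstore config shown above; the element name is only a suggestion:

kvstore:
  currency:
    type: chain
    elements: [inMemory, redis, ddb]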
Introduce a new package for test-components.
A test component starts a docker container providing a service (mysql, sqs, sns, etc...)
It should be used to prepare preconditions for integration tests.
Hint: there is already a first iteration of these components in a branch from @ss89
See also some thoughts about possible features here: https://docs.google.com/document/d/13Vuil0Jxc-x_-uFrEwwbLb3NZXAuSDltBTHKPYuqDOU/edit?ts=5e4a40e3
Currently a bad request (e.g., failing a validation on a json body) causes a level 4 error message to be logged and an error metric to be written. As these are not really server issues, I would propose changing the level to warning and adding a separate metric for bad requests. One could then define an alarm for these bad requests that filters out the small amount of bad requests one expects, while keeping it tight for internal apis, for example.
@j4k4 Maybe you have another opinion how this should be handled better.
redis
While using the gosoline http client, the following is logged (without using the gosoline logger):
[2019-10-30T14:13:44,939][WARN ][logstash.filters.json ] Error parsing json {:source=>"message", :raw=>"RESTY 2019/10/30 14:13:44 ERROR Get https://host/path?query=1: net/http: request canceled (Client.Timeout exceeded while awaiting headers), Attempt 1", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'RESTY': was expecting ('true', 'false' or 'null')
Solution: silence resty
We already have #196 for mysql.
Now we also want to purge redis.
When using long polling for sqs queues with large wait times, this blocks any issued kernel shutdown. As AWS ECS sends a SIGKILL after 30 seconds, I want my application to shut down properly within a few seconds if possible.
Please use the available context when requesting messages from an input (i.e. SQS).
Also, the SNS subscriber has a hardcoded wait time. Please make this adaptable.
gosoline/pkg/sub/factory.go:104
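A sketch of what using the context for the long poll could look like with the aws-sdk-go sqs client; once the kernel cancels the context, the blocked receive call returns instead of waiting out the full wait time:

package stream

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// receive performs a long poll that is aborted as soon as the passed context
// is canceled, instead of blocking for the full wait time.
func receive(ctx context.Context, client *sqs.SQS, queueUrl string) (*sqs.ReceiveMessageOutput, error) {
	input := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueUrl),
		MaxNumberOfMessages: aws.Int64(10),
		WaitTimeSeconds:     aws.Int64(20),
	}

	return client.ReceiveMessageWithContext(ctx, input)
}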
To update batches of entities over http, we need something in the api package that reads structured data from a stream and performs the updates.
Improve the health check for localstack based test components in localstack.go
Hints:
Currently, it's not possible to switch e.g. to PostgreSQL, as there are some built-in dependencies towards mysql databases.
Noticed issues: connection string, metrics using the sql driver.
Please abstract the db wrapper further from the driver, so it can be easily replaced.
In some cases, I know that retrying a message will not help, e.g. because its content is malformed. Then, I'd like to write it directly to the dead letter queue to prevent any additional load.
As a developer, I see one message object which is sent to SQS. If I then enable content based deduplication, I expect to only ever send one such object to my queue (within a certain time frame). What happens, however, is a different story. Gosoline collects some attributes and stores an object in the queue with attribute and body fields. Thus, the traceId from the context can end up in my message. However, that ID is random, so I will always get a different message from SQS's perspective and content based deduplication no longer works.
To fix this, we should send a message deduplication ID to SQS together with the message instead of letting SQS handle this itself. We could also move the attributes to SQS attributes, but that does sound a lot more complicated.
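A sketch of the proposed fix: derive the deduplication id from the application payload only (before attributes like the traceId are added), so identical bodies deduplicate again; this is an illustration, not the existing gosoline output code:

package stream

import (
	"crypto/sha256"
	"encoding/hex"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// buildSendMessageInput derives the deduplication id from the application
// payload only, so differing attributes no longer break content based
// deduplication on the queue.
func buildSendMessageInput(queueUrl string, body string) *sqs.SendMessageInput {
	sum := sha256.Sum256([]byte(body))

	return &sqs.SendMessageInput{
		QueueUrl:               aws.String(queueUrl),
		MessageBody:            aws.String(body),
		MessageDeduplicationId: aws.String(hex.EncodeToString(sum[:])),
	}
}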
As with other aws services, there should be a configuration flag to automatically create it
In the FixtureSet struct, we don't need the WriterMetadata field, because we want to set the Writer via a factory method that already has the required metadata information.
Introduce a new module for fixture loading.
See also some thoughts about possible features here: https://docs.google.com/document/d/13Vuil0Jxc-x_-uFrEwwbLb3NZXAuSDltBTHKPYuqDOU/edit?ts=5e4a40e3
Whenever fixture loader runs, we want to purge specific (configured) database tables.
This is a convenience feature for development only.
Note: We should be careful not to purge databases in production systems.
Background:
If available, add the http request referer to give more context when investigating errors.
I have one foreground module which did some http requests, wrote a metric and exited. It seems like the metric writer did not start and finish writing the metric in that time (maybe it was still waiting to batch the metric request). The kernel then started to shut down, which sometimes caused the metric module not to write the metric at all, even though it was logged. It could be that the context was already canceled and thus the metric was not written.
TODO: Investigate whether this is indeed the case. If yes, we might need to use something like a delayed cancel context for the metric writer to avoid losing metrics written only during kernel shutdown.
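A sketch of what such a delayed cancel context could look like: the returned context is only canceled a fixed grace period after the kernel's context is canceled, giving the metric writer time to flush:

package mon

import (
	"context"
	"time"
)

// delayedCancelContext returns a context that is canceled a fixed delay after
// the parent context is canceled, so pending metrics can still be flushed
// during kernel shutdown.
func delayedCancelContext(parent context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		select {
		case <-ctx.Done():
			return
		case <-parent.Done():
			time.Sleep(delay)
			cancel()
		}
	}()

	return ctx, cancel
}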