hyades's People

Contributors

cortesnoel-lm, dependabot[bot], dependencytrack-bot, japurva1502, jeremylong, leec94, mehab, nscuro, renovate[bot], sahibamittal, vinodanandan, vithikas, worming004

hyades's Issues

Ingest vulnerability data reported by scanners into `dtrack.vulnerability` topic

Vulnerabilities reported by OSS Index, Snyk and other scanners that involve communication with external services must be ingested into the dtrack.vulnerability topic.

This will involve mapping the service provider's data schema to CycloneDX BOVs, as is already done for OSV.

This logic should be located in the mirror service. It can consume from topic(s) that are populated by the vulnerability analyzer service.

Employ resiliency patterns for external service calls

In the context of #124, we should make sure that all HTTP clients use resiliency patterns so that throughput remains acceptable.

In general, we want to ensure that stream threads are not blocked for longer than necessary.

  • Ensure all clients use sane timeouts (a request taking longer than a few seconds is perhaps not worth waiting for)
  • Consider using circuit breakers so we don't make the situation worse under high load (see the sketch below): https://resilience4j.readme.io/docs/circuitbreaker
  • Ensure requests are retried (#124)
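A rough sketch of how an external call could be wrapped with a Resilience4j time limiter and circuit breaker; the class name, thresholds, and the httpCall supplier are placeholders, not existing code in this repo:

```java
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ResilientAnalyzerCall {

    // Open the circuit when half of the recent calls failed; probe again after 30 seconds.
    private final CircuitBreaker circuitBreaker = CircuitBreaker.of("ossindex",
            CircuitBreakerConfig.custom()
                    .failureRateThreshold(50)
                    .waitDurationInOpenState(Duration.ofSeconds(30))
                    .build());

    // A request taking longer than a few seconds is not worth waiting for.
    private final TimeLimiter timeLimiter = TimeLimiter.of("ossindex",
            TimeLimiterConfig.custom()
                    .timeoutDuration(Duration.ofSeconds(5))
                    .build());

    // httpCall is a placeholder for the actual blocking HTTP request to the external service.
    String analyze(Supplier<String> httpCall) throws Exception {
        Callable<String> timed = timeLimiter.decorateFutureSupplier(
                () -> CompletableFuture.supplyAsync(httpCall));
        return circuitBreaker.executeCallable(timed);
    }
}
```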

Achieve horizontal scalability of API server

Current Behavior

The API server is currently not horizontally scalable, for various reasons:

  • Search indexes being read from and written to on local disk
  • TaskExecutors for event subscribers consuming from an in-memory queue
    • Some of this has been offloaded to Kafka
  • DataNucleus' L2 cache being in-memory
    • There are options available to use distributed caches, but perhaps disabling caches altogether is also an option
  • Keys for secret encryption and for JWT signing and verification are stored on disk
    • Not really an issue; they can be replaced with k8s secrets and mounted at runtime
      • We already do this for the secret key

Proposed Behavior

Ensure that the API server is horizontally scalable. It should be possible to run multiple active instances of it in parallel.

Checklist

Migrate remaining events to use Protobuf schemas

We currently maintain duplicate code for message POJOs in the API server and the PoC applications, which adds unnecessary overhead and risks inconsistencies between the model classes.

Look into defining message models in a common schema and automatically generating POJOs from it.

Possible options:

For now, I think I'd prefer not to introduce a schema registry. Unless Redpanda is used (which bundles a schema registry), a schema registry would be an additional service that needs to be operated.

Progress

  • Mirror
  • Metrics (#386)
  • Notifications (#415)
  • Repo meta analysis (#411)
  • Vulnerability analysis (#338)

Add architecture diagrams

In order for us (and others) to better understand what we're building here, we should have architecture diagrams.

Preferably there should be multiple "resolutions" from a high-level overview to individual services. The topology diagrams we can generate using Kafka Streams will be helpful for the latter.

Implement distributed tracing support with OpenTelemetry

Look into adding support for distributed tracing using OpenTelemetry and Jaeger.

Tracing would help us to better understand where the system spends most of its time, as well as observe how individual units of work / messages are flowing through the system.

Notes:

  • Tracing should be optional
  • The implementation should use the OpenTelemetry protocol and not provider-specific protocols like Jaeger
  • Context propagation over Kafka can be solved using message headers (see the sketch below)

https://quarkus.io/guides/opentelemetry
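As a hedged sketch of the header-based context propagation mentioned above, the standard OpenTelemetry TextMapPropagator API could inject the current trace context into Kafka record headers before producing; the class name is a placeholder and the OpenTelemetry SDK is assumed to be configured elsewhere:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

class TraceContextPropagation {

    // Writes W3C trace-context fields (traceparent, tracestate) into Kafka record headers.
    private static final TextMapSetter<ProducerRecord<String, String>> SETTER =
            (record, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8));

    static ProducerRecord<String, String> withTraceContext(ProducerRecord<String, String> record) {
        GlobalOpenTelemetry.getPropagators()
                .getTextMapPropagator()
                .inject(Context.current(), record, SETTER);
        return record;
    }
}
```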

Investigate usage of Confluent's parallel consumer

Confluent has published a wrapper around Kafka's Consumer/Producer API that enables parallel processing of messages, and supports batching and retries: https://github.com/confluentinc/parallel-consumer

parallel-consumer can be used alongside Kafka Streams in the same application. It can be hooked into Vert.x (which ships with Quarkus) to perform I/O operations.

Based on what I've seen so far, parallel-consumer may be a good fit for vulnerability analysis and repo meta analysis.

When it comes to the supported ordering guarantees, I think key-based ordering should suffice. These ordering guarantees are something that Quarkus' reactive messaging implementation is currently missing, see #215 (comment)
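A rough sketch of how parallel-consumer could be wired up with key-based ordering (assuming a recent parallel-consumer version; the topic name, concurrency value, and analyze() call are placeholders):

```java
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;

class ParallelAnalysisConsumer {

    static ParallelStreamProcessor<String, String> start(KafkaConsumer<String, String> consumer) {
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ProcessingOrder.KEY) // records with the same key stay in order
                .maxConcurrency(100)           // many concurrent I/O-bound analyses per instance
                .consumer(consumer)
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("dtrack.vuln-analysis.component")); // placeholder topic
        processor.poll(context ->
                // Placeholder for the blocking call to an external analyzer (OSS Index, Snyk, ...).
                analyze(context.getSingleConsumerRecord().value()));
        return processor;
    }

    static void analyze(String componentJson) {
        // placeholder
    }
}
```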

Fix Project Count for metrics-service

Deletion of a project
Similarly, when a project is deleted, the metrics for its components are updated, but the project count itself is not.

Referencing issue: #284

Consider (re-) writing services in Go

Java is a heavy dependency to have, and even native images compiled with GraalVM turn out to be rather big (~100 MB).
Additionally, native images do not benefit from the runtime optimizations provided by the JVM, which can be a downside for long-running processes. It also turns out that cross-compiling (e.g. compiling for arm64 on amd64) takes ages.

The main reason to use Java as of now is the maturity of available Kafka libraries (namely Kafka Streams), which provide solutions to common streams processing tasks. Most notably, Kafka Streams comes with good support for stateful streams processing, e.g. joins and aggregations. Stateful operations are required for us in certain areas, for example to implement scatter-gather patterns (#219).

Services that do not need to perform stateful processing could be (re-) written in Go instead. Go is easily cross-compilable, produces small binaries, generally has a smaller resource footprint, and is widely adopted by open source projects.

Out of all the Go libraries out there, franz-go seems to be the most promising. It's the Go library used by Redpanda for its CLI and Console applications: https://github.com/twmb/franz-go

An example of how to use consumer groups with per-partition Goroutines is here: https://github.com/twmb/franz-go/tree/master/examples/goroutine_per_partition_consuming/manual_commit

Determine when all analyzers finished their analysis for a component

At the moment, all analyzers do their work in parallel, and potentially at a different pace.

For CI scans, however, we need a way to check when all analyzers have finished their work on a given component.

This functionality was trivial in the original API server implementation, as analyzers would run sequentially.
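One possible way to rebuild this with Kafka Streams would be to aggregate analyzer results per component and emit a completion event once all expected analyzers have reported. The following is only a sketch with simplified String values; the topic names, the analyzer set, and the serialization are placeholder assumptions:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Set;

class AnalysisCompletionTopology {

    // Analyzers expected to report a result for every component (placeholder set).
    private static final Set<String> EXPECTED = Set.of("INTERNAL", "OSSINDEX", "SNYK");

    static void build(StreamsBuilder builder) {
        // Assumes dtrack.vuln-analysis.result is keyed by component UUID and the value
        // (simplified to a String here) identifies the analyzer that produced it.
        KTable<String, String> seenAnalyzers = builder
                .stream("dtrack.vuln-analysis.result", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> "",
                        (componentUuid, analyzerName, seen) ->
                                seen.contains(analyzerName) ? seen : seen + analyzerName + ";",
                        Materialized.with(Serdes.String(), Serdes.String()));

        // Emit a "component complete" event once all expected analyzers have reported.
        // De-duplication of repeated completion events is omitted in this sketch.
        seenAnalyzers.toStream()
                .filter((componentUuid, seen) -> EXPECTED.stream().allMatch(seen::contains))
                .to("dtrack.vuln-analysis.complete", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```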

Implement caching for repository meta analysis

The repository meta analyzer currently does not cache responses from external services.

Not caching results means many unnecessary lookups, reduced throughput, and an increased chance of retries becoming necessary (see #124).

The vulnerability analyzer does have basic caching in place.
We could move the CacheConfig class to the commons module.

All repository analyzers will then need to have a dedicated Cache instance, similar to how it's done for OSS Index and Snyk clients: https://github.com/mehab/DTKafkaPOC/blob/main/vulnerability-analyzer/src/main/java/org/acme/analyzer/OssIndexAnalyzerConfig.java#L21-L35
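A minimal sketch of what such a shared cache configuration could look like, loosely following the linked OssIndexAnalyzerConfig; the class name, bean name, TTL, and size are placeholders, and Caffeine is assumed as the cache implementation:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Named;
import java.time.Duration;

class RepoMetaCacheConfig {

    // One cache per analyzer, so hit rates and eviction can be tuned independently.
    @Produces
    @ApplicationScoped
    @Named("mavenMetaCache") // placeholder bean name
    Cache<String, String> mavenMetaCache() {
        return Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofHours(1)) // placeholder TTL
                .maximumSize(10_000)
                .build();
    }
}
```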

Proposal: Moving Mirroring Tasks On to Quarkus Application

Current Implementation:
Currently, DT performs mirroring of GitHub, OSV, NVD, and EPSS on a schedule, and also when a user updates the configuration in the UI to enable mirroring for a particular advisory source.
In the current implementation, vulnerabilities are downloaded, parsed, and added sequentially from the source URL.
A few bottlenecks with the current approach:

  1. Mirroring happens inside DT, takes a lot of CPU, and can interfere with other DT functionality
  2. All ecosystems (in the case of OSV) and the per-year API calls (for NVD) are processed sequentially, which takes a lot of time

Proposed Architecture:
Move the mirroring tasks out of DT into a Quarkus application and process the ecosystems in parallel.
A few positive outcomes of this approach:

  1. The DT server is offloaded from the mirroring task, which could improve DT's performance
  2. Parallel download of ecosystems can speed up mirroring; however, since the database remains with DT, parallel processing of mirroring tasks could cause too many update requests to the database
  3. Parallelism can be achieved using Kafka Streams consumers, which can be scaled dynamically

A rough sketch around the proposal : https://excalidraw.com/#room=2eabf2bfa48e3cdc9dd3,jrGbPL7Vgn2RwAXTnxeceg

Enhance the efficiency of streams by implementing the timestamp extractor interface and defining the grace period for records

The windowing operations are driven by record timestamps, not by wall-clock time. In Kafka Streams, the earliest timestamp across all partitions is chosen first for processing, and Kafka Streams uses the TimestampExtractor interface to get the timestamp from the current record.
By implementing the TimestampExtractor interface and defining a grace period for all records, we would be able to better manage the parallel operations performed by the analyzers. By defining a grace period (which defaults to 24 hours in Kafka), we can control when a stream stops waiting for out-of-order records and returns the result of the computation.
My understanding is also that we do not need a 24 hour grace period for the results from analyzers.
Reference documentation here.
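A minimal sketch, assuming the plain Kafka Streams API: a custom TimestampExtractor plus an explicit, much shorter grace period on windowed operations (the window and grace durations below are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.time.Duration;

// Uses the record's embedded timestamp and falls back to the partition time when it is missing.
// Registered via the default.timestamp.extractor config.
class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        long timestamp = record.timestamp();
        return timestamp >= 0 ? timestamp : partitionTime;
    }

    // Windowed aggregations over analyzer results can then use a much shorter grace period
    // than the 24-hour default (sizes below are placeholders):
    static TimeWindows analyzerResultWindow() {
        return TimeWindows.ofSizeAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(10));
    }
}
```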

Producing Kafka messages in mirror-service fails when running as native image

The mirror service is using compression to account for large vulnerability detail messages. It is configured to use the zstd compression algorithm for now.

Running the native image version of the service yields the following exception when messages are being produced:

org.apache.kafka.common.KafkaException: java.lang.UnsatisfiedLinkError: com.github.luben.zstd.ZstdOutputStreamNoFinalizer.createCStream()J [symbol: Java_com_github_luben_zstd_ZstdOutputStreamNoFinalizer_createCStream or Java_com_github_luben_zstd_ZstdOutputStreamNoFinalizer_createCStream__]
        at org.apache.kafka.common.compress.ZstdFactory.wrapForOutput(ZstdFactory.java:45)
        at org.apache.kafka.common.record.CompressionType$5.wrapForOutput(CompressionType.java:122)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:140)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:160)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:198)
        at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:593)
        at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:575)
        at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:523)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.recordsBuilder(RecordAccumulator.java:414)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.appendNewBatch(RecordAccumulator.java:398)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:350)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1027)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949)
        at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)

zstd requires native libraries to be compiled into the native image. This does not happen by default.

Proposal: Look at rate limiting the scan requests at the api server side for future

Current Behavior

We are currently also looking at ways to scale the API server. But even with a scaled-out API server, we might hit resource limits under a very large volume of incoming scan requests, considering that we want a central Dependency-Track instance at the enterprise level.

Proposed Behavior

To deal with this situation, it might help to add some kind of throttling at the API server level to limit the number of requests it accepts at a time (based on its maximum capacity when fully scaled), while the remaining scan requests wait in the Kafka / Redpanda queue to be processed.
This way we would not over-utilize resources and run into memory or CPU overruns.
The implementation of this proposal depends on a detailed analysis of what an API server instance can handle without throttling, which still needs to be gauged; it is therefore proposed as a future enhancement.
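To illustrate, one way such throttling could look on the API server side is a Resilience4j rate limiter in front of the scan-submission path; the limits and names below are placeholders, not measured values:

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

import java.time.Duration;
import java.util.function.Supplier;

class ScanRequestThrottle {

    // Placeholder limits: at most 50 scan submissions per second; callers wait up to 100 ms for a
    // permit, otherwise a RequestNotPermitted exception is thrown (which could be mapped to HTTP 429,
    // leaving the remaining work queued in Kafka / Redpanda).
    private final RateLimiter rateLimiter = RateLimiter.of("scan-requests",
            RateLimiterConfig.custom()
                    .limitForPeriod(50)
                    .limitRefreshPeriod(Duration.ofSeconds(1))
                    .timeoutDuration(Duration.ofMillis(100))
                    .build());

    <T> T submit(Supplier<T> scanSubmission) {
        return RateLimiter.decorateSupplier(rateLimiter, scanSubmission).get();
    }
}
```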

Checklist

Calculate portfolio metrics in Kafka Streams

Portfolio metrics are currently calculated in the API server, using three separate tasks:

  • ComponentMetricsUpdateTask: Does the heavy lifting of gathering information about a given component from the database
    • Number of CRITICAL, HIGH, MEDIUM, LOW, UNASSIGNED severity vulnerabilities
    • Total number of vulnerabilities
    • Total number of findings, but also:
      • Number of audited findings
      • Number of unaudited findings
      • Number of suppressed findings
    • Number of FAIL, WARN, INFO policy violations
    • Total, audited, unaudited numbers of policy violations per type:
      • LICENSE, OPERATIONAL, SECURITY
  • ProjectMetricsUpdateTask: Kicks off a ComponentMetricsUpdateTask for each component in a given project, and aggregates the resulting metrics.
  • PortfolioMetricsUpdateTask: Kicks off multiple ProjectMetricsUpdateTasks in parallel, one for each project in the portfolio. Aggregates resulting metrics.

Metrics calculation has historically been one of the most expensive operations in DT, especially for large portfolios.
But not only is it resource-intensive, its batch-processing nature also causes lags between the time something changes (new vulnerability discovered), to it being reflected in the metrics.

While component and project metrics may be updated frequently, changes in these metrics will not be reflected in overall portfolio metrics until PortfolioMetricsUpdateTask has been executed.

By default, PortfolioMetricsUpdateTask is scheduled to run every hour. It can be triggered manually by users with the PORTFOLIO_MANAGEMENT permission.


Both ProjectMetricsUpdateTask and PortfolioMetricsUpdateTask don't really do anything by themselves, besides aggregating.

Ideally, anytime the metrics for a component change, the corresponding project metrics should be updated in almost real time. The same should be true for portfolio metrics whenever project metrics change.

Using Kafka Streams, it should be possible to model component metrics as a stream of events. These events could be aggregated based on the UUID of the project the components belong to. Project metrics aggregates themselves can emit a stream of events, which, again, could be aggregated into portfolio metrics.

Theoretically, that would leave us with 3 topics:

  • dtrack.metrics.component
  • dtrack.metrics.project
  • dtrack.metrics.portfolio

Consumers of these topics could either:

  • Materialize the data in their own database
  • Stream them directly to users via Websockets, allowing for almost real time dashboards

Note that, in order to display these metrics as time series graph, consumers will still need to track individual data points themselves. The streams mentioned above will only ever contain the most recent data point per component or project.
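To illustrate the roll-up described above, here is a rough Kafka Streams sketch with heavily simplified value types (a plain count instead of a metrics object); the "projectUuid:componentUuid" key format, topic names, and serdes are placeholder assumptions. Modelling the input as a KTable and using an adder/subtractor pair is one possible angle on handling updates:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

class MetricsRollupTopology {

    static void build(StreamsBuilder builder) {
        // Simplification: the value is just the component's vulnerability count; in reality it would
        // be a metrics object (Protobuf). Keys are assumed to be "projectUuid:componentUuid" so the
        // project can be derived from the key (a placeholder assumption).
        KTable<String, Long> componentMetrics = builder.table("dtrack.metrics.component",
                Consumed.with(Serdes.String(), Serdes.Long()));

        componentMetrics
                .groupBy((key, vulnCount) -> KeyValue.pair(key.split(":")[0], vulnCount),
                        Grouped.with(Serdes.String(), Serdes.Long()))
                // The adder/subtractor pair handles updated and deleted components:
                // the old value is subtracted before the new one is added.
                .reduce(Long::sum, (aggregate, oldValue) -> aggregate - oldValue,
                        Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream()
                .to("dtrack.metrics.project", Produced.with(Serdes.String(), Serdes.Long()));
        // The same pattern would apply one level up: group dtrack.metrics.project by a constant
        // portfolio key and aggregate into dtrack.metrics.portfolio.
    }
}
```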


A few complications I see here:

  • Metrics numbers can go up and down. How do we handle subtractions from an aggregate?
  • Components and projects may get deleted. How do we handle removals from an aggregate?

Implement retry mechanism

Since much of what the applications in this repo do consists of reaching out to external services, they are of course very vulnerable to instabilities in those external services affecting overall system performance and reliability.

There are various reasons for external requests going bad:

  • Connection timeouts
  • Rate limiting (HTTP 429)
  • External system temporarily unavailable (HTTP 503)
  • Many more

At the moment, whenever the streams applications encounter an error, the stream thread is shut down, and will not be available for further task processing (that's the default behavior in Kafka Streams). Not good for obvious reasons.

Simply ignoring such errors is also not a good option.

According to some initial research, the idiomatic way of implementing retries in Kafka Streams is to have dedicated retry topics, and eventually a "dead letter topic" for when the maximum number of retries has been exceeded. See https://www.confluent.io/blog/error-handling-patterns-in-kafka/

Having retries implemented in stream processors is most likely sub-optimal, as it will reduce message throughput of the consumer.
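A rough sketch of how records could be routed to a retry topic instead of failing the stream thread; the topic names, the analyze() call, and the "ok:"/"retry:" string envelope are placeholders for a proper result type:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Map;

class RetryTopicTopology {

    static void build(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("dtrack.vuln-analysis.component",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Catch failures of the external call and mark the record for retry instead of throwing,
        // so the stream thread keeps running.
        Map<String, KStream<String, String>> branches = input
                .mapValues(value -> {
                    try {
                        return "ok:" + analyze(value);
                    } catch (Exception e) {
                        return "retry:" + value;
                    }
                })
                .split(Named.as("analysis-"))
                .branch((key, value) -> value.startsWith("retry:"), Branched.as("retry"))
                .defaultBranch(Branched.as("result"));

        branches.get("analysis-retry")
                .mapValues(value -> value.substring("retry:".length()))
                .to("dtrack.vuln-analysis.component.retry", Produced.with(Serdes.String(), Serdes.String()));

        branches.get("analysis-result")
                .mapValues(value -> value.substring("ok:".length()))
                .to("dtrack.vuln-analysis.result", Produced.with(Serdes.String(), Serdes.String()));

        // A separate consumer would re-process the retry topic with a delay; after N failed attempts
        // the record would be forwarded to a dead letter topic instead.
    }

    // Placeholder for the actual call to an external service.
    static String analyze(String componentJson) throws Exception {
        return componentJson;
    }
}
```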

Rename "repository-meta-analyzer" to "component-analyzer" or "component-meta-analyzer"

As of now, the repository-meta-analyzer only performs lookups in remote package repositories. So the name is fitting.

However, there is more meta information about components that would be valuable to have:

Instead of introducing dedicated services for those lookups, have the repository meta analyzer do it.
To better reflect this broader responsibility, rename the service to something more generic.

Deserializing of CVE definitions from NVD API fails when using mirror-service native image

Exception while performing NVD mirroring io.github.jeremylong.nvdlib.NvdApiException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.github.jeremylong.nvdlib.nvd.CveApiJson20`: cannot deserialize from Object value (no delegate- or property-based Creator): this appears to be a native image, in which case you may need to configure reflection for the class that is to be deserialized
 at [Source: (StringReader); line: 1, column: 2]

Similar to #300, the model classes of nvd-lib will need to be registered for reflection.

Make services "k8s-ready"

For our load testing (#136), we'll need to run the services in a k8s cluster to account for dynamic scaling etc.

In order for that to work, the services need to be deployable using Helm. Quarkus has a Helm extension that may be useful: https://quarkus.io/blog/quarkus-helm/

For verification, all services must be deployable in a local Minikube or KIND cluster.

Slow processors can slow down overall throughput when `num.stream.threads` < Number of Stream Tasks

As per Kafka Streams' threading model, a topology is broken down into stream tasks. The number of tasks created depends on how many sub-topologies there are, and from how many partitions those sub-topologies consume. One or more tasks are assigned to a single stream thread. The number of stream threads in an application instance is defined by kafka-streams.num.stream.threads.

If num.stream.threads is lower than the number of stream tasks generated for the topology, there will be some threads working on multiple tasks. In Kafka Streams, there is no way to influence how tasks are assigned. This could lead to situations where one thread is assigned to tasks from both:

  • A fast "upstream" task, responsible for ingesting input records (e.g. by consuming from dtrack.vuln-analysis.component)
  • A slow "downstream" task, responsible for processing records (e.g. the Snyk analyzer, capable of analyzing 3-4 records per second)

This means that the upstream task (capable of processing multiple hundreds of records per second) will be significantly delayed, which also has a negative impact on other tasks depending on its results.

In the screenshot below, consumers of the dtrack.vuln-analysis.component and dtrack.vuln-analysis.component.purl topics have a significant lag, despite the respective sub-topologies being capable of processing hundreds of records per second. This should not be the case. Instead, consumers of those topics should be able to keep up with new records in almost realtime.

[Screenshot: consumer lag on the dtrack.vuln-analysis.component and dtrack.vuln-analysis.component.purl topics]

For the vulnerability analyzer, a slow processor like Snyk can additionally lead to sub-optimal batching in other processors. Because fewer records arrive in a given time window, OSS Index will not be able to use the maximum batch size of 128 efficiently:

2023-01-09 10:18:54,317 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 46 records
2023-01-09 10:18:54,797 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 48 records
2023-01-09 10:19:04,425 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 23 records
2023-01-09 10:19:04,934 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 24 records
2023-01-09 10:19:05,954 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 23 records
2023-01-09 10:19:11,077 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 12 records
2023-01-09 10:19:11,079 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 11 records
2023-01-09 10:19:11,896 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 19 records
2023-01-09 10:19:16,812 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 12 records
2023-01-09 10:19:17,068 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 13 records
2023-01-09 10:19:21,825 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 12 records
2023-01-09 10:19:21,826 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 26 records
2023-01-09 10:19:22,072 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 8 records
2023-01-09 10:19:26,887 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 8 records
2023-01-09 10:19:27,235 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-2) Analyzing batch of 8 records
2023-01-09 10:19:31,943 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 19 records
2023-01-09 10:19:31,996 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-3) Analyzing batch of 12 records
2023-01-09 10:19:36,983 INFO  [org.acm.pro.oss.OssIndexProcessor] (dtrack-vulnerability-analyzer-d1858502-0153-4268-b4bf-5bf788e1e051-StreamThread-1) Analyzing batch of 9 records

This is not an issue when there is one stream thread per stream task. For example, when both OSS Index and Snyk are enabled, but the internal analyzer is disabled, there are 18 partitions the application consumes from, leading to 18 stream tasks. Spawning one application instance with num.stream.threads=18, or three instances with num.stream.threads=6, leads to optimal processing conditions.

Effectively, scaling up is unproblematic, but scaling down always comes with the danger of significantly slowing down the entire system.

Possible solutions:

  • Move each analyzer into their own microservice (more operational overhead)
  • Start multiple KafkaStreams instances instead of just one (Quarkus only supports one KafkaStreams per application, we'd need more code to make this work, and we have to give up the dev mode integration)

Increase test coverage

Our code should be thoroughly tested, to give us high confidence that what we've built actually behaves as we expect.

Integration tests should be prioritized, so that we don't have to refactor too many tests when we shift things around, as we're still in the early stages of the project and things may change frequently.

Long-running mirroring causes rebalance issues

When mirroring the NVD without an API key, the whole procedure takes much longer than when an API key is provided.

The Kafka Streams thread is effectively blocked while mirroring. During processing, it does not poll the broker within max.poll.interval.ms, causing it to be kicked out of the consumer group (the broker assumes it's dead):

2023-02-01 18:09:40,677 WARN  [org.apa.kaf.str.pro.int.StreamThread] (hyades-mirror-service-fe261319-eda1-44ee-a748-c5f33f51e2f8-StreamThread-1) stream-thread [hyades-mirror-service-fe261319-eda1-44ee-a748-c5f33f51e2f8-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.: org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:225)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:157)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1118)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1086)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1066)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1311)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1168)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1506)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1453)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:222)
        ... 9 more

Separate responsibilities in different applications

As there should really only be a single streams topology per application, we currently have shoved different responsibilities into the same topology.

Not only does this bloat the topology diagram, it also messes with scalability. For example, in the current main branch, we have both vulnerability analysis and notification sending. With the default kafka-streams.num.stream.threads=3, every stream thread gets assigned 20 partitions.

[Screenshot from 2022-11-17: partition assignment with kafka-streams.num.stream.threads=3]

Partitions are evenly divided, because all topics we have as of now have exactly 3 partitions.

If we increase the stream threads to something like 12 however, the whole picture looks more messy:

[Screenshot from 2022-11-17: partition assignment with 12 stream threads]

3 tasks are assigned 17 partitions, while others only get one.

Those 3 tasks are involved in both notification sending and vulnerability analysis, which means a high load of notifications can block vulnerability analysis, and a high load on the analysis part can block notifications. Not a good situation to be in.

Let's separate the vulnerability analysis, notification sending, and repo meta analysis into their own applications. We can do this by splitting the Maven project into multiple modules. Each application should have its own topology, and its own consumer group.

Global Kafka and Kafka streams configuration review

The purpose of this issue is to list checkpoints related to Kafka usage that should be validated before releasing the POC (both on the external application and DT side).

Requirements

  • Is the processing semantic clearly defined (ALOS, EOS) and does the implementation match the chosen semantic ?

Topics

  • Is the topic partitioning strategy correctly thought of (# of partitions and partition keys) ?
  • Is the default replication factor set correctly for each Topic ?
  • Is the default minimum ISR set correctly for each Topic ?
  • Are the topic retention parameters correctly set (Time VS Capacity based retention) ?
  • Is Topic compaction used for the appropriate use case ?

Producers

  • Are the producers using the proper acks configuration ?
  • Are the producers using the proper batching configuration (linger.ms / batch.size / buffer.memory) ?
  • Are the producers providing callbacks to handle retryable errors ?
  • Are automatic producer retries configured (retries / retry.backoff.ms / delivery.timeout.ms) ?
  • Is idempotence enabled to avoid duplicates (enable.idempotence) ?
  • Is the number of in-flight requests per connection correctly setup (max.in.flight.requests.per.connection) ?
  • Does max.request.size match the upper cap on broker side ?
  • Is the compression algorithm properly thought of ?
  • Is client.id properly set ?
  • If event publication must be synchronized with DB update, is this correctly designed (i.e. Outbox pattern) ?
  • Are event production errors correctly handled ?

Consumers

  • Is the consumer auto.commit option correctly setup (enable.auto.commit) ?
  • Are the consumers correctly committing the offsets in the consumer loop (synchronous VS asynchronous) ?
  • Are Rebalance operations correctly handled ? Is a rebalance listener defined ?
  • Are poison pills correctly handled ?
  • Are the consumer polling parameters correctly setup (session.timeout.ms, max.poll.records and max.poll.interval.ms) ?
  • Is the consumer heartbeat period correctly setup (heartbeat.interval.ms) ?
  • Is the Consumer Group protocol correctly used ? Is the Consumer Group identifier externalized ?
  • Is the consumer logic idempotent ?
  • Is the partition assignment strategy properly thought of (partition.assignment.strategy) ?
  • Are the events fetching parameters matching the required throughput (fetch.min.bytes / max.partition.fetch.bytes / fetch.max.wait.ms) ?
  • Is the offset retention matching the consumers SLAs or topic retention (offsets.retention.minutes) ?
  • Is the auto reset feature correctly used (auto.offset.reset) ?
  • Is client.id properly set ?
  • In case of multi DC deployment, is the rack awareness enabled (client.rack) ?
  • Are Consumer isolation level properly used (for transactional use cases) ?

Kafka Streams

  • Is the application.id option correctly setup (application.id) ?
  • Is the threading model properly thought of (# of threads vs # of instances : num.stream.threads) ?
  • If the Streams application is Stateful, is the State Directory overridden (state.dir) and made persistent ?
  • If the Streams application is Stateful, are hot standbys configured for failover (num.standby.replicas) ?
  • If the Streams application is Stateful, is the replication factor for internal Topics overridden (replication.factor) ?
  • Is the Streams application's inner producer configured for resiliency (acks - min.insync.replicas) ?
  • Is each processor in the topology explicitly named to ensure readability, prevent data loss and make the topology resilient to updates ?
  • Are Serdes explicitly set for each processor in the topology ?
  • Is the Streams Application Production Exception Handler configured ?
  • Is the Streams Application Deserialization Exception Handler configured ?
  • Is the Kafka Streams Topology showing some repartition operations ? If so, are repartition Topic renamed (Indeed repartition Topics should be renamed using the through operator) ?
  • Is the stateful cache configuration correct (cache.max.bytes.buffering) ?
  • Is the processing guarantee in line with the business requirements (processing.guarantee) ?
  • If exactly_once processing guarantee is used, are Kafka Transactional parameters correctly setup (transaction.state.log.replication.factor - transaction.state.log.min.isr) ?
  • If exactly_once processing guarantee is used, do downstream consumers have the correct isolation level (isolation.level) ?
  • Are the DSL/Processor operations correctly used to implement the Streams application ?
  • Is the metric recording level correctly setup (metrics.recording.level) ?
  • Do you need to define your own time in your kafka stream application (default.timestamp.extractor) ?
  • If KStream/KTable join are used, does it account for KTable data coming after the KStream (max.task.idle.ms) ?
  • Are Serdes explicitly set for each operation ?

Proposal: Use PURL, CPE, SWID Tag ID as key for Kafka Messages

Current Implementation

For every component within a BOM uploaded to the API server, the API server will publish an event to the EventNew Kafka topic.

Those events currently have the form:

Key: Project UUID
Value: Component details

Example (key: ebb10845-8f95-4194-85c0-0ff6c5ab3cdf):
{
  "uuid": "445dc140-5638-4eb7-9409-53204d7f3cae",
  "group": "xerces",
  "name": "xercesImpl",
  "version": "2.12.2",
  "purl": "pkg:maven/xerces/[email protected]?type=jar",
  "cpe": null
}

The Kafka producer used by the API server utilizes the default partitioner, meaning that events with the same key will always end up in the same topic partition.

Kafka streams applications (read: consumer groups) in the analyzer application consume from the EventNew topic. At the time of writing, those applications are:

Application Name Class
OSSConsumer org.acme.consumer.OSSIndexBatcher
SnykAnalyzer org.acme.consumer.SnykAnalyzer

Quoting the streams architecture documentation:

Kafka Streams creates a fixed number of stream tasks based on the input stream partitions for the application, with each task being assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of stream partitions to stream tasks never changes, hence the stream task is a fixed unit of parallelism of the application.

Applying this to our current implementation, this means that events for the same project UUID will always end up being processed by the same streams task (maps to a JVM thread) within a streams application.

Example
  • The EventNew topic is created with 3 partitions
  • A BOM with 200 components is uploaded to the DT project with UUID ebb10845-8f95-4194-85c0-0ff6c5ab3cdf
  • The API server sends 200 messages with key ebb10845-8f95-4194-85c0-0ff6c5ab3cdf to the EventNew topic
  • The default partitioner assigns all 200 events to partition 1
  • The streams applications OSSConsumer and SnykAnalyzer are started with 3 threads each
  • Thread 1 of each streams application is assigned to partition 1 of the EventNew topic
  • Both thread 1s process the 200 events, while threads 0 and 2 of both Streams applications remain idle

Analyzers perform lookups with external services (OSS Index, Snyk, VulnDB APIs), unless they experience a cache hit for the component at hand. They will emit messages of the following form to the vuln-result topic:

Key: Component UUID
Value: Vulnerability details (may be null when no vulnerability has been found)

Example (key: 445dc140-5638-4eb7-9409-53204d7f3cae):
{
  "vulnerability": {
      "vulnId": "CVE-2017-10355",
      "source": "NVD",
      "description": "sonatype-2017-0348 - xerces:xercesImpl - Denial of Service (DoS)\n\nThe software contains multiple threads or executable segments that are waiting for each other to release a necessary lock, resulting in deadlock.",
      "references": "* [https://ossindex.sonatype.org/vulnerability/sonatype-2017-0348?component-type=maven&component-name=xerces%2FxercesImpl&utm_source=unknown&utm_medium=integration&utm_content=Alpine](https://ossindex.sonatype.org/vulnerability/sonatype-2017-0348?component-type=maven&component-name=xerces%2FxercesImpl&utm_source=unknown&utm_medium=integration&utm_content=Alpine)\n* [https://blogs.securiteam.com/index.php/archives/3271](https://blogs.securiteam.com/index.php/archives/3271)",
      "cwes": [
          {
              "cweId": 833,
              "name": "Deadlock"
          }
      ],
      "severity": "MEDIUM",
      "affectedProjectCount": 0
  },
  "identity": "OSSINDEX_ANALYZER"
}

Using the component UUID from the message key, the API server can easily correlate the message with a specific component in the portfolio.

Benefits

  • ✅ Each event in EventNew represents a component in DT and thus a nicely encapsulated unit of work for the analyzer
  • ✅ Easy correlation of vuln-result events to components in the portfolio

Drawbacks

  1. ⛔ Projects with many components can clog a topic partition, keeping one streams task super busy while others run idle
    • Parallelization of the analysis work happens at the project level rather than at the component level
  2. ⛔ DT can consider components to be different despite them having identical PURLs or CPEs. OSS Index, Snyk etc. don't do that, so triggering a scan for each DT component will result in many redundant calls
  3. ⛔ Because the same PURL or CPE may be analyzed in multiple stream tasks at once, there will be race conditions for cache lookups, again causing redundant calls to external services
  4. ⛔ The cache lookup issues and redundant calls mentioned above contribute to faster exceeding of rate limits imposed by the external services
  5. ⛔ If we want to support ad-hoc scanning of components or BOMs for which no project in DT exists, we can't rely on the project or component UUID to always be available for message keys

Note
Points (2) - (4) exist in vanilla DT, too.

Proposed Solution

Both of the following variants are based on option 2 in Alioune's comment here: DependencyTrack/dependency-track#2023 (comment) (and I think it is also what he was referring to in syalioune#1 (comment)):

  • Using a combination of object pools and sharding based on queues (in-memory or not) : Having a pool of analyzer objects with the proper logic fetching components to process from a dedicated queue.
  • Upstream process have to be updated to always send (or it can be wrapped) identical purls to the same queue (using some kind of hashed based partitioning).

This way, there would be no need for synchronization between the analyzer objects as identical purls would always be processed by the same analyzer

Variant 1: Only work with PURL / CPE / SWID Tag ID

Instead of using the project UUID as message key, we use the identifiers used for vulnerability analysis:

  • PURL
  • CPE
  • SWID Tag ID (at a later point in time)
  • etc.

Further, the entire analysis process will happen without any relations to component identities in DT. There will be no IDs or UUIDs of components or projects transmitted.

Note
There is a complication regarding PURLs, in that they can contain qualifiers and sub-paths.
For example, pkg:maven/com.acme/[email protected] and pkg:maven/com.acme/[email protected]?type=jar are technically different, but describe the same component, and are treated as equal by all (currently known) analyzers.

We could implement a custom Kafka partitioner that would ensure that pkg:maven/com.acme/[email protected] and pkg:maven/com.acme/[email protected]?type=jar end up in the same partition. The partitioner would treat PURLs as equal, as long as their coordinates (type, namespace, name, version) are the same.

A similar strategy will be necessary for CPEs, too.
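A minimal sketch of such a partitioner, assuming the packageurl-java library for parsing; error handling is simplified and the CPE case simply falls back to the raw key:

```java
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class PurlCoordinatesPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        byte[] bytes = normalize(String.valueOf(key)).getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
    }

    // Strips qualifiers and sub-paths so that PURLs differing only in qualifiers
    // (e.g. a trailing ?type=jar) hash to the same partition.
    private String normalize(String purl) {
        try {
            PackageURL parsed = new PackageURL(purl);
            return new PackageURL(parsed.getType(), parsed.getNamespace(),
                    parsed.getName(), parsed.getVersion(), null, null).canonicalize();
        } catch (MalformedPackageURLException e) {
            return purl; // not a PURL (e.g. a CPE); fall back to the raw key
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```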

Key: PURL / CPE / SWID Tag ID
Value: Nothing / additional details

Example:
pkg:maven/com.acme/[email protected]?type=jar
{}

cpe:2.3:a:apache:xerces2_java:*:*:*:*:*:*:*:*
{}

Results emitted by the analyzers would then have the form of:

Key: PURL / CPE / SWID Tag ID
Value: Vulnerability details (may be null when no vulnerability has been found)

Example:
pkg:maven/com.acme/[email protected]?type=jar
{
  "vulnerability": {
      "vulnId": "CVE-2017-10355",
      "source": "NVD",
      "description": "sonatype-2017-0348 - xerces:xercesImpl - Denial of Service (DoS)\n\nThe software contains multiple threads or executable segments that are waiting for each other to release a necessary lock, resulting in deadlock.",
      "references": "* [https://ossindex.sonatype.org/vulnerability/sonatype-2017-0348?component-type=maven&component-name=xerces%2FxercesImpl&utm_source=unknown&utm_medium=integration&utm_content=Alpine](https://ossindex.sonatype.org/vulnerability/sonatype-2017-0348?component-type=maven&component-name=xerces%2FxercesImpl&utm_source=unknown&utm_medium=integration&utm_content=Alpine)\n* [https://blogs.securiteam.com/index.php/archives/3271](https://blogs.securiteam.com/index.php/archives/3271)",
      "cwes": [
          {
              "cweId": 833,
              "name": "Deadlock"
          }
      ],
      "severity": "MEDIUM",
      "affectedProjectCount": 0
  },
  "identity": "OSSINDEX_ANALYZER"
}

Benefits

  1. ✅ Kafka streams guarantees us that the same PURL will be processed by the same streams task, solving the problem of race conditions in cache lookups
  2. ✅ Consequently, processing the same PURL multiple times is not an issue, because caching is more effective
  3. ✅ The API server can perform best-effort de-duplication of those identifiers before sending them off to Kafka. That way, a BOM upload to the same project should never result in duplicate PURL / CPE events. This can contribute to less overall load on the system.
  4. ✅ Streams tasks additionally get a chance to perform further de-duplication, so they don't process the same PURL multiple times within a window / batch. Duplicate PURL events can simply be discarded.
  5. ✅ Simplification of the recurring analysis of the entire portfolio: Instead of iterating over all individual components every X hours, iterate over all unique PURLs, CPEs, SWID Tag IDs in the entire portfolio and send them to Kafka
    • This has a potential to drastically reduce the effort and time needed to analyze the entire portfolio
  6. ✅ Vulnerability analysis results will be applied to all affected components in the portfolio in one go, whereas the current approach only applies them to a single component at a time

Drawbacks

  1. ⛔ More responsibility on the API server: Messages from the vuln-result topic will no longer be tied to a specific project UUID or component UUID
    • Instead, the API server will have to apply the results to all components matching the given PURL / CPE
    • This can be an expensive operation, but can be optimized with proper indexes and efficient use of transactions. Should be tested though
  2. ⛔ Batching of EventNew events (as required for OSS Index) will be harder (#50 (comment))
  3. ⛔ May not be efficient for use cases where the system is only exposed to little load, or BOMs are uploaded only sporadically (#50 (comment))

Possible Solution 2: Only change the key for EventNew messages

As a compromise between the current solution and variant 1 as described above: Still set the message key to PURL / CPE, but include the component UUID in the message body.

Example
pkg:maven/com.acme/[email protected]?type=jar
{
  "uuid": "445dc140-5638-4eb7-9409-53204d7f3cae",
  "group": "xerces",
  "name": "xercesImpl",
  "version": "2.12.2"
}

cpe:2.3:a:apache:xerces2_java:*:*:*:*:*:*:*:*
{
  "uuid": "445dc140-5638-4eb7-9409-53204d7f3cae",
  "group": "xerces",
  "name": "xercesImpl",
  "version": "2.12.2"
}

Benefits

TBD

Drawbacks

TBD

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Detected dependencies

docker-compose
docker-compose.yml
  • postgres 15-alpine
  • docker.redpanda.com/vectorized/redpanda v22.3.11
  • docker.redpanda.com/vectorized/redpanda v22.3.11
  • docker.redpanda.com/vectorized/console v2.1.1
  • prom/prometheus v2.41.0
  • grafana/grafana-oss 9.3.4
dockerfile
mirror-service/src/main/docker/Dockerfile.jvm
mirror-service/src/main/docker/Dockerfile.legacy-jar
mirror-service/src/main/docker/Dockerfile.native
  • registry.access.redhat.com/ubi8/ubi-minimal 8.7-1031
mirror-service/src/main/docker/Dockerfile.native-micro
  • quay.io/quarkus/quarkus-micro-image 2.0
notification-publisher/src/main/docker/Dockerfile.jvm
notification-publisher/src/main/docker/Dockerfile.legacy-jar
notification-publisher/src/main/docker/Dockerfile.native
  • registry.access.redhat.com/ubi8/ubi-minimal 8.7-1049
notification-publisher/src/main/docker/Dockerfile.native-micro
  • quay.io/quarkus/quarkus-micro-image 2.0
notification-publisher/src/main/docker/Dockerfile.native-multiarch
  • quay.io/quarkus/quarkus-micro-image 2.0
repository-meta-analyzer/src/main/docker/Dockerfile.jvm
repository-meta-analyzer/src/main/docker/Dockerfile.legacy-jar
repository-meta-analyzer/src/main/docker/Dockerfile.native
  • registry.access.redhat.com/ubi8/ubi-minimal 8.7-1049
repository-meta-analyzer/src/main/docker/Dockerfile.native-micro
  • quay.io/quarkus/quarkus-micro-image 2.0
repository-meta-analyzer/src/main/docker/Dockerfile.native-multiarch
  • quay.io/quarkus/quarkus-micro-image 2.0
vulnerability-analyzer/src/main/docker/Dockerfile.jvm
vulnerability-analyzer/src/main/docker/Dockerfile.legacy-jar
vulnerability-analyzer/src/main/docker/Dockerfile.native
  • registry.access.redhat.com/ubi8/ubi-minimal 8.7-1049
vulnerability-analyzer/src/main/docker/Dockerfile.native-micro
  • quay.io/quarkus/quarkus-micro-image 2.0
vulnerability-analyzer/src/main/docker/Dockerfile.native-multiarch
  • quay.io/quarkus/quarkus-micro-image 2.0
github-actions
.github/workflows/_build-native-meta.yml
  • actions/checkout v3
  • actions/setup-java v3
  • docker/setup-qemu-action v2.1.0
  • docker/setup-buildx-action v2.2.1
  • actions/upload-artifact v3.1.2
  • actions/checkout v3
  • actions/download-artifact v3.0.2
  • actions/download-artifact v3.0.2
  • docker/setup-qemu-action v2.1.0
  • docker/setup-buildx-action v2.2.1
  • docker/login-action v2
  • docker/build-push-action v3
.github/workflows/ci.yml
  • actions/checkout v3
  • actions/setup-java v3
.github/workflows/codeql.yml
  • actions/checkout v3
  • actions/setup-java v3.9.0
  • github/codeql-action v2
  • github/codeql-action v2
  • github/codeql-action v2
.github/workflows/publishJar.yml
  • actions/checkout v3
  • actions/setup-java v3
  • docker/setup-qemu-action v2.1.0
  • docker/setup-buildx-action v2.2.1
  • docker/login-action v2
  • docker/build-push-action v3
  • docker/build-push-action v3
  • docker/build-push-action v3
  • docker/build-push-action v3
.github/workflows/semgrep.yml
  • actions/checkout v3
  • returntocorp/semgrep-action 137b6fa8d60023238378b28e19b9600e4edce87e
  • github/codeql-action v2
maven
commons/pom.xml
mirror-service/pom.xml
notification-publisher/pom.xml
pom.xml
  • io.quarkus:quarkus-bom 2.16.0.Final
  • us.springett:cpe-parser 2.0.2
  • org.cyclonedx:cyclonedx-core-java 7.3.1
  • io.github.resilience4j:resilience4j-all 2.0.2
  • io.github.resilience4j:resilience4j-micrometer 2.0.2
  • org.json:json 20220924
  • io.pebbletemplates:pebble 3.1.6
  • io.quarkiverse.helm:quarkus-helm 0.2.3
  • xerces:xercesImpl 2.12.2
  • org.apache.maven:maven-artifact 4.0.0-alpha-3
  • us.springett:cvss-calculator 1.4.1
  • com.github.tomakehurst:wiremock-jre8-standalone 2.35.0
  • org.mock-server:mockserver-netty 5.15.0
  • org.assertj:assertj-core 3.24.2
  • net.javacrumbs.json-unit:json-unit-assertj 2.36.0
  • org.apache.maven.plugins:maven-compiler-plugin 3.10.1
  • io.smallrye:jandex-maven-plugin 3.0.5
  • io.quarkus.platform:quarkus-maven-plugin 2.16.0.Final
  • io.quarkus:quarkus-container-image-docker 2.16.0.Final
  • org.apache.maven.plugins:maven-surefire-plugin 3.0.0-M8
  • org.apache.maven.plugins:maven-failsafe-plugin 3.0.0-M8
  • org.jacoco:jacoco-maven-plugin 0.8.8
repository-meta-analyzer/pom.xml
vulnerability-analyzer/pom.xml
maven-wrapper
.mvn/wrapper/maven-wrapper.properties
  • maven 3.8.7
  • maven-wrapper 3.1.1
npm
load-tests/package.json
  • @types/k6 ^0.42.0


Re-implement policy evaluation in hyades API server

Current Behavior

Currently, the Dependency-Track application performs policy evaluation for the components of a project after vulnerability analysis is complete for all components of that project. This is a sequential and heavy task when triggered for a portfolio: policies are evaluated sequentially for every project and every component in the portfolio. With 10 policies, 150 components per project, and 1000 projects in a portfolio, one can picture how long the policy evaluation loop takes to complete.


Proposed Change


To speed up policy evaluation at the project and portfolio levels, we could leverage the partitions on Kafka topics to our advantage in several ways:

  1. One component being evaluated for multiple policies at the same time.
  2. Multiple components for a given project being evaluated for a given policy
  3. Multiple projects being evaluated in parallel for a policy

If a policy needs the most recent vulnerability information for a component, we can achieve this by triggering policy evaluation for the component right after its vulnerability analysis is complete. This would still deliver faster policy updates than waiting for vulnerability analysis of all components to complete before starting policy evaluation.

I will attach a diagram as well for further clarity

Make topic prefix configurable

Some organizations may have a need to use custom prefixes for Kafka topics.

One such use case could be to run multiple Hyades instances on a shared, managed Kafka instance.

Note that for internal topics, Kafka Streams will use the application ID as topic prefix. In order to provide custom prefixes here, users will have to modify the kafka-streams.application.id property.

Plan and perform load tests

We need to ensure that the architecture and service implementations meet our performance and scalability requirements.

Test Data

Execution

Uploading a pool of SBOMs to Dependency-Track: https://github.com/nscuro/dtbench

Cleanup unused code

  • The event and model packages contain lots of unused classes
  • The serde package contains lots of redundant (de-) serialization classes
    • In most cases, using the ObjectMapperSerde provided by Kafka does the job, so there's no need for dedicated serializers like CacheKeySerializer
  • JDO / DataNucleus annotations should be removed from model classes
  • Dependency on most Alpine libraries (except alpine-common) should be removed
  • alpine.properties should be removed

Collect test data

For #136, we'll need a pool of realistic SBOMs we can use for testing.

Possible strategies:

  • Collect existing SBOMs from projects that publish them
  • Generate SBOMs ourselves for various popular OSS projects
  • Generate SBOMs based on data from 3rd party sources (e.g. deps.dev)

Possible sources:

`Unable to records bytes produced to topic` errors logged in repository meta analyzer

Whenever repository-meta-analyzer produces a result, Kafka Streams logs an error like the following:

2023-01-30 12:04:38,195 ERROR [org.apa.kaf.str.pro.int.RecordCollectorImpl] (kafka-producer-network-thread | hyades-repository-meta-analyzer-7308f986-fa3c-43a0-a365-5abc59f77ea9-StreamThread-3-producer) stream-thread [hyades-repository-meta-analyzer-7308f986-fa3c-43a0-a365-5abc59f77ea9-StreamThread-3] task [0_2] Unable to records bytes produced to topic dtrack.repo-meta-analysis.result by sink node produce_npm_result_to_dtrack.repo-meta-analysis.result_topic as the node is not recognized.
Known sink nodes are [produce_empty_result_to_dtrack.repo-meta-analysis.result_topic].

Records are produced as expected, so this does not appear to be a show stopper. We should investigate this anyway and try to resolve it.

Look into handling unavailable kafka service/ broker

If, for some reason, the topics that the application wants to send data to are temporarily unavailable, the application should be able to hold on to the events for a certain time and keep retrying, in a polling-like fashion, until the topics are available again. This would help prevent events from being lost due to service unavailability.

Enable compression on `dtrack.vuln-analysis.result` topic

As we have to transmit details for every identified vulnerability (except for those found by the internal analyzer), and each event can contain zero or more vulnerabilities, it is likely that in some situations we will exceed Kafka's default message size limit of 1 MB.

On top of using Protobuf, we should also enable compression on the topic to further reduce the chance of exceeding the limit.

Compression is already enabled in the mirror-service for the dtrack.vulnerability topic.

Note that only the snappy algorithm is supported by Quarkus Native Image as of now.
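For illustration, the topic-level compression.type could be set via the Kafka Admin API (or equivalently via rpk / kafka-configs); the topic name and bootstrap servers below are placeholders:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.List;
import java.util.Map;

class TopicCompressionConfig {

    static void enableSnappy(String bootstrapServers) throws Exception {
        Map<String, Object> adminConfig = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(adminConfig)) {
            // Set broker-side compression for the result topic to snappy.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "dtrack.vuln-analysis.result");
            Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(topic, List.of(
                    new AlterConfigOp(new ConfigEntry("compression.type", "snappy"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
```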

Onboarding of Notification Service on to Kafka

Proposed changes for Notification Service:

  1. Move the Notification module onto a Kafka-based service.

  2. Currently, DT dispatches a notification after any operation it performs; the notification call goes to NotificationUtil, and from there the notification is dispatched via an overloaded method.
    Proposed Changes - Instead of dispatching notifications directly, put the notification object onto a Kafka topic that is read by the Quarkus service, and dispatch the notification from the consumer.

  3. The Notification.Dispatch call for exceptions is scattered throughout the project.
    Proposed Changes - Create a common method in NotificationUtil for creating the notification object in case of exceptions, and again put that notification object onto the Kafka topic.

  4. The NotificationRouter reads the notification rules from the persistence manager.
    Proposed Changes - Since the Quarkus application currently does not have access to the database, fetch the NotificationRules from DT via a REST call on Quarkus application startup and store them in an in-memory data structure in the Quarkus application for each NotificationLevel (Informational, Warning and Error).

  5. A few configs are also read from the database.
    Proposed Changes - Add those configs to the application properties maintained by the Quarkus application.

Only publish events to `dtrack.vulnerability` when the respective vulnerability actually changed

When mirroring datasources, the mirror-service will currently blindly publish vulnerability data to the dtrack.vulnerability topic.

This means that the API server will be under increased pressure whenever a mirror operation is running, as it has to re-ingest >200k events every single time.

It would be a lot more efficient if the mirror-service would keep track of which vulnerabilities it has already seen, and which did not change since it saw them the last time. To reduce load on the API server, the mirror-service should only publish vulnerability events to the topic when the respective vuln is new, or was updated.
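A sketch of how the mirror-service could track previously published vulnerabilities in a Kafka Streams state store and drop unchanged ones; the input topic, store name, and String values are placeholders (a real implementation would likely compare digests of the serialized BOVs rather than storing them verbatim):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.util.Objects;

class VulnerabilityDedupTopology {

    static final String STORE = "vuln-digests"; // placeholder store name

    static void build(StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String()));

        // "dtrack.vulnerability.mirrored" is a placeholder for the internal stream of mirrored records.
        builder.stream("dtrack.vulnerability.mirrored", Consumed.with(Serdes.String(), Serdes.String()))
                .transformValues(DedupTransformer::new, STORE)
                .filter((vulnId, value) -> value != null) // unchanged records were mapped to null
                .to("dtrack.vulnerability", Produced.with(Serdes.String(), Serdes.String()));
    }

    static class DedupTransformer implements ValueTransformerWithKey<String, String, String> {

        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore(STORE);
        }

        @Override
        public String transform(String vulnId, String value) {
            String previous = store.get(vulnId);
            if (Objects.equals(previous, value)) {
                return null; // unchanged since the last mirror run; drop
            }
            store.put(vulnId, value);
            return value;
        }

        @Override
        public void close() {
        }
    }
}
```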

Proposal : Move the Repository MetaAnalyzer to Quarkus Application

Current Implementation

Currently, the Dependency-Track API server has a RepositoryMetaAnalyzer class that analyses a given component based on its purl type and produces a meta model for it. If the meta model's latestVersion field is not null, a meta component is created from the latest information in the model and synchronised with the database.
The RepositoryMetaAnalyzer actively listens for RepositoryMetaEvents.

RepositoryMetaEvent: an event type in the DT API server that can be triggered from three different places:

  • ComponentResource calls it in two places:
    • when a new component is created
    • when an existing component is updated
  • BomUploadProcessingTask: when a new SBOM is uploaded
  • TaskScheduler: triggered once every 24 hours, after an initial delay of 1 hour upon application startup

In the RepositoryMetaAnalyzer, there are two workflows:

  • If the arriving event has a component attached, an individual component analysis is performed.
  • If the arriving event carries no component (this is the case for the event coming from the task scheduler), a portfolio analysis is performed.

Drawbacks of this approach
The portfolio analysis is, as the description above already suggests, a heavy task that consumes a lot of API server resources. Events containing individual components also keep the API server busy: consider a large SBOM containing 10,000 components being uploaded, resulting in an event for the BomUploadProcessingTask; a RepositoryMetaEvent is then triggered for every component in the SBOM.

Proposal

We move the RepositoryMetaAnalyzer out into the external Quarkus application that leverages Kafka, which we have already used to offload vulnerability analysis from the DT API server. This is the initial proposed design.

Analysing each component

Every time a component is created or updated via the ComponentResource, or a new BOM is uploaded resulting in a BomUploadProcessingTask, the proposal is to produce the components as events on a Kafka topic (possibly the same topic already used for vulnerability analysis?). If a component has no purl associated with it, we do not need to send it to the Kafka topic: in the current implementation, when the model retrieved after analysis has a null latestVersion field (which is the case when the component has no purl), the meta analysis has no effect anyway.
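
A sketch of the producing side, assuming the component UUID is used as the record key and the canonical purl as the value (the topic name is a placeholder):

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.dependencytrack.model.Component;

final class RepoMetaEventProducer {

    private final Producer<String, String> producer;

    RepoMetaEventProducer(final Producer<String, String> producer) {
        this.producer = producer;
    }

    void publish(final Component component) {
        if (component.getPurl() == null) {
            return; // Without a purl the meta analysis cannot determine a latest version anyway.
        }
        producer.send(new ProducerRecord<>(
                "dtrack.repo-meta-analysis.component",
                component.getUuid().toString(),
                component.getPurl().canonicalize()));
    }
}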

For each analyser type, we should determine the supported repository type and load all repositories of that type into a GlobalKTable to be used during the analysis. This table would use the repository type as the key and the list of matching repositories as the value.
A repository might also require a username and password; these are expected to be provided as environment variables when the application starts.
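
A sketch of how the lookup table could be materialized in the Quarkus topology; the topic name and the use of plain string serdes are assumptions:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;

final class RepositoryTableFactory {

    static GlobalKTable<String, String> repositoriesByType(final StreamsBuilder builder) {
        // Key: repository type (e.g. "MAVEN", "NPM"); value: serialized list of repositories of that type.
        return builder.globalTable(
                "dtrack.repositories",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as("repositories-by-type"));
    }
}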

These events contain at least the component UUID and purl. A primary meta analyser consumer can route the incoming events to the different meta analyser consumers based on the purl type. Once the analysis is done by the corresponding meta analyser implementation (e.g. Maven or Go), the fields of a meta component object are populated from the meta model.
This marks the completion of the meta analysers' work. The output containing the meta component can be sent back to the DT API server to be saved to the database.
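
The routing by purl type could be expressed with Kafka Streams branching; the analyser wiring shown here is only a placeholder:

import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

final class MetaAnalyzerRouter {

    static void route(final KStream<String, String> components) {
        // Key: component UUID, value: canonical purl.
        components.split()
                .branch((uuid, purl) -> purl.startsWith("pkg:maven/"),
                        Branched.withConsumer(stream -> stream.foreach((uuid, purl) -> {
                            // Hand off to the Maven meta analyser.
                        })))
                .branch((uuid, purl) -> purl.startsWith("pkg:npm/"),
                        Branched.withConsumer(stream -> stream.foreach((uuid, purl) -> {
                            // Hand off to the NPM meta analyser.
                        })))
                .defaultBranch(Branched.withConsumer(stream -> stream.foreach((uuid, purl) -> {
                    // Unsupported purl type: produce an empty result.
                })));
    }
}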

RepositoryMetaEvent from Task Scheduler
When the repository meta event is triggered by the task scheduler, we are essentially doing a portfolio analysis: fetch all components of all active projects in the DT API server, and send each of them as an event to the same topic as above to be analysed.

[Screenshot: initial proposed design, 2022-10-28]

Native image build for mirror service is broken

https://github.com/DependencyTrack/hyades/actions/runs/4042636687/jobs/6950890993

There is no Quarkus extension for Apache HTTP Client 5, so we'll need to make it work with GraalVM manually (a possible workaround is sketched after the stack trace below):

Fatal error: com.oracle.graal.pointsto.util.AnalysisError$ParsingError: Error encountered while parsing org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy.createTlsDetails(javax.net.ssl.SSLEngine) 
Parsing context:
   at org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy.createTlsDetails(ConscryptClientTlsStrategy.java:102)
   at org.apache.hc.client5.http.ssl.AbstractClientTlsStrategy$2.verify(AbstractClientTlsStrategy.java:135)
   at org.apache.hc.core5.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:383)
   at org.apache.hc.core5.reactor.ssl.SSLIOSession.initialize(SSLIOSession.java:259)
   at org.apache.hc.core5.reactor.ssl.SSLIOSession.access$100(SSLIOSession.java:72)
   at org.apache.hc.core5.reactor.ssl.SSLIOSession$1.connected(SSLIOSession.java:161)
   at org.apache.hc.core5.reactor.InternalDataChannel.onIOEvent(InternalDataChannel.java:121)
   at org.apache.hc.core5.reactor.InternalChannel.handleIOEvent(InternalChannel.java:51)
   at org.apache.hc.core5.reactor.SingleCoreIOReactor.processConnectionRequest(SingleCoreIOReactor.java:385)
   at org.apache.hc.core5.reactor.SingleCoreIOReactor.processPendingConnectionRequests(SingleCoreIOReactor.java:307)
   at org.apache.hc.core5.reactor.SingleCoreIOReactor.doExecute(SingleCoreIOReactor.java:138)
   at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:85)
   at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
   at java.lang.Shutdown.runHooks(Shutdown.java:130)
   at java.lang.Shutdown.exit(Shutdown.java:173)
   at java.lang.Runtime.exit(Runtime.java:115)
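
One possible (untested) workaround would be a GraalVM substitution that replaces the Conscrypt-specific code path with the plain JSSE one, so the missing Conscrypt classes never enter the reachability analysis. The class and method names below are taken from the stack trace; everything else is an assumption:

import javax.net.ssl.SSLEngine;

import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;
import org.apache.hc.core5.reactor.ssl.TlsDetails;

@TargetClass(className = "org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy")
final class Target_ConscryptClientTlsStrategy {

    @Substitute
    TlsDetails createTlsDetails(final SSLEngine sslEngine) {
        // Fall back to the standard JSSE API instead of Conscrypt.
        return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol());
    }
}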

Add project justification / value proposition

We should have documentation on the whys of this project, and what problems we're trying to solve.

This documentation should at least address the following points:

  • Why this project exists (limitations of vanilla DT regarding scalability)
    • Should include a rough overview of how publish-subscribe works in vanilla DT and why it is limiting
  • Why Kafka was chosen as messaging solution
    • Why not RabbitMQ, Pulsar, NATS, Liftbridge
    • Any caveats that come with Kafka - we should be transparent about those
  • Why Java / Quarkus was chosen
    • e.g. availability of Kafka Streams library, option of using GraalVM native image etc.
  • Why we decided against microservices (for now)

I propose to put this information in a WTF.md file, and link to it from README.md for anyone who's interested.
