Comments (6)

chandrashekar-s commented on August 27, 2024

Thanks @chandrashekar-s for documenting/fixing these. Can you please clarify what determines "number of task managers"? Is it controlled by maxParallelism and maxWorkers here?

The maxParallelism, which is set from maxWorkers over here, controls the maximum number of tasks across which keyed state can be distributed, i.e., it is something like distributing the incoming streamed data over N keyed partitions. A few details can be found here. It looks like this is only used in a streaming environment, so it should not affect our pipelines, as we don't have any streams.

As for the number of task managers: it is set to 1 by default in a local environment (this can be found here), and increasing it there might not help much. In a clustered environment, however, it is calculated dynamically from the parallelism set for the operators in tasks (not the maxParallelism) and the number of task slots in a single TaskManager, which is defined by this parameter.

It looks like maxWorkers might not be needed here. For a clustered environment, we need to understand how the number of Task Managers is determined dynamically and how it depends on the taskmanager.memory.network.max parameter. Created this issue for this.

from fhir-data-pipes.

bashir2 commented on August 27, 2024

One particular issue that we have been facing is that multiple runs of the pipeline (started by the controller) keep increasing the memory usage. Both this behavior and the Java heap issue described in the OP are probably due to Flink using off-heap memory (link1, link2). This memory is not managed directly by Java; in particular, Flink needs to take care of its garbage collection. I don't know the details of how Flink does this, but from notes like this it seems that freeing up that memory is tied to how/when Java's GC is triggered, and sometimes it can take too long to happen. To mitigate this, I added a GC hint at the end of pipeline runs in PR #802 and verified that the memory usage actually comes down after each run.

Clarification: The memory still increases over time, after multiple pipeline runs, so we still have an issue; but the rate of increase is significantly lower with the GC hint.

bashir2 commented on August 27, 2024

BTW, here are the steps I used to figure out that it is indeed the off-heap memory which keeps accumulating:

  • Run the controller with memory tracking enabled: java -XX:NativeMemoryTracking=detail -jar ./target/controller-0.1.0-SNAPSHOT-exec.jar
  • Take a baseline before a pipeline run: jcmd [PID] VM.native_memory baseline
  • Take another memory snapshot after a run and compare the diff: jcmd [PID] VM.native_memory detail.diff
    The output of the last command showed that we had ~5GB of additional committed memory (which is close to taskmanager.memory.network.max). This extra usage was not in the Java heap; instead, it was mostly in the "Other" section. The details of that section show that it comes almost entirely from Unsafe.allocateMemory:
[0x00007fb2640e703c] Unsafe_AllocateMemory0+0x7c
[0x00007fb24d5c4b53]
                             (malloc=10478114KB type=Other +5242882KB #327443 +163841)
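
When comparing several runs, it can help to pull the numbers out of the NMT diff output programmatically. The following is a minimal sketch, assuming the summary line has the same shape as the one quoted above; the regular expression and the helper name parse_malloc_line are illustrative and not part of the JDK or of this project.

```python
import re

# Matches NMT summary lines shaped like the one quoted above, e.g.
#   (malloc=10478114KB type=Other +5242882KB #327443 +163841)
NMT_MALLOC = re.compile(
    r"\(malloc=(?P<total>\d+)KB"
    r"(?: type=(?P<type>\w+))?"
    r"(?: (?P<delta>[+-]\d+)KB)?"
    r"(?: #(?P<count>\d+))?"
    r"(?: (?P<count_delta>[+-]\d+))?\)"
)


def parse_malloc_line(line):
    """Return sizes (in KB) and allocation counts from one NMT diff line."""
    m = NMT_MALLOC.search(line)
    if m is None:
        return None
    return {
        "type": m.group("type"),
        "total_kb": int(m.group("total")),
        "delta_kb": int(m.group("delta") or 0),
        "count": int(m.group("count") or 0),
        "count_delta": int(m.group("count_delta") or 0),
    }


sample = "(malloc=10478114KB type=Other +5242882KB #327443 +163841)"
info = parse_malloc_line(sample)
delta_gib = info["delta_kb"] / (1024 * 1024)
print(f"{info['type']}: +{delta_gib:.2f} GiB since baseline")
```

For the line quoted above, the growth since the baseline works out to about 5 GiB, which matches the ~5GB figure in the comment.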

chandrashekar-s commented on August 27, 2024

Following up on the GC hint change mentioned above, further analysis was done to find the root cause of the memory leak. Here is the analysis:

Root cause:
glibc is the default memory allocator for the JVM. This allocator currently has a memory fragmentation issue: when the JVM frees memory, glibc does not gracefully return it to the kernel (please refer to glibc bugzilla and glibc manual). This is why our application's RSS memory kept increasing.

Fix
The GC hint fix at the end of the pipeline run mitigated the problem significantly. However, the problem can be fixed completely if glibc can be made to release the memory.

  1. As part of this change, the jcmd System.trim_native_heap command releases the superfluous memory to the operating system. This has an immediate effect, but since it runs outside of the application process, it requires a cron job or another periodic process to trigger the command.
  2. A similar change is being addressed in the new JDK release (17.0.9) in Oct-2023, which enables the JVM to trim the native memory via a configuration parameter (the parameter is an experimental feature for now); the details are in this link.
  3. The Flink docker image uses jemalloc as the default memory allocator to avoid the memory fragmentation described above (jemalloc intends to emphasize fragmentation avoidance); this was introduced as part of this request.
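
The periodic trigger described in item 1 could be sketched as follows. This is only an illustration, assuming jcmd is on the PATH, runs as the same user as the JVM, and that the JDK in use provides the System.trim_native_heap command; the function names and the injectable run parameter are hypothetical and are not how this project schedules the trim.

```python
import subprocess
import time


def trim_command(pid):
    """Build the jcmd invocation that asks the JVM to trim its native heap."""
    return ["jcmd", str(pid), "System.trim_native_heap"]


def trim_periodically(pid, interval_seconds, rounds, run=subprocess.run):
    """Run the trim command `rounds` times, sleeping between invocations.

    `run` is injectable so the loop can be exercised without a real JVM.
    """
    results = []
    for i in range(rounds):
        results.append(run(trim_command(pid), capture_output=True, text=True))
        if i < rounds - 1:
            time.sleep(interval_seconds)
    return results
```

In practice a cron entry or a systemd timer invoking the same jcmd command would serve the same purpose; the loop above only makes the "outside of the application process" aspect explicit.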

Tests
A few tests were run to evaluate the fixes mentioned above.

Test Environment

Data Set : 2.5K Patients
JVM Parameters : -Xms4g -Xmx4g
Flink Parameters :
   taskmanager.memory.network.max: 3gb
   taskmanager.memory.managed.size: 1gb

The test was run in a Docker container constrained to 12 GB of memory. Each round consisted of around 5 batch runs (the pipeline was triggered 5 times, one after the other). One of the following configurations was used for each round:

  1. With no fix
  2. With the GC hint fix
  3. With the GC hint fix + the JCMD trim native memory command

The results are shown in the screenshot below.

[Image: Screenshot 2023-09-05 at 8 54 15 PM]

Observations:

  1. With no fix, the RSS memory of the application increased with each run, and eventually the docker container was killed due to an OOM error.
  2. With the 'GC hint' fix, the RSS memory was released significantly after each pipeline run. The container was not killed even after 5 runs.
  3. With the 'GC hint + JCMD Trim Native Memory' command, the RSS memory was released after every batch run and was reset to the -Xmx4g limit. The pattern was consistent even after 5 runs.

Similar tests were also repeated with the JDK pre-release version 17.0.9 at this link, and the results were consistent with the above observations.

Conclusion:
From the analysis, it is clear that glibc, which is the default memory allocator for the JVM, is causing the memory leak. As an immediate fix, the GC hint change has mitigated the problem to a good extent. For a complete fix, we could wait for JDK release 17.0.9, which will be made in Oct-2023, and upgrade our application. Alternatively, we can change the default memory allocator to jemalloc, which doesn't have these memory fragmentation issues, similar to what Flink has done in its docker image.

chandrashekar-s commented on August 27, 2024

Memory Leak
As documented in the previous comments, the default glibc memory allocator causes memory fragmentation, which leads to the memory leak. This has been fixed by this PR, where the default memory allocator has been replaced with the jemalloc allocator, which does not have these issues.

Flink memory parameters

Observations made during testing
The more FHIR resource types there were to process, the more Flink network memory (taskmanager.memory.network.max) was needed. Similarly, the more cores the machine running the pipeline had, the more taskmanager.memory.network.max was needed.

Based on the details mentioned in this link and the observations made during testing, the number of network buffers required for an incremental pipeline can be defined as follows:

  `#FHIR-resource-types * #slots-per-TM^2 * #TMs * 10`
  • #FHIR-resource-types is the number of different FHIR resource types to be processed
  • #slots-per-TM is the number of slots per TaskManager
  • #TMs is the total number of task managers
  • 10 is the number of repartitioning/broadcasting steps active at the same time

To support, for example, one 8-slot machine and 3 FHIR resource types, you would need roughly 3 * 8^2 * 1 * 10 = 1920 network buffers for optimal throughput. Each network buffer has a default size of 32 KiB. In the example above, the system would thus allocate roughly 60 MiB for network buffers.
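
The arithmetic above can be checked with a small calculator. This is a sketch of the formula exactly as stated in this comment, not of anything Flink exposes; the function and constant names are made up for illustration.

```python
BUFFER_SIZE_KIB = 32  # default network buffer size quoted above
ACTIVE_STEPS = 10     # repartitioning/broadcasting steps active at once


def network_buffers(resource_types, slots_per_tm, task_managers):
    """#FHIR-resource-types * #slots-per-TM^2 * #TMs * 10"""
    return resource_types * slots_per_tm ** 2 * task_managers * ACTIVE_STEPS


def network_memory_mib(buffers, buffer_size_kib=BUFFER_SIZE_KIB):
    """Convert a buffer count into MiB of network memory."""
    return buffers * buffer_size_kib / 1024


buffers = network_buffers(resource_types=3, slots_per_tm=8, task_managers=1)
print(buffers, network_memory_mib(buffers))  # 1920 60.0
```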

Fixes

As part of this PR, the pipelines are refactored so that the Flink network memory no longer depends on the number of FHIR resource types and is always constant. With this change, multiple pipelines (one for each FHIR resource type) are created during each incremental run, and only a fixed number of them (currently set to 2) run at any point in time.
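
The "one pipeline per resource type, at most 2 at a time" idea can be sketched with a bounded worker pool. This is an illustration of the scheduling pattern only, not the PR's actual implementation; run_all, fake_pipeline, and the resource type list are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_PIPELINES = 2  # the fixed limit mentioned above


def run_all(resource_types, run_pipeline, max_concurrent=MAX_CONCURRENT_PIPELINES):
    """Run one pipeline per resource type, at most `max_concurrent` at a time."""
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # map() preserves input order; list() waits for every pipeline to finish.
        return list(pool.map(run_pipeline, resource_types))


# Stand-in for a real pipeline run; it only records the peak concurrency.
_lock = threading.Lock()
_running = 0
peak = 0


def fake_pipeline(resource_type):
    global _running, peak
    with _lock:
        _running += 1
        peak = max(peak, _running)
    time.sleep(0.01)  # pretend to do some work
    with _lock:
        _running -= 1
    return f"{resource_type}: done"


results = run_all(["Patient", "Encounter", "Observation"], fake_pipeline)
print(results, "peak concurrency:", peak)
```

Because each pipeline handles a single resource type, the resource-type factor drops out of the buffer formula, and the cap on concurrent pipelines keeps the total bounded.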

The number of slots in a pipeline currently defaults to the number of cores of the machine, but it can also be configured via numThreads.

Based on these changes, the number of network buffers, and hence the taskmanager.memory.network.max memory, required for each refactored pipeline is as shown below:

    `#slots-per-TM^2 * #TMs * 10`
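
To get a feel for how the per-pipeline requirement grows with the slot count (which defaults to the number of cores), here is a small sketch. It assumes a single task manager and the 32 KiB default buffer size mentioned earlier in this comment; the helper names are illustrative only.

```python
def per_pipeline_buffers(slots_per_tm, task_managers=1, active_steps=10):
    """#slots-per-TM^2 * #TMs * 10"""
    return slots_per_tm ** 2 * task_managers * active_steps


def buffers_to_mib(buffers, buffer_size_kib=32):
    """Convert a buffer count into MiB, assuming 32 KiB buffers."""
    return buffers * buffer_size_kib / 1024


for slots in (2, 4, 8, 16):
    n = per_pipeline_buffers(slots)
    print(f"{slots:>2} slots: {n:>5} buffers, {buffers_to_mib(n):.2f} MiB")
```

Doubling the number of slots quadruples the buffer count, which is consistent with the observation that machines with more cores needed more network memory.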

bashir2 commented on August 27, 2024

Thanks @chandrashekar-s for documenting/fixing these. Can you please clarify what determines "number of task managers"? Is it controlled by maxParallelism and maxWorkers here?
