
fhir-data-pipes's Introduction


What is this?

This repository includes pipelines to transform data from a FHIR server (like HAPI, GCP FHIR store, or even OpenMRS using the FHIR format) into a data warehouse based on Apache Parquet files, or into another FHIR server. There is also a Python query library to make working with FHIR-based data warehouses simpler.

These tools are intended to be generic and eventually work with any FHIR-based data source and data warehouse. Here is the list of main directories with a brief description of their content:

  • pipelines/ *START HERE*: Batch and streaming pipelines to transform data from a FHIR-based source to an analytics-friendly data warehouse or another FHIR store.

  • docker/: Docker configurations for various servers/pipelines.

  • doc/: Documentation for project contributors. See the pipelines README and wiki for usage documentation.

  • utils/: Various artifacts for setting up an initial database, running pipelines, etc.

  • dwh/: Query library for working with distributed FHIR-based data warehouses.

  • bunsen/: A fork of a subset of the Bunsen project, which is used to transform FHIR JSON resources into Avro records with the SQL-on-FHIR schema.

  • e2e-tests/: Scripts for testing pipelines end-to-end.

NOTE: This was originally started as a collaboration between Google and the OpenMRS community.

fhir-data-pipes's People

Contributors

abhia-dev, atulai-sg, bashir2, chandrashekar-s, dependabot[bot], fredhersch, gdevanla, gitcliff, ibacher, jecihjoy, kimaina, ljnic, mozzy11, muhammad-levi, omarismail94, pmanko, rayyang29, suriyan3, suruchee, tawfiqkhalilieh, vbothe23, williamito

fhir-data-pipes's Issues

Move Debezium-specific settings to the JSON config file

That was actually my initial intention, but I remembered we have about 15+ Debezium parameters that can be modeled in the generalConfiguration. I.e., we have a generalConfig model that houses:

  • EventToFhir map
  • DebeziumParams
  • Other future pipeline params

so that we can have one JSON schema with all necessary pipeline properties, i.e.:

// generalConfig components
{
  "debeziumConfig": { /* 15+ params */ },
  "eventConfigurations": []
}

WDYT?

Originally posted by @kimaina in #45 (comment)

Syncing to an OpenHIM instance

As part of the PLIR / PLM work, we've decided on an architecture that uses OpenHIM as a queuing and auditing tool for data sent between OpenMRS and the HAPI FHIR server serving as the data store / SHR. To support this, we need to ensure that there is a proper pathway for sending FHIR-formatted data from the analytics engine to an OpenHIM instance configured with whatever OpenHIM mediator we need (this can be a custom OpenHIM mediator, a simple pass-through, or whatever else makes sense).

Standardise argument passing strategy using command-line flags across the 4 sub-projects

We don't need to address this issue in this PR, so I am just putting it here since we have the context to discuss it: while I understand it is a subjective choice, I prefer using command-line args first and then environment variables. Benefits:

  1. Usually there are libraries to easily handle default values and help messages nicely, so a user can quickly see what the arguments for the program are and what they do without looking at the code, just by running the binary.
  2. If there is a typo in an argument, the command-line parser catches it, but with env vars or properties with defaults, it is harder to catch.
  3. If we prefer a non-command-line-args approach, I think env variables are better since they have more direct integration with non-Java tools/languages, e.g., bash, Docker, or other languages (for example, it is possible that we may have some Python in the future).

But again, let's not address it in this PR and discuss it further at some point (please feel free to reply with what you think).

Originally posted by @bashir2 in #14 (comment)
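
For reference on point 1, here is a minimal, hypothetical sketch of how this could look with Apache Beam's PipelineOptions (which the batch pipeline already uses). The option names below are illustrative, not the pipeline's actual flags, but the mechanism gives default values, --help descriptions, and fails fast on misspelled flags.

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlagsExample {
  // Hypothetical options interface; Beam derives help text from @Description
  // and rejects misspelled flags at startup.
  public interface EtlOptions extends PipelineOptions {
    @Description("Base URL of the source OpenMRS server")
    @Default.String("http://localhost:8099/openmrs")
    String getOpenmrsServerUrl();

    void setOpenmrsServerUrl(String value);

    @Description("Directory for output Parquet files")
    @Default.String("tmp/parquet/")
    String getFileParquetPath();

    void setFileParquetPath(String value);
  }

  public static void main(String[] args) {
    // Registering makes the custom options show up in --help output.
    PipelineOptionsFactory.register(EtlOptions.class);
    // An unknown flag such as --fileParqetPath fails here instead of being silently ignored.
    EtlOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(EtlOptions.class);
    System.out.println("Reading from " + options.getOpenmrsServerUrl());
  }
}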

Potential performance issues in the JDBC based batch mode

@kimaina I tried the new JDBC based batch mode for extracting Patient, Encounter, and Observation resources from the test DB we set up through openmrs-compose.yaml, i.e., ~1K patients, ~11K encounters, ~94K obs.

It took more than two hours to finish, while the FHIR-search-based approach took ~5 minutes. I have not investigated the root cause much but am filing this issue to follow up. Here is the command I ran:

java -cp batch/target/fhir-batch-etl-bundled-0.1.0-SNAPSHOT.jar org.openmrs.analytics.FhirEtl \
  --openmrsServerUrl=http://localhost:8099/openmrs --jdbcUrl=jdbc:mysql://localhost:3306/openmrs \
  --jdbcMaxPoolSize=100 --jdbcModeEnabled --fileParquetPath=tmp/TEST_docker_batch/ \
  --tableFhirMapPath=tmp/dbz_event_to_fhir_config_MINIMAL.json --jdbcFetchSize=1000

Some notes and observations:

  • tmp/dbz_event_to_fhir_config_MINIMAL.json is a config in which I only kept person, patient, obs, and encounter to make sure the initial step of counting each resource is fast (and I confirmed that step is quick).
  • When the pipeline was running, the maximum number of connections to the DB that I saw was 38, according to: show status where variable_name = 'threads_connected';
  • According to top, the CPU usage of the pipeline was maxed out, i.e., over ~1100% on my 12-core machine (i.e., most cores were used by the pipeline), while the mysql and openmrs processes were each taking less than 10% CPU.
  • When running in the FHIR search mode, the openmrs process was sometimes taking more than 600% while the pipeline was usually below 500%; mysql was sometimes close to 300%. So it seems that in the JDBC mode the DB is not fully utilized (the number of DB connections was sometimes more than 50 in the FHIR search mode).

Here is the command for the FHIR search mode:

java -cp batch/target/fhir-batch-etl-bundled-0.1.0-SNAPSHOT.jar org.openmrs.analytics.FhirEtl \
  --openmrsServerUrl=http://localhost:8099/openmrs --fileParquetPath=tmp/TEST_docker_batch_FHIR-search_retry/ 

Switch the batch JDBC mode to use HikariCP instead of C3P0

Lastly, I also tried using HikariCP instead of C3P0; however, I have been having compatibility issues. The Apache Beam PTransform/DoFn expects the data source to be serializable. I tried injecting Hikari using @Setup without success, as advised here. I also tried using .withDataSourceProviderFn((SerializableFunction<Void, DataSource>)). Most recommendations keep pointing back to C3P0. Please let me know if you know a workaround for this issue.

Originally posted by @kimaina in #72 (comment)
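
For future reference, one commonly suggested pattern for this (a hedged sketch, not necessarily the fix for the issue above) is to keep only serializable configuration fields in the DoFn and create the HikariDataSource lazily in @Setup, so the pool itself never needs to be serialized. The table and query below are purely illustrative.

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.beam.sdk.transforms.DoFn;

// Only the serializable config strings travel with the DoFn; the pool is
// created per worker in @Setup and closed in @Teardown.
public class JdbcFetchFn extends DoFn<String, String> {
  private final String jdbcUrl;
  private final String user;
  private final String password;
  private transient HikariDataSource dataSource;

  public JdbcFetchFn(String jdbcUrl, String user, String password) {
    this.jdbcUrl = jdbcUrl;
    this.user = user;
    this.password = password;
  }

  @Setup
  public void setup() {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl(jdbcUrl);
    config.setUsername(user);
    config.setPassword(password);
    config.setMaximumPoolSize(10);
    dataSource = new HikariDataSource(config);
  }

  @ProcessElement
  public void processElement(@Element String uuid, OutputReceiver<String> out) throws Exception {
    // Illustrative lookup; the real pipeline would run its resource queries here.
    try (Connection conn = dataSource.getConnection();
        PreparedStatement stmt =
            conn.prepareStatement("SELECT uuid FROM person WHERE uuid = ?")) {
      stmt.setString(1, uuid);
      try (ResultSet rs = stmt.executeQuery()) {
        if (rs.next()) {
          out.output(rs.getString("uuid"));
        }
      }
    }
  }

  @Teardown
  public void teardown() {
    if (dataSource != null) {
      dataSource.close();
    }
  }
}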

Implement a Debezium based streaming pipeline.

Our current method of listening to changes in OpenMRS is through the Atom Feed module. Based on feedback from the community, there seems to be a preference for using the MySQL binlog and Debezium.

Make the events-to-FHIR generator configurable

Currently, the Debezium events are mapped to the corresponding FHIR API. However, this mapping is hardcoded and needs to be made configurable to:

  • Facilitate adding new events/tables and new FHIR endpoints
  • Facilitate extracting non-FHIR mappings to the REST API

We should consider a JSON config that can be passed at runtime.

Set up a GCP project for test purposes

This is to give non-Googler contributors an easy way to test their changes and also to create a continuous integration test environment to be able to do end-to-end tests.

Add the ability to turn Debezium's snapshotMode on/off

Currently, the pipeline will take a snapshot of the database if the binlog was not enabled from day 0 of data entry. While this might seem attractive, we need the ability to turn this feature on/off depending on the situation, for example:

  1. to make e2e tests only cover new events
  2. to allow switching between batch and streaming seamlessly without rerunning old events

How do we do this? From the Camel Debezium MySQL component docs:

The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally read the binlog; and 'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the beginning of the binlog. The 'never' mode should be used with care, and only when the binlog is known to contain all history.

https://camel.apache.org/components/latest/debezium-mysql-component.html
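
A minimal sketch of how this could be exposed, assuming the setting is read from our pipeline config and forwarded to the camel-debezium-mysql endpoint's snapshotMode option (documented at the link above); the other endpoint options shown are placeholders.

import org.apache.camel.builder.RouteBuilder;

// Sketch: build the Debezium endpoint URI from config so snapshot behaviour
// ('initial', 'schema_only', 'never', ...) can be switched without code changes.
public class DebeziumRoute extends RouteBuilder {
  private final String snapshotMode; // e.g. read from the JSON/general config

  public DebeziumRoute(String snapshotMode) {
    this.snapshotMode = snapshotMode;
  }

  @Override
  public void configure() {
    from("debezium-mysql:openmrs"
            + "?databaseHostname=localhost"
            + "&databasePort=3306"
            + "&databaseUser=openmrs"
            + "&databasePassword=password"
            + "&snapshotMode=" + snapshotMode)
        .to("log:debezium-event");
  }
}

For example, new DebeziumRoute("schema_only") would capture table structures but skip the data snapshot, while "never" would read only new binlog events.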

Fix Debezium events to FHIR mapping where uuid is missing for specific tables

We made a strong assumption that all tables have a uuid column; however, it has been reported that some tables don't.

There are 18 tables in the RefApp that don't have a uuid column. Most of these are Hibernate child tables (e.g., concept_numeric shares ids with concept, but adds a few fields). Here's the list from @ibacher:

  • concept_complex
  • concept_name_tag_map
  • concept_numeric
  • concept_proposal_tag_map
  • drug_order
  • liquibasechangelog
  • liquibasechangeloglock
  • location_tag_map
  • logic_rule_token_tag
  • logic_token_registration_tag
  • order_type_class_map
  • patient
  • role_privilege
  • role_role
  • scheduler_task_config_property
  • test_order
  • user_property
  • user_role

-- List tables in the given schema that do not have a 'uuid' column.
select T.TABLE_NAME
from information_schema.TABLES T
where TABLE_SCHEMA = 'core' and not exists (
  select *
  from information_schema.COLUMNS C
  where C.TABLE_CATALOG = T.TABLE_CATALOG
    and C.TABLE_SCHEMA = T.TABLE_SCHEMA
    and C.TABLE_NAME = T.TABLE_NAME
    and C.COLUMN_NAME = 'uuid'
)


Fix issues with openmrs-compose.yaml

After merging PR #24 the following command fails:
$ docker-compose -f openmrs-compose.yaml up
There are two errors. The first one is due to file permissions of the utils/ directory. Resolving this with:
$ sudo chown -R 999:0 utils/
causes the next issue, which is Liquibase related and happens when OpenMRS starts and applies the DB changes (@kimaina I thought with DB_AUTO_UPDATE: 'false' this should not happen, right?). Sample stack trace:

openmrs    | SEVERE 10/16/20 7:44 PM:liquibase: Change Set liquibase-update-to-latest.xml::20090316-1008::vanand failed.  Error: Error executing SQL ALTER TABLE `concept_numeric` MODIFY `precise` SMALLINT NOT NULL DEFAULT 0 DEFAULT 0: Unknown column 'precise' in 'concept_numeric'
openmrs    | liquibase.exception.DatabaseException: Error executing SQL ALTER TABLE `concept_numeric` MODIFY `precise` SMALLINT NOT NULL DEFAULT 0 DEFAULT 0: Unknown column 'precise' in 'concept_numeric'
openmrs    |    at liquibase.executor.jvm.JdbcExecutor.execute(JdbcExecutor.java:62)   
openmrs    |    at liquibase.executor.jvm.JdbcExecutor.execute(JdbcExecutor.java:104)         
openmrs    |    at liquibase.database.AbstractDatabase.execute(AbstractDatabase.java:1091)    
openmrs    |    at liquibase.database.AbstractDatabase.executeStatements(AbstractDatabase.java:1075)             
openmrs    |    at liquibase.changelog.ChangeSet.execute(ChangeSet.java:317)                                                                                                                                                      
openmrs    |    at liquibase.changelog.visitor.UpdateVisitor.visit(UpdateVisitor.java:27)
openmrs    |    at org.openmrs.util.DatabaseUpdater$1OpenmrsUpdateVisitor.visit(DatabaseUpdater.java:191)
openmrs    |    at liquibase.changelog.ChangeLogIterator.run(ChangeLogIterator.java:58)   
openmrs    |    at org.openmrs.util.DatabaseUpdater.executeChangelog(DatabaseUpdater.java:220)      
openmrs    |    at org.openmrs.util.DatabaseUpdater.executeChangelog(DatabaseUpdater.java:150)
openmrs    |    at org.openmrs.web.filter.initialization.InitializationFilter$InitializationCompletion$1.run(InitializationFilter.java:1644)
openmrs    |    at java.lang.Thread.run(Thread.java:748)                                                                                                                                                                          
openmrs    | Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'precise' in 'concept_numeric'
openmrs    |    at sun.reflect.GeneratedConstructorAccessor116.newInstance(Unknown Source)                                                                                                                                        
openmrs    |    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
openmrs    |    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
openmrs    |    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
openmrs    |    at com.mysql.jdbc.Util.getInstance(Util.java:408)               
openmrs    |    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
openmrs    |    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
openmrs    |    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
openmrs    |    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)   
openmrs    |    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)       
openmrs    |    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2480)
openmrs    |    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2438)     
openmrs    |    at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:845)
openmrs    |    at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:745)                                                                                                                                                   
openmrs    |    at liquibase.executor.jvm.JdbcExecutor$1ExecuteStatementCallback.doInStatement(JdbcExecutor.java:92)                                                                                                              
openmrs    |    at liquibase.executor.jvm.JdbcExecutor.execute(JdbcExecutor.java:55)                             
openmrs    |    ... 11 more

Create and enable end-to-end test for the batch mode.

We need end-to-end tests that start from a fixed OpenMRS instance, bring up the pipelines in streaming and batch modes to ETL data from the source OMRS into the sink FHIR store/data warehouse, and then verify the content of the sink.

This depends on #7.

Change POST to PUT when uploading FHIR data to the FHIR store

Uploading resources by POST forces the HAPI FHIR server to strictly create new resources and generate a new internal resource ID.
This leads to data duplication when the same resource is POSTed more than once.
When handling referential integrity, HAPI generates new dummy resources, which continuously leads to unnecessary data duplication.
See the Talk post for more details.
This should be implemented in both streaming and batch modes.
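
For illustration, a hedged sketch with the HAPI FHIR generic client: create() issues a POST and lets the server assign a fresh ID, while update() issues a PUT against the client-supplied ID, so re-sending the same resource overwrites it instead of duplicating it (the sink URL is a placeholder).

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.r4.model.Patient;

public class UpsertExample {
  public static void main(String[] args) {
    FhirContext fhirContext = FhirContext.forR4();
    IGenericClient client = fhirContext.newRestfulGenericClient("http://localhost:8098/fhir");

    Patient patient = new Patient();
    // Keep the source UUID as the resource ID so repeated uploads target the same record.
    patient.setId("bee471c4-7e08-4a31-b9d8-a0c0bd2ab103");
    patient.addName().setFamily("Example");

    // POST: the server assigns a fresh ID every time -> duplicates on re-upload.
    // client.create().resource(patient).execute();

    // PUT: idempotent upsert at Patient/<uuid>.
    client.update().resource(patient).execute();
  }
}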

Add ability to export and stream non-FHIR data elements

Currently, nearly all data elements in OpenMRS have been mapped to their corresponding FHIR resources. However, there is a need to extract tables that have not been mapped, e.g., module tables, visittype, concept, etc.

Evaluate the option of integrating FHIR module inside the pipeline code.

Currently we rely on the source OpenMRS to have the FHIR module installed and query that module to get FHIR resources. Given that some concerns have been raised re. installing new modules in current OMRS implementations (e.g., see #3 and the Atom Feed module), the same argument may apply to the FHIR module too.

One solution to avoid any extra module installations is to integrate the FHIR module code inside our pipelines and only use the MySQL DB (or a replica) to access data. This has the extra benefits of:

  1. Having some performance benefits by avoiding encoding and decoding FHIR resources as JSON (see here).
  2. Making it easier to use a replica DB instead of the primary MySQL DB (to reduce load on the main EMR).

Create a way to mark and reprocess failed debezium offset

Currently, Debezium flushes offsets periodically even in case of failure due to IO/networking when writing to Parquet/FHIR store. There is a need to track and reprocess failed offsets when:

  1. FHIR resources can't be fetched from OMRS due to related exceptions
  2. FHIR bundles can't be written to disk (Parquet) or the FHIR store due to related exceptions

Thanks @kimaina for the review and for the great questions:

Re. 1) Currently we don't have any partitioning enabled; even for the batch mode where the files are sharded, I don't think any partitioning is enabled. For the streaming mode we have a bigger problem with sharding and supporting so-called "appending" (which is not really append). But I have left all of those issues for the future when I try the output in Spark. All that this PR does is create a single Parquet file.

Re. 2) First, re. overwriting a record (e.g., Observation, Patient, etc.), we currently just add to the output file and need to deal with this at query time. Re. a different schema, yes, currently I am assuming that the schema is fixed, although I am not sure if using FHIR extensions will break that. I need to check how Bunsen deals with extensions, but it is quite possible that its support is generic (e.g., string key-value) and does not break the previous extension-free schema.

Re. 3) That is a good point; I have currently ignored IOExceptions (they only produce an ERROR log); do you want me to add an onException to the RouteBuilder process setup and throw the exception in process?

Re. 4) Replied inline.

Thank you for addressing this! We will need to create an issue/ticket to handle failures/exceptions - I think we need to mark failed offsets and might need to create a separate process to rerun them --> We might need more discussion on this!

Originally posted by @kimaina in #47 (comment)
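
Regarding the onException idea in Re. 3 above, here is a hedged sketch of what that might look like in a Camel RouteBuilder (endpoint URIs are placeholders); the key point is rethrowing rather than swallowing IOExceptions so a failed event is not silently treated as processed.

import java.io.IOException;
import org.apache.camel.builder.RouteBuilder;

public class EventRoute extends RouteBuilder {
  @Override
  public void configure() {
    // Retry transient IO/network failures a few times; if they persist, leave the
    // exception unhandled so the failure surfaces instead of only producing a log line.
    onException(IOException.class)
        .maximumRedeliveries(3)
        .redeliveryDelay(2000)
        .handled(false)
        .log("Failed to write event after retries: ${exception.message}");

    from("direct:debezium-events") // placeholder for the actual Debezium endpoint
        .process(exchange -> {
          // fetch the FHIR resource and write the Parquet/FHIR-store output here;
          // throwing an IOException triggers the policy above
        })
        .to("log:processed");
  }
}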

Create an accessible test dataset.

This was done as part of PR #13 but did not work in all environments for some reason (background), so backup.sql and the data/ volume mapping were removed to fix that PR. But we need that functionality in the future to make development and integration testing faster and easier; hence this issue.

Switch to using Basic Auth

Currently we use JSESSIONID to circumvent OpenMRS authentication, which is obviously a temporary solution. The first permanent authentication fix is to switch to Basic Auth.

@pmanko FYI
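
A hedged sketch of what the switch could look like with the HAPI FHIR client; the base URL and credentials are placeholders.

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.BasicAuthInterceptor;
import org.hl7.fhir.r4.model.Bundle;

public class BasicAuthExample {
  public static void main(String[] args) {
    IGenericClient client =
        FhirContext.forR4().newRestfulGenericClient("http://localhost:8099/openmrs/ws/fhir2/R4");
    // Send an Authorization: Basic header on every request instead of a JSESSIONID cookie.
    client.registerInterceptor(new BasicAuthInterceptor("admin", "Admin123"));

    Bundle patients =
        client.search().forResource("Patient").returnBundle(Bundle.class).execute();
    System.out.println("Fetched " + patients.getEntry().size() + " patients");
  }
}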

Add atomfeed db settings to docker setup

Currently, the config for the connection to the atomfeed client DB is in hibernate.default.properties.

This file is used during the build process, and as a result the DB connection cannot be set at runtime. However, to simplify configuration, this setting should probably be configurable in the environment section of the streaming-atomfeed docker-compose setup.

Clean-up POM files

A lot of dependencies can be pruned, especially in the case of the Beam pipeline, as we don't really need many of the runners.

Implement Parquet output generation.

There are three pieces to this work:

  1. Implementing an automated way of converting FHIR resources into a schema for Parquet files. Bonus points for implementing the SQL-on-FHIR schema, but that is not necessary as part of this.

  2. Adding Parquet output generation to the Beam pipeline for the batch mode (a sketch follows at the end of this issue).

  3. Implementing some sort of append version for the streaming pipeline.

Remember to partitionBy OMRSID/ResourceID/ for faster append and downstream consumption, e.g., OMRSID20/PatientID/.

Adding various schemas like OMOP or even SQL-on-FHIR will have their own separate issues.
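
For piece 2, here is the hedged sketch referenced above: writing Parquet from Beam using ParquetIO with an Avro schema. The toy schema and output path are illustrative and not the actual SQL-on-FHIR/Bunsen schema.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class ParquetWriteExample {
  public static void main(String[] args) {
    // Toy Avro schema standing in for the schema generated from FHIR resources.
    Schema schema = SchemaBuilder.record("Patient").fields().requiredString("id").endRecord();
    GenericRecord record =
        new GenericRecordBuilder(schema).set("id", "example-uuid").build();

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(Create.of(record).withCoder(AvroCoder.of(GenericRecord.class, schema)))
        // Write sharded Parquet files; partitioning/append strategies are separate concerns.
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("tmp/parquet/Patient/")
            .withSuffix(".parquet"));
    pipeline.run().waitUntilFinish();
  }
}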

Try using HAPI FHIR client to push data to GCP FHIR store.

If this is easily achievable, then we can have a more uniform solution for all FHIR client requirements, whether getting data from OpenMRS, pushing data to HAPI FHIR JPA servers, GCP FHIR store, or any other FHIR store in the future.
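
A hedged sketch of what this could look like: a HAPI generic client pointed at a GCP FHIR store base URL, with an OAuth2 access token attached through BearerTokenAuthInterceptor (the project/location/dataset names and the token acquisition are placeholders).

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.BearerTokenAuthInterceptor;
import org.hl7.fhir.r4.model.Patient;

public class GcpFhirStoreClientExample {
  public static void main(String[] args) {
    String fhirStoreUrl =
        "https://healthcare.googleapis.com/v1/projects/PROJECT/locations/LOCATION"
            + "/datasets/DATASET/fhirStores/FHIR_STORE/fhir";
    // Placeholder: e.g. the output of `gcloud auth print-access-token`.
    String accessToken = System.getenv("GCP_ACCESS_TOKEN");

    IGenericClient client = FhirContext.forR4().newRestfulGenericClient(fhirStoreUrl);
    client.registerInterceptor(new BearerTokenAuthInterceptor(accessToken));

    Patient patient = new Patient();
    patient.setId("bee471c4-7e08-4a31-b9d8-a0c0bd2ab103");
    patient.addName().setFamily("Example");
    client.update().resource(patient).execute();
  }
}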

Improve the readme file and the getting-started procedure

We have issues being reported on:

  1. The readme file is not clear on how to sink FHIR data into a generic FHIR store. Only GCP has been addressed; however, we do have the functionality and the Docker container to do bulk/stream export to a generic FHIR store. The HAPI FHIR container is here: https://github.com/GoogleCloudPlatform/openmrs-fhir-analytics/blob/master/sink-compose.yml
  2. How to configure the Debezium config vs. the atom-feed config. It is not clear from the readme which config to use under different modes.

For any other issues related to the readme file, please feel free to update this issue.

Implement FHIR bulk paging logic inside the analytics engine codebase

To add more context to this: currently the batch pipeline relies on the FHIR search API and the specific way HAPI implements paging. This means that for very large DBs, the initial query for creating the list of IDs of resources to return can take a very long time. The idea is to implement a way that avoids such long DB queries and instead reads the list of IDs in segments from the DB directly (a sketch follows after the quoted discussion below).

That said, one benefit of the current implementation is that it supports general FHIR search URLs, e.g., with filters as mentioned here, so it would be great to keep the current functionality while adding a direct DB-based implementation too.

My guess is that this is fast only because you are grabbing a limited number of uuids. If you stream all uuids into an app (e.g., our batch pipeline) it should not be faster than the count query.

I agree, streaming in 300 million UUIDs would still make the API call time out.

Originally posted by @kimaina in #39 (comment)
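
The sketch referenced above: a hedged illustration of reading resource IDs in fixed-size segments with plain JDBC. Table and column names are illustrative; a real implementation would generalize per resource type and feed the segments to the Beam pipeline.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class SegmentedIdReader {
  public static void main(String[] args) throws Exception {
    int batchSize = 10000;
    try (Connection conn =
        DriverManager.getConnection("jdbc:mysql://localhost:3306/openmrs", "user", "pass")) {
      long maxId;
      try (Statement stmt = conn.createStatement();
          ResultSet rs = stmt.executeQuery("SELECT MAX(person_id) FROM person")) {
        rs.next();
        maxId = rs.getLong(1);
      }
      // Read UUIDs in fixed-size id ranges instead of one huge query, so no single
      // DB query (or FHIR search page) has to materialize the full id list.
      for (long start = 0; start <= maxId; start += batchSize) {
        List<String> uuids = new ArrayList<>();
        try (PreparedStatement stmt = conn.prepareStatement(
            "SELECT uuid FROM person WHERE person_id >= ? AND person_id < ?")) {
          stmt.setLong(1, start);
          stmt.setLong(2, start + batchSize);
          try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
              uuids.add(rs.getString("uuid"));
            }
          }
        }
        // hand `uuids` to the pipeline to fetch/convert the corresponding FHIR resources
      }
    }
  }
}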

Rewrite the streaming-binlog pipeline using Apache Beam

As part of #65, I did an evaluation of how much effort it is to switch to Beam; filing this to document the options for future reference. I can think of two main options:

  • Use the default way Debezium is used through Kafka (see the Debezium tutorial) and use the KafkaIO support of Beam (see the sketch after this list). The benefit of this approach is relying on standard packages and also preparing for a distributed deployment (i.e., consolidating many OpenMRS instances into a single data warehouse). The drawback is the architectural complexity this introduces, i.e., Kafka, ZooKeeper, and Kafka Connect, which we have been trying to avoid for the MVP.

  • Implement a custom IO connector for Beam, as described here, which includes an UnboundedSource that wraps Debezium. The main drawback of this approach is its custom nature and the fact that it does not extend to a distributed environment (for which Kafka is probably the right approach). The main benefit is its architectural simplicity.
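
The sketch referenced in the first option: a hedged example of consuming Debezium change events from Kafka with Beam's KafkaIO (the topic name and bootstrap servers are placeholders).

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaStreamingExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        // Debezium topics are typically named <server-name>.<database>.<table>.
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("openmrs.openmrs.obs")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Each value is the change-event JSON, to be mapped to a FHIR resource downstream.
        .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getValue()));
    pipeline.run().waitUntilFinish();
  }
}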

Change the FHIR fetch request from GET to POST: batch JDBC mode

The batch JDBC fetch pipeline uses a GET request to fetch bundles. There has been a reported bug that happens when passing more than 200 UUIDs; please see the conversation below.

@kimaina That's an interesting finding. There are always likely to be hard limits on the size of queries like this, since most web servers have some kind of limits to prevent DOS-style attacks. At ~200 UUIDs (which are encoded as, at best, 36 bytes), we're probably hitting Tomcat 7's default maxHttpHeaderSize of 8 KB (back of the napkin math suggests that 8 KB can handle 228 UUIDs, assuming absolutely no other headers get sent). My impression is that instead of rejecting the request, Tomcat just cuts it off and hands it off to the server to process, hence HAPI seeing a "Bad Request". I see three options:

  1. Mess with Tomcat's configuration to up the maxHttpHeaderSize for the HTTP connector.
  2. Limit the batch size to, e.g., 150-175 UUIDs.
  3. Use the FHIR POST search end-point (i.e., post a request to /Observation/_search with the _id parameter in the body of the request). This gets around things as the default Tomcat limit for POST bodies is 2 MB.

2 is obviously the easiest to implement.

BUG: Exception: ca.uhn.fhir.rest.server.exceptions.InvalidRequestException: HTTP 400 Bad Request
Originally posted by @ibacher in #72 (comment)
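
For reference, a hedged sketch of option 3 with the HAPI generic client, which moves the _id list from the request line into the request body (the base URL and the UUID batch are placeholders).

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.SearchStyleEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.gclient.TokenClientParam;
import org.hl7.fhir.r4.model.Bundle;

public class PostSearchExample {
  public static void main(String[] args) {
    IGenericClient client =
        FhirContext.forR4().newRestfulGenericClient("http://localhost:8099/openmrs/ws/fhir2/R4");

    // POST to /Observation/_search with _id=<comma-separated uuids> in the body,
    // avoiding the request-line/header size limits hit by long GET URLs.
    Bundle bundle = client.search()
        .forResource("Observation")
        .where(new TokenClientParam("_id").exactly().codes("uuid-1", "uuid-2", "uuid-3"))
        .usingStyle(SearchStyleEnum.POST)
        .returnBundle(Bundle.class)
        .execute();
    System.out.println("Fetched " + bundle.getEntry().size() + " observations");
  }
}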

Make exported resource IDs consistent

I noticed that in the generated Parquet files, the resource IDs include the base URL of the original OpenMRS server, e.g.,
id = http://localhost:9021/openmrs/ws/fhir2/R3/Person/bee471c4-7e08-4a31-b9d8-a0c0bd2ab103
while in the resources exported to BigQuery they do not, e.g.,
id = bee471c4-7e08-4a31-b9d8-a0c0bd2ab103
This is important when we are joining resources on their cross-references. The reason for the latter is here, where we explicitly use getIdPart(), while in the former case the ID comes from IdType.getValue().

We need to make these consistent. This will probably be fixed once PR #30 is submitted but I am making a note here to make sure we check and then close this.
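
For reference, a hedged sketch of the two accessors in question on a HAPI IdType:

import org.hl7.fhir.r4.model.IdType;

public class IdPartsExample {
  public static void main(String[] args) {
    IdType id = new IdType(
        "http://localhost:9021/openmrs/ws/fhir2/R3/Person/bee471c4-7e08-4a31-b9d8-a0c0bd2ab103");

    // Full value keeps the server base URL and resource type.
    System.out.println(id.getValue());
    // -> http://localhost:9021/openmrs/ws/fhir2/R3/Person/bee471c4-7e08-4a31-b9d8-a0c0bd2ab103

    // Id part is only the logical id, which is what the BigQuery export currently uses.
    System.out.println(id.getIdPart());
    // -> bee471c4-7e08-4a31-b9d8-a0c0bd2ab103
  }
}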
