
nifi-influxdb-bundle's Introduction

InfluxDB Processors For Apache NiFi


About this Project

The current version of NiFi already has a built-in data processor, org.apache.nifi.processors.influxdb.PutInfluxDB (doc), that accepts flow files in InfluxDB’s Line Protocol format and stores the data in InfluxDB.

This processor is useful only for processing data generated by InfluxDB Telegraf or other applications that provide data directly in InfluxDB’s Line Protocol. It cannot be used to store structured data (JSON, Avro, CSV, ...).

This motivated us to implement a new processor, org.influxdata.nifi.processors.PutInfluxDatabaseRecord, based on the NiFi Record design. This processor allows writing any NiFi Record-structured data into InfluxDB. It works similarly to the other built-in NiFi Record-based Put*Record processors (PutDatabaseRecord, PutHBase, PutMongoRecord, ...).

We also support InfluxDB 2.0 with several new processors and services, described below: PutInfluxDatabase_2, PutInfluxDatabaseRecord_2, GetInfluxDatabase_2, GetInfluxDatabaseRecord_2 and InfluxDatabaseService_2.

Installation

To install the InfluxDB Processors you will need to copy the appropriate nar file into the lib directory of your NiFi installation ($NIFI_HOME/lib) and restart NiFi.

The Nar compatibility matrix:

Nar Version NiFi Version
nifi-influx-database-nar-1.30.0-SNAPSHOT.nar 1.26.0
nifi-influx-database-nar-1.29.0.nar 1.26.0
nifi-influx-database-nar-1.28.0.nar 1.25.0
nifi-influx-database-nar-1.27.0.nar 1.24.0
nifi-influx-database-nar-1.26.0.nar 1.24.0
nifi-influx-database-nar-1.25.0.nar 1.23.2
nifi-influx-database-nar-1.24.0.nar 1.19.0
nifi-influx-database-nar-1.23.0.nar 1.18.0
nifi-influx-database-nar-1.22.0.nar 1.17.0
nifi-influx-database-nar-1.21.0.nar 1.16.3
nifi-influx-database-nar-1.20.0.nar 1.16.3
nifi-influx-database-nar-1.19.0.nar 1.16.2
nifi-influx-database-nar-1.18.0.nar 1.16.1
nifi-influx-database-nar-1.17.0.nar 1.16.0
nifi-influx-database-nar-1.16.0.nar 1.15.3
nifi-influx-database-nar-1.15.0.nar 1.15.2
nifi-influx-database-nar-1.14.0.nar 1.15.0
nifi-influx-database-nar-1.13.0.nar 1.14.0
nifi-influx-database-nar-1.12.0.nar 1.13.2
nifi-influx-database-nar-1.11.0.nar 1.13.2
nifi-influx-database-nar-1.10.0.nar 1.13.2
nifi-influx-database-nar-1.9.0.nar 1.13.0
nifi-influx-database-nar-1.8.0.nar 1.12.1
nifi-influx-database-nar-1.7.0.nar 1.11.4
nifi-influx-database-nar-1.6.0.nar 1.11.3
nifi-influx-database-nar-1.5.0.nar 1.11.1
nifi-influx-database-nar-1.4.nar 1.11
nifi-influx-database-nar-1.3.nar 1.10
nifi-influx-database-nar-1.2.nar 1.9
nifi-influx-database-nar-1.1.nar 1.9
nifi-influx-database-nar-1.0.nar 1.8

For example, to install the nar after downloading it to ~/Downloads:

$ cp ~/Downloads/nifi-influx-database-nar-1.1.nar $NIFI_HOME/lib

How To Use

PutInfluxDatabaseRecord

Uses a specified RecordReader to write the content of a FlowFile into an InfluxDB database.

Features

  • Input can be any built-in or custom implemented NiFi RecordReader (json, avro, csv, InfluxLineProtocolReader...)
  • Configurable mapping between NiFi Records and InfluxDB measurement, field and tags
  • Configurable timestamp precision
  • Reusable connection settings (InfluxDB url, password) across multiple processors via the InfluxDatabaseService controller
  • Advanced InfluxDB client settings
    • Gzip compression
    • Batching, jitter, flush settings

Mapping Records to InfluxDB Data Point

Measurement

The measurement value is determined from a field in the Record Schema. If the field is not found in the schema, the value of the Measurement property is used instead. Any data type is converted into a String and used as the value.

Tags

The name of the field in the Record Schema is used as the tag key, and the value of the field is used as the tag value. Any data type is converted into a String and used as the tag value; see also the handling of complex types below.

Timestamp

The timestamp value is determined from a field in the Record Schema. If the field is not found in the schema, or the field has no defined value, no timestamp is set for the Data Point. The precision of the supplied time value is determined by the Timestamp precision property.

Behavior of handling complex types for Tags and Fields

Apache NiFi complex Record fields are handled by type-specific strategies:

  • Map - keys are mapped as the keys of Tags or Fields, values are mapped as the values of Tags or Fields
  • Choice - a compatible type from the Choice definition is used as the value
  • Array - handled according to the Complex Field Behavior property
  • Record - handled according to the Complex Field Behavior property
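
For example (an illustrative sketch with hypothetical field names): with Measurement set to cpu, Tags set to host,region, Fields set to usage and Timestamp field set to timestamp, the JSON record

{"host": "server01", "region": "west", "usage": 23.5, "timestamp": 1550148042000000000}

would be mapped to the following InfluxDB point in Line Protocol:

cpu,host=server01,region=west usage=23.5 1550148042000000000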

Batching

Enabling batching reduces reliability at the cost of better performance. The PutInfluxDatabaseRecord processor uses the batching/buffering implemented in the influxdb-java client. The processor can route a flow file to the success relationship before the batch buffer is flushed into the database. The batch buffer is held in system memory, so in the case of a power failure or a process kill, the content of the buffer is not written into InfluxDB.

Batching is useful when the flow file contains a large number of records. Records are sent to InfluxDB in batches of a preconfigured size.
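
The batching behavior comes from the influxdb-java client. The following minimal Java sketch shows the same batch options used outside of NiFi (the URL, credentials, database and measurement are placeholders, not values taken from this bundle, and this is not the processor's own code):

import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class BatchingSketch {
    public static void main(String[] args) {
        // Placeholder connection values.
        InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "user", "password");

        // Mirrors the processor's batch properties: actions to collect, flush
        // duration, jitter, and the buffer limit for retried failed writes.
        influxDB.enableBatch(BatchOptions.DEFAULTS
                .actions(1000)         // "Batch actions"
                .flushDuration(1000)   // "Batch flush duration" (ms)
                .jitterDuration(200)   // "Batch flush jitter" (ms)
                .bufferLimit(10000));  // "Batch flush buffer limit"

        // Points are buffered in memory and flushed asynchronously; a crash
        // before the flush loses the buffered points.
        influxDB.write("telegraf", "autogen",
                Point.measurement("cpu").addField("usage", 23.5).build());

        influxDB.close(); // flushes any remaining buffered points
    }
}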

Properties

Property Description
Record Reader Specifies the Controller Service to use for parsing incoming data and determining the data's schema
InfluxDB Controller Service A controller service that provides connection to InfluxDB
Database Name InfluxDB database to connect to
Enable gzip compression Enable gzip compression for InfluxDB http request body
Log Level Controls the level of logging for the REST layer of InfluxDB client
Consistency Level InfluxDB consistency level
Retention Policy Retention policy used for saving the records
Enable InfluxDB batching Enabling batching speeds up writes significantly, at the cost of reduced reliability. A flow file can be transferred to the success relationship before the batch buffer is flushed into the database. For additional information see the processor documentation.
Batch flush duration Flush at least every specified time
Batch actions The number of batch actions to collect
Batch flush jitter Jitters the batch flush interval by a random amount.
Batch flush buffer limit The client maintains a buffer for failed writes so that the writes will be retried later on.
Measurement The name of the measurement. If the Record contains a field whose name matches this property's value, the value of that Record field is used as the InfluxDB measurement
Tags A comma-separated list of record fields stored in InfluxDB as 'tag'
Missing Tag Behavior If the specified tag is not present in the document, this property specifies how to handle the situation.
Fields A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined
Missing Field Behavior If the specified field is not present in the document, this property specifies how to handle the situation
Timestamp field The name of the record field that is used as the 'timestamp'
Timestamp precision The timestamp precision; ignored when the 'Timestamp field' value is a 'java.util.Date'
Complex Field Behavior Indicates how to handle complex fields, i.e. fields that do not have a primitive value
Null Values Behavior Indicates how to handle null fields, i.e. fields that do not have a defined value
Max size of records Maximum size of records allowed to be posted in one batch

Relationships

Relationship Description
success All FlowFiles that are written into InfluxDB are routed to this relationship
retry A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed.
failure All FlowFiles that cannot be written to InfluxDB are routed to this relationship

PutInfluxDatabaseRecord_2

Uses a specified RecordReader to write the content of a FlowFile into an InfluxDB 2.0 database.

Features

  • Input can be any built-in or custom implemented NiFi RecordReader (json, avro, csv, InfluxLineProtocolReader...)
  • Configurable mapping between NiFi Records and InfluxDB measurement, field and tags
  • Configurable timestamp precision
  • Reusable connection settings (InfluxDB url, password) across multiple processors via the InfluxDatabaseService_2 controller
  • Advanced InfluxDB client settings
    • Gzip compression

Mapping Records to InfluxDB Data Point

Measurement

The measurement value is determined from a field in the Record Schema. If the field is not found in the schema, the value of the Measurement property is used instead. Any data type is converted into a String and used as the value.

Tags

The name of the field in the Record Schema is used as the tag key, and the value of the field is used as the tag value. Any data type is converted into a String and used as the tag value; see also the handling of complex types below.

Timestamp

The timestamp value is determined from a field in the Record Schema. If the field is not found in the schema, or the field has no defined value, no timestamp is set for the Data Point. The precision of the supplied time value is determined by the Timestamp precision property.

Behavior of handling complex types for Tags and Fields

Apache NiFi complex Record fields are handled by type-specific strategies:

  • Map - keys are mapped as the keys of Tags or Fields, values are mapped as the values of Tags or Fields
  • Choice - a compatible type from the Choice definition is used as the value
  • Array - handled according to the Complex Field Behavior property
  • Record - handled according to the Complex Field Behavior property

Properties

Property Description
Record Reader Specifies the Controller Service to use for parsing incoming data and determining the data's schema
InfluxDB Controller Service A controller service that provides connection to InfluxDB
Bucket Specifies the destination bucket for writes
Organization Specifies the destination organization for writes
Enable gzip compression Enable gzip compression for InfluxDB http request body
Log Level Controls the level of logging for the REST layer of InfluxDB client
Measurement The name of the measurement. If the Record contains a field whose name matches this property's value, the value of that Record field is used as the InfluxDB measurement
Tags A comma-separated list of record fields stored in InfluxDB as 'tag'
Missing Tag Behavior If the specified tag is not present in the document, this property specifies how to handle the situation.
Fields A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined
Missing Field Behavior If the specified field is not present in the document, this property specifies how to handle the situation
Timestamp field The name of the record field that is used as the 'timestamp'
Timestamp precision The timestamp precision; ignored when the 'Timestamp field' value is a 'java.util.Date'
Complex Field Behavior Indicates how to handle complex fields, i.e. fields that do not have a primitive value
Null Values Behavior Indicates how to handle null fields, i.e. fields that do not have a defined value
Max size of records Maximum size of records allowed to be posted in one batch

Relationships

Relationship Description
success All FlowFiles that are written into InfluxDB are routed to this relationship
retry A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed.
failure All FlowFiles that cannot be written to InfluxDB are routed to this relationship

InfluxLineProtocolReader

Parses InfluxDB Line Protocol into NiFi Records. This allows processing, filtering and partitioning of data obtained in NiFi from Telegraf agents, IoT devices, InfluxDB subscriptions and other sources of InfluxDB Line Protocol.

Properties

Property Description
Character Set The Character Encoding that is used to decode the Line Protocol data
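
For example, a single Line Protocol line such as the following (sample data):

weather,location=us-midwest temperature=82 1465839830100400200

is parsed into a NiFi Record carrying the measurement (weather), the tag map (location=us-midwest), the field map (temperature=82) and the timestamp (1465839830100400200), which downstream Record processors can then filter and transform.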

InfluxDatabaseService

Allows sharing a connection configuration to InfluxDB 1.x among multiple NiFi processors. Also supports SSL connections.

Properties

Property Description
SSL Context Service The SSL Context Service used to provide client certificate information for TLS/SSL connections
Client Auth The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.
InfluxDB connection URL InfluxDB URL to connect to. Eg: http://influxdb:8086
InfluxDB Max Connection Time Out The maximum time for establishing connection and reading/writing to the InfluxDB
Username Username which is used to authorize against the InfluxDB
Password Password for the username which is used to authorize against the InfluxDB. If authorization fails, the FlowFile will be penalized and routed to the 'retry' relationship.

InfluxDatabaseService_2

Allows sharing a connection configuration to InfluxDB 2.0 among multiple NiFi processors. Also supports SSL connections.

Properties

Property Description
SSL Context Service The SSL Context Service used to provide client certificate information for TLS/SSL connections
Client Auth The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.
InfluxDB connection URL InfluxDB URL to connect to. Eg: http://influxdb:8086
InfluxDB Max Connection Time Out The maximum time for establishing connection and reading/writing to the InfluxDB
InfluxDB Access Token Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi.

PutInfluxDatabase

Processor to write the content of a FlowFile in 'line protocol'. Please check the details of the 'line protocol' in the InfluxDB documentation (https://www.influxdb.com/). The flow file can contain a single measurement point or multiple measurement points separated by line separators. The timestamp precision is defined by the Timestamp property. If you do not specify a precision, InfluxDB assumes that timestamps are in nanoseconds.
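
For illustration, a flow file with two measurement points (sample data, nanosecond timestamps) could contain:

water_level,location=coyote_creek level=8.12 1550148042000000000
water_level,location=santa_monica level=3.46 1550148042000000000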

Properties

Property Description
Database Name InfluxDB database to connect to
InfluxDB connection URL InfluxDB URL to connect to. Eg: http://influxdb:8086
InfluxDB Max Connection Time Out The maximum time for establishing connection and reading/writing to the InfluxDB
Username Username for accessing InfluxDB
Password Password for user
Character Set Specifies the character set of the document data
Consistency Level InfluxDB consistency level
Retention Policy Retention policy used for saving the records
Timestamp precision The precision of the timestamps. InfluxDB assumes that timestamps are in nanoseconds if you do not specify a precision
Max size of records Maximum size of records allowed to be posted in one batch

PutInfluxDatabase_2

Processor to write the content of a FlowFile in 'line protocol'. Please check the details of the 'line protocol' in the InfluxDB 2.0 documentation (https://www.influxdb.com/). The flow file can contain a single measurement point or multiple measurement points separated by line separators. The timestamp precision is defined by the Timestamp property.

Properties

Property Description
InfluxDB Controller Service A controller service that provides connection to InfluxDB
Bucket Specifies the destination bucket for writes
Organization Specifies the destination organization for writes
Timestamp precision The precision of the timestamps
Enable gzip compression Enable gzip compression for InfluxDB http request body
Log Level Controls the level of logging for the REST layer of InfluxDB client
Character Set Specifies the character set of the document data
Max size of records Maximum size of records allowed to be posted in one batch

GetInfluxDatabase_2

Creates FlowFiles from records in InfluxDB 2.0 loaded by a user-specified Flux query.

Properties

Property Description
InfluxDB Controller Service A controller service that provides connection to InfluxDB
Organization Specifies the source organization
Query A valid Flux query to use to execute against InfluxDB
Dialect Header If true, the results will contain a header row
Dialect Delimiter Separator between cells; the default is ","
Dialect Annotations Annotations describing properties of the table columns. More than one can be supplied, comma-separated. Allowable values: "group", "datatype", "default".
Dialect Comment Prefix Character prefixed to comment strings.
Dialect Date Time Format Format of timestamps.
Results Per FlowFile How many records to put into a FlowFile at once. The whole body will be treated as a CSV file.
Enable gzip compression Enable gzip compression for InfluxDB http request body
Log Level Controls the level of logging for the REST layer of InfluxDB client

GetInfluxDatabaseRecord_2

A record-based version of GetInfluxDatabase_2 that uses the Record writers to write the Flux result set.

Properties

Property Description
InfluxDB Controller Service A controller service that provides connection to InfluxDB
Record Writer The record writer to use to write the result sets
Organization Specifies the source organization
Query A valid Flux query to use to execute against InfluxDB
Dialect Date Time Format Format of timestamps.
Results Per FlowFile How many records to put into a FlowFile at once. The whole body will be treated as a set of Records.
Enable gzip compression Enable gzip compression for InfluxDB http request body
Log Level Controls the level of logging for the REST layer of InfluxDB client

InfluxLineProtocolRecordSetWriter

Writes the contents of a RecordSet as Line Protocol. The configured writer is able to build Line Protocol using Expression Language that references each of the fields available in a Record. Each record in the RecordSet will be separated by a single newline character. The Record Schema is read from the incoming FlowFile.
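
As a sketch (with hypothetical configuration values): setting Measurement to sensor, Tags to location and Fields to value would write a record such as {"location": "garage", "value": 21.5} as the Line Protocol line

sensor,location=garage value=21.5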

Properties

Property Description
Measurement The name of the measurement. If the Record contains a field whose name matches this property's value, the value of that Record field is used as the InfluxDB measurement
Tags A comma-separated list of record fields stored in InfluxDB as 'tag'
Missing Tag Behavior If the specified tag is not present in the document, this property specifies how to handle the situation
Fields A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined
Missing Field Behavior If the specified field is not present in the document, this property specifies how to handle the situation
Timestamp field The name of the record field that is used as the 'timestamp'
Timestamp precision The timestamp precision; ignored when the 'Timestamp field' value is a 'java.util.Date'
Complex Field Behavior Indicates how to handle complex fields, i.e. fields that do not have a primitive value
Null Values Behavior Indicates how to handle null fields, i.e. fields that do not have a defined value
Character Set The Character Encoding that is used to encode/decode the Line Protocol

Demo

How to start

The demo requires Docker Engine, GNU gzip and curl on the PATH.

  1. Download and unpack sources: download ZIP
  2. Run start script from the source directory:
    ./scripts/nifi-restart.sh
  3. Open Apache NiFi flow in browser: http://localhost:8080/nifi/
  4. Open Telegraf Dashboards in browser: Twitter, NiFi Container or NiFi Logs

Processing metrics in NiFi

This example shows how to process structured metrics from Telegraf in NiFi.

Telegraf sends metrics into NiFi using the SocketWriter output plugin. The metrics are sent as InfluxDB’s Line Protocol. NiFi parses the Line Protocol with org.influxdata.nifi.serialization.InfluxLineProtocolReader, which allows users to process the data with Record processors (SplitRecord, UpdateRecord, ValidateRecord, ...).
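
A minimal Telegraf output configuration for this kind of setup might look like the following sketch (the address is an assumption and must match the host and port that the NiFi listener is bound to):

[[outputs.socket_writer]]
  ## Stream metrics to the NiFi TCP listener as Line Protocol
  address = "tcp://nifi-host:8007"
  data_format = "influx"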

NiFi flow

The metrics from monitoring Docker containers are filtered in NiFi: NiFi container metrics are stored in InfluxDB, and metrics from other containers are logged.

  1. ListenTelegraf - Listens for incoming TCP connections and transforms incoming Line Protocol to NiFi Records
  2. PartitionRecord - Groups incoming records by container name
  3. RouteOnAttribute - Routes incoming container metrics: NiFi container metrics are routed to PutInfluxDatabaseRecord, other metrics to LogAttribute
  4. PutInfluxDatabaseRecord - Writes NiFi container metrics to InfluxDB
  5. PutInfluxDatabaseRecord_2 - Writes NiFi container metrics to InfluxDB 2.0
  6. LogAttribute - Logs metrics that aren't written to InfluxDB

Result

The InfluxDB instance has a database telegraf_nifi_demo with the following measurements:

show measurements

name: measurements
name
----
docker_container_blkio
docker_container_cpu
docker_container_mem
docker_container_net
docker_container_status

For example, the docker_container_status measurement contains:

select * from docker_container_status 

name: docker_container_status
time                container_image container_name container_status container_version engine_host           exitcode host         maintainer                        oomkilled pid   server_version site                    started_at
----                --------------- -------------- ---------------- ----------------- -----------           -------- ----         ----------                        --------- ---   -------------- ----                    ----------
1550148042000000000 nifi            nifi           running          unknown           linuxkit-025000000001 0        0c79c2e451ca Apache NiFi <[email protected]> false     43685 18.09.1        https://nifi.apache.org 1550147980248481800
1550148052000000000 nifi            nifi           running          unknown           linuxkit-025000000001 0        0c79c2e451ca Apache NiFi <[email protected]> false     43685 18.09.1        https://nifi.apache.org 1550147980248481800
1550148062000000000 nifi            nifi           running          unknown           linuxkit-025000000001 0        0c79c2e451ca Apache NiFi <[email protected]> false     43685 18.09.1        https://nifi.apache.org 1550147980248481800
1550148072000000000 nifi            nifi           running          unknown           linuxkit-025000000001 0        0c79c2e451ca Apache NiFi <[email protected]> false     43685 18.09.1        https://nifi.apache.org 1550147980248481800
1550148082000000000 nifi            nifi           running          unknown           linuxkit-025000000001 0        0c79c2e451ca Apache NiFi <[email protected]> false     43685 18.09.1        https://nifi.apache.org 1550147980248481800
...

Write Line Protocol to multiple storage systems

This example shows how to store NiFi Records as Line Protocol in multiple environments: InfluxDB, InfluxDB 2.0 and Kafka.

NiFi flow

InfluxLineProtocolRecordSetWriter settings

Expose data from InfluxDB 2.0 on a particular HTTP endpoint

This example shows how to expose InfluxDB 2.0 data through NiFi.

GetInfluxDatabase_2 configuration

The processor is configured to invoke a static Flux query:

from(bucket: "my-bucket")
  |> range(start: 0)
  |> filter(fn: (r) => r._measurement == "tweets")
  |> drop(columns: ["keyword", "lang", "user_verified"])
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> limit(n:1)
  |> keep(columns: ["tweet_id", "screen_name", "text"])

The result is mapped as CSV and returned as the response to the incoming HTTP request.
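
With Dialect Header enabled and the default "," delimiter, the response body would resemble the following (an illustrative sketch; the values are hypothetical and the exact columns and their order depend on the query and the dialect settings):

,result,table,tweet_id,screen_name,text
,_result,0,1096000000000000000,someuser,Example tweet text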

GetInfluxDatabaseRecord_2 configuration

The processor invokes a Flux query that is passed as an HTTP query parameter:

curl -i -X GET -G http://localhost:8234 \
 --data-urlencode 'accept=xml'  \
 --data-urlencode 'query=from(bucket: "my-bucket")
 |> range(start: 0) |> filter(fn: (r) => r._measurement == "docker_container_status")
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
 |> limit(n:10, offset: 0)'

The result is mapped to the format specified in the request's accept parameter.

Contributing

If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request against the master branch.

License

InfluxDB Processors For Apache NiFi are released under the Apache License, Version 2.0.

nifi-influxdb-bundle's People

Contributors

alespour, bednar, dependabot[bot], powersj, rhajek, timhallinflux, tysonkamp, ulbi


nifi-influxdb-bundle's Issues

Add support for InfluxDB v2.0

  • InfluxDatabaseService_2 - The controller service providing client connection to InfluxDB 2.0.
  • PutInfluxDatabase_2 - Processor to write the content of a FlowFile in 'line protocol' into InfluxDB 2.0.
  • PutInfluxDatabaseRecord_2 - Uses a specified RecordReader to write the content of a FlowFile into InfluxDB 2.0.
  • GetInfluxDatabase_2 - Creates FlowFiles from records in InfluxDB 2.0 loaded by a user-specified Flux query.
  • GetInfluxDatabaseRecord_2 - A record-based version of GetInfluxDatabase_2 that uses the Record writers to write the Flux result set.

GetInfluxDatabase_2 10 seconds timeout problem

Hi there,

I'm experiencing a problem with the processor GetInfluxDatabase_2 in the 1.24.0 bundle version. The NiFi version is 1.19.0.

The Flux query that I'm sending takes more than 10 seconds, but after 10 seconds of processing the request times out, logging the errors

  • "Failed to execute Flux query due SocketTimeoutException to class Query"
  • "java.net.SocketTimeoutException: timeout Caused by: java.net.SocketException: Socket closed".

I tried to change the parameter "InfluxDB Max Connection Time Out (seconds)" in the corresponding StandardInfluxDatabaseService_2 controller service to 60 seconds (or higher), but it still times out after 10 seconds.

[screenshot]

It seems like it doesn't see this parameter for the timeout.

Do you know what is possibly causing this issue? Am I missing something?

Thanks a lot for your support. Any help would be much appreciated!

Use case oriented README.md

The introductory part of README.md should be structured by use cases:

Use case 1: Collect metrics from the outside world in NiFi (without Telegraf) and write the data into InfluxDB

Use case 2: Configure Telegraf to send data into NiFi in order to transform the records, e.g. to remove some irrelevant fields or to fork traffic and write to multiple DBs

Use case 3: Use NiFi as a lossless data middleman between Telegraf and InfluxDB.

  • back pressure
  • reliable queues
  • nifi cluster

Cannot use TLS connection with PutInfluxDatabase_2

I cannot get the connection to an InfluxDB2 server (via Apache Reverse Proxy with TLS and Client-Auth) to work.

I always get this error when PutInfluxDatabase_2/StandardInfluxDatabaseService_2 tries to establish a connection:

2021-05-11 07:51:48,338 ERROR [Timer-Driven Process Thread-2] o.i.nifi.processors.PutInfluxDatabase_2 PutInfluxDatabase_2[id=fe4d3e03-0177-1000-218e-e4211ca049d3] Failed to insert into influxDB due to PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: com.influxdb.exceptions.InfluxException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
com.influxdb.exceptions.InfluxException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at com.influxdb.internal.AbstractRestClient.execute(AbstractRestClient.java:84)
        at com.influxdb.client.internal.WriteApiBlockingImpl.write(WriteApiBlockingImpl.java:229)
...

However, the same StandardRestrictedSSLContextService with the same target URL does work when used in a GetHTTP processor.

So I'm quite certain that the TLS configuration is correct.

Are there any known issues with the https client used by StandardInfluxDatabaseService_2 regarding TLS client authentication, multi level CA hierarchies etc.?

Time Precision from PutInfluxDatabaseRecord_2

I am using a simple NiFi flow to try to get data into InfluxDB 2.
NiFi: 1.12.1
InfluxDB processor: nifi-influx-database-nar-1.8.0.nar

The result is that the time precision I get is not in microseconds but in hours.
[screenshot]

Wireshark trace:
[screenshot]

Data explorer:
[screenshot]
Note the measurement 2.39 has date precision only to the hour.

Now with curl...

curl -X POST "http://[HOST]/api/v2/write?org=[ORG]&bucket=[BUCKET]&precision=us" \
 -H "Authorization: Token " \
 -H "Content-Type: text/plain; charset=utf-8" \
 -H "User-Agent: influxdb-client-java/1.11.0" \
 -H "Connection: Keep-Alive" \
 -d "VesselHeading,channel=0,hostname=xxxxxx,pgn=127250 Heading=2.40 1619815206123456"

(different measurement name without space)
See the value 2.4 below:
[screenshot]

In both cases the date precision is to the hour

Please help - what am I doing wrong???

Java exception in PutInfluxDatabaseRecord_2 for UTC timestamp

Hi there,

we are currently setting up NiFi with InfluxDB 2.7 and are trying to put a JSON-based record into the database using PutInfluxDatabaseRecord_2.

Processor Version: 1.25.0-SNAPSHOT

The data payload of the JSON can be stored if we do not supply our own timestamp for the data. Whenever we reference the timestamp of the data reading in the processor, we keep getting Java exceptions and the put command fails. The error below is thrown when attempting to store with a UTC timestamp in ns. Changing to a different time precision and a shorter UTC timestamp has no effect.

2023-09-29 13:05:20,289 ERROR [Timer-Driven Process Thread-4] o.i.n.p.PutInfluxDatabaseRecord_2 PutInfluxDatabaseRecord_2[id=cbe3b51c-018a-1000-6d5c-3ea160436312] influxdb.error.message
java.lang.NumberFormatException: For input string: "2023-08-16T08:40:00.000000000Z"

We have also tried to supply the timestamp in unix epoch time. In that scenario there is no exception, but also no data transfer to the database.

This is the sample data JSON Output Claim

[{"deviceid":"00ALQ3VC","datapoint":"Spannung L1","timestamp":"2023-08-16T08:40:00.000000000Z","unit":"V","value":233.0}

Is this an error on our part, or is this a bug in the processor?

EDIT: I re-read the last closed issue regarding timestamps, which seems to be the same problem we have, but I am unable to map the solution to our setup. The "timestamp" information is defined as type string in the JsonRecordSetWriter.

[screenshot]

The timestamp itself is put together from separate date and time values and converted to UTC NS.
[screenshot]

Thanks a lot for your support. Any help would be much appreciated!

PutInfluxDatabase timestamp precision

Currently it seems that PutInfluxDatabase only supports nanoseconds, but the InfluxDB API provides the possibility to define precision=ms in the URL; this doesn't seem to work in the NiFi processor.

Documentation

The timestamp (last field) should be in nanosecond resolution.

GetInfluxDatabaseRecord_2 Processor Fails to Copy Attributes on Splitting Result Sets

NiFi Version

1.23.2 - Processor version 1.25.0

Processor Name

GetInfluxDatabaseRecord_2

Description

The GetInfluxDatabaseRecord_2 processor in Apache NiFi exhibits inconsistent behavior in handling flow file attributes when splitting result sets. When the processor splits a large result set into chunks (e.g., of 1000 elements), it fails to copy the original flow file's attributes to the new flow files generated post-split. However, if there is no split (i.e., the result set is small enough to not require splitting), the processor retains the attributes as expected.

Expected Behavior

When the GetInfluxDatabaseRecord_2 processor splits a flow file, each new flow file resulting from the split should retain the attributes of the original flow file.

Actual Behavior

The processor only retains the attributes if no splitting occurs. In cases where the result set is large enough to be split into multiple flow files, the newly created flow files lack the original attributes.

Steps to Reproduce

  1. Configure a GetInfluxDatabaseRecord_2 processor to query an InfluxDB instance where the expected result set is large enough to require splitting (e.g., more than 1000 records).
  2. Ensure the processor is set to split the result set into chunks of 1000 elements.
  3. Observe the attributes of the original flow file and compare them with the attributes of the flow files generated post-split.

Example Scenario

  • A flow file with custom attributes (e.g., source: sensor_data, type: temperature) is passed to the GetInfluxDatabaseRecord_2 processor.
  • The processor is set to retrieve a large data set from InfluxDB, resulting in the data being split into multiple flow files.
  • The original flow file's attributes are not present in the new flow files created after the split.

Impact

This behavior impedes workflows where attribute retention across splits is crucial for subsequent processing steps.

Additional Context

  • A potential workaround involves using a combination of attribute caching strategies, but this adds complexity and overhead to the flow.

Errors in InfluxDB logs when using HTTPS connection from NiFi

I'm using the PutInfluxDatabaseRecord processor in a NiFi flow to pull metrics from a Kafka topic and then put them into InfluxDB.

The InfluxDB instance is configured with both SSL and LDAP, and in the InfluxDB Controller Service I have used:

SSL Context Service : No Value set
InfluxDB connection URL : https://myserver:8086
InfluxDB Max Connection Time Out : 0 seconds
Username : myuser
Password : mypassword

The data is flowing into InfluxDB, but it's incredibly slow, and if I look in the InfluxDB logs, I see it getting hammered with these messages:
ts=2021-03-25T14:54:48.075630Z lvl=info msg="{\"error\":\"unable to parse authentication credentials\"}" log_id=0T6H~k~1000 service=subscriber

If I disable the NiFi flow, the messages stop immediately.

So why does Influx complain about the data if it's getting inserted to the database, and are these messages slowing down the ingestion rate?

By comparison, I wrote a Python script to do the same thing (same user + endpoint) and that is way faster and doesn't generate the Info messages above

The all fields of FlowFile has null value

Hi!
I'm trying to insert data from a SCADA system to an Influx DB. But the only reaction I get is the message

2022-07-27 14:45:55,865 INFO [Timer-Driven Process Thread-5] o.i.n.p.PutInfluxDatabaseRecord_2 PutInfluxDatabaseRecord_2[id=3f45277b-0182-1000-60a1-dcd6d238c16a] The all fields of FlowFile=3f42a181-2cac-4110-a4f0-b0a288b67a32 has null value. There is nothing to store to InfluxDB

I'm using NiFi 1.12.1 on Windows Server 2019 and installed version 1.8.0. InfluxDB is on another Windows Server 2019 machine, in version 2.3.0.

From the SCADA system I get a JSON flow file:

{ "ext_id": "WuT_OPC_Connector.FEU-BS01.E.0", "tag_value_int": 258.2, "tag_tsp": "20220727124121.0000388", "status": "GOOD", "prevValue": 248.06 }

I'm using the PutInfluxDatabaseRecord_2 processor with a JSONPathReader as Record Reader and StandardInfluxDatabaseService_2 as InfluxDB Controller Service.

The StandardInfluxDatabaseService_2 configuration can be seen here:

[screenshot]

The InfluxDB connection URL is http://SERVERNAME:8086 and the InfluxDB Access Token is correct.

The JSONPathReader configuration can be seen here:

[screenshot]

The PutInfluxDatabaseRecord_2 configuration can be seen here:

[screenshot]

and here:

[screenshot]

Does not work with NiFi 1.9.2

When I try to use the processor in the latest version of NiFi (1.9.2), it states that it is not compatible. This does work with 1.9.0.

Uploading to central maven repository

Hey team

I found that the binaries are not yet in the central Maven repo (or they're already there but I failed to find them?), which makes updating them inconvenient. Would you consider putting them into the Maven repo in the future?

BR

How can I match up the timestamp in NiFi

My timestamp format is '20190705073248967', i.e. yyyyMMddHHmmsssss.

I have tried to convert this format to InfluxDB time, but I could not. Could you help resolve this issue?

My conversion code is below.
Timestamp Field : ${REQ_TIME:toDate("yyyyMMddHHmmsssss"):toNumber()}

Not able to use JsonPathReader

Hi there - I'm running into issues using the JsonPathReader controller service with the PutInfluxDatabaseRecord_2 processor. Can you help me identify if I'm doing something wrong or if this is a bug?

FlowFile content:
{"m":"m_val", "f1":"f1_val", "t1":"t1_val"}

Flow:
[screenshot]

PutInfluxDatabaseRecord_2 settings:
[screenshot]

JsonPathReader settings:
[screenshot]

Error:

nifi          | java.lang.IllegalStateException: Cannot write FlowFile to InfluxDB because the required field 'f' is not present in Record.
nifi          | 	at org.influxdata.nifi.processors.RecordToPointMapper.findRecordField(RecordToPointMapper.java:229)
nifi          | 	at org.influxdata.nifi.processors.RecordToPointMapper.lambda$mapFields$0(RecordToPointMapper.java:133)
nifi          | 	at java.util.ArrayList.forEach(ArrayList.java:1259)
nifi          | 	at org.influxdata.nifi.processors.RecordToPointMapper.mapFields(RecordToPointMapper.java:131)
nifi          | 	at org.influxdata.nifi.processors.RecordToPointMapper.mapRecord(RecordToPointMapper.java:108)
nifi          | 	at org.influxdata.nifi.processors.RecordToPointMapper.mapRecordV2(RecordToPointMapper.java:102)
nifi          | 	at org.influxdata.nifi.processors.internal.FlowFileToPointMapperV2.mapRecord(FlowFileToPointMapperV2.java:82)
nifi          | 	at org.influxdata.nifi.processors.internal.AbstractFlowFileToPointMapper.mapInputStream(AbstractFlowFileToPointMapper.java:128)
nifi          | 	at org.influxdata.nifi.processors.internal.AbstractFlowFileToPointMapper.mapFlowFile(AbstractFlowFileToPointMapper.java:85)
nifi          | 	at org.influxdata.nifi.processors.internal.FlowFileToPointMapperV2.addFlowFile(FlowFileToPointMapperV2.java:75)
nifi          | 	at org.influxdata.nifi.processors.PutInfluxDatabaseRecord_2.onTrigger(PutInfluxDatabaseRecord_2.java:169)
nifi          | 	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi          | 	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1202)
nifi          | 	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi          | 	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
nifi          | 	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi          | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi          | 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi          | 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi          | 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi          | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi          | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi          | 	at java.lang.Thread.run(Thread.java:748)

The JsonTreeReader controller service works fine, but I want to use JsonPathReader so I can query properties in a complex JSON object.

And according to this comment I believe this should be possible.

This feels like a pretty trivial example so I'm hoping someone can point out what I'm doing wrong - thanks!

NiFi Version: 1.14.0

Question about using PutInfluxDatabaseRecord

Hi Guys,
I am doing some NiFi extraction and am trying to work out how to do something with a nested JSON array.

I have a json array like this
{"device":"JPE19151536","interface":"Port-Channel3.2512","key":"Port-Channel3.2512","rates":{"inBitsRate":{"value":4.687787409031554},"inPktsRate":{"value":0.007918559808362655},"outBitsRate":{"value":1.1345431741251526E-119},"outPktsRate":{"value":3.972490105480225E-123},"statsUpdateTime":1.602638448008944E9},"statistics":{"inOctets":4297765,"outOctets":336472,"inErrors":0,"outErrors":0,"inDiscards":0,"outDiscards":0,"inTotalPkts":0,"inUcastPkts":51714,"outUcastPkts":1085,"inBroadcastPkts":0,"inMulticastPkts":0,"outBroadcastPkts":0,"outMulticastPkts":0,"lastUpdate":1.602638448008944E9}}

I then converted the data to an Avro record with a schema, ready to write to InfluxDB. After the schema is applied, the data looks like this in the flow file with the formatted option applied (excuse the different record data in this example from the first step):

[ {
  "device" : "JPE19151536",
  "interface" : "Ethernet12.1202",
  "key" : "Ethernet12.1202",
  "rates" : {
    "inBitsRate" : {
      "value" : 0
    },
    "inPktsRate" : {
      "value" : 0
    },
    "outBitsRate" : {
      "value" : 0
    },
    "outPktsRate" : {
      "value" : 0
    },
    "statsUpdateTime" : 1.60263846E9
  },
  "statistics" : {
    "inOctets" : 0,
    "outOctets" : 0,
    "inErrors" : 0,
    "outErrors" : 0,
    "inDiscards" : 0,
    "outDiscards" : 0,
    "inTotalPkts" : 0,
    "inUcastPkts" : 0,
    "outUcastPkts" : 0,
    "inBroadcastPkts" : 0,
    "inMulticastPkts" : 0,
    "outBroadcastPkts" : 0,
    "outMulticastPkts" : 0,
    "lastUpdate" : 1.60263846E9
  }
} ]

I then want to write statistics.inOctets and many others as fields in the PutInfluxDatabaseRecord processor.

If I add device,interface,key as tags in the Tags property, the tags get written to InfluxDB correctly.

But when it comes to fields, if I try to use anything from statistics or rates I get 'item not present in record'. I'm not sure if I am referencing the fields incorrectly or if it's an issue referencing a sub-object of the data. I have tried statistics.Octets and $.statistics.Octets.

Avro Schema looks like this

{
  "name": "AristaPerfInterface",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "device",
      "type": "string"
    },
    {
      "name": "interface",
      "type": "string"
    },
    {
      "name": "key",
      "type": "string"
    },
    {
      "name": "rates",
      "type": {
        "name": "rates",
        "type": "record",
        "fields": [
          {
            "name": "inBitsRate",
            "type": {
              "name": "inBitsRate",
              "type": "record",
              "fields": [
                {
                  "name": "value",
                  "type": "int"
                }
              ]
            }
          },
          {
            "name": "inPktsRate",
            "type": {
              "name": "inPktsRate",
              "type": "record",
              "fields": [
                {
                  "name": "value",
                  "type": "int"
                }
              ]
            }
          },
          {
            "name": "outBitsRate",
            "type": {
              "name": "outBitsRate",
              "type": "record",
              "fields": [
                {
                  "name": "value",
                  "type": "int"
                }
              ]
            }
          },
          {
            "name": "outPktsRate",
            "type": {
              "name": "outPktsRate",
              "type": "record",
              "fields": [
                {
                  "name": "value",
                  "type": "int"
                }
              ]
            }
          },
          {
            "name": "statsUpdateTime",
            "type": "float"
          }
        ]
      }
    },
    {
      "name": "statistics",
      "type": {
        "name": "statistics",
        "type": "record",
        "fields": [
          {
            "name": "inOctets",
            "type": "int"
          },
          {
            "name": "outOctets",
            "type": "int"
          },
          {
            "name": "inErrors",
            "type": "int"
          },
          {
            "name": "outErrors",
            "type": "int"
          },
          {
            "name": "inDiscards",
            "type": "int"
          },
          {
            "name": "outDiscards",
            "type": "int"
          },
          {
            "name": "inTotalPkts",
            "type": "int"
          },
          {
            "name": "inUcastPkts",
            "type": "int"
          },
          {
            "name": "outUcastPkts",
            "type": "int"
          },
          {
            "name": "inBroadcastPkts",
            "type": "int"
          },
          {
            "name": "inMulticastPkts",
            "type": "int"
          },
          {
            "name": "outBroadcastPkts",
            "type": "int"
          },
          {
            "name": "outMulticastPkts",
            "type": "int"
          },
          {
            "name": "lastUpdate",
            "type": "float"
          }
        ]
      }
    }
  ]
}

Would appreciate some advice thanks
Phil

Is it possible to use FlowFile attributes as InfluxDB tags, fields?

When creating the InfluxDB line protocol record, I would like to use an attribute of the FlowFile.
However, if you use the attribute of the FlowFile when making line protocol, the variable cannot be found.

Is it possible to use a FlowFile attribute as a variable in the line protocol?

Converting time

Hi,

it looks like I have a problem with time conversion.
Input JSON (only the date part):
"ActiveFromUtc":"2021-05-28T17:45:45.950Z"}
As I see in the Influx docs, precision ms means that it will change the last digits to 000 rather than trimming them, so the timestamps should be sent at ns length. I set it to NS in the PutInfluxDatabase_2 processor, but in the log it's always ms. I think it comes from the JsonTreeReader, as I configured it like:
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
and also tried:
yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'
No luck.
Could you please help on this?

PutInfluxDatabaseRecord_2[id=76cf35a8-ae9d-1396-b2c1-3b5bcf0e026b] InfluxDB connection created for host https://influxdb-kafka.com:/

6:46:08.670 PM | INFO | OkHttpClient | --> POST https://influxdb-kafka.com/api/v2/write?org=asw&bucket=kafka2&precision=ms
6:46:08.670 PM | INFO | OkHttpClient | Content-Type: text/plain; charset=utf-8
6:46:08.670 PM | INFO | OkHttpClient | Content-Length: 76
6:46:08.670 PM | INFO | OkHttpClient | Content-Encoding: identity
6:46:08.670 PM | INFO | OkHttpClient | Accept: application/json
6:46:08.670 PM | INFO | OkHttpClient | User-Agent: influxdb-client-java/1.11.0
6:46:08.670 PM | INFO | OkHttpClient |  
6:46:08.670 PM | INFO | OkHttpClient | Test_sis,Type.ack=test_key_13 CustomId=7i 1622223945950
6:46:08.670 PM | INFO | OkHttpClient | --> END POST (76-byte body)
6:46:08.845 PM | INFO | OkHttpClient | <-- 204 No Content https://influxdb-kafka.com/api/v2/write?org=asw&bucket=kafka2&precision=ms (175ms)
6:46:08.846 PM | INFO | OkHttpClient | Date: Fri, 28 May 2021 16:46:08 GMT
6:46:08.846 PM | INFO | OkHttpClient | Connection: keep-alive
6:46:08.846 PM | INFO | OkHttpClient | <-- END HTTP

Create end-to-end test

The test will confirm that the processors can be used successfully in Apache NiFi and also work correctly.

Improve CONTRIBUTING.md

Add sections to CONTRIBUTING.md:

  • Getting the source
  • Cloning a fork
  • Build and Test

Add GitHub template for Pull Requests:

Closes #

_Briefly describe your proposed changes:_

  - [ ] CHANGELOG.md updated
  - [ ] Rebased/mergeable
  - [ ] Tests pass
  - [ ] Sign [CLA](https://influxdata.com/community/cla/) (if not already signed)

timestamp field issues - PutInfluxDatabaseRecord_2

Processor Version: 1.25.0-SNAPSHOT

I seem to have an issue when using NiFi and InfluxDB.

I am using the UpdateAttribute processor to convert epoch time to a format which will be readable by the InfluxDB processor:

time: 2023-09-12 15:55:01.242Z (previously: No value set)
timestamp: 2023-09-12 15:55:01.242Z (previously: 1694534101242)

Both attributes have the same value, produced by the property:

${timestamp:formatInstant("yyyy-MM-dd HH:mm:ss.SSS'Z'", "GMT")}

The only difference between time and timestamp is that I have a field called timestamp within my JSON data:

{"timestamp":1694534104820,...}

It's very strange: if the time field name follows the attribute "time", it sends data to the DB, and if I use the attribute "timestamp", it does not.

For further information: if I use the attribute "timestamp" and it's old buffered data, it drops the data and gives the following error:

15:49:31 UTC  ERROR 
PutInfluxDatabaseRecord_2[id=018a1001-f95d-1a8c-7c43-630db8f73944] Failed to insert into influxDB due to HTTP status code: 422; Message: failure writing points to database: partial write: points beyond retention policy dropped=1: com.influxdb.exceptions.UnprocessableEntityException: HTTP status code: 422; Message: failure writing points to database: partial write: points beyond retention policy dropped=1

which again makes no sense, as the bucket's retention is set to forever:

[screenshot]

Then, to make things even weirder, if the data is no longer buffered and is a live stream, I get the following issue where it both sends and drops the data, but ultimately the InfluxDB processor is dropping the data:

[screenshot]

My goal is to ensure that the JSON data is pushed to InfluxDB using the timestamp of the data itself and not the time at which InfluxDB receives the data.

As you can see, the timestamp at which InfluxDB processes the data can differ from the data's own timestamp when data has to be buffered due to the high volume of data being processed by NiFi:

[screenshot]

But why would it send and then drop the data when using the "timestamp" attribute as the time field vs using the "time" attribute as the time field, when they are one and the same value? As mentioned before, the only difference is that I have a timestamp field in my JSON data.

Could it be that the timestamp field is still in epoch format within the JSON data?

And if so, do you have any ideas how I can update that field? I was thinking that was what the UpdateAttribute processor was meant to do.

any help would be appreciated :)

Question Regarding PutInfluxDatabaseRecord_2 1.10.0

Hi Guys,
I have been using PutInfluxDatabaseRecord version 1.10 for quite some time, using the batch feature.

I notice this now appears to be gone in version 2.x. Just wondering what is the best way to batch up a lot of data from NiFi.

I have an Avro schema. I tried using MergeRecord and merged 10 records together in an array, then sent the merged flow file array to PutInfluxDatabaseRecord. When I look in the NiFi logs, it looks like it still only writes one record at a time.

2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient --> POST http://testserver:8086/api/v2/write?org=mycomp&bucket=nifibkt&precision=ns
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient Content-Type: text/plain; charset=utf-8
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient Content-Length: 1104
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient Content-Encoding: identity
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient Accept: application/json
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient User-Agent: influxdb-client-java/1.11.0
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient
2021-05-10 11:40:36,195 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient net-linkerrors,device=JPE19350550,hostname=xxxx-fab01,location=xxxx,metric_type=linkerrors,port=Ethernet22/4,state=SA alignmentErrors_avg=0.0,alignmentErrors_max=0.0,alignmentErrors_min=0.0,alignmentErrors_stddev=0.0,alignmentErrors_weight=1.0,fcsErrors_avg=0.0,fcsErrors_max=0.0,fcsErrors_min=0.0,fcsErrors_stddev=0.0,fcsErrors_weight=1.0,frameTooLongs_avg=0.0,frameTooLongs_max=0.0,frameTooLongs_min=0.0,frameTooLongs_stddev=0.0,frameTooLongs_weight=1.0,frameTooShorts_avg=0.0,frameTooShorts_max=0.0,frameTooShorts_min=0.0,frameTooShorts_stddev=0.0,frameTooShorts_weight=1.0,inDiscards_avg=0.0,inDiscards_max=0.0,inDiscards_min=0.0,inDiscards_stddev=0.0,inDiscards_weight=1.0,inErrors_avg=0.0,inErrors_max=0.0,inErrors_min=0.0,inErrors_stddev=0.0,inErrors_weight=1.0,outDiscards_avg=0.0,outDiscards_max=0.0,outDiscards_min=0.0,outDiscards_stddev=0.0,outDiscards_weight=1.0,outErrors_avg=0.0,outErrors_max=0.0,outErrors_min=0.0,outErrors_stddev=0.0,outErrors_weight=1.0,symbolErrors_avg=0.0,symbolErrors_max=0.0,symbolErrors_min=0.0,symbolErrors_stddev=0.0,symbolErrors_weight=1.0 1619158166290763005
2021-05-10 11:40:36,196 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient --> END POST (1104-byte body)

I also sometimes see this in the NiFi logs; I am not sure why this is happening, as all flow files have content ('No Content' packets):

2021-05-10 11:40:36,181 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient <-- 204 No Content http://testserver:8086/api/v2/write?org=vocus&bucket=nifibkt&precision=ns (2ms)
2021-05-10 11:40:36,181 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient Date: Mon, 10 May 2021 01:40:36 GMT
2021-05-10 11:40:36,181 INFO [Timer-Driven Process Thread-6] okhttp3.OkHttpClient <-- END HTTP

Line Protocol Writer

Currently we have a LineProtocolReader Service, but it would be helpful to also have a LineProtocolWriter Service.

There are many cases where I may wish to write Line Protocol directly out of my NiFi flow, say to Kafka or another downstream listener, without having to convert back and forth to Avro, CSV or JSON.

NiFi does not start in Docker

Hi, I am using NiFi 1.12.0 running in Docker. I've tried different versions of the NAR (1.9.0-SNAPSHOT, 1.8.0, 1.0), but they all lead to an exception when starting NiFi that stops NiFi.

2020-11-02 12:55:38,373 ERROR [main] org.apache.nifi.NiFi Failure to launch NiFi due to java.util.ServiceConfigurationError: org.apache.nifi.controller.ControllerService: Provider org.influxdata.nifi.services.StandardInfluxDatabaseService could not be instantiated
java.util.ServiceConfigurationError: org.apache.nifi.controller.ControllerService: Provider org.influxdata.nifi.services.StandardInfluxDatabaseService could not be instantiated
        at java.util.ServiceLoader.fail(ServiceLoader.java:232)
        at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at org.apache.nifi.nar.StandardExtensionDiscoveringManager.loadExtensions(StandardExtensionDiscoveringManager.java:156)
        at org.apache.nifi.nar.StandardExtensionDiscoveringManager.discoverExtensions(StandardExtensionDiscoveringManager.java:131)
        at org.apache.nifi.nar.StandardExtensionDiscoveringManager.discoverExtensions(StandardExtensionDiscoveringManager.java:117)
        at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:1048)
        at org.apache.nifi.NiFi.<init>(NiFi.java:158)
        at org.apache.nifi.NiFi.<init>(NiFi.java:72)
        at org.apache.nifi.NiFi.main(NiFi.java:301)
Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/ssl/SSLContextService$ClientAuth
        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
        at java.lang.Class.getConstructor0(Class.java:3075)
        at java.lang.Class.newInstance(Class.java:412)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        ... 9 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.ssl.SSLContextService$ClientAuth
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 14 common frames omitted
2020-11-02 12:55:38,374 INFO [Thread-1] org.apache.nifi.NiFi Initiating shutdown of Jetty web server...
2020-11-02 12:55:38,374 INFO [Thread-1] org.apache.nifi.NiFi Jetty web server shutdown completed (nicely or otherwise). 

I use the docker cp command to copy the corresponding NAR to the /opt/nifi/nifi-current/lib folder.

PutInfluxDatabaseRecord_2 unable to write records with certain timestamps

Hi all,

I have a PutInfluxDatabaseRecord_2 processor configured with a JSONTreeReader. The JSON records I'm trying to write are very simple e.g.

{"ts":"1667496077593", "temp":12.23, "uid":"1234"}
{"ts":"1667496295251", "temp":11.12, "uid":"1234"}

The examples above write to my InfluxDB just fine. However, if I try to write records that contain timestamps with several zeros at the end, the data never appears in the database, even though the flowfile transitions to the success path. For example, the following records will not write to the database.

{"ts":"1667498400000", "temp":14.3, "uid":"1234"}
{"ts":"1667509200000", "temp":14.3, "uid":"1234"}
{"ts":"1667520000000", "temp":14.3, "uid":"1234"}

This is my config:
[screenshot]

This is my avro schema for the JSONTreeReader:

{
  "name": "ee",
  "type": "record",
  "namespace": "com.phyre.enviro.e.avro",
  "fields": [
	{
      "name": "uid",
      "type": "string"
    },
    {
      "name": "temp",
      "type": "float"
    },
    {
      "name": "ts",
      "type": "string"
    }  
  ]
}

I suspect this might have something to do with how Java handles scientific notation conversion. I've tried the ts as a long and a string but have the same problem either way.

I've also tried with a nanosecond timestamp and I still get the same issue.

I'm using version 1.17.0, as I'm unable to update our NiFi service at the moment, but looking at the latest versions of these processors I don't see any bug fix for this, so I assume it applies to later versions.

Thanks,
Mike
