Description
This document presents a new approach to routing messages and events inside the built environment.
Reason
Right now the created environment uses Fanout Exchanges to publish events and Queues to subscribe to messages.
The problem with this approach is that if, for example, a service wants to collect all events about unknown data (the data sent by any Data Gateway), that service would need to subscribe to n exchanges.
Solution
Changing to Topic Exchanges may solve the problem presented above. Each Data Gateway would simply publish to an exchange using a routing key with the pattern data.unknown.{sensor.id}. The new service would then only need to subscribe to data.unknown.*.
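To make the wildcard subscription concrete, here is a minimal sketch of topic matching. It assumes the broker follows AMQP topic semantics, where * stands for exactly one dot-separated word and # for zero or more words. The function name matchesTopic is hypothetical and not part of the current environment; a real broker performs this matching itself.

```typescript
// Returns true when `routingKey` matches the binding `pattern`.
// Assumed semantics (AMQP topic exchanges): "*" = exactly one word,
// "#" = zero or more words, words are separated by dots.
function matchesTopic(pattern: string, routingKey: string): boolean {
    const p = pattern.split(".")
    const k = routingKey.split(".")

    const match = (i: number, j: number): boolean => {
        if (i === p.length) return j === k.length
        if (p[i] === "#") {
            // "#" may consume any number of the remaining words, including none.
            for (let n = j; n <= k.length; n++) {
                if (match(i + 1, n)) return true
            }
            return false
        }
        if (j === k.length) return false
        return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1)
    }

    return match(0, 0)
}

console.log(matchesTopic("data.unknown.*", "data.unknown.sensor42")) // true
console.log(matchesTopic("data.unknown.*", "data.decoded.sensor42")) // false
console.log(matchesTopic("data.unknown.*", "data.unknown.a.b"))      // false
console.log(matchesTopic("data.#", "data.unknown.a.b"))              // true
```

The third call shows why the number of segments matters: a single * never spans more than one word.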
But other problems may arise with this approach.
Topic exchanges use routing keys to decide where to send the data.
A solution is to encode the sensor data model in these routing keys so that each container subscribes only to the models that it can handle.
For this the following "metamodel" can be defined in routing keys as such:
<containerType>.<containerName>.data.<infoType>.<sensorId>.<channel>.<records>.<gps>.<temp_celc>
Each segment of the key can take the following values:
- containerType: any container type, e.g. Data Gateway;
- containerName: any, defined by the configuration;
- infoType: encoded, decoded, processed;
- sensorId: any, defined by the configuration;
- channel: default and any defined in configuration (relevant for data ramify);
- records: withRecords, withoutRecords;
- gps: withGPSData, withoutGPSData;
- temp_celc: withTempC, withoutTempC;
More sensor data types or other options can be added as needed.
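The metamodel above can be sketched as a typed structure plus a small builder. This is only an illustration: the names SensorRoutingKey and buildRoutingKey are hypothetical, and the rule that a segment may not be empty or contain a dot is an assumption added so that segment positions stay stable.

```typescript
type InfoType = "encoded" | "decoded" | "processed"

// One field per segment of the routing key metamodel.
interface SensorRoutingKey {
    containerType: string
    containerName: string
    infoType: InfoType
    sensorId: string
    channel: string
    records: "withRecords" | "withoutRecords"
    gps: "withGPSData" | "withoutGPSData"
    tempC: "withTempC" | "withoutTempC"
}

// Joins the segments in metamodel order, with the fixed "data" word in third position.
function buildRoutingKey(k: SensorRoutingKey): string {
    const segments = [
        k.containerType, k.containerName, "data", k.infoType,
        k.sensorId, k.channel, k.records, k.gps, k.tempC,
    ]
    for (const s of segments) {
        // Assumption: a dot inside a segment would shift every following position.
        if (s.length === 0 || s.indexOf(".") !== -1) {
            throw new Error(`invalid routing key segment: "${s}"`)
        }
    }
    return segments.join(".")
}

const gatewayKey = buildRoutingKey({
    containerType: "datagateway",
    containerName: "gateway1",
    infoType: "encoded",
    sensorId: "sensor42",
    channel: "default",
    records: "withoutRecords",
    gps: "withoutGPSData",
    tempC: "withoutTempC",
})
console.log(gatewayKey)
// datagateway.gateway1.data.encoded.sensor42.default.withoutRecords.withoutGPSData.withoutTempC
```

Adding a new sensor data type would mean adding one field to the interface and one entry to the segment list.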
Design
The following table attempts to assign subscribe (inbound) and publish (outbound) routing keys to each service, where needed. Services that do not interact with sensor data are omitted.
There are 3 types of containers: start (publishes), middle (subscribes and publishes), end (subscribes).
This solution is viable only if we can keep flexibility while providing a way for each container to subscribe and publish to one single exchange, without resorting to special cases.
| Container | Subscribe | Publish | Notes |
| --- | --- | --- | --- |
| Data Gateway | --- | datagateway.{container.name}.data.{sensor.infoType}.{sensor.id}.default.withoutRecords.withoutGPSData.withoutTempC | |
| Data Processor | *.*.data.decoded.{sensor.id}.*.*.*.* | dataprocessor.{container.name}.data.processed.<keep>.<keep>.withoutRecords.withGPSData.withoutTempC | 1 |
| Data Store | *.*.data.*.*.*.*.*.* | --- | 2 |
| Device Records Slave | *.*.data.processed.*.*.withoutRecords.*.* | devicerecordsslave.{container.name}.data.<keep>.<keep>.<keep>.withRecords.<keep>.<keep> | |
| Data Decoder | *.*.data.encoded.{sensor.id}.*.*.*.* | datadecoder.{container.name}.data.decoded.<keep>.<keep>.withoutRecords.withGPSData.withoutTempC | 4 |
| Fast Data Store | *.*.data.processed.*.*.*.withGPSData.* | --- | 5 |
| Data Aggregator | *.*.data.processed.*.*.*.withGPSData.withoutTempC && *.*.data.processed.*.*.*.withoutGPSData.withTempC | dataaggregator.{container.name}.data.processed.<keep>.<keep>.<keep>.withGPSData.withTempC | 6 |
| Authenticator Slave | TODO | TODO | |
| Sensor Viewer Slave | TODO | TODO | |
| Tracking Delivery | *.*.data.processed.*.*.*.withGPSData.* | --- | |
| Farm Conditions Control | TODO | TODO | |
| Location Tracking | *.*.data.processed.*.*.withRecords.withGPSData.* | --- | |
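The <keep> placeholders in the Publish column can be read as "copy the segment at the same position from the inbound key". That reading is an assumption, as is the helper name deriveOutboundKey below. The sketch shows how a middle container could derive its outbound key under that reading.

```typescript
// Builds the outbound routing key from a publish template and the inbound key.
// Assumptions: "<keep>" copies the inbound segment at the same position, and
// "{container.name}" is replaced by the container's configured name.
function deriveOutboundKey(template: string, inboundKey: string, containerName: string): string {
    // Substitute the name first, because the placeholder itself contains a dot.
    const resolved = template.replace("{container.name}", containerName).split(".")
    const inbound = inboundKey.split(".")
    if (resolved.length !== inbound.length) {
        throw new Error("template and inbound key have a different number of segments")
    }
    return resolved.map((seg, i) => (seg === "<keep>" ? inbound[i] : seg)).join(".")
}

const outbound = deriveOutboundKey(
    "dataprocessor.{container.name}.data.processed.<keep>.<keep>.withoutRecords.withGPSData.withoutTempC",
    "datadecoder.decoder1.data.decoded.sensor42.default.withoutRecords.withGPSData.withoutTempC",
    "processor1",
)
console.log(outbound)
// dataprocessor.processor1.data.processed.sensor42.default.withoutRecords.withGPSData.withoutTempC
```

Here the sensor id and channel are carried over from the decoder's message, while the remaining segments come from the Data Processor's own template.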
Additional context
This section presents notes about specific containers.
Data Store
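A data store stores sensor data. This container can be deployed as many times as needed to aggregate data as requested.
One instance can aggregate all data by subscribing with *.*.data.*, while others subscribe to more specific subsets, like *.*.data.decoded.* and *.*.data.processed.gps.*. Note that under AMQP topic semantics * matches exactly one word, so these short forms would need either one * per remaining segment or a trailing # to match the nine-segment keys defined above.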
Data Processor
As an example, the sensor.type for this processor was a simple GPS data type, which is why, in the example, this container publishes with these routing keys.
This container could possibly subscribe to *.*.data.decoded.*.*.*.*.* if all data has the same format, e.g. if everything was decoded to the same JSON format.
Data Decoder
It would be nice to let this container, based on some criterion still to be defined, publish with the following keys: *.*.data.processed.<something>
Fast Data Store
For now this container can only process GPS data, but more is planned for the future.
Data Aggregator
This is an example. The idea is that the configuration defines which two data types it will aggregate. The only constraint is that the types it subscribes to can't be the same as the ones that it publishes.
E.g.: subscribing to *.*.data.processed.*.*.*.withGPSData.withoutTempC && *.*.data.processed.*.*.*.withGPSData.withTempC and publishing dataaggregator.thisaggregator.data.processed.<keep>.<keep>.<keep>.withGPSData.withTempC is invalid, because the second subscription has the same types as the published key.
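The constraint can be checked when the configuration is loaded. The sketch below is one possible check, not an existing part of the aggregator: the function names are hypothetical, and it assumes that * in a subscription and <keep> in a publish key can each stand for any single segment, and that the container name has already been filled in.

```typescript
// True when a key produced from `publishKey` could match `subscription`.
// Assumption: "*" (subscription side) and "<keep>" (publish side) can each
// stand for any single segment.
function overlaps(subscription: string, publishKey: string): boolean {
    const s = subscription.split(".")
    const p = publishKey.split(".")
    if (s.length !== p.length) return false
    return s.every((seg, i) => seg === "*" || p[i] === "<keep>" || seg === p[i])
}

// A configuration is valid when the published key matches none of the subscriptions.
function isValidAggregatorConfig(subscriptions: string[], publishKey: string): boolean {
    return !subscriptions.some((sub) => overlaps(sub, publishKey))
}

const publishKey = "dataaggregator.thisaggregator.data.processed.<keep>.<keep>.<keep>.withGPSData.withTempC"

// Subscriptions from the design table: valid.
console.log(isValidAggregatorConfig([
    "*.*.data.processed.*.*.*.withGPSData.withoutTempC",
    "*.*.data.processed.*.*.*.withoutGPSData.withTempC",
], publishKey)) // true

// Second subscription has the same types as the published key: invalid.
console.log(isValidAggregatorConfig([
    "*.*.data.processed.*.*.*.withGPSData.withoutTempC",
    "*.*.data.processed.*.*.*.withGPSData.withTempC",
], publishKey)) // false
```

Rejecting such a configuration early avoids an aggregator that would receive its own published messages.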
Data Ramify
The idea here is that, in the configuration, a user can define through a function which channel the information is sent to.
E.g.:
    // Chooses the channel for a sensor reading: readings with out-of-range
    // GPS coordinates go to "drop", everything else goes to "default".
    static outcome(sensor: any): Channel {
        if (sensor.data.gps.latitude > 90) {
            return Channel.of("drop")
        }
        if (sensor.data.gps.latitude < -90) {
            return Channel.of("drop")
        }
        if (sensor.data.gps.longitude > 180) {
            return Channel.of("drop")
        }
        if (sensor.data.gps.longitude < -180) {
            return Channel.of("drop")
        }
        return Channel.of("default")
    }
For now the config can only subscribe to *.*.data.processed.<something>; maybe we can change it to *.*.data.*.<something> in the future.