derrickoswald / CIMSpark
Spark access to Common Information Model (CIM) files
License: MIT License
Fix regular expressions to handle self closing tags, for example
<cim:IdentifiedObject.description/>
XSLT operations using javax.xml.transform.TransformerFactory output self-closing tags where the contents of an element are empty. This form should be handled by the regular expressions used to parse the CIM RDF files.
To test, use a Java program like the attached Transform.java with identity.xslt to transform an RDF file containing an empty element, then verify that it is correctly read by the CIMReader.
SelfClosingTagsTest.zip
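A minimal plain-Scala sketch of the required behaviour (the helper and regex here are illustrative, not the actual CIMReader parsing code): a single pattern accepts both the normal and the self-closing form, yielding an empty string for the latter.

```scala
import scala.util.matching.Regex

object SelfClosingSketch
{
    // hypothetical helper: return the element contents,
    // treating <cim:element/> as an empty string
    def parse (element: String, xml: String): Option[String] =
    {
        val quoted = Regex.quote (element)
        val pattern = ("<cim:" + quoted + ">([\\s\\S]*?)</cim:" + quoted + ">|<cim:" + quoted + "/>").r
        // group (1) is null for the self-closing alternative, hence the getOrElse
        pattern.findFirstMatchIn (xml).map (m => Option (m.group (1)).getOrElse (""))
    }
}
```

Both `<cim:IdentifiedObject.description>text</cim:IdentifiedObject.description>` and `<cim:IdentifiedObject.description/>` then parse successfully, the latter to an empty string.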
The ability to read CIM files into two differently named RDD sets, i.e. RDD[<CIM class>], introduced by "Allow for named RDD variations", makes it possible to generate a difference file from two reference CIM files.
This issue tracks the creation of a new standalone executable called CIMDifference, that will perform this task.
Essentially:
A possibility exists that partitioning according to TopologicalIsland may be beneficial for many use-cases.
Most electric distribution consists of radial networks, where the network downstream of a supply transformer is an isolated network that can be modeled independently, albeit with a single attachment to the grid at the transformer. The network topology processor identifies these "islands of connectivity" and generates one TopologicalIsland object, which all contained Terminal and ConnectivityNode objects reference.
In some cases ganged/parallel transformers are used to supply higher power requirements, so the phrase "a supply transformer" in the above description should instead read "a group of supply transformers that share a common bus".
The term Trafokreis (English: transformer circuit) is used to refer to these islands serviced by a transformer group.
Not all islands correspond to a Trafokreis. Notable exceptions are:
For some analysis use-cases, these topological islands can be processed in parallel independent of each other. It may be possible to partition the CIM RDD classes, based on their topological island, so as to bring all relevant objects to the same machine for processing a priori.
So, for example, the CIM classes related to the island for transformer(s) X would be assigned to the Spark worker Y. The number of topological islands far exceeds the number of workers, so each worker would have several islands.
The RDD.coalesce() method takes an optional PartitionCoalescer, which generates an array of PartitionGroups, each of which has an array of Partitions and their preferred location (machine name). But there doesn't seem to be a way to use anything except a HashPartitioner or RangePartitioner with the default RDD, so this will probably involve creating a subclass of RDD with the desired partitioning infrastructure.
The RDD.groupBy method takes an optional Partitioner that would allow the creation of an individual RDD for each partition, but it's unclear how this would work.
In any case, partitioning like this would require a mapping table between CIM rdf:ID and partition (Trafokreis) for all elements. This would include items not in the topology, such as assets, locations, etc. It also probably requires a partition0 to contain elements shared between islands such as voltages and power system resource types, etc.
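The mapping-table idea can be sketched as a plain partition function (the table, the hashing of island names, and the reservation of partition 0 are assumptions drawn from the description above, not existing code):

```scala
object IslandPartitionSketch
{
    // hypothetical: islandOf is the rdf:ID → Trafokreis mapping table;
    // partition 0 is reserved for shared elements (voltages, types, etc.)
    def partitionFor (id: String, islandOf: Map[String, String], numPartitions: Int): Int =
        islandOf.get (id) match
        {
            // all elements of one island hash to the same partition (1 .. numPartitions-1)
            case Some (island) => 1 + math.abs (island.hashCode % (numPartitions - 1))
            // elements not in any island (assets, locations of shared gear, etc.) go to partition 0
            case None => 0
        }
}
```

A custom Partitioner subclass would wrap exactly this function, with the table broadcast to all workers.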
There is currently a long list of 'post-processing' operations (about, normalize, deduplicate, join, topology, edges) performed by the CIMReader after reading in a set of CIM files.
These are currently hard-coded via options to support the "using ch.ninecode.cim" argument for SQL import of CIM files in Python and R (i.e. using the non-compiled API). It would be better if these operations were broken out into separate modules/packages and a generic mechanism to chain the operations were implemented.
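A generic chaining mechanism could be as simple as folding the configured steps into one function. This is a sketch only; the Step type is illustrative, with a toy List standing in for the RDD processing:

```scala
object ProcessingChainSketch
{
    // each post-processing operation (about, normalize, deduplicate, join,
    // topology, edges) would be a function from dataset to dataset
    type Step[T] = T => T

    // fold the configured steps into one operation, applied left to right
    def chain[T] (steps: Seq[Step[T]]): Step[T] =
    {
        val init: Step[T] = (x: T) => x
        steps.foldLeft (init) ((acc, step) => acc.andThen (step))
    }
}
```

Each operation then lives in its own module, and the option parsing reduces to building the Seq of steps.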
This has implications such as:
This issue tracks code changes to adhere to Scala best practices regarding the Unicode operators ⇒, ← and →.
With the availability of Spark 3.0, there is a porting effort required to produce a version that runs with Spark 3, e.g. 2.12-3.0.0-5.0.0.
Of note:
Spark context Web UI available at http://192.168.10.221:4041
Spark context available as 'sc' (master = local[*], app id = local-1573335188812).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0-preview
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.
This issue tracks the tasks around documentation in general and ScalaDoc in particular.
The CIM RDD are currently added to the default SparkSQL database:
scala> spark.sql ("show databases").show
+------------+
|databaseName|
+------------+
| default|
+------------+
It would be good if the database could be specified.
Many companies restrict and enhance the CIM classes that can be used in an application, using what are called profiles. For example, ENTSO-E uses the Common Grid Model Exchange Specification (CGMES), which is a superset/subset of the CIM model.
This issue tracks the work necessary to support profiles.
Of interest:
The CIM specification allows for changes via the CIM Difference Model.
The CIMReader should:
The CIMReader is now a module (in the Maven sense) of CIMSpark. But the Maven Central repo only has CIMReader and not its parent CIMSpark.
This leads to build failures:
Failed to read artifact descriptor for ch.ninecode.cim:CIMReader:jar:2.11-2.4.3-3.6.0: Failure to find ch.ninecode.cim:CIMSpark:pom:2.11-2.4.3-3.6.0
because the CIMSpark pom is not available.
This issue is meant to track a solution, either by uploading CIMSpark (pom only project) to Maven Central, or some other (better) mechanism.
The wart remover (http://www.wartremover.org/) and scalastyle (http://www.scalastyle.org/) show a number of places where the codebase could be improved.
Some of the wart remover rules are stupid, like not allowing default method parameters - which is one of the best features of Scala, but it is what it is.
This issue tracks the changes that are needed to bring the codebase into compliance with a subset of the wart remover and scalastyle checks.
To work on this issue, in the CIMSpark pom.xml add the wartremover compiler plugin to the net.alchim31.maven:scala-maven-plugin configuration, add the wartremover options to the Scala compiler args, and comment out the fatal warnings flag:
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${version.dependency.scala-maven-plugin}</version>
<configuration>
<scalaCompatVersion>${version.dependency.scala}</scalaCompatVersion>
<scalaVersion>${version.dependency.scalalibrary}</scalaVersion>
<archive>
<addMavenDescriptor>false</addMavenDescriptor>
</archive>
<compilerPlugins>
<compilerPlugin>
<groupId>org.wartremover</groupId>
<artifactId>wartremover_${version.dependency.scalalibrary}</artifactId>
<version>${version.dependency.wartremover}</version>
</compilerPlugin>
</compilerPlugins>
<displayCmd>true</displayCmd>
<args>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-unchecked</arg>
<arg>-Ywarn-dead-code</arg>
<arg>-Ywarn-unused</arg>
<!-- arg>-Xfatal-warnings</arg -->
<arg>-Xlint:_</arg>
<arg>-target:jvm-1.8</arg>
<!-- see https://github.com/wartremover/wartremover/blob/704113034f9d2829aa8d577ac4f059c2136c6781/core/src/main/scala/wartremover/warts/Unsafe.scala -->
<!-- arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Any</arg -->
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.AsInstanceOf</arg>
<!-- arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.DefaultArguments</arg -->
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.EitherProjectionPartial</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.IsInstanceOf</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.NonUnitStatements</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Null</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.OptionPartial</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Product</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Return</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Serializable</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.StringPlusAny</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.Throw</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.TraversableOps</arg>
<arg>-P:wartremover:only-warn-traverser:org.wartremover.warts.TryPartial</arg>
</args>
</configuration>
...
Recompile a module and then work on resolving the warnings.
In some cases, operations on RDD cause a re-read of the CIM .rdf files, because the RDD's DAG lineage reaches back to where the file was read in. You can see this by asking for the debug string of an RDD before and after a checkpoint:
val lines = sc.getPersistentRDDs.filter(_._2.name == "ACLineSegment").head._2.asInstanceOf[RDD[ACLineSegment]]
lines: org.apache.spark.rdd.RDD[ch.ninecode.model.ACLineSegment] = ACLineSegment MapPartitionsRDD[383] at collect at CIMSubsetter.scala:48
lines.toDebugString
res6: String =
(1) ACLineSegment MapPartitionsRDD[383] at collect at CIMSubsetter.scala:48 [Disk Memory Serialized 1x Replicated]
| MapPartitionsRDD[382] at collect at CIMSubsetter.scala:48 [Disk Memory Serialized 1x Replicated]
| MapPartitionsRDD[1] at values at CIMRelation.scala:110 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 4.9 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| NewHadoopRDD[0] at newAPIHadoopRDD at CIMRelation.scala:106 [Disk Memory Serialized 1x Replicated]
lines.checkpoint
lines.count
res8: Long = 823
lines.toDebugString
res9: String =
(1) ACLineSegment MapPartitionsRDD[383] at collect at CIMSubsetter.scala:48 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 1; MemorySize: 275.6 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[2197] at count at <console>:35 [Disk Memory Serialized 1x Replicated]
It may be advantageous to use the checkpoint feature of the RDD API to mark the RDD generated by the read.cim call as checkpointable - to store the parsed classes on HDFS so that a re-read becomes unnecessary.
There are caveats of course, including:
The CIM model is, by and large, well normalized. Apart from a few pathological cases, such as the relationship between Asset and PowerSystemResource, which is many-to-many (a PowerSystemResource can have many associated Assets, and an Asset can have many associated PowerSystemResources), most relationships in CIM follow the normalized model where child objects contain a reference to the (has-a) parent object.
Despite this well-defined normalization, delinquent programs such as CIMdesk needlessly generate malformed CIM files in which multiple elements are contained in the reverse relation of a simple one-to-many relationship.
From the MicroGrid base case test configuration file MicroGridTestConfiguration_BC_BE_GL_V2.xml:
Location elements reference PowerSystemResource rather than vice versa
<cim:Location rdf:ID="_4aa56f5f-d576-49e5-8aa0-95ef1ba262e3">
<cim:Location.PowerSystemResources rdf:resource="#_37e14a0f-5e34-4647-a062-8bfd9305fa9d"/>
<cim:Location.CoordinateSystem rdf:resource="#_6acd306f-43e5-4f69-93cb-53c048125cdf"/>
</cim:Location>
From the RealGrid test configuration file CGMES_v2.4.15_RealGridTestConfiguration_SV_v2.xml:
TopologicalIsland elements reference TopologicalNode rather than vice versa
<cim:TopologicalIsland rdf:ID="_TI-1">
<cim:TopologicalIsland.TopologicalNodes rdf:resource="#_1841689480_VL_TN2"/>
<cim:TopologicalIsland.TopologicalNodes rdf:resource="#_1113529077_VL_TN1"/>
...
The CIMReader has handled the many-to-many case, starting from fairly early versions, but extending this to handle badly formed CIM files will require:
Care should be taken so that performance is not severely degraded.
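Recovering the normalized direction from such files can be sketched as inverting the reverse relation. The Location case class below is a simplified stand-in for the parsed element, not the generated model class:

```scala
object ReverseRelationSketch
{
    // hypothetical stand-in for a parsed Location that (incorrectly)
    // carries the reverse relation to its PowerSystemResources
    case class Location (id: String, powerSystemResources: List[String])

    // recover the normalized direction: each PowerSystemResource rdf:ID → its Location rdf:ID,
    // so the child reference can be patched in during post-processing
    def invert (locations: Seq[Location]): Map[String, String] =
        locations.flatMap (loc => loc.powerSystemResources.map (psr => psr -> loc.id)).toMap
}
```

In Spark terms this is a flatMap followed by a join against the PowerSystemResource RDD, which is where the performance concern arises.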
In performing the network topology processing to create the TopologyNode and TopologyIsland RDD, numerous "Joining two VertexPartitions with different indexes is slow" messages are logged:
WARN impl.ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
It is unclear which operations are causing these messages or whether they can be avoided. Google searches indicate that the VertexRDD in use should be cached, but preliminary attempts to cache the generated VertexRDD had no effect. Other processing that may be at fault is the updating of the ACDCTerminal and ConnectivityNode RDD, including their superclasses.
An attempt should be made to understand the origin of these messages and ameliorate them if possible.
Testing of de-duplication with striped rdf files identifies some missing edges.
Steps to reproduce:
Result:
The numbers of edges are 1700967 vs. 1700989 (22 out of 1.7 million edges are missing from the full area conversion, but present in the striped area conversion).
Expected:
The numbers should be the same (although the actual number may differ, since even the striped conversion may be missing some edges).
Probable cause:
The boundary between InputSplits (default 64 MB) is not being handled correctly. Changing to an InputSplit size of 256 MB (ch.ninecode.cim.split_maxsize = 256000000) reduces the number of missing features to ten. One of the missing edges (KLE447955) lies very close to the InputSplit boundary between splits 88 and 89.
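Assuming the option is passed through the DataFrameReader like the other ch.ninecode.cim options, the workaround looks like this (the session and file path are illustrative):

```scala
// sketch: raise the InputSplit size to 256 MB so fewer elements straddle split boundaries
val elements = spark.read
    .format ("ch.ninecode.cim")
    .option ("ch.ninecode.cim.split_maxsize", "256000000")
    .load ("hdfs://server/data/full_area.rdf") // hypothetical path
```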
In some cases, due perhaps to the distributed nature of the system, the Elements RDD is not found as a named RDD by the GridLAB-D application:
setup : 4.212088595 seconds
root
|-- sup: element (nullable = true)
== Physical Plan ==
*Scan ch.ninecode.cim.CIMRelation@24755a46 [sup#0]
read : 802.132211919 seconds
Exception in thread "main" java.lang.NullPointerException
at ch.ninecode.gl.GridLABD.prepare(GridLABD.scala:782)
at ch.ninecode.gl.Main$.main(Main.scala:237)
at ch.ninecode.gl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Simply repeating the same code often passes this same point without error.
The offending client code looks like this:
// get all elements
val elements = get ("Elements").asInstanceOf[RDD[Element]]
// join with WireInfo to get ratedCurrent (only for ACLineSegments)
val cableMaxCurrent = getCableMaxCurrent()
val joined_elements = elements.keyBy(_.id).leftOuterJoin(cableMaxCurrent).map(e => // GridLABD.scala:782
{
val ele = e._2._1
val wire = e._2._2
val wireinfo = wire match
{
case Some(maxCurrent) => maxCurrent
case None => Double.PositiveInfinity
}
(ele.id, (ele, wireinfo))
})
So it seems that the replacement of the Elements RDD is not happening atomically in the CIMNetworkTopologyProcessor:
// swap the old Elements RDD for the new one
old_elements.name = null
new_elements.name = "Elements"
val bogus = new_elements.count + 1L // needed for some reason
new_elements.persist (storage)
session.sparkContext.getCheckpointDir match
{
case Some (dir) => new_elements.checkpoint ()
case None =>
}
Somewhere between the name being set null and the new name being assigned, another process attempts to look up the name.
Not sure how to fix this. Note the kludgey call to count() that should maybe precede the name reassignment.
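One possible direction, sketched with a plain ConcurrentHashMap standing in for the named-RDD registry (this is not the Spark API, just an illustration of the atomicity required): publish the replacement in a single atomic put, instead of unbinding the old name first.

```scala
import java.util.concurrent.ConcurrentHashMap

object AtomicSwapSketch
{
    // hypothetical registry standing in for the named-RDD lookup
    private val registry = new ConcurrentHashMap[String, AnyRef] ()

    // put () replaces the entry atomically, so a concurrent lookup sees either
    // the old or the new value, never a window where the name is unbound;
    // returns the previous value, if any
    def swap (name: String, replacement: AnyRef): Option[AnyRef] =
        Option (registry.put (name, replacement))
}
```

The real fix would need the Spark name reassignment (and any count/persist/checkpoint bookkeeping) to happen before, not after, the old name is cleared, or behind a lock shared with the lookup.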
Currently, the CIMReader parses only rdf:ID elements (the reference element with the given ID).
In order to support Steady State Hypothesis (SSH) and other modifications to the reference element, it is necessary to handle rdf:about elements. This requires:
Consideration should be given to making the remembered fields also available to rdf:ID elements so that the export function can emit only the same fields seen on import.
An export function exists in the CIMReader, but only from Scala classes. This capability should be extended to the generated JavaScript classes. This will require:
The CIMReader is currently coded for CIM 17, or rather one specific combination of CIM 17 (iec61970cim17v34_iec61968cim13v12_iec62325cim03v17a.eap) that has been labeled CIM100.
For generality, the CIMReader needs to be able to:
One of the most common operations for CIM class RDD is to generate a pairRDD for join operations with:
XXX.keyBy (_.id)
It may be advantageous to formalize this use-case by storing pre-keyed pairRDD in the persistent RDD cache pool instead of just CIM object RDD, since the id (CIM rdf:ID = mRID) is the unique identifier for each CIM object.
Unfortunately, this has pervasive downstream consequences. Each operation to "get" an RDD by name, which is used extensively in CIMScala and dependent code like CIMApplication, would need to be modified to take advantage of this - or to work around it if the keyBy (_.id) is not required.
For example:
val elements = get ("Elements").asInstanceOf[RDD[Element]].keyBy (_.id).join (...
becomes
val elements = get ("Elements").asInstanceOf[RDD[Element]].join (...
and
val terms = get ("Terminal").asInstanceOf[RDD[Terminal]].keyBy (_.ConductingEquipment).join (...
becomes
val terms = get ("Terminal").asInstanceOf[RDD[Terminal]].values.keyBy (_.ConductingEquipment).join (...
This also has effects on partitioning. I believe that the first element of the pair's hash code is used as the partition function for pairRDD, and hence caching pairRDD would trigger a shuffle as objects were coalesced into the machine that "owns" them.
Benchmarks should be performed before and after this change to determine if there is an actual speed improvement with typical use-case scenarios.
Spark recommends Kryo serialization.
This issue tracks tasks to ensure that this serialization framework is used and works correctly.
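Enabling Kryo is a configuration change along these lines (the registered class list is illustrative; spark.kryo.registrationRequired helps surface any CIM classes that were missed):

```scala
// sketch: enable Kryo serialization and register the generated CIM model classes
val configuration = new org.apache.spark.SparkConf ()
    .setAppName ("CIMReader")
    .set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set ("spark.kryo.registrationRequired", "true") // fail fast on unregistered classes
    .registerKryoClasses (Array (
        classOf[ch.ninecode.model.ACLineSegment], // illustrative; the full generated class list is needed
        classOf[ch.ninecode.model.Terminal]))
```

Registering all generated model classes (rather than relying on the fallback of writing class names) is what yields the size and speed benefit.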