The eventsByTag query is very slow because of a subquery and the incorrect placement of the where clause.
The generated SQL, dumped from our logs:
select x2.x3, x2.x4, x2.x5, x2.x6, x2.x7, x2.x8, x2.x9, x2.x10, x2.x11, x2.x12, x2.x13, x2.x14, x2.x15 from (select "deleted" as x4, "meta_ser_id" as x14, "meta_ser_manifest" as x15, "sequence_number" as x6, "writer" as x7, "event_ser_manifest" as x12, "write_timestamp" as x8, "persistence_id" as x5, "adapter_manifest" as x9, "ordering" as x3, "event_payload" as x10, "event_ser_id" as x11, "meta_payload" as x13 from "event_journal" where "deleted" = false) x2, "event_tag" x16 where ((x16."tag" = ?) and ((x2.x3 > ?) and (x2.x3 <= ?))) and (x2.x3 = x16."event_id") order by x2.x3 limit ?
This subquery:
select "deleted" as x4, "meta_ser_id" as x14, "meta_ser_manifest" as x15, "sequence_number" as x6, "writer" as x7, "event_ser_manifest" as x12, "write_timestamp" as x8, "persistence_id" as x5, "adapter_manifest" as x9, "ordering" as x3, "event_payload" as x10, "event_ser_id" as x11, "meta_payload" as x13 from "event_journal" where "deleted" = false
has no where clause on the "ordering" column, so it reads the entire event_journal table every time the query runs.
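For comparison, here is a hand-written sketch of the query shape that would apply the ordering range before the join. It is not Slick output. It is the logged statement with the two ordering predicates moved into the inner select, and with the generated column aliases replaced by `*` and the assumed aliases `j` and `t` for readability.

```sql
-- Hand-written sketch, not generated by Slick.
-- Bind parameters are now in the order: offset, maxOffset, tag, limit.
select j.*
from (
  select *
  from "event_journal"
  where "deleted" = false
    and "ordering" > ?     -- offset (exclusive lower bound)
    and "ordering" <= ?    -- maxOffset (inclusive upper bound)
) j
join "event_tag" t on j."ordering" = t."event_id"
where t."tag" = ?
order by j."ordering"
limit ?
```

Whether the database actually plans the two forms differently depends on the engine and its optimizer, so comparing both with the database's query-plan output would be the way to confirm it.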
The PR that seems to have introduced the issue: akka/akka-persistence-jdbc#467
The query in question:
https://github.com/apache/incubator-pekko-persistence-jdbc/blob/a9ebfe6da617f8cd2abc5813d2a530b1e1de70bb/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala#L61
The problem is that the join happens before the filters on the event_journal table are applied. The where clause therefore ends up on the outer query, and the subquery has to read the whole event journal.
private def baseTableWithTagsQuery() = {
  baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
}

private def eventsByTag(
    tag: Rep[String],
    offset: ConstColumn[Long],
    maxOffset: ConstColumn[Long],
    max: ConstColumn[Long]) = {
  baseTableWithTagsQuery()
    .filter(_._2.tag === tag)
    .sortBy(_._1.ordering.asc)
    .filter(row => row._1.ordering > offset && row._1.ordering <= maxOffset)
    .take(max)
    .map(_._1)
}
I think the fix is as simple as reordering the query to push the filter above the join.
private def eventsByTag(
    tag: Rep[String],
    offset: ConstColumn[Long],
    maxOffset: ConstColumn[Long],
    max: ConstColumn[Long]
) = {
  baseTableQuery()
    .filter(row => row.ordering > offset && row.ordering <= maxOffset)
    .sortBy(_.ordering.asc)
    .join(TagTable)
    .on(_.ordering === _.eventId)
    .filter(_._2.tag === tag)
    .take(max)
    .map(_._1)
}