akkadotnet / akka.persistence.sqlserver Goto Github PK
View Code? Open in Web Editor NEWAkka.Persistence.SqlServer provider
License: Apache License 2.0
Akka.Persistence.SqlServer provider
License: Apache License 2.0
Can we read the connection string name property first then fallback to connection string? This makes deployment easier.
Cheers
In my system I'm using Akka.Persistence.SQL each IIS have its actor incarnation in order to avoid events mixin.
I'm using Akka 1.3.2.
To reduce the number of rows in the eventjournal table I'm subscribing to SaveSnapshotSuccess using the following code:
public class TheSend : AtLeastOnceDeliveryActor
{
// Logger
private readonly ILoggingAdapter _log = Logging.GetLogger(Context);
// Snapshot
private readonly int snapshotinterval = Int32.Parse(System.Configuration.ConfigurationManager.AppSettings["AkkaSnapshotInterval"]);
private int counter = 0;
private readonly string persistenceActorId = System.Configuration.ConfigurationManager.AppSettings["AkkaPersistenceActorId"];
// Actor system
protected readonly IActorRef _backEndActor;
// Cluster system
protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);
public TheSend(IActorRef orderProcesserActor)
{
// router for backend actorref.
this._backEndActor = orderProcesserActor;
}
/// <summary>
/// Need to subscribe to cluster changes
/// </summary>
protected override void PreStart()
{
Cluster.Subscribe(Self, new[] { typeof(ClusterEvent.MemberUp) });
}
/// <summary>
/// Re-subscribe on restart
/// </summary>
protected override void PostStop()
{
Cluster.Unsubscribe(Self);
}
public override string PersistenceId
{
get { return String.Format("{0}-{1}-{2}", Context.Parent.Path.Name, Context.Self.Path.Name, persistenceActorId); }
}
/**
* RECOVER
*/
protected override bool ReceiveRecover(object message)
{
if (message is TheSendMessage)
{
var messageData = (TheSendMessage)message;
_log.Info("recovered {0}", messageData);
Deliver(_backEndActor.Path,
id =>
{
return new ConfirmableTheSend(id);
});
}
else if (message is TheSendConfirmation)
{
ConfirmDelivery(((TheSendConfirmation)message).DeliveryId);
}
else if (message is CheckMessage)
{
var messageData = ((CheckMessage)message);
_log.Info("recovered {0}", messageData.TrackingNumber);
Deliver(_backEndActor.Path,
id =>
{
return new ConfirmableCheck(id, messageData.TrackingNumber);
});
}
else if (message is ConfirmationCheck)
{
ConfirmDelivery(((ConfirmationCheck)message).DeliveryId);
}
else if (message is SnapshotOffer)
{
var m = (SnapshotOffer)message;
var s = (AtLeastOnceDeliverySnapshot)m.Snapshot;
SetDeliverySnapshot(s);
}
else
return false;
return true;
}
/**
* RECEIVE
*/
protected override bool ReceiveCommand(object message)
{
if (message is TheSendMessage)
{
Persist(message as InvoiceMessage, m =>
{
// is time for a new snapshot?
counter = (counter + 1) % snapshotinterval;
if (counter == 0)
{
var snapshot = GetDeliverySnapshot();
SaveSnapshot(snapshot);
}
// send a confirmation.
Deliver(_backEndActor.Path,
id =>
{
return new ConfirmableTheSend(id);
});
});
}
else if (message is TheSendConfirmation)
{
_log.Info($"Confirmation: {message}");
Persist(message as TheSendConfirmation, m => ConfirmDelivery(m.DeliveryId));
var mes = (TheSendConfirmation)message;
}
else if (message is CheckMessage)
{
Persist(message as CheckMessage, m =>
{
// is time for a new snapshot?
counter = (counter + 1) % snapshotinterval;
if (counter == 0)
{
var snapshot = GetDeliverySnapshot();
SaveSnapshot(snapshot);
}
Deliver(_backEndActor.Path,
id =>
{
});
});
}
else if (message is ConfirmationCheck)
{
_log.Info($"Confirmation Check: {message}");
var messageData = (ConfirmationCheck)message;
Persist(message as ConfirmationCheck, m => ConfirmDelivery(m.DeliveryId));
}
else if (message is UnconfirmedWarning)
{
_log.Info(message.ToString());
}
else if (message is SaveSnapshotSuccess)
{
var snapshotSeqNr = ((SaveSnapshotSuccess)message).Metadata.SequenceNr;
/**
* Enable if you want to decrease the size of Journal ed Snapshot tables.
*/
DeleteMessages(snapshotSeqNr);
DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSeqNr - 1));
_log.Info("SaveSnapshotSuccess");
}
else if (message is SaveSnapshotFailure)
{
_log.Info("SaveSnapshotFailure");
}
else if (message is ClusterEvent.MemberUp)
{
_log.Info("Frontend [{0}]: Cluster is ready. Able to begin jobs.");
// pronto per ricevere messaggi dal backend.
Become(ReceiveCommand);
Stash.UnstashAll();
}
else if (message is ClusterEvent.CurrentClusterState)
{
var clusterState = ((ClusterEvent.CurrentClusterState)message);
_log.Info($"Store Cluster State: {clusterState}");
// TODO Put this on Elastic (clusterState) ed un timestamp
}
else
{
Stash.Stash();
return false;
}
return true;
}
}
the limit is set to 25000 events...the issue is that in production is never fired so I have to clean it manually...but why? The system in production is under an heavy usage, can be a rece condition issue? Can you please confim the that only confirmed messages are snapshotted and deleted? Can it be a SaveSnapshotFailure?
In our cluster we have five nodes composite of:
The cluster is joined, up and running; but when the EventJournal is a bit full (~ 1 million rows) the insert in the EventJournal became slow and often (or always on rows growing) if a message must be redeliverd because of the redeliver interval is reached the message is not sent again to the backend.
How do I have to manage this issue?
Here is my IIS config:
<hocon>
<![CDATA[
akka.loglevel = INFO
akka.log-config-on-start = off
akka.stdout-loglevel = INFO
akka.actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
deployment {
/Process {
router = round-robin-group
routees.paths = ["/user/Process"] # path of routee on each node
# nr-of-instances = 3 # max number of total routees
cluster {
enabled = on
allow-local-routees = off
use-role = Process
}
}
}
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
akka.remote {
helios.tcp {
# transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
# applied-adapters = []
# transport-protocol = tcp
port = 0
hostname = 172.16.1.8
}
log-remote-lifecyclo-events = DEBUG
}
akka.cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"
]
roles = [Send]
auto-down-unreachable-after = 10s
# how often should the node send out gossip information?
gossip-interval = 1s
# discard incoming gossip messages if not handled within this duration
gossip-time-to-live = 2s
}
# http://getakka.net/docs/persistence/at-least-once-delivery
akka.persistence.at-least-once-delivery.redeliver-interval = 300s
# akka.persistence.at-least-once-delivery.redelivery-burst-limit =
# akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts =
akka.persistence.at-least-once-delivery.max-unconfirmed-messages = 1000000
akka.persistence.journal.plugin = "akka.persistence.journal.sql-server"
akka.persistence.journal.publish-plugin-commands = on
akka.persistence.journal.sql-server {
class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher = "akka.actor.default-dispatcher"
table-name = EventJournal
schema-name = dbo
auto-initialize = on
connection-string-name = "HubAkkaPersistence"
refresh-interval = 1s
connection-timeout = 30s
timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name = Metadata
}
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.sql-server""
akka.persistence.snapshot-store.sql-server {
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
plugin-dispatcher = ""akka.actor.default-dispatcher""
connection-string-name = "HubAkkaPersistence"
schema-name = dbo
table-name = SnapshotStore
auto-initialize = on
}
]]>
</hocon>
and here is our backend config:
<hocon><![CDATA[
akka.loglevel = INFO
akka.log-config-on-start = on
akka.stdout-loglevel = INFO
akka.actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
akka.remote {
helios.tcp {
# transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
# applied-adapters = []
# transport-protocol = tcp
#
# seed-node ports 2551 and 2552
# non-seed-node port 0
port = 2551
hostname = 172.16.1.8
}
log-remote-lifecyclo-events = INFO
}
akka.cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"
]
roles = [Process]
auto-down-unreachable-after = 10s
}
]]></hocon>
The issue in present using Akka 1.3.2, but I have faced using with previous versions too.
Akka.Persistence implementations are not released automatically as part of Akka.NET updates CC @Horusiath
During the maintenance of our production SqlServer database hosting event journal (active/passive setup), we have encountered a problem, that whole application got unresponsive. Only restarting of actor systems on all nodes in cluster via pbm helped. We are using Akka.Persistence.SqlServer 1.3.7 and BatchingSqlServerJournal.
I managed to simulate problem locally by running load test and taking local journal DB offline.
Even after putting DB online, the actors are still stuck.
Problem seems to be between expectations of ReceivePersistentActor (Eventsourced parent class) and implementation of BatchingSqlServerJournal. When Persist is called in a persistent actor, a WriteMessages message is sent to journal actor and persistent actor changes its behaviour (waiting for a sequence of WriteMessageSuccess/WriteMessageRejected/WriteMessageFailurea and finally WriteMessagesSuccessful message, while stashing all other messages). The problem happens when journal actor will not reply, there is no timeout mechanism in persistent actor treating persist as failed and it blocks infinitely. This is the case of BatchingSqlServerJournal, which has some code paths not replying to journal requests (connection open error, circuit breaker opened and maybe more).
The consequences are quite severe, persistent actor may be unblocked only by external actor stop or whole actor system restart.
Observing some failures with the latest, 1.1.1
I created a small project reproducing this issue. It's completely weird since it everything works fine if I limit the result size to a very small number (up to 16 rows). After that it doesn't respond. Here's the project:
https://github.com/object/Akka.PersistenceQuery.Performance
And here's the code (in F#) to reproduce the error. I included a copy of my SQL db in the repo, but I don't think actual data should matter since it's only IDs that are requested.
let getAllPersistenceIds system =
let queries = PersistenceQuery.Get(system).ReadJournalFor("akka.persistence.query.journal.sql")
let mat = ActorMaterializer.Create(system)
let src : Source<string, NotUsed> = queries.AllPersistenceIds()
src.Take(int64 MaxResultCount).RunSum(System.Func<string,string,string> reduce, mat)
|> Async.AwaitTask
|> Async.RunSynchronously
Need to ensure project dependencies are up to to v1.2 stable release
I created a simple example of an akka.persistence program (basically a carbon copy of the example on Pluralsight) running it on a Mac and connecting to a Sql Server database hosted in a Docker container (here is the link of how I set that up). And it will save Journal Events and play them back just fine, but if I try to do the same with a snapshot, it will save the snapshot to the database, but it does not play it back. I tried the exact same code on a Windows virtual machine (pointed to a sql server database that was running locally on Windows, NOT in a Docker container) and it worked perfectly. So, I am guessing this is an issue with Mono? Just wanted to let you guys know.
When I am trying to replay events, the error message below shows up.
BeginExecuteReader requires the command to have a transaction when the connection assigned to the command is in a pending local transaction. The Transaction property of the command has not been initialized.
Below is the summary of the program flow.
ActorA > FowardCommand > ActorB > PublishEvent > ActorC > Tell > ActorD > ForwardCommand > ActorE
Actors below are actor base:
ActorA, ActorD
Actors below are persistence actor:
ActorB, ActorE
Actors below are receive actor:
ActorC
When ActorE trying to replay events, error occur.
Below are the stackflow I grab from RecoveryFailure
object.
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
at System.Threading.Tasks.Task`1.get_Result()
at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<>c__DisplayClass1.<ReplayMessagesAsync>b__0(Task`1 task)
at System.Threading.Tasks.ContinuationTaskFromResultTask`1.InnerInvoke()
at System.Threading.Tasks.Task.Execute()
Error happened at line below:
https://github.com/rambo406/Akka.NET-SQL-Server-issue-example/blob/master/Actors/Aggregates/Accountant.cs#L28
In an answer to De-Serialize akka.net persistence message using C# Horusiath said:
In the future there will be a possibility to change this format from binary into JSON data type if your database of choice will support that format.
Has the future arrived yet? We would like to store the event payload as JSON string in a SQL Server database. Converting the JSON string to a byte array is a waste of CPU cycles. Depending on the size of the payload the overhead is between 50% and 100% for serialising, and between 100% and 200% for deserialising.
using version 1.1.5 pre-release but was seen with other versions a well
we run our standartised test with 5000 incoming messages and test actor system performance under various settings (journal and snapshot enabled)
in all tests we see lot of synchronisation/high SQL time spent around journal write
mosts test would spend 120 seconds in total just journalling these messages
to be precise it is journalling 10000 message as we also journal MessageHandled for each incoming message when it was actually processed
based on the data, there seems to be synchronization/ async wait bottleneck in journal write routine where system is under no real particular stress (low CPU etc.)
see the snippet from the ANTS profiles
also note the level of wait callstack (over 30K invocations deep) consisting System.Threading.ThreadPoolWorkQueue.Dispatch()
and Method
System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() nested over and over
see the comparision over wait time over actual execution vs the inserts themselves and their timing
the dabatase server is under no stress and within our actor we do our own upserts of business data derived from incoming messages with much higher throughput
with increased number of instances of the actor, the insert into journal gets considerably slower and journal would start throwing lot of PK violation exceptions (#50)
the HOCON is here (passwords removed)
sql-server
{
# qualified type name of the SQL Server persistence journal actor
class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
# dispatcher used to drive journal actor
plugin-dispatcher = "akka.actor.default-dispatcher"
# connection string used for database access
connection-string = REMOVED"
# default SQL commands timeout
connection-timeout = 30s
# SQL server schema name to table corresponding with persistent journal
schema-name = dbo
# SQL server table corresponding with persistent journal
table-name = AKKAJournal
# should corresponding journal table be initialized automatically
auto-initialize = off
# timestamp provider used for generation of journal entries timestamps
timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
# metadata table
metadata-table-name = AKKAMetadata
}
}
snapshot-store
{
plugin = "akka.persistence.snapshot-store.sql-server"
sql-server
{
# qualified type name of the SQL Server persistence journal actor
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
# dispatcher used to drive journal actor
plugin-dispatcher = "akka.actor.default-dispatcher"
# connection string used for database access
connection-string = "REMOVED*"
# default SQL commands timeout
connection-timeout = 30s
# SQL server schema name to table corresponding with persistent journal
schema-name = dbo
# SQL server table corresponding with persistent journal
table-name = AKKASnapshot
# should corresponding journal table be initialized automatically
auto-initialize = off
}
}
}
# this config section will be referenced as akka.actor
actor {
serializers {
wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
}
serialization-bindings {
"System.Object" = wire
}
provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
deployment {
/CashFundingManagementView {
router = round-robin-pool
nr-of-instances = 100
}
}
}
# here we're configuring the Akka.Remote module
remote {
helios.tcp {
transport-class =
"Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
#applied-adapters = []
transport-protocol = tcp
port = 7095
hostname = "localhost"
}
log-remote-lifecycle-events = INFO
}
]]>
</hocon>
Migration script to v 1.3.2 syntax should be:
ALTER TABLE dbo.EventJournal ADD SerializerId INTEGER NULL
ALTER TABLE dbo.SnapshotStore ADD SerializerId INTEGER NULL
instead of ADD COLUMN SerializerId
With the following hocon
<hocon>
<![CDATA[
akka.persistence {
journal {
plugin = "akka.persistence.journal.sql-server"
sql-server {
class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher = "akka.actor.default-dispatcher"
# connection string used for database access
connection-string = "Data Source=(local)\\MSSQLLocalDB;Initial Catalog=PSAkka;Integrated Security=True"
# can alternativly specify: connection-string-name
# default SQL timeout
connection-timeout = 30s
# SQL server schema name
schema-name = dbo
# persistent journal table name
table-name = EventJournal
# initialize journal table automatically
auto-initialize = on
timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name = Metadata
}
}
}
]]>
</hocon>
I get the following error:
Is this supported?
Thanks
For some unknown reason Akka.Persistence.SqlServer package on Nuget has a dependency on Akka.TestKit. Detect how and fix it.
In our cluster we have four nodes composite of:
The cluster is joined, up and running; when i send a POST to the webapi,:
Here is my IIS config:
<akka>
<hocon>
<![CDATA[
akka.loglevel = INFO
akka.log-config-on-start = off
akka.stdout-loglevel = INFO
akka.actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
deployment {
/TheProcess {
router = round-robin-group
routees.paths = ["/user/TheProcess"] # path of routee on each node
# nr-of-instances = 3 # max number of total routees
cluster {
enabled = on
allow-local-routees = off
use-role = TheProcess
}
}
}
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
akka.remote {
helios.tcp {
# transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
# applied-adapters = []
# transport-protocol = tcp
# public-hostname = "localhost"
# 0 or 46001-46010
port = 0
hostname = "localhost"
}
log-remote-lifecyclo-events = DEBUG
}
akka.cluster {
seed-nodes = [
"akka.tcp://ActorSystem@localhost:2551",
"akka.tcp://ActorSystem@localhost:2552"
]
roles = [TheSend]
# auto-down-unreachable-after = 10s
# how often should the node send out gossip information?
# gossip-interval = 1s
# discard incoming gossip messages if not handled within this duration
# gossip-time-to-live = 2s
}
# http://getakka.net/docs/persistence/at-least-once-delivery
akka.persistence.at-least-once-delivery.redeliver-interval = 300s
# akka.persistence.at-least-once-delivery.redelivery-burst-limit =
# akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts =
akka.persistence.at-least-once-delivery.max-unconfirmed-messages = 1000000
akka.persistence.journal.plugin = "akka.persistence.journal.sql-server"
akka.persistence.journal.publish-plugin-commands = on
akka.persistence.journal.sql-server {
class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher = "akka.actor.default-dispatcher"
table-name = EventJournal
schema-name = dbo
auto-initialize = on
connection-string-name = "AkkaPersistence"
refresh-interval = 1s
connection-timeout = 30s
timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name = Metadata
}
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.sql-server""
akka.persistence.snapshot-store.sql-server {
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
plugin-dispatcher = ""akka.actor.default-dispatcher""
connection-string-name = "AkkaPersistence"
schema-name = dbo
table-name = SnapshotStore
auto-initialize = on
}
]]>
</hocon>
And here is my backend config:
<hocon>
<![CDATA[
akka.loglevel = INFO
akka.log-config-on-start = on
akka.stdout-loglevel = INFO
akka.actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
akka.remote {
helios.tcp {
# transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
# applied-adapters = []
# transport-protocol = tcp
# public-hostname = "localhost"
#
# seed-node ports 2551 and 2552
# non-seed-node port 0 or 46001-46010
port = 2551
hostname = "localhost"
}
log-remote-lifecyclo-events = INFO
}
akka.cluster {
seed-nodes = [
"akka.tcp://ActorSystem@localhost:2551",
"akka.tcp://ActorSystem@localhost:2552"
]
roles = [TheProcess]
# auto-down-unreachable-after = 10s
}
]]>
</hocon>
I think the issue is akka persistence related, what can the issue be?
This might be the same as #104 which is supposed to be fixed in 1.3.13. However we upgraded to 1.3.13 and it looks like this error occurs more often than before. Here are the symptoms:
Need to upgrade to the latest revision of https://github.com/petabridge/petabridge-dotnet-new - having a lot of issues with old NuGet versions et al in this project's CI.
This error
Xunit.Sdk.TrueException
Failed: Expected a message of type Akka.Streams.TestKit.TestSubscriber+OnComplete, but received {TestSubscriber.OnNext(g1-1)} (type Akka.Streams.TestKit.TestSubscriber+OnNext`1[System.Object]) instead from [akka://test/user/StreamSupervisor-11/Flow-0-0-select#838415787]
Expected: True
Actual: False
at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args)
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`2 assert, String hint, Boolean shouldLog)
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint)
at Akka.TestKit.TestKitBase.InternalExpectMsg[T](Nullable`1 timeout, Action`1 msgAssert, String hint)
at Akka.TestKit.TestKitBase.ExpectMsg[T](Nullable`1 duration, String hint)
at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong()
[INFO][01.08.2016 14:39:40][Thread 0041][akka://test/user/testActor143] Message OnError from akka://test/user/StreamSupervisor-11/Flow-0-0-select to akka://test/user/testActor143 was not delivered. 1 dead letters encountered.
[INFO][01.08.2016 14:39:40][Thread 0027][akka://test/user/StreamSupervisor-11/Flow-0-0-select] Message Terminate from akka://test/user/StreamSupervisor-11/Flow-0-0-select to akka://test/user/StreamSupervisor-11/Flow-0-0-select was not delivered. 2 dead letters encountered.
In follow up on the ordering issue on SQL server: akkadotnet/akka.net#2272
Please release a new nightly of package Akka.Persistence.SqlServer so it handles the new ordering column configuration. We're experiencing a missing method exception with the current 1.1.1.158-beta
System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, Object[] args)
at Akka.Actor.Props.ActivatorProducer.Produce()
at Akka.Actor.Props.NewActor()
--- End of inner exception stack trace ---
at Akka.Actor.Props.NewActor()
at Akka.Actor.ActorCell.CreateNewActorInstance()
at Akka.Actor.ActorCell.<>c__DisplayClass118_0.b__0()
at Akka.Actor.ActorCell.UseThreadContext(Action action)
at Akka.Actor.ActorCell.NewActor()
at Akka.Actor.ActorCell.Create(Exception failure)
--- End of inner exception stack trace ---
at Akka.Actor.ActorCell.Create(Exception failure)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
Thanks :)
Trying to make an Akka.Persistence example with sql server and autoinit of tables. However, the snapshot table fails to create.
I'm using Akka.NET 1.3.13.
Here is my HOCON:
akka {
persistence {
journal {
plugin = "akka.persistence.journal.sql-server"
sql-server {
class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
schema-name = dbo
auto-initialize = on
connection-string = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=\\\\Mac\\Dropbox\\Nastava\\2018-2019\\RS_18-19\\Vj07\\PersistentActorExample\\PersistentActorExample\\App_Data\\Persistence.mdf;Integrated Security=True"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.sql-server"
sql-server {
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
schema-name = dbo
auto-initialize = on
connection-string = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=\\\\Mac\\Dropbox\\Nastava\\2018-2019\\RS_18-19\\Vj07\\PersistentActorExample\\PersistentActorExample\\App_Data\\Persistence.mdf;Integrated Security=True"
}
}
}
}
I'm working on a OSX with parallels running windows 10.
Here is the error that occurs:
[ERROR][5/5/2019 9:56:15 PM][Thread 0009][[akka://persistence-example/system/akka.persistence.snapshot-store.sql-server#670725692]] Error during snapshot store initialization
Cause: System.Data.SqlClient.SqlException (0x80131904): Incorrect syntax near '('.
Incorrect syntax near '('.
Incorrect syntax near '('.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.InternalEndExecuteNonQuery(IAsyncResult asyncResult, String endMethod, Boolean isInternal)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<CreateTableAsync>d__28.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<Initialize>d__17.MoveNext()
ClientConnectionId:69ba6e78-21b1-4cda-999b-94b7b0f703a5
Error Number:102,State:1,Class:15
The tables EventJournal and Metadata are created.
While trying to fix #91 I've noticed that there are quite a few tests which are failing in an indeterministic way. Here are some tests results which I got after pulling this repo and trying to run the tests. For the record, I am not using SQL Express but SQL Server 2017 (it shouldn't be a case anyway).
https://gist.github.com/Havret/54b8e2c0dde512ea727dd159b3bb70b7
Got a message of the expected type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]>. Also expected the predicate to return true but the message {TestSubscriber.OnNext(h-1)} of type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]> did not match
Expected: True
Actual: False
at Akka.TestKit.Xunit2.XunitAssertions.AssertTrue(Boolean condition, String format, Object[] args)
at Akka.TestKit.TestKitBase.AssertPredicateIsTrueForMessage[T](Predicate`1 isMessage, T m, String hint)
at Akka.TestKit.TestKitBase.<>c__DisplayClass91_0`1.<ExpectMsg>b__0(T m, IActorRef sender)
at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`2 assert, String hint, Boolean shouldLog)
at Akka.TestKit.TestKitBase.InternalExpectMsg[T](Nullable`1 timeout, Action`2 assert, String hint)
at Akka.TestKit.TestKitBase.ExpectMsg[T](Predicate`1 isMessage, Nullable`1 timeout, String hint)
at Akka.Streams.TestKit.TestSubscriber.ManualProbe`1.ExpectNext(T element)
at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup()
Right now there is no implementation for the JournalSerializationSpec from the persistence TCK.
https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Persistence.TCK/Serialization/JournalSerializationSpec.cs
Before we take this plugin out of beta, all the TCK tests should be implementated in this plugin.
Related:
akkadotnet/akka.net#3019
After I run my application I get the error:
[WARNING][21-Apr-16 1:00:56 PM][Thread 0021][[akka://TaskManagementSystem/user/tenantRouter/TaskManagementDeliveryActor-45bcbf03-756f-4821-9c27-15035fe10621]] Rejected to persist event type [TaskManagement.Messages.CreateTaskMessage] with sequence number [1] for persistenceId [TaskManager] due to [Cannot insert the value NULL into column 'IsDeleted', table 'TaskManagment.dbo.EventJournal'; column does not allow nulls. INSERT fails.
The statement has been terminated.].
My table schema is the same like in the documentation.
Other plugins, as we moved them to using container dbs, we're running any Docker-related scripts outside of the actual build process. This is better, especially for Mono because it's currently breaking the build on Linux looking for System.Management.Automation
.
I have one DB with case sensitive collation (for db not collumn), and Persistence on this DB throws errors:
{ "Severity": "ERROR", "message": "Persistence failure when replaying events for persistenceId [ADER-636032988151227420-04]. Last known sequence number [0]", "timestamp": "2016-07-05 07:10:16.9931", "logger": "Offers.OfferActor", "stack": "System.Data.SqlClient.SqlException (0x80131904): Nieprawid\u0142owa nazwa kolumny PersistenceId.\r\n at System.Data.SqlClient.SqlCommand.<>c.b__167_0(Task1 result)\r\n at System.Threading.Tasks.ContinuationResultTaskFromResultTask
2.InnerInvoke()\r\n at System.Threading.Tasks.Task.Execute()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.d__23.MoveNext()\r\nClientConnectionId:b2fc7f28-29d8-4882-b365-d358a25b1d91\r\nError Number:207,State:1,Class:16", "appdomain": "0001:Offers.Node.Service.exe", "machinename": "O360NODE01 ", "Hostname": "Offers.Node.Service" }
I've noticed that sometimes in the Akka.Persistence.SqlServer code there is PersistenceId instead of PersistenceID as collumn name. Small typo and such a trouble ๐
Akka.NET Persistence is still in beta but AFAIK is planned for RTM during the summer. There are some PRs that are important to merge before the release, in particular #59 and #60.
#59 doesn't affect anything at runtime, it's about consistend and deterministic names.
#60 is quite important, we had to fork Akka.Persistence.SqlServer to apply it, otherwise streaming AllEventsByTag don't work properly.
Both PRs are shown as if they break the build but IMHO it's a false negative, they are good. Is there anything that can be done to ensure the pending PRs are applied before the release? Anything I could do?
I have an actor inherited from ReceivePersistentActor
.
It save snapshots and then deletes the old snapshots after saving snapshot is successfully done using the following method:
DeleteSnapshots(new SnapshotSelectionCriteria(@event.Metadata.SequenceNr - 1));
Looks like this method doesn't delete relevant metadata records from MetaData table.
Is there any way to delete those records from MetaData table?
akka packages:
Akka 1.2.0
Akka.DI.AutoFac 1.0.8
Akka.DI.Core 1.0.8
Akka.Persistence 1.1.2.30-beta
Akka.Persistence.Sql.Common 1.1.2.30-beta
Akka.Persistence.SqlServer 1.1.1.7-beta
Akka.Remote 1.2.0
Here's the callstack:
Unhandled message from akka://Oddjob/system/akka.persistence.snapshot-store.sql-server : SaveSnapshotFailure<meta: SnapshotMetadata<pid: akamai-volume-usage:22, seqNr: 1053, timestamp: 0001.01.01>, cause: System.Data.SqlClient.SqlException (0x80131904): Incorrect syntax near the keyword 'WHERE'.
Incorrect syntax near the keyword 'SET'.
Incorrect syntax near '('.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action1 wrapCloseInAction) at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action
1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString)
at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader()
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory1.FromAsyncCoreLogic(IAsyncResult iar, Func
2 endFunction, Action1 endAction, Task
1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.d__26.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.d__21.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Util.Internal.AtomicState.d__8.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Akka.Util.Internal.AtomicState.d__8.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Pattern.HalfOpen.d__4.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Pattern.CircuitBreaker.d__33.MoveNext()
ClientConnectionId:5a1a9a0c-5099-4e42-942b-cd935519dbfa
Error Number:156,State:1,Class:15>
I updated the references to 1.3 and the app throws the following exception:
System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)
akka packages:
Akka 1.3.0
Akka.DI.AutoFac 1.0.8
Akka.DI.Core 1.0.8
Akka.Logger.NLog 1.2.0
Akka.Persistence 1.3.0
Akka.Persistence.Sql.Common 1.3.0
Akka.Persistence.SqlServer 1.1.1.7-beta
Akka.Remote 1.3.0
full log:
2017-08-25 10:52:55.9321 [20] Info local-machine daemon Akka.Actor.Internal.ActorSystemImpl - akka : {
persistence : {
journal : {
plugin : akka.persistence.journal.sql-server
sql-server : {
class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
connection-timeout : 30s
schema-name : dbo
table-name : AkkaEventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : AkkaMetadata
}
}
snapshot-store : {
plugin : akka.persistence.snapshot-store.sql-server
sql-server : {
class : "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
connection-timeout : 30s
schema-name : dbo
table-name : AkkaSnapshotStore
auto-initialize : on
}
}
}
loggers : ["Akka.Logger.NLog.NLogLogger, Akka.Logger.NLog"]
actor : {
debug : {
unhandled : on
receive : on
autoreceive : on
lifecycle : on
event-stream : on
}
}
stdout-loglevel : Error
loglevel : DEBUG
log-config-on-start : on
}
2017-08-25 10:52:55.9211 [7] Debug local-machine daemon Akka.Event.EventStream - subscribing [akka://BillingActorSystem/system/log1-NLogLogger#699980222] to channel Akka.Event.Error
2017-08-25 10:52:55.9211 [19] Warn local-machine daemon Akka.Actor.Internal.ActorSystemImpl - NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
2017-08-25 10:52:59.6202 [24] Error local-machine daemon Akka.Actor.OneForOneStrategy - Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: ( class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
connection-timeout : 30s
schema-name : dbo
table-name : AkkaEventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : AkkaMetadata
)[akka://BillingActorSystem/system/akka.persistence.journal.sql-server#796296213]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: ( class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
connection-timeout : 30s
schema-name : dbo
table-name : AkkaEventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : AkkaMetadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, Object[] args)
at Akka.Actor.Props.ActivatorProducer.Produce()
at Akka.Actor.Props.NewActor()
--- End of inner exception stack trace ---
at Akka.Actor.Props.NewActor()
at Akka.Actor.ActorCell.CreateNewActorInstance()
at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
at Akka.Actor.ActorCell.UseThreadContext(Action action)
at Akka.Actor.ActorCell.NewActor()
at Akka.Actor.ActorCell.Create(Exception failure)
--- End of inner exception stack trace ---
at Akka.Actor.ActorCell.Create(Exception failure)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)[akka://BillingActorSystem/system/akka.persistence.journal.sql-server#796296213]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: ( class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
connection-timeout : 30s
schema-name : dbo
table-name : AkkaEventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : AkkaMetadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, Object[] args)
at Akka.Actor.Props.ActivatorProducer.Produce()
at Akka.Actor.Props.NewActor()
--- End of inner exception stack trace ---
at Akka.Actor.Props.NewActor()
at Akka.Actor.ActorCell.CreateNewActorInstance()
at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
at Akka.Actor.ActorCell.UseThreadContext(Action action)
at Akka.Actor.ActorCell.NewActor()
at Akka.Actor.ActorCell.Create(Exception failure)
--- End of inner exception stack trace ---
at Akka.Actor.ActorCell.Create(Exception failure)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
The same source code works perfectly with the following packages:
Akka 1.2.0
Akka.DI.AutoFac 1.0.8
Akka.DI.Core 1.0.8
Akka.Persistence 1.1.2.30-beta
Akka.Persistence.Sql.Common 1.1.2.30-beta
Akka.Persistence.SqlServer 1.1.1.7-beta
Akka.Remote 1.2.0
Akka v1.3.11
Everything was working fine for months. I have a scheduled restart of the windows service hosting this application.
Today, I got this error.
2019-04-15 04:09:18,798 [1] INFO Archive.Api.Startup - ActorSystem Started !
2019-04-15 04:09:18,798 [1] INFO Archive.Api.Startup - ASP.NET application started !
2019-04-15 04:09:18,829 [11] INFO Archive.Shared.Export.VideoMaster - [PreStart] VideoMaster
2019-04-15 04:09:18,829 [27] INFO Archive.Shared.Export.ApiMaster - [PreStart] ApiMaster
2019-04-15 04:09:18,954 [30] ERROR Akka.Actor.OneForOneStrategy - Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: ( class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "Server=.;Database=AkkaPersistenceArchiveDb;User ID=sa;Password=xxxxx;Connection Timeout=30;MultipleActiveResultSets=True; Pooling=True; Min Pool Size=2; Connect Timeout=250;"
connection-timeout : 30s
schema-name : dbo
table-name : EventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : Metadata
)
[akka://ArchiveSystem/system/akka.persistence.journal.sql-server#15922092]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: ( class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
plugin-dispatcher : akka.actor.default-dispatcher
connection-string : "Server=.;Database=AkkaPersistenceArchiveDb;User ID=sa;Password=xxxxx;Connection Timeout=30;MultipleActiveResultSets=True; Pooling=True; Min Pool Size=2; Connect Timeout=250;"
connection-timeout : 30s
schema-name : dbo
table-name : EventJournal
auto-initialize : on
timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
metadata-table-name : Metadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.NullReferenceException: Object reference not set to an instance of an object.
at Akka.Persistence.Journal.AsyncWriteJournal..ctor()
at Akka.Persistence.Sql.Common.Journal.SqlJournal..ctor(Config journalConfig)
at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, Object[] args)
at Akka.Actor.Props.ActivatorProducer.Produce()
at Akka.Actor.Props.NewActor()
--- End of inner exception stack trace ---
at Akka.Actor.Props.NewActor()
at Akka.Actor.ActorCell.CreateNewActorInstance()
at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
at Akka.Actor.ActorCell.UseThreadContext(Action action)
at Akka.Actor.ActorCell.NewActor()
at Akka.Actor.ActorCell.Create(Exception failure)
--- End of inner exception stack trace ---
at Akka.Actor.ActorCell.Create(Exception failure)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
2019-04-15 04:09:19,063 [30] WARN Archive.Shared.Cluster.ClusterStatus - ClusterLeader is null
2019-04-15 04:09:19,720 [11] INFO Akka.Event.DummyClassForStringSources - Cluster Node [akka.tcp://[email protected]:16666] - Welcome from [akka.tcp://[email protected]:4053]
...
CS_PID field doesn't seems to work the way, it was designed to. Therefore it should be dropped and PersistenceID field should be used instead.
I have created a sample application, which creates a snapshot every 25 events and removes all event messages until that time. But when it reload the actor, only the events are recovered and not the snapshot.
i am using the Akka.Persistence.SqlServer version 1.1.1.7-beta and Akka.Persistence.Sql.Common version 1.2.0.36-beta.
I have also observed that the metadata table never get any records... not sure whether its part of design.
In our application we are using Akka.net, with event sourcing. The persistent actors save their events in an SQL Server database.
We also have view actors, which subscribe to these events, using a journal reader/persistence query, to create materialised views. We have a table in the database, that has a row for every view actor. This row contains the name of the view actor and the offset of the last event prccessed.
At first sight, this is working smoothly. Sometimes however, when we run a test that results in thousands of events, the journal reader is missing some events.
A view actor is a ReceiveActor. When started, it retrieves the last handled event offset from the database (called from the actor's constructor). The offset is piped to self in an OffsetMessage.
On receiving the OffsetMessage the view actor initialises the journal reader. On receiving events (in EventEnvelope messages), the views are updated.
The action that is run from the journal reader, first writes a line to the log. That line contains the event offset.
The EventEnvelope receive handler also writes a line to the log. That line also contains the event offset.
We have a test that results in 9635 event inserted into the journal. Sometimes the journal reader and the EventEnvelope receive handler are logging less than 9635 events.
They both log the same numbers, so it seems the events are missed by the journal reader. The missed events from the log are corresponding to the missing items in the views.
We run the test on an empty database. Logging is at the debug level, and does not show exceptions. The missing events (we have seen numbers of 1 to 4) can be among the first, middle or last events. Everytime this is different.
So far we have no idea what is causing this problem, or how it can be solved.
Following are fragments of our code. The view actors all inherit from a base class: ViewActorBase.
internal abstract class ViewActorBase : ReceiveActor, ILogReceive
{
public ViewActorBase()
{
// Some initialisation code
....
this.Receive<OffsetMessage>(this.HandleOffsetMessage);
this.ReceiveAsync<EventEnvelope>(this.UpdateState);
var sender = this.Sender;
var self = this.Self;
this.GetViewActorOffset(self, sender);
}
private void HandleOffsetMessage(OffsetMessage offsetMessage)
{
this.InitialiseJournalReader(offsetMessage.Offset);
}
private void InitialiseJournalReader(long offset)
{
// obtain read journal by plugin id
var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>($"akka.persistence.query");
// materialize stream, consuming events
var materializer = ActorMaterializer.Create(Context.System);
// issue query to journal
Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));
var self = this.Self;
source.RunForeach(envelope => { this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value); self.Tell(envelope); }, materializer);
}
private void GetViewActorOffset(IActorRef self, IActorRef sender)
{
// Initialise repository
....
repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
}
}
internal class MyViewActor : ViewActorBase
{
protected override async Task UpdateState(EventEnvelope envelope)
{
var offset = (envelope.Offset as Sequence).Value;
this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);
// Update views
....
}
}
Is there something wrong in our code or architecture? Are there better solutions?
Additional information
We have run some tests with SQL Server profiler monitoring the queries to the database.
A query was executed on the event journal, asking for 100 events, starting at offset 204743. The result contained 61 rows.
<Event id="10" name="RPC:Completed">
<Column id="1" name="TextData">exec sp_executesql N'
SELECT TOP (@Take)
e.PersistenceId as PersistenceId,
e.SequenceNr as SequenceNr,
e.Timestamp as Timestamp,
e.IsDeleted as IsDeleted,
e.Manifest as Manifest,
e.Payload as Payload,
e.SerializerId as SerializerId,
e.Ordering as Ordering
FROM dbo.EventJournal e
WHERE e.Ordering > @Ordering AND e.Tags LIKE @Tag
ORDER BY Ordering ASC
',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
<Column id="9" name="ClientProcessID">1169425116</Column>
<Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
<Column id="12" name="SPID">82</Column>
<Column id="13" name="Duration">353890</Column>
<Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
<Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
<Column id="16" name="Reads">326</Column>
<Column id="17" name="Writes">0</Column>
<Column id="18" name="CPU">0</Column>
<Column id="48" name="RowCounts">61</Column>
</Event>
We expexted the next query to start at 204804 (204743 + 61). However, it started at 204810. Why is it skipping (or missing) 6 events?
We could run our tests under different versions of SqlServer
https://www.appveyor.com/docs/services-databases
Due to the changes to Akka.Persistence.Sql.Common QueryConfiguration and the addition of the extra parameter of useSequentialAccess this package does not operate with Akka 1.3.7
I have updated the code to support this parameter in #96
Current build script doesn't run specs on .NET Core yet
I'm getting strange error when my persistent actor failed to save snaphot:
Error in saving snapshot SnapshotMetadata<pid: TaskManager, seqNr: 218, timestamp: 0001-01-01>
Cause: System.NotSupportedException: There is no active ActorContext, this is most likely due to use of async operations from within this actor.
How can I avoid this kind of SaveSnapshotFailure?
Hey guys,
It looks like in 1.0.5 and on "SqlServerPersistence.Init" is missing -- and there's no documented way to initialize this extension.
The README for this repo still says that this is the correct way to initialize the extension. -- Please update the readme, because whatever the new method is, it's not clear, and it means we can't use a newer version than 1.0.4. :(
Adding the column SerializerId
per changes in v1.3.1 may not be sufficient as we are now possibly inserting NULL
into the Manifest column per this line: https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs#L654
If the serializer is set to not include manifest info, then we insert NULL, which is not allowed per the existing schema: https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs#L654
CreateEventsJournalSql = $@"
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{configuration.SchemaName}' AND TABLE_NAME = '{configuration.JournalEventsTableName}')
BEGIN
CREATE TABLE {configuration.FullJournalTableName} (
{configuration.OrderingColumnName} BIGINT IDENTITY(1,1) PRIMARY KEY NOT NULL,
{configuration.PersistenceIdColumnName} NVARCHAR(255) NOT NULL,
{configuration.SequenceNrColumnName} BIGINT NOT NULL,
{configuration.TimestampColumnName} BIGINT NOT NULL,
{configuration.IsDeletedColumnName} BIT NOT NULL,
{configuration.ManifestColumnName} NVARCHAR(500) NOT NULL,
{configuration.PayloadColumnName} VARBINARY(MAX) NOT NULL,
{configuration.TagsColumnName} NVARCHAR(100) NULL,
CONSTRAINT UQ_{configuration.JournalEventsTableName} UNIQUE ({configuration.PersistenceIdColumnName}, {configuration.SequenceNrColumnName})
);
CREATE INDEX IX_{configuration.JournalEventsTableName}_{configuration.SequenceNrColumnName} ON {configuration.FullJournalTableName}({configuration.SequenceNrColumnName});
CREATE INDEX IX_{configuration.JournalEventsTableName}_{configuration.TimestampColumnName} ON {configuration.FullJournalTableName}({configuration.TimestampColumnName});
END
";
May need an additional ALTER COLUMN
statement in README to make Manifest
nullable for backwards compatibility with 1.1.1.7-beta
Hi there. The example HOCON file appears to contain an error in the 'akka.persistence.snapshot-store.sql-server.plugin-dispatcher' field:
# dispatcher used to drive journal actor
plugin-dispatcher = ""akka.actor.default-dispatcher""
In order to prevent bugs when the default config changes, every persistence plugin should have an implementation of this Test:
https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
After updating the NuGet package to version 1.0.6.3 I'm getting an error:
[Thread 0014][akka://TaskManagementSystem/system/akka.persistence.journal.sql-server] Object reference not set to an instance of an object.
Cause: [akka://TaskManagementSystem/system/akka.persistence.journal.sql-server]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.NullReferenceException: Object reference not set to an instance of an object.
Without changing anything to the project I have made downgrade to the package 1.0.6.0 and my system starter working propertly.
New transaction is not allowed because there are other threads running in the session.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
at System.Data.SqlClient.TdsParser.TdsExecuteTransactionManagerRequest(Byte[] buffer, TransactionManagerRequestType request, String transactionName, TransactionManagerIsolationLevel isoLevel, Int32 timeout, SqlInternalTransaction transaction, TdsParserStateObject stateObj, Boolean isDelegateControlRequest)
at System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest transactionRequest, String transactionName, IsolationLevel iso, SqlInternalTransaction internalTransaction, Boolean isDelegateControlRequest)
at System.Data.SqlClient.SqlInternalConnection.BeginSqlTransaction(IsolationLevel iso, String transactionName, Boolean shouldReconnect)
at System.Data.SqlClient.SqlConnection.BeginTransaction(IsolationLevel iso, String transactionName)
at System.Data.SqlClient.SqlConnection.BeginDbTransaction(IsolationLevel isolationLevel)
at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<InsertInTransactionAsync>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<WriteMessagesAsync>d__6.MoveNext()
Wanted to have a Mono build in CI to validate that the plugin builds on Mono, but in order to trigger the Docker-related Powershells from within build.fsx, had to import System.Management.Automation which throws here: http://petabridge-ci.cloudapp.net/viewLog.html?buildId=24898&tab=buildResultsDiv&buildTypeId=AkkaNet_AkkaPersistenceImplementations_AkkaPersistenceSqlServer_AkkaP
The other plugins spin up containers in separate build steps - with the building and running tests done inside of build.fsx. Think this is a better approach, to decouple the Docker infrastructure from the build scripts.
using 1.1.5 pre-release version of the plugin but the issue was seen with earlier versions too
an example -
Rejected to persist event type ["BrokerDealer.Contracts.Messages.MessageHandled"] with sequence number [59] for persistenceId ["/user/CashFundingManagementView/$Gb"] due to ["Violation of PRIMARY KEY constraint 'PK_AKKAJournal'. Cannot insert duplicate key in object 'dbo.AKKAJournal'. The duplicate key value is (/user/CashFundingManagementView/$Gb, 59).
I see higher incidence of this with higher actor instance count
what we do is very simple - every incoming message is persisted into the journal and when the message is handled the corresponding MessageHandled is appended to the journal - this allows us to only replay messages that have not yet been processed
I see sufficient time span between original message under certain persistence id and sequence number and the exception e.g.
this is the exception from the log (note the TS)
2016-09-21 10:34:42.843 +01:00 [Warning] Rejected to persist event type ["BrokerDealer.Contracts.Messages.Feed.CashManager.GrossFlow"] with sequence number [8] for persistenceId ["/user/CashFundingManagementView/$h"] due to ["Violation of PRIMARY KEY constraint 'PK_AKKAJournal'. Cannot insert duplicate key in object 'dbo.AKKAJournal'. The duplicate key value is (/user/CashFundingManagementView/$h, 8).
this is the original journal entry
SELECT [PersistenceID]
,[SequenceNr]
,[Timestamp]
,[IsDeleted]
,[Manifest]
,[Payload]
,[Tags]
FROM [BrokerDealer].[dbo].[AKKAJournal] where PersistenceID='/user/CashFundingManagementView/$h' and SequenceNr=8
I have removed the payload for brewity
PersistenceID SequenceNr Timestamp IsDeleted Manifest
1 /user/CashFundingManagementView/$H 8 636100472666516918 0 BrokerDealer.Contracts.Messages.Feed.CashManager.GrossFlow, BrokerDealer.Contracts
unix timestamp converts to 2016-09-21โT09:34:26.651Z (vs the exception time of 2016-09-21 10:34:42.843 +01:00
We are aware of the fact that this is pre-release so I would also like to ask for suggestions re alternative approach/other akka persistance plugin implementations that would behave stable
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.