aerospike-client-csharp's People

Contributors

atarassov-ttd, briannichols, dependabot[bot], hexawyz, jac21, leons727, m-wilmo, olviko, pygupta, sanjalinagare57, shannonklaus, verdie-g, wchu-citrusleaf, windsnow98

aerospike-client-csharp's Issues

Support for .NET Core

Are there any plans for the aerospike client for .NET to support the new .NET Core version? This is the .NET version that will run on Windows, OSX and Linux.

Replace BinaryFormatter for .NET 5

Hello, we migrated our app to .NET 5, but we ran into issues with the Aerospike client because it uses BinaryFormatter, which is disabled by default.
To enable it, we set <EnableUnsafeBinaryFormatterSerialization>true</EnableUnsafeBinaryFormatterSerialization> in the csproj file. Do you plan to replace this in a future version?

Best regards,
Ionut

Intermittent Timeout Exceptions after upgrading to Aerospike (5.6->6.0) and client (4.7.2->5.3.0)

Hello, after we upgraded the server and client to the versions above, we started seeing intermittent Aerospike.Timeout errors on random write operations (from a full record create to a simple int bin (boolean) update). Most, if not all, settings are at their defaults on both the server and the client.

We noticed that the client throws Timeout exceptions, but the server metrics in Grafana show no sign of them, and the server logs contain only info-level messages.

We've checked all the suggestions mentioned here: https://support.aerospike.com/s/article/Warning-write-fail-queue-too-deep while looking for answers, but to no avail.

We noticed that most of the time, when a timeout occurs, the server logs this message: https://docs.aerospike.com/reference/server-log#1663663594

With all that said, could the client have timeout issues in 5.3.0 as well, given that the server shows no indication of requests timing out?

The client configuration is also pretty straightforward:

    return services.AddSingleton<IAsyncClient>(
        new AsyncClient(new AsyncClientPolicy
        {
            asyncMaxCommandAction = MaxCommandAction.DELAY,
        }, hosts));

Everything else is left at its default.

Negative int problem

In versions 3.9.4 and 3.9.5, when inserting negative int values into a set, the value actually stored is uint.MaxValue (4294967295). When retrieving the value, the client converts it back into the same negative value, but the stored value is wrong, which makes index scans return wrong results.
This code works correctly up to version 3.9.3:

var bins = new Bin[] {new Bin("Test", -1)};
client.Add(policy, key, bins);
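The reported value 4294967295 is exactly the 32-bit pattern of -1 read as an unsigned integer. The self-contained sketch below (plain C#, not client code) shows the two widening paths side by side; which path the client actually takes in 3.9.4/3.9.5 is an assumption this sketch does not confirm.

```csharp
using System;

public static class SignExtensionDemo
{
    // Widening through int keeps the sign: -1 stays -1.
    public static long WidenSigned(int value) => value;

    // Reinterpreting the same 32 bits as uint first drops the sign,
    // so -1 becomes uint.MaxValue (4294967295) once widened to long.
    public static long WidenViaUInt(int value) => unchecked((uint)value);

    // Narrowing back to int keeps only the low 32 bits, restoring -1.
    // This would explain why a round trip through the client still shows -1.
    public static int NarrowBack(long value) => unchecked((int)value);
}
```

For example, `SignExtensionDemo.WidenViaUInt(-1)` gives 4294967295 while `WidenSigned(-1)` gives -1, and narrowing 4294967295 back to `int` yields -1 again, consistent with reads looking correct while the stored value is not.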

Fully implement INSERT_BOUNDED

According to the documentation, Aerospike supports inserting into a list using the INSERT_BOUNDED flag, which does the following:

Do not insert past index N, where N is element count

The flag for this feature is implemented:

However, there doesn't appear to be a unit test or any other obvious way to supply the value of 'N', which leads me to believe it's not fully implemented.
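As the quoted documentation reads, N is the list's current element count rather than a separate caller-supplied value. The sketch below models that rule locally on a plain `List<T>`: an insert is accepted only when the target index does not exceed the element count. `BoundedInsertModel` is a hypothetical helper for illustration; it is not the client's list-policy API, it does not model negative (from-the-end) indexes, and it says nothing about how the server enforces the flag.

```csharp
using System;
using System.Collections.Generic;

public static class BoundedInsertModel
{
    // Hypothetical model of "do not insert past index N, where N is element count".
    // Returns false (instead of padding the list) when index > list.Count.
    public static bool TryInsertBounded<T>(List<T> list, int index, T item)
    {
        if (list == null) throw new ArgumentNullException(nameof(list));
        if (index < 0 || index > list.Count)
        {
            return false; // would leave a gap past the end of the list
        }
        list.Insert(index, item);
        return true;
    }
}
```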

Error using AsyncClient on .Net Core

When making a truly async call with a CancellationToken, the async client throws an error for the Put and Get methods (the ones I tested).

Here is a sample call:

await client.Put(null, CancellationToken.None, GetKey("4"), new Bin("test", "2"));

I looked at your test code (AsyncPutGetWithTask), but I can't tell why only the sync calls work here.

Discussion to reduce garbage collection pressure

After migrating to Aerospike, we observed an increase in gen 2 collections, which increased the time spent in GC. Using dotMemory on one of our production services, I can see many Aerospike objects collected in gen 2.

For a single aerospike get, many objects are allocated:

  • Key
  • StringValue
  • string (key)
  • AsyncRead
  • Stopwatch (fixed by #67)
  • Partition
  • RecordListenerAdapter
  • TaskCompletionSource
  • Task
  • Record
  • Dictionary<string, object> (bins key/value)

Apparently all these objects can reach gen 2 during the round-trip to Aerospike. I'd like to start a discussion about what we can do about it. For example would it be possible to pool Key objects?
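As a starting point for the pooling question, here is a minimal generic object pool built on `ConcurrentBag<T>`. It is a sketch, not a proposal for the client's API: it assumes the pooled type can be reset and safely reused, which would need to be verified for `Key`, and `SimplePool` and `PooledBuffer` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Minimal thread-safe pool: Rent() reuses a returned instance when one is
// available, otherwise it creates a new one via the factory.
public sealed class SimplePool<T> where T : class
{
    private readonly ConcurrentBag<T> _items = new ConcurrentBag<T>();
    private readonly Func<T> _factory;
    private readonly Action<T> _reset;

    public SimplePool(Func<T> factory, Action<T> reset)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _reset = reset ?? throw new ArgumentNullException(nameof(reset));
    }

    public T Rent() => _items.TryTake(out var item) ? item : _factory();

    public void Return(T item)
    {
        _reset(item);     // clear state before the object can be rented again
        _items.Add(item);
    }
}

// Hypothetical pooled payload used only to exercise the pool.
public sealed class PooledBuffer
{
    public byte[] Data { get; } = new byte[1024];
    public int Length { get; set; }
}
```

This pool is unbounded; a production version would likely cap how many instances it retains so that a burst does not pin memory indefinitely.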

ConnectionStats.Bytes{Read/Written}

What do you think of adding bytes read and written to the socket in the ConnectionStats object? I could work on the PR if you're interested.
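A sketch of how such counters could be kept without locks, using `Interlocked` so that concurrent socket callbacks can update them. `ByteCounters` and its members are hypothetical names, not part of the existing `ConnectionStats` type.

```csharp
using System.Threading;

// Hypothetical per-node counters; Interlocked keeps updates atomic when
// many async socket callbacks report progress concurrently.
public sealed class ByteCounters
{
    private long _bytesRead;
    private long _bytesWritten;

    public void AddRead(int count) => Interlocked.Add(ref _bytesRead, count);
    public void AddWritten(int count) => Interlocked.Add(ref _bytesWritten, count);

    // Interlocked.Read avoids torn 64-bit reads on 32-bit platforms.
    public long BytesRead => Interlocked.Read(ref _bytesRead);
    public long BytesWritten => Interlocked.Read(ref _bytesWritten);
}
```

Callers would report `AddRead`/`AddWritten` with the byte counts returned by each socket receive or send.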

Request for Improved Release Documentation

I was wondering if it would be possible to get a greater amount of detail in the release documentation for major versions.

In the latest release, for example, new code was added for Cluster Connection Management, yet nothing mentions why it was added or what initiated the change.

Additionally, there seems to be a new proxy feature for gRPC, but there is little documentation on how to use it, what advantages it has, etc.

Is this something we could help with? Unfortunately, for this particular release, I'm hesitant to upgrade without more insight into what has changed, since it's a single commit with over 13k lines changed.

AsyncClient stuck on many commands

Hello.
We use AsyncClient to perform write operations on many keys, using TaskCompletionSource (or Channel<T> from System.Threading.Channels) to wait for all keys to complete.

After updating the client to version 5.2.1, we encountered strange behavior.

Consider the following test:

    [Parallelizable(ParallelScope.None)]
    public class AsyncClientTest
    {
        private readonly Host[] _aerospikeHosts;

        public AsyncClientTest()
        {
            _aerospikeHosts = new Host[]
            {
                /* list of our aerospike hosts */
            };
        }

        [TestCase(MaxCommandAction.DELAY, 3000)]
        [TestCase(MaxCommandAction.DELAY, 2000)]
        public async Task PutLargeBatch_Test(MaxCommandAction maxCommandAction, int itemCount)
        {
            using var asyncClient = new AsyncClient(new AsyncClientPolicy
            {
                // some our defaults:
                asyncMaxCommandAction = maxCommandAction,
                useServicesAlternate = true,
                writePolicyDefault = new WritePolicy
                {
                    sendKey = true,
                    totalTimeout = 30000,
                    socketTimeout = 5000,
                    recordExistsAction = RecordExistsAction.CREATE_ONLY,
                    expiration = 600,
                    maxRetries = 0
                },
                asyncMaxCommands = 200
            }, _aerospikeHosts);

            using var cts = new CancellationTokenSource(60000);

            var data = Enumerable.Range(0, itemCount).Select(_ => new TestModel()).ToArray();

            var completionSource = new AerospikeOperationCompletionSource(data.Length, cts.Token);

            foreach (var item in data)
            {
                var key = new Key(ns: "stage", setName: "as_client_test", item.Id.ToString());

                var writeListener = new AerospikeWriteListener(key, completionSource);

                asyncClient.Put(null, writeListener, key, new Bin("id", item.Id.ToString()));
            }

            // wait for all keys to complete
            Func<Task> asyncAction = () => completionSource.Task;

            await asyncAction.Should().NotThrowAsync().ConfigureAwait(false);

            var results = await completionSource.Task.ConfigureAwait(false);

            results.Should().HaveCount(data.Length);

            results.Should().AllSatisfy(r => r.Success.Should().BeTrue());
        }

        public class AerospikeResult
        {
            private readonly AerospikeException _exception;

            public AerospikeResult(Key key, AerospikeException exception = null)
            {
                Argument.IsNotNull(key, nameof(key));

                Key = key;

                _exception = exception;
            }

            public Key Key { get; }

            public bool Success => _exception == null;
        }

        public class AerospikeOperationCompletionSource : IDisposable
        {
            private int _operationCount;
            private readonly TaskCompletionSource<AerospikeResult[]> _tcs;
            private readonly CancellationTokenRegistration _ctr;

            private readonly ConcurrentBag<AerospikeResult> _results;

            public AerospikeOperationCompletionSource(int operationCount, CancellationToken cancellationToken = default)
            {
                if (operationCount < 1)
                {
                    throw new ArgumentOutOfRangeException(nameof(operationCount));
                }

                _operationCount = operationCount;

                _tcs = new TaskCompletionSource<AerospikeResult[]>(TaskCreationOptions.RunContinuationsAsynchronously);
                _ctr = cancellationToken.Register(() => _tcs.TrySetCanceled(), useSynchronizationContext: false);

                _results = new ConcurrentBag<AerospikeResult>();
            }

            public Task<AerospikeResult[]> Task => _tcs.Task;

            public void OnOperationPerformed(AerospikeResult result)
            {
                Argument.IsNotNull(result, nameof(result));

                _results.Add(result);

                if (Interlocked.Decrement(ref _operationCount) == 0)
                {
                    _tcs.TrySetResult(_results.ToArray()); // the token may already have canceled the task
                }
            }

            public void Dispose()
            {
                _ctr.Dispose();
            }
        }

        public sealed class AerospikeWriteListener : WriteListener
        {
            private readonly Key _key;
            private readonly AerospikeOperationCompletionSource _completionSource;

            public AerospikeWriteListener(Key key, AerospikeOperationCompletionSource completionSource)
            {
                _key = key;
                _completionSource = completionSource;
            }

            public void OnSuccess(Key key)
            {
                _completionSource.OnOperationPerformed(new AerospikeResult(key));
            }

            public void OnFailure(AerospikeException exception)
            {
                _completionSource.OnOperationPerformed(new AerospikeResult(_key, exception));
            }
        }

        public class TestModel
        {
            public TestModel()
            {
                Id = Guid.NewGuid();
            }

            public Guid Id { get; set; }
        }
    }

The test fails when the item count exceeds ~2000 with client version 5.2.1: the client starts ignoring some queued keys and never returns a result for them (no exceptions).

With 5.2.0 and earlier, the test succeeds for 100,000+ items with the same client policy configuration, and at worst it throws a timeout exception when there are too many keys to complete quickly.

I tried logging operation completions to a file (without using cancellation) and saw that after a few thousand completions nothing happens (the listener's methods are no longer invoked). Increasing the total timeout doesn't change the behaviour.

Server version: aerospike:ce-6.0.0.1

Mono Crash

I'm getting this error on Mono 4.0.1 running on Debian. I haven't traced exactly where it comes from, but it could come from the index creation. This is quite bad, since it crashes the whole runtime. Ideas?

Stacktrace:

  at <unknown> <0xffffffff>
  at (wrapper managed-to-native) object.__icall_wrapper_mono_object_isinst (object,intptr) <0xffffffff>
  at (wrapper stelemref) object.virt_stelemref_class (intptr,object) <0xffffffff>
  at Aerospike.Client.PartitionParser.DecodeBitmap (Aerospike.Client.Node,Aerospike.Client.Node[],int) <0x000e7>
  at Aerospike.Client.PartitionParser.ParseReplicasMaster (Aerospike.Client.Node) <0x001d7>
  at Aerospike.Client.PartitionParser..ctor (Aerospike.Client.Connection,Aerospike.Client.Node,System.Collections.Generic.Dictionary`2<string, Aerospike.Client.Node[][]>,int,bool) <0x00173>
  at Aerospike.Client.Cluster.UpdatePartitions (Aerospike.Client.Connection,Aerospike.Client.Node) <0x00067>
  at Aerospike.Client.Node.UpdatePartitions (Aerospike.Client.Connection,System.Collections.Generic.Dictionary`2<string, string>) <0x00167>
  at Aerospike.Client.Node.Refresh (System.Collections.Generic.List`1<Aerospike.Client.Host>) <0x000ef>
  at Aerospike.Client.Cluster.Tend (bool) <0x0018f>
  at Aerospike.Client.Cluster.Run () <0x0002b>
  at System.Threading.Thread.StartInternal () <0x000bf>
  at (wrapper runtime-invoke) object.runtime_invoke_void__this__ (object,intptr,intptr,intptr) <0xffffffff>

Native stacktrace:

    mono() [0x4c097c]
    mono() [0x52652e]
    mono() [0x43627d]
    /lib/x86_64-linux-gnu/libpthread.so.0(+0xf0a0) [0x7fd315feb0a0]
    mono(mono_class_is_assignable_from+0x22) [0x534702]
    mono(mono_object_isinst+0x3d) [0x5c892d]
    [0x414ca1b8]

Consider replacement for Iconic.Zlib.Netstandard

Package Iconic.Zlib.Netstandard is outdated and unmaintained, targeted at long deprecated .NETStandard 1.3. This causes issues for consumers as it pulls in a lot of unexpected old packages.

Other implementations of Zlib available on NuGet aren't much better (but at least they have netstandard-2.0 available) - but they aren't signed, which I understand is a requirement for Aerospike.Client?

Considering that the only public type needed by ByteUtil is ZlibStream, perhaps inlining this type (and its dependencies) could be a reasonable approach?
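For targets that can use it, .NET 6 and later ship `System.IO.Compression.ZLibStream` in the base library, which would remove the external dependency there. It is not available on netstandard2.0, so this sketch does not by itself cover every target the client builds for; `ZlibHelper` is an illustrative name.

```csharp
using System.IO;
using System.IO.Compression;

public static class ZlibHelper
{
    // Compress using the zlib format (RFC 1950 header + deflate data + Adler-32).
    public static byte[] Compress(byte[] data)
    {
        using var output = new MemoryStream();
        using (var zlib = new ZLibStream(output, CompressionLevel.Optimal, leaveOpen: true))
        {
            zlib.Write(data, 0, data.Length);
        } // disposing the ZLibStream flushes the final block and checksum
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] compressed)
    {
        using var input = new MemoryStream(compressed);
        using var zlib = new ZLibStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        zlib.CopyTo(output);
        return output.ToArray();
    }
}
```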

Util.Sleep() causing spike in CPU consumption

While inserting records, we initialized the WritePolicy and set sleepBetweenRetries = 50.

The screenshot below is a stack trace from a dump analysis.

I went through the code and found that Util.Sleep() internally calls Thread.Sleep().

It looks like once a thread goes to sleep, it is unable to wake up after the sleep duration expires.
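One way to test this hypothesis is to time the sleep directly. The sketch below measures how long `Thread.Sleep(50)` actually blocks; it uses only `Thread.Sleep` and `Stopwatch`, not the client's `Util.Sleep`, and the 50 ms figure is taken from the `sleepBetweenRetries` value above. `SleepProbe` is a hypothetical name.

```csharp
using System.Diagnostics;
using System.Threading;

public static class SleepProbe
{
    // Returns the wall-clock milliseconds that Thread.Sleep(ms) actually blocked.
    // Values far above ms would support the "does not wake up" theory;
    // values close to ms suggest the thread does wake up on time.
    public static long MeasureSleep(int ms)
    {
        var sw = Stopwatch.StartNew();
        Thread.Sleep(ms);
        sw.Stop();
        return sw.ElapsedMilliseconds;
    }
}
```

Keep in mind that a thread blocked in `Thread.Sleep` does not use CPU while it waits, so if the measured durations look normal, the CPU spike is more likely related to what runs between retries.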

Regards,
Mukesh

System.IndexOutOfRangeException When Unpacking Map

When using the latest Aerospike C# driver (3.2.4), calling AerospikeClient.Get() on certain existing records throws the following exception.

We are running Aerospike 3.9.1, though we saw the same problem when we were on 3.7.x. We upgraded Aerospike thinking there might be a compatibility issue between the server and driver versions, but that doesn't seem to be the case.

System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at Aerospike.Client.Unpacker.CreateMap(Int32 count) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Util\Unpacker.cs:line 178
   at Aerospike.Client.Unpacker.UnpackMap(Int32 count) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Util\Unpacker.cs:line 129
   at Aerospike.Client.ByteUtil.BytesToParticle(Int32 type, Byte[] buf, Int32 offset, Int32 len) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Command\ByteUtil.cs:line 83
   at Aerospike.Client.ReadCommand.ParseRecord(Int32 opCount, Int32 fieldCount, Int32 generation, Int32 expiration) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Command\ReadCommand.cs:line 161
   at Aerospike.Client.ReadCommand.ParseResult(Connection conn) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Command\ReadCommand.cs:line 98
   at Aerospike.Client.SyncCommand.Execute() in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Command\SyncCommand.cs:line 102
   at Aerospike.Client.AerospikeClient.Get(Policy policy, Key key) in d:\SourceCode\Aerospike\aerospike-client-csharp-3.2.4\AerospikeClient\Main\AerospikeClient.cs:line 438
   ...

AsyncConnection is incompatible with MacOS

AsyncConnection sets

    this.socket.SendBufferSize = 0;
    this.socket.ReceiveBufferSize = 0;

in its constructor. This value is illegal on macOS. See https://github.com/dotnet/corefx/issues/16716

Aerospike.Client.AerospikeException+Connection: Error -8,4,BB9020011AC4202 127.0.0.1 3000: Invalid argument ---> System.Net.Sockets.SocketException: Invalid argument
   at System.Net.Sockets.Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, String callerName)
   at System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, Int32 optionValue, Boolean silent)
   at System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, Int32 optionValue)
   at System.Net.Sockets.Socket.set_SendBufferSize(Int32 value)
   at Aerospike.Client.AsyncConnection..ctor(IPEndPoint address, AsyncCluster cluster, AsyncNode node)
   --- End of inner exception stack trace ---
   at Aerospike.Client.AsyncConnection..ctor(IPEndPoint address, AsyncCluster cluster, AsyncNode node)
   at Aerospike.Client.AsyncCommand.ExecuteCommand()
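A defensive pattern would be to leave the OS defaults in place unless a positive size is configured, and to tolerate platforms that reject the value. The helper below is a hypothetical sketch, not the client's `AsyncConnection` code; the parameters `sendSize` and `receiveSize` stand in for whatever configuration the client would expose.

```csharp
using System.Net.Sockets;

public static class SocketBufferConfig
{
    // Applies send/receive buffer sizes only when a positive size is given.
    // Returns false if the platform rejected the option (macOS rejects 0,
    // per the linked corefx issue); the socket then keeps its OS defaults.
    public static bool TryApply(Socket socket, int sendSize, int receiveSize)
    {
        try
        {
            if (sendSize > 0) socket.SendBufferSize = sendSize;
            if (receiveSize > 0) socket.ReceiveBufferSize = receiveSize;
            return true;
        }
        catch (SocketException)
        {
            return false;
        }
    }
}
```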

Why does Aerospike not follow msgpack?

Aerospike mostly follows msgpack format, but for certain specific cases, it does not.

When writing a string to Aerospike, subsequent reads from the server return a buffer that is correctly prefixed with the type and size according to msgpack, but it seems to contain extra data. See https://github.com/msgpack/msgpack/blob/master/spec.md#str-format-family

Let's say we have a string whose length is 32. We would expect, based on msgpack, for the serialized version of this string to look like
[0xd9, 32, data]

When reading this string from Aerospike, we will see the buffer look like
[0xd9, 32, 3, data]
We can see this clearly when calling the UnpackString() method in the Unpacker class.

https://github.com/aerospike/aerospike-client-csharp/blob/master/AerospikeClient/Util/Unpacker.cs#L670
->
https://github.com/aerospike/aerospike-client-csharp/blob/master/AerospikeClient/Util/Unpacker.cs#L682-L686
->
https://github.com/aerospike/aerospike-client-csharp/blob/master/AerospikeClient/Util/Unpacker.cs#L717-L721

where
ParticleType.STRING == 3 according to https://github.com/aerospike/aerospike-client-csharp/blob/master/AerospikeClient/Command/ParticleType.cs#L25

Can someone explain the purpose of this, as well as if it's documented anywhere where Aerospike buffer format diverges from msgpack?

The Unpacker class claims to be following msgpack https://github.com/aerospike/aerospike-client-csharp/blob/master/AerospikeClient/Util/Unpacker.cs#L23-L27
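To make the observed layout concrete, here is a sketch that builds a msgpack fixstr/str8 header and can optionally write a particle-type byte as the first byte of the payload, giving the `[0xd9, len, 3, data]` shape described above. Counting that prefix in the length field is an assumption of this sketch, `STRING_PARTICLE = 3` is taken from the linked `ParticleType.cs`, and `StrPacking` is a hypothetical helper, not the client's packer.

```csharp
using System;
using System.Text;

public static class StrPacking
{
    private const byte STRING_PARTICLE = 3; // ParticleType.STRING per the linked source

    // Packs a UTF-8 string using msgpack fixstr/str8 headers. When withParticleType
    // is true, the type byte is written inside the payload and counted in the length.
    public static byte[] Pack(string value, bool withParticleType)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(value);
        int payloadLength = utf8.Length + (withParticleType ? 1 : 0);
        if (payloadLength > 255) throw new NotSupportedException("str16/str32 are omitted from this sketch");

        bool fix = payloadLength < 32; // fixstr holds lengths 0..31
        byte[] result = new byte[(fix ? 1 : 2) + payloadLength];
        int pos = 0;
        if (fix)
        {
            result[pos++] = (byte)(0xa0 | payloadLength);
        }
        else
        {
            result[pos++] = 0xd9; // str8 marker
            result[pos++] = (byte)payloadLength;
        }
        if (withParticleType) result[pos++] = STRING_PARTICLE;
        Array.Copy(utf8, 0, result, pos, utf8.Length);
        return result;
    }
}
```

In this sketch a 32-character ASCII string packs as `[0xd9, 32, data]` without the prefix and `[0xd9, 33, 3, data]` with it, because the type byte is counted in the length.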

Error on Put data at a single-bin namespace

As I described at https://discuss.aerospike.com/t/error-put-data-into-single-bin-namespace-error-code-12-bin-type-error/1548, I'm getting "Error Code 12: Bin type error" when putting data into a single-bin namespace.

string evidence = "<xml>test data</xml>";
string hash = GenerateMD5Hash(evidence); 
//This is used to generate a hash to be used as Key, since some content will generate same key

WritePolicy writePolicy = new WritePolicy();
writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
writePolicy.commitLevel = CommitLevel.COMMIT_MASTER;
writePolicy.expiration = 604800;
writePolicy.timeout = 300;

string setName = "CustomSystemDiagnosticsConsoleSampleApplication_1_0_0";
Key key = new Key("evidence", setName, hash);
client.Put(writePolicy, key, new Bin(string.Empty, evidence));

Any idea?

Transitive dependency issue

It seems that the .NET Standard version of the client is unable to be referenced by a .NET Framework project.

Given the scenario:

  • .NET Standard library A references AerospikeClient NuGet package from NuGet.org
  • .NET Framework application B references .NET Standard library A
    ... results in a compiler warning:
There was a conflict between "AerospikeClient, Version=3.6.0.0, Culture=neutral, PublicKeyToken=26e01ad6884636d6" and "AerospikeClient, Version=3.6.3.0, Culture=neutral, PublicKeyToken=26e01ad6884636d6".
      "AerospikeClient, Version=3.6.0.0, Culture=neutral, PublicKeyToken=26e01ad6884636d6" was chosen because it was primary and "AerospikeClient, Version=3.6.3.0, Culture=neutral, PublicKeyToken=26e01ad6884636d6" was not.

... and a runtime error:
System.IO.FileLoadException: 'Could not load file or assembly 'AerospikeClient, Version=3.6.3.0, Culture=neutral, PublicKeyToken=26e01ad6884636d6' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)'

(Note that library A in our scenario must be .NET Standard so it can be consumed by both .NET Framework and .NET Core applications, so referencing the .NET Framework version of AerospikeClient is not an option.)

A couple of interesting bits:

  • Note the "3.6.0" reference in the detailed output above. Where did that come from?
  • When I build the AerospikeClient package locally, then use that package instead, the problem disappears. With no code changes at all.

This problem appears on every machine that I've seen. We're using Visual Studio 2017 (although the problem also existed back when 2015 was used to build it).

The only conclusion I can draw is that there may be an issue with the environment on the machine where the NuGet package is built.

To reproduce the error:

  • Create a .NET Standard library that references the AerospikeClient nuget package, and create a method that uses the client (constructs a client object for example).
  • Create a .NET Framework console app that references the library above, and also calls the method, ensuring that the Aerospike client actually gets referenced.

Thanks,
Brian

Bin name length greater than 14 characters or maximum bins exceeded error!

Hi all,

I cloned the GitHub repository and used the Aerospike Docker image to set up the Aerospike server. When I try to run the demo, it throws the following exception:

2018-11-19 15:54:10 INFO Put: namespace=test set=demoset key=batchkey1 bin=batchbin value=batchvalue1
2018-11-19 15:54:10 ERROR Error 21 from BB9020011AC4202 ::1 3000: Bin name length greater than 14 characters or maximum bins exceeded
   at Aerospike.Client.WriteCommand.ParseResult(Connection conn) in C:\dev\aerospike-client-csharp\Framework\AerospikeClient\Command\WriteCommand.cs:line 50
   at Aerospike.Client.SyncCommand.Execute(Cluster cluster, Policy policy, Key key, Node node, Boolean isRead) in C:\dev\aerospike-client-csharp\Framework\AerospikeClient\Command\SyncCommand.cs:line 115
   at Aerospike.Client.AerospikeClient.Put(WritePolicy policy, Key key, Bin[] bins) in C:\dev\aerospike-client-csharp\Framework\AerospikeClient\Main\AerospikeClient.cs:line 274
   at Aerospike.Demo.Batch.WriteRecords(AerospikeClient client, Arguments args, String keyPrefix, String binName, String valuePrefix, Int32 size) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\Batch.cs:line 78
   at Aerospike.Demo.Batch.RunExample(AerospikeClient client, Arguments args) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\Batch.cs:line 39
   at Aerospike.Demo.SyncExample.RunExample(Arguments args) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\SyncExample.cs:line 45
   at Aerospike.Demo.Example.Run(Arguments args) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\Example.cs:line 44
   at Aerospike.Demo.ExampleTreeNode.Run(Arguments args) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\DemoForm.cs:line 689
   at Aerospike.Demo.DemoForm.RunExampleThread(Object data) in C:\dev\aerospike-client-csharp\Framework\AerospikeDemo\DemoForm.cs:line 370

If I change the bin name from "batchbin" to "mybin", it works. I have no idea why!
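A quick client-side check suggests the name length is not the culprit: the limit in the error message is 14, and "batchbin" is only 8 bytes in UTF-8, which points at the other half of the message (maximum bins exceeded). The sketch below assumes the limit applies to the UTF-8 byte length of the name; `BinNameCheck` is a hypothetical helper.

```csharp
using System.Text;

public static class BinNameCheck
{
    // Limit taken from the server error text above ("greater than 14 characters").
    // Measuring UTF-8 bytes is an assumption; for ASCII names bytes equal characters.
    public const int MaxBinNameBytes = 14;

    public static bool IsWithinLimit(string binName) =>
        Encoding.UTF8.GetByteCount(binName) <= MaxBinNameBytes;
}
```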

I am using Windows 10, the Aerospike Docker image, .NET Core 2.1 and .NET Framework 4.6.7!

Any ideas?

Many thanks

StackOverflow Exception in AsyncCommand

Some information for the beginning:
Client version 3.9.1
.NET Core 3.1
Win10

Description
The problem is that the Send and SendEvent methods in AsyncConnection fall into a very long, potentially infinite loop that ends with a StackOverflowException, because they call each other recursively.

The issue occurs only in very specific circumstances: the Aerospike cluster I connect to sits behind a VPN, and the issue occurs when I invoke the .Get method on AsyncClient while the VPN is disconnected.

The way I make the problem happen is:

  1. Connect to VPN
  2. Initialize AsyncClient
  3. Disconnect from VPN
  4. Invoke .Get method with minimal data (only namespace, set and empty key)
  5. I sometimes get StackOverflow, sometimes not (~70% of times I get them)
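Mutual recursion like this typically arises when a completion handler starts the next operation, that operation completes synchronously, and the handler is called again on the same stack. With `SocketAsyncEventArgs`, `Socket.SendAsync` returns `false` when the operation completed synchronously, in which case the `Completed` event is not raised. A common remedy is to loop while operations complete synchronously and only return to the callback path when one is genuinely pending. The sketch models that with hypothetical `startNext`/`hasMoreWork` delegates instead of real sockets; it does not show the client's actual `Send`/`SendEvent` code.

```csharp
using System;

public static class SyncCompletionDriver
{
    // startNext returns true if the operation is still pending (its callback will
    // call Drive again later) and false if it finished synchronously.
    // Looping on synchronous completions keeps the stack depth constant,
    // whereas calling the handler recursively adds a frame per completion.
    public static int Drive(Func<bool> startNext, Func<bool> hasMoreWork)
    {
        int syncCompletions = 0;
        while (hasMoreWork())
        {
            if (startNext())
            {
                return syncCompletions; // pending: resume from the completion callback
            }
            syncCompletions++;          // completed inline: handle it here and loop
        }
        return syncCompletions;
    }
}
```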

Async query doesn't return all objects that are subject of the filter

Hi,
we recently upgraded our Aerospike server to version 6.0+ to improve our secondary index queries. We also upgraded our client (4.2.0->5.2.2). However, shortly after we deployed the new query, we started receiving complaints that some objects were not processed. We concluded that the query doesn't return all of them, because if we revert to the old client with the old (blocking) query, everything works as expected.

This is the new implementation of the query

        public async Task<IList<long>> GetPurchaseIds(SettlementInput settlementInput)
        {
            var listener = new RecordListener();
            var ids = new List<long>();
            listener.OnPurchase += (key, record) =>
            {
                ids.Add(key.userKey.ToLong());
            };

            await listener.WaitHandle.EnterAsync();

            _client.Query(
                new QueryPolicy(_client.QueryPolicyDefault),
                listener,
                new Statement
                {
                    Namespace = _settings.Namespace,
                    SetName = _settings.PurchasesSetName,
                    Filter = Filter.Contains(
                        MarketIdsBin.Name,
                        IndexCollectionType.LIST,
                        settlementInput.MarketId),
                    BinNames = new[] { ProcessedInputsBin.Name, SelectionsBin.Name, IsArchivedBin.Name }
                });

            await listener.WaitHandle.WaitAsync();

            if (listener.Exception != null)
            {
                throw listener.Exception;  // query will be reprocessed
            }

            return ids;
        }
        
        private delegate void OnPurchase(Key key, Record record);

        private class RecordListener : RecordSequenceListener
        {
            public event OnPurchase OnPurchase;

            public AsyncMonitor WaitHandle { get; } // <-- Nito.Async
            public Exception Exception { get; private set; } // assigned in OnFailure

            public RecordListener()
            {
                WaitHandle = new AsyncMonitor();
            }

            public void OnFailure(AerospikeException exception)
            {
                Exception = exception;
                WaitHandle.PulseAll();
            }

            public void OnRecord(Key key, Record record)
            {
                this.OnPurchase?.Invoke(key, record);
            }

            public void OnSuccess()
            {
                WaitHandle.PulseAll();
            }
        }

So my question is: has anyone else observed such behaviour, or are we doing something wrong?
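One thing worth ruling out: if `OnRecord` is invoked from more than one thread (for example, when results from several nodes arrive concurrently; this is an assumption, not confirmed for this client version), then `List<long>.Add` is not thread-safe and concurrent adds can silently lose items. The sketch below simulates concurrent callbacks with `Parallel.For` and collects into a `ConcurrentQueue<long>`; `IdCollector` and `CollectorDemo` are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public sealed class IdCollector
{
    // ConcurrentQueue tolerates Enqueue from many threads at once,
    // unlike List<T>.Add, which can drop items or throw under contention.
    private readonly ConcurrentQueue<long> _ids = new ConcurrentQueue<long>();

    public void OnRecordId(long id) => _ids.Enqueue(id);

    public IList<long> Snapshot() => _ids.ToList();
}

public static class CollectorDemo
{
    // Simulates 'count' record callbacks arriving on thread-pool threads.
    public static IList<long> CollectConcurrently(int count)
    {
        var collector = new IdCollector();
        Parallel.For(0, count, i => collector.OnRecordId(i));
        return collector.Snapshot();
    }
}
```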

Option to call Tend and Asynctimeout in threadpool

The SDK creates two dedicated threads (tend and asynctimeout). I would prefer to move that logic to the thread pool, for example into background tasks, because in some cases, even with a timeout of 300 ms, I see the asynctimeout thread get CPU time only every 3-4 seconds. Using cancellation tokens works OK, but we would still like those actions to run on the thread pool.

Context: ASP.NET Core, .NET 5 RC1, high traffic (~12k IO per second)

Should I make a PR?
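A minimal sketch of what a thread-pool based alternative could look like: a periodic async loop started with `Task.Run`, paced by `Task.Delay`, and stopped through a `CancellationToken`. The `tick` delegate stands in for the tend or timeout-check work and is hypothetical; whether the client's tend logic can run this way is not established here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PeriodicWorker
{
    // Runs tick() every 'interval' on thread-pool threads until cancelled.
    // No dedicated thread is held between ticks: awaiting Task.Delay releases it.
    public static Task Start(Action tick, TimeSpan interval, CancellationToken token)
    {
        return Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                tick();
                try
                {
                    await Task.Delay(interval, token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break; // cancellation requested during the delay
                }
            }
        });
    }
}
```

One trade-off: the loop shares the pool with request work, so under thread-pool starvation the ticks themselves can be delayed.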

Aerospike library Conflicts with Bcrypt

The Aerospike client library includes its own implementation of the popular BCrypt algorithm.
Unfortunately, this class is not only outdated but also uses a very generic namespace, making it incompatible with anyone who wants to use a recent BCrypt NuGet package such as BCrypt-Next.

Proposed solutions:

  1. Change the namespace of the class (this seems the easiest option, with the fewest issues)
  2. Use a NuGet dependency for BCrypt, so we stay up to date with any security fixes that come out.

AsyncClient: BatchReads: SocketTimeout and CancellationToken aren't always honored.

Note: This work was started before Aerospike.Client was updated to 4.0.0 (yesterday), but based on the commit history I have no reason to think this was addressed. Specifically, I used 3.9.10.

In using the AsyncClient in one of my production applications, during batch reads, I've found that setting the SocketTimeout with a TotalTimeout of 0 and a series of MaxRetries isn't always effective in terminating the request to Aerospike.

As a result, I've added an execution timeout using Polly.Timeout, set to 1 second. When this timeout fires, the cancellation token provided to the get method is cancelled. In practice, however, I've found that the tasks sometimes linger for a substantial amount of time (I've seen 96 seconds of execution time in production).

To simplify the problem, I wrote a console app that demonstrates the behavior without needing tools like clumsy to degrade network performance. It can be seen here:
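When a cancellation token is not observed promptly, the caller can still stop waiting by racing the task against a delay with `Task.WhenAny`. This is a sketch of that idea, not of Polly's implementation, and `TaskTimeout.WithTimeout` is a hypothetical helper. It only abandons the wait and does not stop the underlying I/O, so the abandoned task may still complete later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeout
{
    // Returns the task's result if it finishes within 'timeout'; otherwise throws
    // TimeoutException. The original task keeps running: only the wait is abandoned.
    public static async Task<T> WithTimeout<T>(Task<T> task, TimeSpan timeout)
    {
        using var delayCts = new CancellationTokenSource();
        Task delay = Task.Delay(timeout, delayCts.Token);
        Task winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
        if (winner == task)
        {
            delayCts.Cancel(); // release the timer early
            return await task.ConfigureAwait(false);
        }
        throw new TimeoutException($"Operation did not complete within {timeout.TotalMilliseconds} ms.");
    }
}
```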

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aerospike.Client;
using Policy = Aerospike.Client.Policy;

namespace aerospike_tests
{
    class Program
    {
        private const int AEROSPIKE_PORT = 3000;
        private const int CONNECTION_TIMEOUT_MS = 500;
        private const int REQUEST_TIMEOUT_MS = 300;
        private const int TOTAL_TIMEOUT = 0;
        private const int MAX_RETRIES = 2;
        private const int SLEEP_BETWEEN_RETRIES_MS = 5;
        private const int TASK_EXECUTION_TIMEOUT = ((REQUEST_TIMEOUT_MS + SLEEP_BETWEEN_RETRIES_MS) * (MAX_RETRIES * 2));

        private const string AEROSPIKE_HOST = "localhost";
        private const string NAMESPACE = "{{YOUR_NAMESPACE_HERE}}";
        private const string SET = "aerospike-test";
        private const string BIN = "value";

        private static ConcurrentDictionary<int, (bool WasCancelled, bool DidSucceed, bool DidFail, DateTimeOffset? TimeCancelled, DateTimeOffset? TimeSucceeded)> Results = new ConcurrentDictionary<int, (bool WasCancelled, bool DidSucceed, bool DidFail, DateTimeOffset? TimeCancelled, DateTimeOffset? TimeSucceeded)>();

        static async Task Main(string[] args)
        {
            const int recordsToTest = 200;
            const int executionsToPerform = 100;
            AsyncClient client = GetAsyncClient();

            await WriteNRecordsToAerospike(client, recordsToTest);

            Console.WriteLine("Press any key to begin reads from Aerospike.");
            Console.ReadKey();

            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();

            IList<Task<Record[]>> tasks = new List<Task<Record[]>>();

            for (int i = 0; i < executionsToPerform; i++)
            {
                Task<Record[]> task = GetNRecordsFromAerospike(client, recordsToTest, i, executionsToPerform);
                tasks.Add(task);
            }

            await Task.WhenAll(tasks);

            stopwatch.Stop();
            
            Console.WriteLine($"ApplicationExecutionComplete ::=> Execution took {GetStopwatchExecutionTime(stopwatch)}");

            Console.WriteLine("\nStarting analysis...");

            foreach (int iteration in Results.Keys)
            {
                string iterationString = $"Iteration ({iteration} / {executionsToPerform}) :: =>";
                if (Results.TryGetValue(iteration, out (bool WasCancelled, bool DidSucceed, bool DidFail, DateTimeOffset? TimeCancelled, DateTimeOffset? TimeSucceeded) value))
                {
                    if (value.WasCancelled && value.DidSucceed)
                    {
                        Console.WriteLine($"{iterationString} Task was cancelled, but also succeeded. Total difference between cancellation: {(value.TimeSucceeded.Value - value.TimeCancelled.Value).TotalSeconds} seconds or {(value.TimeSucceeded.Value - value.TimeCancelled.Value).TotalMilliseconds} milliseconds");
                    }
                    else if (value.WasCancelled && value.DidFail)
                    {
                        Console.WriteLine($"{iterationString} Task was cancelled and did not succeed.");
                    }
                    else if (!value.WasCancelled && value.DidSucceed)
                    {
                        Console.WriteLine($"{iterationString} Task succeeded without being cancelled.");
                    }
                }
                else
                {
                    Console.WriteLine($"Unable to retrieve result for {iteration}.");
                }
            }

            return;
        }

        private static string GetStopwatchExecutionTime(Stopwatch stopwatch) => $"{stopwatch.Elapsed.TotalMilliseconds} milliseconds";

        private static Task<Record[]> GetNRecordsFromAerospike(AsyncClient client, int numberOfRecordsToRead, int requestNumber, int requestTotal)
        {
            return Task.Run(async () =>
            {
                CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                IList<Key> keys = new List<Key>();

                for (int i = 0; i < numberOfRecordsToRead; i++)
                {
                    keys.Add(new Key(NAMESPACE, SET, $"-{i}"));
                }

                Stopwatch stopwatch = Stopwatch.StartNew();

                try
                {
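                    // Note: with a null policy, this batch Get falls back to the client's batchPolicyDefault,
                    // not the readPolicyDefault configured in GetAsyncClient().
                    Task<Record[]> task = client.Get(null, cancellationTokenSource.Token, keys.ToArray()).ContinueWith(t =>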
                    { 
                        stopwatch.Stop();
                        if (t.IsCompletedSuccessfully)
                        {
                            Console.WriteLine($"(Success) Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                            Results.AddOrUpdate(
                                requestNumber,
                                num => (false, true, false, null, DateTimeOffset.Now),
                                (num, tuple) =>
                                {
                                    tuple.TimeSucceeded = DateTimeOffset.Now;
                                    tuple.DidSucceed = true;

                                    return tuple;
                                }
                            );
                            return t.Result;
                        }

                        if (t.Status == TaskStatus.Faulted)
                        {
                            Console.WriteLine($"Task faulted, Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                            Results.AddOrUpdate(
                                requestNumber,
                                num => (false, false, true, null, null),
                                (num, tuple) =>
                                {
                                    tuple.DidFail = true;

                                    return tuple;
                                }
                            );
                            return null;
                        }

                        if (t.Status == TaskStatus.Canceled)
                        {
                            Console.WriteLine($"Task successfully cancelled, Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                            Results.AddOrUpdate(
                                requestNumber,
                                num => (true, false, false, DateTimeOffset.Now, null),
                                (num, tuple) =>
                                {
                                    if (!tuple.WasCancelled)
                                    {
                                        tuple.WasCancelled = true;
                                        tuple.TimeCancelled = DateTimeOffset.Now;
                                    }

                                    return tuple;
                                }
                            );
                            return null;
                        }

                        Console.WriteLine($"Unexpected failure, Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                        return null;

                    });

                    if (task.Wait(TASK_EXECUTION_TIMEOUT))
                    {
                        Record[] executionResult = await task;
                        return executionResult;
                    }
                    else
                    {
                        Console.WriteLine($"Cancelling ({requestNumber} / {requestTotal})");
                        cancellationTokenSource.Cancel();
                        Results.AddOrUpdate(
                            requestNumber, (num) => (true, false, false, DateTimeOffset.Now, null),
                            (i, tuple) =>
                            {
                                tuple.WasCancelled = true;
                                tuple.TimeCancelled = DateTimeOffset.Now;

                                return tuple;
                            }
                        );
                        return null;
                    }
                }
                catch (AerospikeException ae)
                {
                    stopwatch.Stop();
                    Console.WriteLine($"err Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                }
                catch (Exception e)
                {
                    stopwatch.Stop();
                    Console.WriteLine($"err Elapsed ms: {stopwatch.Elapsed.TotalMilliseconds}. ({requestNumber} / {requestTotal})");
                }

                return null;
            });

        }

        private static async Task WriteNRecordsToAerospike(AsyncClient client, int numberOfRecordsToWrite)
        {
            Console.WriteLine($"\nWriting {numberOfRecordsToWrite} records to Aerospike.");
            for (int i = 0; i < numberOfRecordsToWrite; i++) {
                await client.Put(
                    null,
                    CancellationToken.None,
                    new Key(NAMESPACE, SET, $"-{i}"),
                    new[] {new Bin(BIN, i + 1)}
                );
            }
            Console.WriteLine("Write complete.");
        }

        private static AsyncClient GetAsyncClient()
        {
            AsyncClient client = new AsyncClient(new AsyncClientPolicy
            {
                timeout = CONNECTION_TIMEOUT_MS,
                readPolicyDefault = new Policy
                {
                    socketTimeout = REQUEST_TIMEOUT_MS,
                    totalTimeout = TOTAL_TIMEOUT,
                    maxRetries = MAX_RETRIES,
                    sendKey = false,
                    sleepBetweenRetries = SLEEP_BETWEEN_RETRIES_MS
                }
            },
            new []
            {
                new Host(AEROSPIKE_HOST, AEROSPIKE_PORT), 
            });

            return client;
        }
    }
}

To run it, you'll need to specify a valid Aerospike cluster address and namespace.
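For comparison, here is a minimal sketch of a timeout that awaits instead of blocking a thread-pool thread with task.Wait(...), as the repro does. TimeoutHelper.WithTimeout and its tuple result are hypothetical helpers, not part of the Aerospike client or Polly. Like the repro, it only cancels the token and stops waiting; the underlying operation is not aborted and may still complete later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not an Aerospike or Polly API.
public static class TimeoutHelper
{
    // Waits for `operation` at most `timeout` without blocking a thread.
    // On timeout the token is cancelled and (false, default) is returned;
    // the operation itself is not aborted and may still finish later.
    public static async Task<(bool Completed, T Result)> WithTimeout<T>(
        Func<CancellationToken, Task<T>> operation,
        TimeSpan timeout)
    {
        var operationCts = new CancellationTokenSource();
        using var delayCts = new CancellationTokenSource();

        Task<T> work = operation(operationCts.Token);
        Task delay = Task.Delay(timeout, delayCts.Token);

        Task finished = await Task.WhenAny(work, delay).ConfigureAwait(false);
        if (finished == work)
        {
            delayCts.Cancel();          // stop the now-unneeded timer
            operationCts.Dispose();
            // Rethrows if the operation faulted or was cancelled.
            return (true, await work.ConfigureAwait(false));
        }

        // Deadline reached: signal cancellation and stop waiting.
        operationCts.Cancel();
        return (false, default(T));
    }
}
```

Applied to the repro, the call would look like TimeoutHelper.WithTimeout(token => client.Get(null, token, keys.ToArray()), TimeSpan.FromMilliseconds(TASK_EXECUTION_TIMEOUT)), assuming the same AsyncClient.Get overload the repro uses.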

Testing

In my testing, I vary the constant executionsToPerform to control the amount of contention.

executionsToPerform = 10

At 10, the application behaves ideally:

(Success) Elapsed ms: 64.3818. (5 / 10)
(Success) Elapsed ms: 64.4266. (2 / 10)
(Success) Elapsed ms: 76.9364. (0 / 10)
(Success) Elapsed ms: 77.0929. (7 / 10)
(Success) Elapsed ms: 77.0686. (8 / 10)
(Success) Elapsed ms: 77.1184. (4 / 10)
(Success) Elapsed ms: 76.898. (1 / 10)
(Success) Elapsed ms: 77.1592. (9 / 10)
(Success) Elapsed ms: 77.2408. (6 / 10)
(Success) Elapsed ms: 77.2891. (3 / 10)
ApplicationExecutionComplete ::=> Execution took 133.0206 milliseconds

Starting analysis...
Iteration (0 / 10) :: => Task succeeded without being cancelled.
Iteration (1 / 10) :: => Task succeeded without being cancelled.
Iteration (2 / 10) :: => Task succeeded without being cancelled.
Iteration (3 / 10) :: => Task succeeded without being cancelled.
Iteration (4 / 10) :: => Task succeeded without being cancelled.
Iteration (5 / 10) :: => Task succeeded without being cancelled.
Iteration (6 / 10) :: => Task succeeded without being cancelled.
Iteration (7 / 10) :: => Task succeeded without being cancelled.
Iteration (8 / 10) :: => Task succeeded without being cancelled.
Iteration (9 / 10) :: => Task succeeded without being cancelled.

executionsToPerform = 100

At 100, things get interesting:

Cancelling (0 / 100)
Cancelling (9 / 100)
Cancelling (1 / 100)
Cancelling (7 / 100)
Cancelling (3 / 100)
Cancelling (2 / 100)
Cancelling (11 / 100)
Cancelling (5 / 100)
Cancelling (10 / 100)
Cancelling (8 / 100)
Cancelling (4 / 100)
Cancelling (6 / 100)
Cancelling (12 / 100)
Cancelling (22 / 100)
Cancelling (21 / 100)
Cancelling (15 / 100)
Cancelling (14 / 100)
Cancelling (17 / 100)
Cancelling (23 / 100)
Cancelling (18 / 100)
Cancelling (20 / 100)
Cancelling (19 / 100)
Cancelling (13 / 100)
Cancelling (16 / 100)
Cancelling (24 / 100)
Cancelling (25 / 100)
Cancelling (26 / 100)
Cancelling (29 / 100)
Cancelling (27 / 100)
Cancelling (32 / 100)
Cancelling (28 / 100)
Cancelling (31 / 100)
Cancelling (30 / 100)
Cancelling (34 / 100)
Cancelling (33 / 100)
Cancelling (35 / 100)
Cancelling (36 / 100)
Cancelling (37 / 100)
Cancelling (38 / 100)
Cancelling (39 / 100)
Cancelling (40 / 100)
Cancelling (41 / 100)
Cancelling (42 / 100)
Cancelling (43 / 100)
Cancelling (44 / 100)
Cancelling (46 / 100)
Cancelling (47 / 100)
Cancelling (45 / 100)
Cancelling (48 / 100)
Cancelling (49 / 100)
Cancelling (50 / 100)
Cancelling (51 / 100)
Cancelling (52 / 100)
Cancelling (53 / 100)
Cancelling (54 / 100)
Cancelling (55 / 100)
Cancelling (56 / 100)
Cancelling (57 / 100)
Cancelling (58 / 100)
Cancelling (59 / 100)
Cancelling (61 / 100)
Cancelling (60 / 100)
Cancelling (62 / 100)
Cancelling (63 / 100)
Cancelling (64 / 100)
Cancelling (65 / 100)
Cancelling (66 / 100)
Cancelling (67 / 100)
Cancelling (68 / 100)
Cancelling (69 / 100)
Cancelling (71 / 100)
Cancelling (70 / 100)
Cancelling (73 / 100)
Cancelling (72 / 100)
Cancelling (75 / 100)
Cancelling (74 / 100)
Cancelling (76 / 100)
Cancelling (77 / 100)
Cancelling (78 / 100)
Cancelling (79 / 100)
Cancelling (80 / 100)
Cancelling (82 / 100)
Cancelling (81 / 100)
Cancelling (83 / 100)
Cancelling (84 / 100)
Cancelling (85 / 100)
Cancelling (86 / 100)
Cancelling (87 / 100)
(Success) Elapsed ms: 8599.2519. (0 / 100)
(Success) Elapsed ms: 8602.6503. (9 / 100)
Cancelling (90 / 100)
Cancelling (89 / 100)
Cancelling (88 / 100)
Cancelling (91 / 100)
Cancelling (92 / 100)
(Success) Elapsed ms: 8621.0279. (4 / 100)
(Success) Elapsed ms: 8630.6432. (7 / 100)
Cancelling (93 / 100)
Cancelling (94 / 100)
Cancelling (95 / 100)
(Success) Elapsed ms: 8647.2984. (8 / 100)
(Success) Elapsed ms: 8651.4726. (1 / 100)
(Success) Elapsed ms: 8652.1784. (3 / 100)
Cancelling (96 / 100)
Cancelling (97 / 100)
(Success) Elapsed ms: 8675.7053. (10 / 100)
(Success) Elapsed ms: 8680.4213. (5 / 100)
(Success) Elapsed ms: 7550.8311. (16 / 100)
(Success) Elapsed ms: 8707.3032. (11 / 100)
(Success) Elapsed ms: 8725.4796. (2 / 100)
(Success) Elapsed ms: 7830.7989. (12 / 100)
(Success) Elapsed ms: 7489.9457. (15 / 100)
(Success) Elapsed ms: 7494.0894. (21 / 100)
(Success) Elapsed ms: 7516.2372. (13 / 100)
(Success) Elapsed ms: 7520.5314. (14 / 100)
(Success) Elapsed ms: 7517.7631. (24 / 100)
(Success) Elapsed ms: 7523.0532. (20 / 100)
(Success) Elapsed ms: 7544.5024. (22 / 100)
(Success) Elapsed ms: 7549.9045. (19 / 100)
(Success) Elapsed ms: 8701.8034. (6 / 100)
(Success) Elapsed ms: 7572.8999. (17 / 100)
(Success) Elapsed ms: 7577.8698. (23 / 100)
(Success) Elapsed ms: 7578.5922. (18 / 100)
(Success) Elapsed ms: 6956.9484. (25 / 100)
(Success) Elapsed ms: 6742.6774. (26 / 100)
(Success) Elapsed ms: 6400.1214. (29 / 100)
(Success) Elapsed ms: 5609.7759. (40 / 100)
(Success) Elapsed ms: 6406.2664. (30 / 100)
(Success) Elapsed ms: 6428.6826. (27 / 100)
(Success) Elapsed ms: 6432.1711. (34 / 100)
(Success) Elapsed ms: 6453.623. (35 / 100)
(Success) Elapsed ms: 6461.6126. (31 / 100)
(Success) Elapsed ms: 6460.9483. (36 / 100)
(Success) Elapsed ms: 6484.039. (33 / 100)
(Success) Elapsed ms: 5374.6058. (49 / 100)
(Success) Elapsed ms: 6460.9683. (37 / 100)
(Success) Elapsed ms: 6479.8062. (38 / 100)
(Success) Elapsed ms: 5878.3424. (39 / 100)
(Success) Elapsed ms: 6404.5435. (28 / 100)
(Success) Elapsed ms: 4780.9508. (53 / 100)
(Success) Elapsed ms: 5315.1546. (42 / 100)
(Success) Elapsed ms: 5340.3914. (44 / 100)
(Success) Elapsed ms: 5344.3627. (46 / 100)
(Success) Elapsed ms: 5347.4235. (43 / 100)
(Success) Elapsed ms: 5369.4667. (48 / 100)
(Success) Elapsed ms: 5375.8805. (45 / 100)
(Success) Elapsed ms: 6490.2285. (32 / 100)
(Success) Elapsed ms: 5375.7127. (47 / 100)
(Success) Elapsed ms: 5368.8213. (50 / 100)
(Success) Elapsed ms: 5370.0847. (51 / 100)
(Success) Elapsed ms: 5368.3678. (52 / 100)
(Success) Elapsed ms: 5297.3996. (41 / 100)
(Success) Elapsed ms: 4528.808. (54 / 100)
(Success) Elapsed ms: 4237.3416. (55 / 100)
(Success) Elapsed ms: 4242.107. (56 / 100)
(Success) Elapsed ms: 4241.2251. (61 / 100)
(Success) Elapsed ms: 4246.7797. (57 / 100)
(Success) Elapsed ms: 4270.9717. (62 / 100)
(Success) Elapsed ms: 4277.7379. (63 / 100)
(Success) Elapsed ms: 4301.2064. (59 / 100)
(Success) Elapsed ms: 4308.0272. (58 / 100)
(Success) Elapsed ms: 4307.8508. (60 / 100)
(Success) Elapsed ms: 4293.8079. (64 / 100)
Cancelling (98 / 100)
(Success) Elapsed ms: 4303.1547. (65 / 100)
(Success) Elapsed ms: 4300.5034. (66 / 100)
(Success) Elapsed ms: 4311.8937. (67 / 100)
(Success) Elapsed ms: 3717.4712. (68 / 100)
(Success) Elapsed ms: 3458.4406. (69 / 100)
(Success) Elapsed ms: 3167.5063. (70 / 100)
(Success) Elapsed ms: 3169.1178. (71 / 100)
(Success) Elapsed ms: 3196.1138. (75 / 100)
(Success) Elapsed ms: 3205.8526. (72 / 100)
(Success) Elapsed ms: 3217.6829. (74 / 100)
(Success) Elapsed ms: 3224.8897. (73 / 100)
(Success) Elapsed ms: 3222.9186. (77 / 100)
(Success) Elapsed ms: 3225.8724. (76 / 100)
(Success) Elapsed ms: 3230.3647. (78 / 100)
(Success) Elapsed ms: 3236.2073. (79 / 100)
(Success) Elapsed ms: 3227.1404. (80 / 100)
(Success) Elapsed ms: 3232.734. (81 / 100)
(Success) Elapsed ms: 3238.8613. (82 / 100)
(Success) Elapsed ms: 2641.7087. (83 / 100)
(Success) Elapsed ms: 2386.1963. (84 / 100)
(Success) Elapsed ms: 2095.2441. (86 / 100)
(Success) Elapsed ms: 2113.4276. (85 / 100)
(Success) Elapsed ms: 2136.5558. (87 / 100)
(Success) Elapsed ms: 2133.4102. (88 / 100)
(Success) Elapsed ms: 2133.5221. (89 / 100)
Cancelling (99 / 100)
(Success) Elapsed ms: 2137.1506. (91 / 100)
(Success) Elapsed ms: 2160.0054. (90 / 100)
(Success) Elapsed ms: 2160.5841. (92 / 100)
(Success) Elapsed ms: 2140.9805. (93 / 100)
(Success) Elapsed ms: 2154.6686. (94 / 100)
(Success) Elapsed ms: 2159.8943. (95 / 100)
(Success) Elapsed ms: 2157.8588. (96 / 100)
(Success) Elapsed ms: 2158.7219. (97 / 100)
(Success) Elapsed ms: 1595.5928. (98 / 100)
(Success) Elapsed ms: 1331.9865. (99 / 100)
ApplicationExecutionComplete ::=> Execution took 9741.0888 milliseconds

Starting analysis...
Iteration (0 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.3893189 seconds or 7389.3189 milliseconds
Iteration (1 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.5438073 seconds or 7543.8073 milliseconds
Iteration (2 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.6017621 seconds or 7601.7621 milliseconds
Iteration (3 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.5441212 seconds or 7544.1212 milliseconds
Iteration (4 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.4687259 seconds or 7468.7259 milliseconds
Iteration (5 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.5740193 seconds or 7574.0193 milliseconds
Iteration (6 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.7031229 seconds or 7703.1229 milliseconds
Iteration (7 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.4869794 seconds or 7486.9794 milliseconds
Iteration (8 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.5393265 seconds or 7539.3265 milliseconds
Iteration (9 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.409441 seconds or 7409.441 milliseconds
Iteration (10 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.5731682 seconds or 7573.1682 milliseconds
Iteration (11 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 7.6010219 seconds or 7601.0219 milliseconds
Iteration (12 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.7356045 seconds or 6735.6045 milliseconds
Iteration (13 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4035529 seconds or 6403.5529 milliseconds
Iteration (14 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4068775 seconds or 6406.8775 milliseconds
Iteration (15 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.3826249 seconds or 6382.6249 milliseconds
Iteration (16 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.3487576 seconds or 6348.7576 milliseconds
Iteration (17 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4860669 seconds or 6486.0669 milliseconds
Iteration (18 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4919501 seconds or 6491.9501 milliseconds
Iteration (19 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4605803 seconds or 6460.5803 milliseconds
Iteration (20 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4336116 seconds or 6433.6116 milliseconds
Iteration (21 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4008099 seconds or 6400.8099 milliseconds
Iteration (22 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.4569086 seconds or 6456.9086 milliseconds
Iteration (23 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.491182 seconds or 6491.182 milliseconds
Iteration (24 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 6.3999061 seconds or 6399.9061 milliseconds
Iteration (25 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.8736482 seconds or 5873.6482 milliseconds
Iteration (26 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.6569436 seconds or 5656.9436 milliseconds
Iteration (27 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3412535 seconds or 5341.2535 milliseconds
Iteration (28 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.4300109 seconds or 5430.0109 milliseconds
Iteration (29 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.2960333 seconds or 5296.0333 milliseconds
Iteration (30 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3151464 seconds or 5315.1464 milliseconds
Iteration (31 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3711072 seconds or 5371.1072 milliseconds
Iteration (32 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.5249778 seconds or 5524.9778 milliseconds
Iteration (33 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3762233 seconds or 5376.2233 milliseconds
Iteration (34 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3453173 seconds or 5345.3173 milliseconds
Iteration (35 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3468596 seconds or 5346.8596 milliseconds
Iteration (36 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.37511 seconds or 5375.11 milliseconds
Iteration (37 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3491258 seconds or 5349.1258 milliseconds
Iteration (38 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 5.3690855 seconds or 5369.0855 milliseconds
Iteration (39 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.7705425 seconds or 4770.5425 milliseconds
Iteration (40 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.3899716 seconds or 4389.9716 milliseconds
Iteration (41 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.3411949 seconds or 4341.1949 milliseconds
Iteration (42 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.237412 seconds or 4237.412 milliseconds
Iteration (43 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.24723 seconds or 4247.23 milliseconds
Iteration (44 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.2421428 seconds or 4242.1428 milliseconds
Iteration (45 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.2794217 seconds or 4279.4217 milliseconds
Iteration (46 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.2423601 seconds or 4242.3601 milliseconds
Iteration (47 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.3082921 seconds or 4308.2921 milliseconds
Iteration (48 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.270697 seconds or 4270.697 milliseconds
Iteration (49 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.1530993 seconds or 4153.0993 milliseconds
Iteration (50 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.2911222 seconds or 4291.1222 milliseconds
Iteration (51 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.281639 seconds or 4281.639 milliseconds
Iteration (52 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 4.2944055 seconds or 4294.4055 milliseconds
Iteration (53 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.566405 seconds or 3566.405 milliseconds
Iteration (54 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.4521462 seconds or 3452.1462 milliseconds
Iteration (55 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1428791 seconds or 3142.8791 milliseconds
Iteration (56 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1608458 seconds or 3160.8458 milliseconds
Iteration (57 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1681468 seconds or 3168.1468 milliseconds
Iteration (58 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2253509 seconds or 3225.3509 milliseconds
Iteration (59 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2186054 seconds or 3218.6054 milliseconds
Iteration (60 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2251227 seconds or 3225.1227 milliseconds
Iteration (61 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1653613 seconds or 3165.3613 milliseconds
Iteration (62 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1898672 seconds or 3189.8672 milliseconds
Iteration (63 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.1950768 seconds or 3195.0768 milliseconds
Iteration (64 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2091897 seconds or 3209.1897 milliseconds
Iteration (65 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2254654 seconds or 3225.4654 milliseconds
Iteration (66 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2114142 seconds or 3211.4142 milliseconds
Iteration (67 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 3.2324044 seconds or 3232.4044 milliseconds
Iteration (68 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.6386878 seconds or 2638.6878 milliseconds
Iteration (69 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.3649839 seconds or 2364.9839 milliseconds
Iteration (70 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.0881705 seconds or 2088.1705 milliseconds
Iteration (71 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.0959612 seconds or 2095.9612 milliseconds
Iteration (72 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1370015 seconds or 2137.0015 milliseconds
Iteration (73 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1424244 seconds or 2142.4244 milliseconds
Iteration (74 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1416679 seconds or 2141.6679 milliseconds
Iteration (75 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1129105 seconds or 2112.9105 milliseconds
Iteration (76 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1681447 seconds or 2168.1447 milliseconds
Iteration (77 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1332363 seconds or 2133.2363 milliseconds
Iteration (78 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1408753 seconds or 2140.8753 milliseconds
Iteration (79 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1340121 seconds or 2134.0121 milliseconds
Iteration (80 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1539297 seconds or 2153.9297 milliseconds
Iteration (81 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1309138 seconds or 2130.9138 milliseconds
Iteration (82 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 2.1582119 seconds or 2158.2119 milliseconds
Iteration (83 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.583232 seconds or 1583.232 milliseconds
Iteration (84 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.3244563 seconds or 1324.4563 milliseconds
Iteration (85 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0521095 seconds or 1052.1095 milliseconds
Iteration (86 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0305809 seconds or 1030.5809 milliseconds
Iteration (87 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0348567 seconds or 1034.8567 milliseconds
Iteration (88 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0038913 seconds or 1003.8913 milliseconds
Iteration (89 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0310295 seconds or 1031.0295 milliseconds
Iteration (90 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0627188 seconds or 1062.7188 milliseconds
Iteration (91 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0260993 seconds or 1026.0993 milliseconds
Iteration (92 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0337954 seconds or 1033.7954 milliseconds
Iteration (93 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0096219 seconds or 1009.6219 milliseconds
Iteration (94 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 0.9914294 seconds or 991.4294 milliseconds
Iteration (95 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 1.0037271 seconds or 1003.7271 milliseconds
Iteration (96 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 0.9805735 seconds or 980.5735 milliseconds
Iteration (97 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 0.9600919 seconds or 960.0919 milliseconds
Iteration (98 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 0.3822146 seconds or 382.2146 milliseconds
Iteration (99 / 100) :: => Task was cancelled, but also succeeded. Total difference between cancellation: 0.1061034 seconds or 106.1034 milliseconds

As the analysis shows, the time a task keeps running after its token is cancelled grows quickly with load, reaching about 7.7 seconds here.

In a high-traffic application, this can easily make the application choke, with little way to recover.

I can share application spans (obtained via traces) demonstrating this behavior in production, if needed.

Speculation

Digging into the source code, I followed the crumbs to RecordArrayListenerAdapter -> ListenerAdapter, which is the only place I could find where the CancellationToken is used.

Beyond this, I'm unsure what's causing this behavior.
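On the caller side, one possible mitigation is to stop awaiting as soon as the token fires. This sketch assumes .NET 6 or later for Task.WaitAsync(CancellationToken). It does not stop the underlying command, which keeps running and holding its connection until it completes (the behavior reported above), so it only frees the awaiting code. SlowOperationIgnoringToken is a hypothetical stand-in for a client call that does not observe the token promptly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for a client call that ignores the token.
    public static Task<int> SlowOperationIgnoringToken(TimeSpan duration)
    {
        return Task.Run(async () =>
        {
            await Task.Delay(duration); // token deliberately not passed
            return 42;
        });
    }

    // Stops waiting when the token fires; the underlying work is NOT cancelled.
    public static async Task<T> AwaitWithCancellation<T>(Task<T> operation, CancellationToken token)
    {
        return await operation.WaitAsync(token);
    }
}
```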

Limiting records returned by a query does not work

We are trying to limit the records returned by a query using Statement.MaxRecords or QueryPolicy.shortQuery, but it does not work. In the example below we expect ~10 records, but in some cases we get 100k. We are currently using C# client version 5.3.2, but upgrading to 5.4.x or even 6.x does not change this behavior.

        var queryPolicy = new QueryPolicy
        {
            shortQuery = true,
            totalTimeout = 10000,
            readModeSC = ReadModeSC.ALLOW_REPLICA,
            includeBinData = false,
            maxRetries = 0,
        };

        var stmt = new Statement();
        stmt.SetNamespace("namespace");
        stmt.SetSetName("setname");
        stmt.MaxRecords = 10;
        stmt.SetFilter(Filter.Equal("binname", "binvalue"));

        var result = client.Query(queryPolicy, stmt);

        int count = 0;
        try
        {
            while (result.Next())
            {
                count++;
            }
        }
        finally
        {
            // Always release the record set, even if iteration throws.
            result.Close();
        }

        Console.WriteLine(count);
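Until the limit is honored, one possible client-side workaround is to stop iterating once the expected number of records has been read and then close the record set. The sketch keeps the loop independent of the client by taking next and close delegates; with the real client these would presumably wrap RecordSet.Next() and RecordSet.Close(). DrainWithLimit is a hypothetical helper, and it caps what the application processes, not what the server sends.

```csharp
using System;

public static class QueryLimitSketch
{
    // Reads at most `limit` records, then closes the cursor even if more are pending.
    public static int DrainWithLimit(Func<bool> next, Action close, int limit)
    {
        if (limit < 0) throw new ArgumentOutOfRangeException(nameof(limit));
        int count = 0;
        try
        {
            // Check the limit first so next() is never called once the cap is reached.
            while (count < limit && next())
            {
                count++;
            }
        }
        finally
        {
            close(); // release the underlying query resources
        }
        return count;
    }
}
```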

No context on logs

Hello, to give a logger to the client, we have to use the static method

public static void SetCallback(Callback callback)

Since it's static, we can't add any context to the log, such as the cluster name, and we end up with logs such as Add node X.X.X.X 4333 failed: Error -1: Invalid non-TLS address, where it's hard to gauge the impact from just an IP.

What do you think about these 3 different solutions:

  1. Add some context to the log callback with the cluster name?
  2. Have an instance logger per client, so we can add any context, even business-specific context
  3. Automatically add context to the log message
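Option 2 could look roughly like the sketch below: each client owns a small logger object that carries its own context (here a cluster name) and forwards to whatever sink the application supplies. ContextualLogger and its members are hypothetical names for illustration, not part of the client's API, and the string log level is a simplification.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-client logger: context is captured per instance instead of globally.
public sealed class ContextualLogger
{
    private readonly Action<string, string> _sink; // (level, message)
    private readonly IReadOnlyDictionary<string, string> _context;

    public ContextualLogger(Action<string, string> sink, IReadOnlyDictionary<string, string> context)
    {
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
        _context = context ?? new Dictionary<string, string>();
    }

    public void Log(string level, string message)
    {
        // Prefix every message with this instance's context, e.g. "[cluster=eu-1] ...".
        var parts = new List<string>();
        foreach (var kv in _context)
        {
            parts.Add($"{kv.Key}={kv.Value}");
        }
        string prefix = parts.Count > 0 ? $"[{string.Join(", ", parts)}] " : string.Empty;
        _sink(level, prefix + message);
    }
}
```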

Better control over large object heap allocations

We would like to reduce the number of large object heap allocations when fetching records from Aerospike. As we query Aerospike a lot, the LOH allocations make it more likely for full GC to be triggered which damages performance of our application. We want to avoid/re-use temporary large buffers as much as possible to reduce large allocations.

In our case we are reading bins with BLOB values which may be over 85K individually or cumulatively. Aerospike client LOH allocations then come from two places:

  • ThreadLocalData allocating a new buffer to receive Aerospike response if the response is over 128K (currently rare for us)
  • BytesToParticle() function allocating a new array for each BLOB bin value (happens frequently for us)

We are affected by the issue with synchronous Get() (ReadCommand). I'd expect a similar issue to exist in other paths as well.

In our internal prototype, I've added a new member to the Policy class: a callback interface that ReadCommand invokes to handle bin values, e.g.:

public interface IRecordBuilder
{
  object BytesToParticle(string binName, int type, byte[] buf, int offset, int len);
}

public class Policy
{
  ...
  public IRecordBuilder recordBuilder = new RecordBuilder();
}     

The default implementation of the interface simply calls into ByteUtil, but in our application we instead do direct processing (e.g. deserialisation) of the blob data without any need for creating subarrays in the first place. This has an extra benefit of speed improvement as the blob data is no longer copied around.
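A self-contained sketch of this proposal, reusing the IRecordBuilder shape above (which is the proposal, not the released client API): the copying builder allocates a new array per bin, while an application-specific builder reads straight from the shared receive buffer. CopyingRecordBuilder and ChecksumRecordBuilder are hypothetical. The checksum stands in for real deserialisation, and a real builder would dispatch on the particle type.

```csharp
using System;

// Interface shape taken from the proposal above.
public interface IRecordBuilder
{
    object BytesToParticle(string binName, int type, byte[] buf, int offset, int len);
}

// Default-style behaviour: a new array per bin (85,000 bytes or more lands on the LOH).
public sealed class CopyingRecordBuilder : IRecordBuilder
{
    public object BytesToParticle(string binName, int type, byte[] buf, int offset, int len)
    {
        var copy = new byte[len];
        Buffer.BlockCopy(buf, offset, copy, 0, len);
        return copy;
    }
}

// Application-specific behaviour: consume the bytes in place, without a sub-array.
public sealed class ChecksumRecordBuilder : IRecordBuilder
{
    public object BytesToParticle(string binName, int type, byte[] buf, int offset, int len)
    {
        var blob = new ReadOnlySpan<byte>(buf, offset, len); // a view, not a copy
        long sum = 0;
        foreach (byte b in blob)
        {
            sum += b;
        }
        return sum;
    }
}
```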

For ThreadLocalData I simply made the cutoff non-const, so that it could be overridden to an arbitrary value, but we have not seen a need to do so yet.

While this solves the immediate issue for us, we are keen to have a solution available in the mainstream as well (not at all attached to the design above). Please let me know your thoughts on this, and if that helps I am happy to devise a PR for the design we agree on.

Race condition in ListenerAdapter<T>.OnFailure, System.InvalidOperationException

After switching to the official async C# client, we are seeing lots of errors similar to this:

Exception: System.InvalidOperationException

Message: An attempt was made to transition a task to a final state when it had already completed.

StackTrace: at System.Threading.Tasks.TaskCompletionSource`1.SetException(Exception exception)
at Aerospike.Client.ListenerAdapter`1.OnFailure(AerospikeException exception)
at Aerospike.Client.AsyncRead.OnFailure(AerospikeException e)
at Aerospike.Client.AsyncCommand.FailOnApplicationError(AerospikeException ae)
at Aerospike.Client.AsyncCommand.SocketHandler(Object sender, SocketAsyncEventArgs args)
at System.Net.Sockets.SocketAsyncEventArgs.OnCompleted(SocketAsyncEventArgs e)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Net.Sockets.SocketAsyncEventArgs.FinishOperationSuccess(SocketError socketError, Int32 bytesTransferred, SocketFlags flags)
at System.Net.Sockets.SocketAsyncEventArgs.CompletionPortCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
at System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* pOVERLAP)
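The exception itself is easy to reproduce outside the client: TaskCompletionSource<T>.SetException throws InvalidOperationException if the task has already completed, whereas TrySetException simply returns false. The sketch below shows the guarded pattern a listener adapter could use when success and failure callbacks may race. GuardedCompletion and RaceDemo are hypothetical names, not the client's ListenerAdapter.

```csharp
using System;
using System.Threading.Tasks;

public sealed class GuardedCompletion<T>
{
    private readonly TaskCompletionSource<T> _tcs =
        new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    public Task<T> Task => _tcs.Task;

    // Return false instead of throwing when another callback already completed the task.
    public bool OnSuccess(T value) => _tcs.TrySetResult(value);

    public bool OnFailure(Exception error) => _tcs.TrySetException(error);
}

public static class RaceDemo
{
    // A second, unguarded completion throws InvalidOperationException.
    public static bool UnguardedSecondCompletionThrows()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(1);
        try
        {
            tcs.SetException(new Exception("late failure"));
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```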

Follow C# naming conventions

Throughout the code, C# coding conventions are not followed.
For example, in the record class:

public readonly Dictionary<string,object> bins;
public readonly int generation;
public readonly int expiration;

You expose public fields instead of properties, and the naming convention for public members is PascalCase, whereas you use camelCase.
Are you guys accepting pull requests?
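For illustration, a record type following these conventions would expose read-only PascalCase properties. RecordView is a hypothetical name. Changing the real Record class this way would break existing callers that use the bins, generation and expiration fields, so it would probably need a transition period.

```csharp
using System.Collections.Generic;

// Hypothetical record shape following .NET naming guidelines:
// public members are PascalCase properties rather than camelCase fields.
public sealed class RecordView
{
    public RecordView(IReadOnlyDictionary<string, object> bins, int generation, int expiration)
    {
        Bins = bins;
        Generation = generation;
        Expiration = expiration;
    }

    public IReadOnlyDictionary<string, object> Bins { get; }
    public int Generation { get; }
    public int Expiration { get; }
}
```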

Add tracing

Context

Tracing is a very powerful concept for modern backends and has become a basic pillar of observability, just as logs and metrics are today.

In .NET, distributed tracing was pushed forward with the System.Diagnostics.Activity APIs in the base class library, notably ActivitySource and ActivityListener, introduced in .NET 5.
See this article and the official documentation.

Instrumentation was progressively introduced for the most common protocols over the last two years, starting with HttpClient and followed by Grpc.Net.Client (see some examples in its source code).
Given the critical place Aerospike has in many backends, it would be very welcome for this client to implement the Activity APIs.

How

The Activity API is quite bare-bones and gives a lot of freedom in how to use it. OpenTelemetry (part of the CNCF) proposes naming conventions that are already widely adopted by the industry (Grpc.Net.Client uses them).

  • The basic idea is to create a new Activity for every I/O (with retries, this means several Activities will be created).
  • Tags must be added to carry information such as: which node was called? What kind of operation is it? What is the key? Is it the first attempt? ...
  • Activity.Current uses AsyncLocal. This is an important detail for understanding how the API works under the hood.
  • In case of error, the ongoing Activity's status must be marked as error.
  • When there is no listener, ActivitySource.StartActivity(...) returns null. This means tracing won't noticeably affect performance when no trace/listener is active (it makes no difference for people who don't use tracing).
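A minimal sketch of the bullets above, assuming .NET 6 or later (for ActivityStatusCode). The source name "Aerospike.Client", the operation name and the tag keys (modelled loosely on OpenTelemetry's db.* conventions) are assumptions, not an agreed design. The sketch shows one Activity per I/O attempt, tags for the node and attempt number, and an error status on failure. It also shows that StartActivity returns null when nothing is listening.

```csharp
using System;
using System.Diagnostics;

public static class TracingSketch
{
    // Hypothetical source name; a real client would choose and document its own.
    public static readonly ActivitySource Source = new ActivitySource("Aerospike.Client");

    // Wraps a single I/O attempt; retries would call this once per attempt.
    public static T TraceAttempt<T>(string operation, string node, int attempt, Func<T> io)
    {
        // StartActivity returns null when no listener is registered.
        using var activity = Source.StartActivity(operation, ActivityKind.Client);
        activity?.SetTag("db.system", "aerospike");
        activity?.SetTag("db.operation", operation);
        activity?.SetTag("net.peer.name", node);
        activity?.SetTag("aerospike.attempt", attempt);
        try
        {
            return io();
        }
        catch (Exception ex)
        {
            // Mark the attempt as failed before the activity is stopped by Dispose.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
```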

Aerospike under k8s using TCP proxy (null reference)

We are having an issue when using Aerospike through a TCP proxy.

If we disable the use of alternate-access-address, we get a null reference exception in the client.
The behavior is right, since the client can't find any working address, but maybe the exception should be more informative?

Type: LogMessage
ExceptionDetails:
[0].Type: NullReferenceException
[0].Message: Object reference not set to an instance of an object.
[0].Source: AerospikeClient
[0].Method: FailOnApplicationError
[0].StackTrace:    at Aerospike.Client.AsyncCommand.FailOnApplicationError(AerospikeException ae)
   at Aerospike.Client.AsyncCommand.ExecuteCommand()
   at Aerospike.Client.AsyncCommand.ExecuteCore()
   at Aerospike.Client.AsyncCommand.ExecuteInline(SocketAsyncEventArgs e)
   at Aerospike.Client.AsyncCommandBlockingQueue.ScheduleCommand(AsyncCommand command)
   at Aerospike.Client.AsyncCluster.ScheduleCommandExecution(AsyncCommand command)
   at Aerospike.Client.AsyncCommand.Execute()
   at Aerospike.Client.AsyncClient.Put(WritePolicy policy, WriteListener listener, Key key, Bin[] bins)
  ...
[0].Data: 
Exceptions:
Exception: System.NullReferenceException
Error: Object reference not set to an instance of an object.
Source: AerospikeClient
Method: FailOnApplicationError
Data:
Stack:
   at Aerospike.Client.AsyncCommand.FailOnApplicationError(AerospikeException ae)
   at Aerospike.Client.AsyncCommand.ExecuteCommand()
   at Aerospike.Client.AsyncCommand.ExecuteCore()
   at Aerospike.Client.AsyncCommand.ExecuteInline(SocketAsyncEventArgs e)
   at Aerospike.Client.AsyncCommandBlockingQueue.ScheduleCommand(AsyncCommand command)
   at Aerospike.Client.AsyncCluster.ScheduleCommandExecution(AsyncCommand command)
   at Aerospike.Client.AsyncCommand.Execute()
   at Aerospike.Client.AsyncClient.Put(WritePolicy policy, WriteListener listener, Key key, Bin[] bins)
...)
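One way to make this failure more informative, sketched under assumptions: check the node lookup result before it is dereferenced and throw an exception that names the addresses that were tried. NodeGuard, RequireNode and the message text are hypothetical. The real client would presumably raise an AerospikeException with a suitable result code rather than InvalidOperationException.

```csharp
using System;
using System.Collections.Generic;

public static class NodeGuard
{
    // Fail fast with context instead of letting a null node surface later as a NullReferenceException.
    public static T RequireNode<T>(T node, string ns, IEnumerable<string> triedAddresses) where T : class
    {
        if (node != null)
        {
            return node;
        }
        throw new InvalidOperationException(
            $"No reachable node for namespace '{ns}'. Addresses tried: {string.Join(", ", triedAddresses)}.");
    }
}
```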


System.IndexOutOfRangeException in AsyncMultiExecutor -> ExecuteBatchRetry method.

It appears there might be an off-by-one error in ExecuteBatchRetry.

public void ExecuteBatchRetry(AsyncMultiCommand[] cmds, AsyncMultiCommand orig)

The math is correct, as long as cmd != orig evaluates to false at least once. However, in a simplified code example, we can easily prove that we'll always go out of bounds if it doesn't:

int[] commands = new[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int[] cmds = new[]{11, 12, 13, 14, 15,};

int[] target = new int[commands.Length + cmds.Length - 1];
int count = 0;
foreach(int cmd in commands){
	target[count++] = cmd;
}

foreach(int cmd in cmds){
	target[count++] = cmd;
}

[Screenshot: LINQPad execution of the example above]

Ran into this issue sporadically using the AsyncClient, and was able to prevent the issue from persisting by setting retries to 0.

I assume the fix would be to not subtract 1 from the sum of the lengths, unless there's strong reasoning that cmd == orig is guaranteed to evaluate to true at some point.
Happy to submit a PR.

Edit: An alternative is to verify the equality checking between the command objects (== vs Equals)
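A sketch of a bounds-safe merge, using plain integers as in the repro above: it counts how many entries will actually be copied before allocating, so the target is correctly sized whether or not orig is present. MergeExcluding is a hypothetical helper. For reference types that do not override Equals, EqualityComparer<T>.Default falls back to reference equality, which is the == vs Equals question raised in the edit.

```csharp
using System;
using System.Collections.Generic;

public static class BatchRetryMergeSketch
{
    // Copies every element of `commands` except `orig`, then appends `retries`.
    // The target size is computed from what is actually copied, so it never overflows.
    public static T[] MergeExcluding<T>(T[] commands, T[] retries, T orig)
    {
        var comparer = EqualityComparer<T>.Default;
        int kept = 0;
        foreach (T cmd in commands)
        {
            if (!comparer.Equals(cmd, orig)) kept++;
        }

        var target = new T[kept + retries.Length];
        int count = 0;
        foreach (T cmd in commands)
        {
            if (!comparer.Equals(cmd, orig)) target[count++] = cmd;
        }
        Array.Copy(retries, 0, target, count, retries.Length);
        return target;
    }
}
```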

UDF Returns Wrong Value for Certain Longs

Hello, I mentioned to Ronen on a call yesterday that we were seeing some weird behavior with a UDF vs a normal Get() operation and he suggested I post the bug here.

When storing certain long values in Aerospike, we are seeing different results depending on how we retrieve the data. If I insert the value 717438855724142593 into a bin and use the client.Get() method, I get the same value back. However, if I get the value in a map from a UDF, the value returned is 717438855724142592 (1 less than the original value). Note that we've also seen cases where the value is off by 2.

Side note: We also tried this exact same thing using the PHP driver and it reproduced the same results.

Also note that we are using Aerospike 3.7.5.1 and the C# Aerospike Client 3.1.7.

Below you will find the C# code and UDF I was using.

C# Code

using (AerospikeClient client = new AerospikeClient(new ClientPolicy(), new Host("YourHostName", 3000)))
{
    Key udidKey = new Key("yournamespace", null, "testkey");

    WritePolicy insertPolicy = new WritePolicy()
        {
            recordExistsAction = RecordExistsAction.UPDATE
        };

    client.Put(insertPolicy, udidKey, new Bin("Id", 717438855724142593));

    Record rec = client.Get(null, udidKey);

    IDictionary recordAsMap = (IDictionary)client.Execute(
        null,
        udidKey,
        "DeviceUdfs",
        "FetchForApp",
        Value.Get(new List<string>() {"Id"}),
        Value.Get(123),
        Value.Get("Publisher"));
}

UDF

-- asRecord is an Aerospike record.
-- binNames is a List (special type in Aerospike).
-- appId type is an int.
-- awrType is a string.
function FetchForApp(asRecord, binNames, appId, awrType)

    local results

    if not aerospike:exists(asRecord) then
        results = nil
    else
        --map is a special type provided by Aerospike.
        results = map()

        --Map all the BINs to the results map.
        for binName in list.iterator(binNames) do
            results[binName] = asRecord[binName]
        end

        results.Generation = record.gen(asRecord)

        if appId ~= nil then
            --Find the matching Last App Was Run record if it exists.
            local awrMap

            if awrType == 'Publisher' then
                awrMap = asRecord.LastPubAwr
            elseif awrType == 'Advertiser' then
                awrMap = asRecord.LastAdvAwr
            end

            if awrMap ~= nil then
                results.LastAwr = awrMap[appId]
            end
        end
    end

    return results
end
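A likely explanation, stated as an assumption: if the UDF's Lua runtime stores numbers as IEEE 754 doubles (as Lua 5.1 does), any integer above 2^53 is rounded to the nearest representable double on its way through the UDF. 717438855724142593 lies between 2^59 and 2^60, where adjacent doubles are 128 apart, so it rounds to 717438855724142592. The round trip can be reproduced in C# without a server. If this is the cause, values above 2^53 would need to pass through the UDF in another form, for example as a string.

```csharp
using System;

public static class DoubleRoundTripSketch
{
    // Largest magnitude for which every integer is exactly representable as a double.
    public const long MaxSafeInteger = 1L << 53; // 9007199254740992

    // Simulates a value passing through a runtime that stores numbers as doubles.
    public static long ThroughDouble(long value)
    {
        double asDouble = value; // implicit long -> double conversion may round
        return (long)asDouble;   // exact for integral doubles well inside the long range
    }

    public static bool IsSafe(long value) => value >= -MaxSafeInteger && value <= MaxSafeInteger;
}
```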

TestSuite?

Is there some test suite associated with this that I am missing?

Failed to connect to host(s)

The client is randomly throwing an error with the message "Failed to connect to host(s)". When we put the IP instead of the CNAME it starts working.

Switch to license expression

Summary

I wish for the nuget packages to have the licence expression property set correctly.

Details

The licence expression property should be set to the correct licence type, i.e. Apache-2.0, as this will let external tools analyse the licences in use, and the licence type will be shown on NuGet etc.

Memory leaks on task operations

Hi,

The client (3.1.5, installed via Nuget) leaks after any asynchronous operation involving tasks. Every task leaves some objects in memory, even after the client is closed and disposed.

RunPutGetWithTask() from AsyncPutGet.cs could be a test example.

[Screenshot: 2015-09-21_140110]

Performance optimizations

Hi,

We are using this client in our company code base, and I was looking for an open source project to join, so I was wondering whether you are interested in any performance tweaks I could make, and more generally in me joining the project and contributing whatever I can. Is there some sort of contribution guide or code of conduct?

Regards,
kuskmen

Set failOnClusterChange = true in QueryPolicy cause "Error -1,1,1000,1000,2: Object reference not set to an instance of an object."

I use a query policy with the following setting:
_queryPolicy = new QueryPolicy(_client.QueryPolicyDefault) { failOnClusterChange = true };
While executing _client.Query(_queryPolicy, new RecordSequenceHandler(tcs), queryParameters); I get "Error -1,1,1000,1000,2: Object reference not set to an instance of an object."
I've investigated the source code, and the NullReferenceException is thrown in AsyncCommand.ExecuteCommand() at

   node = (AsyncNode)GetNode(cluster);
   eventArgs.RemoteEndPoint = node.address;

node is null, because in the AsyncMultiExecutor method

public void ExecuteValidate(AsyncMultiCommand[] commands, int maxConcurrent, string ns)
{
	this.commands = commands;
	this.maxConcurrent = (maxConcurrent == 0 || maxConcurrent >= commands.Length) ? commands.Length : maxConcurrent;
	this.ns = ns;

	AsyncQueryValidate.ValidateBegin(cluster, new BeginHandler(this, commands, this.maxConcurrent), commands[0].node, ns);
}

commands[0].node is null; only commands[0].serverNode is set.
My package version is Aerospike.Client 3.9.9, but I have the same problem on version 4.1.0

Potential command loss in AsyncCommandDelayingQueue

Hi folks!
We've run into trouble with AsyncClient when using MaxCommandAction.DELAY. Sometimes our code hangs when processing a huge number of records asynchronously. We think the issue is in how AsyncCommandDelayingQueue schedules incoming commands.

Let's look at the source code of AsyncCommandDelayingQueue:

public override void ScheduleCommand(AsyncCommand command)
{
	SocketAsyncEventArgs e;
	// Try to dequeue one SocketAsyncEventArgs object from the queue and execute it.
	if (argsQueue.TryDequeue(out e))
	{
		// If there are no awaiting command, the current command can be executed immediately.
		if (delayQueue.IsEmpty)
		{
			command.ExecuteInline(e);
			return;
		}
		else
		{
			argsQueue.Enqueue(e);
		}
	}
	else
	...

There's a place where a passed command can be lost (maybe an exception should be thrown there?):

...
if (argsQueue.TryDequeue(out e))
{
	if (delayQueue.IsEmpty)
	{
		command.ExecuteInline(e);
		return;
	}
	else
	{
		// Is the command just ignored?
		argsQueue.Enqueue(e);
	}
}
... 

This can occur in some cases, and it seems we have encountered it in production when there are not enough available connections to process commands.
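To make the suspected failure mode concrete, here is a deliberately simplified model, not the client's actual AsyncCommandDelayingQueue (whose remaining code is not shown above). Integer slots stand in for pooled SocketAsyncEventArgs and commands are delegates. DelayingQueueModel and its members are hypothetical. The invariant it illustrates is that every path either executes the command or parks it in the delay queue, so a command is never silently dropped when a slot is free but older commands are still waiting.

```csharp
using System;
using System.Collections.Concurrent;

// Simplified model: ints stand in for pooled SocketAsyncEventArgs, delegates for commands.
public sealed class DelayingQueueModel
{
    private readonly ConcurrentQueue<int> _argsQueue = new ConcurrentQueue<int>();
    private readonly ConcurrentQueue<Action<int>> _delayQueue = new ConcurrentQueue<Action<int>>();

    public DelayingQueueModel(int slots)
    {
        for (int i = 0; i < slots; i++) _argsQueue.Enqueue(i);
    }

    public int Pending => _delayQueue.Count;

    public void ScheduleCommand(Action<int> command)
    {
        if (_argsQueue.TryDequeue(out int e))
        {
            if (_delayQueue.IsEmpty)
            {
                command(e); // fast path: nothing is waiting, run immediately
                return;
            }
            _argsQueue.Enqueue(e); // give the slot back so older commands go first...
        }
        _delayQueue.Enqueue(command); // ...and always park the command instead of dropping it
        Drain();
    }

    // Called when a command finishes and returns its slot.
    public void ReleaseSlot(int e)
    {
        _argsQueue.Enqueue(e);
        Drain();
    }

    private void Drain()
    {
        while (!_delayQueue.IsEmpty && _argsQueue.TryDequeue(out int e))
        {
            if (_delayQueue.TryDequeue(out var command))
            {
                command(e);
            }
            else
            {
                _argsQueue.Enqueue(e); // another thread took the command; loop re-checks
            }
        }
    }
}
```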

We use TaskCompletionSource to wait for all operations to finish:

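using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Aerospike.Client;

internal abstract class AerospikeOperationCompletionSource<T>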
{
	private readonly TaskCompletionSource<T> _tcs;
	private readonly CancellationTokenRegistration _ctr;

	protected AerospikeOperationCompletionSource(CancellationToken cancellationToken)
	{
		_tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

		_ctr = cancellationToken.Register(() => _tcs.TrySetCanceled(), useSynchronizationContext: false);
	}

	public Task<T> Task => _tcs.Task;

	protected void SetResult(T result)
	{
		// TrySetResult: the token may already have cancelled the task, and SetResult would then throw.
		_tcs.TrySetResult(result);

		_ctr.Dispose();
	}

	protected void SetException(Exception exception)
	{
		_tcs.TrySetException(exception);

		_ctr.Dispose();
	}
}

private class RecordCompletionSource : AerospikeOperationCompletionSource<OperationResult>, RecordListener
{
	private int _operationCount;

	private readonly ConcurrentBag<KeyValuePair<Key, Record>> _records = new();
	private readonly ConcurrentBag<AerospikeException> _exceptions = new();

	public RecordCompletionSource(int operationCount, CancellationToken cancellationToken) : base(cancellationToken)
	{
		if (operationCount <= 0)
		{
			throw new ArgumentOutOfRangeException(nameof(operationCount));
		}

		_operationCount = operationCount;
	}

	public void OnSuccess(Key key, Record record)
	{
		_records.Add(new KeyValuePair<Key, Record>(key, record));

		OnOperationCompleted();
	}

	public void OnFailure(AerospikeException exception)
	{
		_exceptions.Add(exception);

		OnOperationCompleted();
	}

	private void OnOperationCompleted()
	{
		var leftOperations = Interlocked.Decrement(ref _operationCount);

		if (leftOperations == 0)
		{
			SetResult(new OperationResult(_records, _exceptions));
		}
	}
}

Using:

var completionSource = new RecordCompletionSource(keys.Length, cancellationToken);

foreach (var key in keys)
{
	client.Operate(policy: null, completionSource, key, ops);
}

var result = await completionSource.Task;

If at least one command is lost a completion task won't be resolved at all (but can be cancelled if we pass cancellation token).

Client policy:

new AsyncClientPolicy
{
	asyncMaxCommandsInQueue = 0,
	asyncMaxCommands = 200,
	asyncMaxCommandAction = MaxCommandAction.DELAY,
	writePolicyDefault = new WritePolicy
	{
		maxRetries = 0,
		sendKey = true
	}
}

Aerospike.Client 4.2.4
aerospike-server:5.6.0.9

Potential enhancement: Query result interfaces for mocking frameworks to utilize

I'm currently at a loss when it comes to unit testing some functionality within my applications that relies on query results. To be clear, I've mainly been working with Moq and related mocking frameworks; however, the inability to mock RecordSet instances has been a hurdle. I've read through a related issue concerning the Java client library, and this particular comment - aerospike/aerospike-client-java#34 (comment)

gives me hope that this may be up for consideration in the C# library. If there is some undocumented way to overcome this, or if a library update that provides this is cooking for the near future, please let me know, and thanks!
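Until the library ships such interfaces, a common workaround is a thin adapter owned by the application: code depends on a small interface, production code wraps the real RecordSet, and tests supply a fake. IRecordCursor<T>, ListRecordCursor<T> and QueryConsumer are hypothetical names. The generic T stands in for Aerospike.Client.Record so that the sketch compiles without the client.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction mirroring the RecordSet iteration pattern (Next and a current Record).
public interface IRecordCursor<T> : IDisposable
{
    bool Next();
    T Record { get; }
}

// In-memory fake for unit tests; a production adapter would forward to RecordSet.
public sealed class ListRecordCursor<T> : IRecordCursor<T>
{
    private readonly IReadOnlyList<T> _items;
    private int _index = -1;

    public ListRecordCursor(IReadOnlyList<T> items) => _items = items;

    public bool Next() => ++_index < _items.Count;

    public T Record => _items[_index];

    public void Dispose() { }
}

public static class QueryConsumer
{
    // Code under test depends only on the interface, so it runs without a cluster.
    public static int CountMatching<T>(IRecordCursor<T> cursor, Func<T, bool> predicate)
    {
        using (cursor)
        {
            int count = 0;
            while (cursor.Next())
            {
                if (predicate(cursor.Record)) count++;
            }
            return count;
        }
    }
}
```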

.NET Standard and Nuget packaging

The Aerospike client as downloaded from Nuget causes dll conflict warnings when used from within a .NET Standard project:

warning MSB3277: Found conflicts between different versions of the same dependent assembly that could not be resolved.

The problem can be resolved by changing the project to target .NET Standard 1.6 from 1.4, so that Aerospike and the library are both referencing the same version of the common libraries.

Retargeting would normally not be an issue; however, .NET Standard 1.6 libraries cannot currently be referenced by .NET Framework projects (at least, until later this year according to Microsoft), forcing us to target 1.4.

So I suppose my question is: Does the Aerospike client need to target .NET Standard 1.6, or will it work fine targeting 1.4? Microsoft recommends targeting the lowest version possible (https://docs.microsoft.com/en-us/dotnet/standard/library).
Otherwise it might be ok to eat this warning until later this year.

Restarting the server causes AsyncClient to not recover

If you are using the AsyncClient and only using async calls, the client does not recover when the server goes down and comes back up. If you use a mix of async and sync calls and restart the server, then the client does recover.

I've attached a sample program (https://gist.github.com/mchecca/a9665f0f16796a8ef1e6) which reproduces the behavior as follows:

  1. Ensure the Aerospike server is running
  2. Start the program
  3. Restart the Aerospike service
  4. Observe the client begin to (obviously) fail to read and write, but then it hangs

If you repeat these steps but with the listener commented out in either the Put or Get method, the client does recover.
