Comments (31)
ec8d1db and d57e8b7 instead should fix the case when the client closes the connection; the server side, and in particular the varint-decoder reading loop, should handle this case. Note that we need to distinguish between EOF in "normal operation" and EOF due to an error on the client side. We do this by checking nb_read_bytes, which should be 0, i.e. no varint decoding process has already started.
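A minimal sketch of that distinction, assuming a simplified protobuf-style varint decoder (the real connection code differs; the `-1` sentinel and this exact loop are illustrative only):

```python
import io


class DecodeVarintError(Exception):
    """Raised when EOF arrives in the middle of a varint."""


def decode_varint(buffer: io.BytesIO) -> int:
    """Decode one base-128 varint, distinguishing a clean EOF from a torn one.

    Returns -1 on a clean EOF (nb_read_bytes == 0, i.e. no decoding has
    started); raises DecodeVarintError if the stream ends mid-varint.
    """
    result, shift, nb_read_bytes = 0, 0, 0
    while True:
        byte = buffer.read(1)
        if not byte:  # EOF
            if nb_read_bytes == 0:
                return -1  # normal shutdown: EOF between messages
            raise DecodeVarintError("EOF in the middle of a varint")
        nb_read_bytes += 1
        value = byte[0]
        result |= (value & 0x7F) << shift
        if not value & 0x80:  # continuation bit clear: last byte
            return result
        shift += 7
```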
from agent-academy-1.
I must have forgotten to sync these properly. I have now synced the academy repo with our repo with respect to the connection: https://github.com/valory-xyz/agent-academy-1/pull/53/files
@0xArdi or @marcofavorito please fix the issue in tcp_channel.py in tests. It's outside my comfort zone atm. Then we can check whether merging this branch into the branch with the test agent, updating, and running there shows the issue to be fixed or not.
Regarding the configuration: I would like to first sort out the issue by aligning the hard-coded upper bound to the default value we have on Tendermint. Once the issue is solved, we can then make it configurable in a separate step.
I am only able to get DecodeVarintError when I kill break_abci.py, i.e. the (mock) node goes down. Is that the case for you too @DavidMinarsch?
Regarding the error in break_abci.py:

```
/Users/davidminarsch/v_projects/agent-academy-1/tests/test_connections/fuzzy_tests/mock_node/channels/tcp_channel.py:128: RuntimeWarning: coroutine 'StreamWriter.drain' was never awaited
  cast(asyncio.StreamWriter, self.writer).drain()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
The drain coroutine is not being awaited. I suppose something like this should work for that, what do you think @DavidMinarsch @marcofavorito?

```python
def _send_data(self, data: bytes) -> None:
    """Send data over the TCP connection."""
    cast(asyncio.StreamWriter, self.writer).write(data)
    drain_coro = cast(asyncio.StreamWriter, self.writer).drain()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(drain_coro)
    loop.close()
```
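As a possible alternative to spinning up a fresh event loop per call (which is costly, and fails if a loop is already running in the thread), the sender could itself be a coroutine that awaits drain(). A sketch only; `TCPChannelSketch` and `send_data` are hypothetical stand-ins for the channel, not the repo's code:

```python
import asyncio
from typing import Optional, cast


class TCPChannelSketch:
    """Hypothetical stand-in for the mock node's TCP channel."""

    def __init__(self) -> None:
        self.writer: Optional[asyncio.StreamWriter] = None

    async def send_data(self, data: bytes) -> None:
        """Write data and await the flow-control drain."""
        writer = cast(asyncio.StreamWriter, self.writer)
        writer.write(data)
        await writer.drain()  # properly awaited: no 'never awaited' warning
```

The trade-off is that callers must then be async as well, which may ripple through the channel's API.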
c6c208e and 7b5d200 should have fixed the issue on the TCPChannel side. Working on the issue on the abci server side.
Thank you very much for this @0xArdi! The solution proposed in #57 makes sense if we want to keep the connection open. I propose an alternative approach: when the client sends a varint that exceeds our upper bound (currently 1 MB), we consider that a failure on the client side. It can be due to a bug, or to non-compliance with our protocol, which (should) enforce a limit on the transaction size (relatively easy) and on each ABCI request (less trivial, since it depends on Tendermint behaviour? Maybe just a Tendermint node configuration problem).
If we want to keep the current approach (don't drop the connection), what I don't like much is the fact that we still have to iterate n / 10k times. This should not be a huge issue thanks to the concurrent execution of the connection tasks. Still, can we try to avoid that, and drop "as many bytes as possible", according to what is currently in the reader's buffer? Not sure how to do it; I need to look at the internals of StreamReader, but it might involve accessing some protected members of the reader object.
That works too.
The only thing that worries me is that a single non-compliant/malicious actor would be enough to take the whole agent network down. A Tendermint node with the correct config would not forward it to the abci.
So, since #56 is now merged: what shall we do about the outstanding issues? I guess we could add a simple PR to make the maximum byte size configurable in the config file? Anything else?
I suppose we could respond to the node with a non-zero code field, ie signaling that something went wrong.
For this the ABCI protocol has the ResponseException message type: https://github.com/valory-xyz/agent-academy-1/blob/main/packages/valory/connections/abci/protos/tendermint/abci/types.proto#L133. We could try to use it and see what the outcome is on the Tendermint node side (although I think it won't be happy if it can't deliver a committed transaction!). However, reading the documentation on error handling (https://docs.tendermint.com/master/spec/abci/abci.html#errors):
Applications should always terminate if they encounter an issue in a method where continuing would corrupt their own state, or for which tendermint should not continue.
Do we fall in this category?
I think we do. I think it's even worse than that: basically they are running with bad params. It is a case that should have failed at startup but did not for some reason, for example when running the node independently of the abci, i.e. setting the 'use_tendermint' flag to False.
I think it's okay to have the option to discard a payload; however, I think it's better to close the connection by default.
Okay, it's clearly introducing more problems than it solves. I will make a pull request to just close the connection. Is that okay @marcofavorito?
Sounds good @0xArdi, thank you very much.
A workaround would be to handle the inf case explicitly. But then nan should also be handled. And very large varints, like 2**100, should be detected as well.
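A sketch of such explicit handling. The TooLargeVarint name mirrors the exception discussed in this thread, and the 1 MiB bound matches the limit mentioned earlier; the `validate_message_length` helper itself is hypothetical:

```python
import math

MAX_READ_IN_BYTES = 2**20  # assumed 1 MiB upper bound, as in the thread


class TooLargeVarint(Exception):
    """Raised when a decoded length exceeds the configured upper bound."""


def validate_message_length(length) -> int:
    """Reject inf, nan and absurd values before any read is attempted."""
    if isinstance(length, float) and (math.isinf(length) or math.isnan(length)):
        raise TooLargeVarint(f"invalid message length: {length}")
    length = int(length)
    if length < 0 or length > MAX_READ_IN_BYTES:
        raise TooLargeVarint(f"message length {length} exceeds {MAX_READ_IN_BYTES}")
    return length
```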
In general, I think that at some point in the implementation of the whole system it will be important to guarantee liveness as much as possible, making the system resilient to this kind of local failure. The non-dropping-connection proposal is an important attempt in that direction. Probably, at the current stage of the system implementation, it is better to focus on stability and to give more importance to safety rather than liveness. Let's keep the alternatives in mind so as to have more choices available in the future.
This is not a blocker for now, since the grpc version works. I think a possible solution would be to change the way buffer reads work:
```python
@classmethod
def read_messages(
    cls, buffer: BytesIO, message_cls: Type
) -> Generator[Request, None, None]:
    """
    Return an iterator over the messages found in the `reader` buffer.

    :param buffer: the buffer to read messages from.
    :param message_cls: the message class to instantiate.
    :yield: a new message.
    :raise: DecodeVarintError if the varint cannot be decoded correctly.
    :raise: ShortBufferLengthError if the buffer length is shorter than expected.
    :raise: google.protobuf.message.DecodeError if the Protobuf decoding fails.
    """
    total_length = buffer.getbuffer().nbytes
    while buffer.tell() < total_length:
        length = cls.decode_varint(buffer)
        # check if 'length' > 'MAX_READ_IN_BYTES'
        # if it is, do multiple reads until all the data has been read
        data = buffer.read(length)
        if len(data) < length:
            raise ShortBufferLengthError(
                f"expected buffer of length {length}, got {len(data)}"
            )
        message = message_cls()
        message.ParseFromString(data)
        yield message
```
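The `cls.decode_varint` used above is not shown in the snippet; for reference, a self-contained sketch of the standard base-128 varint framing that this style of reading loop relies on could look like the following (the `frame` helper is illustrative, not the connection's API):

```python
from io import BytesIO


def encode_varint(value: int) -> bytes:
    """Standard protobuf-style base-128 varint encoding."""
    out = bytearray()
    while True:
        to_write = value & 0x7F
        value >>= 7
        if value:
            out.append(to_write | 0x80)  # set the continuation bit
        else:
            out.append(to_write)
            return bytes(out)


def decode_varint(buffer: BytesIO) -> int:
    """Inverse of encode_varint, reading one byte at a time."""
    result, shift = 0, 0
    while True:
        byte = buffer.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # last byte of this varint
            return result
        shift += 7


def frame(payload: bytes) -> bytes:
    """Length-prefix a payload with a varint, as the reading loop expects."""
    return encode_varint(len(payload)) + payload
```

A round trip then looks like: decode the length prefix, then read exactly that many bytes as one message.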
Wouldn't the natural solution be to match the default settings on both connection and tendermint and leave the code unchanged? (Haven't looked at the code but remember @marcofavorito did quite a lot of fine tuning on this as there were various issues.)
I think the settings on tendermint and the abci should be synced. But I don't see an issue in reading chunk by chunk, unless that introduces problems. However, I don't think that 64KB will fit the needs of all users.
I think we can do both: set the buffer length to 1 MB and implement iterative reading, chunk by chunk. However, the iterative process must stop at some point anyway, for security reasons; when do we stop it? What is the upper bound on the number of read bytes? We need a criterion that determines what is a good MAX_READ_IN_BYTES for us. (EDIT: after having checked the code of the most recent implementation, I realized we already do what I think we consider reading "chunk by chunk"; so we still have to increase MAX_READ_IN_BYTES to accommodate larger txs.)
I agree with David that the quickest solution is to align the abci connection settings to the Tendermint node's default settings. Other considerations:
- 1 MB denotes the maximum size of a transaction, but is this the same as the size of its serialized version, that is, the one that we read from the buffer? I suspect they are different (to be checked), which means 1 MB would be an upper bound.
- the ABCI requests that contain transactions are CheckTx and DeliverTx, so the buffer size should also consider the overhead introduced by the serialized ABCI request message.
- given the above, it would be a nice-to-have to make MAX_READ_IN_BYTES configurable in the abci connection config file.
Another observation is that the code snippet posted above refers to an old version of the reading loop of the ABCI connection.
The new code can be found here: https://github.com/valory-xyz/consensus-algorithms/blob/main/packages/valory/connections/abci/connection.py#L162-L187
```python
class VarintMessageReader:  # pylint: disable=too-few-public-methods
    """Varint message reader."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        """Initialize the reader."""
        self._reader = reader

    async def read_next_message(self) -> bytes:
        """Read next message."""
        varint = await _TendermintABCISerializer.decode_varint(self._reader)
        if varint > MAX_READ_IN_BYTES:
            raise TooLargeVarint()
        message_bytes = await self.read_until(varint)
        if len(message_bytes) < varint:
            raise ShortBufferLengthError(varint, message_bytes)
        return message_bytes

    async def read_until(self, n: int) -> bytes:
        """Wait until n bytes are read from the stream."""
        result = BytesIO(b"")
        read_bytes = 0
        while read_bytes < n:
            data = await self._reader.read(n - read_bytes)
            result.write(data)
            read_bytes += len(data)
        return result.getvalue()
```
And it is used here:
https://github.com/valory-xyz/consensus-algorithms/blob/main/packages/valory/connections/abci/connection.py#L281
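To see the chunked reassembly in action, here is a standalone copy of read_until driven by a StreamReader fed in two pieces. This is illustrative only; note also that reader.read returning b"" at EOF would make this loop spin forever, which is the inf/EOF problem raised elsewhere in this thread:

```python
import asyncio
from io import BytesIO


async def read_until(reader: asyncio.StreamReader, n: int) -> bytes:
    """Standalone copy of VarintMessageReader.read_until, for illustration."""
    result = BytesIO(b"")
    read_bytes = 0
    while read_bytes < n:
        data = await reader.read(n - read_bytes)
        result.write(data)
        read_bytes += len(data)
    return result.getvalue()


async def demo() -> bytes:
    """Feed a 5-byte message in two chunks and check it is reassembled whole."""
    reader = asyncio.StreamReader()
    reader.feed_data(b"hel")
    task = asyncio.ensure_future(read_until(reader, 5))
    await asyncio.sleep(0)   # let the first partial read complete
    reader.feed_data(b"lo")  # the rest of the message arrives later
    return await task
```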
How about making the upper bound configurable and enforcing it both on send and receive?
The biggest risk lies in having a lower upper bound in the abci than in tendermint, at least when the send limit is not enforced; that's the situation where the tendermint node crashes.
In tendermint it's quite easy to configure this; it just has to be synchronized. As for setting a limit, I don't think we can find a limit that serves all needs.
Regarding the status quo, this connection has grpc, so it's not really affected.
Regarding the tcp_channel.py issue in tests: a3f56c2
Currently on dd2a207 I ran in one terminal:

```
cd test_abci
aea run
```

and in another:

```
python break_abci.py
```

I get:

```
[2022-05-04 19:50:17,904] [INFO] [test_abci] Checking status
[2022-05-04 19:50:27,910] [INFO] [test_abci] Checking status
[2022-05-04 19:50:37,918] [INFO] [test_abci] Checking status
[2022-05-04 19:50:47,052] [ERROR] [test_abci] an error occurred while reading a message: DecodeVarintError: could not decode varint. The message will be ignored.
[2022-05-04 19:50:47,053] [INFO] [test_abci] connection at EOF, stop receiving loop.
[2022-05-04 19:50:47,924] [INFO] [test_abci] Checking status
[2022-05-04 19:50:57,934] [INFO] [test_abci] Checking sync...
[2022-05-04 19:50:57,934] [INFO] [test_abci] Checking status
```
and

```
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' and is_new=False
/Users/davidminarsch/v_projects/agent-academy-1/tests/test_connections/fuzzy_tests/mock_node/channels/tcp_channel.py:128: RuntimeWarning: coroutine 'StreamWriter.drain' was never awaited
  cast(asyncio.StreamWriter, self.writer).drain()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO:tests.test_connections.fuzzy_tests.mock_node.node:Received response code: 1
log: "operation not supported"
info: "operation not supported"
```
I was about to suggest checking via self._reader.at_eof() before trying to decode, and returning an appropriate error :) You are achieving the same thing I suppose, as feed_eof() is called when the connection is lost. Snippet from the StreamReader machinery:
```python
def connection_lost(self, exc):
    reader = self._stream_reader
    if reader is not None:
        if exc is None:
            reader.feed_eof()
        else:
            reader.set_exception(exc)
    if not self._closed.done():
        if exc is None:
            self._closed.set_result(None)
        else:
            self._closed.set_exception(exc)
    super().connection_lost(exc)
    self._stream_reader_wr = None
    self._stream_writer = None
    self._transport = None
```
Thank you for your comment @0xArdi! Yes, the fix relies on the behaviour of StreamReader.read(n), called in _read_one. When n > 0, the expected behaviour is the following (as documented in the docstring):

If n is positive, this function try to read `n` bytes, and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object.
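This documented behaviour is easy to check in isolation by feeding a StreamReader directly (a sketch, independent of the connection code):

```python
import asyncio


async def read_after_eof() -> tuple:
    """Show StreamReader.read(n): partial data first, then b'' at EOF."""
    reader = asyncio.StreamReader()
    reader.feed_data(b"abc")
    reader.feed_eof()  # what connection_lost does on a clean close
    first = await reader.read(10)   # returns the 3 available bytes, not 10
    second = await reader.read(10)  # EOF and empty buffer: empty bytes object
    return first, second, reader.at_eof()
```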
Ok, so it now seemingly works for the current script. But the current script sends less than 1 MB. When sending more than 1 MB (i.e. data = 'a' * 10000000) I get an infinite loop on the agent side:
```
[2022-05-06 15:27:19,882] [ERROR] [test_abci] an error occurred while reading a message: TooLargeVarint: . The message will be ignored.
vendor/valory/connections/abci/connection.py:933: RuntimeWarning: Unexpected end-group tag: Not all data was converted
  message.ParseFromString(message_bytes)
[2022-05-06 15:27:19,883] [WARNING] [test_abci] Decoded request None was not a match.
[2022-05-06 15:27:19,884] [ERROR] [test_abci] an error occurred while reading a message: DecodeError: Error parsing message with type 'tendermint.abci.Request'. The message will be ignored.
```
I think what is going on is that the error is thrown but the buffer still has data from the request. I think what should be done is discarding the next varint bytes.
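A sketch of that discarding step, under the assumption that the too-large varint has already been decoded; `skip_oversized_message` is a hypothetical helper, not the PR's code:

```python
import asyncio


async def skip_oversized_message(
    reader: asyncio.StreamReader, varint: int, chunk: int = 2**16
) -> None:
    """Drain `varint` bytes from the stream so the next frame starts clean."""
    remaining = varint
    while remaining > 0:
        data = await reader.read(min(chunk, remaining))
        if not data:  # connection closed mid-message; nothing left to skip
            return
        remaining -= len(data)
```

After the skip, the reading loop can try to decode the next varint as usual instead of misinterpreting the leftover payload bytes.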
@DavidMinarsch @marcofavorito #57 seems to do it for me.
I would say that we can also integrate the solution that @marcofavorito suggested. We can have a flag to decide whether to close the connection, or continue by discarding the next 'varint' bytes. What do you think @marcofavorito @DavidMinarsch?
I would say that we can also integrate the solution that @marcofavorito suggested. We can have a flag to decide whether to close the connection, or continue by discarding the next 'varint' bytes. What do you think @marcofavorito @DavidMinarsch?
I think that since the PR is merged we can skip that for the moment. But we should keep in mind that the current solution might cause data losses wrt the ABCI application.
But wait... I am realizing that, since we never forward the too-large request to the ABCI application (say, a DeliverTx request), we never respond to the client (the Tendermint node); this might cause the Tendermint node to wait indefinitely for a response (because the ABCI protocol follows a strict request-response pattern) and cause liveness problems anyway. The other proposed solution is stricter, as it would raise a failure and tear down the agent and the Tendermint node; in some sense, it sacrifices liveness for better safety. The new feature needs to be tested with the node in such conditions to see the outcome. For the moment, the system configuration (Tendermint node + ABCI app + ABCI connection) should avoid such an eventuality as much as possible, and keep us far from the boundaries of such conditions, like a big transaction whose size is too close to the maximum size allowed.
I suppose we could respond to the node with a non-zero code field, ie signaling that something went wrong.
I suppose we could respond to the node with a non-zero code field, ie signaling that something went wrong.
Another issue is that responses are request-specific, so we would need to decode the request in the first place, or at least come up with a way to find the request type without decoding the whole payload.
I suppose we could respond to the node with a non-zero code field, ie signaling that something went wrong.
For this the ABCI protocol has the ResponseException message type: https://github.com/valory-xyz/agent-academy-1/blob/main/packages/valory/connections/abci/protos/tendermint/abci/types.proto#L133. We could try to use it and see what the outcome is on the Tendermint node side (although I think it won't be happy if it can't deliver a committed transaction!).
However, reading the documentation on error handling (https://docs.tendermint.com/master/spec/abci/abci.html#errors):
Applications should always terminate if they encounter an issue in a method where continuing would corrupt their own state, or for which tendermint should not continue.
Do we fall in this category?
@0xArdi @DavidMinarsch Another problem I am encountering while fixing tests for valory-xyz/open-autonomy#778. Here we test that the connection behaves correctly when it receives inf:
https://github.com/valory-xyz/consensus-algorithms/blob/bug/abci/tests/test_connections/test_abci/test_abci.py#L517-L521
But it ends up in an infinite loop, because it keeps reading from the stream the same amount of bytes b"hello", and read_bytes will never reach inf.
Okay its clearly introducing more problems than solving. I will make a pull req to just close the connection. Is that okay @marcofavorito?
#59 should do it.
Thanks, I merged #59. It should help with fixing the tests @marcofavorito, although you'll have to sync the latest main here to the branch on consensus-algos.