cheatfate / asynctools
Various asynchronous tools for Nim language
License: MIT License
I am working on a long-running process that starts other processes using asyncproc. After a couple of days it crashes, stating that too many file handles are open. I suspect this is because the started processes are never closed (based on the information in osproc). I found the close procedure in the code, but it is not exported. I didn't want to export it myself because I thought there might be a reason for that which I'm missing.
Therefore, why is the close procedure not exported in asyncproc?
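For comparison, the standard library's osproc follows the convention the question alludes to: a Process returned by startProcess keeps its OS handles until close is called on it. The following is a minimal sketch of that pattern using osproc only, not asyncproc (whose close is the subject of the question). The helper name runAndClose is hypothetical, and the command assumes a POSIX shell.

```nim
import std/osproc

proc runAndClose(cmd: string): int =
  ## Hypothetical helper: runs `cmd` through the shell, waits for it,
  ## and releases the process handles even if waiting raises.
  let p = startProcess(cmd, options = {poEvalCommand, poUsePath, poStdErrToStdOut})
  try:
    result = p.waitForExit()
  finally:
    p.close()  # frees the pipe and process handles held by `p`

for i in 0 ..< 3:
  echo "exit code: ", runAndClose("echo hello")
```

Without the close call, each iteration of a long-running loop like this would leave pipe handles open, which matches the "too many open files" symptom described above.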
I've tried both terminate and kill; in both cases the process continues running (even after my program is terminated :/). waitForExit also has some strange behaviour.
I'm running the Nim compiler itself, in case that helps.
The use case is waiting on an event until some condition is met. If the condition is not met, the proc should go back to waiting. In the following code the condition is never met (the while true simulates this), but the async proc does not wait again (or it waits but is immediately woken up).
block:
var event = newAsyncEv()
proc testEvent(n: int, ev: AsyncEv) {.async.} =
while true:
await ev.wait()
debugEcho n
event.clear()
asyncCheck testEvent(0, event)
asyncCheck testEvent(1, event)
event.fire()
event.clear()
waitFor sleepAsync(10_000)
Current output:
0
1
0
1
0
1
[so on]
Expected:
0
1
import asynctools/asyncipc
let ipc = createIpc("blablabla")
let writeHandle = open("blablabla", sideWriter)
test.nim(3) test
asyncipc.nim(476) open
oserr.nim(110) raiseOSError
Error: unhandled exception: Device not configured [OSError]
Regarding asyncdns, I have noticed that it only returns one IP regardless of how many A records there are for a name. Here is an example...
$ dig autodiscover.outlook.com
; <<>> DiG 9.10.3-P4-Ubuntu <<>> autodiscover.outlook.com
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 3053
;; flags: qr rd ra; QUERY: 1, ANSWER: 13, AUTHORITY: 0, ADDITIONAL: 1
;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;autodiscover.outlook.com. IN A
;; ANSWER SECTION:
autodiscover.outlook.com. 47 IN CNAME autodiscover.geo.outlook.com.
autodiscover.geo.outlook.com. 42 IN CNAME autodiscover.outlook.com.g.outlook.com.
autodiscover.outlook.com.g.outlook.com. 42 IN CNAME autodiscover-nameast3.outlook.com.
autodiscover-nameast3.outlook.com. 42 IN A 40.97.126.40
autodiscover-nameast3.outlook.com. 42 IN A 40.97.160.24
autodiscover-nameast3.outlook.com. 42 IN A 40.97.100.56
autodiscover-nameast3.outlook.com. 42 IN A 40.97.134.200
autodiscover-nameast3.outlook.com. 42 IN A 40.97.176.24
autodiscover-nameast3.outlook.com. 42 IN A 40.97.180.8
autodiscover-nameast3.outlook.com. 42 IN A 40.97.176.8
autodiscover-nameast3.outlook.com. 42 IN A 40.97.24.24
autodiscover-nameast3.outlook.com. 42 IN A 40.97.130.56
autodiscover-nameast3.outlook.com. 42 IN A 40.97.92.40
...SNIP...
When I use asyncdns I get this response:
autodiscover.outlook.com
ai_flags = 0x00000000
ai_family = 0x00000002
ai_socktype = 0x00000001
ai_protocol = 0x00000006
ai_canonname = 0x0000000000000000
ai_addrlen = 16
ai_addr = 0x00007FC3727B72B8
sin_family = 0x0002
sin_port = 0x0000 (0)
sin_addr = 40.97.160.24
The IP returned is for autodiscover-nameast3.outlook.com (40.97.160.24). Is there any way to get a seq with all the A records that the CNAMEs resolve to? Note: this is true even if I attempt to resolve autodiscover-nameast3.outlook.com directly...
ai_flags = 0x00000000
ai_family = 0x00000002
ai_socktype = 0x00000001
ai_protocol = 0x00000006
ai_canonname = 0x0000000000000000
ai_addrlen = 16
ai_addr = 0x00007FCEC0FB8228
sin_family = 0x0002
sin_port = 0x0000 (0)
sin_addr = 40.97.92.40
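An addrinfo result is a linked list, so reading only its head yields a single address. The sketch below uses the standard library's nativesockets (not asyncdns) to collect every entry by following ai_next. It assumes the pointer returned by asyncGetAddrInfo can be walked the same way, which this thread does not confirm. The helper name allAddresses is hypothetical, and "localhost" is used so the example runs without network access.

```nim
import std/nativesockets

proc allAddresses(host: string): seq[string] =
  ## Hypothetical helper: walks the whole addrinfo list instead of
  ## reading only its first node.
  let head = getAddrInfo(host, Port(0), Domain.AF_INET)
  try:
    var it = head
    while it != nil:
      result.add getAddrString(it.ai_addr)
      it = it.ai_next
  finally:
    freeAddrInfo(head)  # the list is allocated by the resolver

echo allAddresses("localhost")
```

The port argument only fills in sin_port of each returned socket address; it does not affect the name lookup itself, so Port(0) is sufficient here.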
FYI, nim v2 broke createPipe
bug: nim-lang/Nim#23118
repro
import os, asyncdispatch, winlean
# import asynctools/asyncproc
#
# proc main()=
#
#   let cmd = "nim -v"
#   let p = cmd.startProcess(options = {poStdErrToStdOut, poUsePath, poEvalCommand})
#
#   let exitCode = waitFor p.waitForExit()
#   quit exitCode
#
# main()
const
  pipeHeaderName = r"\\.\pipe\asyncpipe_"
const
  DEFAULT_PIPE_SIZE = 65536'i32
  FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000'i32
  PIPE_WAIT = 0x00000000'i32
  PIPE_TYPE_BYTE = 0x00000000'i32
  PIPE_READMODE_BYTE = 0x00000000'i32
  ERROR_PIPE_CONNECTED = 535
  ERROR_PIPE_BUSY = 231
  ERROR_BROKEN_PIPE = 109
  ERROR_PIPE_NOT_CONNECTED = 233
proc QueryPerformanceCounter(res: var int64)
  {.importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".}
proc createPipe() =
  var number = 0'i64
  var pipeName: WideCString
  var pipeIn: Handle
  var pipeOut: Handle
  var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint,
                               lpSecurityDescriptor: nil, bInheritHandle: 1)
  while true:
    QueryPerformanceCounter(number)
    let p = pipeHeaderName & $number
    pipeName = newWideCString(p)
    var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or
                   PIPE_ACCESS_INBOUND
    var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT
    pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32,
                             DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE,
                             1'i32, addr sa)
    if pipeIn == INVALID_HANDLE_VALUE:
      let err = osLastError()
      if err.int32 != ERROR_PIPE_BUSY:
        raiseOsError(err)
    else:
      break
  var openMode = (FILE_WRITE_DATA or SYNCHRONIZE)
  pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING,
                        FILE_FLAG_OVERLAPPED, 0)
  if pipeOut == INVALID_HANDLE_VALUE:
    let err = osLastError()
    discard closeHandle(pipeIn)
    raiseOsError(err)
createPipe()
import asynctools/asyncproc, asyncdispatch
echo waitFor execProcess("pwd")
fails with
asyncprocBroken.nim(2) asyncprocBroken
asyncmacro.nim(304) execProcess
asyncmacro.nim(36) execProcess_continue
asyncproc.nim(893) execProcessIter
system.nim(2805) sysFatal
[[reraised from:
asyncprocBroken.nim(2) asyncprocBroken
asyncdispatch.nim(1652) waitFor
asyncfutures.nim(302) read
]]
Error: unhandled exception: index out of bounds
Async traceback:
asyncprocBroken.nim(2) asyncprocBroken
asyncmacro.nim(304) execProcess
asyncmacro.nim(36) execProcess_continue
## Resumes an async procedure
asyncproc.nim(893) execProcessIter
system.nim(2805) sysFatal
Exception message: index out of bounds
Exception type: [IndexError]
Hi, I'm trying to write an app that starts a subprocess and reacts to its stdout or stderr, depending on what shows up. I was given some initial advice on the Nim forum and tried to build something based on it, but I think I must be doing something wrong, as I'm not getting the results I was expecting :( I have the two files below:
# q_async.nim
import asynctools/[asyncproc, asyncpipe]
import asyncfutures
import asyncdispatch
var p = startProcess("q_outerr", options = {})
var pout = p.outputHandle
var perr = p.errorHandle
proc next() {.async.} =
  var bufo = newString(1)
  var fo = pout.readInto(bufo[0].addr, bufo.len)
  var bufe = newString(1)
  var fe = perr.readInto(bufe[0].addr, bufe.len)
  var fx = p.waitForExit()
  while true:
    await fe or fo or fx
    if fo.finished:
      if fo.read > 0:
        echo "O ", fo.read(), " ", bufo.substr(0, fo.read()-1)
        fo = pout.readInto(bufo[0].addr, bufo.len)
    if fe.finished:
      if fe.read > 0:
        echo "E ", fe.read(), " ", bufe.substr(0, fe.read()-1)
        fe = perr.readInto(bufe[0].addr, bufe.len)
    if fx.finished:
      echo "X ", fx.read()
      return
    if fo.finished and fe.finished and fo.read==0 and fe.read==0:
      return
waitFor next()

# q_outerr.nim
import os
stderr.writeLine "some stderr"
stderr.writeLine "some stderr2"
stdout.writeLine "some stdout2"
stderr.writeLine "some stderr3"
stderr.writeLine "some stderr4"
Now, when I compile both and run q_async.exe (on Windows), I get the output below:
C:\prog\mana>q_async
O 1 s
E 1 s
O 1 o
X 0
If I remove fx from q_async, it just hangs at some point, still without printing all the output from q_outerr.
Do you have any idea what I might be doing wrong, and how I could make this work correctly so that it displays all stdout and stderr from q_outerr.nim?
I just found your library and I think it can solve my problem: I tried to start a process (a chess engine) in Nim, feed commands to it (stdin), and catch its output (stdout). I managed to get this working somehow, but my solution seems primitive and incomplete, see https://forum.nim-lang.org/t/10613
When I run the 'stockfish' process in a terminal, the program returns its 'bestmove', but my Nim script doesn't output this last line!? I read about Nim threads, threadpools, async, await, and spawn, but I can't find good examples, so it's all too difficult for me.
Can anybody give a (simple) example? Just feed commands to an external process and catch its output? It seems Malebolgia can do this easily, but I have no clue. Eventually I want to build a GUI app that uses the output of such a chess engine to calculate things and draw a graph while the engine is still running; e.g. the engine process also has to listen for a "stop" command.
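As a starting point, here is a minimal blocking sketch that uses only the standard library's osproc and streams, not asynctools. It assumes a POSIX system and uses cat as a stand-in for the engine, since cat echoes every line it receives. The helper name converse is hypothetical.

```nim
import std/[osproc, streams]

proc converse(program: string, commands: openArray[string]): seq[string] =
  ## Hypothetical helper: sends each command on the child's stdin and
  ## reads exactly one reply line back from its stdout.
  let p = startProcess(program, options = {poUsePath})
  try:
    let toChild = p.inputStream
    let fromChild = p.outputStream
    for cmd in commands:
      toChild.writeLine(cmd)
      toChild.flush()          # push the line through the pipe right away
      result.add fromChild.readLine()
    toChild.close()            # EOF on stdin lets `cat` exit
    discard p.waitForExit()
  finally:
    p.close()

echo converse("cat", ["uci", "isready"])
```

A real engine usually prints several lines per command, so the one-reply-per-command loop is a simplification: reading would instead continue until a marker line such as the one starting with "bestmove" arrives. A missing flush after each write is also a common reason for output that never shows up.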
This chain of events leads to a crash:
1. p1 started.
2. p2 started.
3. p1 finishes, and the crash happens here.
Small example:
import asyncdispatch, asynctools/asyncproc
proc run() {.async.} =
  while true:
    await sleepAsync(300)
    var p = startProcess("sleep 1 && ls", options={poUsePath, poEvalCommand})
    asyncCheck p.waitForExit()
waitFor run()
Which gives the error:
Error: unhandled exception: Resource temporarily unavailable (code: 11) [IOSelectorsException]
This code eats 100MB memory on Windows.
import asyncdispatch
import asynctools/asyncpipe
proc main() {.async.} =
  let pipe = createPipe()
  var data = "x"
  for i in 0..100000:
    discard await pipe.write(data.cstring, 1)
    discard await pipe.readInto(data.cstring, 1)
waitFor main()
echo "Press ENTER to exit"
discard stdin.readLine()
Is it possible to add a function that opens stdin and stdout asynchronously on Windows?
Some possible ways are mentioned in this article.
At least langserver and nimlsp need it. PMunch/nimlsp#131 (comment)
Running the example code in asyncpipe gives me this error
../asynctools/asyncpipe.nim(339, 18) Error: undeclared identifier: 'pipe2'
I'm using Ubuntu 18.04. asyncipc works, though.
Hi, first of all, thanks so much for this. Is there any chance you can add examples? Specifically, I'm eager to try the async DNS resolver. I'm a noob to Nim and it would help!
edit
Well, I read through the code and found a bit of an example at the end:
echo "=== asynchronous variant"
var aiList = waitFor(asyncGetAddrInfo("www.google.com", Port(80), domain = Domain.AF_INET))
What's with the "Port(80)" in there? I guess I'm a bit confused because I'm not sure what the HTTP port has to do with a DNS request.
Also when I run that code I get:
Error: ambiguous call; both nativesockets.ntohs(x: uint16)[declared in lib/pure/nativesockets.nim(269, 5)] and posix.ntohs(a1: uint16)[declared in lib/posix/posix.nim(122, 5)] match for: (uint16)
import pkg/asynctools
C:\Users\User.nimble\pkgs\asynctools-0.1.1\asynctools\asyncipc.nim(190, 40) Error: expression cannot be cast to 'pointer'
OS: Windows 10
Nim: 1.9.5
.nimble/pkgs/asynctools-0.1.1/asynctools/asyncsync.nim(159, 15) Error: type mismatch: got <proc (){.closure.}>
but expected one of:
proc callSoon(cbproc: proc () {.gcsafe.})
first type mismatch at position: 1
required type for cbproc: proc (){.closure, gcsafe.}
but expression 'wakeupAll' is of type: proc (){.closure.}
This expression is not GC-safe. Annotate the proc with {.gcsafe.} to get extended error information.
expression: callSoon(wakeupAll)
The following simple program crashes on Windows:
import strformat, std/enumerate, os, asyncdispatch, asynctools/asyncproc
type
  ProcessOutput = tuple[exitCode: int, output: string]
var urlsToDownload: seq[string] = @[
  "www.w3.org/TR/html401/html40.txt",
  "www.w3.org/TR/2002/REC-xhtml1-20020801/xhtml1.pdf"]
proc downloadUrl(url: string): Future[ProcessOutput] {.async.} =
  return await asyncproc.execProcess(&"curl {url}")
var urlsContents: seq[Future[ProcessOutput]]
for url in urlsToDownload:
  echo &"Downloading {url} ..."
  urlsContents.add downloadUrl(url)
let results = waitFor all(urlsContents)
for i, result in enumerate(results):
  if result.exitCode != QuitSuccess:
    echo &"Error {result.exitCode}: {result.output}"
    continue
  let fileName = urlsToDownload[i].splitPath.tail
  writeFile(fileName, result.output)
Tested with:
84ced6d002789567f2396c75800ffd6dff2866f7
Nim Compiler Version 1.5.1 [Windows: amd64]
Compiled at 2021-07-08
Copyright (c) 2006-2021 by Andreas Rumpf
git hash: c57bd0f3eed8ff6cbfc2f806bed6e1f6b5135f5b
active boot switches: -d:release -d:danger
Found during the CI for nimlsp: https://github.com/PMunch/nimlsp/actions/runs/3807847256/jobs/6477921305#step:7:438
Thanks for this cool lib!
Unfortunately I'm getting an error occasionally.
Reproducibility: 2 of 5 runs ("parallel" async tasks as in the sample seem to increase reproducibility)
OS: MacOS
import asyncdispatch, asynctools/asyncproc, os
proc oneTest() {.async.} =
  discard await execProcess("git ls-remote https://github.com/yglukhov/nimx")
proc manyTests() {.async.} =
  var futures = newSeq[Future[void]]()
  for i in 0 .. 10:
    futures.add(oneTest())
  await all futures
waitFor manyTests()
Error: unhandled exception: not running(p)
execProcess's lead up to read of failed Future:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
asyncproc.nim(834) execProcessIter
asyncproc.nim(748) close
system.nim(3748) raiseAssert
system.nim(2811) sysFatal
asyncdispatch.all's lead up to read of failed Future:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
asyncproc.nim(834) execProcessIter
asyncproc.nim(748) close
system.nim(3748) raiseAssert
system.nim(2811) sysFatal
[[reraised from:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
test3.nim(4) oneTestIter
asyncfutures.nim(243) read
]]
manyTests's lead up to read of failed Future:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
asyncproc.nim(834) execProcessIter
asyncproc.nim(748) close
system.nim(3748) raiseAssert
system.nim(2811) sysFatal
[[reraised from:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
test3.nim(4) oneTestIter
asyncfutures.nim(243) read
]]
[[reraised from:
test3.nim(12) test3
asyncdispatch.nim(1492) waitFor
asyncdispatch.nim(1496) poll
asyncdispatch.nim(1262) runOnce
asyncdispatch.nim(183) processPendingCallbacks
asyncmacro.nim(34) cb0
asyncmacro.nim manyTestsIter
asyncfutures.nim(243) read
]] [AssertionError]
I get the following error when running the asyncipc example:
example.nim(12) example
asyncipc.nim(238) open
oserr.nim(110) raiseOSError
Error: unhandled exception: The parameter is incorrect.
[OSError]
Error: execution of an external program failed: 'e:\inter-process\example.exe '
If I add echo getLastError() after this line, it seems to work:
https://github.com/cheatfate/asynctools/blob/master/asynctools/asyncipc.nim#L233
Consider the following examples:
Variant 1: writing into an AsyncPipe
import
  faststreams/async_backend,
  faststreams/asynctools_adapters,
  faststreams/textio,
  faststreams/inputs
proc copyStdioToPipe(pipe: AsyncPipe) {.thread.} =
  var ch = "hello"
  var ch2 = "\n"
  while ch[0] != '\0':
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch[0].addr, 1)
    discard waitFor write(pipe, ch2[0].addr, 1)
proc myReadLine(input: AsyncInputStream): Future[void] {.async.} =
  while input.readable:
    discard input.readLine()
when isMainModule:
  var
    pipe = createPipe(register = true)
    stdioThread: Thread[AsyncPipe]
  createThread(stdioThread, copyStdioToPipe, pipe)
  let a = asyncPipeInput(pipe)
  waitFor(myReadLine(a))
fails with
/home/yyoncho/Sources/nim/langserver/nls.nim(12) copyStdioToPipe
/home/yyoncho/Sources/nim/asynctools/asynctools/asyncpipe.nim(424) write
/home/yyoncho/.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1220) addWrite
Error: unhandled exception: File descriptor not registered. [ValueError]
Variant 2: writing into a pipe using posix
import
  posix,
  os,
  faststreams/async_backend,
  faststreams/asynctools_adapters,
  faststreams/textio,
  faststreams/inputs
proc writeToPipe(p: AsyncPipe, data: pointer, nbytes: int) =
  if posix.write(p.getWriteHandle, data, cint(nbytes)) < 0:
    raiseOsError(osLastError())
proc copyStdioToPipe(pipe: AsyncPipe) {.thread.} =
  var ch = "X222"
  var ch2 = "\n"
  while ch[0] != '\0':
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch[0].addr, 1)
    writeToPipe(pipe, ch2[0].addr, 1)
proc myReadLine(input: AsyncInputStream): Future[void] {.async.} =
  while input.readable:
    echo await input.readLine()
when isMainModule:
  var
    pipe = createPipe(register = true)
    stdioThread: Thread[AsyncPipe]
  createThread(stdioThread, copyStdioToPipe, pipe)
  let a = asyncPipeInput(pipe)
  waitFor(myReadLine(a))
Fails with:
/home/yyoncho/Sources/nim/langserver/nls.nim(19) copyStdioToPipe
/home/yyoncho/Sources/nim/langserver/nls.nim(11) writeToPipe
/home/yyoncho/.choosenim/toolchains/nim-#devel/lib/pure/includes/oserr.nim(94) raiseOSError
Error: unhandled exception: Resource temporarily unavailable [OSError]
Investigating the failure in variant 2, I found https://stackoverflow.com/questions/14370489/what-can-cause-a-resource-temporarily-unavailable-on-sock-send-command and I was able to solve the issue by optionally leaving the pipe's write file descriptor in blocking mode:
- proc createPipe*(size = 65536, register = true): AsyncPipe =
+ proc createPipe*(size = 65536, register = true, nonBlockingWrite = true): AsyncPipe =
    var fds: array[2, cint]
    if posix.pipe(fds) == -1:
      raiseOSError(osLastError())
    setNonBlocking(fds[0])
-   setNonBlocking(fds[1])
+   if nonBlockingWrite:
+     setNonBlocking(fds[1])
Let me know if this fix makes sense to you (it is a bit controversial since it
will allow creating asyncPipe with sync FD). Alternatively, making
readPipe/writePipe public will do too but it still looks disputable.
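The error in variant 2 can be reproduced in isolation, which may help when judging the proposed fix. The sketch below uses only the standard posix module (no asynctools or faststreams) and assumes a POSIX system: it marks the write end of a pipe as non-blocking and writes until the kernel buffer is full, at which point write fails with EAGAIN instead of waiting for a reader. The helper name fillNonBlockingPipe is hypothetical.

```nim
import std/[posix, os]

proc fillNonBlockingPipe(): int =
  ## Hypothetical helper: writes to a pipe that nobody reads and
  ## returns the number of bytes accepted before the buffer was full.
  var fds: array[2, cint]
  if pipe(fds) == -1:
    raiseOSError(osLastError())
  # Put only the write end into non-blocking mode.
  let flags = fcntl(fds[1], F_GETFL)
  discard fcntl(fds[1], F_SETFL, flags or O_NONBLOCK)
  var chunk = newString(4096)
  while true:
    let n = write(fds[1], addr chunk[0], chunk.len)
    if n < 0:
      # EAGAIN is the errno behind "Resource temporarily unavailable".
      doAssert errno == EAGAIN
      break
    result += n
  discard close(fds[0])
  discard close(fds[1])

echo "pipe was full after ", fillNonBlockingPipe(), " bytes"
```

With a blocking write end, the same loop would stall inside write until a reader drained the pipe, which is the behaviour the nonBlockingWrite = false option restores for the writer thread.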
.nimble/pkgs/asynctools-0.1.0/asynctools/asyncdns.nim(661, 28) Error: type mismatch: got <cint> but expected 'TSa_Family = uint16'