admin管理员组文章数量:1327890
This is part of a larger process that I've distilled down to the minimal, reproducible example in node v14.4.0. In this code, it outputs nothing from inside the for
loop.
I see only this output in the console:
before for() loop
finished
finally
done
The for await (const line1 of rl1)
loop never goes into the for
loop - it just skips right over it:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// FAILING DEMO: reads file1 line-by-line with readline's async iterator,
// but the for-await loop below produces no output. Per the discussion that
// follows, readline.createInterface() starts consuming the stream
// immediately, so awaiting 'open' on stream2 AFTER rl1 is created yields to
// the event loop while rl1 is already emitting 'line' events with no
// listener attached -- those lines are dropped.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
// pre-flight the open so a bad path rejects here (the readline async
// iterator does not reliably surface file-open errors -- see FYI below)
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
// this await is the intervening async gap that loses rl1's lines
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
But, if I remove either of the await once(stream, 'open')
statements, then the for
loop does exactly what it is expected to (lists all the lines of the rl1
file). So, apparently, there's some timing problem with the async iterator from the readline interface between that and the stream. Any ideas what could be going on. Any idea what could be causing this one or how to work around it?
FYI, the await once(stream, 'open')
is there because of another bug in the async iterator where it does not reject if there's an issue opening the file whereas the await once(stream, 'open')
causes you to properly get a rejection if the file can't be opened (essentially pre-flighting the open).
If you're wondering why the stream2 code is there, it is used in the larger project, but I've reduced this example down to the minimal, reproducible example and only this much of the code is needed to demonstrate the problem.
Edit: In trying a slightly different implementation, I found that if I combine the two once(stream, "open")
calls in a Promise.all()
, that it then works. So, this works:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// WORKING VARIANT (in practice): both 'open' pre-flights are combined into
// a single Promise.all() placed immediately before the for-await loop, and
// the lines are no longer lost.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
// pre-flight file open to catch any open errors here
// because of existing bug in async iterator with file open errors
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
This is obviously not supposed to be sensitive to exactly how you wait for the file open. There's some timing bug somewhere. I'd like to find that bug in either readline or readStream and file it. Any ideas?
This is part of a larger process that I've distilled down to the minimal, reproducible example in node v14.4.0. In this code, it outputs nothing from inside the for
loop.
I see only this output in the console:
before for() loop
finished
finally
done
The for await (const line1 of rl1)
loop never goes into the for
loop - it just skips right over it:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// FAILING DEMO (duplicate of the snippet earlier in this page): the
// for-await loop below produces no output. readline.createInterface()
// starts consuming the stream immediately, so awaiting 'open' on stream2
// AFTER rl1 is created yields to the event loop while rl1 is already
// emitting 'line' events with no listener attached -- those lines are lost.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
// pre-flight the open so a bad path rejects here (the readline async
// iterator does not reliably surface file-open errors)
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
// this await is the intervening async gap that loses rl1's lines
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
But, if I remove either of the await once(stream, 'open')
statements, then the for
loop does exactly what it is expected to (lists all the lines of the rl1
file). So, apparently, there's some timing problem with the async iterator from the readline interface between that and the stream. Any ideas what could be going on. Any idea what could be causing this one or how to work around it?
FYI, the await once(stream, 'open')
is there because of another bug in the async iterator where it does not reject if there's an issue opening the file whereas the await once(stream, 'open')
causes you to properly get a rejection if the file can't be opened (essentially pre-flighting the open).
If you're wondering why the stream2 code is there, it is used in the larger project, but I've reduced this example down to the minimal, reproducible example and only this much of the code is needed to demonstrate the problem.
Edit: In trying a slightly different implementation, I found that if I combine the two once(stream, "open")
calls in a Promise.all()
, that it then works. So, this works:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// WORKING VARIANT (in practice): both 'open' pre-flights are combined into
// a single Promise.all() placed immediately before the for-await loop, and
// the lines are no longer lost.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
// pre-flight file open to catch any open errors here
// because of existing bug in async iterator with file open errors
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
This is obviously not supposed to be sensitive to exactly how you wait for the file open. There's some timing bug somewhere. I'd like to find that bug in either readline or readStream and file it. Any ideas?
Share Improve this question edited Jul 13, 2020 at 23:30 jfriend00 asked Jul 13, 2020 at 23:08 jfriend00jfriend00 708k103 gold badges1k silver badges1k bronze badges 12- I see an existing May 18th issue filed on a similar problem. I have added my example to that issue. And, another related issue from July 5th. – jfriend00 Commented Jul 13, 2020 at 23:37
-
And, here''s the issue from last December that causes me to have to put in the
await once(stream, 'open')
in order to properly catch errors when opening the file. – jfriend00 Commented Jul 13, 2020 at 23:42 - That December bug arose out of this stackoverflow question: How to handle error from fs readline interface async iterator. – jfriend00 Commented Jul 13, 2020 at 23:56
-
My first thought based on the difference between your two code snippets is that a
data
event is fired before you attach the read stream to the readline interface. Awaitingopen
before doing something with the stream likely has implications since event emitters are synchronous and promise resolution is not. – Jake Holzinger Commented Jul 14, 2020 at 0:00 - 2 @JakeHolzinger - Is it documented that you have to use it immediately without any intervening asynchronous operations? And, why would an ASYNCHRONOUS iterator have a requirement that you can't use it around other asynchronous operations? And, it's got other bugs. So, many I've encountered that I would rather reimplement line by line processing myself (or use an external module) rather than use it. When you insert one line of code and it breaks for some undocumented timing reasons, that's just a broken design. It should be fixed to be reliable or removed. – jfriend00 Commented Jul 14, 2020 at 0:42
3 Answers
Reset to default 5It turns out the underlying issue is that readline.createInterface()
immediately, upon calling it will add a data
event listener (code reference here) and resume the stream to start the stream flowing.
input.on('data', ondata);
and
input.resume();
Then, in the ondata
listener, it parses the data for lines and when it finds a line, it fires a line
event here.
for (let n = 0; n < lines.length; n++)
this._onLine(lines[n]);
But, in my examples, there were other asynchronous things happening between the time that readline.createInterface()
was called and the async iterator was created (that would listen for the line
events). So, line
events were being emitted and nothing was yet listening for them.
So, to work properly readline.createInterface()
REQUIRES that whatever is going to listen for the line
events MUST be added synchronously after calling readline.createInterface()
or there is a race condition and line
events may get lost.
In my original code example, a reliable way to work-around it is to not call readline.createInterface()
until after I've done the await once(...)
. Then, the asynchronous iterator will be created synchronously right after readline.createInterface()
is called.
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// RELIABLE FIX: do all asynchronous pre-flight work BEFORE calling
// readline.createInterface(). The for-await iterator then attaches in the
// same synchronous run as the createInterface() calls, so no 'line' events
// can be emitted while nothing is listening.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
// wait for both files to be open to catch any "open" errors here
// since readline has bugs about not properly reporting file open errors
// this await must be done before either call to readline.createInterface()
// to avoid race conditions that can lead to lost lines of data
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
One way to fix this general issue would be to change readline.createInterface()
so that it does not add the data
event and resume the stream UNTIL somebody adds a line
event listener. This would prevent data loss. It would allow the readline interface object to sit there quietly without losing data until the receiver of its output was actually ready. This would work for the async iterator and it would also prevent other uses of the interface that had other asynchronous code mixed in from possibly losing line
events.
Note about this added to a related open readline bug issue here.
You can make this work as expected if you create the async iterator immediately after constructing the readline interface. If you wait to create the async iterator you may lose some lines as the line events are not buffered by the readline interface, but by virtue of the async iterator they will be buffered.
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
// ALTERNATIVE FIX: grab the async iterator synchronously, immediately after
// readline.createInterface(). The iterator buffers incoming lines, so the
// 'line' events emitted during the later awaits are retained rather than
// dropped.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
// create the iterator right away -- no await may intervene between
// createInterface() and this line, or lines can be lost
const rl1Iterator = rl1[Symbol.asyncIterator]();
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1Iterator) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("stream.txt", "stream.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
Based on discussion in the comments, this still may not be a desirable solution as the readline module has various other issues, but I figured I would add an answer to resolve the problem as indicated in the original question.
The readline
module could also be replaced with a simple Transform
stream using the more modern stream API. The modern stream API supports async iterators out of the box as well as backpressure (e.g. the write side of the stream (file reading) will pause until the read side of the stream (line reading) is being consumed).
const fs = require('fs');
const { Transform } = require('stream');
/**
 * Creates a Transform stream that re-chunks incoming text into one line per
 * object-mode read. The writable side accepts raw bytes/strings; the
 * readable side emits one string per line, with the LF or CRLF terminator
 * stripped. A partial line at the end of a chunk is carried over and joined
 * with the next chunk; a final unterminated line is emitted on flush.
 *
 * @returns {Transform} object-mode-readable line-splitting stream
 */
function toLines() {
  let carry = '';
  return new Transform({
    writableObjectMode: false,   // input: plain bytes / text
    readableObjectMode: true,    // output: one string per line
    transform(chunk, encoding, done) {
      try {
        // Prepend the leftover tail from the previous chunk, then split on
        // LF or CRLF line endings.
        const merged = carry + chunk;
        const pieces = merged.split(/\r?\n/g);
        // The final piece has not seen its terminator yet -- hold it back.
        carry = pieces.pop();
        for (const piece of pieces) {
          this.push(piece);
        }
        done();
      } catch (err) {
        done(err);
      }
    },
    flush(done) {
      // Emit any trailing line that never got a terminator.
      if (carry !== '') {
        this.push(carry);
      }
      done();
    }
  });
}
// Reads file1 line-by-line via the Transform-based line splitter (toLines).
// Per the answer text, the modern stream API gives async iteration and
// backpressure out of the box: the file read pauses while the for-await
// consumer is busy. stream2/rl2 mirror the original question's setup.
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
const rl1 = stream1.pipe(toLines());
const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
const rl2 = stream2.pipe(toLines());
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
This example doesn't support the crlfDelay
option of the readline
module, but the algorithm could be modified to do something similar. It also (as far as I can tell) has better error handling than is supported by the readline
module.
本文标签: javascriptWhy does this readline async iterator not work properlyStack Overflow
版权声明:本文标题:javascript - Why does this readline async iterator not work properly? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742229842a2437019.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论