茄子在线看片免费人成视频,午夜福利精品a在线观看,国产高清自产拍在线观看,久久综合久久狠狠综合

    <s id="ddbnn"></s>
  • <sub id="ddbnn"><ol id="ddbnn"></ol></sub>

  • <legend id="ddbnn"></legend><s id="ddbnn"></s>

    Nodejs Stream 數(shù)據(jù)流使用手冊
    來源:易賢網(wǎng) 閱讀:924 次 日期:2016-07-08 10:49:12
    溫馨提示:易賢網(wǎng)小編為您整理了“Nodejs Stream 數(shù)據(jù)流使用手冊”,方便廣大網(wǎng)友查閱!

    這篇文章主要介紹了Nodejs Stream 數(shù)據(jù)流使用手冊的相關資料,感興趣的小伙伴一起學習吧

    1、介紹

    本文介紹了使用 node.js streams 開發(fā)程序的基本方法。

    <code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in

    another segment when it becomes necessary to massage data in

    another way. This is the way of IO also."

    Doug McIlroy. October 11, 1964</code>

    最早接觸Stream是從早期的unix開始的數(shù)十年的實踐證明Stream 思想可以很簡單的開發(fā)出一些龐大的系統(tǒng)。在unix里,Stream是通過 |實現(xiàn)的;在node中,作為內(nèi)置的stream模塊,很多核心模塊和三方模塊都使用到。和unix一樣,node Stream主要的操作也是.pipe(),使用者可以使用反壓力機制來控制讀和寫的平衡。

    Stream 可以為開發(fā)者提供可以重復使用統(tǒng)一的接口,通過抽象的Stream接口來控制Stream之間的讀寫平衡。

    2、為什么使用Stream

    node中的I/O是異步的,因此對磁盤和網(wǎng)絡的讀寫需要通過回調(diào)函數(shù)來讀取數(shù)據(jù),下面是一個文件下載服務器的簡單代碼:

    <code class="hljs javascript">var http = require('http');

    var fs = require('fs');

    var server = http.createServer(function (req, res) {

    fs.readFile(__dirname + '/data.txt', function (err, data) {

    res.end(data);

    });

    });

    server.listen(8000);</code>

    這些代碼可以實現(xiàn)需要的功能,但是服務在發(fā)送文件數(shù)據(jù)之前需要緩存整個文件數(shù)據(jù)到內(nèi)存,如果"data.txt"文件很大且并發(fā)量很大的話,會浪費很多內(nèi)存。因為用戶需要等到整個文件緩存到內(nèi)存才能接受的文件數(shù)據(jù),這樣導致用戶體驗相當不好。不過還好(req, res)兩個參數(shù)都是Stream,這樣我們可以用fs.createReadStream()代替fs.readFile():

    <code class="hljs javascript">var http = require('http');

    var fs = require('fs');

    var server = http.createServer(function (req, res) {

    var stream = fs.createReadStream(__dirname + '/data.txt');

    stream.pipe(res);

    });

    server.listen(8000);</code>

    .pipe()方法監(jiān)聽fs.createReadStream()的'data' 和'end'事件,這樣"data.txt"文件就不需要緩存整個文件,當客戶端連接完成之后馬上可以發(fā)送一個數(shù)據(jù)塊到客戶端。使用.pipe()另一個好處是可以解決當客戶端延遲非常大時導致的讀寫不平衡問題。如果想壓縮文件再發(fā)送,可以使用三方模塊實現(xiàn):

    <code class="hljs javascript">var http = require('http');

    var fs = require('fs');

    var oppressor = require('oppressor');

    var server = http.createServer(function (req, res) {

    var stream = fs.createReadStream(__dirname + '/data.txt');

    stream.pipe(oppressor(req)).pipe(res);

    });

    server.listen(8000);</code>

    這樣文件就會對支持gzip和deflate的瀏覽器進行壓縮。oppressor 模塊會處理所有的content-encoding。

    Stream使開發(fā)程序變得簡單。

    3、基礎概念

    有五種基本的Stream: readable, writable, transform, duplex, and”classic”.

    3-1、pipe

    所有類型的Stream收是使用 .pipe() 來創(chuàng)建一個輸入輸出對,接收一個可讀流src并將其數(shù)據(jù)輸出到可寫流dst,如下:

    <code class="hljs perl">src.pipe(dst)</code>

    .pipe( dst )方法為返回dst流,這樣就可以接連使用多個.pipe(),如下:

    <code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>

    功能與下面的代碼相同:

    <code class="hljs perl">a.pipe( b );

    b.pipe( c );

    c.pipe( d );</code>

    3-2、readable streams

    通過調(diào)用Readable streams的 .pipe()方法可以把Readable streams的數(shù)據(jù)寫入一個Writable , Transform, 或者Duplex stream。

    <code class="hljs perl">readableStream.pipe( dst )</code>

    1>創(chuàng)建 readable stream

    這里我們創(chuàng)建一個readable stream!

    <code class="hljs perl">var Readable = require('stream').Readable;

    var rs = new Readable;

    rs.push('beep ');

    rs.push('boop\n');

    rs.push(null);

    rs.pipe(process.stdout);

    $ node read0.js

    beep boop

    </code>

    rs.push( null ) 通知數(shù)據(jù)接收者數(shù)據(jù)已經(jīng)發(fā)送完畢.

    注意到我們在將所有數(shù)據(jù)內(nèi)容壓入可讀流之前并沒有調(diào)用rs.pipe(process.stdout);,但是我們壓入的所有數(shù)據(jù)內(nèi)容還是完全的輸出了,這是因為可讀流在接收者沒有讀取數(shù)據(jù)之前,會緩存所有壓入的數(shù)據(jù)。但是在很多情況下, 更好的方法是只有數(shù)據(jù)接收著請求數(shù)據(jù)的時候,才壓入數(shù)據(jù)到可讀流而不是緩存整個數(shù)據(jù)。下面我們重寫 一下._read()函數(shù):

    <code class="hljs javascript">var Readable = require('stream').Readable;

    var rs = Readable();

    var c = 97;

    rs._read = function () {

    rs.push(String.fromCharCode(c++));

    if (c > 'z'.charCodeAt(0)) rs.push(null);

    };

    rs.pipe(process.stdout);</code>

    <code class="hljs bash">$ node read1.js

    abcdefghijklmnopqrstuvwxyz</code>

    上面的代碼通過重寫_read()方法實現(xiàn)了只有在數(shù)據(jù)接受者請求數(shù)據(jù)才向可讀流中壓入數(shù)據(jù)。_read()方法也可以接收一個size參數(shù)表示數(shù)據(jù)請求著請求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個參數(shù)。

    注意我們也可以用util.inherits()繼承可讀流。為了說明只有在數(shù)據(jù)接受者請求數(shù)據(jù)時_read()方法才被調(diào)用,我們在向可讀流壓入數(shù)據(jù)時做一個延時,如下:

    <code class="hljs javascript">var Readable = require('stream').Readable;

    var rs = Readable();

    var c = 97 - 1;

    rs._read = function () {

    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {

    rs.push(String.fromCharCode(++c));

    }, 100);

    };

    rs.pipe(process.stdout);

    process.on('exit', function () {

    console.error('\n_read() called ' + (c - 97) + ' times');

    });

    process.stdout.on('error', process.exit);</code>

    用下面的命令運行程序我們發(fā)現(xiàn)_read()方法只調(diào)用了5次:

    <code class="hljs bash">$ node read2.js | head -c5

    abcde

    _read() called 5 times</code>

    使用計時器的原因是系統(tǒng)需要時間來發(fā)送信號來通知程序關閉管道。使用process.stdout.on('error', fn) 是為了處理系統(tǒng)因為header命令關閉管道而發(fā)送SIGPIPE信號,因為這樣會導致process.stdout觸發(fā)EPIPE事件。如果想創(chuàng)建一個的可以壓入任意形式數(shù)據(jù)的可讀流,只要在創(chuàng)建流的時候設置參數(shù)objectMode為true即可,例如:Readable({ objectMode: true })。

    2>讀取readable stream數(shù)據(jù)

    大部分情況下我們只要簡單的使用pipe方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。如下:

    <code class="hljs php">process.stdin.on('readable', function () {

    var buf = process.stdin.read();

    console.dir(buf);

    });

    $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 

    <buffer 0a="" 61="" 62="" 63="">

    <buffer 0a="" 64="" 65="" 66="">

    <buffer 0a="" 67="" 68="" 69="">

    null</buffer></buffer></buffer></code>

    當可讀流中有數(shù)據(jù)可讀取時,流會觸發(fā)'readable' 事件,這樣就可以調(diào)用.read()方法來讀取相關數(shù)據(jù),當可讀流中沒有數(shù)據(jù)可讀取時,.read() 會返回null,這樣就可以結(jié)束.read() 的調(diào)用, 等待下一次'readable' 事件的觸發(fā)。下面是一個使用.read(n)從標準輸入每次讀取3個字節(jié)的例子:

    <code class="hljs javascript">process.stdin.on('readable', function () {

    var buf = process.stdin.read(3);

    console.dir(buf);

    });</code>

    如下運行程序發(fā)現(xiàn),輸出結(jié)果并不完全!

    <code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 

    <buffer 61="" 62="" 63="">

    <buffer 0a="" 64="" 65="">

    <buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>

    這是應為額外的數(shù)據(jù)數(shù)據(jù)留在流的內(nèi)部緩沖區(qū)里了,而我們需要通知流我們要讀取更多的數(shù)據(jù).read(0)可以達到這個目的。

    <code class="hljs javascript">process.stdin.on('readable', function () {

    var buf = process.stdin.read(3);

    console.dir(buf);

    process.stdin.read(0);

    });</code>

    這次運行結(jié)果如下:

    <code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 

    <buffer 0a="" 64="" 65="">

    <buffer 0a="" 68="" 69=""></buffer></buffer></code>

    我們可以使用 .unshift() 將數(shù)據(jù)重新押回流數(shù)據(jù)隊列的頭部,這樣可以接續(xù)讀取押回的數(shù)據(jù)。如下面的代碼,會按行輸出標準輸入的內(nèi)容:

    <code class="hljs javascript">var offset = 0;

    process.stdin.on('readable', function () {

    var buf = process.stdin.read();

    if (!buf) return;

    for (; offset < buf.length; offset++) {

    if (buf[offset] === 0x0a) {

    console.dir(buf.slice(0, offset).toString());

    buf = buf.slice(offset + 1);

    offset = 0;

    process.stdin.unshift(buf);

    return;

    }

    }

    process.stdin.unshift(buf);

    });

    $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 

    'hearties'

    'heartiest'

    'heartily'

    'heartiness'

    'heartiness\'s'

    'heartland'

    'heartland\'s'

    'heartlands'

    'heartless'

    'heartlessly'</code>

    當然,有很多模塊可以實現(xiàn)這個功能,如:split 。

    3-3、writable streams

    writable streams只可以作為.pipe()函數(shù)的目的參數(shù)。如下代碼:

    <code class="hljs perl">src.pipe( writableStream );</code>

    1>創(chuàng)建 writable stream

    重寫 ._write(chunk, enc, next) 方法就可以接受一個readable stream的數(shù)據(jù)。

    <code class="hljs php">var Writable = require('stream').Writable;

    var ws = Writable();

    ws._write = function (chunk, enc, next) {

    console.dir(chunk);

    next();

    };

    process.stdin.pipe(ws);

    $ (echo beep; sleep 1; echo boop) | node write0.js 

    <buffer 0a="" 62="" 65="" 70="">

    <buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>

    第一個參數(shù)chunk是數(shù)據(jù)輸入者寫入的數(shù)據(jù)。第二個參數(shù)end是數(shù)據(jù)的編碼格式。第三個參數(shù)next(err)通過回調(diào)函數(shù)通知數(shù)據(jù)寫入者可以寫入更多的時間。如果readable stream寫入的是字符串,那么字符串會默認轉(zhuǎn)換為Buffer,如果在創(chuàng)建流的時候設置Writable({ decodeStrings: false })參數(shù),那么不會做轉(zhuǎn)換。如果readable stream寫入的數(shù)據(jù)時對象,那么需要這樣創(chuàng)建writable stream

    <code class="hljs css">Writable({ objectMode: true })</code>

    2>寫數(shù)據(jù)到 writable stream

    調(diào)用writable stream的.write(data)方法即可完成數(shù)據(jù)寫入。

    <code class="hljs vala">process.stdout.write('beep boop\n');</code>

    調(diào)用.end()方法通知writable stream 數(shù)據(jù)已經(jīng)寫入完成。

    <code class="hljs javascript">var fs = require('fs');

    var ws = fs.createWriteStream('message.txt');

    ws.write('beep ');

    setTimeout(function () {

    ws.end('boop\n');

    }, 1000);

    $ node writing1.js 

    $ cat message.txt

    beep boop</code>

    如果需要設置writable stream的緩沖區(qū)的大小,那么在創(chuàng)建流的時候,需要設置opts.highWaterMark,這樣如果緩沖區(qū)里的數(shù)據(jù)超過opts.highWaterMark,.write(data)方法會返回false。當緩沖區(qū)可寫的時候,writable stream會觸發(fā)'drain' 事件。

    3-4、classic streams

    Classic streams比較老的接口了,最早出現(xiàn)在node 0.4版本中,但是了解一下其運行原理還是十分有好

    處的。當一個流被注冊了"data" 事件的回到函數(shù),那么流就會工作在老版本模式下,即會使用老的API。

    1>classic readable streams

    Classic readable streams事件就是一個事件觸發(fā)器,如果Classic readable streams有數(shù)據(jù)可讀取,那么其觸發(fā) "data" 事件,等到數(shù)據(jù)讀取完畢時,會觸發(fā)"end" 事件。.pipe() 方法通過檢查stream.readable 的值確定流是否有數(shù)據(jù)可讀。下面是一個使用Classic readable streams打印A-J字母的例子:

    <code class="hljs javascript">var Stream = require('stream');

    var stream = new Stream;

    stream.readable = true;

    var c = 64;

    var iv = setInterval(function () {

    if (++c >= 75) {

    clearInterval(iv);

    stream.emit('end');

    }

    else stream.emit('data', String.fromCharCode(c));

    }, 100);

    stream.pipe(process.stdout);

    $ node classic0.js

    ABCDEFGHIJ</code>

    如果要從classic readable stream中讀取數(shù)據(jù),注冊"data" 和"end"兩個事件的回調(diào)函數(shù)即可,代碼如下:

    <code class="hljs php">process.stdin.on('data', function (buf) {

    console.log(buf);

    });

    process.stdin.on('end', function () {

    console.log('__END__');

    });

    $ (echo beep; sleep 1; echo boop) | node classic1.js 

    <buffer 0a="" 62="" 65="" 70="">

    <buffer 0a="" 62="" 6f="" 70="">

    __END__</buffer></buffer></code>

    需要注意的是如果你使用這種方式讀取數(shù)據(jù),那么會失去使用新接口帶來的好處。比如你在往一個 延遲非常大的流寫數(shù)據(jù)時,需要注意讀取數(shù)據(jù)和寫數(shù)據(jù)的平衡問題,否則會導致大量數(shù)據(jù)緩存在內(nèi)存中,導致浪費大量內(nèi)存。一般這時候強烈建議使用流的.pipe()方法,這樣就不用自己監(jiān)聽”data” 和”end”事件了,也不用擔心讀寫不平衡的問題了。當然你也可以用 through代替自己監(jiān)聽”data” 和”end” 事件,如下面的代碼:

    <code class="hljs php">var through = require('through');

    process.stdin.pipe(through(write, end));

    function write (buf) {

    console.log(buf);

    }

    function end () {

    console.log('__END__');

    }

    $ (echo beep; sleep 1; echo boop) | node through.js 

    <buffer 0a="" 62="" 65="" 70="">

    <buffer 0a="" 62="" 6f="" 70="">

    __END__</buffer></buffer></code>

    或者也可以使用concat-stream來緩存整個流的內(nèi)容:

    <code class="hljs oxygene">var concat = require('concat-stream');

    process.stdin.pipe(concat(function (body) {

    console.log(JSON.parse(body));

    }));

    $ echo '{"beep":"boop"}' | node concat.js 

    { beep: 'boop' }</code>

    當然如果你非要自己監(jiān)聽"data" 和"end"事件,那么你可以在寫數(shù)據(jù)的流不可寫的時候使用.pause()方法暫停Classic readable streams繼續(xù)觸發(fā)”data” 事件。等到寫數(shù)據(jù)的流可寫的時候再使用.resume() 方法通知流繼續(xù)觸發(fā)"data" 事件繼續(xù)讀取

    數(shù)據(jù)。

    2>classic writable streams

    Classic writable streams 非常簡單。只有 .write(buf), .end(buf)和.destroy()三個方法。.end(buf) 方法的buf參數(shù)是可選的,如果選擇該參數(shù),相當于stream.write(buf); stream.end() 這樣的操作,需要注意的是當流的緩沖區(qū)寫滿即流不可寫時.write(buf)方法會返回false,如果流再次可寫時,流會觸發(fā)drain事件。

    4、transform

    transform是一個對讀入數(shù)據(jù)過濾然輸出的流。

    5、duplex

    duplex stream是一個可讀也可寫的雙向流,如下面的a就是一個duplex stream:

    <code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>

    以上內(nèi)容是小編給大家介紹的Nodejs Stream 數(shù)據(jù)流使用手冊,希望對大家有所幫助!

    更多信息請查看網(wǎng)絡編程
    易賢網(wǎng)手機網(wǎng)站地址:Nodejs Stream 數(shù)據(jù)流使用手冊

    2026上岸·考公考編培訓報班

    • 報班類型
    • 姓名
    • 手機號
    • 驗證碼
    關于我們 | 聯(lián)系我們 | 人才招聘 | 網(wǎng)站聲明 | 網(wǎng)站幫助 | 非正式的簡要咨詢 | 簡要咨詢須知 | 新媒體/短視頻平臺 | 手機站點 | 投訴建議
    工業(yè)和信息化部備案號:滇ICP備2023014141號-1 云南省教育廳備案號:云教ICP備0901021 滇公網(wǎng)安備53010202001879號 人力資源服務許可證:(云)人服證字(2023)第0102001523號
    聯(lián)系電話:0871-65099533/13759567129 獲取招聘考試信息及咨詢關注公眾號:hfpxwx
    咨詢QQ:1093837350(9:00—18:00)版權所有:易賢網(wǎng)