PK vFAVzq mb b lib/OutputBuffer.phpnu W+A deferred = new Deferred; } public function write(string $data): Promise { if ($this->closed) { throw new ClosedException("The stream has already been closed."); } $this->contents .= $data; return new Success(\strlen($data)); } public function end(string $finalData = ""): Promise { if ($this->closed) { throw new ClosedException("The stream has already been closed."); } $this->contents .= $finalData; $this->closed = true; $this->deferred->resolve($this->contents); $this->contents = ""; return new Success(\strlen($finalData)); } public function onResolve(callable $onResolved) { $this->deferred->promise()->onResolve($onResolved); } } PK vFAVD D lib/ZlibOutputStream.phpnu W+A destination = $destination; $this->encoding = $encoding; $this->options = $options; $this->resource = @\deflate_init($encoding, $options); if ($this->resource === false) { throw new StreamException("Failed initializing deflate context"); } } /** @inheritdoc */ public function write(string $data): Promise { if ($this->resource === null) { throw new ClosedException("The stream has already been closed"); } $compressed = \deflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH); if ($compressed === false) { throw new StreamException("Failed adding data to deflate context"); } $promise = $this->destination->write($compressed); $promise->onResolve(function ($error) { if ($error) { $this->close(); } }); return $promise; } /** @inheritdoc */ public function end(string $finalData = ""): Promise { if ($this->resource === null) { throw new ClosedException("The stream has already been closed"); } $compressed = \deflate_add($this->resource, $finalData, \ZLIB_FINISH); if ($compressed === false) { throw new StreamException("Failed adding data to deflate context"); } $promise = $this->destination->end($compressed); $promise->onResolve(function () { $this->close(); }); return $promise; } /** @internal */ private function close() { $this->resource = null; $this->destination = null; } /** * Gets the used compression encoding. * * @return int Encoding specified on construction time. */ public function getEncoding(): int { return $this->encoding; } /** * Gets the used compression options. * * @return array Options array passed on construction time. */ public function getOptions(): array { return $this->options; } } PK vFAVh,{Lz z lib/Message.phpnu W+A read()) !== null) { * // Immediately use $chunk, reducing memory consumption since the entire message is never buffered. * } */ class Message implements InputStream, Promise { /** @var InputStream */ private $source; /** @var string */ private $buffer = ""; /** @var \Amp\Deferred|null */ private $pendingRead; /** @var \Amp\Coroutine */ private $coroutine; /** @var bool True if onResolve() has been called. */ private $buffering = false; /** @var \Amp\Deferred|null */ private $backpressure; /** @var bool True if the iterator has completed. */ private $complete = false; /** @var \Throwable Used to fail future reads on failure. */ private $error; /** * @param InputStream $source An iterator that only emits strings. */ public function __construct(InputStream $source) { $this->source = $source; } private function consume(): \Generator { while (($chunk = yield $this->source->read()) !== null) { $buffer = $this->buffer .= $chunk; if ($buffer === "") { continue; // Do not succeed reads with empty string. } elseif ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $this->buffer = ""; $deferred->resolve($buffer); $buffer = ""; // Destroy last emitted chunk to free memory. } elseif (!$this->buffering) { $buffer = ""; // Destroy last emitted chunk to free memory. $this->backpressure = new Deferred; yield $this->backpressure->promise(); } } $this->complete = true; if ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $deferred->resolve($this->buffer !== "" ? $this->buffer : null); $this->buffer = ""; } return $this->buffer; } /** @inheritdoc */ final public function read(): Promise { if ($this->pendingRead) { throw new PendingReadError; } if ($this->coroutine === null) { $this->coroutine = new Coroutine($this->consume()); $this->coroutine->onResolve(function ($error) { if ($error) { $this->error = $error; } if ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $deferred->fail($error); } }); } if ($this->error) { return new Failure($this->error); } if ($this->buffer !== "") { $buffer = $this->buffer; $this->buffer = ""; if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; $backpressure->resolve(); } return new Success($buffer); } if ($this->complete) { return new Success; } $this->pendingRead = new Deferred; return $this->pendingRead->promise(); } /** @inheritdoc */ final public function onResolve(callable $onResolved) { $this->buffering = true; if ($this->coroutine === null) { $this->coroutine = new Coroutine($this->consume()); } if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; $backpressure->resolve(); } $this->coroutine->onResolve($onResolved); } /** * Exposes the source input stream. * * This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with * other promises. * * @return InputStream */ final public function getInputStream(): InputStream { return $this->source; } } PK vFAV5VO O lib/StreamException.phpnu W+A iterator = $iterator; } /** @inheritdoc */ public function read(): Promise { if ($this->exception) { return new Failure($this->exception); } if ($this->pending) { throw new PendingReadError; } $this->pending = true; $deferred = new Deferred; $this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) { $this->pending = false; if ($error) { $this->exception = $error; $deferred->fail($error); } elseif ($hasNextElement) { $chunk = $this->iterator->getCurrent(); if (!\is_string($chunk)) { $this->exception = new StreamException(\sprintf( "Unexpected iterator value of type '%s', expected string", \is_object($chunk) ? \get_class($chunk) : \gettype($chunk) )); $deferred->fail($this->exception); return; } $deferred->resolve($chunk); } else { $deferred->resolve(); } }); return $deferred->promise(); } } PK vFAVzX X lib/ResourceInputStream.phpnu W+A resource = $stream; $deferred = &$this->deferred; $readable = &$this->readable; $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (&$deferred, &$readable, $chunkSize, $useFread) { if ($useFread) { $data = @\fread($stream, $chunkSize); } else { $data = @\stream_get_contents($stream, $chunkSize); } \assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to."); // Error suppression, because pthreads does crazy things with resources, // which might be closed during two operations. // See https://github.com/amphp/byte-stream/issues/32 if ($data === '' && @\feof($stream)) { $readable = false; Loop::cancel($watcher); $data = null; // Stream closed, resolve read with null. } $temp = $deferred; $deferred = null; $temp->resolve($data); if ($deferred === null) { // Only disable watcher if no further read was requested. Loop::disable($watcher); } }); Loop::disable($this->watcher); } /** @inheritdoc */ public function read(): Promise { if ($this->deferred !== null) { throw new PendingReadError; } if (!$this->readable) { return new Success; // Resolve with null on closed stream. } $this->deferred = new Deferred; Loop::enable($this->watcher); return $this->deferred->promise(); } /** * Closes the stream forcefully. Multiple `close()` calls are ignored. * * @return void */ public function close() { if ($this->resource) { // Error suppression, as resource might already be closed $meta = @\stream_get_meta_data($this->resource); if ($meta && \strpos($meta["mode"], "+") !== false) { @\stream_socket_shutdown($this->resource, \STREAM_SHUT_RD); } else { @\fclose($this->resource); } $this->resource = null; } $this->free(); } /** * Nulls reference to resource, marks stream unreadable, and succeeds any pending read with null. */ private function free() { $this->readable = false; if ($this->deferred !== null) { $deferred = $this->deferred; $this->deferred = null; $deferred->resolve(null); } Loop::cancel($this->watcher); } /** * @return resource|null The stream resource or null if the stream has closed. */ public function getResource() { return $this->resource; } /** * References the read watcher, so the loop keeps running in case there's an active read. * * @see Loop::reference() */ public function reference() { if (!$this->resource) { throw new \Error("Resource has already been freed"); } Loop::reference($this->watcher); } /** * Unreferences the read watcher, so the loop doesn't keep running even if there are active reads. * * @see Loop::unreference() */ public function unreference() { if (!$this->resource) { throw new \Error("Resource has already been freed"); } Loop::unreference($this->watcher); } public function __destruct() { if ($this->resource !== null) { $this->free(); } } } PK vFAVQ7Z Z lib/ClosedException.phpnu W+A contents = $contents; } /** * Reads data from the stream. * * @return Promise Resolves with the full contents or `null` if the stream has closed / already been consumed. */ public function read(): Promise { if ($this->contents === null) { return new Success; } $promise = new Success($this->contents); $this->contents = null; return $promise; } } PK vFAVDeO O lib/ZlibInputStream.phpnu W+A source = $source; $this->encoding = $encoding; $this->options = $options; $this->resource = @\inflate_init($encoding, $options); if ($this->resource === false) { throw new StreamException("Failed initializing deflate context"); } } /** @inheritdoc */ public function read(): Promise { return call(function () { if ($this->resource === null) { return null; } $data = yield $this->source->read(); // Needs a double guard, as stream might have been closed while reading if ($this->resource === null) { return null; } if ($data === null) { $decompressed = @\inflate_add($this->resource, "", \ZLIB_FINISH); if ($decompressed === false) { throw new StreamException("Failed adding data to deflate context"); } $this->close(); return $decompressed; } $decompressed = @\inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH); if ($decompressed === false) { throw new StreamException("Failed adding data to deflate context"); } return $decompressed; }); } /** @internal */ private function close() { $this->resource = null; $this->source = null; } /** * Gets the used compression encoding. * * @return int Encoding specified on construction time. */ public function getEncoding(): int { return $this->encoding; } /** * Gets the used compression options. * * @return array Options array passed on construction time. */ public function getOptions(): array { return $this->options; } } PK vFAVe lib/PendingReadError.phpnu W+A read()) !== null) { * $buffer .= $chunk; * } * * return $buffer; * }); * } * ``` */ interface InputStream { /** * Reads data from the stream. * * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. * * @throws PendingReadError Thrown if another read operation is still pending. */ public function read(): Promise; } PK vFAV,i6 6 lib/ResourceOutputStream.phpnu W+A resource = $stream; $this->chunkSize = $chunkSize; $writes = $this->writes = new \SplQueue; $writable = &$this->writable; $resource = &$this->resource; $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, $chunkSize, &$writable, &$resource) { $firstWrite = true; try { while (!$writes->isEmpty()) { /** @var \Amp\Deferred $deferred */ list($data, $previous, $deferred) = $writes->shift(); $length = \strlen($data); if ($length === 0) { $deferred->resolve(0); continue; } // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. // Use conditional, because PHP doesn't like getting null passed if ($chunkSize) { $written = @\fwrite($stream, $data, $chunkSize); } else { $written = @\fwrite($stream, $data); } \assert($written !== false, "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to."); if ($written === 0) { // fwrite will also return 0 if the buffer is already full. Let's test it on the next call to this writability callback, this guarantees that the buffer isn't full. if (!$firstWrite) { $writes->unshift([$data, $previous, $deferred]); return; } $resource = null; $writable = false; $message = "Failed to write to socket"; if ($error = \error_get_last()) { $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); } $exception = new StreamException($message); $deferred->fail($exception); while (!$writes->isEmpty()) { list(, , $deferred) = $writes->shift(); $deferred->fail($exception); } Loop::cancel($watcher); return; } if ($length > $written) { $data = \substr($data, $written); $writes->unshift([$data, $written + $previous, $deferred]); return; } $deferred->resolve($written + $previous); $firstWrite = false; } } finally { if ($writes->isEmpty()) { Loop::disable($watcher); } } }); Loop::disable($this->watcher); } /** * Writes data to the stream. * * @param string $data Bytes to write. * * @return Promise Succeeds once the data has been successfully written to the stream. * * @throws ClosedException If the stream has already been closed. */ public function write(string $data): Promise { return $this->send($data, false); } /** * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before. * * @param string $finalData Bytes to write. * * @return Promise Succeeds once the data has been successfully written to the stream. * * @throws ClosedException If the stream has already been closed. */ public function end(string $finalData = ""): Promise { return $this->send($finalData, true); } private function send(string $data, bool $end = false): Promise { if (!$this->writable) { return new Failure(new StreamException("The stream is not writable")); } $length = \strlen($data); $written = 0; if ($end) { $this->writable = false; } if ($this->writes->isEmpty()) { if ($length === 0) { if ($end) { $this->close(); } return new Success(0); } // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. // Use conditional, because PHP doesn't like getting null passed. if ($this->chunkSize) { $written = @\fwrite($this->resource, $data, $this->chunkSize); } else { $written = @\fwrite($this->resource, $data); } \assert($written !== false, "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to."); if ($length === $written) { if ($end) { $this->close(); } return new Success($written); } $data = \substr($data, $written); } $deferred = new Deferred; $this->writes->push([$data, $written, $deferred]); Loop::enable($this->watcher); $promise = $deferred->promise(); if ($end) { $promise->onResolve([$this, "close"]); } return $promise; } /** * Closes the stream forcefully. Multiple `close()` calls are ignored. * * @return void */ public function close() { if ($this->resource) { // Error suppression, as resource might already be closed $meta = @\stream_get_meta_data($this->resource); if ($meta && \strpos($meta["mode"], "+") !== false) { @\stream_socket_shutdown($this->resource, \STREAM_SHUT_WR); } else { @\fclose($this->resource); } } $this->free(); } /** * Nulls reference to resource, marks stream unwritable, and fails any pending write. */ private function free() { $this->resource = null; $this->writable = false; if (!$this->writes->isEmpty()) { $exception = new ClosedException("The socket was closed before writing completed"); do { /** @var \Amp\Deferred $deferred */ list(, , $deferred) = $this->writes->shift(); $deferred->fail($exception); } while (!$this->writes->isEmpty()); } Loop::cancel($this->watcher); } /** * @return resource|null Stream resource or null if end() has been called or the stream closed. */ public function getResource() { return $this->resource; } public function __destruct() { if ($this->resource !== null) { $this->free(); } } } PK vFAVz lib/functions.phpnu W+A read()) !== null) { $written += \strlen($chunk); $writePromise = $destination->write($chunk); $chunk = null; // free memory yield $writePromise; } return $written; }); } PK vFAVE{ .editorconfignu W+A root = true [*] end_of_line = lf insert_final_newline = true trim_trailing_whitespace = true indent_style = space charset = utf-8 PK vFAV2F6 6 LICENSEnu W+A The MIT License (MIT) Copyright (c) 2016-2017 amphp Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK vFAVs s .php_cs.distnu W+A setRiskyAllowed(true) ->setRules([ "@PSR1" => true, "@PSR2" => true, "braces" => [ "allow_single_line_closure" => true, "position_after_functions_and_oop_constructs" => "same", ], "array_syntax" => ["syntax" => "short"], "cast_spaces" => true, "combine_consecutive_unsets" => true, "function_to_constant" => true, "no_multiline_whitespace_before_semicolons" => true, "no_unused_imports" => true, "no_useless_else" => true, "no_useless_return" => true, "no_whitespace_before_comma_in_array" => true, "no_whitespace_in_blank_line" => true, "non_printable_character" => true, "normalize_index_brace" => true, "ordered_imports" => true, "php_unit_construct" => true, "php_unit_dedicate_assert" => true, "php_unit_fqcn_annotation" => true, "phpdoc_summary" => true, "phpdoc_types" => true, "psr4" => true, "return_type_declaration" => ["space_before" => "none"], "short_scalar_cast" => true, "single_blank_line_before_namespace" => true, ]) ->setFinder( PhpCsFixer\Finder::create() ->in(__DIR__ . "/examples") ->in(__DIR__ . "/lib") ->in(__DIR__ . "/test") ); PK vFAVvT+R` ` .gitmodulesnu W+A [submodule "docs/.shared"] path = docs/.shared url = https://github.com/amphp/amphp.github.io PK vFAV-$n composer.jsonnu W+A { "name": "amphp/byte-stream", "homepage": "http://amphp.org/byte-stream", "description": "A stream abstraction to make working with non-blocking I/O simple.", "support": { "issues": "https://github.com/amphp/byte-stream/issues", "irc": "irc://irc.freenode.org/amphp" }, "keywords": [ "stream", "async", "non-blocking", "amp", "amphp", "io" ], "license": "MIT", "authors": [ { "name": "Aaron Piotrowski", "email": "aaron@trowski.com" }, { "name": "Niklas Keller", "email": "me@kelunik.com" } ], "require": { "amphp/amp": "^2" }, "require-dev": { "amphp/phpunit-util": "^1", "phpunit/phpunit": "^6", "friendsofphp/php-cs-fixer": "^2.3" }, "autoload": { "psr-4": { "Amp\\ByteStream\\": "lib" }, "files": [ "lib/functions.php" ] }, "autoload-dev": { "psr-4": { "Amp\\ByteStream\\Test\\": "test" } }, "config": { "platform": { "php": "7.0.0" } } } PK vFAVB/X`O O Makefilenu W+A PHP_BIN := php COMPOSER_BIN := composer COVERAGE = coverage SRCS = lib test find_php_files = $(shell find $(1) -type f -name "*.php") src = $(foreach d,$(SRCS),$(call find_php_files,$(d))) .PHONY: test test: setup phpunit code-style .PHONY: clean clean: clean-coverage clean-vendor .PHONY: clean-coverage clean-coverage: test ! -e coverage || rm -r coverage .PHONY: clean-vendor clean-vendor: test ! -e vendor || rm -r vendor .PHONY: setup setup: vendor/autoload.php .PHONY: deps-update deps-update: $(COMPOSER_BIN) update .PHONY: phpunit phpunit: setup $(PHP_BIN) vendor/bin/phpunit .PHONY: code-style code-style: setup PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix composer.lock: composer.json $(COMPOSER_BIN) install touch $@ vendor/autoload.php: composer.lock $(COMPOSER_BIN) install touch $@ PK vFAV˱k examples/gzip-decompress.phpnu W+A read()) !== null) { yield $stdout->write($chunk); } }); PK vFAV1. ! examples/benchmark-throughput.phpnu W+A write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL); } try { if (!@\assert(false)) { $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); } } catch (AssertionError $exception) { $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); } $stderr->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL); Loop::delay($t * 1000, [$in, "close"]); Loop::run(function () use ($stderr, $in, $out) { $start = microtime(true); while (($chunk = yield $in->read()) !== null) { yield $out->write($chunk); } $t = microtime(true) - $start; $bytes = ftell($out->getResource()); $stderr->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL); $stderr->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL); }); PK vFAV examples/gzip-compress.phpnu W+A read()) !== null) { yield $gzout->write($chunk); } }); PK vFAVtA A README.mdnu W+A [![byte-stream](https://raw.githubusercontent.com/amphp/logo/master/repos/byte-stream.png?v=07-09-2017)](https://amphp.org/byte-stream/)
`amphp/byte-stream` is a stream abstraction to make working with non-blocking I/O simple. ## Installation This package can be installed as a [Composer](https://getcomposer.org/) dependency. ```bash composer require amphp/byte-stream ``` ## Requirements - PHP 7.0+ ## Documentation Documentation is bundled within this repository in the [`./docs`](./docs) directory. ## Versioning `amphp/byte-stream` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages. ## Security If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker. ## License The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. PK vFAVA_y y docs/Gemfilenu W+A source "https://rubygems.org" gem "github-pages" gem "kramdown" gem "jekyll-github-metadata" gem "jekyll-relative-links" PK vFAV+# docs/in-memory-stream.mdnu W+A --- title: InMemoryStream permalink: /in-memory-stream --- An `InMemoryStream` allows creating an `InputStream` from a single known string chunk. This is helpful if the complete stream contents are already known. ```php $inputStream = new InMemoryStream("foobar"); ``` It also allows creating a stream without any chunks by passing `null` as chunk. ```php $inputStream = new InMemoryStream; // The stream ends immediately assert(null === yield $inputStream->read()); ``` PK vFAV docs/Gemfile.locknu W+A GEM remote: https://rubygems.org/ specs: activesupport (4.2.8) i18n (~> 0.7) minitest (~> 5.1) thread_safe (~> 0.3, >= 0.3.4) tzinfo (~> 1.1) addressable (2.5.1) public_suffix (~> 2.0, >= 2.0.2) coffee-script (2.4.1) coffee-script-source execjs coffee-script-source (1.12.2) colorator (1.1.0) ethon (0.10.1) ffi (>= 1.3.0) execjs (2.7.0) faraday (0.12.1) multipart-post (>= 1.2, < 3) ffi (1.9.18) forwardable-extended (2.6.0) gemoji (3.0.0) github-pages (138) activesupport (= 4.2.8) github-pages-health-check (= 1.3.3) jekyll (= 3.4.3) jekyll-avatar (= 0.4.2) jekyll-coffeescript (= 1.0.1) jekyll-default-layout (= 0.1.4) jekyll-feed (= 0.9.2) jekyll-gist (= 1.4.0) jekyll-github-metadata (= 2.3.1) jekyll-mentions (= 1.2.0) jekyll-optional-front-matter (= 0.1.2) jekyll-paginate (= 1.1.0) jekyll-readme-index (= 0.1.0) jekyll-redirect-from (= 0.12.1) jekyll-relative-links (= 0.4.0) jekyll-sass-converter (= 1.5.0) jekyll-seo-tag (= 2.2.3) jekyll-sitemap (= 1.0.0) jekyll-swiss (= 0.4.0) jekyll-theme-architect (= 0.0.4) jekyll-theme-cayman (= 0.0.4) jekyll-theme-dinky (= 0.0.4) jekyll-theme-hacker (= 0.0.4) jekyll-theme-leap-day (= 0.0.4) jekyll-theme-merlot (= 0.0.4) jekyll-theme-midnight (= 0.0.4) jekyll-theme-minimal (= 0.0.4) jekyll-theme-modernist (= 0.0.4) jekyll-theme-primer (= 0.1.8) jekyll-theme-slate (= 0.0.4) jekyll-theme-tactile (= 0.0.4) jekyll-theme-time-machine (= 0.0.4) jekyll-titles-from-headings (= 0.1.5) jemoji (= 0.8.0) kramdown (= 1.13.2) liquid (= 3.0.6) listen (= 3.0.6) mercenary (~> 0.3) minima (= 2.1.1) rouge (= 1.11.1) terminal-table (~> 1.4) github-pages-health-check (1.3.3) addressable (~> 2.3) net-dns (~> 0.8) octokit (~> 4.0) public_suffix (~> 2.0) typhoeus (~> 0.7) html-pipeline (2.6.0) activesupport (>= 2) nokogiri (>= 1.4) i18n (0.8.1) jekyll (3.4.3) addressable (~> 2.4) colorator (~> 1.0) jekyll-sass-converter (~> 1.0) jekyll-watch (~> 1.1) kramdown (~> 1.3) liquid (~> 3.0) mercenary (~> 0.3.3) pathutil (~> 0.9) rouge (~> 1.7) safe_yaml (~> 1.0) jekyll-avatar (0.4.2) jekyll (~> 3.0) jekyll-coffeescript (1.0.1) coffee-script (~> 2.2) jekyll-default-layout (0.1.4) jekyll (~> 3.0) jekyll-feed (0.9.2) jekyll (~> 3.3) jekyll-gist (1.4.0) octokit (~> 4.2) jekyll-github-metadata (2.3.1) jekyll (~> 3.1) octokit (~> 4.0, != 4.4.0) jekyll-mentions (1.2.0) activesupport (~> 4.0) html-pipeline (~> 2.3) jekyll (~> 3.0) jekyll-optional-front-matter (0.1.2) jekyll (~> 3.0) jekyll-paginate (1.1.0) jekyll-readme-index (0.1.0) jekyll (~> 3.0) jekyll-redirect-from (0.12.1) jekyll (~> 3.3) jekyll-relative-links (0.4.0) jekyll (~> 3.3) jekyll-sass-converter (1.5.0) sass (~> 3.4) jekyll-seo-tag (2.2.3) jekyll (~> 3.3) jekyll-sitemap (1.0.0) jekyll (~> 3.3) jekyll-swiss (0.4.0) jekyll-theme-architect (0.0.4) jekyll (~> 3.3) jekyll-theme-cayman (0.0.4) jekyll (~> 3.3) jekyll-theme-dinky (0.0.4) jekyll (~> 3.3) jekyll-theme-hacker (0.0.4) jekyll (~> 3.3) jekyll-theme-leap-day (0.0.4) jekyll (~> 3.3) jekyll-theme-merlot (0.0.4) jekyll (~> 3.3) jekyll-theme-midnight (0.0.4) jekyll (~> 3.3) jekyll-theme-minimal (0.0.4) jekyll (~> 3.3) jekyll-theme-modernist (0.0.4) jekyll (~> 3.3) jekyll-theme-primer (0.1.8) jekyll (~> 3.3) jekyll-theme-slate (0.0.4) jekyll (~> 3.3) jekyll-theme-tactile (0.0.4) jekyll (~> 3.3) jekyll-theme-time-machine (0.0.4) jekyll (~> 3.3) jekyll-titles-from-headings (0.1.5) jekyll (~> 3.3) jekyll-watch (1.5.0) listen (~> 3.0, < 3.1) jemoji (0.8.0) activesupport (~> 4.0) gemoji (~> 3.0) html-pipeline (~> 2.2) jekyll (>= 3.0) kramdown (1.13.2) liquid (3.0.6) listen (3.0.6) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9.7) mercenary (0.3.6) mini_portile2 (2.1.0) minima (2.1.1) jekyll (~> 3.3) minitest (5.10.2) multipart-post (2.0.0) net-dns (0.8.0) nokogiri (1.7.2) mini_portile2 (~> 2.1.0) octokit (4.7.0) sawyer (~> 0.8.0, >= 0.5.3) pathutil (0.14.0) forwardable-extended (~> 2.6) public_suffix (2.0.5) rb-fsevent (0.9.8) rb-inotify (0.9.8) ffi (>= 0.5.0) rouge (1.11.1) safe_yaml (1.0.4) sass (3.4.24) sawyer (0.8.1) addressable (>= 2.3.5, < 2.6) faraday (~> 0.8, < 1.0) terminal-table (1.8.0) unicode-display_width (~> 1.1, >= 1.1.1) thread_safe (0.3.6) typhoeus (0.8.0) ethon (>= 0.8.0) tzinfo (1.2.3) thread_safe (~> 0.1) unicode-display_width (1.2.1) PLATFORMS ruby DEPENDENCIES github-pages jekyll-github-metadata jekyll-relative-links kramdown BUNDLED WITH 1.15.0 PK vFAV2 docs/resource-streams.mdnu W+A --- title: Resource Streams permalink: /resource-streams --- This package abstracts PHP's stream resources with `ResourceInputStream` and `ResourceOutputStream`. They automatically set the passed resource to non-blocking and allow reading and writing like any other `InputStream` / `OutputStream`. They also handle backpressure automatically by disabling the read watcher in case there's no read request and only activate a write watcher if the underlying write buffer is already full, which makes them very efficient. PK vFAVpʌ docs/index.mdnu W+A --- title: Overview permalink: / --- Streams are an abstraction over ordered sequences of bytes. This package provides the fundamental interfaces `InputStream` and `OutputStream`. ## InputStream `InputStream` offers a single method: `read()`. It returns a promise that gets either resolved with a `string` or `null`. `null` indicates that the stream has ended. ### Example This example shows a simple `InputStream` consumption that buffers the complete stream contents inside a coroutine. ```php $inputStream = ...; $buffer = ""; while (($chunk = yield $inputStream->read()) !== null) { $buffer .= $chunk; } // do something with $buffer ``` {:.note} > While buffering a stream that way is relatively straightforward, you might want to use `yield new Message($inputStream)` to buffer a complete `InputStream`, making it even easier. ### Implementations This package offers some basic implementations, other libraries might provide even more implementations, such as [`amphp/socket`](https://github.com/amphp/socket). * [`InMemoryStream`](./in-memory-stream.md) * [`IteratorStream`](./iterator-stream.md) * [`Message`](./message.md) * [`ResourceInputStream`](./resource-streams.md) * [`ZlibInputStream`](./compression-streams.md) ## OutputStream `OutputStream` offers two methods: `write()` and `end()`. ### `write()` `write()` writes the given string to the stream. The returned `Promise` might be used to wait for completion. Waiting for completion allows writing only as fast as the underlying stream can write and potentially send over a network. TCP streams will resolve the returned `Promise` immediately as long as the write buffer isn't full. The write order is always ensured, even if the writer doesn't wait on the promise. {:.note} > Use `Amp\Promise\rethrow` on the returned `Promise` if you do not wait on it to get notified about write errors instead of silently doing nothing on errors. ### `end()` `end()` marks the stream as ended, optionally writing a last data chunk before. TCP streams might close the underlying stream for writing, but MUST NOT close it. Instead, all resources should be freed and actual resource handles be closed by PHP's garbage collection process. ## Example This example uses the previous example to read from a stream and simply writes all data to an `OutputStream`. ```php $inputStream = ...; $outputStream = ...; $buffer = ""; while (($chunk = yield $inputStream->read()) !== null) { yield $outputStream->write($chunk); } yield $outputStream->end(); ``` ### Implementations This package offers some basic implementations, other libraries might provide even more implementations, such as [`amphp/socket`](https://github.com/amphp/socket). * [`ResourceOutputStream`](./resource-streams.md) * [`ZlibOutputStream`](./compression-streams.md) PK vFAVyTJ J docs/compression-streams.mdnu W+A --- title: Compression Streams permalink: /compression-streams --- This package implements compression and decompression streams based on Zlib. `ZlibOutputStream` can be used for compression, while `ZlibInputStream` can be used for decompression. Both can simply wrap an existing stream to apply them. Both accept an `$encoding` and `$options` parameter in their constructor. ## Examples ```php $inputStream = new ResourceInputStream(STDIN); $gzInputStream = new ZlibInputStream($inputStream, \ZLIB_ENCODING_GZIP); while (null !== $chunk = yield $gzInputStream) { print $chunk; } ``` ```php $outputStream = new ResourceOutputStream(STDOUT); $gzOutputStream = new ZlibOutputStream($outputStream, \ZLIB_ENCODING_GZIP); for ($i = 0; $i < 100; $i++) { yield $gzOutputStream->write(bin2hex(random_bytes(32)); } yield $gzOutputStream->end(); ``` ## See also * [`./examples/gzip-compress.php`](https://github.com/amphp/byte-stream/blob/master/examples/gzip-compress.php) * [`./examples/gzip-decompress.php`](https://github.com/amphp/byte-stream/blob/master/examples/gzip-decompress.php) PK vFAV(} docs/assetnu W+A .shared/assetPK vFAVR1 docs/_config.ymlnu W+A kramdown: input: GFM toc_levels: 2..3 baseurl: "/byte-stream" layouts_dir: ".shared/layout" includes_dir: ".shared/includes" exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"] safe: true repository: amphp/byte-stream gems: - "jekyll-github-metadata" - "jekyll-relative-links" defaults: - scope: path: "" type: "pages" values: layout: "docs" description: "amphp/byte-stream is a byte stream abstraction for Amp providing interfaces for InputStream and OutputStream." keywords: ['amphp', 'amp', 'byte-stream', 'non-blocking', 'io', 'stream', 'compression'] shared_asset_path: "/byte-stream/asset" navigation: - resource-streams - in-memory-stream - iterator-stream - compression-streams - message PK vFAV=. docs/message.mdnu W+A --- title: Message permalink: /message --- `Message` implements both `InputStream` _and_ `Promise`. This allows consuming a message either in chunks (streaming) or consume everything at once (buffering). ## Buffering Buffering a complete input stream is quite easy, you can simply `yield` a `Message` just like any other `Promise`. If you have an `InputStream` that's not a `Message`, simply create a new instance from it using `new Message($inputStream)`. ```php $message = new Message($inputStream); $content = yield $message; ``` ## Streaming Sometimes it's useful / possible to consume a message in chunks rather than first buffering it completely. An example might be streaming a large HTTP response body directly to disk. ```php while (($chunk = yield $message->read()) !== null) { // Use $chunk here, works just like any other InputStream } ``` ## Unwrapping In some cases you might want to resolve a promise with an `InputStream` or your method explicitly declares `InputStream` as a return type. In these cases you should use `Message::getInputStream` to return the raw input stream. This makes it possible to resolve promises with the `InputStream` and not run into unexpected issues. Only return a `Message` if you declare that as a type, otherwise an API assumes `InputStream` and might try to resolve a promise with that, resulting in buffering the message's content instead of resolving the promise with an `InputStream` instance. PK vFAVZ {> docs/README.mdnu W+A # Documentation This directory contains the documentation for `amphp/byte-stream`. Documentation and code are bundled within a single repository for easier maintenance. Additionally, this preserves the documentation for older versions. ## Reading You can read this documentation either directly on GitHub or on our website. While the website will always contain the latest version, viewing on GitHub also works with older versions. ## Writing Our documentation is built using Jekyll. ``` sudo gem install bundler jekyll ``` ``` git submodule init git submodule update cd docs bundle install --path vendor/bundle bundle exec jekyll serve ``` PK vFAVܧX X docs/iterator-stream.mdnu W+A --- title: IteratorStream permalink: /iterator-stream --- `IteratorStream` allows converting an `Amp\Iterator` that yields strings into an `InputStream`. ```php $inputStream = new IteratorStream(new Producer(function (callable $emit) { for ($i = 0; $i < 10; $i++) { yield new Delayed(1000); yield $emit("."); } }); ``` PK vFAV docs/.shared/nu PK vFAVzq mb b lib/OutputBuffer.phpnu W+A PK vFAVD D lib/ZlibOutputStream.phpnu W+A PK vFAVh,{Lz z 2 lib/Message.phpnu W+A PK vFAV5VO O " lib/StreamException.phpnu W+A PK vFAVE # lib/OutputStream.phpnu W+A PK vFAVť͏ c( lib/IteratorStream.phpnu W+A PK vFAVzX X . lib/ResourceInputStream.phpnu W+A PK vFAVQ7Z Z cB lib/ClosedException.phpnu W+A PK vFAVeU(- - C lib/InMemoryStream.phpnu W+A PK vFAVDeO O wF lib/ZlibInputStream.phpnu W+A PK vFAVe Q lib/PendingReadError.phpnu W+A PK vFAV% S lib/InputStream.phpnu W+A PK vFAV,i6 6 RV lib/ResourceOutputStream.phpnu W+A PK vFAVz v lib/functions.phpnu W+A PK vFAVE{ %{ .editorconfignu W+A PK vFAV2F6 6 { LICENSEnu W+A PK vFAVs s R .php_cs.distnu W+A PK vFAVvT+R` ` .gitmodulesnu W+A PK vFAV-$n composer.jsonnu W+A PK vFAVB/X`O O Makefilenu W+A PK vFAV˱k examples/gzip-decompress.phpnu W+A PK vFAV1. ! ; examples/benchmark-throughput.phpnu W+A PK vFAV examples/gzip-compress.phpnu W+A PK vFAVtA A README.mdnu W+A PK vFAVA_y y docs/Gemfilenu W+A PK vFAV+# С docs/in-memory-stream.mdnu W+A PK vFAV docs/Gemfile.locknu W+A PK vFAV2 docs/resource-streams.mdnu W+A PK vFAVpʌ c docs/index.mdnu W+A PK vFAVyTJ J docs/compression-streams.mdnu W+A PK vFAV(} <