PK tAVCif f LICENSEnu W+A The MIT License (MIT)
Copyright (c) 2013 Daniel Lowrey, Levi Morrison
Copyright (c) 2014-2019 amphp
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
PK tAV,$ $ appveyor.ymlnu W+A build: false
shallow_clone: false
platform:
- x86
- x64
clone_folder: c:\projects\amphp
cache:
- c:\tools\php74 -> appveyor.yml
init:
- SET PATH=C:\Program Files\OpenSSL;c:\tools\php74;%PATH%
- SET COMPOSER_NO_INTERACTION=1
- SET PHP=1
- SET ANSICON=121x90 (121x90)
install:
- IF EXIST c:\tools\php74 (SET PHP=0)
- IF %PHP%==1 sc config wuauserv start= auto
- IF %PHP%==1 net start wuauserv
- IF %PHP%==1 cinst -y OpenSSL.Light
- IF %PHP%==1 cinst -y php
- cd c:\tools\php74
- IF %PHP%==1 copy php.ini-production php.ini /Y
- IF %PHP%==1 echo date.timezone="UTC" >> php.ini
- IF %PHP%==1 echo extension_dir=ext >> php.ini
- IF %PHP%==1 echo extension=php_openssl.dll >> php.ini
- IF %PHP%==1 echo extension=php_mbstring.dll >> php.ini
- IF %PHP%==1 echo extension=php_fileinfo.dll >> php.ini
- cd c:\projects\amphp
- appveyor DownloadFile https://getcomposer.org/composer.phar
- php composer.phar install --prefer-dist --no-progress
test_script:
- cd c:\projects\amphp
- vendor/bin/phpunit --colors=always
PK tAViTQ| |
composer.jsonnu W+A {
"name": "amphp/http-client",
"homepage": "https://github.com/amphp/http-client",
"description": "Asynchronous concurrent HTTP/2 and HTTP/1.1 client built on the Amp concurrency framework",
"keywords": [
"http",
"rest",
"client",
"concurrent",
"async",
"non-blocking"
],
"license": "MIT",
"authors": [
{
"name": "Daniel Lowrey",
"email": "rdlowrey@gmail.com"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
},
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
}
],
"require": {
"php": ">=7.2",
"amphp/amp": "^2.4",
"amphp/byte-stream": "^1.6",
"amphp/hpack": "^3",
"amphp/http": "^1.6",
"amphp/socket": "^1",
"amphp/sync": "^1.3",
"league/uri": "^6",
"psr/http-message": "^1"
},
"require-dev": {
"ext-json": "*",
"amphp/file": "^1 || ^0.3 || ^0.2",
"amphp/phpunit-util": "^1.1",
"amphp/php-cs-fixer-config": "dev-master",
"phpunit/phpunit": "^7 || ^8",
"amphp/http-server": "^2-rc4"
},
"suggest": {
"ext-zlib": "Allows using compression for response bodies.",
"ext-json": "Required for logging HTTP archives",
"amphp/file": "Required for file request bodies and HTTP archive logging"
},
"autoload": {
"psr-4": {
"Amp\\Http\\Client\\": "src"
}
},
"autoload-dev": {
"psr-4": {
"Amp\\Http\\Client\\": "test"
}
},
"conflict": {
"amphp/file": "<0.2 || >=2"
},
"scripts": {
"check": [
"@cs",
"@test"
],
"cs": "PHP_CS_FIXER_IGNORE_ENV=1 php-cs-fixer fix -v --diff --dry-run",
"cs-fix": "PHP_CS_FIXER_IGNORE_ENV=1 php-cs-fixer fix -v --diff",
"test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit --coverage-text"
},
"extra": {
"branch-alias": {
"dev-master": "4.x-dev"
}
}
}
PK tAVCk CHANGELOG.mdnu W+A # Changelog
### 4.2.0
- Add improved ConnectionLimitingPool
The new ConnectionLimitingPool limits connections instead of streams. In addition, it has improved connection handling, racing between new connections and existing connections becoming available once the limit has been reached. The older LimitedConnectionPool has been renamed to StreamLimitingPool with a class alias for backward compatibility.
- Don't set ALPN if only HTTP/1.1 is enabled, which allows connections to certain misbehaving servers (#255)
### 4.1.0
- Fix possible double resolution of promises (#244)
- Fix assertion error on invalid HTTP/2 frame (#236)
- Fix HTTP/2 connection reuse if too many concurrent streams for one connection are in use (#246)
- Allow skipping default `accept`, `accept-encoding` and `user-agent` headers (#238)
- Keep original header case for HTTP/1 requests (#250)
- Allow access to informational responses (1XX) (#239)
- Await `startReceiveResponse` event listeners on HTTP/2 before resolving the response promise (#254)
- Delay `startReceiveResponse` event until the final response is started to be received, instead of calling it for the first byte or multiple times for HTTP/2 (#254)
- Use common HTTP/2 parser from `amphp/http`
## 4.0.0
Initial release of `amphp/http-client`, the successor of `amphp/artax`.
This is a major rewrite to support interceptors and HTTP/2.
**Major Changes**
- Support for HTTP/2 (including push)
- Support for interceptors to customize behavior
- Switch to a mutable `Request` / `Response` API, because streams are never immutable
- Compatibility with `amphp/socket@^1`
- Compatibility with `amphp/file@^1`
- Compatibility with `league/uri@^6`
## 3.x - 1.x
Please refer to `CHANGELOD.md` in [`amphp/artax`](https://github.com/amphp/artax).
PK tAVvpp/
/
README.mdnu W+A
[![Build Status](https://img.shields.io/travis/amphp/http-client/master.svg?style=flat-square)](https://travis-ci.org/amphp/http-client)
[![CoverageStatus](https://img.shields.io/coveralls/amphp/http-client/master.svg?style=flat-square)](https://coveralls.io/github/amphp/http-client?branch=master)
![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square)
This package provides an asynchronous HTTP client for PHP based on [Amp](https://github.com/amphp/amp). Its API simplifies standards-compliant HTTP resource traversal and RESTful web service consumption without obscuring the underlying protocol. The library manually implements HTTP over TCP sockets; as such it has no dependency on `ext/curl`.
## Features
- Supports HTTP/1 and HTTP/2
- [Requests concurrently by default](examples/concurrency/1-concurrent-fetch.php)
- [Pools persistent connections (keep-alive @ HTTP/1.1, multiplexing @ HTTP/2)](examples/pooling/1-connection-count.php)
- [Transparently follows redirects](https://amphp.org/http-client/follow-redirects)
- [Decodes compressed entity bodies (gzip, deflate)](examples/basic/7-gzip.php)
- [Exposes headers and message data](examples/basic/1-get-request.php)
- [Streams entity bodies for memory management with large transfers](examples/streaming/1-large-response.php)
- [Supports all standard and custom HTTP method verbs](https://amphp.org/http-client/requests#request-method)
- [Simplifies HTTP form submissions](examples/basic/4-forms.php)
- [Implements secure-by-default TLS (`https://`)](examples/basic/1-get-request.php)
- [Supports cookies and sessions](https://github.com/amphp/http-client-cookies)
- [Functions seamlessly behind HTTP proxies](https://github.com/amphp/http-tunnel)
## Installation
This package can be installed as a [Composer](https://getcomposer.org/) dependency.
```bash
composer require amphp/http-client
```
Additionally, you might want to install the `nghttp2` library to take advantage of FFI to speed up and reduce the memory usage on PHP 7.4.
## Documentation
Documentation is bundled within this repository in the [`docs`](./docs) directory.
## Examples
More extensive code examples reside in the [`examples`](./examples) directory.
## Versioning
`amphp/http-client` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages.
Everything in an `Internal` namespace or marked as `@internal` is not public API and therefore not covered by BC guarantees.
##### 4.x
Under development.
##### [3.x](https://github.com/amphp/artax/tree/master)
Use [`amphp/artax`](https://github.com/amphp/artax) as package name instead.
##### [2.x](https://github.com/amphp/artax/tree/2.x)
No longer maintained. Use [`amphp/artax`](https://github.com/amphp/artax) as package name instead.
##### [1.x](https://github.com/amphp/artax/tree/1.x)
No longer maintained. Use [`amphp/artax`](https://github.com/amphp/artax) as package name instead.
## Security
If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker.
## License
The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
PK tAVA src/MissingAttributeError.phpnu W+A hasAttribute(HarAttributes::STARTED_DATE_TIME)) {
$request->setAttribute(HarAttributes::STARTED_DATE_TIME, new \DateTimeImmutable);
}
return $this->addTiming(HarAttributes::TIME_START, $request);
}
public function startDnsResolution(Request $request): Promise
{
return new Success; // not implemented
}
public function startConnectionCreation(Request $request): Promise
{
return $this->addTiming(HarAttributes::TIME_CONNECT, $request);
}
public function startTlsNegotiation(Request $request): Promise
{
return $this->addTiming(HarAttributes::TIME_SSL, $request);
}
public function startSendingRequest(Request $request, Stream $stream): Promise
{
$host = $stream->getRemoteAddress()->getHost();
if (\strrpos($host, ':')) {
$host = '[' . $host . ']';
}
$request->setAttribute(HarAttributes::SERVER_IP_ADDRESS, $host);
return $this->addTiming(HarAttributes::TIME_SEND, $request);
}
public function completeSendingRequest(Request $request, Stream $stream): Promise
{
return $this->addTiming(HarAttributes::TIME_WAIT, $request);
}
public function startReceivingResponse(Request $request, Stream $stream): Promise
{
return $this->addTiming(HarAttributes::TIME_RECEIVE, $request);
}
public function completeReceivingResponse(Request $request, Stream $stream): Promise
{
return $this->addTiming(HarAttributes::TIME_COMPLETE, $request);
}
public function completeDnsResolution(Request $request): Promise
{
return new Success; // not implemented
}
public function completeConnectionCreation(Request $request): Promise
{
return new Success; // not implemented
}
public function completeTlsNegotiation(Request $request): Promise
{
return new Success; // not implemented
}
private function addTiming(string $key, Request $request): Promise
{
if (!$request->hasAttribute($key)) {
$request->setAttribute($key, getCurrentTime());
}
return new Success;
}
public function abort(Request $request, \Throwable $cause): Promise
{
return new Success;
}
}
PK tAV
^ src/NetworkInterceptor.phpnu W+A request(...)` resolved.
*
* A NetworkInterceptor SHOULD NOT short-circuit and SHOULD delegate to the `$stream` passed as third argument
* exactly once. The only exception to this is throwing an exception, e.g. because the TLS settings used are
* unacceptable. If you need short circuits, use an {@see ApplicationInterceptor} instead.
*
* @param Request $request
* @param CancellationToken $cancellation
* @param Stream $stream
*
* @return Promise
*/
public function requestViaNetwork(Request $request, CancellationToken $cancellation, Stream $stream): Promise;
}
PK tAV src/Body/StringBody.phpnu W+A body = $body;
}
public function createBodyStream(): InputStream
{
return new InMemoryStream($this->body !== '' ? $this->body : null);
}
public function getHeaders(): Promise
{
return new Success([]);
}
public function getBodyLength(): Promise
{
return new Success(\strlen($this->body));
}
}
PK tAV8a
src/Body/FormBody.phpnu W+A boundary = $boundary ?? \bin2hex(\random_bytes(16));
}
/**
* Add a data field to the form entity body.
*
* @param string $name
* @param string $value
* @param string $contentType
*/
public function addField(string $name, string $value, string $contentType = 'text/plain'): void
{
$this->fields[] = [$name, $value, $contentType, null];
$this->resetCache();
}
/**
* Add each element of a associative array as a data field to the form entity body.
*
* @param array $data
* @param string $contentType
*/
public function addFields(array $data, string $contentType = 'text/plain'): void
{
foreach ($data as $key => $value) {
$this->addField($key, $value, $contentType);
}
}
/**
* Add a file field to the form entity body.
*
* @param string $name
* @param string $filePath
* @param string $contentType
*/
public function addFile(string $name, string $filePath, string $contentType = 'application/octet-stream'): void
{
$fileName = \basename($filePath);
$this->fields[] = [$name, new FileBody($filePath), $contentType, $fileName];
$this->isMultipart = true;
$this->resetCache();
}
/**
* Add each element of a associative array as a file field to the form entity body.
*
* @param array $data
* @param string $contentType
*/
public function addFiles(array $data, string $contentType = 'application/octet-stream'): void
{
foreach ($data as $key => $value) {
$this->addFile($key, $value, $contentType);
}
}
/**
* Returns an array of fields, each being an array of [name, value, content-type, file-name|null].
* Both fields and files are returned in the array. Files use a FileBody object as the value. The file-name is
* always null for fields.
*
* @return array
*/
public function getFields(): array
{
return $this->fields;
}
private function resetCache(): void
{
$this->cachedBody = null;
$this->cachedLength = null;
$this->cachedFields = null;
}
public function createBodyStream(): InputStream
{
if ($this->isMultipart) {
return $this->generateMultipartStreamFromFields($this->getMultipartFieldArray());
}
return new InMemoryStream($this->getFormEncodedBodyString());
}
private function getMultipartFieldArray(): array
{
if (isset($this->cachedFields)) {
return $this->cachedFields;
}
$fields = [];
foreach ($this->fields as $fieldArr) {
[$name, $field, $contentType, $fileName] = $fieldArr;
$fields[] = "--{$this->boundary}\r\n";
$fields[] = $field instanceof FileBody
? $this->generateMultipartFileHeader($name, $fileName, $contentType)
: $this->generateMultipartFieldHeader($name, $contentType);
$fields[] = $field;
$fields[] = "\r\n";
}
$fields[] = "--{$this->boundary}--\r\n";
return $this->cachedFields = $fields;
}
private function generateMultipartFileHeader(string $name, string $fileName, string $contentType): string
{
$header = "Content-Disposition: form-data; name=\"{$name}\"; filename=\"{$fileName}\"\r\n";
$header .= "Content-Type: {$contentType}\r\n";
$header .= "Content-Transfer-Encoding: binary\r\n\r\n";
return $header;
}
private function generateMultipartFieldHeader(string $name, string $contentType): string
{
$header = "Content-Disposition: form-data; name=\"{$name}\"\r\n";
if ($contentType !== "") {
$header .= "Content-Type: {$contentType}\r\n\r\n";
} else {
$header .= "\r\n";
}
return $header;
}
private function generateMultipartStreamFromFields(array $fields): InputStream
{
foreach ($fields as $key => $field) {
$fields[$key] = $field instanceof FileBody ? $field->createBodyStream() : new InMemoryStream($field);
}
return new IteratorStream(new Producer(static function (callable $emit) use ($fields) {
foreach ($fields as $key => $stream) {
while (($chunk = yield $stream->read()) !== null) {
yield $emit($chunk);
}
}
}));
}
private function getFormEncodedBodyString(): string
{
if ($this->cachedBody) {
return $this->cachedBody;
}
$fields = [];
foreach ($this->fields as $fieldArr) {
[$name, $value] = $fieldArr;
$fields[$name][] = $value;
}
foreach ($fields as $key => $value) {
$fields[$key] = isset($value[1]) ? $value : $value[0];
}
return $this->cachedBody = \http_build_query($fields);
}
public function getHeaders(): Promise
{
return new Success([
'Content-Type' => $this->determineContentType(),
]);
}
private function determineContentType(): string
{
return $this->isMultipart
? "multipart/form-data; boundary={$this->boundary}"
: 'application/x-www-form-urlencoded';
}
public function getBodyLength(): Promise
{
if ($this->cachedLength) {
return $this->cachedLength;
}
if (!$this->isMultipart) {
return $this->cachedLength = new Success(\strlen($this->getFormEncodedBodyString()));
}
return $this->cachedLength = call(function () {
$fields = $this->getMultipartFieldArray();
$length = 0;
foreach ($fields as $field) {
if (\is_string($field)) {
$length += \strlen($field);
} else {
$length += yield $field->getBodyLength();
}
}
return $length;
});
}
}
PK tAVYE/ src/Body/JsonBody.phpnu W+A json = \json_encode($data, $options, $depth);
if (\json_last_error() !== \JSON_ERROR_NONE) {
throw new HttpException('Failed to encode data to JSON');
}
}
public function getHeaders(): Promise
{
return new Success(['content-type' => 'application/json; charset=utf-8']);
}
public function createBodyStream(): InputStream
{
return new InMemoryStream($this->json);
}
public function getBodyLength(): Promise
{
return new Success(\strlen($this->json));
}
}
PK tAVW~ ~ src/Body/FileBody.phpnu W+A path = $path;
}
public function createBodyStream(): InputStream
{
$handlePromise = open($this->path, "r");
return new class($handlePromise) implements InputStream {
/** @var Promise */
private $promise;
/** @var InputStream */
private $stream;
public function __construct(Promise $promise)
{
$this->promise = $promise;
$this->promise->onResolve(function ($error, $stream) {
if ($error) {
return;
}
$this->stream = $stream;
});
}
public function read(): Promise
{
if (!$this->stream) {
return call(function () {
/** @var InputStream $stream */
$stream = yield $this->promise;
return $stream->read();
});
}
return $this->stream->read();
}
};
}
public function getHeaders(): Promise
{
return new Success([]);
}
public function getBodyLength(): Promise
{
return size($this->path);
}
}
PK tAV*UH# H# + src/Connection/DefaultConnectionFactory.phpnu W+A connector = $connector;
$this->connectContext = $connectContext;
}
public function create(
Request $request,
CancellationToken $cancellationToken
): Promise {
return call(function () use ($request, $cancellationToken) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startConnectionCreation($request);
}
$connector = $this->connector ?? connector();
$connectContext = $this->connectContext ?? new ConnectContext;
$uri = $request->getUri();
$scheme = $uri->getScheme();
if (!\in_array($scheme, ['http', 'https'], true)) {
throw new InvalidRequestException($request, 'Invalid scheme provided in the request URI: ' . $uri);
}
$isHttps = $scheme === 'https';
$defaultPort = $isHttps ? 443 : 80;
$host = $uri->getHost();
$port = $uri->getPort() ?? $defaultPort;
if ($host === '') {
throw new InvalidRequestException($request, 'A host must be provided in the request URI: ' . $uri);
}
$authority = $host . ':' . $port;
$protocolVersions = $request->getProtocolVersions();
$isConnect = $request->getMethod() === 'CONNECT';
if ($isHttps) {
$protocols = [];
if (!$isConnect && \in_array('2', $protocolVersions, true)) {
$protocols[] = 'h2';
}
if (\in_array('1.1', $protocolVersions, true) || \in_array('1.0', $protocolVersions, true)) {
$protocols[] = 'http/1.1';
}
if (!$protocols) {
throw new InvalidRequestException(
$request,
\sprintf(
"None of the requested protocol versions (%s) are supported by %s (HTTP/2 is only supported on HTTPS)",
\implode(', ', $protocolVersions),
self::class
)
);
}
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext(''))
->withApplicationLayerProtocols($protocols)
->withPeerCapturing();
if ($protocols === ['http/1.1']) {
// If we only have HTTP/1.1 available, don't set application layer protocols.
// There are misbehaving sites like n11.com, see https://github.com/amphp/http-client/issues/255
$tlsContext = $tlsContext->withApplicationLayerProtocols([]);
}
if ($tlsContext->getPeerName() === '') {
$tlsContext = $tlsContext->withPeerName($host);
}
$connectContext = $connectContext->withTlsContext($tlsContext);
}
try {
/** @var EncryptableSocket $socket */
$socket = yield $connector->connect(
'tcp://' . $authority,
$connectContext->withConnectTimeout($request->getTcpConnectTimeout()),
$cancellationToken
);
} catch (Socket\ConnectException $e) {
throw new UnprocessedRequestException(
new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e)
);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellationToken->throwIfRequested();
// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new UnprocessedRequestException(new TimeoutException(\sprintf(
"Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms',
$authority
))); // don't pass $e
}
if ($isHttps) {
try {
$tlsState = $socket->getTlsState();
// Error if anything enabled TLS on a new connection before we can do it
if ($tlsState !== EncryptableSocket::TLS_STATE_DISABLED) {
$socket->close();
throw new UnprocessedRequestException(
new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')')
);
}
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startTlsNegotiation($request);
}
$tlsCancellationToken = new CombinedCancellationToken(
$cancellationToken,
new TimeoutCancellationToken($request->getTlsHandshakeTimeout())
);
yield $socket->setupTls($tlsCancellationToken);
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeTlsNegotiation($request);
}
} catch (StreamException $exception) {
$socket->close();
throw new UnprocessedRequestException(new SocketException(\sprintf(
"Connection to '%s' @ '%s' closed during TLS handshake",
$authority,
$socket->getRemoteAddress()->toString()
), 0, $exception));
} catch (CancelledException $e) {
$socket->close();
// In case of a user cancellation request, throw the expected exception
$cancellationToken->throwIfRequested();
// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new UnprocessedRequestException(new TimeoutException(\sprintf(
"TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms',
$authority,
$socket->getRemoteAddress()->toString()
))); // don't pass $e
}
$tlsInfo = $socket->getTlsInfo();
if ($tlsInfo === null) {
throw new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket closed after TLS handshake with '%s' @ '%s'",
$authority,
$socket->getRemoteAddress()->toString()
))
);
}
if ($tlsInfo->getApplicationLayerProtocol() === 'h2') {
$connection = new Http2Connection($socket);
yield $connection->initialize();
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeConnectionCreation($request);
}
return $connection;
}
}
if (!\array_intersect($request->getProtocolVersions(), ['1.0', '1.1'])) {
$socket->close();
throw new InvalidRequestException(
$request,
\sprintf(
"None of the requested protocol versions (%s) are supported by '%s' @ '%s'",
\implode(', ', $protocolVersions),
$authority,
$socket->getRemoteAddress()->toString()
)
);
}
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeConnectionCreation($request);
}
return new Http1Connection($socket);
});
}
}
PK tAV src/Connection/HttpStream.phpnu W+A getLocalAddress(),
$connection->getRemoteAddress(),
$connection->getTlsInfo(),
$requestCallback,
$releaseCallback
);
}
public static function fromStream(Stream $stream, callable $requestCallback, callable $releaseCallback): self
{
return new self(
$stream->getLocalAddress(),
$stream->getRemoteAddress(),
$stream->getTlsInfo(),
$requestCallback,
$releaseCallback
);
}
/** @var SocketAddress */
private $localAddress;
/** @var SocketAddress */
private $remoteAddress;
/** @var TlsInfo */
private $tlsInfo;
/** @var callable */
private $requestCallback;
/** @var callable|null */
private $releaseCallback;
private function __construct(
SocketAddress $localAddress,
SocketAddress $remoteAddress,
?TlsInfo $tlsInfo,
callable $requestCallback,
callable $releaseCallback
) {
$this->localAddress = $localAddress;
$this->remoteAddress = $remoteAddress;
$this->tlsInfo = $tlsInfo;
$this->requestCallback = $requestCallback;
$this->releaseCallback = $releaseCallback;
}
public function __destruct()
{
if ($this->releaseCallback !== null) {
($this->releaseCallback)();
}
}
public function request(Request $request, CancellationToken $token): Promise
{
if ($this->releaseCallback === null) {
throw new \Error('A stream may only be used for a single request');
}
$this->releaseCallback = null;
return call(function () use ($request, $token) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startRequest($request);
}
return call($this->requestCallback, $request, $token, $this);
});
}
public function getLocalAddress(): SocketAddress
{
return $this->localAddress;
}
public function getRemoteAddress(): SocketAddress
{
return $this->remoteAddress;
}
public function getTlsInfo(): ?TlsInfo
{
return $this->tlsInfo;
}
}
PK tAVIX X " src/Connection/Http1Connection.phpnu W+A socket = $socket;
$this->localAddress = $socket->getLocalAddress();
$this->remoteAddress = $socket->getRemoteAddress();
$this->tlsInfo = $socket->getTlsInfo();
$this->timeoutGracePeriod = $timeoutGracePeriod;
$this->lastUsedAt = getCurrentTime();
}
public function __destruct()
{
$this->close();
}
public function onClose(callable $onClose): void
{
if (!$this->socket || $this->socket->isClosed()) {
Promise\rethrow(call($onClose, $this));
return;
}
$this->onClose[] = $onClose;
}
public function close(): Promise
{
if ($this->socket) {
$this->socket->close();
}
return $this->free();
}
public function getLocalAddress(): SocketAddress
{
return $this->localAddress;
}
public function getRemoteAddress(): SocketAddress
{
return $this->remoteAddress;
}
public function getTlsInfo(): ?TlsInfo
{
return $this->tlsInfo;
}
public function getProtocolVersions(): array
{
return self::PROTOCOL_VERSIONS;
}
public function getStream(Request $request): Promise
{
if ($this->busy || ($this->requestCounter && !$this->hasStreamFor($request))) {
return new Success;
}
$this->busy = true;
return new Success(HttpStream::fromConnection(
$this,
\Closure::fromCallable([$this, 'request']),
\Closure::fromCallable([$this, 'release'])
));
}
private function free(): Promise
{
$this->socket = null;
$this->lastUsedAt = 0;
if ($this->timeoutWatcher !== null) {
Loop::cancel($this->timeoutWatcher);
}
if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
foreach ($onClose as $callback) {
asyncCall($callback, $this);
}
}
return new Success;
}
private function hasStreamFor(Request $request): bool
{
return !$this->busy
&& $this->socket
&& !$this->socket->isClosed()
&& ($this->getRemainingTime() > 0 || $request->isIdempotent());
}
/** @inheritdoc */
private function request(Request $request, CancellationToken $cancellation, Stream $stream): Promise
{
return call(function () use ($request, $cancellation, $stream) {
++$this->requestCounter;
if ($this->timeoutWatcher !== null) {
Loop::cancel($this->timeoutWatcher);
$this->timeoutWatcher = null;
}
yield RequestNormalizer::normalizeRequest($request);
$protocolVersion = $this->determineProtocolVersion($request);
$request->setProtocolVersions([$protocolVersion]);
if ($request->getTransferTimeout() > 0) {
$timeoutToken = new TimeoutCancellationToken($request->getTransferTimeout());
$combinedCancellation = new CombinedCancellationToken($cancellation, $timeoutToken);
} else {
$combinedCancellation = $cancellation;
}
$id = $combinedCancellation->subscribe([$this, 'close']);
try {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startSendingRequest($request, $stream);
}
yield from $this->writeRequest($request, $protocolVersion, $combinedCancellation);
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
return yield from $this->readResponse($request, $cancellation, $combinedCancellation, $stream);
} catch (\Throwable $e) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $e);
}
$this->socket->close();
throw $e;
} finally {
$combinedCancellation->unsubscribe($id);
$cancellation->throwIfRequested();
}
});
}
private function release(): void
{
$this->busy = false;
}
/**
* @param Request $request
* @param CancellationToken $originalCancellation
* @param CancellationToken $readingCancellation
*
* @param Stream $stream
*
* @return \Generator
* @throws CancelledException
* @throws HttpException
* @throws ParseException
* @throws SocketException
*/
private function readResponse(
Request $request,
CancellationToken $originalCancellation,
CancellationToken $readingCancellation,
Stream $stream
): \Generator {
$bodyEmitter = new Emitter;
$backpressure = new Success;
$bodyCallback = static function ($data) use ($bodyEmitter, &$backpressure): void {
$backpressure = $bodyEmitter->emit($data);
};
$trailersDeferred = new Deferred;
$trailers = [];
$trailersCallback = static function (array $headers) use (&$trailers): void {
$trailers = $headers;
};
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);
$start = getCurrentTime();
try {
while (null !== $chunk = yield $this->socket->read()) {
parseChunk:
$response = $parser->parse($chunk);
if ($response === null) {
continue;
}
$status = $response->getStatus();
if ($status === Http\Status::SWITCHING_PROTOCOLS) {
$connection = Http\createFieldValueComponentMap(Http\parseFieldValueComponents(
$response,
'connection'
));
if (!isset($connection['upgrade'])) {
throw new HttpException('Switching protocols response missing "Connection: upgrade" header');
}
if (!$response->hasHeader('upgrade')) {
throw new HttpException('Switching protocols response missing "Upgrade" header');
}
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
}
$trailersDeferred->resolve($trailers);
return $this->handleUpgradeResponse($request, $response, $parser->getBuffer());
}
if ($status < 200) { // 1XX responses (excluding 101, handled above)
$onInformationalResponse = $request->getInformationalResponseHandler();
if ($onInformationalResponse !== null) {
yield call($onInformationalResponse, $response);
}
$chunk = $parser->getBuffer();
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);
goto parseChunk;
}
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}
if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
}
$trailersDeferred->resolve($trailers);
return $this->handleUpgradeResponse($request, $response, $parser->getBuffer());
}
$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken(
$readingCancellation,
$bodyCancellationSource->getToken()
);
$response->setTrailers($trailersDeferred->promise());
$response->setBody(new ResponseBodyStream(
new IteratorStream($bodyEmitter->iterate()),
$bodyCancellationSource
));
// Read body async
asyncCall(function () use (
$parser,
$request,
$response,
$bodyEmitter,
$trailersDeferred,
$originalCancellation,
$readingCancellation,
$bodyCancellationToken,
$stream,
&$backpressure,
&$trailers
) {
$id = $bodyCancellationToken->subscribe([$this, 'close']);
try {
// Required, otherwise responses without body hang
if (!$parser->isComplete()) {
// Directly parse again in case we already have the full body but aborted parsing
// to resolve promise with headers.
$chunk = null;
do {
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$parser->parse($chunk);
/** @noinspection NotOptimalIfConditionsInspection */
if ($parser->isComplete()) {
break;
}
if (!$backpressure instanceof Success) {
yield $this->withCancellation($backpressure, $bodyCancellationToken);
}
} while (null !== $chunk = yield $this->socket->read());
$originalCancellation->throwIfRequested();
if ($readingCancellation->isRequested()) {
throw new TimeoutException('Allowed transfer timeout exceeded, took longer than ' . $request->getTransferTimeout() . ' ms');
}
$bodyCancellationToken->throwIfRequested();
// Ignore check if neither content-length nor chunked encoding are given.
if (!$parser->isComplete() && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
throw new SocketException('Socket disconnected prior to response completion');
}
}
$timeout = $this->determineKeepAliveTimeout($response);
if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
$this->timeoutWatcher = Loop::delay($timeout * 1000, [$this, 'close']);
Loop::unreference($this->timeoutWatcher);
} else {
$this->close();
}
$this->busy = false;
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
}
$bodyEmitter->complete();
$trailersDeferred->resolve($trailers);
} catch (\Throwable $e) {
$this->close();
try {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $e);
}
} finally {
$bodyEmitter->fail($e);
$trailersDeferred->fail($e);
}
} finally {
$bodyCancellationToken->unsubscribe($id);
}
});
return $response;
}
$originalCancellation->throwIfRequested();
throw new SocketException(\sprintf(
"Receiving the response headers for '%s' failed, because the socket to '%s' @ '%s' closed early with %d bytes received within %d milliseconds",
$request->getUri()->withUserInfo(''),
$request->getUri()->withUserInfo('')->getAuthority(),
$this->socket->getRemoteAddress(),
\strlen($parser->getBuffer()),
getCurrentTime() - $start
));
} catch (StreamException $e) {
throw new SocketException('Receiving the response headers failed: ' . $e->getMessage());
}
}
private function handleUpgradeResponse(Request $request, Response $response, string $buffer): Response
{
$socket = new UpgradedSocket($this->socket, $buffer);
$this->free(); // Mark this connection as unusable without closing socket.
if (($onUpgrade = $request->getUpgradeHandler()) === null) {
$socket->close();
throw new HttpException('CONNECT or upgrade request made without upgrade handler callback');
}
asyncCall(static function () use ($onUpgrade, $socket, $request, $response): \Generator {
try {
yield call($onUpgrade, $socket, $request, $response);
} catch (\Throwable $exception) {
$socket->close();
throw new HttpException('Upgrade handler threw an exception', 0, $exception);
}
});
return $response;
}
/**
* @return int Approximate number of milliseconds remaining until the connection is closed.
*/
private function getRemainingTime(): int
{
$timestamp = $this->lastUsedAt + $this->explicitTimeout ? $this->priorTimeout * 1000 : $this->timeoutGracePeriod;
return \max(0, $timestamp - getCurrentTime());
}
private function withCancellation(Promise $promise, CancellationToken $cancellationToken): Promise
{
$deferred = new Deferred;
$newPromise = $deferred->promise();
$promise->onResolve(static function ($error, $value) use (&$deferred): void {
if ($deferred) {
$temp = $deferred;
$deferred = null;
if ($error) {
$temp->fail($error);
} else {
$temp->resolve($value);
}
}
});
$cancellationSubscription = $cancellationToken->subscribe(static function ($e) use (&$deferred): void {
if ($deferred) {
$temp = $deferred;
$deferred = null;
$temp->fail($e);
}
});
$newPromise->onResolve(static function () use ($cancellationToken, $cancellationSubscription): void {
$cancellationToken->unsubscribe($cancellationSubscription);
});
return $newPromise;
}
private function determineKeepAliveTimeout(Response $response): int
{
$request = $response->getRequest();
$requestConnHeader = $request->getHeader('connection');
$responseConnHeader = $response->getHeader('connection');
if (!\strcasecmp($requestConnHeader, 'close')) {
return 0;
}
if ($response->getProtocolVersion() === '1.0') {
return 0;
}
if (!\strcasecmp($responseConnHeader, 'close')) {
return 0;
}
$params = Http\createFieldValueComponentMap(Http\parseFieldValueComponents($response, 'keep-alive'));
$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
if (isset($params['timeout'])) {
$this->explicitTimeout = true;
}
return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
}
private function determineProtocolVersion(Request $request): string
{
$protocolVersions = $request->getProtocolVersions();
if (\in_array("1.1", $protocolVersions, true)) {
return "1.1";
}
if (\in_array("1.0", $protocolVersions, true)) {
return "1.0";
}
throw new InvalidRequestException(
$request,
"None of the requested protocol versions is supported: " . \implode(", ", $protocolVersions)
);
}
private function writeRequest(
Request $request,
string $protocolVersion,
CancellationToken $cancellation
): \Generator {
try {
$rawHeaders = $this->generateRawHeader($request, $protocolVersion);
yield $this->socket->write($rawHeaders);
if ($request->getMethod() === 'CONNECT') {
return;
}
$body = $request->getBody()->createBodyStream();
$chunking = $request->getHeader("transfer-encoding") === "chunked";
$remainingBytes = $request->getHeader("content-length");
if ($chunking && $protocolVersion === "1.0") {
throw new InvalidRequestException($request, "Can't send chunked bodies over HTTP/1.0");
}
// We always buffer the last chunk to make sure we don't write $contentLength bytes if the body is too long.
$buffer = "";
while (null !== $chunk = yield $body->read()) {
$cancellation->throwIfRequested();
if ($chunk === "") {
continue;
}
if ($chunking) {
$chunk = \dechex(\strlen($chunk)) . "\r\n" . $chunk . "\r\n";
} elseif ($remainingBytes !== null) {
$remainingBytes -= \strlen($chunk);
if ($remainingBytes < 0) {
throw new InvalidRequestException(
$request,
"Body contained more bytes than specified in Content-Length, aborting request"
);
}
}
yield $this->socket->write($buffer);
$buffer = $chunk;
}
$cancellation->throwIfRequested();
// Flush last buffered chunk.
yield $this->socket->write($buffer);
if ($chunking) {
yield $this->socket->write("0\r\n\r\n");
} elseif ($remainingBytes !== null && $remainingBytes > 0) {
throw new InvalidRequestException(
$request,
"Body contained fewer bytes than specified in Content-Length, aborting request"
);
}
} catch (StreamException $exception) {
throw new SocketException('Socket disconnected prior to response completion');
}
}
/**
* @param Request $request
* @param string $protocolVersion
*
* @return string
*
* @throws HttpException
*/
private function generateRawHeader(Request $request, string $protocolVersion): string
{
$uri = $request->getUri();
$requestUri = $uri->getPath() ?: '/';
if ('' !== $query = $uri->getQuery()) {
$requestUri .= '?' . $query;
}
$method = $request->getMethod();
if ($method === 'CONNECT') {
$defaultPort = $uri->getScheme() === 'https' ? 443 : 80;
$requestUri = $uri->getHost() . ':' . ($uri->getPort() ?? $defaultPort);
}
$header = $method . ' ' . $requestUri . ' HTTP/' . $protocolVersion . "\r\n";
try {
$header .= Rfc7230::formatRawHeaders($request->getRawHeaders());
} catch (InvalidHeaderException $e) {
throw new HttpException($e->getMessage());
}
return $header . "\r\n";
}
}
PK tAVkJc
c
" src/Connection/Http2Connection.phpnu W+A socket = $socket;
$this->processor = new Http2ConnectionProcessor($socket);
}
public function getProtocolVersions(): array
{
return self::PROTOCOL_VERSIONS;
}
public function initialize(): Promise
{
return $this->processor->initialize();
}
public function getStream(Request $request): Promise
{
if (!$this->processor->isInitialized()) {
throw new \Error('The promise returned from ' . __CLASS__ . '::initialize() must resolve before using the connection');
}
return call(function () {
if ($this->processor->isClosed() || $this->processor->getRemainingStreams() <= 0) {
return null;
}
$this->processor->reserveStream();
return HttpStream::fromConnection(
$this,
\Closure::fromCallable([$this, 'request']),
\Closure::fromCallable([$this->processor, 'unreserveStream'])
);
});
}
public function onClose(callable $onClose): void
{
$this->processor->onClose($onClose);
}
public function close(): Promise
{
return $this->processor->close();
}
public function getLocalAddress(): SocketAddress
{
return $this->socket->getLocalAddress();
}
public function getRemoteAddress(): SocketAddress
{
return $this->socket->getRemoteAddress();
}
public function getTlsInfo(): ?TlsInfo
{
return $this->socket->getTlsInfo();
}
private function request(Request $request, CancellationToken $token, Stream $applicationStream): Promise
{
$this->requestCount++;
return $this->processor->request($request, $token, $applicationStream);
}
}
PK tAV6Ϛ ( src/Connection/LimitedConnectionPool.phpnu W+A Returns a stream for the given request, or null if no stream is available or if
* the connection is not suited for the given request. The first request for a stream
* on a new connection MUST resolve the promise with a Stream instance.
*/
public function getStream(Request $request): Promise;
/**
* @return string[] Array of supported protocol versions.
*/
public function getProtocolVersions(): array;
public function close(): Promise;
public function onClose(callable $onClose): void;
public function getLocalAddress(): SocketAddress;
public function getRemoteAddress(): SocketAddress;
public function getTlsInfo(): ?TlsInfo;
}
PK tAV{ + src/Connection/Http2ConnectionException.phpnu W+A pool = ConnectionLimitingPool::byAuthority(\PHP_INT_MAX, $connectionFactory);
}
public function __clone()
{
$this->pool = clone $this->pool;
}
public function getTotalConnectionAttempts(): int
{
return $this->pool->getTotalConnectionAttempts();
}
public function getTotalStreamRequests(): int
{
return $this->pool->getTotalStreamRequests();
}
public function getOpenConnectionCount(): int
{
return $this->pool->getOpenConnectionCount();
}
public function getStream(Request $request, CancellationToken $cancellation): Promise
{
return $this->pool->getStream($request, $cancellation);
}
}
PK tAVTѴN N ' src/Connection/Http2StreamException.phpnu W+A streamId = $streamId;
}
public function getStreamId(): int
{
return $this->streamId;
}
}
PK tAViՖ $ src/Connection/ConnectionFactory.phpnu W+A getBody()->getHeaders();
foreach ($headers as $name => $header) {
if (!$request->hasHeader($name)) {
$request->setHeaders([$name => $header]);
}
}
yield from self::normalizeRequestBodyHeaders($request);
// Always normalize this as last item, because we need to strip sensitive headers
self::normalizeTraceRequest($request);
return $request;
});
}
private static function normalizeRequestBodyHeaders(Request $request): \Generator
{
if (!$request->hasHeader('host')) {
// Though servers are supposed to be able to handle standard port names on the end of the
// Host header some fail to do this correctly. Thankfully PSR-7 recommends to strip the port
// if it is the standard port for the given scheme.
$request->setHeader('host', $request->getUri()->withUserInfo('')->getAuthority());
}
if ($request->hasHeader("transfer-encoding")) {
$request->removeHeader("content-length");
return;
}
if ($request->hasHeader("content-length")) {
return;
}
/** @var RequestBody $body */
$body = $request->getBody();
$bodyLength = yield $body->getBodyLength();
if ($bodyLength === 0) {
if (\in_array($request->getMethod(), ['HEAD', 'GET', 'CONNECT'], true)) {
$request->removeHeader('content-length');
} else {
$request->setHeader('content-length', '0');
}
$request->removeHeader('transfer-encoding');
} elseif ($bodyLength > 0) {
$request->setHeader("content-length", $bodyLength);
$request->removeHeader("transfer-encoding");
} else {
$request->setHeader("transfer-encoding", "chunked");
}
}
private static function normalizeTraceRequest(Request $request): void
{
$method = $request->getMethod();
if ($method !== 'TRACE') {
return;
}
// https://tools.ietf.org/html/rfc7231#section-4.3.8
$request->setBody(null);
// Remove all body and sensitive headers
$request->setHeaders([
"transfer-encoding" => [],
"content-length" => [],
"authorization" => [],
"proxy-authorization" => [],
"cookie" => [],
]);
}
}
PK tAV
0Bm m ' src/Connection/Internal/Http2Stream.phpnu W+A id = $id;
$this->request = $request;
$this->stream = $stream;
$this->cancellationToken = $cancellationToken;
$this->serverWindow = $serverSize;
$this->clientWindow = $clientSize;
$this->pendingResponse = new Deferred;
$this->requestBodyCompletion = new Deferred;
}
}
PK tAV)L4 L4 ' src/Connection/Internal/Http1Parser.phpnu W+A \d+\.\d+)[\x20\x09]+
(?P[1-9]\d\d)[\x20\x09]*
(?P[^\x01-\x08\x10-\x19]*)
$#ix";
public const AWAITING_HEADERS = 0;
public const BODY_IDENTITY = 1;
public const BODY_IDENTITY_EOF = 2;
public const BODY_CHUNKS = 3;
public const TRAILERS_START = 4;
public const TRAILERS = 5;
/** @var int */
private $state = self::AWAITING_HEADERS;
/** @var string */
private $buffer = '';
/** @var string|null */
private $protocol;
/** @var int|null */
private $statusCode;
/** @var string|null */
private $statusReason;
/** @var string[][] */
private $headers = [];
/** @var int|null */
private $remainingBodyBytes;
/** @var int */
private $bodyBytesConsumed = 0;
/** @var bool */
private $chunkedEncoding = false;
/** @var int|null */
private $chunkLengthRemaining;
/** @var bool */
private $complete = false;
/** @var string */
private $request;
/** @var int */
private $maxHeaderBytes;
/** @var int */
private $maxBodyBytes;
/** @var callable */
private $bodyDataCallback;
/** @var callable */
private $trailersCallback;
public function __construct(Request $request, callable $bodyDataCallback, callable $trailersCallback)
{
$this->request = $request;
$this->bodyDataCallback = $bodyDataCallback;
$this->trailersCallback = $trailersCallback;
$this->maxHeaderBytes = $request->getHeaderSizeLimit();
$this->maxBodyBytes = $request->getBodySizeLimit();
}
public function getBuffer(): string
{
return $this->buffer;
}
public function getState(): int
{
return $this->state;
}
public function buffer(string $data): void
{
$this->buffer .= $data;
}
/**
* @param string|null $data
*
* @return Response|null
*
* @throws ParseException
*/
public function parse(string $data = null): ?Response
{
if ($data !== null) {
$this->buffer .= $data;
}
if ($this->buffer === '') {
return null;
}
if ($this->complete) {
throw new ParseException('Can\'t continue parsing, response is already complete', Status::BAD_REQUEST);
}
switch ($this->state) {
case self::AWAITING_HEADERS:
goto headers;
case self::BODY_IDENTITY:
goto body_identity;
case self::BODY_IDENTITY_EOF:
goto body_identity_eof;
case self::BODY_CHUNKS:
goto body_chunks;
case self::TRAILERS_START:
goto trailers_start;
case self::TRAILERS:
goto trailers;
}
headers:
{
$startLineAndHeaders = $this->shiftHeadersFromBuffer();
if ($startLineAndHeaders === null) {
return null;
}
$startLineEndPos = \strpos($startLineAndHeaders, "\r\n");
$startLine = \substr($startLineAndHeaders, 0, $startLineEndPos);
$rawHeaders = \substr($startLineAndHeaders, $startLineEndPos + 2);
if (\preg_match(self::STATUS_LINE_PATTERN, $startLine, $match)) {
$this->protocol = $match['protocol'];
$this->statusCode = (int) $match['status'];
$this->statusReason = \trim($match['reason']);
} else {
throw new ParseException('Invalid status line: ' . $startLine, Status::BAD_REQUEST);
}
if ($rawHeaders !== '') {
$this->headers = $this->parseRawHeaders($rawHeaders);
} else {
$this->headers = [];
}
$requestMethod = $this->request->getMethod();
$skipBody = $this->statusCode < Status::OK || $this->statusCode === Status::NOT_MODIFIED || $this->statusCode === Status::NO_CONTENT
|| $requestMethod === 'HEAD' || $requestMethod === 'CONNECT';
if ($skipBody) {
$this->complete = true;
} elseif ($this->chunkedEncoding) {
$this->state = self::BODY_CHUNKS;
} elseif ($this->remainingBodyBytes === null) {
$this->state = self::BODY_IDENTITY_EOF;
} elseif ($this->remainingBodyBytes > 0) {
$this->state = self::BODY_IDENTITY;
} else {
$this->complete = true;
}
$response = new Response($this->protocol, $this->statusCode, $this->statusReason, [], new InMemoryStream, $this->request);
foreach ($this->headers as [$key, $value]) {
$response->addHeader($key, $value);
}
return $response;
}
body_identity:
{
$bufferDataSize = \strlen($this->buffer);
if ($bufferDataSize <= $this->remainingBodyBytes) {
$chunk = $this->buffer;
$this->buffer = '';
$this->remainingBodyBytes -= $bufferDataSize;
$this->addToBody($chunk);
if ($this->remainingBodyBytes === 0) {
$this->complete = true;
}
return null;
}
$bodyData = \substr($this->buffer, 0, $this->remainingBodyBytes);
$this->addToBody($bodyData);
$this->buffer = \substr($this->buffer, $this->remainingBodyBytes);
$this->remainingBodyBytes = 0;
goto complete;
}
body_identity_eof:
{
$this->addToBody($this->buffer);
$this->buffer = '';
return null;
}
body_chunks:
{
if ($this->parseChunkedBody()) {
$this->state = self::TRAILERS_START;
goto trailers_start;
}
return null;
}
trailers_start:
{
$firstTwoBytes = \substr($this->buffer, 0, 2);
if ($firstTwoBytes === "" || $firstTwoBytes === "\r") {
return null;
}
if ($firstTwoBytes === "\r\n") {
$this->buffer = \substr($this->buffer, 2);
goto complete;
}
$this->state = self::TRAILERS;
goto trailers;
}
trailers:
{
$trailers = $this->shiftHeadersFromBuffer();
if ($trailers === null) {
return null;
}
$this->parseTrailers($trailers);
goto complete;
}
complete:
{
$this->complete = true;
return null;
}
}
public function isComplete(): bool
{
return $this->complete;
}
/**
* @return string|null
*
* @throws ParseException
*/
private function shiftHeadersFromBuffer(): ?string
{
$this->buffer = \ltrim($this->buffer, "\r\n");
if ($headersSize = \strpos($this->buffer, "\r\n\r\n")) {
$headers = \substr($this->buffer, 0, $headersSize + 2);
$this->buffer = \substr($this->buffer, $headersSize + 4);
} else {
$headersSize = \strlen($this->buffer);
$headers = null;
}
if ($this->maxHeaderBytes > 0 && $headersSize > $this->maxHeaderBytes) {
throw new ParseException("Configured header size exceeded: {$headersSize} bytes received, while the configured limit is {$this->maxHeaderBytes} bytes", Status::REQUEST_HEADER_FIELDS_TOO_LARGE);
}
return $headers;
}
/**
* @param string $rawHeaders
*
* @return array
*
* @throws ParseException
*/
private function parseRawHeaders(string $rawHeaders): array
{
// Legacy support for folded headers
if (\strpos($rawHeaders, "\r\n\x20") || \strpos($rawHeaders, "\r\n\t")) {
$rawHeaders = \preg_replace("/\r\n[\x20\t]++/", ' ', $rawHeaders);
}
try {
$headers = Rfc7230::parseRawHeaders($rawHeaders);
$headerMap = [];
foreach ($headers as [$key, $value]) {
$headerMap[\strtolower($key)][] = $value;
}
} catch (InvalidHeaderException $e) {
throw new ParseException('Invalid headers', Status::BAD_REQUEST, $e);
}
if (isset($headerMap['transfer-encoding'])) {
$transferEncodings = \explode(',', \strtolower(\implode(',', $headerMap['transfer-encoding'])));
$transferEncodings = \array_map('trim', $transferEncodings);
$this->chunkedEncoding = \in_array('chunked', $transferEncodings, true);
} elseif (isset($headerMap['content-length'])) {
if (\count($headerMap['content-length']) > 1) {
throw new ParseException('Can\'t determine body length, because multiple content-length headers present in the response', Status::BAD_REQUEST);
}
$contentLength = $headerMap['content-length'][0];
if (!\preg_match('/^(0|[1-9][0-9]*)$/', $contentLength)) {
throw new ParseException('Can\'t determine body length, because the content-length header value is invalid', Status::BAD_REQUEST);
}
$this->remainingBodyBytes = (int) $contentLength;
}
return $headers;
}
/**
* Decodes a chunked response body.
*
* @return bool {@code true} if the body is complete, otherwise {@code false}.
*
* @throws ParseException
*/
private function parseChunkedBody(): bool
{
if ($this->chunkLengthRemaining !== null) {
goto decode_chunk;
}
determine_chunk_size:
{
if (false === ($lineEndPos = \strpos($this->buffer, "\r\n"))) {
return false;
}
if ($lineEndPos === 0) {
throw new ParseException('Invalid line; hexadecimal chunk size expected', Status::BAD_REQUEST);
}
$line = \substr($this->buffer, 0, $lineEndPos);
$hex = \strtolower(\trim(\ltrim($line, '0'))) ?: '0';
$dec = \hexdec($hex);
if ($hex !== \dechex($dec)) {
throw new ParseException('Invalid hexadecimal chunk size', Status::BAD_REQUEST);
}
$this->chunkLengthRemaining = $dec;
$this->buffer = \substr($this->buffer, $lineEndPos + 2);
if ($this->chunkLengthRemaining === 0) {
return true;
}
}
decode_chunk:
{
$bufferLength = \strlen($this->buffer);
// These first two (extreme) edge cases prevent errors where the packet boundary ends after
// the \r and before the \n at the end of a chunk.
if ($bufferLength === $this->chunkLengthRemaining || $bufferLength === $this->chunkLengthRemaining + 1) {
return false;
}
if ($bufferLength >= $this->chunkLengthRemaining + 2) {
$chunk = \substr($this->buffer, 0, $this->chunkLengthRemaining);
$this->buffer = \substr($this->buffer, $this->chunkLengthRemaining + 2);
$this->chunkLengthRemaining = null;
$this->addToBody($chunk);
goto determine_chunk_size;
}
/** @noinspection SuspiciousAssignmentsInspection */
$chunk = $this->buffer;
$this->buffer = '';
$this->chunkLengthRemaining -= $bufferLength;
$this->addToBody($chunk);
return false;
}
}
/**
* @param string $trailers
*
* @throws ParseException
*/
private function parseTrailers(string $trailers): void
{
try {
$trailers = Rfc7230::parseHeaders($trailers);
} catch (InvalidHeaderException $e) {
throw new ParseException('Invalid trailers', Status::BAD_REQUEST, $e);
}
($this->trailersCallback)($trailers);
}
/**
* @param string $data
*
* @throws ParseException
*/
private function addToBody(string $data): void
{
$length = \strlen($data);
if (!$length) {
return;
}
$this->bodyBytesConsumed += $length;
if ($this->maxBodyBytes > 0 && $this->bodyBytesConsumed > $this->maxBodyBytes) {
throw new ParseException("Configured body size exceeded: {$this->bodyBytesConsumed} bytes received, while the configured limit is {$this->maxBodyBytes} bytes", Status::PAYLOAD_TOO_LARGE);
}
if ($this->bodyDataCallback) {
($this->bodyDataCallback)($data);
}
}
}
PK tAVgrP P 4 src/Connection/Internal/Http2ConnectionProcessor.phpnu W+A socket = $socket;
$this->hpack = new HPack;
}
public function isInitialized(): bool
{
return $this->initialized;
}
/**
* Returns a promise that is resolved once the connection has been initialized. A stream cannot be obtained from the
* connection until the promise returned by this method resolves.
*
* @return Promise
*/
public function initialize(): Promise
{
if ($this->initializeStarted) {
throw new \Error('Connection may only be initialized once');
}
$this->initializeStarted = true;
if ($this->socket->isClosed()) {
return new Failure(new UnprocessedRequestException(
new SocketException('The socket closed before the connection could be initialized')
));
}
$this->settings = new Deferred;
$promise = $this->settings->promise();
Promise\rethrow(new Coroutine($this->run()));
return $promise;
}
public function onClose(callable $onClose): void
{
if ($this->onClose === null) {
asyncCall($onClose, $this);
return;
}
$this->onClose[] = $onClose;
}
public function close(): Promise
{
$this->socket->close();
if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
foreach ($onClose as $callback) {
asyncCall($callback, $this);
}
}
return new Success;
}
public function handlePong(string $data): void
{
$this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data);
}
public function handlePing(string $data): void
{
if ($this->pongDeferred !== null) {
if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->pongWatcher = null;
}
$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(true);
}
}
public function handleShutdown(int $lastId, int $error): void
{
$message = \sprintf(
"Received GOAWAY frame from %s with error code %d",
$this->socket->getRemoteAddress(),
$error
);
$this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error));
}
public function handleStreamWindowIncrement(int $streamId, int $windowSize): void
{
if (!isset($this->streams[$streamId])) {
return;
}
$stream = $this->streams[$streamId];
if ($stream->clientWindow + $windowSize > 2147483647) {
$this->handleStreamException(new Http2StreamException(
"Current window size plus new window exceeds maximum size",
$streamId,
Http2Parser::FLOW_CONTROL_ERROR
));
return;
}
$stream->clientWindow += $windowSize;
$this->writeBufferedData($stream);
}
public function handleConnectionWindowIncrement(int $windowSize): void
{
if ($this->clientWindow + $windowSize > 2147483647) {
$this->handleConnectionException(new Http2ConnectionException(
"Current window size plus new window exceeds maximum size",
Http2Parser::FLOW_CONTROL_ERROR
));
return;
}
$this->clientWindow += $windowSize;
foreach ($this->streams as $stream) {
if ($this->clientWindow <= 0) {
return;
}
if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) {
continue;
}
$this->writeBufferedData($stream);
}
}
public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $streamEnded): void
{
foreach ($pseudo as $name => $value) {
if (!isset(Http2Parser::KNOWN_RESPONSE_PSEUDO_HEADERS[$name])) {
throw new Http2StreamException(
"Invalid pseudo header",
$streamId,
Http2Parser::PROTOCOL_ERROR
);
}
}
if (!isset($this->streams[$streamId])) {
return;
}
$stream = $this->streams[$streamId];
if ($stream->trailers) {
if ($stream->expectedLength && $stream->received !== $stream->expectedLength) {
$diff = $stream->expectedLength - $stream->received;
$this->handleStreamException(new Http2StreamException(
"Content length mismatch: " . \abs($diff) . ' bytes ' . ($diff > 0 ? ' missing' : 'too much'),
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
if (!empty($pseudo)) {
$this->handleStreamException(new Http2StreamException(
"Trailers must not contain pseudo headers",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
try {
// Constructor checks for any disallowed fields
$parsedTrailers = new Trailers($headers);
} catch (InvalidHeaderException $exception) {
$this->handleStreamException(new Http2StreamException(
"Disallowed field names in trailer",
$streamId,
Http2Parser::PROTOCOL_ERROR,
$exception
));
return;
}
$trailers = $stream->trailers;
$stream->trailers = null;
$trailers->resolve(call(function () use ($stream, $streamId, $parsedTrailers) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($stream->request, $stream->stream);
}
return $parsedTrailers;
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));
throw $e;
}
}));
$this->setupPingIfIdle();
return;
}
if (!isset($pseudo[":status"])) {
$this->handleConnectionException(new Http2ConnectionException(
"No status pseudo header in response",
Http2Parser::PROTOCOL_ERROR
));
return;
}
if (!\preg_match("/^[1-5]\d\d$/", $pseudo[":status"])) {
$this->handleStreamException(new Http2StreamException(
"Invalid response status code: " . $pseudo[':status'],
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
if ($stream->response !== null) {
$this->handleStreamException(new Http2StreamException(
"Stream headers already received",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$status = (int) $pseudo[":status"];
if ($status === Status::SWITCHING_PROTOCOLS) {
$this->handleConnectionException(new Http2ConnectionException(
"Switching Protocols (101) is not part of HTTP/2",
Http2Parser::PROTOCOL_ERROR
));
return;
}
$response = new Response(
'2',
$status,
Status::getReason($status),
$headers,
new InMemoryStream,
$stream->request
);
if ($status < 200) {
$onInformationalResponse = $stream->request->getInformationalResponseHandler();
if ($onInformationalResponse !== null) {
$stream->preResponseResolution = call(function () use (
$onInformationalResponse,
$response,
$stream,
$streamId
) {
yield $stream->preResponseResolution;
try {
yield call($onInformationalResponse, $response);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
'Informational response handler threw an exception',
$streamId,
Http2Parser::CANCEL
));
}
});
}
return;
}
\assert($stream->preResponseResolution === null);
$stream->preResponseResolution = call(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->stream);
}
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));
}
});
$stream->body = new Emitter;
$stream->trailers = new Deferred;
$bodyCancellation = new CancellationTokenSource;
$cancellationToken = new CombinedCancellationToken(
$stream->cancellationToken,
$bodyCancellation->getToken()
);
$response->setBody(
new ResponseBodyStream(
new IteratorStream($stream->body->iterate()),
$bodyCancellation
)
);
$response->setTrailers($stream->trailers->promise());
$stream->responsePending = false;
$stream->pendingResponse->resolve(call(static function () use ($response, $stream) {
yield $stream->requestBodyCompletion->promise();
yield $stream->preResponseResolution;
$stream->preResponseResolution = null;
$stream->pendingResponse = null;
return $response;
}));
$this->increaseConnectionWindow();
$this->increaseStreamWindow($stream);
if (isset($headers["content-length"])) {
if (\count($headers['content-length']) !== 1) {
$this->handleStreamException(new Http2StreamException(
"Multiple content-length header values",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$contentLength = $headers["content-length"][0];
if (!\preg_match('/^(0|[1-9][0-9]*)$/', $contentLength)) {
$this->handleStreamException(new Http2StreamException(
"Invalid content-length header value",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$stream->expectedLength = (int) $contentLength;
}
$cancellationToken->subscribe(function (CancelledException $exception) use ($streamId): void {
if (!isset($this->streams[$streamId])) {
return;
}
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});
unset($bodyCancellation, $cancellationToken); // Remove reference to cancellation token.
}
public function handlePushPromise(int $parentId, int $streamId, array $pseudo, array $headers): void
{
foreach ($pseudo as $name => $value) {
if (!isset(Http2Parser::KNOWN_REQUEST_PSEUDO_HEADERS[$name])) {
throw new Http2StreamException(
"Invalid pseudo header",
$streamId,
Http2Parser::PROTOCOL_ERROR
);
}
}
if (!isset($pseudo[":method"], $pseudo[":path"], $pseudo[":scheme"], $pseudo[":authority"])
|| isset($headers["connection"])
|| $pseudo[":path"] === ''
|| (isset($headers["te"]) && \implode($headers["te"]) !== "trailers")
) {
$this->handleStreamException(new Http2StreamException(
"Invalid header values",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$method = $pseudo[":method"];
$target = $pseudo[":path"];
$scheme = $pseudo[":scheme"];
$host = $pseudo[":authority"];
$query = null;
if ($method !== 'GET' && $method !== 'HEAD') {
$this->handleStreamException(new Http2StreamException(
"Pushed request method must be a safe method",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
if (!\preg_match("#^([A-Z\d.\-]+|\[[\d:]+])(?::([1-9]\d*))?$#i", $host, $matches)) {
$this->handleStreamException(new Http2StreamException(
"Invalid pushed authority (host) name",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$host = $matches[1];
$port = isset($matches[2]) ? (int) $matches[2] : $this->socket->getRemoteAddress()->getPort();
if (!isset($this->streams[$parentId])) {
$this->handleStreamException(new Http2StreamException(
"Parent stream {$parentId} is no longer open",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
/** @var Http2Stream $parentStream */
$parentStream = $this->streams[$parentId];
if (\strcasecmp($host, $parentStream->request->getUri()->getHost()) !== 0) {
$this->handleStreamException(new Http2StreamException(
"Authority does not match original request authority",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
if ($position = \strpos($target, "#")) {
$target = \substr($target, 0, $position);
}
if ($position = \strpos($target, "?")) {
$query = \substr($target, $position + 1);
$target = \substr($target, 0, $position);
}
try {
$uri = Uri\Http::createFromComponents([
"scheme" => $scheme,
"host" => $host,
"port" => $port,
"path" => $target,
"query" => $query,
]);
} catch (\Exception $exception) {
$this->handleConnectionException(new Http2ConnectionException(
"Invalid push URI",
Http2Parser::PROTOCOL_ERROR
));
return;
}
$request = new Request($uri, $method);
$request->setHeaders($headers);
$request->setProtocolVersions(['2']);
$request->setPushHandler($parentStream->request->getPushHandler());
$request->setHeaderSizeLimit($parentStream->request->getHeaderSizeLimit());
$request->setBodySizeLimit($parentStream->request->getBodySizeLimit());
$stream = new Http2Stream(
$streamId,
$request,
HttpStream::fromStream(
$parentStream->stream,
static function () {
throw new \Error('Calling Stream::request() on a pushed request is forbidden');
},
static function () {
// nothing to do
}
),
$parentStream->cancellationToken,
self::DEFAULT_WINDOW_SIZE,
0
);
$stream->dependency = $parentId;
$this->streams[$streamId] = $stream;
$stream->requestBodyComplete = true;
$stream->requestBodyCompletion->resolve();
if ($parentStream->request->getPushHandler() === null) {
$this->handleStreamException(new Http2StreamException(
"Push promise refused",
$streamId,
Http2Parser::CANCEL
));
return;
}
asyncCall(function () use ($streamId, $stream): \Generator {
$tokenSource = new CancellationTokenSource;
$cancellationToken = new CombinedCancellationToken(
$stream->cancellationToken,
$tokenSource->getToken()
);
$cancellationId = $cancellationToken->subscribe(function (
CancelledException $exception
) use ($streamId): void {
if (!isset($this->streams[$streamId])) {
return;
}
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});
$onPush = $stream->request->getPushHandler();
try {
yield call($onPush, $stream->request, $stream->pendingResponse->promise());
} catch (HttpException | StreamException | CancelledException $exception) {
$tokenSource->cancel($exception);
} catch (\Throwable $exception) {
$tokenSource->cancel($exception);
throw $exception;
} finally {
$cancellationToken->unsubscribe($cancellationId);
}
});
}
public function handlePriority(int $streamId, int $parentId, int $weight): void
{
if (!isset($this->streams[$streamId])) {
return;
}
$stream = $this->streams[$streamId];
$stream->dependency = $parentId;
$stream->weight = $weight;
}
public function handleStreamReset(int $streamId, int $errorCode): void
{
if (!isset($this->streams[$streamId])) {
return;
}
$this->handleStreamException(new Http2StreamException("Stream closed by server", $streamId, $errorCode));
}
public function handleStreamException(Http2StreamException $exception): void
{
$id = $exception->getStreamId();
$code = $exception->getCode();
$exception = new ClientHttp2StreamException($exception->getMessage(), $id, $code, $exception);
if ($code === Http2Parser::REFUSED_STREAM) {
$exception = new UnprocessedRequestException($exception);
}
$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $id, \pack("N", $code));
if (isset($this->streams[$id])) {
$this->releaseStream($id, $exception);
}
}
public function handleConnectionException(Http2ConnectionException $exception): void
{
$this->shutdown(
null,
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);
}
public function handleData(int $streamId, string $data): void
{
if (!isset($this->streams[$streamId])) {
return;
}
$stream = $this->streams[$streamId];
if (!$stream->body) {
$this->handleStreamException(new Http2StreamException(
"Stream headers not complete or body already complete",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$length = \strlen($data);
$this->serverWindow -= $length;
$stream->serverWindow -= $length;
$stream->received += $length;
if ($stream->received >= $stream->request->getBodySizeLimit()) {
$this->handleStreamException(new Http2StreamException(
"Body size limit exceeded",
$streamId,
Http2Parser::CANCEL
));
return;
}
if ($stream->expectedLength !== null && $stream->received > $stream->expectedLength) {
$this->handleStreamException(new Http2StreamException(
"Body size exceeded content-length in header",
$streamId,
Http2Parser::CANCEL
));
return;
}
$this->increaseConnectionWindow();
$promise = $stream->body->emit($data);
$promise->onResolve(function (?\Throwable $exception) use ($streamId): void {
if ($exception || !isset($this->streams[$streamId])) {
return;
}
$this->increaseStreamWindow($this->streams[$streamId]);
});
}
public function handleSettings(array $settings): void
{
foreach ($settings as $setting => $value) {
$this->applySetting($setting, $value);
}
$this->writeFrame(Http2Parser::SETTINGS, Http2Parser::ACK);
if ($this->settings) {
$deferred = $this->settings;
$this->settings = null;
$this->initialized = true;
$deferred->resolve($this->remainingStreams);
}
}
public function handleStreamEnd(int $streamId): void
{
if (!isset($this->streams[$streamId])) {
return;
}
$stream = $this->streams[$streamId];
if ($stream->expectedLength !== null && $stream->received !== $stream->expectedLength) {
$this->handleStreamException(new Http2StreamException(
"Body length does not match content-length header",
$streamId,
Http2Parser::PROTOCOL_ERROR
));
return;
}
$body = $stream->body;
$stream->body = null;
$body->complete();
$trailers = $stream->trailers;
$stream->trailers = null;
$trailers->resolve(call(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($stream->request, $stream->stream);
}
return new Trailers([]);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));
throw $e;
}
}));
$this->setupPingIfIdle();
$this->releaseStream($streamId);
}
public function reserveStream(): void
{
--$this->remainingStreams;
}
public function unreserveStream(): void
{
++$this->remainingStreams;
}
public function getRemainingStreams(): int
{
return $this->remainingStreams;
}
public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise
{
return call(function () use ($request, $cancellationToken, $stream): \Generator {
$this->idlePings = 0;
$this->cancelIdleWatcher();
yield RequestNormalizer::normalizeRequest($request);
// Remove defunct HTTP/1.x headers.
$request->removeHeader('host');
$request->removeHeader('connection');
$request->removeHeader('keep-alive');
$request->removeHeader('transfer-encoding');
$request->removeHeader('upgrade');
$request->setProtocolVersions(['2']);
if ($request->getMethod() === 'CONNECT') {
$exception = new HttpException("CONNECT requests are currently not supported on HTTP/2");
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $exception);
}
throw $exception;
}
if ($this->socket->isClosed()) {
$exception = new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket to '%s' closed before the request could be sent",
$this->socket->getRemoteAddress()
))
);
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $exception);
}
throw $exception;
}
$streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1.
$this->streams[$streamId] = $http2stream = new Http2Stream(
$streamId,
$request,
$stream,
$cancellationToken,
self::DEFAULT_WINDOW_SIZE,
$this->initialWindowSize
);
if ($request->getTransferTimeout() > 0) {
// Cancellation token combined with timeout token should not be stored in $stream->cancellationToken,
// otherwise the timeout applies to the body transfer and pushes.
$cancellationToken = new CombinedCancellationToken(
$cancellationToken,
new TimeoutCancellationToken($request->getTransferTimeout())
);
}
$this->socket->reference();
$onCancel = function (CancelledException $exception) use ($streamId): void {
if (!isset($this->streams[$streamId])) {
return;
}
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
};
$cancellationId = $cancellationToken->subscribe($onCancel);
try {
$headers = $this->generateHeaders($request);
$headers = $this->hpack->encode($headers);
$body = $request->getBody()->createBodyStream();
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startSendingRequest($request, $stream);
}
$chunk = yield $body->read();
if (!isset($this->streams[$streamId])) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
return yield $http2stream->pendingResponse->promise();
}
$flag = Http2Parser::END_HEADERS | ($chunk === null ? Http2Parser::END_STREAM : Http2Parser::NO_FLAG);
if (\strlen($headers) > $this->frameSizeLimit) {
$split = \str_split($headers, $this->frameSizeLimit);
$firstChunk = \array_shift($split);
$lastChunk = \array_pop($split);
// no yield, because there must not be other frames in between
$this->writeFrame(Http2Parser::HEADERS, Http2Parser::NO_FLAG, $streamId, $firstChunk);
foreach ($split as $headerChunk) {
// no yield, because there must not be other frames in between
$this->writeFrame(Http2Parser::CONTINUATION, Http2Parser::NO_FLAG, $streamId, $headerChunk);
}
yield $this->writeFrame(Http2Parser::CONTINUATION, $flag, $streamId, $lastChunk);
} else {
yield $this->writeFrame(Http2Parser::HEADERS, $flag, $streamId, $headers);
}
if ($chunk === null) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
$http2stream->requestBodyComplete = true;
$http2stream->requestBodyCompletion->resolve();
return yield $http2stream->pendingResponse->promise();
}
$buffer = $chunk;
while (null !== $chunk = yield $body->read()) {
if (!isset($this->streams[$streamId])) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
return yield $http2stream->pendingResponse->promise();
}
yield $this->writeData($http2stream, $buffer);
$buffer = $chunk;
}
if (!isset($this->streams[$streamId])) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
return yield $http2stream->pendingResponse->promise();
}
$responsePromise = $http2stream->pendingResponse->promise();
$http2stream->requestBodyComplete = true;
$http2stream->requestBodyCompletion->resolve();
yield $this->writeData($http2stream, $buffer);
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}
return yield $responsePromise;
} catch (\Throwable $exception) {
if (isset($this->streams[$streamId])) {
if (!$http2stream->requestBodyComplete) {
$http2stream->requestBodyCompletion->fail($exception);
}
$this->releaseStream($streamId, $exception);
}
if ($exception instanceof StreamException) {
$exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage());
}
throw $exception;
} finally {
$cancellationToken->unsubscribe($cancellationId);
}
});
}
public function isClosed(): bool
{
return $this->onClose === null;
}
private function run(): \Generator
{
try {
yield $this->socket->write(Http2Parser::PREFACE);
yield $this->writeFrame(
Http2Parser::SETTINGS,
0,
0,
\pack(
"nNnNnNnN",
Http2Parser::ENABLE_PUSH,
1,
Http2Parser::MAX_CONCURRENT_STREAMS,
256,
Http2Parser::INITIAL_WINDOW_SIZE,
self::DEFAULT_WINDOW_SIZE,
Http2Parser::MAX_FRAME_SIZE,
self::DEFAULT_MAX_FRAME_SIZE
)
);
$parser = (new Http2Parser($this))->parse();
while (null !== $chunk = yield $this->socket->read()) {
$return = $parser->send($chunk);
\assert($return === null);
}
$this->shutdown();
} catch (\Throwable $exception) {
$this->shutdown(null, new ClientHttp2ConnectionException(
"The HTTP/2 connection closed unexpectedly",
Http2Parser::INTERNAL_ERROR,
$exception
));
}
}
private function writeFrame(
int $type,
int $flags = Http2Parser::NO_FLAG,
int $stream = 0,
string $data = ''
): Promise {
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));
/** @noinspection PhpUnhandledExceptionInspection */
return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
}
private function applySetting(int $setting, int $value): void
{
switch ($setting) {
case Http2Parser::INITIAL_WINDOW_SIZE:
if ($value > 2147483647) { // (1 << 31) - 1
$this->handleConnectionException(new Http2ConnectionException(
"Invalid window size: {$value}",
Http2Parser::FLOW_CONTROL_ERROR
));
return;
}
$priorWindowSize = $this->initialWindowSize;
$this->initialWindowSize = $value;
$difference = $this->initialWindowSize - $priorWindowSize;
foreach ($this->streams as $stream) {
$stream->clientWindow += $difference;
}
// Settings ACK should be sent before HEADER or DATA frames.
if ($difference > 0) {
Loop::defer(function () {
foreach ($this->streams as $stream) {
if ($this->clientWindow <= 0) {
return;
}
if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) {
continue;
}
$this->writeBufferedData($stream);
}
});
}
return;
case Http2Parser::MAX_FRAME_SIZE:
if ($value < 1 << 14 || $value >= 1 << 24) {
$this->handleConnectionException(new Http2ConnectionException(
"Invalid maximum frame size: {$value}",
Http2Parser::PROTOCOL_ERROR
));
return;
}
$this->frameSizeLimit = $value;
return;
case Http2Parser::MAX_CONCURRENT_STREAMS:
if ($value > 2147483647) { // (1 << 31) - 1
$this->handleConnectionException(new Http2ConnectionException(
"Invalid concurrent streams value: {$value}",
Http2Parser::PROTOCOL_ERROR
));
return;
}
$priorUsedStreams = $this->concurrentStreamLimit - $this->remainingStreams;
$this->concurrentStreamLimit = $value;
$this->remainingStreams = $this->concurrentStreamLimit - $priorUsedStreams;
return;
case Http2Parser::HEADER_TABLE_SIZE: // TODO Respect this setting from the server
case Http2Parser::MAX_HEADER_LIST_SIZE: // TODO Respect this setting from the server
case Http2Parser::ENABLE_PUSH: // No action needed.
default: // Unknown setting, ignore (6.5.2).
return;
}
}
private function writeBufferedData(Http2Stream $stream): Promise
{
if ($stream->requestBodyComplete && $stream->requestBodyBuffer === '') {
return new Success;
}
$windowSize = \min($this->clientWindow, $stream->clientWindow);
$length = \strlen($stream->requestBodyBuffer);
if ($length <= $windowSize) {
if ($stream->windowSizeIncrease) {
$deferred = $stream->windowSizeIncrease;
$stream->windowSizeIncrease = null;
$deferred->resolve();
}
$this->clientWindow -= $length;
$stream->clientWindow -= $length;
if ($length > $this->frameSizeLimit) {
$chunks = \str_split($stream->requestBodyBuffer, $this->frameSizeLimit);
$stream->requestBodyBuffer = \array_pop($chunks);
foreach ($chunks as $chunk) {
$this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, $chunk);
}
}
if ($stream->requestBodyComplete) {
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::END_STREAM,
$stream->id,
$stream->requestBodyBuffer
);
} else {
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
$stream->requestBodyBuffer
);
}
$stream->requestBodyBuffer = "";
return $promise;
}
if ($windowSize > 0) {
// Read next body chunk if less than 8192 bytes will remain in the buffer
if ($length - 8192 < $windowSize && $stream->windowSizeIncrease) {
$deferred = $stream->windowSizeIncrease;
$stream->windowSizeIncrease = null;
$deferred->resolve();
}
$data = $stream->requestBodyBuffer;
$end = $windowSize - $this->frameSizeLimit;
$stream->clientWindow -= $windowSize;
$this->clientWindow -= $windowSize;
for ($off = 0; $off < $end; $off += $this->frameSizeLimit) {
$this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
\substr($data, $off, $this->frameSizeLimit)
);
}
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
\substr($data, $off, $windowSize - $off)
);
$stream->requestBodyBuffer = \substr($data, $windowSize);
return $promise;
}
if ($stream->windowSizeIncrease === null) {
$stream->windowSizeIncrease = new Deferred;
}
return $stream->windowSizeIncrease->promise();
}
private function releaseStream(int $streamId, ?\Throwable $exception = null): void
{
\assert(isset($this->streams[$streamId]));
$stream = $this->streams[$streamId];
if ($stream->responsePending || $stream->body || $stream->trailers) {
$exception = $exception ?? new ClientHttp2StreamException(
\sprintf("Stream %d closed unexpectedly", $streamId),
$streamId,
Http2Parser::INTERNAL_ERROR
);
if (!$exception instanceof HttpException && !$exception instanceof CancelledException) {
$exception = new HttpException($exception->getMessage(), 0, $exception);
}
/** @var Deferred[]|Emitter[] $deferredAndEmitter */
$deferredAndEmitter = [];
if ($stream->responsePending) {
$stream->responsePending = false;
$deferredAndEmitter[] = $stream->pendingResponse;
$stream->pendingResponse = null;
}
if ($stream->body) {
$deferredAndEmitter[] = $stream->body;
$stream->body = null;
}
if ($stream->trailers) {
$deferredAndEmitter[] = $stream->trailers;
$stream->trailers = null;
}
$request = $stream->request;
asyncCall(static function () use ($request, $deferredAndEmitter, $exception) {
try {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $exception);
}
} finally {
foreach ($deferredAndEmitter as $deferredOrEmitter) {
$deferredOrEmitter->fail($exception);
}
}
});
}
unset($this->streams[$streamId]);
if ($streamId & 1) { // Client-initiated stream.
$this->remainingStreams++;
}
if (!$this->streams && !$this->socket->isClosed()) {
$this->socket->unreference();
}
}
private function setupPingIfIdle(): void
{
if ($this->idleWatcher !== null) {
return;
}
$this->idleWatcher = Loop::defer(function ($watcher) {
\assert($this->idleWatcher === null || $this->idleWatcher === $watcher);
$this->idleWatcher = null;
if (!empty($this->streams)) {
return;
}
$this->idleWatcher = Loop::delay(300000, function ($watcher) {
\assert($this->idleWatcher === null || $this->idleWatcher === $watcher);
\assert(empty($this->streams));
$this->idleWatcher = null;
// Connection idle for 10 minutes
if ($this->idlePings >= 1) {
$this->shutdown();
return;
}
if (yield $this->ping()) {
$this->setupPingIfIdle();
}
});
Loop::unreference($this->idleWatcher);
});
Loop::unreference($this->idleWatcher);
}
private function cancelIdleWatcher(): void
{
if ($this->idleWatcher !== null) {
Loop::cancel($this->idleWatcher);
$this->idleWatcher = null;
}
}
/**
* @return Promise Fulfilled with true if a pong is received within the timeout, false if none is received.
*/
private function ping(): Promise
{
if ($this->onClose === null) {
return new Success(false);
}
if ($this->pongDeferred !== null) {
return $this->pongDeferred->promise();
}
$this->pongDeferred = new Deferred;
$this->idlePings++;
$this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++);
$this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']);
return $this->pongDeferred->promise();
}
/**
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* streams have been opened.
* @param HttpException|null $reason
*
* @return Promise
*/
private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise
{
if ($this->onClose === null) {
return new Success;
}
return call(function () use ($lastId, $reason) {
$code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN;
$lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0);
$goawayPromise = $this->writeFrame(
Http2Parser::GOAWAY,
Http2Parser::NO_FLAG,
0,
\pack("NN", $lastId, $code)
);
if ($this->settings !== null) {
$settings = $this->settings;
$this->settings = null;
$settings->fail($reason ?? new UnprocessedRequestException(new SocketException("Connection closed")));
}
if ($this->streams) {
$reason = $reason ?? new SocketException("Connection closed");
foreach ($this->streams as $id => $stream) {
$this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason);
}
}
if ($this->pongDeferred !== null) {
$this->pongDeferred->resolve(false);
}
if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
}
$this->cancelIdleWatcher();
if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
foreach ($onClose as $callback) {
asyncCall($callback, $this);
}
}
yield $goawayPromise;
$this->socket->close();
});
}
private function generateHeaders(Request $request): array
{
$uri = $request->getUri();
$path = $uri->getPath();
if ($path === '') {
$path = '/';
}
$query = $uri->getQuery();
if ($query !== '') {
$path .= '?' . $query;
}
$authority = $uri->getHost();
if ($port = $uri->getPort()) {
$authority .= ':' . $port;
}
$headers = [
[":authority", $authority],
[":path", $path],
[":scheme", $uri->getScheme()],
[":method", $request->getMethod()],
];
foreach ($request->getHeaders() as $field => $values) {
foreach ($values as $value) {
$headers[] = [$field, $value];
}
}
return $headers;
}
private function writeData(Http2Stream $stream, string $data): Promise
{
$stream->requestBodyBuffer .= $data;
return $this->writeBufferedData($stream);
}
private function increaseConnectionWindow(): void
{
$increase = 0;
while ($this->serverWindow <= self::MINIMUM_WINDOW) {
$this->serverWindow += self::WINDOW_INCREMENT;
$increase += self::WINDOW_INCREMENT;
}
if ($increase > 0) {
$this->writeFrame(Http2Parser::WINDOW_UPDATE, 0, 0, \pack("N", self::WINDOW_INCREMENT));
}
}
private function increaseStreamWindow(Http2Stream $stream): void
{
$minWindow = \min($stream->request->getBodySizeLimit(), self::MINIMUM_WINDOW);
$increase = 0;
while ($stream->serverWindow <= $minWindow) {
$stream->serverWindow += self::WINDOW_INCREMENT;
$increase += self::WINDOW_INCREMENT;
}
if ($increase > 0) {
$this->writeFrame(
Http2Parser::WINDOW_UPDATE,
Http2Parser::NO_FLAG,
$stream->id,
\pack("N", self::WINDOW_INCREMENT)
);
}
}
}
PK tAV T< < src/Connection/Stream.phpnu W+A getUri();
$scheme = $uri->getScheme();
$isHttps = $scheme === 'https';
$defaultPort = $isHttps ? 443 : 80;
$host = $uri->getHost();
$port = $uri->getPort() ?? $defaultPort;
$authority = $host . ':' . $port;
return $scheme . '://' . $authority;
}
private function __construct(int $connectionLimit, ?ConnectionFactory $connectionFactory = null)
{
if ($connectionLimit < 1) {
throw new \Error('The connection limit must be greater than 0');
}
$this->connectionLimit = $connectionLimit;
$this->connectionFactory = $connectionFactory ?? new DefaultConnectionFactory;
}
public function __clone()
{
$this->connections = [];
$this->totalConnectionAttempts = 0;
$this->totalStreamRequests = 0;
$this->openConnectionCount = 0;
}
public function getTotalConnectionAttempts(): int
{
return $this->totalConnectionAttempts;
}
public function getTotalStreamRequests(): int
{
return $this->totalStreamRequests;
}
public function getOpenConnectionCount(): int
{
return $this->openConnectionCount;
}
public function getStream(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$this->totalStreamRequests++;
$uri = self::formatUri($request);
/** @var Stream $stream */
[$connection, $stream] = yield from $this->getStreamFor($uri, $request, $cancellation);
$connectionId = \spl_object_id($connection);
$this->activeRequestCounts[$connectionId] = ($this->activeRequestCounts[$connectionId] ?? 0) + 1;
return HttpStream::fromStream(
$stream,
coroutine(function (Request $request, CancellationToken $cancellationToken) use (
$connection,
$stream,
$uri
) {
try {
/** @var Response $response */
$response = yield $stream->request($request, $cancellationToken);
} catch (\Throwable $e) {
$this->onReadyConnection($connection, $uri);
throw $e;
}
// await response being completely received
$response->getTrailers()->onResolve(function () use ($connection, $uri): void {
$this->onReadyConnection($connection, $uri);
});
return $response;
}),
function () use ($connection, $uri): void {
$this->onReadyConnection($connection, $uri);
}
);
});
}
private function getStreamFor(string $uri, Request $request, CancellationToken $cancellation): \Generator
{
$isHttps = $request->getUri()->getScheme() === 'https';
$connections = $this->connections[$uri] ?? new \ArrayObject;
do {
foreach ($connections as $connectionPromise) {
\assert($connectionPromise instanceof Promise);
try {
if ($isHttps && ($this->waitForPriorConnection[$uri] ?? true)) {
// Wait for first successful connection if using a secure connection (maybe we can use HTTP/2).
$connection = yield $connectionPromise;
} else {
$connection = yield Promise\first([$connectionPromise, new Success]);
if ($connection === null) {
continue;
}
}
} catch (\Exception $exception) {
continue; // Ignore cancellations and errors of other requests.
}
\assert($connection instanceof Connection);
$stream = yield $this->getStreamFromConnection($connection, $request);
if ($stream === null) {
if (!$this->isAdditionalConnectionAllowed($uri) && $this->isConnectionIdle($connection)) {
$connection->close();
break;
}
continue; // No stream available for the given request.
}
return [$connection, $stream];
}
$deferred = new Deferred;
$deferredId = \spl_object_id($deferred);
$this->waiting[$uri][$deferredId] = $deferred;
$deferredPromise = $deferred->promise();
$deferredPromise->onResolve(function () use ($uri, $deferredId): void {
$this->removeWaiting($uri, $deferredId);
});
if ($this->isAdditionalConnectionAllowed($uri)) {
break;
}
$connection = yield $deferredPromise;
\assert($connection instanceof Connection);
$stream = yield $this->getStreamFromConnection($connection, $request);
if ($stream === null) {
continue; // Wait for a different connection to become available.
}
return [$connection, $stream];
} while (true);
$this->totalConnectionAttempts++;
$connectionPromise = $this->connectionFactory->create($request, $cancellation);
$connectionId = \spl_object_id($connectionPromise);
$this->connections[$uri] = $this->connections[$uri] ?? new \ArrayObject;
$this->connections[$uri][$connectionId] = $connectionPromise;
$connectionPromise->onResolve(function (?\Throwable $exception, ?Connection $connection) use (
&$deferred,
$uri,
$connectionId,
$isHttps
): void {
if ($exception) {
$this->dropConnection($uri, $connectionId);
if ($deferred !== null) {
$deferred->fail($exception); // Fail Deferred so Promise\first() below fails.
}
return;
}
$this->openConnectionCount++;
if ($isHttps) {
$this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true);
}
$connection->onClose(function () use ($uri, $connectionId): void {
$this->openConnectionCount--;
$this->dropConnection($uri, $connectionId);
});
});
try {
$connection = yield Promise\first([$connectionPromise, $deferredPromise]);
} catch (MultiReasonException $exception) {
[$exception] = $exception->getReasons(); // The first reason is why the connection failed.
throw $exception;
}
$deferred = null; // Null reference so connection promise handler does not double-resolve the Deferred.
$this->removeWaiting($uri, $deferredId); // Deferred no longer needed for this request.
\assert($connection instanceof Connection);
$stream = yield $this->getStreamFromConnection($connection, $request);
if ($stream === null) {
// Reused connection did not have an available stream for the given request.
$connection = yield $connectionPromise; // Wait for new connection request instead.
$stream = yield $this->getStreamFromConnection($connection, $request);
if ($stream === null) {
// Other requests used the new connection first, so we need to go around again.
return yield from $this->getStreamFor($uri, $request, $cancellation);
}
}
return [$connection, $stream];
}
private function getStreamFromConnection(Connection $connection, Request $request): Promise
{
if (!\array_intersect($request->getProtocolVersions(), $connection->getProtocolVersions())) {
return new Success; // Connection does not support any of the requested protocol versions.
}
return $connection->getStream($request);
}
private function isAdditionalConnectionAllowed(string $uri): bool
{
return \count($this->connections[$uri] ?? []) < $this->connectionLimit;
}
private function onReadyConnection(Connection $connection, string $uri): void
{
$connectionId = \spl_object_id($connection);
if (isset($this->activeRequestCounts[$connectionId])) {
$this->activeRequestCounts[$connectionId]--;
}
if (empty($this->waiting[$uri])) {
return;
}
$deferred = \reset($this->waiting[$uri]);
// Deferred is removed from waiting list in onResolve callback attached above.
$deferred->resolve($connection);
}
private function isConnectionIdle(Connection $connection): bool
{
$connectionId = \spl_object_id($connection);
\assert(
!isset($this->activeRequestCounts[$connectionId])
|| $this->activeRequestCounts[$connectionId] >= 0
);
return ($this->activeRequestCounts[$connectionId] ?? 0) === 0;
}
private function removeWaiting(string $uri, int $deferredId): void
{
unset($this->waiting[$uri][$deferredId]);
if (empty($this->waiting[$uri])) {
unset($this->waiting[$uri]);
}
}
private function dropConnection(string $uri, int $connectionId): void
{
unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId]);
if (empty($this->connections[$uri])) {
unset($this->connections[$uri], $this->waitForPriorConnection[$uri]);
}
}
}
PK tAVG % src/Connection/StreamLimitingPool.phpnu W+A getUri()->getHost();
});
}
public static function byStaticKey(
ConnectionPool $delegate,
KeyedSemaphore $semaphore,
string $key = ''
): self {
return new self($delegate, $semaphore, static function () use ($key) {
return $key;
});
}
public static function byCustomKey(
ConnectionPool $delegate,
KeyedSemaphore $semaphore,
callable $requestToKeyMapper
): self {
return new self($delegate, $semaphore, $requestToKeyMapper);
}
/** @var ConnectionPool */
private $delegate;
/** @var KeyedSemaphore */
private $semaphore;
/** @var callable */
private $requestToKeyMapper;
private function __construct(ConnectionPool $delegate, KeyedSemaphore $semaphore, callable $requestToKeyMapper)
{
$this->delegate = $delegate;
$this->semaphore = $semaphore;
$this->requestToKeyMapper = $requestToKeyMapper;
}
public function getStream(Request $request, CancellationToken $token): Promise
{
return call(function () use ($request, $token) {
/** @var Lock $lock */
$lock = yield $this->semaphore->acquire(($this->requestToKeyMapper)($request));
/** @var Stream $stream */
$stream = yield $this->delegate->getStream($request, $token);
return HttpStream::fromStream(
$stream,
coroutine(static function (Request $request, CancellationToken $cancellationToken) use (
$stream,
$lock
) {
try {
/** @var Response $response */
$response = yield $stream->request($request, $cancellationToken);
// await response being completely received
$response->getTrailers()->onResolve(static function () use ($lock) {
$lock->release();
});
} catch (\Throwable $e) {
$lock->release();
throw $e;
}
return $response;
}),
static function () use ($lock) {
$lock->release();
}
);
});
}
}
PK tAVڨ ! src/Connection/ConnectionPool.phpnu W+A
*/
public function getStream(Request $request, CancellationToken $token): Promise;
}
PK tAVRq q ! src/Connection/UpgradedSocket.phpnu W+A socket = $socket;
$this->buffer = $buffer !== '' ? $buffer : null;
}
public function read(): Promise
{
if ($this->buffer !== null) {
$buffer = $this->buffer;
$this->buffer = null;
return new Success($buffer);
}
return $this->socket->read();
}
public function close(): void
{
$this->socket->close();
}
public function __destruct()
{
$this->close();
}
public function write(string $data): Promise
{
return $this->socket->write($data);
}
public function end(string $finalData = ""): Promise
{
return $this->socket->end($finalData);
}
public function reference(): void
{
$this->socket->reference();
}
public function unreference(): void
{
$this->socket->unreference();
}
public function isClosed(): bool
{
return $this->socket->isClosed();
}
public function getLocalAddress(): SocketAddress
{
return $this->socket->getLocalAddress();
}
public function getRemoteAddress(): SocketAddress
{
return $this->socket->getRemoteAddress();
}
public function setupTls(?CancellationToken $token = null): Promise
{
return $this->socket->setupTls($token);
}
public function shutdownTls(?CancellationToken $token = null): Promise
{
return $this->socket->shutdownTls();
}
public function getTlsState(): int
{
return $this->socket->getTlsState();
}
public function getTlsInfo(): ?TlsInfo
{
return $this->socket->getTlsInfo();
}
}
PK tAVfG6$ $ $ src/Connection/InterceptedStream.phpnu W+A stream = $stream;
$this->interceptor = $interceptor;
}
public function request(Request $request, CancellationToken $cancellation): Promise
{
if (!$this->interceptor) {
throw new \Error(__METHOD__ . ' may only be invoked once per instance. '
. 'If you need to implement retries or otherwise issue multiple requests, register an ApplicationInterceptor to do so.');
}
$interceptor = $this->interceptor;
$this->interceptor = null;
return call(function () use ($interceptor, $request, $cancellation) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startRequest($request);
}
return $interceptor->requestViaNetwork($request, $cancellation, $this->stream);
});
}
public function getLocalAddress(): SocketAddress
{
return $this->stream->getLocalAddress();
}
public function getRemoteAddress(): SocketAddress
{
return $this->stream->getRemoteAddress();
}
public function getTlsInfo(): ?TlsInfo
{
return $this->stream->getTlsInfo();
}
}
PK tAVTg g src/RequestBody.phpnu W+A true]);
}
/** @var string[] */
private $protocolVersions = ['1.1', '2'];
/** @var string */
private $method;
/** @var UriInterface */
private $uri;
/** @var RequestBody */
private $body;
/** @var int */
private $tcpConnectTimeout = 10000;
/** @var int */
private $tlsHandshakeTimeout = 10000;
/** @var int */
private $transferTimeout = 10000;
/** @var int */
private $bodySizeLimit = self::DEFAULT_BODY_SIZE_LIMIT;
/** @var int */
private $headerSizeLimit = self::DEFAULT_HEADER_SIZE_LIMIT;
/** @var callable|null */
private $onPush;
/** @var callable|null */
private $onUpgrade;
/** @var callable|null */
private $onInformationalResponse;
/** @var mixed[] */
private $attributes = [];
/** @var EventListener[] */
private $eventListeners = [];
/**
* Request constructor.
*
* @param string|UriInterface $uri
* @param string $method
* @param string $body
*/
public function __construct($uri, string $method = "GET", ?string $body = null)
{
$this->setUri($uri);
$this->setMethod($method);
$this->setBody($body);
}
public function addEventListener(EventListener $eventListener): void
{
$this->eventListeners[] = $eventListener;
}
/**
* @return EventListener[]
*/
public function getEventListeners(): array
{
return $this->eventListeners;
}
/**
* Retrieve the requests's acceptable HTTP protocol versions.
*
* @return string[]
*/
public function getProtocolVersions(): array
{
return $this->protocolVersions;
}
/**
* Assign the requests's acceptable HTTP protocol versions.
*
* The HTTP client might choose any of these.
*
* @param string[] $versions
*/
public function setProtocolVersions(array $versions): void
{
$versions = \array_unique($versions);
if (empty($versions)) {
/** @noinspection PhpUndefinedClassInspection */
throw new \Error("Empty array of protocol versions provided, must not be empty.");
}
foreach ($versions as $version) {
if (!\in_array($version, ["1.0", "1.1", "2"], true)) {
/** @noinspection PhpUndefinedClassInspection */
throw new \Error(
"Invalid HTTP protocol version: " . $version
);
}
}
$this->protocolVersions = $versions;
}
/**
* Retrieve the request's HTTP method verb.
*
* @return string
*/
public function getMethod(): string
{
return $this->method;
}
/**
* Specify the request's HTTP method verb.
*
* @param string $method
*/
public function setMethod(string $method): void
{
$this->method = $method;
}
/**
* Retrieve the request's URI.
*
* @return UriInterface
*/
public function getUri(): UriInterface
{
return $this->uri;
}
/**
* Specify the request's HTTP URI.
*
* @param string|UriInterface $uri
*/
public function setUri($uri): void
{
$this->uri = $uri instanceof UriInterface ? $uri : $this->createUriFromString($uri);
}
/**
* Assign a value for the specified header field by replacing any existing values for that field.
*
* @param string $field Header name.
* @param string|string[] $value Header value.
*/
public function setHeader(string $field, $value): void
{
if (($field[0] ?? ":") === ":") {
throw new \Error("Header name cannot be empty or start with a colon (:)");
}
parent::setHeader($field, $value);
}
/**
* Assign a value for the specified header field by adding an additional header line.
*
* @param string $field Header name.
* @param string|string[] $value Header value.
*/
public function addHeader(string $field, $value): void
{
if (($field[0] ?? ":") === ":") {
throw new \Error("Header name cannot be empty or start with a colon (:)");
}
parent::addHeader($field, $value);
}
public function setHeaders(array $headers): void
{
/** @noinspection PhpUnhandledExceptionInspection */
parent::setHeaders($headers);
}
/**
* Remove the specified header field from the message.
*
* @param string $field Header name.
*/
public function removeHeader(string $field): void
{
parent::removeHeader($field);
}
/**
* Retrieve the message entity body.
*/
public function getBody(): RequestBody
{
return $this->body;
}
/**
* Assign the message entity body.
*
* @param mixed $body
*/
public function setBody($body): void
{
if ($body === null) {
$this->body = new StringBody("");
} elseif (\is_string($body)) {
$this->body = new StringBody($body);
} elseif (\is_scalar($body)) {
$this->body = new StringBody(\var_export($body, true));
} elseif ($body instanceof RequestBody) {
$this->body = $body;
} else {
/** @noinspection PhpUndefinedClassInspection */
throw new \TypeError("Invalid body type: " . \gettype($body));
}
}
/**
* Registers a callback to the request that is invoked when the server pushes an additional resource.
* The callback is given two parameters: the Request generated from the pushed resource, and a promise for the
* Response containing the pushed resource. An HttpException, StreamException, or CancelledException can be thrown
* to refuse the push. If no callback is registered, pushes are automatically rejected.
*
* Interceptors can mostly use {@code interceptPush} instead.
*
* Example:
* function (Request $request, Promise $response): \Generator {
* $uri = $request->getUri(); // URI of pushed resource.
* $response = yield $promise; // Wait for resource to arrive.
* // Use Response object from resolved promise.
* }
*
* @param callable|null $onPush
*/
public function setPushHandler(?callable $onPush): void
{
$this->onPush = $onPush;
}
/**
* Allows interceptors to modify also pushed responses.
*
* If no push callable has been set by the application, the interceptor won't be invoked. If you want to enable
* push in an interceptor without the application setting a push handler, you need to use {@code setPushHandler}.
*
* @param callable $interceptor Receives the response and might modify it or return a new instance.
*/
public function interceptPush(callable $interceptor): void
{
if ($this->onPush === null) {
return;
}
$onPush = $this->onPush;
$this->onPush = static function (Request $request, Promise $response) use ($onPush, $interceptor) {
$response = call(static function () use ($response, $interceptor) {
return (yield call($interceptor, yield $response)) ?? $response;
});
return $onPush($request, $response);
};
}
/**
* @return callable|null
*/
public function getPushHandler(): ?callable
{
return $this->onPush;
}
/**
* Registers a callback invoked if a 101 response is returned to the request.
*
* @param callable|null $onUpgrade
*/
public function setUpgradeHandler(?callable $onUpgrade): void
{
$this->onUpgrade = $onUpgrade;
}
/**
* @return callable|null
*/
public function getUpgradeHandler(): ?callable
{
return $this->onUpgrade;
}
/**
* Registers a callback invoked when a 1xx response is returned to the request (other than a 101).
*
* @param callable|null $onInformationalResponse
*/
public function setInformationalResponseHandler(?callable $onInformationalResponse): void
{
$this->onInformationalResponse = $onInformationalResponse;
}
/**
* @return callable|null
*/
public function getInformationalResponseHandler(): ?callable
{
return $this->onInformationalResponse;
}
/**
* @return int Timeout in milliseconds for the TCP connection.
*/
public function getTcpConnectTimeout(): int
{
return $this->tcpConnectTimeout;
}
public function setTcpConnectTimeout(int $tcpConnectTimeout): void
{
$this->tcpConnectTimeout = $tcpConnectTimeout;
}
/**
* @return int Timeout in milliseconds for the TLS handshake.
*/
public function getTlsHandshakeTimeout(): int
{
return $this->tlsHandshakeTimeout;
}
public function setTlsHandshakeTimeout(int $tlsHandshakeTimeout): void
{
$this->tlsHandshakeTimeout = $tlsHandshakeTimeout;
}
/**
* @return int Timeout in milliseconds for the HTTP transfer (not counting TCP connect and TLS handshake)
*/
public function getTransferTimeout(): int
{
return $this->transferTimeout;
}
public function setTransferTimeout(int $transferTimeout): void
{
$this->transferTimeout = $transferTimeout;
}
public function getHeaderSizeLimit(): int
{
return $this->headerSizeLimit;
}
public function setHeaderSizeLimit(int $headerSizeLimit): void
{
$this->headerSizeLimit = $headerSizeLimit;
}
public function getBodySizeLimit(): int
{
return $this->bodySizeLimit;
}
public function setBodySizeLimit(int $bodySizeLimit): void
{
$this->bodySizeLimit = $bodySizeLimit;
}
/**
* Note: This method returns a deep clone of the request's attributes, so you can't modify the request attributes
* by modifying the returned value in any way.
*
* @return mixed[] An array of all request attributes in the request's local storage, indexed by name.
*/
public function getAttributes(): array
{
return self::clone($this->attributes);
}
/**
* Check whether a variable with the given name exists in the request's local storage.
*
* Each request has its own local storage to which applications and interceptors may read and write data.
* Other interceptors which are aware of this data can then access it without the server being tightly coupled to
* specific implementations.
*
* @param string $name Name of the attribute, should be namespaced with a vendor and package namespace like classes.
*
* @return bool
*/
public function hasAttribute(string $name): bool
{
return \array_key_exists($name, $this->attributes);
}
/**
* Retrieve a variable from the request's local storage.
*
* Each request has its own local storage to which applications and interceptors may read and write data.
* Other interceptors which are aware of this data can then access it without the server being tightly coupled to
* specific implementations.
*
* Note: This method returns a deep clone of the request's attribute, so you can't modify the request attribute
* by modifying the returned value in any way.
*
* @param string $name Name of the attribute, should be namespaced with a vendor and package namespace like classes.
*
* @return mixed
*
* @throws MissingAttributeError If an attribute with the given name does not exist.
*/
public function getAttribute(string $name)
{
if (!$this->hasAttribute($name)) {
throw new MissingAttributeError("The requested attribute '{$name}' does not exist");
}
return self::clone($this->attributes[$name]);
}
/**
* Assign a variable to the request's local storage.
*
* Each request has its own local storage to which applications and interceptors may read and write data.
* Other interceptors which are aware of this data can then access it without the server being tightly coupled to
* specific implementations.
*
* Note: This method performs a deep clone of the value via serialization, so you can't modify the given value
* after setting it.
*
* **Example**
*
* ```php
* $request->setAttribute(Timing::class, $stopWatch);
* ```
*
* @param string $name Name of the attribute, should be namespaced with a vendor and package namespace like classes.
* @param mixed $value Value of the attribute, might be any serializable value.
*/
public function setAttribute(string $name, $value): void
{
$this->attributes[$name] = self::clone($value);
}
/**
* Remove an attribute from the request's local storage.
*
* @param string $name Name of the attribute, should be namespaced with a vendor and package namespace like classes.
*
* @throws MissingAttributeError If an attribute with the given name does not exist.
*/
public function removeAttribute(string $name): void
{
if (!$this->hasAttribute($name)) {
throw new MissingAttributeError("The requested attribute '{$name}' does not exist");
}
unset($this->attributes[$name]);
}
/**
* Remove all attributes from the request's local storage.
*/
public function removeAttributes(): void
{
$this->attributes = [];
}
public function isIdempotent(): bool
{
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html
return \in_array($this->method, ['GET', 'HEAD', 'PUT', 'DELETE'], true);
}
private function createUriFromString(string $uri): UriInterface
{
return Uri\Http::createFromString($uri);
}
}
PK tAV# src/DelegateHttpClient.phpnu W+A
*/
public function request(Request $request, CancellationToken $cancellation): Promise;
}
PK tAV; src/PooledHttpClient.phpnu W+A connectionPool = $connectionPool ?? new UnlimitedConnectionPool;
}
public function request(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startRequest($request);
}
$stream = yield $this->connectionPool->getStream($request, $cancellation);
\assert($stream instanceof Stream);
foreach (\array_reverse($this->networkInterceptors) as $interceptor) {
$stream = new InterceptedStream($stream, $interceptor);
}
return yield $stream->request($request, $cancellation);
});
}
/**
* Adds a network interceptor.
*
* Network interceptors are only invoked if the request requires network access, i.e. there's no short-circuit by
* an application interceptor, e.g. a cache.
*
* Whether the given network interceptor will be respected for currently running requests is undefined.
*
* Any new requests have to take the new interceptor into account.
*
* @param NetworkInterceptor $networkInterceptor
*
* @return self
*/
public function intercept(NetworkInterceptor $networkInterceptor): self
{
$clone = clone $this;
$clone->networkInterceptors[] = $networkInterceptor;
return $clone;
}
}
PK tAVfZ src/InvalidRequestException.phpnu W+A request = $request;
}
public function getRequest(): Request
{
return $this->request;
}
}
PK tAV|!9Z src/Trailers.phpnu W+A true,
"content-encoding" => true,
"content-length" => true,
"content-range" => true,
"content-type" => true,
"cookie" => true,
"expect" => true,
"host" => true,
"pragma" => true,
"proxy-authenticate" => true,
"proxy-authorization" => true,
"range" => true,
"te" => true,
"trailer" => true,
"transfer-encoding" => true,
"www-authenticate" => true,
];
/**
* @param string[]|string[][] $headers
*
* @throws InvalidHeaderException Thrown if a disallowed field is in the header values.
*/
public function __construct(array $headers)
{
if (!empty($headers)) {
$this->setHeaders($headers);
}
if (\array_intersect_key($this->getHeaders(), self::DISALLOWED_TRAILERS)) {
throw new InvalidHeaderException('Disallowed field in trailers');
}
}
}
PK tAV{5ɔ src/ParseException.phpnu W+A httpClient = $httpClient;
$this->interceptor = $interceptor;
}
public function request(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startRequest($request);
}
return $this->interceptor->request($request, $cancellation, $this->httpClient);
});
}
}
PK tAV߅ src/EventListener.phpnu W+A source = $source;
$this->sizeLimit = $sizeLimit;
}
public function read(): Promise
{
if ($this->exception) {
return new Failure($this->exception);
}
$promise = $this->source->read();
$promise->onResolve(function ($error, $value) {
if ($value !== null) {
$this->bytesRead += \strlen($value);
if ($this->bytesRead > $this->sizeLimit) {
$this->exception = new ParseException(
"Configured body size exceeded: {$this->bytesRead} bytes received, while the configured limit is {$this->sizeLimit} bytes",
Status::PAYLOAD_TOO_LARGE
);
$this->source = null;
}
}
});
return $promise;
}
}
PK tAV src/Internal/ForbidCloning.phpnu W+A . # src/Internal/ResponseBodyStream.phpnu W+A body = $body;
$this->bodyCancellation = $bodyCancellation;
}
public function read(): Promise
{
$promise = $this->body->read();
$promise->onResolve(function ($error, $value) {
if ($value === null && $error === null) {
$this->successfulEnd = true;
}
});
return $promise;
}
public function __destruct()
{
if (!$this->successfulEnd) {
$this->bodyCancellation->cancel();
}
}
}
PK tAV$ src/HttpClient.phpnu W+A httpClient = $httpClient;
}
/**
* Request a specific resource from an HTTP server.
*
* @param Request $request
* @param CancellationToken $cancellation
*
* @return Promise
*/
public function request(Request $request, ?CancellationToken $cancellation = null): Promise
{
return $this->httpClient->request(clone $request, $cancellation ?? new NullCancellationToken);
}
}
PK tAV