vendor/ruflin/elastica/src/Bulk.php line 304

Open in your IDE?
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
  5. use Elastica\Bulk\Response as BulkResponse;
  6. use Elastica\Bulk\ResponseSet;
  7. use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
  8. use Elastica\Exception\ClientException;
  9. use Elastica\Exception\ConnectionException;
  10. use Elastica\Exception\InvalidException;
  11. use Elastica\Exception\RequestEntityTooLargeException;
  12. use Elastica\Exception\ResponseException;
  13. use Elastica\Script\AbstractScript;
  14. class Bulk
  15. {
  16.     public const DELIMITER "\n";
  17.     /**
  18.      * @var Client
  19.      */
  20.     protected $_client;
  21.     /**
  22.      * @var Action[]
  23.      */
  24.     protected $_actions = [];
  25.     /**
  26.      * @var string|null
  27.      */
  28.     protected $_index;
  29.     /**
  30.      * @var array request parameters to the bulk api
  31.      */
  32.     protected $_requestParams = [];
  33.     public function __construct(Client $client)
  34.     {
  35.         $this->_client $client;
  36.     }
  37.     public function __toString(): string
  38.     {
  39.         $data '';
  40.         foreach ($this->getActions() as $action) {
  41.             $data .= (string) $action;
  42.         }
  43.         return $data;
  44.     }
  45.     /**
  46.      * @param Index|string $index
  47.      *
  48.      * @return $this
  49.      */
  50.     public function setIndex($index): self
  51.     {
  52.         if ($index instanceof Index) {
  53.             $index $index->getName();
  54.         }
  55.         $this->_index = (string) $index;
  56.         return $this;
  57.     }
  58.     /**
  59.      * @return string|null
  60.      */
  61.     public function getIndex()
  62.     {
  63.         return $this->_index;
  64.     }
  65.     public function hasIndex(): bool
  66.     {
  67.         return null !== $this->getIndex() && '' !== $this->getIndex();
  68.     }
  69.     public function getPath(): string
  70.     {
  71.         $path '';
  72.         if ($this->hasIndex()) {
  73.             $path .= $this->getIndex().'/';
  74.         }
  75.         $path .= '_bulk';
  76.         return $path;
  77.     }
  78.     /**
  79.      * @return $this
  80.      */
  81.     public function addAction(Action $action): self
  82.     {
  83.         $this->_actions[] = $action;
  84.         return $this;
  85.     }
  86.     /**
  87.      * @param Action[] $actions
  88.      *
  89.      * @return $this
  90.      */
  91.     public function addActions(array $actions): self
  92.     {
  93.         foreach ($actions as $action) {
  94.             $this->addAction($action);
  95.         }
  96.         return $this;
  97.     }
  98.     /**
  99.      * @return Action[]
  100.      */
  101.     public function getActions(): array
  102.     {
  103.         return $this->_actions;
  104.     }
  105.     /**
  106.      * @return $this
  107.      */
  108.     public function addDocument(Document $document, ?string $opType null): self
  109.     {
  110.         if (!$document->hasRetryOnConflict() && $this->_client->hasConnection() && $this->_client->getConnection()->hasParam('retryOnConflict') && ($retry $this->_client->getConnection()->getParam('retryOnConflict')) > 0) {
  111.             $document->setRetryOnConflict($retry);
  112.         }
  113.         $action AbstractDocumentAction::create($document$opType);
  114.         return $this->addAction($action);
  115.     }
  116.     /**
  117.      * @param Document[] $documents
  118.      *
  119.      * @return $this
  120.      */
  121.     public function addDocuments(array $documents, ?string $opType null): self
  122.     {
  123.         foreach ($documents as $document) {
  124.             $this->addDocument($document$opType);
  125.         }
  126.         return $this;
  127.     }
  128.     /**
  129.      * @return $this
  130.      */
  131.     public function addScript(AbstractScript $script, ?string $opType null): self
  132.     {
  133.         if (!$script->hasRetryOnConflict() && $this->_client->hasConnection() && $this->_client->getConnection()->hasParam('retryOnConflict') && ($retry $this->_client->getConnection()->getParam('retryOnConflict')) > 0) {
  134.             $script->setRetryOnConflict($retry);
  135.         }
  136.         $action AbstractDocumentAction::create($script$opType);
  137.         return $this->addAction($action);
  138.     }
  139.     /**
  140.      * @param AbstractScript[] $scripts
  141.      * @param string|null      $opType
  142.      *
  143.      * @return $this
  144.      */
  145.     public function addScripts(array $scripts$opType null): self
  146.     {
  147.         foreach ($scripts as $script) {
  148.             $this->addScript($script$opType);
  149.         }
  150.         return $this;
  151.     }
  152.     /**
  153.      * @param AbstractScript|array|Document $data
  154.      *
  155.      * @return $this
  156.      */
  157.     public function addData($data, ?string $opType null)
  158.     {
  159.         if (!\is_array($data)) {
  160.             $data = [$data];
  161.         }
  162.         foreach ($data as $actionData) {
  163.             if ($actionData instanceof AbstractScript) {
  164.                 $this->addScript($actionData$opType);
  165.             } elseif ($actionData instanceof Document) {
  166.                 $this->addDocument($actionData$opType);
  167.             } else {
  168.                 throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
  169.             }
  170.         }
  171.         return $this;
  172.     }
  173.     /**
  174.      * @throws InvalidException
  175.      *
  176.      * @return $this
  177.      */
  178.     public function addRawData(array $data): self
  179.     {
  180.         foreach ($data as $row) {
  181.             if (\is_array($row)) {
  182.                 $opType \key($row);
  183.                 $metadata \reset($row);
  184.                 if (Action::isValidOpType($opType)) {
  185.                     // add previous action
  186.                     if (isset($action)) {
  187.                         $this->addAction($action);
  188.                     }
  189.                     $action = new Action($opType$metadata);
  190.                 } elseif (isset($action)) {
  191.                     $action->setSource($row);
  192.                     $this->addAction($action);
  193.                     $action null;
  194.                 } else {
  195.                     throw new InvalidException('Invalid bulk data, source must follow action metadata');
  196.                 }
  197.             } else {
  198.                 throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
  199.             }
  200.         }
  201.         // add last action if available
  202.         if (isset($action)) {
  203.             $this->addAction($action);
  204.         }
  205.         return $this;
  206.     }
  207.     /**
  208.      * Set a url parameter on the request bulk request.
  209.      *
  210.      * @param string $name  name of the parameter
  211.      * @param mixed  $value value of the parameter
  212.      *
  213.      * @return $this
  214.      */
  215.     public function setRequestParam(string $name$value): self
  216.     {
  217.         $this->_requestParams[$name] = $value;
  218.         return $this;
  219.     }
  220.     /**
  221.      * Set the amount of time that the request will wait the shards to come on line.
  222.      * Requires Elasticsearch version >= 0.90.8.
  223.      *
  224.      * @param string $time timeout in Elasticsearch time format
  225.      *
  226.      * @return $this
  227.      */
  228.     public function setShardTimeout(string $time): self
  229.     {
  230.         return $this->setRequestParam('timeout'$time);
  231.     }
  232.     /**
  233.      * @deprecated since version 7.1.3, use the "__toString()" method or cast to string instead.
  234.      */
  235.     public function toString(): string
  236.     {
  237.         \trigger_deprecation('ruflin/elastica''7.1.3''The "%s()" method is deprecated, use "__toString()" or cast to string instead. It will be removed in 8.0.'__METHOD__);
  238.         return (string) $this;
  239.     }
  240.     public function toArray(): array
  241.     {
  242.         $data = [];
  243.         foreach ($this->getActions() as $action) {
  244.             foreach ($action->toArray() as $row) {
  245.                 $data[] = $row;
  246.             }
  247.         }
  248.         return $data;
  249.     }
  250.     /**
  251.      * @throws ClientException
  252.      * @throws ConnectionException
  253.      * @throws ResponseException
  254.      * @throws BulkResponseException
  255.      * @throws InvalidException
  256.      */
  257.     public function send(): ResponseSet
  258.     {
  259.         $response $this->_client->request($this->getPath(), Request::POST, (string) $this$this->_requestParamsRequest::NDJSON_CONTENT_TYPE);
  260.         return $this->_processResponse($response);
  261.     }
  262.     /**
  263.      * @throws BulkResponseException
  264.      * @throws InvalidException
  265.      */
  266.     protected function _processResponse(Response $response): ResponseSet
  267.     {
  268.         switch ($response->getStatus()) {
  269.             case 413: throw new RequestEntityTooLargeException();
  270.         }
  271.         $responseData $response->getData();
  272.         $actions $this->getActions();
  273.         $bulkResponses = [];
  274.         if (isset($responseData['items']) && \is_array($responseData['items'])) {
  275.             foreach ($responseData['items'] as $key => $item) {
  276.                 if (!isset($actions[$key])) {
  277.                     throw new InvalidException('No response found for action #'.$key);
  278.                 }
  279.                 $action $actions[$key];
  280.                 $opType \key($item);
  281.                 $bulkResponseData \reset($item);
  282.                 if ($action instanceof AbstractDocumentAction) {
  283.                     $data $action->getData();
  284.                     if ($data instanceof Document && $data->isAutoPopulate()
  285.                         || $this->_client->getConfigValue(['document''autoPopulate'], false)
  286.                     ) {
  287.                         if (!$data->hasId() && isset($bulkResponseData['_id'])) {
  288.                             $data->setId($bulkResponseData['_id']);
  289.                         }
  290.                         $data->setVersionParams($bulkResponseData);
  291.                     }
  292.                 }
  293.                 $bulkResponses[] = new BulkResponse($bulkResponseData$action$opType);
  294.             }
  295.         }
  296.         $bulkResponseSet = new ResponseSet($response$bulkResponses);
  297.         if ($bulkResponseSet->hasError()) {
  298.             throw new BulkResponseException($bulkResponseSet);
  299.         }
  300.         return $bulkResponseSet;
  301.     }
  302. }