Skip to content
This repository was archived by the owner on Dec 19, 2019. It is now read-only.

Commit 69daf02

Browse files
ENGCOM-5205: fix AsynchronousOperations multistore issue by passing store_id from … #102
- Merge Pull Request magento-commerce/async-import#102 from comwrap/async-import:2.3-develop-bulk-api-multistore-hot-fix - Merged commits: 1. fd6cf5e 2. 80732c8 3. 51ac4d8 4. 331e311 5. dad12c6 6. 81ecd90 7. ab2f000 8. 48104dc 9. 460a249 10. c336e36 11. 038615b 12. fb06c33 13. 06bc3a2
2 parents 98703f6 + 06bc3a2 commit 69daf02

File tree

10 files changed

+792
-72
lines changed

10 files changed

+792
-72
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
declare(strict_types=1);
8+
9+
namespace Magento\AmqpStore\Plugin\AsynchronousOperations;
10+
11+
use Magento\Framework\Exception\NoSuchEntityException;
12+
use Magento\Store\Model\StoreManagerInterface;
13+
use Magento\Framework\MessageQueue\EnvelopeFactory;
14+
use PhpAmqpLib\Wire\AMQPTable;
15+
use Magento\Framework\MessageQueue\EnvelopeInterface;
16+
use Magento\AsynchronousOperations\Model\MassConsumerEnvelopeCallback as SubjectMassConsumerEnvelopeCallback;
17+
use Psr\Log\LoggerInterface;
18+
19+
/**
20+
* Plugin to get 'store_id' from the new custom header 'store_id' in amqp
21+
* 'application_headers' properties and setCurrentStore by value 'store_id'.
22+
*/
23+
class MassConsumerEnvelopeCallback
24+
{
25+
/**
26+
* @var StoreManagerInterface
27+
*/
28+
private $storeManager;
29+
30+
/**
31+
* @var EnvelopeFactory
32+
*/
33+
private $envelopeFactory;
34+
35+
/**
36+
* @var LoggerInterface
37+
*/
38+
private $logger;
39+
40+
/**
41+
* @param EnvelopeFactory $envelopeFactory
42+
* @param StoreManagerInterface $storeManager
43+
* @param LoggerInterface $logger
44+
*/
45+
public function __construct(
46+
EnvelopeFactory $envelopeFactory,
47+
StoreManagerInterface $storeManager,
48+
LoggerInterface $logger
49+
) {
50+
$this->storeManager = $storeManager;
51+
$this->envelopeFactory = $envelopeFactory;
52+
$this->logger = $logger;
53+
}
54+
55+
/**
56+
* Check if amqpProperties['application_headers'] have 'store_id' and use it to setCurrentStore
57+
* Restore original store value in consumer process after execution.
58+
* Reject queue messages because of wrong store_id.
59+
*
60+
* @param SubjectMassConsumerEnvelopeCallback $subject
61+
* @param callable $proceed
62+
* @param EnvelopeInterface $message
63+
* @return void
64+
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
65+
*/
66+
public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
67+
{
68+
$amqpProperties = $message->getProperties();
69+
if (isset($amqpProperties['application_headers'])) {
70+
$headers = $amqpProperties['application_headers'];
71+
if ($headers instanceof AMQPTable) {
72+
$headers = $headers->getNativeData();
73+
}
74+
if (isset($headers['store_id'])) {
75+
$storeId = $headers['store_id'];
76+
try {
77+
$currentStoreId = $this->storeManager->getStore()->getId();
78+
} catch (NoSuchEntityException $e) {
79+
$this->logger->error(
80+
sprintf(
81+
"Can't set currentStoreId during processing queue. Message rejected. Error %s.",
82+
$e->getMessage()
83+
)
84+
);
85+
$subject->getQueue()->reject($message, false, $e->getMessage());
86+
return;
87+
}
88+
if (isset($storeId) && $storeId !== $currentStoreId) {
89+
$this->storeManager->setCurrentStore($storeId);
90+
}
91+
}
92+
}
93+
$proceed($message);
94+
if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
95+
$this->storeManager->setCurrentStore($currentStoreId);//restore original store value
96+
}
97+
}
98+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
declare(strict_types=1);
8+
9+
namespace Magento\AmqpStore\Plugin\Framework\Amqp\Bulk;
10+
11+
use Magento\Framework\Exception\NoSuchEntityException;
12+
use Magento\Store\Model\StoreManagerInterface;
13+
use Magento\Framework\MessageQueue\EnvelopeFactory;
14+
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
15+
use PhpAmqpLib\Wire\AMQPTable;
16+
use Magento\Framework\Amqp\Bulk\Exchange as SubjectExchange;
17+
use Magento\Framework\MessageQueue\EnvelopeInterface;
18+
use Psr\Log\LoggerInterface;
19+
20+
/**
21+
* Plugin to set 'store_id' to the new custom header 'store_id' in amqp
22+
* 'application_headers'.
23+
*/
24+
class Exchange
25+
{
26+
/**
27+
* @var StoreManagerInterface
28+
*/
29+
private $storeManager;
30+
31+
/**
32+
* @var EnvelopeFactory
33+
*/
34+
private $envelopeFactory;
35+
36+
/**
37+
* @var LoggerInterface
38+
*/
39+
private $logger;
40+
41+
/**
42+
* @param EnvelopeFactory $envelopeFactory
43+
* @param StoreManagerInterface $storeManager
44+
*/
45+
public function __construct(
46+
EnvelopeFactory $envelopeFactory,
47+
StoreManagerInterface $storeManager,
48+
LoggerInterface $logger
49+
) {
50+
$this->storeManager = $storeManager;
51+
$this->envelopeFactory = $envelopeFactory;
52+
$this->logger = $logger;
53+
}
54+
55+
/**
56+
* Set current store_id in amqpProperties['application_headers']
57+
* so consumer may check store_id and execute operation in correct store scope.
58+
* Prevent publishing inconsistent messages because of store_id not defined or wrong.
59+
*
60+
* @param SubjectExchange $subject
61+
* @param string $topic
62+
* @param EnvelopeInterface[] $envelopes
63+
* @return array
64+
* @throws AMQPInvalidArgumentException
65+
* @throws \LogicException
66+
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
67+
*/
68+
public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes)
69+
{
70+
try {
71+
$storeId = $this->storeManager->getStore()->getId();
72+
} catch (NoSuchEntityException $e) {
73+
$errorMessage = sprintf(
74+
"Can't get current storeId and inject to amqp message. Error %s.",
75+
$e->getMessage()
76+
);
77+
$this->logger->error($errorMessage);
78+
throw new \LogicException($errorMessage);
79+
}
80+
81+
$updatedEnvelopes = [];
82+
foreach ($envelopes as $envelope) {
83+
$properties = $envelope->getProperties();
84+
if (!isset($properties)) {
85+
$properties = [];
86+
}
87+
if (isset($properties['application_headers'])) {
88+
$headers = $properties['application_headers'];
89+
if ($headers instanceof AMQPTable) {
90+
try {
91+
$headers->set('store_id', $storeId);
92+
} catch (AMQPInvalidArgumentException $ea) {
93+
$errorMessage = sprintf("Can't set storeId to amqp message. Error %s.", $ea->getMessage());
94+
$this->logger->error($errorMessage);
95+
throw new AMQPInvalidArgumentException($errorMessage);
96+
}
97+
$properties['application_headers'] = $headers;
98+
}
99+
} else {
100+
$properties['application_headers'] = new AMQPTable(['store_id' => $storeId]);
101+
}
102+
$updatedEnvelopes[] = $this->envelopeFactory->create(
103+
[
104+
'body' => $envelope->getBody(),
105+
'properties' => $properties
106+
]
107+
);
108+
}
109+
if (!empty($updatedEnvelopes)) {
110+
$envelopes = $updatedEnvelopes;
111+
}
112+
return [$topic, $envelopes];
113+
}
114+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"name": "magento/module-amqp-store",
3+
"description": "N/A",
4+
"config": {
5+
"sort-packages": true
6+
},
7+
"require": {
8+
"magento/framework": "*",
9+
"magento/framework-amqp": "*",
10+
"magento/framework-message-queue": "*",
11+
"magento/module-store": "*",
12+
"php": "~7.1.3||~7.2.0"
13+
},
14+
"suggest": {
15+
"magento/module-asynchronous-operations": "*"
16+
},
17+
"type": "magento2-module",
18+
"license": [
19+
"OSL-3.0",
20+
"AFL-3.0"
21+
],
22+
"autoload": {
23+
"files": [
24+
"registration.php"
25+
],
26+
"psr-4": {
27+
"Magento\\AmqpStore\\": ""
28+
}
29+
}
30+
}

app/code/Magento/AmqpStore/etc/di.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
9+
<type name="Magento\Framework\Amqp\Bulk\Exchange">
10+
<plugin name="amqpStoreIdFieldForAmqpBulkExchange" type="Magento\AmqpStore\Plugin\Framework\Amqp\Bulk\Exchange"/>
11+
</type>
12+
<type name="Magento\AsynchronousOperations\Model\MassConsumerEnvelopeCallback">
13+
<plugin name="amqpStoreIdFieldForAsynchronousOperationsMassConsumerEnvelopeCallback" type="Magento\AmqpStore\Plugin\AsynchronousOperations\MassConsumerEnvelopeCallback"/>
14+
</type>
15+
</config>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
9+
<module name="Magento_AmqpStore">
10+
<sequence>
11+
<module name="Magento_Store"/>
12+
</sequence>
13+
</module>
14+
</config>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
use \Magento\Framework\Component\ComponentRegistrar;
8+
9+
ComponentRegistrar::register(ComponentRegistrar::MODULE, 'Magento_AmqpStore', __DIR__);

0 commit comments

Comments
 (0)