BatchProcessingController.php
15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
<?php
/**
* This class is a helper intended to handle data processing tasks that need to happen in batches, in a deferred way.
* It abstracts away the nuances of (re)scheduling actions and dealing with errors.
*
* Usage:
*
* 1. Create a class that implements BatchProcessorInterface.
* The class must either be registered in the dependency injection container, or have a public parameterless constructor,
* or an instance must be provided via the 'woocommerce_get_batch_processor' filter.
* 2. Whenever there's data to be processed invoke the 'enqueue_processor' method in this class,
* passing the class name of the processor.
*
* That's it: processing will be performed in batches inside scheduled actions. Enqueued processors will only
* be dequeued once they report that no items are left to process (or when `remove_processor` or `force_clear_all_processes` is invoked).
* A batch that fails without processing any item will be retried after a delay (currently one minute).
*
* There are also a few public methods to get the list of currently enqueued processors
* and to check if a given processor is enqueued/actually scheduled.
*/
namespace Automattic\WooCommerce\Internal\BatchProcessing;
/**
 * Class BatchProcessingController
 *
 * @package Automattic\WooCommerce\Internal\BatchProcessing.
 */
class BatchProcessingController {
	/*
	 * Identifier of a "watchdog" action that will schedule a processing action
	 * for any processor that is enqueued but not yet scheduled
	 * (because it has just been enqueued or because it threw an error while processing a batch).
	 * It's one single action that reschedules itself (with a one hour delay)
	 * for as long as there are enqueued processors.
	 */
	const WATCHDOG_ACTION_NAME = 'wc_schedule_pending_batch_processes';

	/*
	 * Identifier of the action that will do the actual batch processing.
	 * There's one action per enqueued processor that will keep rescheduling itself
	 * as long as there are still pending items to process
	 * (except if there's an error that caused no items to be processed at all).
	 */
	const PROCESS_SINGLE_BATCH_ACTION_NAME = 'wc_run_batch_process';

	/*
	 * Name of the option that stores the list of enqueued processor class names.
	 */
	const ENQUEUED_PROCESSORS_OPTION_NAME = 'wc_pending_batch_processes';

	/*
	 * Action Scheduler group used for the actions scheduled by this class.
	 */
	const ACTION_GROUP = 'wc_batch_processes';

	/**
	 * Instance of WC_Logger class.
	 *
	 * @var \WC_Logger_Interface
	 */
	private $logger;

	/**
	 * BatchProcessingController constructor.
	 *
	 * Hooks the handlers for the watchdog and single-batch actions, and gets a logger instance.
	 */
	public function __construct() {
		add_action(
			self::WATCHDOG_ACTION_NAME,
			function () {
				$this->handle_watchdog_action();
			}
		);
		add_action(
			self::PROCESS_SINGLE_BATCH_ACTION_NAME,
			function ( $batch_process ) {
				$this->process_next_batch_for_single_processor( $batch_process );
			},
			10,
			// The action is scheduled with exactly one argument: the processor class name.
			1
		);
		$this->logger = wc_get_logger();
	}

	/**
	 * Enqueue a processor so that it will get batch processing requests from within scheduled actions.
	 *
	 * Enqueueing an already enqueued processor does not add a duplicate entry.
	 *
	 * @param string $processor_class_name Fully qualified class name of the processor, must implement `BatchProcessorInterface`.
	 */
	public function enqueue_processor( string $processor_class_name ): void {
		$pending_updates = $this->get_enqueued_processors();
		// The option stores a list of class names, so membership must be checked against the values, not the keys.
		if ( ! in_array( $processor_class_name, $pending_updates, true ) ) {
			$pending_updates[] = $processor_class_name;
			$this->set_enqueued_processors( $pending_updates );
		}
		$this->schedule_watchdog_action( false, true );
	}

	/**
	 * Schedule the watchdog action.
	 *
	 * @param bool $with_delay Whether to delay the action execution by one hour. Should be true when rescheduling, false when enqueueing.
	 * @param bool $unique Whether to make the action unique.
	 */
	private function schedule_watchdog_action( bool $with_delay = false, bool $unique = false ): void {
		$time = $with_delay ? time() + HOUR_IN_SECONDS : time();
		as_schedule_single_action(
			$time,
			self::WATCHDOG_ACTION_NAME,
			array(),
			self::ACTION_GROUP,
			$unique
		);
	}

	/**
	 * Schedule a processing action for all the processors that are enqueued but not scheduled
	 * (because they have just been enqueued, or because the processing for a batch failed).
	 *
	 * The watchdog reschedules itself only while at least one processor is enqueued.
	 */
	private function handle_watchdog_action(): void {
		$pending_processes = $this->get_enqueued_processors();
		if ( empty( $pending_processes ) ) {
			return;
		}
		foreach ( $pending_processes as $process_name ) {
			if ( ! $this->is_scheduled( $process_name ) ) {
				$this->schedule_batch_processing( $process_name );
			}
		}
		$this->schedule_watchdog_action( true );
	}

	/**
	 * Process a batch for a single processor, and handle any required rescheduling or state cleanup.
	 *
	 * @param string $processor_class_name Fully qualified class name of the processor.
	 *
	 * @throws \Exception If error occurred during batch processing and no items were processed,
	 *                    or if the processor instance can't be obtained.
	 */
	private function process_next_batch_for_single_processor( string $processor_class_name ): void {
		if ( ! $this->is_enqueued( $processor_class_name ) ) {
			return;
		}
		$batch_processor      = $this->get_processor_instance( $processor_class_name );
		$pending_count_before = $batch_processor->get_total_pending_count();
		$error                = $this->process_next_batch_for_single_processor_core( $batch_processor );
		$pending_count_after  = $batch_processor->get_total_pending_count();
		if ( ( $error instanceof \Exception ) && $pending_count_before === $pending_count_after ) {
			// The batch processing failed and no items were processed:
			// reschedule the processing with a delay, and also throw the error
			// so Action Scheduler will ignore the rescheduling if this happens repeatedly.
			$this->schedule_batch_processing( $processor_class_name, true );
			throw $error;
		}
		if ( $pending_count_after > 0 ) {
			$this->schedule_batch_processing( $processor_class_name );
		} else {
			$this->dequeue_processor( $processor_class_name );
		}
	}

	/**
	 * Process a batch for a single processor, updating state and logging any error.
	 *
	 * @param BatchProcessorInterface $batch_processor Batch processor instance.
	 *
	 * @return null|\Exception Exception thrown by `process_batch`, or null if there was none (or nothing to process).
	 */
	private function process_next_batch_for_single_processor_core( BatchProcessorInterface $batch_processor ): ?\Exception {
		$details    = $this->get_process_details( $batch_processor );
		$time_start = microtime( true );
		$batch      = $batch_processor->get_next_batch_to_process( $details['current_batch_size'] );
		if ( empty( $batch ) ) {
			return null;
		}
		try {
			$batch_processor->process_batch( $batch );
			$time_taken = microtime( true ) - $time_start;
			$this->update_processor_state( $batch_processor, $time_taken );
		} catch ( \Exception $exception ) {
			$time_taken = microtime( true ) - $time_start;
			$this->log_error( $exception, $batch_processor, $batch );
			$this->update_processor_state( $batch_processor, $time_taken, $exception );
			return $exception;
		}
		return null;
	}

	/**
	 * Get the current state for a given enqueued processor.
	 *
	 * @param BatchProcessorInterface $batch_processor Batch processor instance.
	 *
	 * @return array Current state for the processor, or a "blank" state if none exists yet.
	 */
	private function get_process_details( BatchProcessorInterface $batch_processor ): array {
		return get_option(
			$this->get_processor_state_option_name( $batch_processor ),
			array(
				'total_time_spent'   => 0,
				'current_batch_size' => $batch_processor->get_default_batch_size(),
				'last_error'         => null,
			)
		);
	}

	/**
	 * Get the name of the option where we will be saving state for a given processor.
	 *
	 * @param BatchProcessorInterface $batch_processor Batch processor instance.
	 *
	 * @return string Option name.
	 */
	private function get_processor_state_option_name( BatchProcessorInterface $batch_processor ): string {
		$class_name = get_class( $batch_processor );
		$class_md5  = md5( $class_name );
		// Truncate the class name so we know that it will fit in the option name column along with md5 hash and prefix.
		$class_name = substr( $class_name, 0, 140 );
		return 'wc_batch_' . $class_name . '_' . $class_md5;
	}

	/**
	 * Update the state for a processor after a batch has completed processing.
	 *
	 * @param BatchProcessorInterface $batch_processor Batch processor instance.
	 * @param float                   $time_taken Time taken by the batch to complete processing.
	 * @param \Exception|null         $last_error Exception object in processing the batch, if there was one.
	 */
	private function update_processor_state( BatchProcessorInterface $batch_processor, float $time_taken, ?\Exception $last_error = null ): void {
		$current_status                      = $this->get_process_details( $batch_processor );
		$current_status['total_time_spent'] += $time_taken;
		$current_status['last_error']        = null !== $last_error ? $last_error->getMessage() : null;
		update_option( $this->get_processor_state_option_name( $batch_processor ), $current_status, false );
	}

	/**
	 * Schedule a processing action for a single processor.
	 *
	 * @param string $processor_class_name Fully qualified class name of the processor.
	 * @param bool   $with_delay Whether to schedule the action for immediate execution or for one minute later.
	 */
	private function schedule_batch_processing( string $processor_class_name, bool $with_delay = false ) : void {
		$time = $with_delay ? time() + MINUTE_IN_SECONDS : time();
		// Use the same group as the watchdog action so all the actions of this controller can be identified together.
		as_schedule_single_action( $time, self::PROCESS_SINGLE_BATCH_ACTION_NAME, array( $processor_class_name ), self::ACTION_GROUP );
	}

	/**
	 * Check if a batch processing action is already scheduled for a given processor.
	 *
	 * Delegates to `as_has_scheduled_action`, so an action that is currently in progress also counts as scheduled.
	 * This avoids the watchdog scheduling a duplicate action while a batch is running.
	 *
	 * @param string $processor_class_name Fully qualified class name of the batch processor.
	 *
	 * @return bool True if a batch processing action is already scheduled (or running) for the processor.
	 */
	public function is_scheduled( string $processor_class_name ): bool {
		return as_has_scheduled_action( self::PROCESS_SINGLE_BATCH_ACTION_NAME, array( $processor_class_name ) );
	}

	/**
	 * Get an instance of a processor given its class name.
	 *
	 * Resolution order: the dependency injection container (only if it knows the class), then the
	 * 'woocommerce_get_batch_processor' filter, then a parameterless `new` as a last resort.
	 *
	 * @param string $processor_class_name Full class name of the batch processor.
	 *
	 * @return BatchProcessorInterface Instance of batch processor for the given class.
	 * @throws \Exception If it's not possible to get an instance of the class.
	 */
	private function get_processor_instance( string $processor_class_name ) : BatchProcessorInterface {
		$container = wc_get_container();
		// Only ask the container for classes it knows about: `get` throws for unknown ones,
		// which would otherwise prevent the filter and the fallback below from ever running.
		$processor = $container->has( $processor_class_name ) ? $container->get( $processor_class_name ) : null;

		/**
		 * Filters the instance of a processor for a given class name.
		 *
		 * @param object|null $processor The processor instance given by the dependency injection container, or null if none was obtained.
		 * @param string $processor_class_name The full class name of the processor.
		 * @return BatchProcessorInterface|null The actual processor instance to use, or null if none could be retrieved.
		 *
		 * @since 6.8.0.
		 */
		$processor = apply_filters( 'woocommerce_get_batch_processor', $processor, $processor_class_name );
		if ( ! isset( $processor ) && class_exists( $processor_class_name ) ) {
			// This is a fallback for when the batch processor is not registered in the container.
			$processor = new $processor_class_name();
		}
		if ( ! is_a( $processor, BatchProcessorInterface::class ) ) {
			throw new \Exception( "Unable to initialize batch processor instance for $processor_class_name" );
		}
		return $processor;
	}

	/**
	 * Helper method to get list of all the enqueued processors.
	 *
	 * @return array List (of string) of the class names of the enqueued processors.
	 */
	public function get_enqueued_processors() : array {
		return get_option( self::ENQUEUED_PROCESSORS_OPTION_NAME, array() );
	}

	/**
	 * Dequeue a processor once it has no more items pending processing.
	 *
	 * @param string $processor_class_name Full processor class name.
	 */
	private function dequeue_processor( string $processor_class_name ): void {
		$pending_processes = $this->get_enqueued_processors();
		if ( in_array( $processor_class_name, $pending_processes, true ) ) {
			$pending_processes = array_diff( $pending_processes, array( $processor_class_name ) );
			$this->set_enqueued_processors( $pending_processes );
		}
	}

	/**
	 * Helper method to set the enqueued processor class names.
	 *
	 * @param array $processors List of full processor class names.
	 */
	private function set_enqueued_processors( array $processors ): void {
		// Re-index so the stored value stays a sequential list even after array_diff removals.
		update_option( self::ENQUEUED_PROCESSORS_OPTION_NAME, array_values( $processors ), false );
	}

	/**
	 * Check if a particular processor is enqueued.
	 *
	 * @param string $processor_class_name Fully qualified class name of the processor.
	 *
	 * @return bool True if the processor is enqueued.
	 */
	public function is_enqueued( string $processor_class_name ) : bool {
		return in_array( $processor_class_name, $this->get_enqueued_processors(), true );
	}

	/**
	 * Dequeue and de-schedule a processor instance so that it won't be processed anymore.
	 *
	 * If it was the last enqueued processor, all the actions of this controller (including the watchdog) are unscheduled.
	 *
	 * @param string $processor_class_name Fully qualified class name of the processor.
	 * @return bool True if the processor has been dequeued, false if the processor wasn't enqueued (so nothing has been done).
	 */
	public function remove_processor( string $processor_class_name ): bool {
		$enqueued_processors = $this->get_enqueued_processors();
		if ( ! in_array( $processor_class_name, $enqueued_processors, true ) ) {
			return false;
		}
		$enqueued_processors = array_diff( $enqueued_processors, array( $processor_class_name ) );
		if ( empty( $enqueued_processors ) ) {
			$this->force_clear_all_processes();
		} else {
			$this->set_enqueued_processors( $enqueued_processors );
			as_unschedule_all_actions( self::PROCESS_SINGLE_BATCH_ACTION_NAME, array( $processor_class_name ) );
		}
		return true;
	}

	/**
	 * Dequeues and de-schedules all the processors, and unschedules the watchdog action.
	 */
	public function force_clear_all_processes(): void {
		as_unschedule_all_actions( self::PROCESS_SINGLE_BATCH_ACTION_NAME );
		as_unschedule_all_actions( self::WATCHDOG_ACTION_NAME );
		$this->set_enqueued_processors( array() );
	}

	/**
	 * Log an error that happened while processing a batch.
	 *
	 * @param \Exception              $error Exception object to log.
	 * @param BatchProcessorInterface $batch_processor Batch processor instance.
	 * @param array                   $batch Batch that was being processed.
	 */
	protected function log_error( \Exception $error, BatchProcessorInterface $batch_processor, array $batch ) : void {
		$batch_detail_string = '';
		// Log only first and last, as the entire batch may be too big.
		// reset()/end() work for any key layout, unlike $batch[0] which assumes a zero-indexed list.
		if ( count( $batch ) > 0 ) {
			// phpcs:ignore WordPress.PHP.DevelopmentFunctions.error_log_print_r -- Logging is for debugging.
			$batch_detail_string = "\n" . print_r(
				array(
					'batch_start' => reset( $batch ),
					'batch_end'   => end( $batch ),
				),
				true
			);
		}
		$error_message = "Error processing batch for {$batch_processor->get_name()}: {$error->getMessage()}" . $batch_detail_string;

		/**
		 * Filters the error message for a batch processing.
		 *
		 * @param string $error_message The error message that will be logged.
		 * @param \Exception $error The exception that was thrown by the processor.
		 * @param BatchProcessorInterface $batch_processor The processor that threw the exception.
		 * @param array $batch The batch that was being processed.
		 * @return string The actual error message that will be logged.
		 *
		 * @since 6.8.0
		 */
		$error_message = apply_filters( 'wc_batch_processing_log_message', $error_message, $error, $batch_processor, $batch );
		$this->logger->error( $error_message, array( 'exception' => $error ) );
	}
}