class-rest-sender.php
3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
<?php
/**
* Sync package.
*
* @package automattic/jetpack-sync
*/
namespace Automattic\Jetpack\Sync;
use WP_Error;
/**
* This class will handle checkout of Sync queues for REST Endpoints.
*
* @since 1.23.1
*/
class REST_Sender {
/**
* Items pending send.
*
* @var array
*/
public $items = array();
/**
* Checkout objects from the queue
*
* @param string $queue_name Name of Queue.
* @param int $number_of_items Number of Items.
* @param array $args arguments.
*
* @return array|WP_Error
*/
public function queue_pull( $queue_name, $number_of_items, $args ) {
$queue = new Queue( $queue_name );
if ( 0 === $queue->size() ) {
return new WP_Error( 'queue_size', 'The queue is empty and there is nothing to send', 400 );
}
$sender = Sender::get_instance();
// try to give ourselves as much time as possible.
set_time_limit( 0 );
if ( ! empty( $args['pop'] ) ) {
$buffer = new Queue_Buffer( 'pop', $queue->pop( $number_of_items ) );
} else {
// let's delete the checkin state.
if ( $args['force'] ) {
$queue->unlock();
}
$buffer = $this->get_buffer( $queue, $number_of_items );
}
// Check that the $buffer is not checkout out already.
if ( is_wp_error( $buffer ) ) {
return new WP_Error( 'buffer_open', "We couldn't get the buffer it is currently checked out", 400 );
}
if ( ! is_object( $buffer ) ) {
return new WP_Error( 'buffer_non-object', 'Buffer is not an object', 400 );
}
$encode = isset( $args['encode'] ) ? $args['encode'] : true;
Settings::set_is_syncing( true );
list( $items_to_send, $skipped_items_ids ) = $sender->get_items_to_send( $buffer, $encode );
Settings::set_is_syncing( false );
return array(
'buffer_id' => $buffer->id,
'items' => $items_to_send,
'skipped_items' => $skipped_items_ids,
'codec' => $encode ? $sender->get_codec()->name() : null,
'sent_timestamp' => time(),
);
}
/**
* Adds Sync items to local property.
*/
public function jetpack_sync_send_data_listener() {
foreach ( func_get_args()[0] as $key => $item ) {
$this->items[ $key ] = $item;
}
}
/**
* Check out a buffer of full sync actions.
*
* @return array Sync Actions to be returned to requestor
*/
public function immediate_full_sync_pull() {
// try to give ourselves as much time as possible.
set_time_limit( 0 );
$original_send_data_cb = array( 'Automattic\Jetpack\Sync\Actions', 'send_data' );
$temp_send_data_cb = array( $this, 'jetpack_sync_send_data_listener' );
Sender::get_instance()->set_enqueue_wait_time( 0 );
remove_filter( 'jetpack_sync_send_data', $original_send_data_cb );
add_filter( 'jetpack_sync_send_data', $temp_send_data_cb, 10, 6 );
Sender::get_instance()->do_full_sync();
remove_filter( 'jetpack_sync_send_data', $temp_send_data_cb );
add_filter( 'jetpack_sync_send_data', $original_send_data_cb, 10, 6 );
return array(
'items' => $this->items,
'codec' => Sender::get_instance()->get_codec()->name(),
'sent_timestamp' => time(),
'status' => Actions::get_sync_status(),
);
}
/**
* Checkout items out of the sync queue.
*
* @param Queue $queue Sync Queue.
* @param int $number_of_items Number of items to checkout.
*
* @return WP_Error
*/
protected function get_buffer( $queue, $number_of_items ) {
$start = time();
$max_duration = 5; // this will try to get the buffer.
$buffer = $queue->checkout( $number_of_items );
$duration = time() - $start;
while ( is_wp_error( $buffer ) && $duration < $max_duration ) {
sleep( 2 );
$duration = time() - $start;
$buffer = $queue->checkout( $number_of_items );
}
if ( false === $buffer ) {
return new WP_Error( 'queue_size', 'The queue is empty and there is nothing to send', 400 );
}
return $buffer;
}
}