tvar.rb
5.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
require 'set'
require 'concurrent/synchronization'
module Concurrent
# A `TVar` is a transactional variable - a single-element container that
# is used as part of a transaction - see `Concurrent::atomically`.
#
# @!macro thread_safe_variable_comparison
#
# {include:file:doc/tvar.md}
class TVar < Synchronization::Object
safe_initialization!
# Create a new `TVar` with an initial value.
def initialize(value)
@value = value
@version = 0
@lock = Mutex.new
end
# Get the value of a `TVar`.
def value
Concurrent::atomically do
Transaction::current.read(self)
end
end
# Set the value of a `TVar`.
def value=(value)
Concurrent::atomically do
Transaction::current.write(self, value)
end
end
# @!visibility private
def unsafe_value # :nodoc:
@value
end
# @!visibility private
def unsafe_value=(value) # :nodoc:
@value = value
end
# @!visibility private
def unsafe_version # :nodoc:
@version
end
# @!visibility private
def unsafe_increment_version # :nodoc:
@version += 1
end
# @!visibility private
def unsafe_lock # :nodoc:
@lock
end
end
# Run a block that reads and writes `TVar`s as a single atomic transaction.
# With respect to the value of `TVar` objects, the transaction is atomic, in
# that it either happens or it does not, consistent, in that the `TVar`
# objects involved will never enter an illegal state, and isolated, in that
# transactions never interfere with each other. You may recognise these
# properties from database transactions.
#
# There are some very important and unusual semantics that you must be aware of:
#
# * Most importantly, the block that you pass to atomically may be executed
# more than once. In most cases your code should be free of
# side-effects, except for via TVar.
#
# * If an exception escapes an atomically block it will abort the transaction.
#
# * It is undefined behaviour to use callcc or Fiber with atomically.
#
# * If you create a new thread within an atomically, it will not be part of
# the transaction. Creating a thread counts as a side-effect.
#
# Transactions within transactions are flattened to a single transaction.
#
# @example
# a = new TVar(100_000)
# b = new TVar(100)
#
# Concurrent::atomically do
# a.value -= 10
# b.value += 10
# end
def atomically
raise ArgumentError.new('no block given') unless block_given?
# Get the current transaction
transaction = Transaction::current
# Are we not already in a transaction (not nested)?
if transaction.nil?
# New transaction
begin
# Retry loop
loop do
# Create a new transaction
transaction = Transaction.new
Transaction::current = transaction
# Run the block, aborting on exceptions
begin
result = yield
rescue Transaction::AbortError => e
transaction.abort
result = Transaction::ABORTED
rescue Transaction::LeaveError => e
transaction.abort
break result
rescue => e
transaction.abort
raise e
end
# If we can commit, break out of the loop
if result != Transaction::ABORTED
if transaction.commit
break result
end
end
end
ensure
# Clear the current transaction
Transaction::current = nil
end
else
# Nested transaction - flatten it and just run the block
yield
end
end
# Abort a currently running transaction - see `Concurrent::atomically`.
def abort_transaction
raise Transaction::AbortError.new
end
# Leave a transaction without committing or aborting - see `Concurrent::atomically`.
def leave_transaction
raise Transaction::LeaveError.new
end
module_function :atomically, :abort_transaction, :leave_transaction
private
class Transaction
ABORTED = Object.new
ReadLogEntry = Struct.new(:tvar, :version)
AbortError = Class.new(StandardError)
LeaveError = Class.new(StandardError)
def initialize
@read_log = []
@write_log = {}
end
def read(tvar)
Concurrent::abort_transaction unless valid?
if @write_log.has_key? tvar
@write_log[tvar]
else
@read_log.push(ReadLogEntry.new(tvar, tvar.unsafe_version))
tvar.unsafe_value
end
end
def write(tvar, value)
# Have we already written to this TVar?
unless @write_log.has_key? tvar
# Try to lock the TVar
unless tvar.unsafe_lock.try_lock
# Someone else is writing to this TVar - abort
Concurrent::abort_transaction
end
# If we previously wrote to it, check the version hasn't changed
@read_log.each do |log_entry|
if log_entry.tvar == tvar and tvar.unsafe_version > log_entry.version
Concurrent::abort_transaction
end
end
end
# Record the value written
@write_log[tvar] = value
end
def abort
unlock
end
def commit
return false unless valid?
@write_log.each_pair do |tvar, value|
tvar.unsafe_value = value
tvar.unsafe_increment_version
end
unlock
true
end
def valid?
@read_log.each do |log_entry|
unless @write_log.has_key? log_entry.tvar
if log_entry.tvar.unsafe_version > log_entry.version
return false
end
end
end
true
end
def unlock
@write_log.each_key do |tvar|
tvar.unsafe_lock.unlock
end
end
def self.current
Thread.current[:current_tvar_transaction]
end
def self.current=(transaction)
Thread.current[:current_tvar_transaction] = transaction
end
end
end