module Sequel::ConnectionExpiration
Attributes
connection_expiration_random_delay: The maximum number of seconds that will be added as a random delay to the expiration timeout. Defaults to 0 seconds (no random delay).
connection_expiration_timeout: The number of seconds that need to pass since connection creation before expiring a connection. Defaults to 14400 seconds (4 hours).
Public Class Methods
Source
# File lib/sequel/extensions/connection_expiration.rb
#
# Hook run when this module is extended onto a connection pool.
#
# Raises Error unless pool.pool_type is one of :threaded,
# :sharded_threaded, :timed_queue or :sharded_timed_queue.  Otherwise,
# inside the pool's sync block, it sets up the timestamp hash, the
# default timeout (14400 seconds) and the default random delay (0), and
# then records an expiration entry for every connection the pool
# already holds.
def self.extended(pool)
  case pool.pool_type
  when :threaded, :sharded_threaded, :timed_queue, :sharded_timed_queue
    nil
  else
    raise Error, "cannot load connection_expiration extension if using single or sharded_single connection pool (or other unsupported connection pool)"
  end

  pool.instance_exec do
    sync do
      # ||= keeps any values that were configured before the extension
      # was loaded.
      @connection_expiration_timestamps ||= {}
      @connection_expiration_timeout ||= 14400
      @connection_expiration_random_delay ||= 0

      # Record an expiration timestamp for any connections that already
      # exist in the pool, so that a connection opened before the extension
      # was loaded (e.g. via Sequel.connect) will eventually be expired.
      register = method(:register_connection_expiration_time)

      case pool_type
      when :timed_queue, :sharded_timed_queue
        # A queue cannot be iterated in place: drain it without blocking
        # (timeout: 0 makes pop return nil once the queue is empty), then
        # push every connection back after registering it.
        register_queued_connections = lambda do |queue|
          conns = []
          while conn = queue.pop(timeout: 0)
            conns << conn
          end
          conns.each do |conn|
            queue.push(register_connection_expiration_time(conn))
          end
        end
      end

      case pool_type
      when :threaded
        @available_connections.each(&register)
        @allocated.each_value(&register)
      when :sharded_threaded
        @available_connections.each_value{|conns| conns.each(&register)}
        @allocated.each_value{|threads| threads.each_value(&register)}
      when :timed_queue
        register_queued_connections.call(@queue)
        @allocated.each_value(&register)
      else # when :sharded_timed_queue
        @queues.each_value(&register_queued_connections)
        @allocated.each_value{|threads| threads.each_value(&register)}
      end
    end
  end
end
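Initialize the data structures used by this extension, and record an expiration timestamp for every connection that already exists in the pool. Raises Error if the pool type is not threaded, sharded_threaded, timed_queue, or sharded_timed_queue.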
Private Instance Methods
Source
# File lib/sequel/extensions/connection_expiration.rb
#
# Acquire a connection via super, then check whether it has outlived its
# recorded expiration entry.  An expired connection is removed from the
# allocated set for the current thread, disconnected, and the
# acquisition is retried.  Returns whatever super finally returns
# (which may be nil).
def acquire(*a)
  conn = nil

  # A single-iteration block is used so that +redo+ can restart the
  # acquisition after an expired connection has been discarded.
  1.times do
    conn = super
    next unless conn

    entry = sync{@connection_expiration_timestamps[conn]}
    next unless entry
    next unless Sequel.elapsed_seconds_since(entry[0]) > entry[1]

    # Sharded pools key @allocated by the last acquire argument first.
    if [:sharded_threaded, :sharded_timed_queue].include?(pool_type)
      sync{@allocated[a.last].delete(Sequel.current)}
    else
      sync{@allocated.delete(Sequel.current)}
    end

    disconnect_acquired_connection(conn, *a)
    redo
  end

  conn
end
When acquiring a connection, check if the connection is expired. If it is expired, disconnect the connection, and retry with a new connection.
Calls superclass method
Source
# File lib/sequel/extensions/connection_expiration.rb
#
# Drop the expiration entry recorded for +conn+ (inside the pool's sync
# block), then let super perform the actual disconnect.
def disconnect_connection(conn)
  sync do
    @connection_expiration_timestamps.delete(conn)
  end

  super
end
Clean up expiration timestamps during disconnect.
Calls superclass method
Source
# File lib/sequel/extensions/connection_expiration.rb
#
# Create a connection via super and record its expiration entry.
# Returns the value returned by register_connection_expiration_time.
def make_new(*)
  conn = super
  register_connection_expiration_time(conn)
end
Record the time the connection was created.
Calls superclass method
Source
# File lib/sequel/extensions/connection_expiration.rb
#
# Store a frozen [timer start, allowed lifetime] pair for +conn+ unless
# an entry is already present, where the allowed lifetime is the
# configured timeout plus a random fraction of the configured random
# delay.  Returns +conn+.
def register_connection_expiration_time(conn)
  unless @connection_expiration_timestamps[conn]
    started = Sequel.start_timer
    lifetime = @connection_expiration_timeout + (rand * @connection_expiration_random_delay)
    @connection_expiration_timestamps[conn] = [started, lifetime].freeze
  end

  conn
end
Record an expiration entry for a connection, returns the connection.