TSocketPool.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. <?php
  2. /*
  3. * Licensed to the Apache Software Foundation (ASF) under one
  4. * or more contributor license agreements. See the NOTICE file
  5. * distributed with this work for additional information
  6. * regarding copyright ownership. The ASF licenses this file
  7. * to you under the Apache License, Version 2.0 (the
  8. * "License"); you may not use this file except in compliance
  9. * with the License. You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing,
  14. * software distributed under the License is distributed on an
  15. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. * KIND, either express or implied. See the License for the
  17. * specific language governing permissions and limitations
  18. * under the License.
  19. *
  20. * @package thrift.transport
  21. */
  22. namespace Thrift\Transport;
  23. use Thrift\Exception\TException;
  24. /**
  25. * This library makes use of APC cache to make hosts as down in a web
  26. * environment. If you are running from the CLI or on a system without APC
  27. * installed, then these null functions will step in and act like cache
  28. * misses.
  29. */
  30. if (!function_exists('apc_fetch')) {
  31. function apc_fetch($key) { return FALSE; }
  32. function apc_store($key, $var, $ttl=0) { return FALSE; }
  33. }
  34. /**
  35. * Sockets implementation of the TTransport interface that allows connection
  36. * to a pool of servers.
  37. *
  38. * @package thrift.transport
  39. */
  40. class TSocketPool extends TSocket
  41. {
  42. /**
  43. * Remote servers. Array of associative arrays with 'host' and 'port' keys
  44. */
  45. private $servers_ = array();
  46. /**
  47. * How many times to retry each host in connect
  48. *
  49. * @var int
  50. */
  51. private $numRetries_ = 1;
  52. /**
  53. * Retry interval in seconds, how long to not try a host if it has been
  54. * marked as down.
  55. *
  56. * @var int
  57. */
  58. private $retryInterval_ = 60;
  59. /**
  60. * Max consecutive failures before marking a host down.
  61. *
  62. * @var int
  63. */
  64. private $maxConsecutiveFailures_ = 1;
  65. /**
  66. * Try hosts in order? or Randomized?
  67. *
  68. * @var bool
  69. */
  70. private $randomize_ = true;
  71. /**
  72. * Always try last host, even if marked down?
  73. *
  74. * @var bool
  75. */
  76. private $alwaysTryLast_ = true;
  77. /**
  78. * Socket pool constructor
  79. *
  80. * @param array $hosts List of remote hostnames
  81. * @param mixed $ports Array of remote ports, or a single common port
  82. * @param bool $persist Whether to use a persistent socket
  83. * @param mixed $debugHandler Function for error logging
  84. */
  85. public function __construct($hosts=array('localhost'),
  86. $ports=array(9090),
  87. $persist=false,
  88. $debugHandler=null) {
  89. parent::__construct(null, 0, $persist, $debugHandler);
  90. if (!is_array($ports)) {
  91. $port = $ports;
  92. $ports = array();
  93. foreach ($hosts as $key => $val) {
  94. $ports[$key] = $port;
  95. }
  96. }
  97. foreach ($hosts as $key => $host) {
  98. $this->servers_ []= array('host' => $host,
  99. 'port' => $ports[$key]);
  100. }
  101. }
  102. /**
  103. * Add a server to the pool
  104. *
  105. * This function does not prevent you from adding a duplicate server entry.
  106. *
  107. * @param string $host hostname or IP
  108. * @param int $port port
  109. */
  110. public function addServer($host, $port)
  111. {
  112. $this->servers_[] = array('host' => $host, 'port' => $port);
  113. }
  114. /**
  115. * Sets how many time to keep retrying a host in the connect function.
  116. *
  117. * @param int $numRetries
  118. */
  119. public function setNumRetries($numRetries)
  120. {
  121. $this->numRetries_ = $numRetries;
  122. }
  123. /**
  124. * Sets how long to wait until retrying a host if it was marked down
  125. *
  126. * @param int $numRetries
  127. */
  128. public function setRetryInterval($retryInterval)
  129. {
  130. $this->retryInterval_ = $retryInterval;
  131. }
  132. /**
  133. * Sets how many time to keep retrying a host before marking it as down.
  134. *
  135. * @param int $numRetries
  136. */
  137. public function setMaxConsecutiveFailures($maxConsecutiveFailures)
  138. {
  139. $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  140. }
  141. /**
  142. * Turns randomization in connect order on or off.
  143. *
  144. * @param bool $randomize
  145. */
  146. public function setRandomize($randomize)
  147. {
  148. $this->randomize_ = $randomize;
  149. }
  150. /**
  151. * Whether to always try the last server.
  152. *
  153. * @param bool $alwaysTryLast
  154. */
  155. public function setAlwaysTryLast($alwaysTryLast)
  156. {
  157. $this->alwaysTryLast_ = $alwaysTryLast;
  158. }
  159. /**
  160. * Connects the socket by iterating through all the servers in the pool
  161. * and trying to find one that works.
  162. */
  163. public function open()
  164. {
  165. // Check if we want order randomization
  166. if ($this->randomize_) {
  167. shuffle($this->servers_);
  168. }
  169. // Count servers to identify the "last" one
  170. $numServers = count($this->servers_);
  171. for ($i = 0; $i < $numServers; ++$i) {
  172. // This extracts the $host and $port variables
  173. extract($this->servers_[$i]);
  174. // Check APC cache for a record of this server being down
  175. $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
  176. // Cache miss? Assume it's OK
  177. $lastFailtime = apc_fetch($failtimeKey);
  178. if ($lastFailtime === FALSE) {
  179. $lastFailtime = 0;
  180. }
  181. $retryIntervalPassed = false;
  182. // Cache hit...make sure enough the retry interval has elapsed
  183. if ($lastFailtime > 0) {
  184. $elapsed = time() - $lastFailtime;
  185. if ($elapsed > $this->retryInterval_) {
  186. $retryIntervalPassed = true;
  187. if ($this->debug_) {
  188. call_user_func($this->debugHandler_,
  189. 'TSocketPool: retryInterval '.
  190. '('.$this->retryInterval_.') '.
  191. 'has passed for host '.$host.':'.$port);
  192. }
  193. }
  194. }
  195. // Only connect if not in the middle of a fail interval, OR if this
  196. // is the LAST server we are trying, just hammer away on it
  197. $isLastServer = false;
  198. if ($this->alwaysTryLast_) {
  199. $isLastServer = ($i == ($numServers - 1));
  200. }
  201. if (($lastFailtime === 0) ||
  202. ($isLastServer) ||
  203. ($lastFailtime > 0 && $retryIntervalPassed)) {
  204. // Set underlying TSocket params to this one
  205. $this->host_ = $host;
  206. $this->port_ = $port;
  207. // Try up to numRetries_ connections per server
  208. for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
  209. try {
  210. // Use the underlying TSocket open function
  211. parent::open();
  212. // Only clear the failure counts if required to do so
  213. if ($lastFailtime > 0) {
  214. apc_store($failtimeKey, 0);
  215. }
  216. // Successful connection, return now
  217. return;
  218. } catch (TException $tx) {
  219. // Connection failed
  220. }
  221. }
  222. // Mark failure of this host in the cache
  223. $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
  224. // Ignore cache misses
  225. $consecfails = apc_fetch($consecfailsKey);
  226. if ($consecfails === FALSE) {
  227. $consecfails = 0;
  228. }
  229. // Increment by one
  230. $consecfails++;
  231. // Log and cache this failure
  232. if ($consecfails >= $this->maxConsecutiveFailures_) {
  233. if ($this->debug_) {
  234. call_user_func($this->debugHandler_,
  235. 'TSocketPool: marking '.$host.':'.$port.
  236. ' as down for '.$this->retryInterval_.' secs '.
  237. 'after '.$consecfails.' failed attempts.');
  238. }
  239. // Store the failure time
  240. apc_store($failtimeKey, time());
  241. // Clear the count of consecutive failures
  242. apc_store($consecfailsKey, 0);
  243. } else {
  244. apc_store($consecfailsKey, $consecfails);
  245. }
  246. }
  247. }
  248. // Oh no; we failed them all. The system is totally ill!
  249. $error = 'TSocketPool: All hosts in pool are down. ';
  250. $hosts = array();
  251. foreach ($this->servers_ as $server) {
  252. $hosts []= $server['host'].':'.$server['port'];
  253. }
  254. $hostlist = implode(',', $hosts);
  255. $error .= '('.$hostlist.')';
  256. if ($this->debug_) {
  257. call_user_func($this->debugHandler_, $error);
  258. }
  259. throw new TException($error);
  260. }
  261. }