diff --git a/src/nsqphp/Lookup/Nsqlookupd.php b/src/nsqphp/Lookup/Nsqlookupd.php index 4a0ebc0..6cbafdd 100644 --- a/src/nsqphp/Lookup/Nsqlookupd.php +++ b/src/nsqphp/Lookup/Nsqlookupd.php @@ -49,6 +49,15 @@ public function __construct($hosts = NULL, $connectionTimeout = 1, $responseTime } else { $this->hosts = explode(',', $hosts); } + + foreach ($this->hosts as &$host) { + // ensure host; otherwise go with default (:4161) + if (strpos($host, ':') === FALSE) { + $host .= ':4161'; + } + } + unset( $host ); + $this->connectionTimeout = $connectionTimeout; $this->responseTimeout = $responseTimeout; } @@ -57,10 +66,7 @@ public function __construct($hosts = NULL, $connectionTimeout = 1, $responseTime * Lookup hosts for a given topic * * @param string $topic - * - * @throws LookupException If we cannot talk to / get back invalid response - * from nsqlookupd - * + * * @return array Should return array [] = host:port */ public function lookupHosts($topic) @@ -68,35 +74,9 @@ public function lookupHosts($topic) $lookupHosts = array(); foreach ($this->hosts as $host) { - // ensure host; otherwise go with default (:4161) - if (strpos($host, ':') === FALSE) { - $host .= ':4161'; - } - $url = "http://{$host}/lookup?topic=" . urlencode($topic); - $ch = curl_init($url); - $options = array( - CURLOPT_RETURNTRANSFER => TRUE, - CURLOPT_HEADER => FALSE, - CURLOPT_FOLLOWLOCATION => FALSE, - CURLOPT_ENCODING => '', - CURLOPT_USERAGENT => 'nsqphp', - CURLOPT_CONNECTTIMEOUT => $this->connectionTimeout, - CURLOPT_TIMEOUT => $this->responseTimeout, - CURLOPT_FAILONERROR => TRUE - ); - curl_setopt_array($ch, $options); - $r = curl_exec($ch); - $r = json_decode($r, TRUE); - - // don't fail since we can't distinguish between bad topic and general failure - /* - if (!is_array($r)) { - throw new LookupException( - "Error talking to nsqlookupd via $url" - ); - }*/ - + $r = $this->_request( $url ); + $producers = isset($r['data'], $r['data']['producers']) ? $r['data']['producers'] : array(); foreach ($producers as $prod) { if (isset($prod['address'])) { @@ -108,10 +88,68 @@ public function lookupHosts($topic) if (!in_array($h, $lookupHosts)) { $lookupHosts[] = $h; } - + } } return $lookupHosts; } + + /** + * List all known topics + * + * @return array Should return array [] = string + */ + public function topics() + { + $topics = array(); + + foreach ($this->hosts as $host) { + $url = "http://{$host}/topics"; + $r = $this->_request( $url ); + + $hostTopics = isset($r['data'], $r['data']['topics']) ? $r['data']['topics'] : array(); + $topics = array_merge($topics, $hostTopics); + } + + return $topics; + } + + /** + * Make an http request to nsqlookupd + * + * @param string $url + * + * @throws LookupException If we cannot talk to / get back invalid response + * from nsqlookupd + * + * @return array Should return json-decoded response payload + */ + protected function _request( $url ) { + $ch = curl_init($url); + $options = array( + CURLOPT_RETURNTRANSFER => TRUE, + CURLOPT_HEADER => FALSE, + CURLOPT_FOLLOWLOCATION => FALSE, + CURLOPT_ENCODING => '', + CURLOPT_USERAGENT => 'nsqphp', + CURLOPT_CONNECTTIMEOUT => $this->connectionTimeout, + CURLOPT_TIMEOUT => $this->responseTimeout, + CURLOPT_FAILONERROR => TRUE + ); + + curl_setopt_array($ch, $options); + $r = curl_exec($ch); + $r = json_decode($r, TRUE); + + // don't fail since we can't distinguish between bad topic and general failure + /* + if (!is_array($r)) { + throw new LookupException( + "Error talking to nsqlookupd via $url" + ); + }*/ + + return $r; + } }