Skip to content
This repository was archived by the owner on Feb 12, 2020. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 71 additions & 33 deletions src/nsqphp/Lookup/Nsqlookupd.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public function __construct($hosts = NULL, $connectionTimeout = 1, $responseTime
} else {
$this->hosts = explode(',', $hosts);
}

foreach ($this->hosts as &$host) {
// ensure host; otherwise go with default (:4161)
if (strpos($host, ':') === FALSE) {
$host .= ':4161';
}
}
unset( $host );

$this->connectionTimeout = $connectionTimeout;
$this->responseTimeout = $responseTimeout;
}
Expand All @@ -57,46 +66,17 @@ public function __construct($hosts = NULL, $connectionTimeout = 1, $responseTime
* Lookup hosts for a given topic
*
* @param string $topic
*
* @throws LookupException If we cannot talk to / get back invalid response
* from nsqlookupd
*
*
* @return array Should return array [] = host:port
*/
public function lookupHosts($topic)
{
$lookupHosts = array();

foreach ($this->hosts as $host) {
// ensure host; otherwise go with default (:4161)
if (strpos($host, ':') === FALSE) {
$host .= ':4161';
}

$url = "http://{$host}/lookup?topic=" . urlencode($topic);
$ch = curl_init($url);
$options = array(
CURLOPT_RETURNTRANSFER => TRUE,
CURLOPT_HEADER => FALSE,
CURLOPT_FOLLOWLOCATION => FALSE,
CURLOPT_ENCODING => '',
CURLOPT_USERAGENT => 'nsqphp',
CURLOPT_CONNECTTIMEOUT => $this->connectionTimeout,
CURLOPT_TIMEOUT => $this->responseTimeout,
CURLOPT_FAILONERROR => TRUE
);
curl_setopt_array($ch, $options);
$r = curl_exec($ch);
$r = json_decode($r, TRUE);

// don't fail since we can't distinguish between bad topic and general failure
/*
if (!is_array($r)) {
throw new LookupException(
"Error talking to nsqlookupd via $url"
);
}*/

$r = $this->_request( $url );

$producers = isset($r['data'], $r['data']['producers']) ? $r['data']['producers'] : array();
foreach ($producers as $prod) {
if (isset($prod['address'])) {
Expand All @@ -108,10 +88,68 @@ public function lookupHosts($topic)
if (!in_array($h, $lookupHosts)) {
$lookupHosts[] = $h;
}

}
}

return $lookupHosts;
}

/**
* List all known topics
*
* @return array Should return array [] = string
*/
public function topics()
{
$topics = array();

foreach ($this->hosts as $host) {
$url = "http://{$host}/topics";
$r = $this->_request( $url );

$hostTopics = isset($r['data'], $r['data']['topics']) ? $r['data']['topics'] : array();
$topics = array_merge($topics, $hostTopics);
}

return $topics;
}

/**
* Make an http request to nsqlookupd
*
* @param string $url
*
* @throws LookupException If we cannot talk to / get back invalid response
* from nsqlookupd
*
* @return array Should return json-decoded response payload
*/
protected function _request( $url ) {
$ch = curl_init($url);
$options = array(
CURLOPT_RETURNTRANSFER => TRUE,
CURLOPT_HEADER => FALSE,
CURLOPT_FOLLOWLOCATION => FALSE,
CURLOPT_ENCODING => '',
CURLOPT_USERAGENT => 'nsqphp',
CURLOPT_CONNECTTIMEOUT => $this->connectionTimeout,
CURLOPT_TIMEOUT => $this->responseTimeout,
CURLOPT_FAILONERROR => TRUE
);

curl_setopt_array($ch, $options);
$r = curl_exec($ch);
$r = json_decode($r, TRUE);

// don't fail since we can't distinguish between bad topic and general failure
/*
if (!is_array($r)) {
throw new LookupException(
"Error talking to nsqlookupd via $url"
);
}*/

return $r;
}
}