diff --git a/src/queue.c b/src/queue.c index a3286b9..26809aa 100644 --- a/src/queue.c +++ b/src/queue.c @@ -470,7 +470,8 @@ void needJobsForQueue(queue *q, int type) { if (now - q->needjobs_bcast_time > bcast_delay) { q->needjobs_bcast_time = now; - q->needjobs_bcast_attempt++; + if (bcast_delay != NEEDJOBS_BCAST_ALL_MAX_DELAY) + q->needjobs_bcast_attempt++; /* Cluster-wide broadcasts are just to discover nodes, * ask for a single job in this case. */ clusterSendNeedJobs(q->name,1,server.cluster->nodes); @@ -493,7 +494,8 @@ void needJobsForQueue(queue *q, int type) { num_responders > 0) { q->needjobs_adhoc_time = now; - q->needjobs_adhoc_attempt++; + if (adhoc_delay != NEEDJOBS_BCAST_ADHOC_MAX_DELAY) + q->needjobs_adhoc_attempt++; clusterSendNeedJobs(q->name,to_fetch,q->needjobs_responders); } } @@ -581,6 +583,7 @@ void receiveNeedJobs(clusterNode *node, robj *qname, uint32_t count) { * we have, but always at least a single job. */ if (qlen < count*2) replyjobs = qlen/2; if (replyjobs == 0) replyjobs = 1; + if (replyjobs > NEEDJOBS_MAX_REQUEST) replyjobs = NEEDJOBS_MAX_REQUEST; job *jobs[NEEDJOBS_MAX_REQUEST]; for (j = 0; j < replyjobs; j++) {