@@ -27,49 +27,57 @@ import (
2727//
2828// It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
2929func (ch * ConcurrencyHandler ) EvaluateAndAdjustConcurrency (resp * http.Response , responseTime time.Duration ) {
30- // Call monitoring functions
3130 rateLimitFeedback := ch .MonitorRateLimitHeaders (resp )
3231 responseCodeFeedback := ch .MonitorServerResponseCodes (resp )
3332 responseTimeFeedback := ch .MonitorResponseTimeVariability (responseTime )
3433
35- // Log the feedback from each monitoring function for debugging
36- ch .logger .Debug ("Concurrency Adjustment Feedback" ,
37- zap .Int ("RateLimitFeedback" , rateLimitFeedback ),
38- zap .Int ("ResponseCodeFeedback" , responseCodeFeedback ),
39- zap .Int ("ResponseTimeFeedback" , responseTimeFeedback ))
40-
41- // Determine overall action based on feedback
42- suggestions := []int {rateLimitFeedback , responseCodeFeedback , responseTimeFeedback }
43- scaleDownCount := 0
44- scaleUpCount := 0
45-
46- for _ , suggestion := range suggestions {
47- switch suggestion {
48- case - 1 :
49- scaleDownCount ++
50- case 1 :
51- scaleUpCount ++
52- }
53- }
34+ // Compute the weighted feedback
35+ weightedFeedback := float64 (rateLimitFeedback )* WeightRateLimit +
36+ float64 (responseCodeFeedback )* WeightResponseCodes +
37+ float64 (responseTimeFeedback )* WeightResponseTime
5438
55- // Log the counts for scale down and up suggestions
56- ch .logger .Info ("Scaling Decision Counts" ,
57- zap .Int ("ScaleDownCount" , scaleDownCount ),
58- zap .Int ("ScaleUpCount" , scaleUpCount ))
39+ // Log the feedback and weighted result for debugging
40+ ch .logger .Debug ("Concurrency Adjustment Feedback" ,
41+ zap .Float64 ("WeightedFeedback" , weightedFeedback ))
5942
60- // Decide on scaling action
61- if scaleDownCount > scaleUpCount {
62- ch .logger .Info ("Scaling down the concurrency" , zap .String ( "Reason " , "More signals suggested to decrease concurrency" ))
43+ // Apply thresholds to determine scaling action
44+ if weightedFeedback <= ThresholdScaleDown {
45+ ch .logger .Info ("Scaling down the concurrency" , zap .Float64 ( "WeightedFeedback " , weightedFeedback ))
6346 ch .ScaleDown ()
64- } else if scaleUpCount > scaleDownCount {
65- ch .logger .Info ("Scaling up the concurrency" , zap .String ( "Reason " , "More signals suggested to increase concurrency" ))
47+ } else if weightedFeedback >= ThresholdScaleUp {
48+ ch .logger .Info ("Scaling up the concurrency" , zap .Float64 ( "WeightedFeedback " , weightedFeedback ))
6649 ch .ScaleUp ()
6750 } else {
68- ch .logger .Info ("No change in concurrency" , zap .String ( "Reason " , "Equal signals for both scaling up and down" ))
51+ ch .logger .Info ("Maintaining current concurrency level " , zap .Float64 ( "WeightedFeedback " , weightedFeedback ))
6952 }
7053}
7154
7255// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
56+ // func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
57+ // remaining := resp.Header.Get("X-RateLimit-Remaining")
58+ // retryAfter := resp.Header.Get("Retry-After")
59+ // suggestion := 0
60+
61+ // if remaining != "" {
62+ // remainingValue, err := strconv.Atoi(remaining)
63+ // if err == nil && remainingValue < 10 {
64+ // // Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
65+ // suggestion = -1
66+ // }
67+ // }
68+
69+ // if retryAfter != "" {
70+ // // Suggest decrease concurrency if Retry-After is specified
71+ // suggestion = -1
72+ // } else {
73+ // // Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
74+ // if len(ch.sem) < MaxConcurrency && suggestion == 0 {
75+ // suggestion = 1
76+ // }
77+ // }
78+
79+ // return suggestion
80+ // }
7381func (ch * ConcurrencyHandler ) MonitorRateLimitHeaders (resp * http.Response ) int {
7482 remaining := resp .Header .Get ("X-RateLimit-Remaining" )
7583 retryAfter := resp .Header .Get ("Retry-After" )
@@ -78,99 +86,128 @@ func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
7886 if remaining != "" {
7987 remainingValue , err := strconv .Atoi (remaining )
8088 if err == nil && remainingValue < 10 {
81- // Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
8289 suggestion = - 1
8390 }
8491 }
8592
8693 if retryAfter != "" {
87- // Suggest decrease concurrency if Retry-After is specified
8894 suggestion = - 1
89- } else {
90- // Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
91- if len (ch .sem ) < MaxConcurrency && suggestion == 0 {
92- suggestion = 1
93- }
9495 }
9596
9697 return suggestion
9798}
9899
99100// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
101+ // func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
102+ // statusCode := resp.StatusCode
103+
104+ // // Lock the metrics to ensure thread safety
105+ // ch.Metrics.Lock.Lock()
106+ // defer ch.Metrics.Lock.Unlock()
107+
108+ // // Update the appropriate error count based on the response status code
109+ // switch {
110+ // case statusCode >= 500 && statusCode < 600:
111+ // ch.Metrics.TotalRateLimitErrors++
112+ // case statusCode >= 400 && statusCode < 500:
113+ // // Assuming 4xx errors as client errors
114+ // ch.Metrics.TotalRetries++
115+ // }
116+
117+ // // Calculate error rate
118+ // totalRequests := float64(ch.Metrics.TotalRequests)
119+ // totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
120+ // errorRate := totalErrors / totalRequests
121+
122+ // // Set the new error rate in the metrics
123+ // ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
124+
125+ // // Determine action based on the error rate
126+ // if errorRate > ErrorRateThreshold {
127+ // // Suggest decrease concurrency
128+ // return -1
129+ // } else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
130+ // // Suggest increase concurrency if there is capacity
131+ // return 1
132+ // }
133+ // return 0
134+ // }
100135func (ch * ConcurrencyHandler ) MonitorServerResponseCodes (resp * http.Response ) int {
101136 statusCode := resp .StatusCode
102-
103- // Lock the metrics to ensure thread safety
104137 ch .Metrics .Lock .Lock ()
105138 defer ch .Metrics .Lock .Unlock ()
106139
107- // Update the appropriate error count based on the response status code
108- switch {
109- case statusCode >= 500 && statusCode < 600 :
140+ if statusCode >= 500 {
110141 ch .Metrics .TotalRateLimitErrors ++
111- case statusCode >= 400 && statusCode < 500 :
112- // Assuming 4xx errors as client errors
142+ return - 1
143+ } else if statusCode >= 400 {
113144 ch .Metrics .TotalRetries ++
114- }
115-
116- // Calculate error rate
117- totalRequests := float64 (ch .Metrics .TotalRequests )
118- totalErrors := float64 (ch .Metrics .TotalRateLimitErrors + ch .Metrics .TotalRetries )
119- errorRate := totalErrors / totalRequests
120-
121- // Set the new error rate in the metrics
122- ch .Metrics .ResponseCodeMetrics .ErrorRate = errorRate
123-
124- // Determine action based on the error rate
125- if errorRate > ErrorRateThreshold {
126- // Suggest decrease concurrency
127145 return - 1
128- } else if errorRate <= ErrorRateThreshold && len (ch .sem ) < MaxConcurrency {
129- // Suggest increase concurrency if there is capacity
130- return 1
131146 }
147+
132148 return 0
133149}
134150
135151// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
152+ // func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
153+ // ch.Metrics.Lock.Lock()
154+ // defer ch.Metrics.Lock.Unlock()
155+
156+ // // Update ResponseTimeVariability metrics
157+ // ch.Metrics.ResponseTimeVariability.Lock.Lock()
158+ // defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
159+ // ch.Metrics.ResponseTimeVariability.Total += responseTime
160+ // ch.Metrics.ResponseTimeVariability.Count++
161+
162+ // // Calculate average response time
163+ // ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
164+
165+ // // Calculate variance of response times
166+ // ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
167+
168+ // // Calculate standard deviation of response times
169+ // stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
170+
171+ // // Determine action based on standard deviation
172+ // if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
173+ // // Suggest decrease concurrency
174+ // return -1
175+ // } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
176+ // // Suggest increase concurrency if there is capacity
177+ // return 1
178+ // }
179+ // return 0
180+ // }
136181func (ch * ConcurrencyHandler ) MonitorResponseTimeVariability (responseTime time.Duration ) int {
137182 ch .Metrics .Lock .Lock ()
138183 defer ch .Metrics .Lock .Unlock ()
139184
140- // Update ResponseTimeVariability metrics
141- ch .Metrics .ResponseTimeVariability .Lock .Lock ()
142- defer ch .Metrics .ResponseTimeVariability .Lock .Unlock ()
185+ // Update total response time and count
143186 ch .Metrics .ResponseTimeVariability .Total += responseTime
144187 ch .Metrics .ResponseTimeVariability .Count ++
145188
146- // Calculate average response time
147- ch . Metrics . ResponseTimeVariability . Average = ch .Metrics .ResponseTimeVariability .Total / time .Duration (ch .Metrics .ResponseTimeVariability .Count )
189+ // Calculate the average response time
190+ averageResponseTime : = ch .Metrics .ResponseTimeVariability .Total / time .Duration (ch .Metrics .ResponseTimeVariability .Count )
148191
149- // Calculate variance of response times
150- ch .Metrics .ResponseTimeVariability .Variance = ch .calculateVariance (ch .Metrics .ResponseTimeVariability .Average , responseTime )
192+ // Calculate variance
193+ variance := ch .calculateVariance (averageResponseTime , responseTime )
194+ // Calculate standard deviation
195+ stdDev := math .Sqrt (variance )
151196
152- // Calculate standard deviation of response times
153- stdDev := math . Sqrt ( ch . Metrics . ResponseTimeVariability . Variance )
197+ // Convert MaxAcceptableResponseTimeVariability to seconds for comparison
198+ maxStdDev := MaxAcceptableResponseTimeVariability . Seconds ( )
154199
155- // Determine action based on standard deviation
156- if stdDev > ch .Metrics .ResponseTimeVariability .StdDevThreshold {
157- // Suggest decrease concurrency
158- return - 1
159- } else if stdDev <= ch .Metrics .ResponseTimeVariability .StdDevThreshold && len (ch .sem ) < MaxConcurrency {
160- // Suggest increase concurrency if there is capacity
161- return 1
200+ if stdDev > maxStdDev {
201+ return - 1 // Suggest to decrease concurrency if stdDev exceeds the maximum threshold
162202 }
163- return 0
203+ return 1 // Suggest to increase concurrency if stdDev is within the acceptable range
164204}
165205
166- // calculateVariance calculates the variance of response times.
167- func (ch * ConcurrencyHandler ) calculateVariance (averageResponseTime time.Duration , responseTime time.Duration ) float64 {
168- // Convert time.Duration values to seconds
169- averageSeconds := averageResponseTime .Seconds ()
170- responseSeconds := responseTime .Seconds ()
171-
172- // Calculate variance
173- variance := (float64 (ch .Metrics .ResponseTimeVariability .Count - 1 )* math .Pow (averageSeconds - responseSeconds , 2 ) + ch .Metrics .ResponseTimeVariability .Variance ) / float64 (ch .Metrics .ResponseTimeVariability .Count )
174- ch .Metrics .ResponseTimeVariability .Variance = variance
175- return variance
206+ // calculateVariance calculates the variance between the average response time and a new sample.
207+ func (ch * ConcurrencyHandler ) calculateVariance (average , newSample time.Duration ) float64 {
208+ mean := average .Seconds () // Convert to seconds
209+ newValue := newSample .Seconds () // Convert to seconds
210+ newVariance := (float64 (ch .Metrics .ResponseTimeVariability .Count - 1 )* math .Pow (mean - newValue , 2 ) + ch .Metrics .ResponseTimeVariability .Variance ) / float64 (ch .Metrics .ResponseTimeVariability .Count )
211+ ch .Metrics .ResponseTimeVariability .Variance = newVariance // Update the variance in metrics
212+ return newVariance
176213}
0 commit comments