diff --git a/.gitignore b/.gitignore
index e94340b..d1e705b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,4 @@
 Examples/reprocessDataServer.py
 *.ini
 *.stats
-tests/
-
 newsletter.py
diff --git a/app.py b/app.py
index 7a636cf..a1ce13a 100644
--- a/app.py
+++ b/app.py
@@ -9,12 +9,13 @@
 import glob
 from datetime import datetime, timedelta
 import numpy as np
 
-from utilsAPI import getAPIURL, getWorkerType, getASInstance, unprotect_current_instance, get_number_of_pending_trials
+from utilsAPI import getAPIURL, getWorkerType, getErrorLogBool, getASInstance, unprotect_current_instance, get_number_of_pending_trials
 from utilsAuth import getToken
 from utils import (getDataDirectory, checkTime, checkResourceUsage,
                    sendStatusEmail, checkForTrialsWithStatus,
                    getCommitHash, getHostname, postLocalClientInfo,
-                   postProcessedDuration)
+                   postProcessedDuration, makeRequestWithRetry,
+                   writeToErrorLog)
 
 logging.basicConfig(level=logging.INFO)
@@ -24,6 +25,9 @@
 autoScalingInstance = getASInstance()
 logging.info(f"AUTOSCALING TEST INSTANCE: {autoScalingInstance}")
 
+ERROR_LOG = getErrorLogBool()
+error_log_path = "/data/error_log.json"
+
 # if true, will delete entire data directory when finished with a trial
 isDocker = True
@@ -120,8 +124,10 @@
             error_msg['error_msg'] = 'No videos uploaded. Ensure phones are connected and you have stable internet connection.'
             error_msg['error_msg_dev'] = 'No videos uploaded.'
-            r = requests.patch(trial_url, data={"status": "error", "meta": json.dumps(error_msg)},
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            r = makeRequestWithRetry('PATCH',
+                                     trial_url,
+                                     data={"status": "error", "meta": json.dumps(error_msg)},
+                                     headers = {"Authorization": "Token {}".format(API_TOKEN)})
             continue
 
         # The following is now done in main, to allow reprocessing trials with missing videos
@@ -149,15 +155,33 @@
         # note a result needs to be posted for the API to know we finished, but we are posting them
         # automatically thru procesTrial now
-        r = requests.patch(trial_url, data={"status": "done"},
-                           headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        r = makeRequestWithRetry('PATCH',
+                                 trial_url,
+                                 data={"status": "done"},
+                                 headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        logging.info('0.5s pause if need to restart.')
         time.sleep(0.5)
 
     except Exception as e:
-        r = requests.patch(trial_url, data={"status": "error"},
-                           headers = {"Authorization": "Token {}".format(API_TOKEN)})
-        traceback.print_exc()
+        try:
+            r = makeRequestWithRetry('PATCH',
+                                     trial_url, data={"status": "error"},
+                                     headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            traceback.print_exc()
+
+            if ERROR_LOG:
+                stack = traceback.format_exc()
+                writeToErrorLog(error_log_path, trial["session"], trial["id"],
+                                e, stack)
+
+        except:
+            traceback.print_exc()
+
+            if ERROR_LOG:
+                stack = traceback.format_exc()
+                writeToErrorLog(error_log_path, trial["session"], trial["id"],
+                                e, stack)
 
         # Antoine: Removing this, it is too often causing the machines to stop. Not because
        # the machines are failing, but because for instance the video is very long with a lot
@@ -172,8 +196,16 @@
     finally:
         # End process duration timer and post duration to database
-        process_end_time = datetime.now()
-        postProcessedDuration(trial_url, process_end_time - process_start_time)
+        try:
+            process_end_time = datetime.now()
+            postProcessedDuration(trial_url, process_end_time - process_start_time)
+        except Exception as e:
+            traceback.print_exc()
+
+            if ERROR_LOG:
+                stack = traceback.format_exc()
+                writeToErrorLog(error_log_path, trial["session"], trial["id"],
+                                e, stack)
 
     justProcessed = True
@@ -182,4 +214,4 @@
         folders = glob.glob(os.path.join(getDataDirectory(isDocker=True),'Data','*'))
         for f in folders:
             shutil.rmtree(f)
-            logging.info('deleting ' + f)
\ No newline at end of file
+            logging.info('deleting ' + f)
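Note: with ERROR_LOG enabled (see getErrorLogBool in utilsAPI.py below), each failure is appended to /data/error_log.json as a JSON list capped at max_entries; the entry fields come from writeToErrorLog in utils.py. A minimal sketch for inspecting that log offline — the helper function itself is illustrative and not part of this patch:

import json

def print_recent_errors(path='/data/error_log.json', n=10):
    # Entries are appended oldest-first, so the tail holds the latest failures.
    with open(path) as f:
        entries = json.load(f)
    for entry in entries[-n:]:
        # Fields written by writeToErrorLog: session_id, trial_id, datetime, error, stack.
        print(entry['datetime'], entry['session_id'], entry['trial_id'], entry['error'])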
diff --git a/docker/Makefile b/docker/Makefile
index fda7902..ff2197b 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -3,6 +3,10 @@
 REPO_NAME := opencap
 PROD_BRANCH := main
 DEV_BRANCH := dev
 
+# Initialize variables if not passed in
+INSTANCE_ID ?= 0
+CPU_SET ?= ""
+
 # Determine the branch name
 CURRENT_BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
@@ -68,12 +72,13 @@
 endif
 
 .PHONY: run
 run:
-ifeq ($(CURRENT_BRANCH),$(PROD_BRANCH))
-	aws ecr get-login-password --region us-west-2 --profile opencap | docker login --username AWS --password-stdin 660440363484.dkr.ecr.us-west-2.amazonaws.com
-
-else ifeq ($(CURRENT_BRANCH),$(DEV_BRANCH))
-	aws ecr get-login-password --region us-west-2 --profile opencap | docker login --username AWS --password-stdin 660440363484.dkr.ecr.us-west-2.amazonaws.com
-
-endif
-
-	OPENCAP_IMAGE_NAME=$(OPENCAP_IMAGE_NAME) OPENPOSE_IMAGE_NAME=$(OPENPOSE_IMAGE_NAME) MMPOSE_IMAGE_NAME=$(MMPOSE_IMAGE_NAME) docker-compose up
\ No newline at end of file
+	@echo "Usage: sudo make run INSTANCE_ID=<instance_id> CPU_SET=<cpu_set>"
+	@echo "Defaults: INSTANCE_ID=0, CPU_SET=\"\""
+
+	COMPOSE_PROJECT_NAME=opencap_$(INSTANCE_ID) \
+	OPENCAP_IMAGE_NAME=$(OPENCAP_IMAGE_NAME) \
+	OPENPOSE_IMAGE_NAME=$(OPENPOSE_IMAGE_NAME) \
+	MMPOSE_IMAGE_NAME=$(MMPOSE_IMAGE_NAME) \
+	INSTANCE_ID=$(INSTANCE_ID) \
+	CPU_SET=$(CPU_SET) \
+	docker compose up -d
diff --git a/docker/check-containers-health.sh b/docker/check-containers-health.sh
new file mode 100755
index 0000000..e87cfc0
--- /dev/null
+++ b/docker/check-containers-health.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# Function to check if a container is running
+is_container_alive() {
+    local container_name=$1
+    docker ps --filter "name=^/${container_name}$" --filter "status=running" --format '{{.Names}}' | grep -wq "$container_name"
+    return $?
+}
+
+# Loop through numbers 0 to 7
+for n in {0..7}; do
+    # Container names
+    opencap_openpose="opencap_${n}-openpose-1"
+    opencap_mmpose="opencap_${n}-mmpose-1"
+    opencap_mobilecap="opencap_${n}-mobilecap-1"
+
+    # Check if all three containers are alive
+    if is_container_alive "$opencap_openpose" && \
+       is_container_alive "$opencap_mmpose" && \
+       is_container_alive "$opencap_mobilecap"; then
+        echo "All containers for instance $n are alive. Skipping."
+        continue
+    fi
+
+    # Check if any container exists
+    if docker ps -a --filter "name=^/opencap_${n}-(openpose|mmpose|mobilecap)-1$" --format '{{.Names}}' | grep -q "opencap_${n}"; then
+        echo "Some containers for instance $n are not alive. Stopping instance."
+        ./stop-container.sh "$n"
+        ./start-container.sh "$n"
+    else
+        echo "No containers for instance $n. Skipping."
+    fi
+
+done
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index 169ff40..55acdbd 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -13,8 +13,14 @@ services:
         reservations:
           devices:
             - driver: nvidia
-              count: 1
               capabilities: [gpu]
+              device_ids: ["${INSTANCE_ID}"]
+    cpuset: "${CPU_SET}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "100m" # Rotate when the log reaches 100MB
+        max-file: "7"    # Keep the last 7 log files
   openpose:
     image: ${OPENPOSE_IMAGE_NAME}
     volumes:
@@ -24,8 +30,14 @@ services:
         reservations:
          devices:
            - driver: nvidia
-             count: 1
              capabilities: [gpu]
+             device_ids: ["${INSTANCE_ID}"]
+    cpuset: "${CPU_SET}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "100m" # Rotate when the log reaches 100MB
+        max-file: "7"    # Keep the last 7 log files
   mmpose:
     image: ${MMPOSE_IMAGE_NAME}
     volumes:
@@ -35,7 +47,14 @@ services:
         reservations:
          devices:
            - driver: nvidia
-             count: 1
              capabilities: [gpu]
+             device_ids: ["${INSTANCE_ID}"]
+    cpuset: "${CPU_SET}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "100m" # Rotate when the log reaches 100MB
+        max-file: "7"    # Keep the last 7 log files
+
 volumes:
     data: {}
diff --git a/docker/start-container.sh b/docker/start-container.sh
new file mode 100755
index 0000000..18a3dd3
--- /dev/null
+++ b/docker/start-container.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+# Configuration
+MAX_INSTANCES=8
+CPUS_PER_INSTANCE=14
+GPUS_PER_INSTANCE=1
+
+# Get the total number of CPUs and GPUs available
+TOTAL_CPUS=$(nproc)
+TOTAL_GPUS=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)
+
+# Check if an instance number is provided
+if [ -z "$1" ]; then
+    echo "Usage: $0 <instance_number>"
+    echo "Provide the instance number to start (0 to $((MAX_INSTANCES - 1)))."
+    exit 1
+fi
+
+INSTANCE_NUMBER=$1
+
+# Validate the instance number
+if (( INSTANCE_NUMBER < 0 || INSTANCE_NUMBER >= MAX_INSTANCES )); then
+    echo "Error: Instance number must be between 0 and $((MAX_INSTANCES - 1))."
+    exit 1
+fi
+
+# Compute CPU and GPU offsets for the selected instance
+CPU_START=$(( INSTANCE_NUMBER * CPUS_PER_INSTANCE ))
+CPU_END=$(( CPU_START + CPUS_PER_INSTANCE - 1 ))
+CPU_SET="${CPU_START}-${CPU_END}"
+
+# Validate resource availability
+if (( CPU_START + CPUS_PER_INSTANCE > TOTAL_CPUS )); then
+    echo "Error: Not enough CPUs available for instance $INSTANCE_NUMBER."
+    exit 1
+fi
+
+if (( INSTANCE_NUMBER >= TOTAL_GPUS )); then
+    echo "Error: Not enough GPUs available for instance $INSTANCE_NUMBER."
+    exit 1
+fi
+
+# Start the specific instance
+echo "Starting instance $INSTANCE_NUMBER with CPU_SET=${CPU_SET} and GPU=${INSTANCE_NUMBER}"
+
+# Run docker-compose for the specific instance
+make run INSTANCE_ID=$INSTANCE_NUMBER CPU_SET=$CPU_SET
+
+sleep 10
+
+echo "Instance $INSTANCE_NUMBER started successfully."
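For reference, start-container.sh partitions the host statically: instance n is pinned to GPU n and to the 14-core block starting at core n*14. A hypothetical Python rendering of the same arithmetic, handy for sanity-checking a host layout before launching (not part of this patch):

def cpu_set_for_instance(n, cpus_per_instance=14):
    # Mirrors CPU_START/CPU_END in start-container.sh.
    start = n * cpus_per_instance
    end = start + cpus_per_instance - 1
    return '{}-{}'.format(start, end)

assert cpu_set_for_instance(0) == '0-13'
assert cpu_set_for_instance(3) == '42-55'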
diff --git a/docker/start-containers.sh b/docker/start-containers.sh
new file mode 100755
index 0000000..fb281e3
--- /dev/null
+++ b/docker/start-containers.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+# Configuration
+MAX_INSTANCES=8
+CPUS_PER_INSTANCE=14
+GPUS_PER_INSTANCE=1
+
+# Get the total number of CPUs and GPUs available
+TOTAL_CPUS=$(nproc)
+TOTAL_GPUS=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)
+
+# Read number of instances to start
+if [ -z "$1" ]; then
+    echo "Usage: $0 <number_of_instances>"
+    echo "Provide the number of instances to start (max $MAX_INSTANCES)."
+    exit 1
+fi
+
+NUM_INSTANCES=$1
+
+# Validate the number of instances
+if (( NUM_INSTANCES > MAX_INSTANCES )); then
+    echo "Error: Maximum number of instances is $MAX_INSTANCES."
+    exit 1
+fi
+
+# Check if there are enough resources
+if (( NUM_INSTANCES * CPUS_PER_INSTANCE > TOTAL_CPUS )); then
+    echo "Error: Not enough CPUs. Required: $((NUM_INSTANCES * CPUS_PER_INSTANCE)), Available: $TOTAL_CPUS."
+    exit 1
+fi
+
+if (( NUM_INSTANCES * GPUS_PER_INSTANCE > TOTAL_GPUS )); then
+    echo "Error: Not enough GPUs. Required: $((NUM_INSTANCES * GPUS_PER_INSTANCE)), Available: $TOTAL_GPUS."
+    exit 1
+fi
+
+# Display summary
+echo "Starting $NUM_INSTANCES instances..."
+echo "Total CPUs: $TOTAL_CPUS (using $CPUS_PER_INSTANCE per instance)"
+echo "Total GPUs: $TOTAL_GPUS (using $GPUS_PER_INSTANCE per instance)"
+echo
+
+# Start instances
+for (( i=0; i<NUM_INSTANCES; i++ )); do
+    ./start-container.sh "$i"
+done
diff --git a/docker/stop-container.sh b/docker/stop-container.sh
new file mode 100755
--- /dev/null
+++ b/docker/stop-container.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Check if an instance ID is provided
+if [ -z "$1" ]; then
+    echo "Usage: $0 <INSTANCE_ID>"
+    exit 1
+fi
+
+INSTANCE_ID=$1
+COMPOSE_PROJECT_NAME="opencap_${INSTANCE_ID}"
+
+echo "Stopping and removing containers for INSTANCE_ID=${INSTANCE_ID}..."
+
+# Stop and remove containers associated with the project
+docker-compose \
+    --project-name $COMPOSE_PROJECT_NAME \
+    down
+
+# Verify if containers are removed
+if [ $? -eq 0 ]; then
+    echo "Successfully stopped and removed containers for INSTANCE_ID=${INSTANCE_ID}."
+else
+    echo "Failed to stop and remove containers for INSTANCE_ID=${INSTANCE_ID}."
+fi
+
Exiting.") +def writeToJsonLog(path, new_dict, max_entries=1000): + dir_name = os.path.dirname(path) + if not os.path.exists(dir_name): + os.makedirs(dir_name) + + if os.path.exists(path): + with open(path, 'r') as f: + data = json.load(f) + else: + data = [] + + data.append(new_dict) + + while len(data) > max_entries: + data.pop(0) + + with open(path, 'w') as f: + json.dump(data, f) + +def writeToErrorLog(path, session_id, trial_id, error, stack, max_entries=1000): + error_entry = { + 'session_id': session_id, + 'trial_id': trial_id, + 'datetime': datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + 'error': str(error), + 'stack': stack + } + writeToJsonLog(path, error_entry, max_entries) + # %% Some functions for loading subject data def getSubjectNumber(subjectName): - subjects = requests.get(API_URL + "subjects/", - headers = {"Authorization": "Token {}".format(API_TOKEN)}).json() + response = makeRequestWithRetry('GET', + API_URL + "subjects/", + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + subjects = response.json() sNum = [s['id'] for s in subjects if s['name'] == subjectName] if len(sNum)>1: print(len(sNum) + ' subjects with the name ' + subjectName + '. Will use the first one.') @@ -1575,8 +1631,10 @@ def getSubjectNumber(subjectName): return sNum[0] def getUserSessions(): - sessionJson = requests.get(API_URL + "sessions/valid/", - headers = {"Authorization": "Token {}".format(API_TOKEN)}).json() + response = makeRequestWithRetry('GET', + API_URL + "sessions/valid/", + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + sessionJson = response.json() return sessionJson def getSubjectSessions(subjectName): @@ -1617,6 +1675,16 @@ def get_entry_with_largest_number(trialList): return max_entry +def getGendersDict(): + genders_dict = { + "woman": "Woman", + "man": "Man", + "transgender": "Transgender", + "non-binary": "Non-Binary/Non-Conforming", + "prefer-not-respond": "Prefer not to respond", + } + return genders_dict + # Get local client info and update def getCommitHash(): @@ -1638,8 +1706,10 @@ def postLocalClientInfo(trial_url): "git_commit": getCommitHash(), "hostname": getHostname() } - r = requests.patch(trial_url, data=data, - headers = {"Authorization": "Token {}".format(API_TOKEN)}) + r = makeRequestWithRetry('PATCH', + trial_url, + data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) return r @@ -1650,7 +1720,49 @@ def postProcessedDuration(trial_url, duration): data = { "processed_duration": duration } - r = requests.patch(trial_url, data=data, - headers = {"Authorization": "Token {}".format(API_TOKEN)}) + r = makeRequestWithRetry('PATCH', + trial_url, + data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) return r + +# utils for common HTTP requests +def makeRequestWithRetry(method, url, + headers=None, data=None, params=None, files=None, + retries=5, backoff_factor=1): + """ + Makes an HTTP request with retry logic and returns the Response object. + + Args: + method (str): HTTP method (e.g., 'GET', 'POST', 'PUT', etc.) as used in + requests.Session().request() + url (str): The endpoint URL. + headers (dict): Headers to include in the request. + data (dict): Data to send in the request body. + params (dict): URL query parameters. + retries (int): Number of retry attempts. + backoff_factor (float): Backoff factor for exponential delays. + + Returns: + requests.Response: The response object for further processing. 
+ """ + retry_strategy = Retry( + total=retries, + backoff_factor=backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods={'DELETE', 'GET', 'POST', 'PUT', 'PATCH'} + ) + + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + with requests.Session() as session: + session.mount("https://", adapter) + response = session.request(method, + url, + headers=headers, + data=data, + params=params, + files=files) + response.raise_for_status() + return response + diff --git a/utilsAPI.py b/utilsAPI.py index 982bf4f..1535406 100644 --- a/utilsAPI.py +++ b/utilsAPI.py @@ -48,6 +48,9 @@ def getStatusEmails(): return emailInfo +def getErrorLogBool(): + return config('ERROR_LOG', default=False, cast=bool) + def getASInstance(): try: # Check if the ECS_CONTAINER_METADATA_FILE environment variable exists diff --git a/utilsChecker.py b/utilsChecker.py index ee8a390..41f8f93 100644 --- a/utilsChecker.py +++ b/utilsChecker.py @@ -24,6 +24,7 @@ from utilsCameraPy3 import Camera, nview_linear_triangulations from utils import getOpenPoseMarkerNames, getOpenPoseFaceMarkers from utils import numpy2TRC, rewriteVideos, delete_multiple_element,loadCameraParameters +from utils import makeRequestWithRetry from utilsAPI import getAPIURL from utilsAuth import getToken @@ -198,8 +199,9 @@ def computeAverageIntrinsics(session_path,trialIDs,CheckerBoardParams,nImages=25 camModels = [] for trial_id in trialIDs: - resp = requests.get(API_URL + "trials/{}/".format(trial_id), - headers = {"Authorization": "Token {}".format(API_TOKEN)}) + resp = makeRequestWithRetry('GET', + API_URL + "trials/{}/".format(trial_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) trial = resp.json() camModels.append(trial['videos'][0]['parameters']['model']) trial_name = trial['name'] diff --git a/utilsDetector.py b/utilsDetector.py index 1e57dde..3155746 100644 --- a/utilsDetector.py +++ b/utilsDetector.py @@ -319,9 +319,13 @@ def runMMposeVideo( # copy /data/output to pathOutputPkl os.system("cp /data/output_mmpose/* {pathOutputPkl}/".format(pathOutputPkl=pathOutputPkl)) - pkl_path_tmp = os.path.join(pathOutputPkl, 'human.pkl') - os.rename(pkl_path_tmp, pklPath) - + pkl_path_tmp = os.path.join(pathOutputPkl, 'human.pkl') + if os.path.exists(pkl_path_tmp): + os.rename(pkl_path_tmp, pklPath) + else: + raise FileNotFoundError( + "We could not detect any pose in your video. Please verify that the subject is correctly in front of the camera." 
diff --git a/utils.py b/utils.py
index 3f85d39..1c2445f 100644
--- a/utils.py
+++ b/utils.py
@@ -12,10 +12,12 @@
 import subprocess
 import zipfile
 import time
+import datetime
 
 import numpy as np
 import pandas as pd
 from scipy import signal
+from urllib3.util.retry import Retry
 
 from utilsAuth import getToken
 from utilsAPI import getAPIURL
@@ -102,13 +104,17 @@ def download_file(url, file_name):
         shutil.copyfileobj(response, out_file)
 
 def getTrialJson(trial_id):
-    trialJson = requests.get(API_URL + "trials/{}/".format(trial_id),
-                             headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "trials/{}/".format(trial_id),
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    trialJson = response.json()
 
     return trialJson
 
 def getSessionJson(session_id):
-    sessionJson = requests.get(API_URL + "sessions/{}/".format(session_id),
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "sessions/{}/".format(session_id),
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    sessionJson = response.json()
 
     # sort trials by time recorded
     def getCreatedAt(trial):
@@ -118,8 +124,10 @@
     return sessionJson
 
 def getSubjectJson(subject_id):
-    subjectJson = requests.get(API_URL + "subjects/{}/".format(subject_id),
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "subjects/{}/".format(subject_id),
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    subjectJson = response.json()
 
     return subjectJson
 
 def getTrialName(trial_id):
@@ -182,8 +190,10 @@ def postCalibrationOptions(session_path,session_id,overwrite=False):
             "meta":json.dumps({'calibration':calibOptionsJson})
             }
         trial_url = "{}{}{}/".format(API_URL, "trials/", calibration_id)
-        r= requests.patch(trial_url, data=data,
-                          headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        r = makeRequestWithRetry('PATCH',
+                                 trial_url,
+                                 data=data,
+                                 headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
         if r.status_code == 200:
             print('Wrote calibration selections to metadata.')
@@ -360,8 +370,7 @@
     session_desc = importMetadata(defaultMetadataPath)
 
     # Get session-specific metadata from api.
-
-    session = getSessionJson(session_id)
+    session = getSessionJson(session_id)
     if session['meta'] is not None:
         if not justCheckerParams:
             # Backward compatibility
@@ -370,7 +379,7 @@
                 session_desc["mass_kg"] = float(session['meta']['subject']['mass'])
                 session_desc["height_m"] = float(session['meta']['subject']['height'])
                 if 'gender' in session['meta']['subject']:
-                    session_desc["gender_mf"] = session['meta']['subject']['gender']
+                    session_desc["gender_mf"] = getGendersDict().get(session['meta']['subject']['gender'])
                 # Before implementing the subject feature, the posemodel was stored
                 # in session['meta']['subject']. After implementing the subject
                 # feature, the posemodel is stored in session['meta']['settings']
@@ -404,7 +413,7 @@
         session_desc["subjectID"] = subject_info['name']
         session_desc["mass_kg"] = subject_info['weight']
         session_desc["height_m"] = subject_info['height']
-        session_desc["gender_mf"] = subject_info['gender']
+        session_desc["gender_mf"] = getGendersDict().get(subject_info['gender'])
         try:
             session_desc["posemodel"] = session['meta']['settings']['posemodel']
         except:
@@ -457,8 +466,9 @@
         resultNums = [r['id'] for r in trial['results']]
 
     for rNum in resultNums:
-        requests.delete(API_URL + "results/{}/".format(rNum),
-                        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        makeRequestWithRetry('DELETE',
+                             API_URL + "results/{}/".format(rNum),
+                             headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
 
 def deleteAllResults(session_id):
@@ -686,8 +696,10 @@
 
         data = {"meta":json.dumps(existingMeta)}
 
-        r= requests.patch(session_url, data=data,
-                          headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        r = makeRequestWithRetry('PATCH',
+                                 session_url,
+                                 data=data,
+                                 headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
         if r.status_code !=200:
             print('Changing metadata failed.')
@@ -734,9 +746,11 @@
     data = {
        "public":publicStatus
    }
-
-    r= requests.patch(session_url, data=data,
-                      headers = {"Authorization": "Token {}".format(API_TOKEN)})
+
+    r = makeRequestWithRetry('PATCH',
+                             session_url,
+                             data=data,
+                             headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
     if r.status_code == 200:
         print('Successfully made ' + session_id + ' public.')
@@ -860,11 +874,17 @@
 
     # get S3 link
     data = {'fileName':os.path.split(filePath)[1]}
-    r = requests.get(API_URL + "sessions/null/get_presigned_url/",data=data).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "sessions/null/get_presigned_url/",
+                                    data=data)
+    r = response.json()
 
     # upload to S3
     files = {'file': open(filePath, 'rb')}
-    requests.post(r['url'], data=r['fields'],files=files)
+    makeRequestWithRetry('POST',
+                         r['url'],
+                         data=r['fields'],
+                         files=files)
    files["file"].close()
 
    # post link to and data to results
@@ -875,8 +895,10 @@
    data = {
        "trial": trial_url,
        "tag": tag,
        "device_id" : device_id,
        "media_url" : r['fields']['key']
    }
-    rResult = requests.post(API_URL + "results/", data=data,
-                            headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    rResult = makeRequestWithRetry('POST',
+                                   API_URL + "results/",
+                                   data=data,
+                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
    if rResult.status_code != 201:
        print('server response was + ' + str(r.status_code))
@@ -1483,8 +1505,11 @@
               'justNumber':1,
               'relativeTime':relativeTime}
 
-    r = requests.get(API_URL+"trials/get_trials_with_status/",params=params,
-                     headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL+"trials/get_trials_with_status/",
+                                    params=params,
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    r = response.json()
 
     return r['nTrials']
@@ -1561,11 +1586,42 @@
         sendStatusEmail(message=message)
         raise Exception("No GPU detected. Exiting.")
 
+def writeToJsonLog(path, new_dict, max_entries=1000):
+    dir_name = os.path.dirname(path)
+    if not os.path.exists(dir_name):
+        os.makedirs(dir_name)
+
+    if os.path.exists(path):
+        with open(path, 'r') as f:
+            data = json.load(f)
+    else:
+        data = []
+
+    data.append(new_dict)
+
+    while len(data) > max_entries:
+        data.pop(0)
+
+    with open(path, 'w') as f:
+        json.dump(data, f)
+
+def writeToErrorLog(path, session_id, trial_id, error, stack, max_entries=1000):
+    error_entry = {
+        'session_id': session_id,
+        'trial_id': trial_id,
+        'datetime': datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
+        'error': str(error),
+        'stack': stack
+    }
+    writeToJsonLog(path, error_entry, max_entries)
+
 # %% Some functions for loading subject data
 def getSubjectNumber(subjectName):
-    subjects = requests.get(API_URL + "subjects/",
-                            headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "subjects/",
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    subjects = response.json()
     sNum = [s['id'] for s in subjects if s['name'] == subjectName]
     if len(sNum)>1:
         print(len(sNum) + ' subjects with the name ' + subjectName + '. Will use the first one.')
@@ -1575,8 +1631,10 @@
     return sNum[0]
 
 def getUserSessions():
-    sessionJson = requests.get(API_URL + "sessions/valid/",
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+    response = makeRequestWithRetry('GET',
+                                    API_URL + "sessions/valid/",
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    sessionJson = response.json()
     return sessionJson
 
 def getSubjectSessions(subjectName):
@@ -1617,6 +1675,16 @@
 
     return max_entry
 
+def getGendersDict():
+    genders_dict = {
+        "woman": "Woman",
+        "man": "Man",
+        "transgender": "Transgender",
+        "non-binary": "Non-Binary/Non-Conforming",
+        "prefer-not-respond": "Prefer not to respond",
+    }
+    return genders_dict
+
 # Get local client info and update
 def getCommitHash():
@@ -1638,8 +1706,10 @@
         "git_commit": getCommitHash(),
         "hostname": getHostname()
     }
-    r = requests.patch(trial_url, data=data,
-                       headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    r = makeRequestWithRetry('PATCH',
+                             trial_url,
+                             data=data,
+                             headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
     return r
@@ -1650,7 +1720,49 @@
     data = {
         "processed_duration": duration
     }
-    r = requests.patch(trial_url, data=data,
-                       headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    r = makeRequestWithRetry('PATCH',
+                             trial_url,
+                             data=data,
+                             headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
     return r
+
+# utils for common HTTP requests
+def makeRequestWithRetry(method, url,
+                         headers=None, data=None, params=None, files=None,
+                         retries=5, backoff_factor=1):
+    """
+    Makes an HTTP request with retry logic and returns the Response object.
+
+    Args:
+        method (str): HTTP method (e.g., 'GET', 'POST', 'PUT', etc.) as used in
+            requests.Session().request()
+        url (str): The endpoint URL.
+        headers (dict): Headers to include in the request.
+        data (dict): Data to send in the request body.
+        params (dict): URL query parameters.
+        files (dict): Files to send in the request body.
+        retries (int): Number of retry attempts.
+        backoff_factor (float): Backoff factor for exponential delays.
+
+    Returns:
+        requests.Response: The response object for further processing.
+    """
+    retry_strategy = Retry(
+        total=retries,
+        backoff_factor=backoff_factor,
+        status_forcelist=[429, 500, 502, 503, 504],
+        allowed_methods={'DELETE', 'GET', 'POST', 'PUT', 'PATCH'}
+    )
+
+    adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
+    with requests.Session() as session:
+        session.mount("https://", adapter)
+        response = session.request(method,
+                                   url,
+                                   headers=headers,
+                                   data=data,
+                                   params=params,
+                                   files=files)
+        response.raise_for_status()
+        return response
+
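makeRequestWithRetry mirrors the requests.Session.request signature, retries 429/5xx responses with exponential backoff (urllib3 sleeps roughly backoff_factor * 2**(attempt - 1) seconds between tries), and raises requests.exceptions.HTTPError via raise_for_status once the retry budget is exhausted. A typical call in the style of this patch — the trial id is a placeholder, and API_URL/API_TOKEN are the module-level values in utils.py:

from utils import makeRequestWithRetry, API_URL, API_TOKEN

trial_id = 'some-trial-id'  # placeholder
response = makeRequestWithRetry(
    'PATCH',
    API_URL + 'trials/{}/'.format(trial_id),
    data={'status': 'done'},
    headers={'Authorization': 'Token {}'.format(API_TOKEN)})
result = response.json()  # only reached on success; HTTPError propagates otherwise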
diff --git a/utilsAPI.py b/utilsAPI.py
index 982bf4f..1535406 100644
--- a/utilsAPI.py
+++ b/utilsAPI.py
@@ -48,6 +48,9 @@ def getStatusEmails():
 
     return emailInfo
 
+def getErrorLogBool():
+    return config('ERROR_LOG', default=False, cast=bool)
+
 def getASInstance():
     try:
         # Check if the ECS_CONTAINER_METADATA_FILE environment variable exists
diff --git a/utilsChecker.py b/utilsChecker.py
index ee8a390..41f8f93 100644
--- a/utilsChecker.py
+++ b/utilsChecker.py
@@ -24,6 +24,7 @@
 from utilsCameraPy3 import Camera, nview_linear_triangulations
 from utils import getOpenPoseMarkerNames, getOpenPoseFaceMarkers
 from utils import numpy2TRC, rewriteVideos, delete_multiple_element,loadCameraParameters
+from utils import makeRequestWithRetry
 from utilsAPI import getAPIURL
 from utilsAuth import getToken
 
@@ -198,8 +199,9 @@ def computeAverageIntrinsics(session_path,trialIDs,CheckerBoardParams,nImages=25):
     camModels = []
     for trial_id in trialIDs:
-        resp = requests.get(API_URL + "trials/{}/".format(trial_id),
-                            headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        resp = makeRequestWithRetry('GET',
+                                    API_URL + "trials/{}/".format(trial_id),
+                                    headers = {"Authorization": "Token {}".format(API_TOKEN)})
         trial = resp.json()
         camModels.append(trial['videos'][0]['parameters']['model'])
         trial_name = trial['name']
diff --git a/utilsDetector.py b/utilsDetector.py
index 1e57dde..3155746 100644
--- a/utilsDetector.py
+++ b/utilsDetector.py
@@ -319,9 +319,13 @@ def runMMposeVideo(
             # copy /data/output to pathOutputPkl
             os.system("cp /data/output_mmpose/* {pathOutputPkl}/".format(pathOutputPkl=pathOutputPkl))
-            pkl_path_tmp = os.path.join(pathOutputPkl, 'human.pkl')
-            os.rename(pkl_path_tmp, pklPath)
-
+            pkl_path_tmp = os.path.join(pathOutputPkl, 'human.pkl')
+            if os.path.exists(pkl_path_tmp):
+                os.rename(pkl_path_tmp, pklPath)
+            else:
+                raise FileNotFoundError(
+                    "We could not detect any pose in your video. Please verify that the subject is correctly in front of the camera."
+                )
         except Exception as e:
             if len(e.args) == 2: # specific exception
                 raise Exception(e.args[0], e.args[1])
diff --git a/utilsServer.py b/utilsServer.py
index f97877c..116c539 100644
--- a/utilsServer.py
+++ b/utilsServer.py
@@ -4,6 +4,8 @@
 import requests
 import json
 import logging
+import time
+import random
 
 from main import main
 from utils import getDataDirectory
@@ -25,6 +27,7 @@
 from utils import importMetadata
 from utils import checkAndGetPosePickles
 from utils import getTrialNameIdMapping
+from utils import makeRequestWithRetry
 from utilsAuth import getToken
 from utilsAPI import getAPIURL
 
@@ -67,8 +70,10 @@ def processTrial(session_id, trial_id, trial_type = 'dynamic',
             error_msg = {}
             error_msg['error_msg'] = e.args[0]
             error_msg['error_msg_dev'] = e.args[1]
-            _ = requests.patch(trial_url, data={"meta": json.dumps(error_msg)},
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            _ = makeRequestWithRetry('PATCH',
+                                     trial_url,
+                                     data={"meta": json.dumps(error_msg)},
+                                     headers = {"Authorization": "Token {}".format(API_TOKEN)})
             raise Exception('Calibration failed', e.args[0], e.args[1])
 
         if not hasWritePermissions:
@@ -143,8 +148,10 @@
             error_msg = {}
             error_msg['error_msg'] = e.args[0]
             error_msg['error_msg_dev'] = e.args[1]
-            _ = requests.patch(trial_url, data={"meta": json.dumps(error_msg)},
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            _ = makeRequestWithRetry('PATCH',
+                                     trial_url,
+                                     data={"meta": json.dumps(error_msg)},
+                                     headers = {"Authorization": "Token {}".format(API_TOKEN)})
             raise Exception('Static trial failed', e.args[0], e.args[1])
 
         if not hasWritePermissions:
@@ -233,8 +240,10 @@
             error_msg = {}
             error_msg['error_msg'] = e.args[0]
             error_msg['error_msg_dev'] = e.args[1]
-            _ = requests.patch(trial_url, data={"meta": json.dumps(error_msg)},
-                               headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            _ = makeRequestWithRetry('PATCH',
+                                     trial_url,
+                                     data={"meta": json.dumps(error_msg)},
+                                     headers = {"Authorization": "Token {}".format(API_TOKEN)})
             raise Exception('Dynamic trial failed.\n' + error_msg['error_msg_dev'], e.args[0], e.args[1])
 
         if not hasWritePermissions:
@@ -352,8 +361,10 @@
         print('Processing ' + session_id)
 
         # check if write permissions (session owner or admin)
-        permissions = requests.get(API_URL + "sessions/{}/get_session_permission/".format(session_id),
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+        response = makeRequestWithRetry('GET',
+                                        API_URL + "sessions/{}/get_session_permission/".format(session_id),
+                                        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+        permissions = response.json()
 
         hasWritePermissions = permissions['isAdmin'] or permissions['isOwner']
 
@@ -373,13 +384,17 @@
                              hasWritePermissions = hasWritePermissions,
                              cameras_to_use=cameras_to_use)
                 statusData = {'status':'done'}
-                _ = requests.patch(API_URL + "trials/{}/".format(calib_id_toProcess), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(calib_id_toProcess),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
             except Exception as e:
                 print(e)
                 statusData = {'status':'error'}
-                _ = requests.patch(API_URL + "trials/{}/".format(calib_id_toProcess), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(calib_id_toProcess),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
         if static_id == None:
             static_id_toProcess = getNeutralTrialID(session_id)
@@ -400,17 +415,22 @@
                              batchProcess = True,
                              cameras_to_use=cameras_to_use)
                 statusData = {'status':'done'}
-                _ = requests.patch(API_URL + "trials/{}/".format(static_id_toProcess), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(static_id_toProcess),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
             except Exception as e:
                 print(e)
                 statusData = {'status':'error'}
-                _ = requests.patch(API_URL + "trials/{}/".format(static_id_toProcess), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(static_id_toProcess),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
         if dynamic_ids == None:
-
-            session = requests.get(API_URL + "sessions/{}/".format(session_id),
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
+            response = makeRequestWithRetry('GET',
+                                            API_URL + "sessions/{}/".format(session_id),
+                                            headers = {"Authorization": "Token {}".format(API_TOKEN)})
+            session = response.json()
             dynamic_ids_toProcess = [t['id'] for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')]
         else:
             if type(dynamic_ids) == str:
@@ -433,43 +453,75 @@
                              cameras_to_use=cameras_to_use)
 
                 statusData = {'status':'done'}
-                _ = requests.patch(API_URL + "trials/{}/".format(dID), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(dID),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
             except Exception as e:
                 print(e)
                 statusData = {'status':'error'}
-                _ = requests.patch(API_URL + "trials/{}/".format(dID), data=statusData,
-                                   headers = {"Authorization": "Token {}".format(API_TOKEN)})
+                _ = makeRequestWithRetry('PATCH',
+                                         API_URL + "trials/{}/".format(dID),
+                                         data=statusData,
+                                         headers = {"Authorization": "Token {}".format(API_TOKEN)})
 
-def runTestSession(pose='all',isDocker=True):
-    trials = {}
-
-    if not any(s in API_URL for s in ['dev.opencap', '127.0']) : # prod trials
-        trials['openpose'] = '3f2960c7-ca29-45b0-9be5-8d74db6131e5' # session ae2d50f1-537a-44f1-96a5-f5b7717452a3
-        trials['hrnet'] = '299ca938-8765-4a84-9adf-6bdf0e072451' # session faef80d3-0c26-452c-a7be-28dbfe04178e
-        # trials['failure'] = '698162c8-3980-46e5-a3c5-8d4f081db4c4' # failed trial for testing
-    else: # dev trials
-        trials['openpose'] = '89d77579-8371-4760-a019-95f2c793622c' # session acd0e19c-6c86-4ba4-95fd-94b97229a926
-        trials['hrnet'] = 'e0e02393-42ee-46d4-9ae1-a6fbb0b89c42' # session 3510c726-a1b8-4de4-a4a2-52b021b4aab2
-
-    if pose == 'all':
-        trialList = list(trials.values())
-    else:
-        try:
-            trialList = [trials[pose]]
-        except:
+def runTestSession(pose='all',isDocker=True,maxNumTries=3):
+    # We retry test sessions because sometimes, when different containers are
+    # processing the test trial, the API can change the URL, causing 404 errors.
+    numTries = 0
+    while numTries < maxNumTries:
+        numTries += 1
+        logging.info(f"Starting test trial attempt #{numTries} of {maxNumTries}")
+        trials = {}
+
+        if not any(s in API_URL for s in ['dev.opencap', '127.0']) : # prod trials
+            trials['openpose'] = '3f2960c7-ca29-45b0-9be5-8d74db6131e5' # session ae2d50f1-537a-44f1-96a5-f5b7717452a3
+            trials['hrnet'] = '299ca938-8765-4a84-9adf-6bdf0e072451' # session faef80d3-0c26-452c-a7be-28dbfe04178e
+            # trials['failure'] = '698162c8-3980-46e5-a3c5-8d4f081db4c4' # failed trial for testing
+        else: # dev trials
+            trials['openpose'] = '89d77579-8371-4760-a019-95f2c793622c' # session acd0e19c-6c86-4ba4-95fd-94b97229a926
+            trials['hrnet'] = 'e0e02393-42ee-46d4-9ae1-a6fbb0b89c42' # session 3510c726-a1b8-4de4-a4a2-52b021b4aab2
+
+        if pose == 'all':
             trialList = list(trials.values())
+        else:
+            try:
+                trialList = [trials[pose]]
+            except:
+                trialList = list(trials.values())
+
+        try:
+            for trial_id in trialList:
+                trial = getTrialJson(trial_id)
+                logging.info("Running status check on trial name: " + trial['name'] + "_" + str(trial_id) + "\n\n")
+                processTrial(trial["session"], trial_id, trial_type='static', isDocker=isDocker)
+
+            logging.info("\n\n\nStatus check succeeded. \n\n")
+            return
 
-    try:
-        for trial_id in trialList:
-            trial = getTrialJson(trial_id)
-            logging.info("Running status check on trial name: " + trial['name'] + "_" + str(trial_id) + "\n\n")
-            processTrial(trial["session"], trial_id, trial_type='static', isDocker=isDocker)
-    except:
-        logging.info("test trial failed. stopping machine.")
-        # send email
-        message = "A backend OpenCap machine failed the status check. It has been stopped."
-        sendStatusEmail(message=message)
-        raise Exception('Failed status check. Stopped.')
-
-    logging.info("\n\n\nStatus check succeeded. \n\n")
+        # Catch and re-enter the while loop if it's an HTTPError (could be more
+        # than just 404 errors). Wait between 30 and 60 seconds before
+        # retrying.
+        except requests.exceptions.HTTPError as e:
+            if numTries < maxNumTries:
+                logging.info(f"test trial failed on try #{numTries} due to HTTPError. Retrying.")
+                wait_time = random.randint(30,60)
+                logging.info(f"waiting {wait_time} seconds then retrying...")
+                time.sleep(wait_time)
+                continue
+            else:
+                logging.info(f"test trial failed on try #{numTries} due to HTTPError.")
+                # send email
+                message = "A backend OpenCap machine failed the status check (HTTPError). It has been stopped."
+                sendStatusEmail(message=message)
+                raise Exception('Failed status check (HTTPError). Stopped.')
+
+        # Catch other errors and stop
+        except:
+            logging.info("test trial failed. stopping machine.")
+            # send email
+            message = "A backend OpenCap machine failed the status check. It has been stopped."
+            sendStatusEmail(message=message)
+            raise Exception('Failed status check. Stopped.')
\ No newline at end of file