From a1cc8ab03f4362bd377466e6f8c2cfa05d992f47 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 5 Dec 2022 16:46:00 -0800 Subject: [PATCH 01/12] Initial iteration of tile check script --- CHANGELOG.md | 1 + analysis/setup.py | 10 +- tools/solr-cassandra-match/match.py | 252 ++++++++++++++++++++ tools/solr-cassandra-match/requirements.txt | 1 + 4 files changed, 261 insertions(+), 3 deletions(-) create mode 100644 tools/solr-cassandra-match/match.py create mode 100644 tools/solr-cassandra-match/requirements.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index 2256ba74..39aa5d9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - SDAP-407: Added depth to `/domsresults` endpoint - Added documentation for building SDAP docker images - Prepared documentation for v1.0.0 release. +- Added tool to verify tile data matches between Solr & Cassandra ### Changed - SDAP-390: Changed `/doms` to `/cdms` and `doms_reader.py` to `cdms_reader.py` - domslist endpoint points to AWS insitu instead of doms insitu diff --git a/analysis/setup.py b/analysis/setup.py index 99cd707c..0b579bd1 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -17,12 +17,16 @@ import setuptools from subprocess import check_call, CalledProcessError -with open('../VERSION.txt', 'r') as f: - __version__ = f.read() +try: + with open('../VERSION.txt', 'r') as f: + __version__ = f.read() +except: + __version__ = '1.0.0' try: - check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) + # check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) + pass except (CalledProcessError, IOError) as e: print('Failed install with mamba; falling back to conda') try: diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py new file mode 100644 index 00000000..dec3107d --- /dev/null +++ b/tools/solr-cassandra-match/match.py @@ -0,0 +1,252 @@ +import argparse +import json +import logging +import uuid +from typing import List, Tuple + +import cassandra.concurrent +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster +from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy +from six.moves import input +from solrcloudpy import SolrConnection, SearchOptions + +solr_connection = None +solr_collection = None +SOLR_UNIQUE_KEY = None + +cassandra_cluster = None +cassandra_session = None +cassandra_table = None + +logger = None + +PROCEED_THRESHOLD = 300000 + + +def init(args): + global solr_connection + solr_connection = SolrConnection(args.solr) + global solr_collection + solr_collection = solr_connection[args.collection] + global SOLR_UNIQUE_KEY + SOLR_UNIQUE_KEY = args.solrIdField + + dc_policy = RoundRobinPolicy() + token_policy = TokenAwarePolicy(dc_policy) + + if args.cassandraUsername and args.cassandraPassword: + auth_provider = PlainTextAuthProvider(username=args.cassandraUsername, password=args.cassandraPassword) + else: + auth_provider = None + + global cassandra_cluster + cassandra_cluster = Cluster(contact_points=args.cassandra, port=args.cassandraPort, + protocol_version=int(args.cassandraProtocolVersion), + load_balancing_policy=token_policy, + auth_provider=auth_provider) + global cassandra_session + cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace) + + global cassandra_table + cassandra_table = args.cassandraTable + + global logger + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" + ) + + logger = logging.getLogger(__name__) + + logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) + logging.getLogger().handlers[0].setFormatter( + logging.Formatter( + fmt="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S" + )) + + logging.getLogger('cassandra').setLevel(logging.CRITICAL) + + +def compare_page(args, start) -> Tuple[List, List, List, int]: + se = SearchOptions() + + logger.debug(f'Running solr query from {start} to {start + args.rows}') + + se.commonparams.fl(SOLR_UNIQUE_KEY).q('*:*').start(start).rows(args.rows) + + query = solr_collection.search(se) + docs = query.result.response.docs + + ids = [uuid.UUID(row[SOLR_UNIQUE_KEY]) for row in docs] + + statement = cassandra_session.prepare("SELECT tile_id FROM %s where tile_id=?" % cassandra_table) + + logger.debug('Starting Cassandra query') + results = cassandra.concurrent.execute_concurrent_with_args(cassandra_session, statement, [(uuid.UUID(str(id)),) for id in ids]) + + failed = [] + present = [] + extra = [] + + logger.debug('Processing Cassandra results') + for (success, result) in results: + if not success: + failed.append(result) + else: + rows = result.all() + + present.extend([row[0] for row in rows]) + + logger.debug('Finished processing Cassandra results') + + for id in present: + try: + ids.remove(id) + except: + extra.append(id) + + return ids, extra, failed, len(docs) + + +def do_comparison(args): + missing_cassandra = [] + missing_solr = [] + failed = [] + + se = SearchOptions() + + se.commonparams.rows(0).q('*:*') + + num_tiles = solr_collection.search(se).result.response.numFound + + logger.info(f'Found {num_tiles} tiles in Solr') + + limit = num_tiles + + if args.limit: + limit = min(num_tiles, args.limit) + + if limit >= PROCEED_THRESHOLD: + do_continue = input(f"There are a large number of tile IDs to check. Do you wish to proceed? [y]/n: ") + + while do_continue not in ['y', 'n', '']: + do_continue = input(f"There are a large number of tile IDs to check. Do you wish to proceed? [y]/n: ") + + if do_continue == 'n': + do_continue = input(f"Do you wish to proceed with a limit (300 thousand)? [y]/n: ") + + while do_continue not in ['y', 'n', '']: + do_continue = input(f"Do you wish to proceed with a limit (300 thousand)? [y]/n: ") + + if do_continue == 'n': + logger.info('Exiting...') + exit(0) + else: + limit = PROCEED_THRESHOLD + + start = 0 + + while start < limit: + absent, extra, failed_queries, checked = compare_page(args, start) + start += checked + + missing_cassandra.extend(absent) + missing_solr.extend(extra) + failed.extend(failed_queries) + + if len(missing_cassandra) > 0: + logger.info(f'Found {len(missing_cassandra)} tile IDs missing from Cassandra:\n' + json.dumps(missing_cassandra, indent=4)) + + if len(missing_solr) > 0: + logger.info(f'Found {len(missing_solr)} tile IDs missing from Solr:\n' + json.dumps(missing_solr, indent=4)) + + if len(failed) > 0: + logger.info(f'There were {len(failed)} Cassandra queries that failed:\n' + json.dumps(failed, indent=4)) + + +def parse_args(): + parser = argparse.ArgumentParser(description='Ensure tiles in Solr exist in Cassandra', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--solr', + help='The url of the SOLR server.', + required=True, + metavar='127.0.0.1:8983') + + parser.add_argument('--collection', + help='The name of the SOLR collection.', + required=False, + default='nexustiles', + metavar='nexustiles') + + parser.add_argument('--solrIdField', + help='The name of the unique ID field for this collection.', + required=False, + default='id', + metavar='id') + + parser.add_argument('--cassandra', + help='The hostname(s) or IP(s) of the Cassandra server(s).', + required=True, + nargs='+', + metavar=('127.0.0.100', '127.0.0.101')) + + parser.add_argument('-k', '--cassandraKeyspace', + help='The Cassandra keyspace.', + default='nexustiles', + required=False, + metavar='nexustiles') + + parser.add_argument('-t', '--cassandraTable', + help='The name of the cassandra table.', + required=False, + default='sea_surface_temp') + + parser.add_argument('-p', '--cassandraPort', + help='The port used to connect to Cassandra.', + required=False, + default='9042') + + parser.add_argument('--cassandraUsername', + help='The username used to connect to Cassandra.', + required=False) + + parser.add_argument('--cassandraPassword', + help='The password used to connect to Cassandra.', + required=False) + + parser.add_argument('-pv', '--cassandraProtocolVersion', + help='The version of the Cassandra protocol the driver should use.', + required=False, + choices=['1', '2', '3', '4', '5'], + default='3') + + parser.add_argument('--solr-rows', + help='Size of Solr query pages to check', + required=False, + dest='rows', + default=100000, + type=int) + + parser.add_argument('--limit', + help='Maximum number of IDs to check. Default is all tiles', + required=False, + dest='limit', + default=None, + type=int) + + parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable verbose output') + + return parser.parse_args() + + +def main(): + args = parse_args() + init(args) + do_comparison(args) + + +if __name__ == '__main__': + main() diff --git a/tools/solr-cassandra-match/requirements.txt b/tools/solr-cassandra-match/requirements.txt new file mode 100644 index 00000000..1852f700 --- /dev/null +++ b/tools/solr-cassandra-match/requirements.txt @@ -0,0 +1 @@ +solrcloudpy==4.0.1 From 8bb6161793f948fbae1f0e6747a2033b15f5e075 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 5 Dec 2022 16:58:35 -0800 Subject: [PATCH 02/12] Logging + better async cassandra query --- tools/solr-cassandra-match/match.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index dec3107d..dc413ca1 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -84,7 +84,14 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: statement = cassandra_session.prepare("SELECT tile_id FROM %s where tile_id=?" % cassandra_table) logger.debug('Starting Cassandra query') - results = cassandra.concurrent.execute_concurrent_with_args(cassandra_session, statement, [(uuid.UUID(str(id)),) for id in ids]) + results = cassandra.concurrent.execute_concurrent_with_args( + cassandra_session, + statement, + [(uuid.UUID(str(id)),) for id in ids], + concurrency=1000, + raise_on_first_error=False, + results_generator=True + ) failed = [] present = [] @@ -107,6 +114,13 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: except: extra.append(id) + logger.debug('Page stats: \n' + json.dumps({ + 'missing': len(ids), + 'extra': len(extra), + 'failed': len(failed), + 'total_checked': len(docs) + }, indent=4)) + return ids, extra, failed, len(docs) From 86d47352a7f0163809c3c6d79dc1507d9b926b9e Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 13 Dec 2022 13:16:04 -0800 Subject: [PATCH 03/12] Improved logging and handling of failed cassandra queries --- tools/solr-cassandra-match/match.py | 120 ++++++++++++++++++++-------- 1 file changed, 87 insertions(+), 33 deletions(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index dc413ca1..8446a2a7 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -1,6 +1,7 @@ import argparse import json import logging +from time import sleep import uuid from typing import List, Tuple @@ -24,6 +25,17 @@ PROCEED_THRESHOLD = 300000 +class Encoder(json.JSONEncoder): + def __init__(self, **args): + json.JSONEncoder.__init__(self, **args) + + def default(self, o): + if isinstance(o, uuid.UUID): + return str(o) + else: + return json.JSONEncoder.default(self, o) + + def init(args): global solr_connection solr_connection = SolrConnection(args.solr) @@ -72,47 +84,64 @@ def init(args): def compare_page(args, start) -> Tuple[List, List, List, int]: se = SearchOptions() - logger.debug(f'Running solr query from {start} to {start + args.rows}') + logger.debug(f'Running solr query from {start:,} to {start + args.rows:,}') - se.commonparams.fl(SOLR_UNIQUE_KEY).q('*:*').start(start).rows(args.rows) + se.commonparams.fl(SOLR_UNIQUE_KEY).q(args.q).start(start).rows(args.rows) query = solr_collection.search(se) docs = query.result.response.docs - ids = [uuid.UUID(row[SOLR_UNIQUE_KEY]) for row in docs] + ids = [str(uuid.UUID(row[SOLR_UNIQUE_KEY])) for row in docs] statement = cassandra_session.prepare("SELECT tile_id FROM %s where tile_id=?" % cassandra_table) - logger.debug('Starting Cassandra query') - results = cassandra.concurrent.execute_concurrent_with_args( - cassandra_session, - statement, - [(uuid.UUID(str(id)),) for id in ids], - concurrency=1000, - raise_on_first_error=False, - results_generator=True - ) + retries = 3 - failed = [] - present = [] extra = [] - logger.debug('Processing Cassandra results') - for (success, result) in results: - if not success: - failed.append(result) - else: - rows = result.all() + is_retry = False + + while retries > 0: + if is_retry: + logger.debug('Retrying query with failed IDs') + + logger.debug(f'Starting Cassandra query for {len(ids):,} tiles') + results = cassandra.concurrent.execute_concurrent_with_args( + cassandra_session, + statement, + [(uuid.UUID(str(id)),) for id in ids], + concurrency=10000, + raise_on_first_error=False, + results_generator=True + ) + + failed = [] + present = [] + + logger.debug('Processing Cassandra results') + for (success, result) in results: + if not success: + failed.append(str(result)) + else: + rows = result.all() - present.extend([row[0] for row in rows]) + present.extend([str(row[0]) for row in rows]) - logger.debug('Finished processing Cassandra results') + logger.debug(f'Finished processing Cassandra results: found {len(present):,} tiles') - for id in present: - try: - ids.remove(id) - except: - extra.append(id) + for id in present: + try: + ids.remove(id) + except: + extra.append(id) + + if len(failed) > 0: + logger.warning(f'{len(failed)} queries failed, maybe retrying') + retries -= 1 + is_retry = True + sleep(5) + else: + break logger.debug('Page stats: \n' + json.dumps({ 'missing': len(ids), @@ -131,11 +160,11 @@ def do_comparison(args): se = SearchOptions() - se.commonparams.rows(0).q('*:*') + se.commonparams.rows(0).q(args.q) num_tiles = solr_collection.search(se).result.response.numFound - logger.info(f'Found {num_tiles} tiles in Solr') + logger.info(f'Found {num_tiles:,} tiles in Solr') limit = num_tiles @@ -171,13 +200,22 @@ def do_comparison(args): failed.extend(failed_queries) if len(missing_cassandra) > 0: - logger.info(f'Found {len(missing_cassandra)} tile IDs missing from Cassandra:\n' + json.dumps(missing_cassandra, indent=4)) + logger.info(f'Found {len(missing_cassandra):,} tile IDs missing from Cassandra:\n' + + json.dumps(missing_cassandra, indent=4, cls=Encoder)) + else: + logger.info('No tiles found missing from Cassandra') if len(missing_solr) > 0: - logger.info(f'Found {len(missing_solr)} tile IDs missing from Solr:\n' + json.dumps(missing_solr, indent=4)) + logger.info(f'Found {len(missing_solr):,} tile IDs missing from Solr:\n' + + json.dumps(missing_solr, indent=4, cls=Encoder)) + else: + logger.info('No tiles found missing from Solr') if len(failed) > 0: - logger.info(f'There were {len(failed)} Cassandra queries that failed:\n' + json.dumps(failed, indent=4)) + logger.info(f'There were {len(failed):,} Cassandra queries that failed:\n' + + json.dumps(failed, indent=4, cls=Encoder)) + else: + logger.info('No Cassandra queries have failed') def parse_args(): @@ -251,6 +289,13 @@ def parse_args(): default=None, type=int) + parser.add_argument('-q', + help='Solr query string', + required=False, + dest='q', + default='*:*', + metavar='QUERY') + parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable verbose output') return parser.parse_args() @@ -263,4 +308,13 @@ def main(): if __name__ == '__main__': - main() + try: + main() + except Exception as e: + logging.error('Something went wrong!') + logging.exception(e) + + if logger: + logger.info('Exiting') + else: + logging.info('Exiting') From 6ba1bc82d0ff657240309bd27a3f9916ba560d24 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 13 Dec 2022 13:18:37 -0800 Subject: [PATCH 04/12] Updated changelog to reflect change happening after release 1.0.0 --- CHANGELOG.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39aa5d9e..a95a3623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] +### Added +- Added tool to verify tile data matches between Solr & Cassandra +### Changed +### Deprecated +### Removed +### Fixed +### Security + ## [1.0.0] - 2022-11-22 ### Added - SDAP-388: Enable SDAP to proxy/redirect to alternate SDAP @@ -25,7 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - SDAP-407: Added depth to `/domsresults` endpoint - Added documentation for building SDAP docker images - Prepared documentation for v1.0.0 release. -- Added tool to verify tile data matches between Solr & Cassandra ### Changed - SDAP-390: Changed `/doms` to `/cdms` and `doms_reader.py` to `cdms_reader.py` - domslist endpoint points to AWS insitu instead of doms insitu From 9f7e18b668240974cef71a9538ad9c5a429ec27c Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 13 Dec 2022 13:38:41 -0800 Subject: [PATCH 05/12] Undid accidentally committed changes to analysis/setup.py --- analysis/setup.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/analysis/setup.py b/analysis/setup.py index 0b579bd1..611f971f 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -17,15 +17,12 @@ import setuptools from subprocess import check_call, CalledProcessError -try: - with open('../VERSION.txt', 'r') as f: - __version__ = f.read() -except: - __version__ = '1.0.0' +with open('../VERSION.txt', 'r') as f: + __version__ = f.read() try: - # check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) + check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) pass except (CalledProcessError, IOError) as e: print('Failed install with mamba; falling back to conda') From 4bfeca1ba49b5ced198af19e02890fd5b8913946 Mon Sep 17 00:00:00 2001 From: rileykk Date: Wed, 14 Dec 2022 14:40:21 -0800 Subject: [PATCH 06/12] Updates - Improved logging - Try to mitigate Solr query timeouts - Used execution profile for Cassandra --- tools/solr-cassandra-match/match.py | 71 +++++++++++++-------- tools/solr-cassandra-match/requirements.txt | 1 + 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index 8446a2a7..568ad680 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -1,16 +1,17 @@ import argparse import json import logging -from time import sleep import uuid +from time import sleep from typing import List, Tuple import cassandra.concurrent from cassandra.auth import PlainTextAuthProvider -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy from six.moves import input from solrcloudpy import SolrConnection, SearchOptions +from tenacity import retry, stop_after_attempt, wait_exponential solr_connection = None solr_collection = None @@ -20,7 +21,7 @@ cassandra_session = None cassandra_table = None -logger = None +logger = logging PROCEED_THRESHOLD = 300000 @@ -37,8 +38,25 @@ def default(self, o): def init(args): + global logger + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" + ) + + logger = logging.getLogger(__name__) + + logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) + logging.getLogger().handlers[0].setFormatter( + logging.Formatter( + fmt="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S" + )) + + logging.getLogger('cassandra').setLevel(logging.CRITICAL) + global solr_connection - solr_connection = SolrConnection(args.solr) + solr_connection = SolrConnection(args.solr, timeout=60) global solr_collection solr_collection = solr_connection[args.collection] global SOLR_UNIQUE_KEY @@ -55,7 +73,10 @@ def init(args): global cassandra_cluster cassandra_cluster = Cluster(contact_points=args.cassandra, port=args.cassandraPort, protocol_version=int(args.cassandraProtocolVersion), - load_balancing_policy=token_policy, + #load_balancing_policy=token_policy, + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy) + }, auth_provider=auth_provider) global cassandra_session cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace) @@ -63,32 +84,29 @@ def init(args): global cassandra_table cassandra_table = args.cassandraTable - global logger - logging.basicConfig( - level=logging.DEBUG if args.verbose else logging.INFO, - format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" - ) - logger = logging.getLogger(__name__) - - logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) - logging.getLogger().handlers[0].setFormatter( - logging.Formatter( - fmt="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", - datefmt="%Y-%m-%dT%H:%M:%S" - )) - - logging.getLogger('cassandra').setLevel(logging.CRITICAL) +@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60)) +def try_solr(collection, se): + try: + logger.debug('Starting Solr query') + response = collection.search(se) + logger.debug('Finished Solr query') + return response + except Exception as e: + logger.error("Solr query failed") + logger.exception(e) + solr_connection.timeout *= 2 + raise def compare_page(args, start) -> Tuple[List, List, List, int]: se = SearchOptions() - logger.debug(f'Running solr query from {start:,} to {start + args.rows:,}') + logger.info(f'Running solr query from {start:,} to {start + args.rows:,}') se.commonparams.fl(SOLR_UNIQUE_KEY).q(args.q).start(start).rows(args.rows) - query = solr_collection.search(se) + query = try_solr(solr_collection, se) docs = query.result.response.docs ids = [str(uuid.UUID(row[SOLR_UNIQUE_KEY])) for row in docs] @@ -98,12 +116,13 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: retries = 3 extra = [] + failed = [] is_retry = False while retries > 0: if is_retry: - logger.debug('Retrying query with failed IDs') + logger.info('Retrying query with failed IDs') logger.debug(f'Starting Cassandra query for {len(ids):,} tiles') results = cassandra.concurrent.execute_concurrent_with_args( @@ -127,7 +146,7 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: present.extend([str(row[0]) for row in rows]) - logger.debug(f'Finished processing Cassandra results: found {len(present):,} tiles') + logger.info(f'Finished processing Cassandra results: found {len(present):,} tiles') for id in present: try: @@ -162,7 +181,9 @@ def do_comparison(args): se.commonparams.rows(0).q(args.q) - num_tiles = solr_collection.search(se).result.response.numFound + logger.info('Querying Solr to see how many tiles we need to check...') + + num_tiles = try_solr(solr_collection, se).result.response.numFound logger.info(f'Found {num_tiles:,} tiles in Solr') diff --git a/tools/solr-cassandra-match/requirements.txt b/tools/solr-cassandra-match/requirements.txt index 1852f700..3b4e5178 100644 --- a/tools/solr-cassandra-match/requirements.txt +++ b/tools/solr-cassandra-match/requirements.txt @@ -1 +1,2 @@ solrcloudpy==4.0.1 +tenacity==8.0.1 From efacbd39d57f4b03b8a72c5a2c8791f2fc1e5466 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 19 Dec 2022 14:21:02 -0800 Subject: [PATCH 07/12] Updates - Improved logging - Used markers for Solr queries instead of start/rows - Progress bar for Cassandra queries --- tools/solr-cassandra-match/match.py | 73 ++++++++++++++++----- tools/solr-cassandra-match/requirements.txt | 1 + 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index 568ad680..39d5ffee 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -2,6 +2,7 @@ import json import logging import uuid +from datetime import datetime from time import sleep from typing import List, Tuple @@ -12,6 +13,7 @@ from six.moves import input from solrcloudpy import SolrConnection, SearchOptions from tenacity import retry, stop_after_attempt, wait_exponential +from tqdm import tqdm solr_connection = None solr_collection = None @@ -87,28 +89,51 @@ def init(args): @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60)) def try_solr(collection, se): + t_s = datetime.now() + try: logger.debug('Starting Solr query') + response = collection.search(se) - logger.debug('Finished Solr query') + + t_e = datetime.now() + elapsed = t_e - t_s + logger.debug(f'Finished Solr query in {elapsed}') + return response except Exception as e: - logger.error("Solr query failed") + t_e = datetime.now() + elapsed = t_e - t_s + + logger.error(f"Solr query failed after {elapsed}") logger.exception(e) + solr_connection.timeout *= 2 + raise -def compare_page(args, start) -> Tuple[List, List, List, int]: +def compare_page(args, start, mark) -> Tuple[List, List, List, int, str]: se = SearchOptions() logger.info(f'Running solr query from {start:,} to {start + args.rows:,}') - se.commonparams.fl(SOLR_UNIQUE_KEY).q(args.q).start(start).rows(args.rows) + se.commonparams.fl(SOLR_UNIQUE_KEY).q(args.q).rows(args.rows).sort(f'{SOLR_UNIQUE_KEY} asc') + + se.commonparams.remove_param('cursorMark') + se.commonparams.add_params(cursorMark=mark) query = try_solr(solr_collection, se) docs = query.result.response.docs + try: + next_mark = query.result.nextCursorMark + except AttributeError: + return [], [], [], 0, '' + + if next_mark == mark: + return [], [], [], 0, next_mark + ids = [str(uuid.UUID(row[SOLR_UNIQUE_KEY])) for row in docs] statement = cassandra_session.prepare("SELECT tile_id FROM %s where tile_id=?" % cassandra_table) @@ -119,9 +144,12 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: failed = [] is_retry = False + wait = 5 while retries > 0: if is_retry: + sleep(wait) + wait += 10 logger.info('Retrying query with failed IDs') logger.debug(f'Starting Cassandra query for {len(ids):,} tiles') @@ -138,7 +166,8 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: present = [] logger.debug('Processing Cassandra results') - for (success, result) in results: + + for (success, result) in tqdm(results, total=len(ids), desc='Cassandra queries', unit='tile'): if not success: failed.append(str(result)) else: @@ -146,19 +175,20 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: present.extend([str(row[0]) for row in rows]) - logger.info(f'Finished processing Cassandra results: found {len(present):,} tiles') + found = [str(row[0]) for row in rows] - for id in present: - try: - ids.remove(id) - except: - extra.append(id) + for id in found: + try: + ids.remove(id) + except: + extra.append(id) + + logger.info(f'Finished processing Cassandra results: found {len(present):,} tiles') if len(failed) > 0: logger.warning(f'{len(failed)} queries failed, maybe retrying') retries -= 1 is_retry = True - sleep(5) else: break @@ -169,7 +199,7 @@ def compare_page(args, start) -> Tuple[List, List, List, int]: 'total_checked': len(docs) }, indent=4)) - return ids, extra, failed, len(docs) + return ids, extra, failed, len(docs), next_mark def do_comparison(args): @@ -212,10 +242,17 @@ def do_comparison(args): start = 0 - while start < limit: - absent, extra, failed_queries, checked = compare_page(args, start) + mark = '*' + + while True: + absent, extra, failed_queries, checked, next_mark = compare_page(args, start, mark) start += checked + if next_mark == mark: + break + else: + mark = next_mark + missing_cassandra.extend(absent) missing_solr.extend(extra) failed.extend(failed_queries) @@ -329,6 +366,8 @@ def main(): if __name__ == '__main__': + start = datetime.now() + try: main() except Exception as e: @@ -336,6 +375,6 @@ def main(): logging.exception(e) if logger: - logger.info('Exiting') + logger.info(f'Exiting. Run time = {datetime.now() - start}') else: - logging.info('Exiting') + logging.info(f'Exiting. Run time = {datetime.now() - start}') diff --git a/tools/solr-cassandra-match/requirements.txt b/tools/solr-cassandra-match/requirements.txt index 3b4e5178..63d73a69 100644 --- a/tools/solr-cassandra-match/requirements.txt +++ b/tools/solr-cassandra-match/requirements.txt @@ -1,2 +1,3 @@ solrcloudpy==4.0.1 tenacity==8.0.1 +tqdm==4.64.1 From a2820dcd34637d4ec1d72ecac7c0632c21a3b285 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 19 Dec 2022 14:45:47 -0800 Subject: [PATCH 08/12] Actually try to enforce the limit argument. Enforcement is loose; Solr queries will not be limited and may overshoot but no further queries will run after the limit is exceeded --- tools/solr-cassandra-match/match.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index 39d5ffee..bf0d7288 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -75,7 +75,6 @@ def init(args): global cassandra_cluster cassandra_cluster = Cluster(contact_points=args.cassandra, port=args.cassandraPort, protocol_version=int(args.cassandraProtocolVersion), - #load_balancing_policy=token_policy, execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy) }, @@ -257,6 +256,9 @@ def do_comparison(args): missing_solr.extend(extra) failed.extend(failed_queries) + if start >= limit: + break + if len(missing_cassandra) > 0: logger.info(f'Found {len(missing_cassandra):,} tile IDs missing from Cassandra:\n' + json.dumps(missing_cassandra, indent=4, cls=Encoder)) @@ -341,7 +343,9 @@ def parse_args(): type=int) parser.add_argument('--limit', - help='Maximum number of IDs to check. Default is all tiles', + help='Maximum number of IDs to check. Default is all tiles. Enforcement is currently loose; ' + 'will not run Solr queries past the limit but will check the full query. Eg. If limit is ' + '750,000 and solr-rows is 200,000, the first 800,000 tiles will be checked', required=False, dest='limit', default=None, From a8f396e849ce35540d2cda1ad2ff88cc52e00ae8 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 19 Dec 2022 14:47:31 -0800 Subject: [PATCH 09/12] Added defaults for Cassandra uname & password --- tools/solr-cassandra-match/match.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index bf0d7288..afdf14f0 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -323,10 +323,12 @@ def parse_args(): parser.add_argument('--cassandraUsername', help='The username used to connect to Cassandra.', + default='cassandra', required=False) parser.add_argument('--cassandraPassword', help='The password used to connect to Cassandra.', + default='cassandra', required=False) parser.add_argument('-pv', '--cassandraProtocolVersion', From 5ce8f22ddb741473286f38a27cdac97ba4aeb98d Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 20 Dec 2022 13:42:14 -0800 Subject: [PATCH 10/12] Alternate check to verify Cassandra tiles are in Solr --- tools/solr-cassandra-match/match.py | 148 +++++++++++++++++++++++++--- 1 file changed, 132 insertions(+), 16 deletions(-) diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index afdf14f0..b8426304 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -1,10 +1,12 @@ import argparse +import concurrent.futures import json import logging import uuid from datetime import datetime +from functools import partial from time import sleep -from typing import List, Tuple +from typing import List, Tuple, Union import cassandra.concurrent from cassandra.auth import PlainTextAuthProvider @@ -26,6 +28,7 @@ logger = logging PROCEED_THRESHOLD = 300000 +MAX_SOLR_FQ = 150 class Encoder(json.JSONEncoder): @@ -43,7 +46,7 @@ def init(args): global logger logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, - format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" + format="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" ) logger = logging.getLogger(__name__) @@ -51,11 +54,12 @@ def init(args): logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) logging.getLogger().handlers[0].setFormatter( logging.Formatter( - fmt="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", + fmt="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S" )) logging.getLogger('cassandra').setLevel(logging.CRITICAL) + logging.getLogger('solrcloudpy').setLevel(logging.CRITICAL) global solr_connection solr_connection = SolrConnection(args.solr, timeout=60) @@ -86,26 +90,37 @@ def init(args): cassandra_table = args.cassandraTable +def write_json(obj: Union[list, dict], file: str): + logger.info(f'Writing to file {file}') + + json.dump(obj, open(file, 'w'), indent=4, cls=Encoder) + + logger.info('done') + + @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60)) -def try_solr(collection, se): +def try_solr(collection, se, inhibit_log=False): t_s = datetime.now() try: - logger.debug('Starting Solr query') + if not inhibit_log: + logger.debug('Starting Solr query') response = collection.search(se) t_e = datetime.now() elapsed = t_e - t_s - logger.debug(f'Finished Solr query in {elapsed}') + if not inhibit_log: + logger.debug(f'Finished Solr query in {elapsed}') return response except Exception as e: t_e = datetime.now() elapsed = t_e - t_s - logger.error(f"Solr query failed after {elapsed}") - logger.exception(e) + if not inhibit_log: + logger.error(f"Solr query failed after {elapsed}") + logger.exception(e) solr_connection.timeout *= 2 @@ -135,7 +150,7 @@ def compare_page(args, start, mark) -> Tuple[List, List, List, int, str]: ids = [str(uuid.UUID(row[SOLR_UNIQUE_KEY])) for row in docs] - statement = cassandra_session.prepare("SELECT tile_id FROM %s where tile_id=?" % cassandra_table) + statement = cassandra_session.prepare("SELECT tile_id FROM %s WHERE tile_id=?" % cassandra_table) retries = 3 @@ -260,24 +275,114 @@ def do_comparison(args): break if len(missing_cassandra) > 0: - logger.info(f'Found {len(missing_cassandra):,} tile IDs missing from Cassandra:\n' + - json.dumps(missing_cassandra, indent=4, cls=Encoder)) + logger.info(f'Found {len(missing_cassandra):,} tile IDs missing from Cassandra:') + write_json(missing_cassandra, 'missing_cassandra.json') else: logger.info('No tiles found missing from Cassandra') if len(missing_solr) > 0: - logger.info(f'Found {len(missing_solr):,} tile IDs missing from Solr:\n' + - json.dumps(missing_solr, indent=4, cls=Encoder)) + logger.info(f'Found {len(missing_solr):,} tile IDs missing from Solr:') + write_json(missing_solr, 'extra_cassandra.json') else: logger.info('No tiles found missing from Solr') if len(failed) > 0: - logger.info(f'There were {len(failed):,} Cassandra queries that failed:\n' + - json.dumps(failed, indent=4, cls=Encoder)) + logger.info(f'There were {len(failed):,} Cassandra queries that failed:') + write_json(failed, 'failed_cassandra.json') else: logger.info('No Cassandra queries have failed') +def cassandra_to_solr(args): + missing = [] + + se = SearchOptions() + + se.commonparams.rows(0).q('*:*') + + logger.info('Querying Solr to estimate the number of tiles in Cassandra...') + + num_tiles = try_solr(solr_collection, se).result.response.numFound + + logger.info(f'Found {num_tiles:,} tiles in Solr') + + if num_tiles >= PROCEED_THRESHOLD: + do_continue = input(f"There are a large number of estimated tiles ({num_tiles:,}). " + f"Do you wish to proceed? [y]/n: ") + + while do_continue not in ['y', 'n', '']: + do_continue = input(f"There are a large number of estimated tiles ({num_tiles:,}). " + f"Do you wish to proceed? [y]/n: ") + + if do_continue == 'n': + return + + statement = cassandra_session.prepare("SELECT tile_id FROM %s" % cassandra_table) + + results = cassandra_session.execute(statement, timeout=60) + + cassandra_tiles = [] + cassandra_tile_count = 0 + + rows = args.rows + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=16, thread_name_prefix='solr-query-worker') + + for result in tqdm(results, total=num_tiles, desc='Cassandra query', unit=' rows'): + cassandra_tiles.append(str(result.tile_id)) + cassandra_tile_count += 1 + + while len(cassandra_tiles) >= rows: + to_check = cassandra_tiles[:rows] + cassandra_tiles = cassandra_tiles[rows:] + + missing.extend(check_solr(args, to_check, pool)) + + if len(cassandra_tiles) > 0: + missing.extend(check_solr(args, cassandra_tiles, pool)) + + pool.shutdown() + + logger.info(f'Finished checking {cassandra_tile_count} tiles') + + if len(missing) > 0: + logger.info(f'Found {len(missing):,} tile IDs missing from Solr:') + write_json(missing, 'missing_solr.json') + else: + logger.info('No tiles found missing from Solr') + + +def check_solr(args, ids: List[str], pool: concurrent.futures.ThreadPoolExecutor) -> List[str]: + ids.sort() + + id_set = set(ids) + + func = partial(aio_solr_query, args) + + solr_tiles = [] + batches = [ids[i:i + MAX_SOLR_FQ] for i in range(0, len(ids), MAX_SOLR_FQ)] + + pool_result = pool.map(func, batches) + + for result in tqdm(pool_result, total=len(batches), desc=' Solr queries', unit=' queries', leave=False): + solr_tiles.extend(result) + + solr_set = set(solr_tiles) + + diff = id_set.difference(solr_set) + + return list(diff) + + +def aio_solr_query(args, ids: List[str]) -> List[str]: + se = SearchOptions() + se.commonparams.q('*:*').fq("{!terms f=id}%s" % ','.join(ids)).fl(SOLR_UNIQUE_KEY).rows(len(ids)) + solr_query = try_solr(solr_collection, se, inhibit_log=True) + solr_tiles = [r[SOLR_UNIQUE_KEY] for r in solr_query.result.response.docs] + + return solr_tiles + + def parse_args(): parser = argparse.ArgumentParser(description='Ensure tiles in Solr exist in Cassandra', formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -293,6 +398,12 @@ def parse_args(): default='nexustiles', metavar='nexustiles') + parser.add_argument('--check-cassandra', + help='Check the tiles in Cassandra are present in Solr.', + required=False, + dest='check_cassandra', + action='store_true') + parser.add_argument('--solrIdField', help='The name of the unique ID field for this collection.', required=False, @@ -368,7 +479,12 @@ def parse_args(): def main(): args = parse_args() init(args) - do_comparison(args) + if not args.check_cassandra: + logger.info('Verifying the tiles in Solr are present in Cassandra...') + do_comparison(args) + else: + logger.info('Verifying the tiles in Cassandra are present in Solr...') + cassandra_to_solr(args) if __name__ == '__main__': From 3728620b6c59dae15a4c73fd408a4199b2c280c5 Mon Sep 17 00:00:00 2001 From: rileykk Date: Wed, 21 Dec 2022 13:12:13 -0800 Subject: [PATCH 11/12] Update dockerfile to install requirements --- docker/nexus-webapp/Dockerfile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile index 69af0196..d24181ae 100644 --- a/docker/nexus-webapp/Dockerfile +++ b/docker/nexus-webapp/Dockerfile @@ -103,6 +103,10 @@ RUN pip3 install -r requirements.txt RUN pip3 install cython RUN rm requirements.txt +WORKDIR /incubator-sdap-nexus/tools/solr-cassandra-match +RUN pip3 install -r requirements.txt +RUN rm requirements.txt + WORKDIR /incubator-sdap-nexus # Upgrade kubernetes client jar from the default version From ca109a612534efcc5b4b81edb9c32077f6675adb Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 15 Jun 2023 16:21:11 -0700 Subject: [PATCH 12/12] Added scripts for orphan tile verification and deletion --- tools/solr-cassandra-match/deletebyid.py | 225 ++++++++++++++++++++++ tools/solr-cassandra-match/match.py | 24 +++ tools/solr-cassandra-match/verify.py | 233 +++++++++++++++++++++++ 3 files changed, 482 insertions(+) create mode 100644 tools/solr-cassandra-match/deletebyid.py create mode 100644 tools/solr-cassandra-match/verify.py diff --git a/tools/solr-cassandra-match/deletebyid.py b/tools/solr-cassandra-match/deletebyid.py new file mode 100644 index 00000000..f57d9161 --- /dev/null +++ b/tools/solr-cassandra-match/deletebyid.py @@ -0,0 +1,225 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import uuid +import json +import asyncio + +import logging +from datetime import datetime +from functools import partial +from time import sleep +from typing import List, Tuple, Union + +import cassandra.concurrent +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT +from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy +from six.moves import input +from solrcloudpy import SolrConnection, SearchOptions +from tenacity import retry, stop_after_attempt, wait_exponential +from tqdm import tqdm + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" + ) + +logger = logging.getLogger('delete-by-id') + +logger.setLevel(logging.INFO) +logging.getLogger().handlers[0].setFormatter( + logging.Formatter( + fmt="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S" + )) + +logging.getLogger('cassandra').setLevel(logging.CRITICAL) +logging.getLogger('solrcloudpy').setLevel(logging.CRITICAL) + +CASSANDRA_BATCH_SIZE = 8192 +SOLR_BATCH_SIZE = 256 + + +def delete_from_solr(args, ids): + pass + + +def delete_from_cassandra(args, ids): + logger.info('Trying to connect to Cassandra...') + + dc_policy = RoundRobinPolicy() + token_policy = TokenAwarePolicy(dc_policy) + + if args.cassandraUsername and args.cassandraPassword: + auth_provider = PlainTextAuthProvider(username=args.cassandraUsername, password=args.cassandraPassword) + else: + auth_provider = None + + cassandra_cluster = Cluster(contact_points=args.cassandra, port=args.cassandraPort, + protocol_version=int(args.cassandraProtocolVersion), + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy) + }, + auth_provider=auth_provider) + cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace) + + logger.info('Successfully connected to Cassandra') + + cassandra_table = args.cassandraTable + + batches = [ids[i:i + CASSANDRA_BATCH_SIZE] for i in range(0, len(ids), CASSANDRA_BATCH_SIZE)] + + logger.info(f'Prepared {len(batches):,} batches of tile ids to delete') + + start = datetime.now() + + n_tiles = len(ids) + deleting = 0 + + prepared_query = cassandra_session.prepare('DELETE FROM %s WHERE tile_id=?' % cassandra_table) + + @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60)) + def delete_batch(batch): + for tile_id in batch: + futures.append(cassandra_session.execute_async(prepared_query, (tile_id,))) + + for f in futures: + try: + f.result() + except: + logger.warning('Batch delete failed; maybe retrying') + raise + + for batch in batches: + futures = [] + + deleting += len(batch) + + logger.info( + f'Deleting batch of {len(batch)} tiles from Cassandra | ' + f'({deleting}/{n_tiles}) [{deleting / n_tiles * 100:7.3f}%]') + + try: + delete_batch(batch) + except: + logger.critical('Failed to delete batch after multiple retries, exiting') + exit(1) + + logger.info(f'Deleted {len(ids):,} tiles from Cassandra in {str(datetime.now() - start)} seconds') + + +def read_ids(args): + logger.info(f'Reading ids from file {args.id_file}') + + with open(args.id_file) as f: + ids = json.load(f) + + return [uuid.UUID(i) for i in ids] + + +def main(args): + ids = read_ids(args) + + logger.info(f'Successfully read {len(ids):,} tile ids to delete') + + if args.target == 'solr': + delete_from_solr(args, ids) + else: + delete_from_cassandra(args, ids) + + +def parse_args(): + parser = argparse.ArgumentParser(description='Delete a list of tile ids from either Solr or Cassandra', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--target', + required=True, + choices=['solr', 'cassandra'], + help='Store to delete data from', + dest='target') + + parser.add_argument('-i', '--id-list', + required=True, + dest='id_file', + help='Path to JSON file containing a list of tile UUIDs to delete') + + parser.add_argument('--solr', + help='The url of the SOLR server.', + default='localhost:8983', + metavar='127.0.0.1:8983') + + parser.add_argument('--collection', + help='The name of the SOLR collection.', + required=False, + default='nexustiles', + metavar='nexustiles') + + parser.add_argument('--solrIdField', + help='The name of the unique ID field for this collection.', + required=False, + default='id', + metavar='id') + + parser.add_argument('--cassandra', + help='The hostname(s) or IP(s) of the Cassandra server(s).', + default=['localhost'], + nargs='+', + metavar=('127.0.0.100', '127.0.0.101')) + + parser.add_argument('-k', '--cassandraKeyspace', + help='The Cassandra keyspace.', + default='nexustiles', + required=False, + metavar='nexustiles') + + parser.add_argument('-t', '--cassandraTable', + help='The name of the cassandra table.', + required=False, + default='sea_surface_temp') + + parser.add_argument('-p', '--cassandraPort', + help='The port used to connect to Cassandra.', + required=False, + default='9042') + + parser.add_argument('--cassandraUsername', + help='The username used to connect to Cassandra.', + default='cassandra', + required=False) + + parser.add_argument('--cassandraPassword', + help='The password used to connect to Cassandra.', + default='cassandra', + required=False) + + parser.add_argument('-pv', '--cassandraProtocolVersion', + help='The version of the Cassandra protocol the driver should use.', + required=False, + choices=['1', '2', '3', '4', '5'], + default='3') + + parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable verbose output') + + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + main(args) + + diff --git a/tools/solr-cassandra-match/match.py b/tools/solr-cassandra-match/match.py index b8426304..282184ff 100644 --- a/tools/solr-cassandra-match/match.py +++ b/tools/solr-cassandra-match/match.py @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import argparse import concurrent.futures import json @@ -328,7 +343,13 @@ def cassandra_to_solr(args): pool = concurrent.futures.ThreadPoolExecutor(max_workers=16, thread_name_prefix='solr-query-worker') + limit_reached = False + for result in tqdm(results, total=num_tiles, desc='Cassandra query', unit=' rows'): + if limit_reached: + logger.warning('Reached check limit; stopping check') + break + cassandra_tiles.append(str(result.tile_id)) cassandra_tile_count += 1 @@ -338,6 +359,9 @@ def cassandra_to_solr(args): missing.extend(check_solr(args, to_check, pool)) + if args.limit is not None and len(missing) >= args.limit: + limit_reached = True + if len(cassandra_tiles) > 0: missing.extend(check_solr(args, cassandra_tiles, pool)) diff --git a/tools/solr-cassandra-match/verify.py b/tools/solr-cassandra-match/verify.py new file mode 100644 index 00000000..b8aea465 --- /dev/null +++ b/tools/solr-cassandra-match/verify.py @@ -0,0 +1,233 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import uuid +import json +import asyncio + +import logging +from datetime import datetime +from functools import partial +from time import sleep +from typing import List, Tuple, Union + +import cassandra.concurrent +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT +from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy +from six.moves import input +from solrcloudpy import SolrConnection, SearchOptions +from tenacity import retry, stop_after_attempt, wait_exponential +from tqdm import tqdm + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s" + ) + +logger = logging.getLogger('delete-by-id') + +logger.setLevel(logging.INFO) +logging.getLogger().handlers[0].setFormatter( + logging.Formatter( + fmt="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S" + )) + +logging.getLogger('cassandra').setLevel(logging.CRITICAL) +logging.getLogger('solrcloudpy').setLevel(logging.CRITICAL) + + +BATCH_SIZE = 5000000 + + +def compare(l1, l2): + intersection = set(l1).intersection(set(l2)) + + return len(intersection) == 0 + + +def check_cassandra(args, ids): + pass + + +def check_solr(args, ids): + logger.info('Connecting to Solr...') + + solr_connection = SolrConnection(args.solr, timeout=60) + solr_collection = solr_connection[args.collection] + SOLR_UNIQUE_KEY = args.solrIdField + + logger.info('Fetching ids') + + se = SearchOptions() + se.commonparams.q('*:*').fl(SOLR_UNIQUE_KEY).fl('id').rows(args.rows).sort('%s asc' % SOLR_UNIQUE_KEY) + + solr_ids = [] + + next_cursor_mark = '*' + + while True: + se.commonparams.remove_param('cursorMark') + se.commonparams.add_params(cursorMark=next_cursor_mark) + response = solr_collection.search(se) + logger.debug('Executed Solr query for next page') + + try: + result_next_cursor_mark = response.result.nextCursorMark + except AttributeError: + logger.debug('Query empty') + return [] + + if result_next_cursor_mark == next_cursor_mark: + logger.debug('Reached end of Solr results') + break + else: + next_cursor_mark = response.result.nextCursorMark + + solr_ids.extend([uuid.UUID(doc['id']) for doc in response.result.response.docs]) + + logger.debug(f'Added {len(response.result.response.docs):,} docs ( -> {len(solr_ids):,}') + + if len(solr_ids) >= BATCH_SIZE: + logger.debug('Running comparison') + if not compare(solr_ids, ids): + return False + + solr_ids.clear() + + logger.debug('Running final comparison') + + if not compare(solr_ids, ids): + return False + + +def read_ids(args): + logger.info(f'Reading ids from file {args.id_file}') + + with open(args.id_file) as f: + ids = json.load(f) + + return [uuid.UUID(i) for i in ids] + + + +def main(args): + ids = read_ids(args) + + if args.target == 'solr': + disjoint = check_solr(args, ids) + else: + disjoint = check_cassandra(args, ids) + + if disjoint: + logger.info('THERE ARE SHARED TILE IDS') + else: + logger.info('All tile ids are absent from the target store') + + +def parse_args(): + parser = argparse.ArgumentParser(description='Verify that a list of tile ids is absent from either Solr or ' + 'Cassandra', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--target', + required=True, + choices=['solr', 'cassandra'], + help='Store to check', + dest='target') + + parser.add_argument('-i', '--id-list', + required=True, + dest='id_file', + help='Path to JSON file containing a list of tile UUIDs to delete') + + parser.add_argument('--solr', + help='The url of the SOLR server.', + default='localhost:8983', + metavar='127.0.0.1:8983') + + parser.add_argument('--collection', + help='The name of the SOLR collection.', + required=False, + default='nexustiles', + metavar='nexustiles') + + parser.add_argument('--solrIdField', + help='The name of the unique ID field for this collection.', + required=False, + default='id', + metavar='id') + + parser.add_argument('--cassandra', + help='The hostname(s) or IP(s) of the Cassandra server(s).', + default=['localhost'], + nargs='+', + metavar=('127.0.0.100', '127.0.0.101')) + + parser.add_argument('-k', '--cassandraKeyspace', + help='The Cassandra keyspace.', + default='nexustiles', + required=False, + metavar='nexustiles') + + parser.add_argument('-t', '--cassandraTable', + help='The name of the cassandra table.', + required=False, + default='sea_surface_temp') + + parser.add_argument('-p', '--cassandraPort', + help='The port used to connect to Cassandra.', + required=False, + default='9042') + + parser.add_argument('--cassandraUsername', + help='The username used to connect to Cassandra.', + default='cassandra', + required=False) + + parser.add_argument('--cassandraPassword', + help='The password used to connect to Cassandra.', + default='cassandra', + required=False) + + parser.add_argument('-pv', '--cassandraProtocolVersion', + help='The version of the Cassandra protocol the driver should use.', + required=False, + choices=['1', '2', '3', '4', '5'], + default='3') + + parser.add_argument('--solr-rows', + help='Size of Solr query pages to check', + required=False, + dest='rows', + default=100000, + type=int) + + parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable verbose output') + + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + + if args.verbose: + logger.setLevel(logging.DEBUG) + + main(args) + +