Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a1cc8ab
Initial iteration of tile check script
Dec 6, 2022
8bb6161
Logging + better async cassandra query
Dec 6, 2022
86d4735
Improved logging and handling of failed cassandra queries
Dec 13, 2022
6ba1bc8
Updated changelog to reflect change happening after release 1.0.0
Dec 13, 2022
9f7e18b
Undid accidentally committed changes to analysis/setup.py
Dec 13, 2022
4bfeca1
Updates
Dec 14, 2022
efacbd3
Updates
Dec 19, 2022
a2820dc
Actually try to enforce the limit argument.
Dec 19, 2022
a8f396e
Added defaults for Cassandra uname & password
Dec 19, 2022
5ce8f22
Alternate check to verify Cassandra tiles are in Solr
Dec 20, 2022
3728620
Update dockerfile to install requirements
Dec 21, 2022
5cbd07a
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jan 23, 2023
78de29c
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Jan 23, 2023
5b68d9e
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Feb 14, 2023
2dc4d7d
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Feb 21, 2023
9ae066e
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 13, 2023
98903bc
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 20, 2023
268bd8e
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 23, 2023
66aacca
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Mar 30, 2023
deafc2a
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Mar 31, 2023
0a85096
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 4, 2023
b06e994
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
May 16, 2023
ac66e14
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 17, 2023
d994835
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 18, 2023
ca109a6
Added scripts for orphan tile verification and deletion
Jun 15, 2023
58aa17c
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jun 22, 2023
0c1a467
Merge remote-tracking branch 'RKuttruff/solr-cassandra-match' into so…
Jun 22, 2023
bc66ce5
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jul 10, 2023
159ae07
Merge remote-tracking branch 'RKuttruff/solr-cassandra-match' into so…
Jul 10, 2023
96c6d9d
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Aug 1, 2023
ccc40a4
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Aug 21, 2023
d16c3bc
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Aug 22, 2023
b114de4
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Aug 23, 2023
0f89a95
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 6, 2023
a1e5446
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 7, 2023
2848817
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 14, 2023
555836e
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Sep 14, 2023
d373140
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Nov 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- SDAP-473: Added support for matchup job prioritization
- SDAP-483: Added `.asf.yaml` to configure Jira auto-linking.
- SDAP-487: Added script to migrate existing `doms.doms_data` data to new schema.
- Added tool to verify tile data matches between Solr & Cassandra
### Changed
- SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`.
- **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py`
Expand Down
1 change: 1 addition & 0 deletions analysis/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

try:
check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
pass
except (CalledProcessError, IOError) as e:
print('Failed install with mamba; falling back to conda')
try:
Expand Down
4 changes: 4 additions & 0 deletions docker/nexus-webapp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ RUN pip3 install -r requirements.txt
RUN pip3 install cython
RUN rm requirements.txt

WORKDIR /incubator-sdap-nexus/tools/solr-cassandra-match
RUN pip3 install -r requirements.txt
RUN rm requirements.txt

WORKDIR /incubator-sdap-nexus

# Upgrade kubernetes client jar from the default version
Expand Down
225 changes: 225 additions & 0 deletions tools/solr-cassandra-match/deletebyid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import uuid
import json
import asyncio

import logging
from datetime import datetime
from functools import partial
from time import sleep
from typing import List, Tuple, Union

import cassandra.concurrent
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
from six.moves import input
from solrcloudpy import SolrConnection, SearchOptions
from tenacity import retry, stop_after_attempt, wait_exponential
from tqdm import tqdm


# One format string shared by basicConfig and by the explicit formatter below.
_LOG_FORMAT = "%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s"

logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger used by every function in this script.
logger = logging.getLogger('delete-by-id')
logger.setLevel(logging.INFO)

# Replace the formatter of the first root handler so that timestamps are
# rendered as YYYY-MM-DDTHH:MM:SS.
_root_handler = logging.getLogger().handlers[0]
_root_handler.setFormatter(logging.Formatter(fmt=_LOG_FORMAT, datefmt="%Y-%m-%dT%H:%M:%S"))

# Only CRITICAL records from the driver libraries are let through.
for _noisy_logger in ('cassandra', 'solrcloudpy'):
    logging.getLogger(_noisy_logger).setLevel(logging.CRITICAL)

# Number of tile ids handled per batch for each store.
CASSANDRA_BATCH_SIZE = 8192
SOLR_BATCH_SIZE = 256


def delete_from_solr(args, ids):
    """Delete the given tile ids from Solr (not implemented yet).

    This is a placeholder: no request is sent to Solr and nothing is deleted.
    A warning is logged so that a run against the Solr target is not mistaken
    for a successful deletion.

    :param args: parsed command line options (currently unused).
    :param ids: tile ids requested for deletion; only counted for the log message.
    :return: None
    """
    logger.warning(
        f'Deleting from Solr is not implemented; {len(ids):,} tile ids were NOT deleted')


def delete_from_cassandra(args, ids):
    """Delete the given tile ids from a Cassandra table.

    Connects to the cluster described by ``args``, splits ``ids`` into batches
    of ``CASSANDRA_BATCH_SIZE`` and issues one asynchronous prepared ``DELETE``
    per tile id, waiting for every delete of a batch before starting the next
    batch. A failing batch is re-submitted (up to 5 attempts, exponential
    back-off between 10 and 60 seconds). The cluster connection is always shut
    down before returning or exiting.

    :param args: parsed command line options; reads ``cassandra``,
        ``cassandraPort``, ``cassandraProtocolVersion``, ``cassandraUsername``,
        ``cassandraPassword``, ``cassandraKeyspace`` and ``cassandraTable``.
    :param ids: list of tile ids to delete.
    :raises SystemExit: with code 1 when a batch still fails after all retries.
    """
    logger.info('Trying to connect to Cassandra...')

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)

    if args.cassandraUsername and args.cassandraPassword:
        auth_provider = PlainTextAuthProvider(username=args.cassandraUsername, password=args.cassandraPassword)
    else:
        auth_provider = None

    # The CLI delivers the port and protocol version as strings; convert both.
    cassandra_cluster = Cluster(contact_points=args.cassandra, port=int(args.cassandraPort),
                                protocol_version=int(args.cassandraProtocolVersion),
                                execution_profiles={
                                    EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy)
                                },
                                auth_provider=auth_provider)

    try:
        cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace)

        logger.info('Successfully connected to Cassandra')

        batches = [ids[i:i + CASSANDRA_BATCH_SIZE] for i in range(0, len(ids), CASSANDRA_BATCH_SIZE)]

        logger.info(f'Prepared {len(batches):,} batches of tile ids to delete')

        start = datetime.now()

        n_tiles = len(ids)
        deleting = 0

        # A table name cannot be a bind marker in CQL, so it is formatted into
        # the statement; the value comes from the operator's command line.
        prepared_query = cassandra_session.prepare('DELETE FROM %s WHERE tile_id=?' % args.cassandraTable)

        @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60))
        def delete_batch(batch):
            # The futures list must be local to each attempt: if failed futures
            # from a previous attempt were kept, their stored error would be
            # re-raised on every retry and no retry could ever succeed.
            futures = [cassandra_session.execute_async(prepared_query, (tile_id,)) for tile_id in batch]

            for f in futures:
                try:
                    f.result()
                except Exception:
                    logger.warning('Batch delete failed; maybe retrying')
                    raise

        for batch in batches:
            deleting += len(batch)

            logger.info(
                f'Deleting batch of {len(batch)} tiles from Cassandra | '
                f'({deleting}/{n_tiles}) [{deleting / n_tiles * 100:7.3f}%]')

            try:
                delete_batch(batch)
            except Exception:
                # Driver logging is silenced at module level, so include the
                # traceback here to show why the batch failed.
                logger.critical('Failed to delete batch after multiple retries, exiting', exc_info=True)
                raise SystemExit(1)

        # The elapsed time is a timedelta (H:MM:SS.ffffff), not a number of seconds.
        logger.info(f'Deleted {len(ids):,} tiles from Cassandra in {datetime.now() - start}')
    finally:
        # Release the driver's connections and threads on every exit path.
        cassandra_cluster.shutdown()


def read_ids(args):
    """Load the tile ids to delete from the JSON file named by ``args.id_file``.

    :param args: parsed command line options; only ``id_file`` is read.
    :return: list of ``uuid.UUID`` objects, one per entry of the JSON document,
        in file order.
    """
    logger.info(f'Reading ids from file {args.id_file}')

    with open(args.id_file) as id_stream:
        raw_ids = json.load(id_stream)

    # Each entry is handed to uuid.UUID as its first positional argument.
    return list(map(uuid.UUID, raw_ids))


def main(args):
    """Read the tile ids and delete them from the store selected on the command line.

    :param args: parsed command line options; a ``target`` equal to ``'solr'``
        selects Solr, any other value selects Cassandra.
    """
    ids = read_ids(args)

    logger.info(f'Successfully read {len(ids):,} tile ids to delete')

    # Pick the deletion routine once, then invoke it.
    delete = delete_from_solr if args.target == 'solr' else delete_from_cassandra
    delete(args, ids)


def parse_args():
    """Define the command line interface and parse ``sys.argv``.

    :return: ``argparse.Namespace`` holding the deletion target, the id-list
        path, and the Solr and Cassandra connection options.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Delete a list of tile ids from either Solr or Cassandra',
    )
    option = cli.add_argument

    # What to delete, and from which store.
    option('--target', dest='target', choices=['solr', 'cassandra'], required=True,
           help='Store to delete data from')
    option('-i', '--id-list', dest='id_file', required=True,
           help='Path to JSON file containing a list of tile UUIDs to delete')

    # Solr options.
    option('--solr', default='localhost:8983', metavar='127.0.0.1:8983',
           help='The url of the SOLR server.')
    option('--collection', default='nexustiles', metavar='nexustiles',
           help='The name of the SOLR collection.')
    option('--solrIdField', default='id', metavar='id',
           help='The name of the unique ID field for this collection.')

    # Cassandra options.
    option('--cassandra', nargs='+', default=['localhost'],
           metavar=('127.0.0.100', '127.0.0.101'),
           help='The hostname(s) or IP(s) of the Cassandra server(s).')
    option('-k', '--cassandraKeyspace', default='nexustiles', metavar='nexustiles',
           help='The Cassandra keyspace.')
    option('-t', '--cassandraTable', default='sea_surface_temp',
           help='The name of the cassandra table.')
    option('-p', '--cassandraPort', default='9042',
           help='The port used to connect to Cassandra.')
    option('--cassandraUsername', default='cassandra',
           help='The username used to connect to Cassandra.')
    option('--cassandraPassword', default='cassandra',
           help='The password used to connect to Cassandra.')
    option('-pv', '--cassandraProtocolVersion', default='3',
           choices=['1', '2', '3', '4', '5'],
           help='The version of the Cassandra protocol the driver should use.')

    option('-v', '--verbose', dest='verbose', action='store_true',
           help='Enable verbose output')

    return cli.parse_args()


if __name__ == '__main__':
    # Script entry point: parse the command line, then run the deletion.
    main(parse_args())


Loading