1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
__pycache__
229 changes: 229 additions & 0 deletions cleanup_tsv.py
@@ -0,0 +1,229 @@
#!/usr/bin/env python3

# ####################################################################
#
# cleanup_tsv
#
# Cleanse invalid rows from a sample TSV file from RingDNA and load the
# cleansed file into the 'users' table in Redshift
#
# Usage:
#
# - These variables must be set in the host environment:
# REDSHIFT_USERNAME
# REDSHIFT_PASSWORD
#
# - See the "variables that need to be changed" section below
#
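#   For example (assuming a bash-like shell; values are hypothetical):
#
#       export REDSHIFT_USERNAME=analyst
#       export REDSHIFT_PASSWORD=secret
#       ./cleanup_tsv.py
#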
# ####################################################################

import boto3
import csv
import os
import psycopg2


# validate row
def is_valid_row(test_row):
# check # of columns
if len(test_row) != 5:
return False
# sanity check last column (email) contains '@'
if '@' not in test_row[4]:
return False
# ensure first column (id) is an int
try:
int(test_row[0])
return True
except ValueError:
return False

# end is_valid_row()


# attempt to cleanse an invalid row
def fix_bad_row(bad_row):
# strip spaces and remove blank columns
candidate_row = [bad_row[0]] + \
[x.strip() for x in bad_row[1:] if x.strip() != '']
if is_valid_row(candidate_row):
return candidate_row

    # check for account_number not starting with a digit
    # if not, assume 3 name fields
    # (length guard avoids an IndexError on rows that are too short)
    if len(candidate_row) > 3 and not candidate_row[3][0].isdigit():

# 2nd and 3rd names the same: delete duplicate
if candidate_row[2] == candidate_row[3]:
del candidate_row[3]

# otherwise join 2nd and 3rd names together
else:
candidate_row[2] = " ".join(candidate_row[2:4])
del candidate_row[3]

# check for validity now
if is_valid_row(candidate_row):
return candidate_row
# unaccounted for case
else:
return ['-1'] + bad_row

# unaccounted for case
else:
return ['-1'] + bad_row

# end fix_bad_row()
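# A quick illustration with hypothetical values (not taken from the real
# data): a six-column row with a duplicated last name collapses back to five:
#
#   >>> fix_bad_row(['7', ' Jane ', 'Smith', 'Smith', '12345678',
#   ...              'jane@example.com'])
#   ['7', 'Jane', 'Smith', '12345678', 'jane@example.com']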


# upload file to an S3 bucket
def upload_to_s3(local_file, bucket, s3_file, profile):

try:
session = boto3.Session(profile_name=profile)
s3_client = session.client('s3')
s3_client.upload_file(local_file, bucket, s3_file)
print(" S3 upload is successful")
return True
except FileNotFoundError:
print(" local file not found")
return False
    except Exception as err:
        print(f" exception while uploading to S3: {err}")
        return False

# end upload_to_s3()


# load from S3 into users table in Redshift
def load_to_redshift(
bucket, s3_file, host_name, db_name, iam_role_arn, aws_region
):

'''
    DDL for the users table:

CREATE TABLE users (
id INTEGER PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(30),
account_number VARCHAR(8) NOT NULL,
email VARCHAR(80) NOT NULL
)
'''

# get username and password from env variables
redshift_username = os.environ.get('REDSHIFT_USERNAME')
redshift_password = os.environ.get('REDSHIFT_PASSWORD')

try:
conn = psycopg2.connect(
host=host_name,
port=5439,
user=redshift_username,
password=redshift_password,
dbname=db_name)
print(" successfully connected to Redshift")
except Exception as err:
print(f" error connecting to Redshift: {err}")
return False

# Redshift does not enforce primary keys,
# so truncate table before load, in case of reload
sql = f'''TRUNCATE users;
COPY users FROM 's3://{bucket}/{s3_file}'
iam_role '{iam_role_arn}'
region '{aws_region}'
format as csv
delimiter '\t'
ignoreheader 1
emptyasnull;'''

    try:
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        print(f" data successfully loaded into {db_name}.users table")
    except Exception as err:
        print(f" error loading into users table from S3: {err}")
    finally:
        conn.close()

# end load_to_redshift()


#
# variables that need to be changed
#
# it would be better to make them command-line parameters
# and/or read them from a config file...

# change these if different local filenames are desired
input_filename = "data/data.tsv"
output_filename = "data/cleansed.tsv"

# change these variables for appropriate values for AWS account
s3_bucket = 'mropp-ringdna'
s3_file = 'cleansed.tsv'
config_profile = 'dev'
redshift_host = 'cluster-1.cq4c9ckj1pmi.us-west-1.redshift.amazonaws.com'
db_name = 'dev'
redshift_iam_role = 'arn:aws:iam::891705061311:role/myRedshiftRole'
region = 'us-west-1'

#
# start main script
#
print(f"cleansing TSV: {input_filename}")
with open(input_filename, encoding='utf-16-le', newline='') as inputf, \
open(output_filename, 'w', encoding='utf-8', newline='') as outputf:
reader = csv.reader(inputf, delimiter='\t', strict=True)
writer = csv.writer(outputf, delimiter='\t', quotechar='"',
lineterminator='\n')

# get and write out the header row
header_row = next(reader)
writer.writerow(header_row)

prev_row = []
for row in reader:
if is_valid_row(row):
# write out id as is, with spaces stripped from other columns
writer.writerow([row[0]] + [x.strip() for x in row[1:]])

# process invalid row
else:
# if prev_row isn't empty, combine with current row
if prev_row != []:
combined = prev_row + row
                if is_valid_row(combined):
                    # write out the full combined row, not just the
                    # current fragment
                    writer.writerow(
                        [combined[0]] + [x.strip() for x in combined[1:]])
                    prev_row = []
else:
                    # if > 5 items and the last item contains '@'
                    # then this is a "complete" bad row to be cleansed
                    if len(combined) > 5 and '@' in combined[-1]:
fixed_row = fix_bad_row(combined)
if fixed_row[0] == '-1':
print(f"discarding row that couldn't be cleansed: "
f"{combined}")
else:
writer.writerow(fixed_row)
prev_row = []

# otherwise continue to next row with current combined
# as previous row value
else:
prev_row = combined

            # otherwise, start of a bad row
            else:
                prev_row = row

    # warn about any trailing fragment that never formed a complete row
    if prev_row != []:
        print(f"discarding trailing incomplete row: {prev_row}")

print(f"output written to: {output_filename}")

print("uploading to S3")
if upload_to_s3(output_filename, s3_bucket, s3_file, config_profile):
print("loading into Redshift")
load_to_redshift(
s3_bucket, s3_file, redshift_host, db_name,
redshift_iam_role, region
)
34 changes: 34 additions & 0 deletions notes.txt
@@ -0,0 +1,34 @@
anomalies observed:

- one or more spaces before and/or after values in all columns except the id

- one or more carriage returns/line feeds in the middle of a row

- one or more extra tabs in the middle of a row

- three consecutive name fields; when this occurs, assume 2nd and 3rd are the
  last name and join with a space, or remove one if identical (see the
  example after this list)
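
for illustration, a hypothetical bad row (fields shown as a Python list, with
a blank column and three name fields):

  ['7', ' Jane ', 'Anne', 'Smith', '', '12345678', 'jane@example.com']

after cleansing becomes:

  ['7', 'Jane', 'Anne Smith', '12345678', 'jane@example.com']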

observations/judgements:

- all of the first column values (id) for the good rows can be cast to int
without a problem, so no need to trim spaces on those

- an email address has to contain an '@' character, so can validate the 5th
  column contains one; not worth the complexity or performance to do a full
  email regex

- account numbers all start with [1-9], so can validate account number vs name
  field by looking at that digit; optionally could have cleansed '-' and '/'
  from account numbers and tried converting to an int (sketched below)

- further normalization of names could be attempted by changing case; however,
  names such as "McGrath" would be hard to deal with elegantly (see the sketch
  below), and all upper case or all lower case aren't particularly appealing

- some otherwise valid rows have missing first name, last name, or both;
they are assumed to be allowed as the output is still a valid TSV and
sometimes that information might be missing for a user

- email addresses could easily be normalized by changing to all lower case if
  needed or desired (also in the sketch below)
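
a minimal Python sketch of the optional cleanups mentioned above (all values
are hypothetical; none of this is part of cleanup_tsv.py):

  # account numbers: strip '-' and '/' and try converting to an int
  acct = '123-45/678'
  assert int(acct.replace('-', '').replace('/', '')) == 12345678

  # name case: str.title() mangles names like "McGrath"
  assert 'MCGRATH'.title() == 'Mcgrath'   # not the desired 'McGrath'

  # email: normalize by lowercasing
  assert 'Jane.Smith@Example.COM'.lower() == 'jane.smith@example.com'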