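"""Hash files in Blackboard gradebook/submission directories and flag duplicates.

Generates a CSV with a SHA-256 hash per file, then a second CSV listing files
whose hash appears under more than one student ID.
"""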
import os
from datetime import datetime
import csv
import hashlib
from functools import partial

import pandas as pd

from utils.settings import CSV_DIR, BB_GRADEBOOKS_DIR, BB_SUBMISSIONS_DIR, MIN_FILESIZE_IN_BYTES


def load_excluded_filenames(submissions_dir_name: str) -> list[str]:  # helper function for hashing all files
    csv_file_path = os.path.join(CSV_DIR, f'{submissions_dir_name}_excluded.csv')
    if not os.path.exists(csv_file_path):  # CSV file with excluded file names for this submission does not exist
        print(f'[WARNING] Cannot find CSV file with list of excluded file names: {csv_file_path}\n[INFO] All files will be hashed & inspected', flush=True)
        return []  # return an empty list to continue without any excluded file names
    else:  # CSV file with excluded file names for this submission exists
        try:
            df = pd.read_csv(csv_file_path)
            filename_list = df['exclude_filename'].tolist()  # get the values of the 'exclude_filename' column as a list
            filename_list = [f.lower() for f in filename_list]  # convert to lowercase for comparison with submission files
            print(f'[INFO] Using CSV file with list of excluded file names: {csv_file_path}', flush=True)
            return filename_list
        except Exception as e:  # on any exception, print the error and return an empty list to continue without excluded file names
            print(f'[WARNING] Unable to load/read CSV file with list of excluded file names: {csv_file_path}\n[INFO] All files will be hashed & inspected', flush=True)
            print(f'[INFO] Error message: {e}', flush=True)
            return []
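
# Illustrative layout of a '<submissions_dir_name>_excluded.csv' file in CSV_DIR
# (only the 'exclude_filename' column is read and matching is case-insensitive;
# the file names below are hypothetical examples):
#
#   exclude_filename
#   .DS_Store
#   Thumbs.db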

def get_hashes_in_dir(dir_path: str, excluded_filenames: list[str] | None = None) -> list[dict]:  # helper function for hashing all files
    if excluded_filenames is None:  # default to no exclusions; avoids a mutable default argument
        excluded_filenames = []
    hash_list = []
    for subdir, dirs, files in os.walk(dir_path):  # loop through all files in the directory and generate hashes
        for filename in files:
            if filename.lower() not in excluded_filenames:  # compare in lowercase with the excluded file names & skip hashing if excluded
                filepath = os.path.join(subdir, filename)
                if os.path.getsize(filepath) > MIN_FILESIZE_IN_BYTES:  # file size more than MIN_FILESIZE_IN_BYTES (as set in settings.py)
                    with open(filepath, 'rb') as f:
                        filehash = hashlib.sha256(f.read()).hexdigest()
                    # if filehash != 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855':  # do not include hashes of empty files
                    hash_list.append({'filepath': filepath, 'filename': filename, 'sha256 hash': filehash})
                # else:
                #     print(f'size: {os.path.getsize(filepath)}B, {filepath}')
    return hash_list
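
# For reference, the constant in the commented-out check above is the SHA-256 digest of empty input:
#   hashlib.sha256(b'').hexdigest()
#   == 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'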

def generate_hashes_gradebook(gradebook_dir_path: str) -> str:  # main function for hashing all files in a gradebook
    gradebook_dir_name = os.path.abspath(gradebook_dir_path).split(os.path.sep)[-1]  # get the gradebook name from the rightmost part of the path
    if not os.path.isdir(gradebook_dir_path):
        exit(f'Directory {gradebook_dir_path} does not exist.\nMake sure "{gradebook_dir_name}" exists in "{BB_GRADEBOOKS_DIR}".\n')

    dicts_with_hashes_list = get_hashes_in_dir(gradebook_dir_path)
    for hash_dict in dicts_with_hashes_list:
        student_id = hash_dict['filename'].split('_attempt_')[0].split('_')[-1]  # gradebook file names carry the student id just before '_attempt_'
        relative_path = os.path.join('..', hash_dict["filepath"])
        hash_dict['filename'] = f'=HYPERLINK("{relative_path}", "{hash_dict["filename"]}")'  # clickable link when the CSV is opened in a spreadsheet app
        del hash_dict['filepath']
        hash_dict.update({'Student ID': student_id})

    os.makedirs(CSV_DIR, exist_ok=True)
    csv_file_name = f'{gradebook_dir_name}_gradebook_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
    csv_file_path = os.path.join(CSV_DIR, csv_file_name)
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:  # open the output CSV file for writing
        fieldnames = ['Student ID', 'filename', 'sha256 hash']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(dicts_with_hashes_list)
    print(f'[INFO] Created CSV file with all files & hashes in gradebook: {gradebook_dir_name}\nCSV file: {csv_file_path}', flush=True)
    return csv_file_path
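
# Example (hypothetical) gradebook file name and the student id extracted above:
#   'Assignment 1_jsmith1_attempt_2023-03-01-22-48-34_report.pdf'  ->  'jsmith1'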

def generate_hashes_submissions(submissions_dir_path: str) -> str:  # main function for hashing all files in submissions
    submissions_dir_name = os.path.abspath(submissions_dir_path).split(os.path.sep)[-1]  # get the submission/assignment name from the rightmost part of the path
    if not os.path.isdir(submissions_dir_path):
        exit(f'Directory {submissions_dir_path} does not exist.\nMake sure "{submissions_dir_name}" exists in "{BB_SUBMISSIONS_DIR}".\n')

    excluded_filenames = load_excluded_filenames(submissions_dir_name)
    dicts_with_hashes_list = []
    for student_dir_name in os.listdir(submissions_dir_path):  # loop through each student dir to get hashes for all files per student
        student_dir_path = os.path.join(submissions_dir_path, student_dir_name)
        student_dicts_with_hashes_list = get_hashes_in_dir(student_dir_path, excluded_filenames)  # dicts with hashes for all student files, except 'excluded' file names
        student_dicts_list = []
        for hash_dict in student_dicts_with_hashes_list:
            hash_dict.update({'Student ID': student_dir_name})  # tag each hash record with the student id (= directory name)
            relative_path = os.path.join('..', hash_dict["filepath"])
            hash_dict['filepath'] = f'=HYPERLINK("{relative_path}", "{hash_dict["filepath"]}")'
            hash_dict['filename'] = f'=HYPERLINK("{relative_path}", "{hash_dict["filename"]}")'
            student_dicts_list.append(hash_dict)  # append the file dict to the student's list of dicts for CSV export
        dicts_with_hashes_list.append(student_dicts_list)  # append the student's hashes to the main list with all submissions

    os.makedirs(CSV_DIR, exist_ok=True)
    csv_file_name = f'{submissions_dir_name}_submissions_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
    csv_file_path = os.path.join(CSV_DIR, csv_file_name)

    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:  # open the output CSV file for writing
        fieldnames = ['Student ID', 'filepath', 'filename', 'sha256 hash']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for student_dict in dicts_with_hashes_list:  # one list of dicts per student
            writer.writerows(student_dict)
    print(f'[INFO] Created CSV file with all files & hashes for submissions in: {submissions_dir_name}\nCSV file: {csv_file_path}', flush=True)
    return csv_file_path

def generate_duplicate_hashes_generic(hashes_csv_file_path: str, drop_columns: list[str]):
    df = pd.read_csv(hashes_csv_file_path)  # df with all files and their hashes (read directly; avoid shadowing the imported csv module)
    df_clean = df.drop(columns=drop_columns)  # drop columns that are not needed
    duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :]  # all files with a duplicate hash, incl. files from the same student id
    # agg() on 'Student ID' is True if a hash has more than one unique value in the groupby (= files with the same hash from multiple student ids),
    # False otherwise (= files from the same student id with the same hash)
    hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique()) > 1)
    # list of duplicate hashes, only if shared by different student ids (excludes duplicates within one student id)
    duplicate_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID'] == True].index.to_list()

    files_with_duplicate_hash = df[df['sha256 hash'].isin(duplicate_hashes_list)]  # df with all files with a duplicate hash, excluding files from the same student id
    df_duplicate = files_with_duplicate_hash.sort_values(['sha256 hash', 'Student ID'])  # sort before output to CSV

    gradebook_or_submissions_str = os.path.basename(hashes_csv_file_path).split('_file_hashes_')[0].split('_')[-1]  # 'gradebook' or 'submissions', depending on which file-hashes CSV is read
    assignment_name = os.path.basename(hashes_csv_file_path).split(f'_{gradebook_or_submissions_str}_')[0]
    csv_out = hashes_csv_file_path.rsplit('_', 1)[0].replace('file_hashes', 'duplicate_') + datetime.now().strftime("%Y%m%d-%H%M%S") + '.csv'
    try:
        df_duplicate.to_csv(csv_out, index=False)
        print(f'[INFO] Created CSV file with duplicate hashes in {gradebook_or_submissions_str}: {assignment_name}\nCSV file: {csv_out}', flush=True)
    except Exception as e:
        exit(f'[ERROR] Something went wrong while trying to save the CSV file with duplicate hashes\nError message: {e}')


# partials for generate_duplicate_hashes_generic(), setting the appropriate drop_columns for gradebook / submissions
generate_duplicate_hashes_gradebook = partial(generate_duplicate_hashes_generic, drop_columns=['filename'])
generate_duplicate_hashes_submissions = partial(generate_duplicate_hashes_generic, drop_columns=['filepath', 'filename'])
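

# Minimal usage sketch (illustrative; 'assignment_1' is a hypothetical directory name,
# and these calls would normally be driven by a CLI or caller script):
#
#   hashes_csv = generate_hashes_submissions(os.path.join(BB_SUBMISSIONS_DIR, 'assignment_1'))
#   generate_duplicate_hashes_submissions(hashes_csv)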