--- a +++ b/qiita_db/user.py @@ -0,0 +1,925 @@ +r""" +User object (:mod:`qiita_db.user`) +================================== + +.. currentmodule:: qiita_db.user + +This modules provides the implementation of the User class. This is used for +handling creation, deletion, and login of users, as well as retrieval of all +studies and analyses that are owned by or shared with the user. + +Classes +------- + +.. autosummary:: + :toctree: generated/ + + User +""" +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- +from re import sub +from datetime import datetime + +from qiita_core.exceptions import (IncorrectEmailError, IncorrectPasswordError, + IncompetentQiitaDeveloperError) +from qiita_core.qiita_settings import qiita_config + +import qiita_db as qdb + + +class User(qdb.base.QiitaObject): + """ + User object to access to the Qiita user information + + Attributes + ---------- + email + level + info + user_studies + shared_studies + default_analysis + private_analyses + shared_analyses + unread_messages + jobs + + Methods + ------- + change_password + generate_reset_code + change_forgot_password + iter + messages + mark_messages + delete_messages + """ + + _table = "qiita_user" + # The following columns are considered not part of the user info + _non_info = {"email", "user_level_id", "password"} + + def _check_id(self, id_): + r"""Check that the provided ID actually exists in the database + + Parameters + ---------- + id_ : object + The ID to test + + Notes + ----- + This function overwrites the base function, as sql layout doesn't + follow the same conventions done in the other classes. + """ + with qdb.sql_connection.TRN: + sql = """SELECT EXISTS( + SELECT * FROM qiita.qiita_user WHERE email = %s)""" + qdb.sql_connection.TRN.add(sql, [id_]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @classmethod + def iter(cls): + """Iterates over all users, sorted by their email addresses + + Returns + ------- + generator + Yields a user ID (email) and name for each user in the database, + in order of ascending ID + """ + with qdb.sql_connection.TRN: + sql = """select email, name from qiita.{}""".format(cls._table) + qdb.sql_connection.TRN.add(sql) + # Using [-1] to get the results of the last SQL query + for result in qdb.sql_connection.TRN.execute_fetchindex(): + yield result + + @classmethod + def login(cls, email, password): + """Logs a user into the system + + Parameters + ---------- + email : str + The email of the user + password: str + The plaintext password of the user + + Returns + ------- + User object + Returns the User object corresponding to the login information + if correct login information + + Raises + ------ + IncorrectEmailError + Email passed is not a valid email + IncorrectPasswordError + Password passed is not correct for user + """ + with qdb.sql_connection.TRN: + # see if user exists + if not cls.exists(email): + raise IncorrectEmailError("Email not valid: %s" % email) + + if not validate_password(password): + raise IncorrectPasswordError("Password not valid!") + + # pull password out of database + sql = ("SELECT password, user_level_id FROM qiita.{0} WHERE " + "email = %s".format(cls._table)) + qdb.sql_connection.TRN.add(sql, [email]) + # Using [0] because there is only one row + info = qdb.sql_connection.TRN.execute_fetchindex()[0] + + # verify user email verification + # MAGIC NUMBER 5 = unverified email + if int(info[1]) == 5: + return False + + # verify password + dbpass = info[0] + hashed = qdb.util.hash_password(password, dbpass) + if hashed == dbpass: + return cls(email) + else: + raise IncorrectPasswordError("Password not valid!") + + @classmethod + def exists(cls, email): + """Checks if a user exists on the database + + Parameters + ---------- + email : str + the email of the user + """ + with qdb.sql_connection.TRN: + if not validate_email(email): + raise IncorrectEmailError("Email string not valid: %s" % email) + + sql = """SELECT EXISTS( + SELECT * FROM qiita.{0} + WHERE email = %s)""".format(cls._table) + qdb.sql_connection.TRN.add(sql, [email]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @classmethod + def create(cls, email, password, info=None): + """Creates a new user on the database + + Parameters + ---------- + email : str + The email of the user - used for log in + password : + The plaintext password of the user + info: dict + Other information for the user keyed to table column name + + Raises + ------ + IncorrectPasswordError + Password string given is not proper format + IncorrectEmailError + Email string given is not a valid email + QiitaDBDuplicateError + User already exists + """ + with qdb.sql_connection.TRN: + # validate email and password for new user + if not validate_email(email): + raise IncorrectEmailError("Bad email given: %s" % email) + if not validate_password(password): + raise IncorrectPasswordError("Bad password given!") + + # make sure user does not already exist + if cls.exists(email): + raise qdb.exceptions.QiitaDBDuplicateError( + "User", "email: %s" % email) + + # make sure non-info columns aren't passed in info dict + if info: + if cls._non_info.intersection(info): + raise qdb.exceptions.QiitaDBColumnError( + "non info keys passed: %s" % + cls._non_info.intersection(info)) + else: + info = {} + + # create email verification code and hashed password to insert + # add values to info + info["email"] = email + info["password"] = qdb.util.hash_password(password) + info["user_verify_code"] = qdb.util.create_rand_string( + 20, punct=False) + + # make sure keys in info correspond to columns in table + qdb.util.check_table_cols(info, cls._table) + + # build info to insert making sure columns and data are in + # same order for sql insertion + columns = info.keys() + values = [info[col] for col in columns] + # crete user + sql = "INSERT INTO qiita.{0} ({1}) VALUES ({2})".format( + cls._table, ','.join(columns), ','.join(['%s'] * len(values))) + qdb.sql_connection.TRN.add(sql, values) + + # Add system messages to user + sql = """INSERT INTO qiita.message_user (email, message_id) + SELECT %s, message_id FROM qiita.message + WHERE expiration > %s""" + qdb.sql_connection.TRN.add(sql, [email, datetime.now()]) + + qdb.sql_connection.TRN.execute() + + return cls(email) + + @classmethod + def verify_code(cls, email, code, code_type): + """Verify that a code and email match + + Parameters + ---------- + email : str + email address of the user + code : str + code to verify + code_type : {'create', 'reset'} + type of code being verified, whether creating user or reset pass. + + Returns + ------- + bool + + Raises + ------ + IncompentQiitaDeveloper + code_type is not create or reset + QiitaDBError + User has no code of the given type + """ + with qdb.sql_connection.TRN: + if code_type == 'create': + column = 'user_verify_code' + elif code_type == 'reset': + column = 'pass_reset_code' + else: + raise IncompetentQiitaDeveloperError( + "code_type must be 'create' or 'reset' Uknown type %s" + % code_type) + sql = "SELECT {0} FROM qiita.{1} WHERE email = %s".format( + column, cls._table) + qdb.sql_connection.TRN.add(sql, [email]) + db_code = qdb.sql_connection.TRN.execute_fetchindex() + + if not db_code: + return False + + db_code = db_code[0][0] + if db_code is None: + raise qdb.exceptions.QiitaDBError( + "No %s code for user %s" % (column, email)) + + correct_code = db_code == code + + if correct_code: + sql = """UPDATE qiita.{0} SET {1} = NULL + WHERE email = %s""".format(cls._table, column) + qdb.sql_connection.TRN.add(sql, [email]) + + if code_type == "create": + # verify the user + level = qdb.util.convert_to_id( + 'user', 'user_level', 'name') + sql = """UPDATE qiita.{} SET user_level_id = %s + WHERE email = %s""".format(cls._table) + qdb.sql_connection.TRN.add(sql, [level, email]) + + # create user default sample holders once verified + # create one per portal + sql = "SELECT portal_type_id FROM qiita.portal_type" + qdb.sql_connection.TRN.add(sql) + + an_sql = """INSERT INTO qiita.analysis + (email, name, description, dflt) + VALUES (%s, %s, %s, %s) + RETURNING analysis_id""" + ap_sql = """INSERT INTO qiita.analysis_portal + (analysis_id, portal_type_id) + VALUES (%s, %s)""" + + portal_ids = qdb.sql_connection.TRN.execute_fetchflatten() + for portal_id in portal_ids: + args = [email, '%s-dflt-%d' % (email, portal_id), + 'dflt', True] + qdb.sql_connection.TRN.add(an_sql, args) + an_id = qdb.sql_connection.TRN.execute_fetchlast() + qdb.sql_connection.TRN.add(ap_sql, [an_id, portal_id]) + + qdb.sql_connection.TRN.execute() + + return correct_code + + @classmethod + def delete(cls, email, force=False): + if not cls.exists(email): + raise IncorrectEmailError(f'This email does not exist: {email}') + + tables = ['qiita.study_users', 'qiita.study_tags', + 'qiita.processing_job_workflow', 'qiita.processing_job', + 'qiita.message_user', 'qiita.analysis_users', + 'qiita.analysis'] + + not_empty = [] + for t in tables: + with qdb.sql_connection.TRN: + sql = f"SELECT COUNT(email) FROM {t} WHERE email = %s" + qdb.sql_connection.TRN.add(sql, [email]) + count = qdb.sql_connection.TRN.execute_fetchflatten()[0] + if count != 0: + not_empty.append(t) + + if not_empty and not force: + raise ValueError(f'These tables are not empty: "{not_empty}", ' + 'delete them first or use `force=True`') + + sql = """ + DELETE FROM qiita.study_users WHERE email = %(email)s; + DELETE FROM qiita.study_tags WHERE email = %(email)s; + DELETE FROM qiita.processing_job_workflow_root + WHERE processing_job_workflow_id IN ( + SELECT processing_job_workflow_id + FROM qiita.processing_job_workflow + WHERE email = %(email)s); + DELETE FROM qiita.processing_job_workflow WHERE email = %(email)s; + DELETE FROM qiita.processing_job_validator + WHERE processing_job_id IN ( + SELECT processing_job_id + FROM qiita.processing_job + WHERE email = %(email)s); + DELETE FROM qiita.analysis_processing_job + WHERE processing_job_id IN ( + SELECT processing_job_id + FROM qiita.processing_job + WHERE email = %(email)s); + DELETE FROM qiita.artifact_output_processing_job + WHERE processing_job_id IN ( + SELECT processing_job_id + FROM qiita.processing_job + WHERE email = %(email)s); + DELETE FROM qiita.artifact_processing_job + WHERE processing_job_id IN ( + SELECT processing_job_id + FROM qiita.processing_job + WHERE email = %(email)s); + DELETE FROM qiita.parent_processing_job WHERE parent_id IN ( + SELECT processing_job_id + FROM qiita.processing_job + WHERE email = %(email)s); + DELETE FROM qiita.processing_job WHERE email = %(email)s; + DELETE FROM qiita.message_user WHERE email = %(email)s; + DELETE FROM qiita.analysis_users WHERE email = %(email)s; + DELETE FROM qiita.analysis_portal WHERE analysis_id IN ( + SELECT analysis_id + FROM qiita.analysis + WHERE email = %(email)s); + DELETE FROM qiita.analysis_artifact WHERE analysis_id IN ( + SELECT analysis_id + FROM qiita.analysis + WHERE email = %(email)s); + DELETE FROM qiita.analysis_filepath WHERE analysis_id IN ( + SELECT analysis_id + FROM qiita.analysis + WHERE email = %(email)s); + DELETE FROM qiita.analysis_sample WHERE analysis_id IN ( + SELECT analysis_id + FROM qiita.analysis + WHERE email = %(email)s); + DELETE FROM qiita.analysis WHERE email = %(email)s; + DELETE FROM qiita.qiita_user WHERE email = %(email)s;""" + + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql, {'email': email}) + qdb.sql_connection.TRN.execute() + + # ---properties--- + @property + def email(self): + """The email of the user""" + return self._id + + @property + def password(self): + """The password of the user""" + with qdb.sql_connection.TRN: + # pull password out of database + sql = "SELECT password FROM qiita.{0} WHERE email = %s".format( + self._table) + qdb.sql_connection.TRN.add(sql, [self.email]) + + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def level(self): + """The level of privileges of the user""" + with qdb.sql_connection.TRN: + sql = """SELECT ul.name + FROM qiita.user_level ul + JOIN qiita.{0} u + ON ul.user_level_id = u.user_level_id + WHERE u.email = %s""".format(self._table) + qdb.sql_connection.TRN.add(sql, [self._id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def info(self): + """Dict with any other information attached to the user""" + with qdb.sql_connection.TRN: + sql = "SELECT * from qiita.{0} WHERE email = %s".format( + self._table) + # Need direct typecast from psycopg2 dict to standard dict + qdb.sql_connection.TRN.add(sql, [self._id]) + # [0] retrieves the first row (the only one present) + info = dict(qdb.sql_connection.TRN.execute_fetchindex()[0]) + # Remove non-info columns + for col in self._non_info: + info.pop(col) + return info + + @info.setter + def info(self, info): + """Updates the information attached to the user + + Parameters + ---------- + info : dict + """ + with qdb.sql_connection.TRN: + # make sure non-info columns aren't passed in info dict + if self._non_info.intersection(info): + raise qdb.exceptions.QiitaDBColumnError( + "non info keys passed!") + + # make sure keys in info correspond to columns in table + qdb.util.check_table_cols(info, self._table) + + # build sql command and data to update + sql_insert = [] + data = [] + # items used for py3 compatability + for key, val in info.items(): + sql_insert.append("{0} = %s".format(key)) + data.append(val) + data.append(self._id) + + sql = ("UPDATE qiita.{0} SET {1} WHERE " + "email = %s".format(self._table, ','.join(sql_insert))) + qdb.sql_connection.TRN.add(sql, data) + qdb.sql_connection.TRN.execute() + + @property + def default_analysis(self): + with qdb.sql_connection.TRN: + sql = """SELECT analysis_id + FROM qiita.analysis + JOIN qiita.analysis_portal USING (analysis_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = %s AND dflt = true AND portal = %s""" + qdb.sql_connection.TRN.add(sql, [self._id, qiita_config.portal]) + return qdb.analysis.Analysis( + qdb.sql_connection.TRN.execute_fetchlast()) + + @property + def user_studies(self): + """Returns a list of study ids owned by the user""" + with qdb.sql_connection.TRN: + sql = """SELECT study_id + FROM qiita.study + JOIN qiita.study_portal USING (study_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = %s AND portal = %s""" + qdb.sql_connection.TRN.add(sql, [self._id, qiita_config.portal]) + return set( + qdb.study.Study(sid) + for sid in qdb.sql_connection.TRN.execute_fetchflatten()) + + @property + def shared_studies(self): + """Returns a list of study ids shared with the user""" + with qdb.sql_connection.TRN: + sql = """SELECT study_id + FROM qiita.study_users + JOIN qiita.study_portal USING (study_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = %s and portal = %s""" + qdb.sql_connection.TRN.add(sql, [self._id, qiita_config.portal]) + return set( + qdb.study.Study(sid) + for sid in qdb.sql_connection.TRN.execute_fetchflatten()) + + @property + def private_analyses(self): + """Returns a list of private analysis ids owned by the user""" + with qdb.sql_connection.TRN: + sql = """SELECT analysis_id FROM qiita.analysis + JOIN qiita.analysis_portal USING (analysis_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = %s AND dflt = false AND portal = %s""" + qdb.sql_connection.TRN.add(sql, [self._id, qiita_config.portal]) + return set( + qdb.analysis.Analysis(aid) + for aid in qdb.sql_connection.TRN.execute_fetchflatten()) + + @property + def shared_analyses(self): + """Returns a list of analysis ids shared with the user""" + with qdb.sql_connection.TRN: + sql = """SELECT analysis_id FROM qiita.analysis_users + JOIN qiita.analysis_portal USING (analysis_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = %s AND portal = %s""" + qdb.sql_connection.TRN.add(sql, [self._id, qiita_config.portal]) + return set( + qdb.analysis.Analysis(aid) + for aid in qdb.sql_connection.TRN.execute_fetchflatten()) + + @property + def unread_messages(self): + """Returns all unread messages for a user""" + with qdb.sql_connection.TRN: + sql = """SELECT message_id, message, message_time, read + FROM qiita.message_user + JOIN qiita.message USING (message_id) + WHERE email = %s AND read = FALSE + ORDER BY message_time DESC""" + qdb.sql_connection.TRN.add(sql, [self._id]) + return qdb.sql_connection.TRN.execute_fetchindex() + + @property + def slurm_parameters(self): + "Returns the slumn parameters for this user given by its user level" + with qdb.sql_connection.TRN: + sql = """SELECT slurm_parameters + FROM qiita.user_level + JOIN qiita.qiita_user USING (user_level_id) + WHERE email = %s""" + qdb.sql_connection.TRN.add(sql, [self._id]) + return qdb.sql_connection.TRN.execute_fetchflatten()[0] + + # ------- methods --------- + def user_artifacts(self, artifact_type=None): + """Returns the artifacts owned by the user, grouped by study + + Parameters + ---------- + artifact_type : str, optional + The artifact type to retrieve. Default: retrieve all artfact types + + Returns + ------- + dict of {qiita_db.study.Study: list of qiita_db.artifact.Artifact} + The artifacts owned by the user + """ + with qdb.sql_connection.TRN: + sql_args = [self.id, qiita_config.portal] + sql_a_type = "" + if artifact_type: + sql_a_type = " AND artifact_type = %s" + sql_args.append(artifact_type) + + sql = """SELECT study_id, array_agg( + artifact_id ORDER BY artifact_id) + FROM qiita.study + JOIN qiita.study_portal USING (study_id) + JOIN qiita.portal_type USING (portal_type_id) + JOIN qiita.study_artifact USING (study_id) + JOIN qiita.artifact USING (artifact_id) + JOIN qiita.artifact_type USING (artifact_type_id) + WHERE email = %s AND portal = %s{0} + GROUP BY study_id + ORDER BY study_id""".format(sql_a_type) + qdb.sql_connection.TRN.add(sql, sql_args) + db_res = dict(qdb.sql_connection.TRN.execute_fetchindex()) + res = {} + for s_id, artifact_ids in db_res.items(): + res[qdb.study.Study(s_id)] = [ + qdb.artifact.Artifact(a_id) for a_id in artifact_ids] + + return res + + def change_password(self, oldpass, newpass): + """Changes the password from oldpass to newpass + + Parameters + ---------- + oldpass : str + User's old password + newpass : str + User's new password + + Returns + ------- + bool + password changed or not + """ + with qdb.sql_connection.TRN: + sql = "SELECT password FROM qiita.{0} WHERE email = %s".format( + self._table) + qdb.sql_connection.TRN.add(sql, [self._id]) + dbpass = qdb.sql_connection.TRN.execute_fetchlast() + if dbpass == qdb.util.hash_password(oldpass, dbpass): + self._change_pass(newpass) + return True + return False + + def generate_reset_code(self): + """Generates a password reset code for user""" + reset_code = qdb.util.create_rand_string(20, punct=False) + sql = """UPDATE qiita.{0} + SET pass_reset_code = %s, pass_reset_timestamp = NOW() + WHERE email = %s""".format(self._table) + qdb.sql_connection.perform_as_transaction(sql, [reset_code, self._id]) + + def change_forgot_password(self, code, newpass): + """Changes the password if the code is valid + + Parameters + ---------- + code : str + User's forgotten password ID code + newpass : str + User's new password + + Returns + ------- + bool + password changed or not + """ + with qdb.sql_connection.TRN: + if self.verify_code(self._id, code, "reset"): + self._change_pass(newpass) + return True + return False + + def _change_pass(self, newpass): + if not validate_password(newpass): + raise IncorrectPasswordError("Bad password given!") + + sql = """UPDATE qiita.{0} + SET password=%s, pass_reset_code = NULL + WHERE email = %s""".format(self._table) + qdb.sql_connection.perform_as_transaction( + sql, [qdb.util.hash_password(newpass), self._id]) + + def messages(self, count=None): + """Return messages in user's queue + + Parameters + ---------- + count : int, optional + Number of messages to return, starting with newest. Default all + + Returns + ------- + list of tuples + Messages in the queue, in the form + [(msg_id, msg, timestamp, read, system_message), ...] + + Notes + ----- + system_message is a bool. When True, this is a systemwide message. + """ + with qdb.sql_connection.TRN: + sql_info = [self._id] + sql = """SELECT message_id, message, message_time, read, + (expiration IS NOT NULL) AS system_message + FROM qiita.message_user + JOIN qiita.message USING (message_id) + WHERE email = %s ORDER BY message_time DESC""" + if count is not None: + sql += " LIMIT %s" + sql_info.append(count) + qdb.sql_connection.TRN.add(sql, sql_info) + return qdb.sql_connection.TRN.execute_fetchindex() + + def mark_messages(self, messages, read=True): + """Mark given messages as read/unread + + Parameters + ---------- + messages : list of ints + Message IDs to mark as read/unread + read : bool, optional + Marks as read if True, unread if False. Default True + """ + with qdb.sql_connection.TRN: + sql = """UPDATE qiita.message_user + SET read = %s + WHERE message_id IN %s AND email = %s""" + qdb.sql_connection.TRN.add(sql, [read, tuple(messages), self._id]) + return qdb.sql_connection.TRN.execute_fetchindex() + + def delete_messages(self, messages): + """Delete given messages for the user + + Parameters + ---------- + messages : list of ints + Message IDs to delete + """ + with qdb.sql_connection.TRN: + # remove message from user + sql = """DELETE FROM qiita.message_user + WHERE message_id IN %s AND email = %s""" + qdb.sql_connection.TRN.add(sql, [tuple(messages), self._id]) + # Remove any messages that no longer are attached to a user + # and are not system messages + sql = """DELETE FROM qiita.message + WHERE message_id NOT IN + (SELECT DISTINCT message_id FROM qiita.message_user + UNION + SELECT message_id FROM qiita.message + WHERE expiration IS NOT NULL)""" + qdb.sql_connection.TRN.add(sql) + qdb.sql_connection.TRN.execute() + + def jobs(self, limit=30, ignore_status=['success'], show_hidden=False): + """Return jobs created by the user + + Parameters + ---------- + limit : int, optional + max number of rows to return + ignore_status: list of str, optional + don't retieve jobs that have one of these status + show_hidden: bool, optional + If true, return all jobs, including the hidden ones + + Returns + ------- + list of ProcessingJob + + """ + with qdb.sql_connection.TRN: + sql = """SELECT processing_job_id + FROM qiita.processing_job + LEFT JOIN qiita.processing_job_status + USING (processing_job_status_id) + WHERE email = %s + """ + + if ignore_status: + sql_info = [self._id, tuple(ignore_status), limit] + sql += " AND processing_job_status NOT IN %s" + else: + sql_info = [self._id, limit] + + if not show_hidden: + sql += ' AND hidden = false' + + sql += """ + ORDER BY CASE processing_job_status + WHEN 'in_construction' THEN 1 + WHEN 'running' THEN 2 + WHEN 'queued' THEN 3 + WHEN 'waiting' THEN 4 + WHEN 'error' THEN 5 + WHEN 'success' THEN 6 + END, heartbeat DESC LIMIT %s""" + + qdb.sql_connection.TRN.add(sql, sql_info) + return [qdb.processing_job.ProcessingJob(p[0]) + for p in qdb.sql_connection.TRN.execute_fetchindex()] + + def update_email(self, email): + if not validate_email(email): + raise IncorrectEmailError(f'Bad email given: {email}') + + if self.exists(email): + raise IncorrectEmailError(f'This email already exists: {email}') + + with qdb.sql_connection.TRN: + sql = 'UPDATE qiita.qiita_user SET email = %s where email = %s' + qdb.sql_connection.TRN.add(sql, [email, self.email]) + + +def validate_email(email): + """Validates an email + + Notes + ----- + An email address is of the form local-part@domain_part + For our purposes: + + - No quoted strings are allowed + - No unicode strings are allowed + - There must be exactly one @ symbol + - Neither local-part nor domain-part can be blank + - The local-part cannot start or end with a dot + - The local-part must be composed of the following characters: + a-zA-Z0-9#_~!$&'()*+,;=:.- + - The domain-part must be a valid hostname, composed of: + a-zA-Z0-9. + + Parameters + ---------- + email: str + email to validate + + Returns + ------- + bool + Whether or not the email is valid + """ + # Do not accept email addresses that have unicode characters + try: + email.encode('ascii') + except UnicodeError: + return False + + # we are not allowing quoted strings in the email address + if '"' in email: + return False + + # Must have exactly 1 @ symbol + if email.count('@') != 1: + return False + + local_part, domain_part = email.split('@') + + # Neither part can be blank + if not (local_part and domain_part): + return False + + # The local part cannot begin or end with a dot + if local_part.startswith('.') or local_part.endswith('.'): + return False + + # The domain part cannot begin or end with a hyphen + if domain_part.startswith('-') or domain_part.endswith('-'): + return False + + # This is the full set of allowable characters for the local part. + local_valid_chars = "[a-zA-Z0-9#_~!$&'()*+,;=:.-]" + if len(sub(local_valid_chars, '', local_part)): + return False + + domain_valid_chars = "[a-zA-Z0-9.-]" + if len(sub(domain_valid_chars, '', domain_part)): + return False + + return True + + +def validate_password(password): + """Validates a password + + Notes + ----- + The valid characters for a password are: + + * lowercase letters + * uppercase letters + * numbers + * special characters (e.g., !@#$%^&*()-_=+`~[]{}|;:'",<.>/?) with the + exception of a backslash + * must be ASCII + * no spaces + * must be at least 8 characters + + Parameters + ---------- + password: str + Password to validate + + Returns + ------- + bool + Whether or not the password is valid + + References + ----- + http://stackoverflow.com/q/196345 + """ + if len(password) < 8: + return False + + if "\\" in password or " " in password: + return False + + try: + password.encode('ascii') + except UnicodeError: + return False + + return True