--- a +++ b/qiita_db/test/test_user.py @@ -0,0 +1,581 @@ +# -*- coding: utf-8 -*- + +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +from unittest import TestCase, main +from datetime import datetime, timedelta + +from qiita_core.exceptions import (IncorrectEmailError, IncorrectPasswordError, + IncompetentQiitaDeveloperError) +from qiita_core.util import qiita_test_checker +from qiita_core.qiita_settings import qiita_config +import qiita_db as qdb + + +class SupportTests(TestCase): + def test_validate_password(self): + valid1 = 'abcdefgh' + valid2 = 'abcdefgh1234' + valid3 = 'abcdefgh!@#$' + valid4 = 'aBC123!@#{}' + invalid1 = 'abc' + invalid2 = u'øabcdefghi' + invalid3 = 'abcd efgh' + + self.assertTrue(qdb.user.validate_password(valid1)) + self.assertTrue(qdb.user.validate_password(valid2)) + self.assertTrue(qdb.user.validate_password(valid3)) + self.assertTrue(qdb.user.validate_password(valid4)) + self.assertFalse(qdb.user.validate_password(invalid1)) + self.assertFalse(qdb.user.validate_password(invalid2)) + self.assertFalse(qdb.user.validate_password(invalid3)) + + def test_validate_email(self): + valid1 = 'foo@bar.com' + valid2 = 'asdasd.asdasd.asd123asd@stuff.edu' + valid3 = 'w00t@123.456.789.com' + valid4 = 'name@a.b-c.d' + invalid1 = '@stuff.com' + invalid2 = 'asdasdásd@things.com' + invalid3 = '.asdas@com' + invalid4 = 'name@a.b-c.d-' + + self.assertTrue(qdb.user.validate_email(valid1)) + self.assertTrue(qdb.user.validate_email(valid2)) + self.assertTrue(qdb.user.validate_email(valid3)) + self.assertTrue(qdb.user.validate_email(valid4)) + self.assertFalse(qdb.user.validate_email(invalid1)) + self.assertFalse(qdb.user.validate_email(invalid2)) + self.assertFalse(qdb.user.validate_email(invalid3)) + self.assertFalse(qdb.user.validate_email(invalid4)) + + +@qiita_test_checker() +class UserTest(TestCase): + """Tests the User object and all properties/methods""" + + def setUp(self): + self.user = qdb.user.User('admin@foo.bar') + self.portal = qiita_config.portal + + self.userinfo = { + 'name': 'Dude', + 'affiliation': 'Nowhere University', + 'address': '123 fake st, Apt 0, Faketown, CO 80302', + 'phone': '111-222-3344', + 'pass_reset_code': None, + 'pass_reset_timestamp': None, + 'user_verify_code': None, + 'receive_processing_job_emails': True, + 'social_orcid': None, + 'social_researchgate': None, + 'social_googlescholar': None, + 'creation_timestamp': datetime(2015, 12, 3, 13, 52, 42, 751331) + } + + def tearDown(self): + qiita_config.portal = self.portal + + def test_instantiate_user(self): + qdb.user.User('admin@foo.bar') + + def test_instantiate_unknown_user(self): + with self.assertRaises(qdb.exceptions.QiitaDBUnknownIDError): + qdb.user.User('FAIL@OMG.bar') + + def _check_correct_info(self, obs, exp, ts_before=None): + """Compares info dict of user with special handling of specific keys. + + Parameters + ---------- + obs : dict + Observed user info dictionary. + exp : dict + Expected user info dictionary. + ts_before : datetime.datetime or None + User.create records the creation timestamp through SQL's NOW(). + Since it is set by the database to the microsecond, we can't + predict it a priori and therefore simply record timestamp before + execution of user.create() and compare the relation. + The DB creation_timestamp column is optional, i.e. can be None. + """ + self.assertEqual(set(exp.keys()), set(obs.keys())) + for key in exp: + # user_verify_code and password seed randomly generated so just + # making sure they exist and is correct length + if key == 'user_verify_code': + self.assertEqual(len(obs[key]), 20) + elif key == "password": + self.assertEqual(len(obs[key]), 60) + elif key == "creation_timestamp": + self.assertTrue(((exp[key] is None) and (obs[key] is None)) + or (ts_before <= exp[key])) + else: + self.assertEqual(obs[key], exp[key]) + + def test_create_user(self): + before = datetime.now() + user = qdb.user.User.create('testcreateuser@test.bar', 'password') + + # adding a couple of messages + qdb.util.add_system_message("TESTMESSAGE_OLD", datetime.now()) + qdb.util.add_system_message( + "TESTMESSAGE", datetime.now() + timedelta(milliseconds=1)) + + self.assertEqual(user.id, 'testcreateuser@test.bar') + sql = """SELECT * + FROM qiita.qiita_user + WHERE email = 'testcreateuser@test.bar'""" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchindex() + self.assertEqual(len(obs), 1) + obs = dict(obs[0]) + exp = { + 'password': '', + 'name': None, + 'pass_reset_timestamp': None, + 'affiliation': None, + 'pass_reset_code': None, + 'phone': None, + 'user_verify_code': '', + 'address': None, + 'user_level_id': 5, + 'receive_processing_job_emails': False, + 'email': 'testcreateuser@test.bar', + 'social_orcid': None, + 'social_researchgate': None, + 'social_googlescholar': None, + 'creation_timestamp': datetime.now()} + self._check_correct_info(obs, exp, before) + + # Make sure new system messages are linked to user + sql = """SELECT message_id FROM qiita.message_user + WHERE email = 'testcreateuser@test.bar'""" + m_id = qdb.util.get_count('qiita.message') + # the user should have the latest message (m_id) and the one before + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchindex() + self.assertEqual(obs, [[m_id-1], [m_id]]) + qdb.util.clear_system_messages() + + def test_create_user_info(self): + before = datetime.now() + user = qdb.user.User.create('testcreateuserinfo@test.bar', 'password', + self.userinfo) + self.assertEqual(user.id, 'testcreateuserinfo@test.bar') + sql = """SELECT * + FROM qiita.qiita_user + WHERE email = 'testcreateuserinfo@test.bar'""" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchindex() + self.assertEqual(len(obs), 1) + obs = dict(obs[0]) + exp = { + 'password': '', + 'name': 'Dude', + 'affiliation': 'Nowhere University', + 'address': '123 fake st, Apt 0, Faketown, CO 80302', + 'phone': '111-222-3344', + 'pass_reset_timestamp': None, + 'pass_reset_code': None, + 'user_verify_code': '', + 'user_level_id': 5, + 'receive_processing_job_emails': True, + 'email': 'testcreateuserinfo@test.bar', + 'social_orcid': None, + 'social_researchgate': None, + 'social_googlescholar': None, + 'creation_timestamp': datetime.now()} + self._check_correct_info(obs, exp, before) + + def test_create_user_column_not_allowed(self): + self.userinfo["email"] = "FAIL" + with self.assertRaises(qdb.exceptions.QiitaDBColumnError): + qdb.user.User.create('new@test.bar', 'password', self.userinfo) + + def test_create_user_non_existent_column(self): + self.userinfo["BADTHING"] = "FAIL" + with self.assertRaises(qdb.exceptions.QiitaDBColumnError): + qdb.user.User.create('new@test.bar', 'password', self.userinfo) + + def test_create_user_duplicate(self): + with self.assertRaises(qdb.exceptions.QiitaDBDuplicateError): + qdb.user.User.create('test@foo.bar', 'password') + + def test_create_user_bad_email(self): + with self.assertRaises(IncorrectEmailError): + qdb.user.User.create('notanemail', 'password') + + def test_create_user_bad_password(self): + with self.assertRaises(IncorrectPasswordError): + qdb.user.User.create('new@test.com', '') + + def test_login(self): + self.assertEqual(qdb.user.User.login("test@foo.bar", "password"), + qdb.user.User("test@foo.bar")) + + def test_login_incorrect_user(self): + with self.assertRaises(IncorrectEmailError): + qdb.user.User.login("notexist@foo.bar", "password") + + def test_login_incorrect_password(self): + with self.assertRaises(IncorrectPasswordError): + qdb.user.User.login("test@foo.bar", "WRONGPASSWORD") + + def test_login_invalid_password(self): + with self.assertRaises(IncorrectPasswordError): + qdb.user.User.login("test@foo.bar", "SHORT") + + def test_exists(self): + self.assertTrue(qdb.user.User.exists("test@foo.bar")) + + def test_exists_notindb(self): + self.assertFalse(qdb.user.User.exists("notexist@foo.bar")) + + def test_exists_invalid_email(self): + with self.assertRaises(IncorrectEmailError): + qdb.user.User.exists("notanemail.@badformat") + + def test_get_email(self): + self.assertEqual(self.user.email, 'admin@foo.bar') + + def test_get_level(self): + self.assertEqual(self.user.level, "admin") + + def test_get_info(self): + expinfo = { + 'name': 'Admin', + 'affiliation': 'Owner University', + 'address': '312 noname st, Apt K, Nonexistantown, CO 80302', + 'phone': '222-444-6789', + 'pass_reset_code': None, + 'pass_reset_timestamp': None, + 'user_verify_code': None, + 'receive_processing_job_emails': False, + 'phone': '222-444-6789', + 'social_orcid': None, + 'social_researchgate': None, + 'social_googlescholar': None, + 'creation_timestamp': datetime(2015, 12, 3, 13, 52, 42, 751331) + } + + # test database is re-populated during testing several times. + # Creation_timestamp depends on the percise timing of the repopulation, + # i.e. we cannot predict its value. We just test that this date should + # be within an hour and now. For the remainder of tests, we update + # our expectation. + self.assertTrue(datetime.now() - timedelta(hours=1) < + self.user.info['creation_timestamp'] < + datetime.now()) + expinfo['creation_timestamp'] = self.user.info['creation_timestamp'] + + self.assertEqual(self.user.info, expinfo) + + def test_set_info(self): + self.user.info = self.userinfo + self.assertEqual(self.user.info, self.userinfo) + + def test_set_info_not_info(self): + """Tests setting info with a non-allowed column""" + self.userinfo["email"] = "FAIL" + with self.assertRaises(qdb.exceptions.QiitaDBColumnError): + self.user.info = self.userinfo + + def test_set_info_bad_info(self): + """Test setting info with a key not in the table""" + self.userinfo["BADTHING"] = "FAIL" + with self.assertRaises(qdb.exceptions.QiitaDBColumnError): + self.user.info = self.userinfo + + def test_default_analysis(self): + qiita_config.portal = "QIITA" + obs = self.user.default_analysis + self.assertEqual(obs, qdb.analysis.Analysis(4)) + + qiita_config.portal = "EMP" + obs = self.user.default_analysis + self.assertEqual(obs, qdb.analysis.Analysis(8)) + + def test_get_user_studies(self): + user = qdb.user.User('test@foo.bar') + qiita_config.portal = "QIITA" + self.assertEqual(user.user_studies, {qdb.study.Study(1)}) + + qiita_config.portal = "EMP" + self.assertEqual(user.user_studies, set()) + + def test_get_shared_studies(self): + user = qdb.user.User('shared@foo.bar') + qiita_config.portal = "QIITA" + self.assertEqual(user.shared_studies, {qdb.study.Study(1)}) + + qiita_config.portal = "EMP" + self.assertEqual(user.shared_studies, set()) + + def test_get_private_analyses(self): + user = qdb.user.User('test@foo.bar') + qiita_config.portal = "QIITA" + exp = {qdb.analysis.Analysis(1)} + self.assertEqual(user.private_analyses, exp) + + qiita_config.portal = "EMP" + self.assertEqual(user.private_analyses, set()) + + def test_get_shared_analyses(self): + user = qdb.user.User('shared@foo.bar') + qiita_config.portal = "QIITA" + self.assertEqual(user.shared_analyses, {qdb.analysis.Analysis(1)}) + + qiita_config.portal = "EMP" + self.assertEqual(user.shared_analyses, set()) + + def test_verify_code(self): + email = 'testverifycode@test.bar' + qdb.user.User.create(email, 'password') + # making sure that we know the user codes + sql = """UPDATE qiita.qiita_user SET + user_verify_code='verifycode', + pass_reset_code='resetcode' + WHERE email=%s""" + qdb.sql_connection.perform_as_transaction(sql, [email]) + + self.assertFalse( + qdb.user.User.verify_code(email, 'wrongcode', 'create')) + self.assertFalse( + qdb.user.User.verify_code(email, 'wrongcode', 'reset')) + + self.assertTrue( + qdb.user.User.verify_code(email, 'verifycode', 'create')) + self.assertTrue( + qdb.user.User.verify_code(email, 'resetcode', 'reset')) + + # make sure errors raised if code already used or wrong type + with self.assertRaises(qdb.exceptions.QiitaDBError): + qdb.user.User.verify_code(email, 'verifycode', 'create') + with self.assertRaises(qdb.exceptions.QiitaDBError): + qdb.user.User.verify_code(email, 'resetcode', 'reset') + + with self.assertRaises(IncompetentQiitaDeveloperError): + qdb.user.User.verify_code(email, 'fakecode', 'badtype') + + # make sure default analyses created + sql = ("SELECT email, name, description, dflt FROM qiita.analysis " + "WHERE email = %s") + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql, [email]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[email, 'testverifycode@test.bar-dflt-2', 'dflt', True], + [email, 'testverifycode@test.bar-dflt-1', 'dflt', True]] + self.assertEqual(obs, exp) + + # Make sure default analyses are linked with the portal + sql = """SELECT COUNT(1) + FROM qiita.analysis + JOIN qiita.analysis_portal USING (analysis_id) + JOIN qiita.portal_type USING (portal_type_id) + WHERE email = 'testverifycode@test.bar' AND dflt = true""" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchflatten()[0] + self.assertEqual(obs, 2) + + def _check_pass(self, user, passwd): + self.assertEqual(qdb.util.hash_password(passwd, user.password), + user.password) + + def test_password(self): + user = qdb.user.User('shared@foo.bar') + self.assertEqual(user.password, '$2a$12$gnUi8Qg.0tvW243v889BhOBhWLIHy' + 'IJjjgaG6dxuRJkUM8nXG9Efe') + + def test_change_pass(self): + user = qdb.user.User.create('testchangepass@test.bar', 'password') + user._change_pass("newpassword") + self._check_pass(user, "newpassword") + self.assertIsNone(user.info["pass_reset_code"]) + + def test_change_pass_short(self): + with self.assertRaises(IncorrectPasswordError): + self.user._change_pass("newpass") + self._check_pass(self.user, "password") + + def test_change_password(self): + self.user.change_password("password", "newpassword") + self._check_pass(self.user, "newpassword") + + def test_change_password_wrong_oldpass(self): + user = qdb.user.User.create('changepasswrongold@test.bar', 'password') + user.change_password("WRONG", "newpass") + self._check_pass(user, "password") + + def test_generate_reset_code(self): + user = qdb.user.User.create('new@test.bar', 'password') + sql = "SELECT LOCALTIMESTAMP" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + before = qdb.sql_connection.TRN.execute_fetchflatten()[0] + user.generate_reset_code() + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + after = qdb.sql_connection.TRN.execute_fetchflatten()[0] + sql = ("SELECT pass_reset_code, pass_reset_timestamp FROM " + "qiita.qiita_user WHERE email = %s") + qdb.sql_connection.TRN.add(sql, ('new@test.bar',)) + obscode, obstime = qdb.sql_connection.TRN.execute_fetchindex()[0] + self.assertEqual(len(obscode), 20) + self.assertTrue(before < obstime < after) + + def test_change_forgot_password(self): + user = qdb.user.User.create( + 'changeforgotpassword@test.bar', 'password') + user.generate_reset_code() + code = user.info["pass_reset_code"] + obsbool = user.change_forgot_password(code, "newpassword") + self.assertEqual(obsbool, True) + self._check_pass(user, "newpassword") + + def test_change_forgot_password_bad_code(self): + user = qdb.user.User.create('badcode@test.bar', 'password') + user.generate_reset_code() + code = "AAAAAAA" + obsbool = user.change_forgot_password(code, "newpassword") + self.assertEqual(obsbool, False) + self._check_pass(user, "password") + + def test_messages(self): + qdb.util.add_system_message('SYS MESSAGE', datetime.now()) + user = qdb.user.User('test@foo.bar') + obs = user.messages() + exp_msg = [ + 'SYS MESSAGE', 'message 1', + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. ' + 'Pellentesque sed auctor ex, non placerat sapien. Vestibulum ' + 'vestibulum massa ut sapien condimentum, cursus consequat diam' + ' sodales. Nulla aliquam arcu ut massa auctor, et vehicula ' + 'mauris tempor. In lacinia viverra ante quis pellentesque. ' + 'Nunc vel mi accumsan, porttitor eros ut, pharetra elit. Nulla' + ' ac nisi quis dui egestas malesuada vitae ut mauris. Morbi ' + 'blandit non nisl a finibus. In erat velit, congue at ipsum ' + 'sit amet, venenatis bibendum sem. Curabitur vel odio sed est ' + 'rutrum rutrum. Quisque efficitur ut purus in ultrices. ' + 'Pellentesque eu auctor justo.', 'message <a href="#">3</a>'] + self.assertCountEqual([(x[1]) for x in obs], exp_msg) + self.assertTrue(all(x[2] < datetime.now() for x in obs)) + self.assertFalse(all(x[3] for x in obs)) + self.assertEqual([x[4] for x in obs], [True, False, False, False]) + + obs = user.messages(1) + exp_msg = ['SYS MESSAGE'] + self.assertEqual([x[1] for x in obs], exp_msg) + + def test_mark_messages(self): + user = qdb.user.User('test@foo.bar') + user.mark_messages([1, 2]) + obs = user.messages() + exp = [True, True, False] + self.assertEqual([x[3] for x in obs], exp) + + user.mark_messages([1], read=False) + obs = user.messages() + exp = [False, True, False] + self.assertEqual([x[3] for x in obs], exp) + + def test_delete_messages(self): + user = qdb.user.User.create('deletemsg@test.bar', 'password') + self.assertEqual(user.messages(), []) + qdb.util.add_message("New message", [user]) + user_msgs = user.messages() + # Magic number 1: the actual message + self.assertEqual([msg[1] for msg in user_msgs], ["New message"]) + # Magic numbers [0][0] - there is only one message and the first + # element of that message is the message id + user.delete_messages([user_msgs[0][0]]) + self.assertEqual([msg[1] for msg in user.messages()], []) + + def test_user_artifacts(self): + user = qdb.user.User('test@foo.bar') + obs = user.user_artifacts() + exp = {qdb.study.Study(1): [qdb.artifact.Artifact(1), + qdb.artifact.Artifact(2), + qdb.artifact.Artifact(3), + qdb.artifact.Artifact(4), + qdb.artifact.Artifact(5), + qdb.artifact.Artifact(6), + qdb.artifact.Artifact(7)]} + self.assertEqual(obs, exp) + obs = user.user_artifacts(artifact_type='BIOM') + exp = {qdb.study.Study(1): [qdb.artifact.Artifact(4), + qdb.artifact.Artifact(5), + qdb.artifact.Artifact(6), + qdb.artifact.Artifact(7)]} + self.assertEqual(obs, exp) + + def test_jobs(self): + PJ = qdb.processing_job.ProcessingJob + ignore_status = [] + # generates expected jobs + jobs = qdb.user.User('shared@foo.bar').jobs( + ignore_status=ignore_status) + self.assertEqual(jobs, [PJ('b72369f9-a886-4193-8d3d-f7b504168e75')]) + + jobs = qdb.user.User('shared@foo.bar').jobs( + ignore_status=ignore_status, show_hidden=True) + self.assertEqual(jobs, [ + PJ('d19f76ee-274e-4c1b-b3a2-a12d73507c55'), + PJ('b72369f9-a886-4193-8d3d-f7b504168e75')]) + + # just one job + self.assertEqual(qdb.user.User('shared@foo.bar').jobs( + limit=1, ignore_status=ignore_status), [ + PJ('b72369f9-a886-4193-8d3d-f7b504168e75')]) + + # generates expected jobs + jobs = qdb.user.User('shared@foo.bar').jobs() + self.assertEqual(jobs, []) + + def test_update_email(self): + user = qdb.user.User('shared@foo.bar') + with self.assertRaisesRegex(IncorrectEmailError, 'Bad email given:'): + user.update_email('bladfa.adferqerq@$EWE') + + with self.assertRaisesRegex(IncorrectEmailError, + 'This email already exists'): + user.update_email('test@foo.bar') + + user.update_email('bla@ble.bli') + + def test_slurm_parameters(self): + self.assertEqual(qdb.user.User('shared@foo.bar').slurm_parameters, + '--nice=10000') + self.assertEqual(qdb.user.User('admin@foo.bar').slurm_parameters, + '--nice=5000') + + +@qiita_test_checker() +class DeleteUser(TestCase): + def test_delete_users(self): + # let's start with the errors + error = 'This email does not exist: x@y.z' + with self.assertRaisesRegex(IncorrectEmailError, error): + qdb.user.User.delete('x@y.z') + + with self.assertRaises(ValueError): + qdb.user.User.delete('shared@foo.bar') + + qdb.user.User.delete('shared@foo.bar', True) + # verify that the user doesn't exist any more + with self.assertRaises(qdb.exceptions.QiitaDBUnknownIDError): + qdb.user.User('shared@foo.bar') + + +if __name__ == "__main__": + main()