# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from unittest import TestCase, main
from datetime import datetime, timedelta
from qiita_core.exceptions import (IncorrectEmailError, IncorrectPasswordError,
IncompetentQiitaDeveloperError)
from qiita_core.util import qiita_test_checker
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
class SupportTests(TestCase):
def test_validate_password(self):
valid1 = 'abcdefgh'
valid2 = 'abcdefgh1234'
valid3 = 'abcdefgh!@#$'
valid4 = 'aBC123!@#{}'
invalid1 = 'abc'
invalid2 = u'øabcdefghi'
invalid3 = 'abcd efgh'
self.assertTrue(qdb.user.validate_password(valid1))
self.assertTrue(qdb.user.validate_password(valid2))
self.assertTrue(qdb.user.validate_password(valid3))
self.assertTrue(qdb.user.validate_password(valid4))
self.assertFalse(qdb.user.validate_password(invalid1))
self.assertFalse(qdb.user.validate_password(invalid2))
self.assertFalse(qdb.user.validate_password(invalid3))
def test_validate_email(self):
valid1 = 'foo@bar.com'
valid2 = 'asdasd.asdasd.asd123asd@stuff.edu'
valid3 = 'w00t@123.456.789.com'
valid4 = 'name@a.b-c.d'
invalid1 = '@stuff.com'
invalid2 = 'asdasdásd@things.com'
invalid3 = '.asdas@com'
invalid4 = 'name@a.b-c.d-'
self.assertTrue(qdb.user.validate_email(valid1))
self.assertTrue(qdb.user.validate_email(valid2))
self.assertTrue(qdb.user.validate_email(valid3))
self.assertTrue(qdb.user.validate_email(valid4))
self.assertFalse(qdb.user.validate_email(invalid1))
self.assertFalse(qdb.user.validate_email(invalid2))
self.assertFalse(qdb.user.validate_email(invalid3))
self.assertFalse(qdb.user.validate_email(invalid4))
@qiita_test_checker()
class UserTest(TestCase):
"""Tests the User object and all properties/methods"""
def setUp(self):
self.user = qdb.user.User('admin@foo.bar')
self.portal = qiita_config.portal
self.userinfo = {
'name': 'Dude',
'affiliation': 'Nowhere University',
'address': '123 fake st, Apt 0, Faketown, CO 80302',
'phone': '111-222-3344',
'pass_reset_code': None,
'pass_reset_timestamp': None,
'user_verify_code': None,
'receive_processing_job_emails': True,
'social_orcid': None,
'social_researchgate': None,
'social_googlescholar': None,
'creation_timestamp': datetime(2015, 12, 3, 13, 52, 42, 751331)
}
def tearDown(self):
qiita_config.portal = self.portal
def test_instantiate_user(self):
qdb.user.User('admin@foo.bar')
def test_instantiate_unknown_user(self):
with self.assertRaises(qdb.exceptions.QiitaDBUnknownIDError):
qdb.user.User('FAIL@OMG.bar')
def _check_correct_info(self, obs, exp, ts_before=None):
"""Compares info dict of user with special handling of specific keys.
Parameters
----------
obs : dict
Observed user info dictionary.
exp : dict
Expected user info dictionary.
ts_before : datetime.datetime or None
User.create records the creation timestamp through SQL's NOW().
Since it is set by the database to the microsecond, we can't
predict it a priori and therefore simply record timestamp before
execution of user.create() and compare the relation.
The DB creation_timestamp column is optional, i.e. can be None.
"""
self.assertEqual(set(exp.keys()), set(obs.keys()))
for key in exp:
# user_verify_code and password seed randomly generated so just
# making sure they exist and is correct length
if key == 'user_verify_code':
self.assertEqual(len(obs[key]), 20)
elif key == "password":
self.assertEqual(len(obs[key]), 60)
elif key == "creation_timestamp":
self.assertTrue(((exp[key] is None) and (obs[key] is None))
or (ts_before <= exp[key]))
else:
self.assertEqual(obs[key], exp[key])
def test_create_user(self):
before = datetime.now()
user = qdb.user.User.create('testcreateuser@test.bar', 'password')
# adding a couple of messages
qdb.util.add_system_message("TESTMESSAGE_OLD", datetime.now())
qdb.util.add_system_message(
"TESTMESSAGE", datetime.now() + timedelta(milliseconds=1))
self.assertEqual(user.id, 'testcreateuser@test.bar')
sql = """SELECT *
FROM qiita.qiita_user
WHERE email = 'testcreateuser@test.bar'"""
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchindex()
self.assertEqual(len(obs), 1)
obs = dict(obs[0])
exp = {
'password': '',
'name': None,
'pass_reset_timestamp': None,
'affiliation': None,
'pass_reset_code': None,
'phone': None,
'user_verify_code': '',
'address': None,
'user_level_id': 5,
'receive_processing_job_emails': False,
'email': 'testcreateuser@test.bar',
'social_orcid': None,
'social_researchgate': None,
'social_googlescholar': None,
'creation_timestamp': datetime.now()}
self._check_correct_info(obs, exp, before)
# Make sure new system messages are linked to user
sql = """SELECT message_id FROM qiita.message_user
WHERE email = 'testcreateuser@test.bar'"""
m_id = qdb.util.get_count('qiita.message')
# the user should have the latest message (m_id) and the one before
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchindex()
self.assertEqual(obs, [[m_id-1], [m_id]])
qdb.util.clear_system_messages()
def test_create_user_info(self):
before = datetime.now()
user = qdb.user.User.create('testcreateuserinfo@test.bar', 'password',
self.userinfo)
self.assertEqual(user.id, 'testcreateuserinfo@test.bar')
sql = """SELECT *
FROM qiita.qiita_user
WHERE email = 'testcreateuserinfo@test.bar'"""
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchindex()
self.assertEqual(len(obs), 1)
obs = dict(obs[0])
exp = {
'password': '',
'name': 'Dude',
'affiliation': 'Nowhere University',
'address': '123 fake st, Apt 0, Faketown, CO 80302',
'phone': '111-222-3344',
'pass_reset_timestamp': None,
'pass_reset_code': None,
'user_verify_code': '',
'user_level_id': 5,
'receive_processing_job_emails': True,
'email': 'testcreateuserinfo@test.bar',
'social_orcid': None,
'social_researchgate': None,
'social_googlescholar': None,
'creation_timestamp': datetime.now()}
self._check_correct_info(obs, exp, before)
def test_create_user_column_not_allowed(self):
self.userinfo["email"] = "FAIL"
with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
qdb.user.User.create('new@test.bar', 'password', self.userinfo)
def test_create_user_non_existent_column(self):
self.userinfo["BADTHING"] = "FAIL"
with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
qdb.user.User.create('new@test.bar', 'password', self.userinfo)
def test_create_user_duplicate(self):
with self.assertRaises(qdb.exceptions.QiitaDBDuplicateError):
qdb.user.User.create('test@foo.bar', 'password')
def test_create_user_bad_email(self):
with self.assertRaises(IncorrectEmailError):
qdb.user.User.create('notanemail', 'password')
def test_create_user_bad_password(self):
with self.assertRaises(IncorrectPasswordError):
qdb.user.User.create('new@test.com', '')
def test_login(self):
self.assertEqual(qdb.user.User.login("test@foo.bar", "password"),
qdb.user.User("test@foo.bar"))
def test_login_incorrect_user(self):
with self.assertRaises(IncorrectEmailError):
qdb.user.User.login("notexist@foo.bar", "password")
def test_login_incorrect_password(self):
with self.assertRaises(IncorrectPasswordError):
qdb.user.User.login("test@foo.bar", "WRONGPASSWORD")
def test_login_invalid_password(self):
with self.assertRaises(IncorrectPasswordError):
qdb.user.User.login("test@foo.bar", "SHORT")
def test_exists(self):
self.assertTrue(qdb.user.User.exists("test@foo.bar"))
def test_exists_notindb(self):
self.assertFalse(qdb.user.User.exists("notexist@foo.bar"))
def test_exists_invalid_email(self):
with self.assertRaises(IncorrectEmailError):
qdb.user.User.exists("notanemail.@badformat")
def test_get_email(self):
self.assertEqual(self.user.email, 'admin@foo.bar')
def test_get_level(self):
self.assertEqual(self.user.level, "admin")
def test_get_info(self):
expinfo = {
'name': 'Admin',
'affiliation': 'Owner University',
'address': '312 noname st, Apt K, Nonexistantown, CO 80302',
'phone': '222-444-6789',
'pass_reset_code': None,
'pass_reset_timestamp': None,
'user_verify_code': None,
'receive_processing_job_emails': False,
'phone': '222-444-6789',
'social_orcid': None,
'social_researchgate': None,
'social_googlescholar': None,
'creation_timestamp': datetime(2015, 12, 3, 13, 52, 42, 751331)
}
# test database is re-populated during testing several times.
# Creation_timestamp depends on the percise timing of the repopulation,
# i.e. we cannot predict its value. We just test that this date should
# be within an hour and now. For the remainder of tests, we update
# our expectation.
self.assertTrue(datetime.now() - timedelta(hours=1) <
self.user.info['creation_timestamp'] <
datetime.now())
expinfo['creation_timestamp'] = self.user.info['creation_timestamp']
self.assertEqual(self.user.info, expinfo)
def test_set_info(self):
self.user.info = self.userinfo
self.assertEqual(self.user.info, self.userinfo)
def test_set_info_not_info(self):
"""Tests setting info with a non-allowed column"""
self.userinfo["email"] = "FAIL"
with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
self.user.info = self.userinfo
def test_set_info_bad_info(self):
"""Test setting info with a key not in the table"""
self.userinfo["BADTHING"] = "FAIL"
with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
self.user.info = self.userinfo
def test_default_analysis(self):
qiita_config.portal = "QIITA"
obs = self.user.default_analysis
self.assertEqual(obs, qdb.analysis.Analysis(4))
qiita_config.portal = "EMP"
obs = self.user.default_analysis
self.assertEqual(obs, qdb.analysis.Analysis(8))
def test_get_user_studies(self):
user = qdb.user.User('test@foo.bar')
qiita_config.portal = "QIITA"
self.assertEqual(user.user_studies, {qdb.study.Study(1)})
qiita_config.portal = "EMP"
self.assertEqual(user.user_studies, set())
def test_get_shared_studies(self):
user = qdb.user.User('shared@foo.bar')
qiita_config.portal = "QIITA"
self.assertEqual(user.shared_studies, {qdb.study.Study(1)})
qiita_config.portal = "EMP"
self.assertEqual(user.shared_studies, set())
def test_get_private_analyses(self):
user = qdb.user.User('test@foo.bar')
qiita_config.portal = "QIITA"
exp = {qdb.analysis.Analysis(1)}
self.assertEqual(user.private_analyses, exp)
qiita_config.portal = "EMP"
self.assertEqual(user.private_analyses, set())
def test_get_shared_analyses(self):
user = qdb.user.User('shared@foo.bar')
qiita_config.portal = "QIITA"
self.assertEqual(user.shared_analyses, {qdb.analysis.Analysis(1)})
qiita_config.portal = "EMP"
self.assertEqual(user.shared_analyses, set())
def test_verify_code(self):
email = 'testverifycode@test.bar'
qdb.user.User.create(email, 'password')
# making sure that we know the user codes
sql = """UPDATE qiita.qiita_user SET
user_verify_code='verifycode',
pass_reset_code='resetcode'
WHERE email=%s"""
qdb.sql_connection.perform_as_transaction(sql, [email])
self.assertFalse(
qdb.user.User.verify_code(email, 'wrongcode', 'create'))
self.assertFalse(
qdb.user.User.verify_code(email, 'wrongcode', 'reset'))
self.assertTrue(
qdb.user.User.verify_code(email, 'verifycode', 'create'))
self.assertTrue(
qdb.user.User.verify_code(email, 'resetcode', 'reset'))
# make sure errors raised if code already used or wrong type
with self.assertRaises(qdb.exceptions.QiitaDBError):
qdb.user.User.verify_code(email, 'verifycode', 'create')
with self.assertRaises(qdb.exceptions.QiitaDBError):
qdb.user.User.verify_code(email, 'resetcode', 'reset')
with self.assertRaises(IncompetentQiitaDeveloperError):
qdb.user.User.verify_code(email, 'fakecode', 'badtype')
# make sure default analyses created
sql = ("SELECT email, name, description, dflt FROM qiita.analysis "
"WHERE email = %s")
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql, [email])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[email, 'testverifycode@test.bar-dflt-2', 'dflt', True],
[email, 'testverifycode@test.bar-dflt-1', 'dflt', True]]
self.assertEqual(obs, exp)
# Make sure default analyses are linked with the portal
sql = """SELECT COUNT(1)
FROM qiita.analysis
JOIN qiita.analysis_portal USING (analysis_id)
JOIN qiita.portal_type USING (portal_type_id)
WHERE email = 'testverifycode@test.bar' AND dflt = true"""
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchflatten()[0]
self.assertEqual(obs, 2)
def _check_pass(self, user, passwd):
self.assertEqual(qdb.util.hash_password(passwd, user.password),
user.password)
def test_password(self):
user = qdb.user.User('shared@foo.bar')
self.assertEqual(user.password, '$2a$12$gnUi8Qg.0tvW243v889BhOBhWLIHy'
'IJjjgaG6dxuRJkUM8nXG9Efe')
def test_change_pass(self):
user = qdb.user.User.create('testchangepass@test.bar', 'password')
user._change_pass("newpassword")
self._check_pass(user, "newpassword")
self.assertIsNone(user.info["pass_reset_code"])
def test_change_pass_short(self):
with self.assertRaises(IncorrectPasswordError):
self.user._change_pass("newpass")
self._check_pass(self.user, "password")
def test_change_password(self):
self.user.change_password("password", "newpassword")
self._check_pass(self.user, "newpassword")
def test_change_password_wrong_oldpass(self):
user = qdb.user.User.create('changepasswrongold@test.bar', 'password')
user.change_password("WRONG", "newpass")
self._check_pass(user, "password")
def test_generate_reset_code(self):
user = qdb.user.User.create('new@test.bar', 'password')
sql = "SELECT LOCALTIMESTAMP"
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
before = qdb.sql_connection.TRN.execute_fetchflatten()[0]
user.generate_reset_code()
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
after = qdb.sql_connection.TRN.execute_fetchflatten()[0]
sql = ("SELECT pass_reset_code, pass_reset_timestamp FROM "
"qiita.qiita_user WHERE email = %s")
qdb.sql_connection.TRN.add(sql, ('new@test.bar',))
obscode, obstime = qdb.sql_connection.TRN.execute_fetchindex()[0]
self.assertEqual(len(obscode), 20)
self.assertTrue(before < obstime < after)
def test_change_forgot_password(self):
user = qdb.user.User.create(
'changeforgotpassword@test.bar', 'password')
user.generate_reset_code()
code = user.info["pass_reset_code"]
obsbool = user.change_forgot_password(code, "newpassword")
self.assertEqual(obsbool, True)
self._check_pass(user, "newpassword")
def test_change_forgot_password_bad_code(self):
user = qdb.user.User.create('badcode@test.bar', 'password')
user.generate_reset_code()
code = "AAAAAAA"
obsbool = user.change_forgot_password(code, "newpassword")
self.assertEqual(obsbool, False)
self._check_pass(user, "password")
def test_messages(self):
qdb.util.add_system_message('SYS MESSAGE', datetime.now())
user = qdb.user.User('test@foo.bar')
obs = user.messages()
exp_msg = [
'SYS MESSAGE', 'message 1',
'Lorem ipsum dolor sit amet, consectetur adipiscing elit. '
'Pellentesque sed auctor ex, non placerat sapien. Vestibulum '
'vestibulum massa ut sapien condimentum, cursus consequat diam'
' sodales. Nulla aliquam arcu ut massa auctor, et vehicula '
'mauris tempor. In lacinia viverra ante quis pellentesque. '
'Nunc vel mi accumsan, porttitor eros ut, pharetra elit. Nulla'
' ac nisi quis dui egestas malesuada vitae ut mauris. Morbi '
'blandit non nisl a finibus. In erat velit, congue at ipsum '
'sit amet, venenatis bibendum sem. Curabitur vel odio sed est '
'rutrum rutrum. Quisque efficitur ut purus in ultrices. '
'Pellentesque eu auctor justo.', 'message <a href="#">3</a>']
self.assertCountEqual([(x[1]) for x in obs], exp_msg)
self.assertTrue(all(x[2] < datetime.now() for x in obs))
self.assertFalse(all(x[3] for x in obs))
self.assertEqual([x[4] for x in obs], [True, False, False, False])
obs = user.messages(1)
exp_msg = ['SYS MESSAGE']
self.assertEqual([x[1] for x in obs], exp_msg)
def test_mark_messages(self):
user = qdb.user.User('test@foo.bar')
user.mark_messages([1, 2])
obs = user.messages()
exp = [True, True, False]
self.assertEqual([x[3] for x in obs], exp)
user.mark_messages([1], read=False)
obs = user.messages()
exp = [False, True, False]
self.assertEqual([x[3] for x in obs], exp)
def test_delete_messages(self):
user = qdb.user.User.create('deletemsg@test.bar', 'password')
self.assertEqual(user.messages(), [])
qdb.util.add_message("New message", [user])
user_msgs = user.messages()
# Magic number 1: the actual message
self.assertEqual([msg[1] for msg in user_msgs], ["New message"])
# Magic numbers [0][0] - there is only one message and the first
# element of that message is the message id
user.delete_messages([user_msgs[0][0]])
self.assertEqual([msg[1] for msg in user.messages()], [])
def test_user_artifacts(self):
user = qdb.user.User('test@foo.bar')
obs = user.user_artifacts()
exp = {qdb.study.Study(1): [qdb.artifact.Artifact(1),
qdb.artifact.Artifact(2),
qdb.artifact.Artifact(3),
qdb.artifact.Artifact(4),
qdb.artifact.Artifact(5),
qdb.artifact.Artifact(6),
qdb.artifact.Artifact(7)]}
self.assertEqual(obs, exp)
obs = user.user_artifacts(artifact_type='BIOM')
exp = {qdb.study.Study(1): [qdb.artifact.Artifact(4),
qdb.artifact.Artifact(5),
qdb.artifact.Artifact(6),
qdb.artifact.Artifact(7)]}
self.assertEqual(obs, exp)
def test_jobs(self):
PJ = qdb.processing_job.ProcessingJob
ignore_status = []
# generates expected jobs
jobs = qdb.user.User('shared@foo.bar').jobs(
ignore_status=ignore_status)
self.assertEqual(jobs, [PJ('b72369f9-a886-4193-8d3d-f7b504168e75')])
jobs = qdb.user.User('shared@foo.bar').jobs(
ignore_status=ignore_status, show_hidden=True)
self.assertEqual(jobs, [
PJ('d19f76ee-274e-4c1b-b3a2-a12d73507c55'),
PJ('b72369f9-a886-4193-8d3d-f7b504168e75')])
# just one job
self.assertEqual(qdb.user.User('shared@foo.bar').jobs(
limit=1, ignore_status=ignore_status), [
PJ('b72369f9-a886-4193-8d3d-f7b504168e75')])
# generates expected jobs
jobs = qdb.user.User('shared@foo.bar').jobs()
self.assertEqual(jobs, [])
def test_update_email(self):
user = qdb.user.User('shared@foo.bar')
with self.assertRaisesRegex(IncorrectEmailError, 'Bad email given:'):
user.update_email('bladfa.adferqerq@$EWE')
with self.assertRaisesRegex(IncorrectEmailError,
'This email already exists'):
user.update_email('test@foo.bar')
user.update_email('bla@ble.bli')
def test_slurm_parameters(self):
self.assertEqual(qdb.user.User('shared@foo.bar').slurm_parameters,
'--nice=10000')
self.assertEqual(qdb.user.User('admin@foo.bar').slurm_parameters,
'--nice=5000')
@qiita_test_checker()
class DeleteUser(TestCase):
def test_delete_users(self):
# let's start with the errors
error = 'This email does not exist: x@y.z'
with self.assertRaisesRegex(IncorrectEmailError, error):
qdb.user.User.delete('x@y.z')
with self.assertRaises(ValueError):
qdb.user.User.delete('shared@foo.bar')
qdb.user.User.delete('shared@foo.bar', True)
# verify that the user doesn't exist any more
with self.assertRaises(qdb.exceptions.QiitaDBUnknownIDError):
qdb.user.User('shared@foo.bar')
if __name__ == "__main__":
main()