import testinfra.utils.ansible_runner import re def test_packages(host): package = host.package('influxdb') assert package.is_installed def test_config_file(host): path = host.file('/etc/influxdb/influxdb.conf') assert path.exists assert path.is_file assert path.user == 'root' assert path.group == 'root' assert path.mode == 0o644 assert path.contains('enabled = true') def test_service(host): service = host.service('influxdb') assert service.is_running assert service.is_enabled def test_sockets(host): socket = host.socket('tcp://0.0.0.0:8086') assert socket.is_listening socket = host.socket('udp://0.0.0.0:25826') assert socket.is_listening def test_users(host): cmd = host.run('influx -execute "SHOW USERS"') assert cmd.succeeded assert re.search(r'admin\s+true', cmd.stdout) assert re.search(r'test\s+false', cmd.stdout) assert not re.search('user_absent', cmd.stdout) def test_databases(host): cmd = host.run('influx -execute "SHOW DATABASES"') assert cmd.succeeded assert 'test_db' in cmd.stdout def test_databases(host): cmd = host.run('influx -execute "SHOW RETENTION POLICIES ON test_db"') assert cmd.succeeded assert re.search(r'default\s+24h0m0s\s+1h0m0s\s+1\s+true', cmd.stdout) def test_grants(host): cmd = host.run('influx -execute "SHOW GRANTS FOR test"') assert cmd.succeeded assert re.search(r'test_db\s+WRITE', cmd.stdout)