130 lines
2.8 KiB
Python
130 lines
2.8 KiB
Python
# coding=utf-8
|
|
import tempfile
|
|
import os
|
|
import pytest
|
|
from datetime import datetime
|
|
|
|
from tests import fixtures
|
|
from tsgrain_controller import database, util, task_queue, models
|
|
|
|
# Serialized form of the first job that the ``db`` fixture inserts; the tests
# compare ``Job.serialize()`` output against this dict.
JOB1_DATA = {
    'date': datetime(2022, 1, 10, 12, 30),
    'duration': 60,
    'enable': True,
    'repeat': False,
    'zones': [1, 3],
}
|
|
|
|
# Serialized form of the second job that the ``db`` fixture inserts; the tests
# compare ``Job.serialize()`` output against this dict.
JOB2_DATA = {
    'date': datetime(2022, 1, 11, 18, 0),
    'duration': 300,
    'enable': True,
    'repeat': True,
    'zones': [5],
}
|
|
|
|
|
|
@pytest.fixture
def db():
    """Yield a RainDB stored in a temporary file and pre-filled with two jobs.

    The database file ``raindb.json`` is created inside a temporary directory
    that is removed when the fixture is torn down. ``JOB1_DATA`` and
    ``JOB2_DATA`` are deserialized into jobs and inserted during setup, and
    the fixture asserts that they were assigned the IDs 1 and 2.

    The database is closed in a ``finally`` clause so that it is not left
    open when an insert or assertion during setup fails. After closing, the
    fixture asserts that the database file exists on disk.
    """
    with tempfile.TemporaryDirectory() as td:
        dbfile = os.path.join(td, 'raindb.json')
        # Named ``rain_db`` so the local does not shadow the fixture function.
        rain_db = database.RainDB(dbfile)

        try:
            job1 = models.Job.deserialize(JOB1_DATA)
            job2 = models.Job.deserialize(JOB2_DATA)

            rain_db.insert_job(job1)
            rain_db.insert_job(job2)

            # insert_job is expected to assign consecutive IDs starting at 1.
            assert job1.id == 1
            assert job2.id == 2

            yield rain_db
        finally:
            # Always release the database before the directory is deleted.
            rain_db.close()

        assert os.path.isfile(dbfile)
|
|
|
|
|
|
def test_get_jobs(db):
    """Check that get_jobs returns both fixture jobs in insertion order."""
    stored = db.get_jobs()
    assert len(stored) == 2

    # Each stored job must serialize back to the data it was created from.
    for index, expected in enumerate((JOB1_DATA, JOB2_DATA)):
        assert stored[index].serialize() == expected
|
|
|
|
|
|
def test_get_job(db):
    """Fetch the fixture jobs by ID and check that an unknown ID fails.

    IDs 1 and 2 must return jobs matching JOB1_DATA and JOB2_DATA; ID 3 was
    never inserted and must raise KeyError.
    """
    for job_id, expected in ((1, JOB1_DATA), (2, JOB2_DATA)):
        job = db.get_job(job_id)
        assert job.id == job_id
        assert job.serialize() == expected

    with pytest.raises(KeyError):
        db.get_job(3)
|
|
|
|
|
|
def test_insert_job(db):
    """Insert a third job and check the ID that is returned and assigned.

    The fixture already holds jobs 1 and 2, so the new job must get ID 3,
    both as the return value of insert_job and on the job object itself.
    """
    start = util.datetime_new(2022, 1, 15, 15, 30)
    job3 = models.Job(start, 250, [6], True, True)

    new_id = db.insert_job(job3)

    assert new_id == 3
    assert job3.id == 3
|
|
|
|
|
|
def test_update_job(db):
    """Change the zones of job 1 and check the change is visible on re-read."""
    modified = db.get_job(1)
    modified.zones = [1, 2, 3]
    db.update_job(modified)

    reloaded = db.get_job(1)

    # The update must keep the job's ID and store the new zone list.
    assert reloaded.id == modified.id
    assert modified.id == 1
    assert reloaded.zones == [1, 2, 3]
|
|
|
|
|
|
def test_delete_job(db):
    """Delete job 1 and check that deleting an unknown job raises KeyError."""
    db.delete_job(1)

    # The deleted job can no longer be fetched ...
    with pytest.raises(KeyError):
        db.get_job(1)

    # ... and only the second fixture job is left.
    remaining = db.get_jobs()
    assert len(remaining) == 1

    # ID 3 was never inserted.
    with pytest.raises(KeyError):
        db.delete_job(3)
|
|
|
|
|
|
def test_queue():
    """Store a task queue in the database, reopen it and load the queue back.

    Two tasks are enqueued, the queue is written with store_queue, the
    database is closed and opened again, and the queue read with load_queue
    must serialize to the expected JSON.

    Both database handles are closed in ``finally`` clauses so that no handle
    is left open when the temporary directory is removed, matching what the
    ``db`` fixture does.
    """
    # Built with implicit concatenation instead of backslash continuations,
    # so the value no longer depends on how continuation lines are indented.
    expected_json = (
        '[{"source": "MANUAL", "zone_id": 1, '
        '"duration": 10, "remaining": 10}, '
        '{"source": "SCHEDULE", "zone_id": 1, '
        '"duration": 5, "remaining": 5}]'
    )

    with tempfile.TemporaryDirectory() as td:
        app = fixtures.TestingApp(td)
        q = task_queue.TaskQueue(app)

        task1 = models.Task(models.Source.MANUAL, 1, 10)
        task2 = models.Task(models.Source.SCHEDULE, 1, 5)

        assert q.enqueue(task1)
        assert q.enqueue(task2)

        dbfile = os.path.join(td, 'raindb.json')
        # NOTE(review): the first TestingApp receives the directory ``td``
        # while this one receives the database file path; confirm which of
        # the two TestingApp actually expects.
        app = fixtures.TestingApp(dbfile)

        db = database.RainDB(dbfile)
        try:
            db.store_queue(q)
        finally:
            db.close()

        # Reopen database
        db = database.RainDB(dbfile)
        try:
            read_queue = task_queue.TaskQueue(app)
            db.load_queue(read_queue)

            assert util.to_json(read_queue) == expected_json
        finally:
            # The original never closed the reopened database.
            db.close()
|
|
|
|
|
|
def test_auto_mode(db):
    """Check that auto mode starts as False and can be switched to True."""
    initial = db.get_auto_mode()
    assert initial is False

    db.set_auto_mode(True)

    updated = db.get_auto_mode()
    assert updated is True
|