You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
160 lines
5.0 KiB
160 lines
5.0 KiB
import importlib
|
|
import io
|
|
import sys
|
|
import types
|
|
import unittest
|
|
from contextlib import redirect_stdout
|
|
|
|
|
|
class AuxiliaryToolTests(unittest.TestCase):
|
|
def test_report_receiver_bool_flag(self):
|
|
import report_receiver
|
|
|
|
self.assertTrue(report_receiver.bool_flag("1"))
|
|
self.assertTrue(report_receiver.bool_flag("true"))
|
|
self.assertTrue(report_receiver.bool_flag("yes"))
|
|
self.assertFalse(report_receiver.bool_flag("0"))
|
|
self.assertFalse(report_receiver.bool_flag(""))
|
|
self.assertFalse(report_receiver.bool_flag(None))
|
|
|
|
def test_ami_factory_builds_protocol_with_instance_state(self):
|
|
try:
|
|
import AMI
|
|
except ModuleNotFoundError as exc:
|
|
self.skipTest(str(exc))
|
|
|
|
factory = AMI.AMI.AMIClientFactory(
|
|
AMI.AMI.AMIClient,
|
|
b"user",
|
|
b"secret",
|
|
b"1234",
|
|
b"ilink 3 2350",
|
|
)
|
|
protocol = factory.buildProtocol(None)
|
|
|
|
self.assertEqual(protocol.username, b"user")
|
|
self.assertEqual(protocol.secret, b"secret")
|
|
self.assertEqual(protocol.nodenum, b"1234")
|
|
self.assertEqual(protocol.command, b"ilink 3 2350")
|
|
|
|
def test_report_sql_uses_factory_db_and_parameterized_insert(self):
|
|
self._install_mysql_stub()
|
|
try:
|
|
import report_sql
|
|
report_sql = importlib.reload(report_sql)
|
|
except ModuleNotFoundError as exc:
|
|
self.skipTest(str(exc))
|
|
|
|
fake_db = _FakeDB()
|
|
fake_reactor = object()
|
|
factory = report_sql.reportClientFactory(report_sql.reportClient, fake_db, fake_reactor)
|
|
with redirect_stdout(io.StringIO()):
|
|
client = factory.buildProtocol(None)
|
|
|
|
self.assertIs(client.db, fake_db)
|
|
self.assertIs(client.reactor, fake_reactor)
|
|
|
|
event = {
|
|
"type": "GROUP VOICE",
|
|
"event": "START",
|
|
"trx": "RX",
|
|
"system": "SYSTEM",
|
|
"streamid": "1234",
|
|
"peerid": "5678",
|
|
"subid": "9012",
|
|
"slot": "2",
|
|
"dstid": "2350",
|
|
"duration": "0",
|
|
}
|
|
with redirect_stdout(io.StringIO()):
|
|
client.send_mysql(event)
|
|
|
|
statement, params = fake_db.cursor_obj.executed
|
|
self.assertIn("%s", statement)
|
|
self.assertEqual(params[0], "GROUP VOICE")
|
|
self.assertEqual(params[8], "2350")
|
|
self.assertTrue(fake_db.committed)
|
|
self.assertTrue(fake_db.cursor_obj.closed)
|
|
|
|
def test_proxy_environment_bool_parser(self):
|
|
saved_modules = self._install_proxy_stubs()
|
|
try:
|
|
import hotspot_proxy_v2
|
|
hotspot_proxy_v2 = importlib.reload(hotspot_proxy_v2)
|
|
|
|
self.assertTrue(hotspot_proxy_v2.bool_from_env("1"))
|
|
self.assertTrue(hotspot_proxy_v2.bool_from_env("true"))
|
|
self.assertTrue(hotspot_proxy_v2.bool_from_env("yes"))
|
|
self.assertFalse(hotspot_proxy_v2.bool_from_env("0"))
|
|
self.assertFalse(hotspot_proxy_v2.bool_from_env(""))
|
|
self.assertFalse(hotspot_proxy_v2.bool_from_env(None))
|
|
finally:
|
|
self._restore_modules(saved_modules)
|
|
|
|
def _install_mysql_stub(self):
|
|
mysql_module = types.ModuleType("mysql")
|
|
connector_module = types.ModuleType("mysql.connector")
|
|
|
|
class ConnectorError(Exception):
|
|
pass
|
|
|
|
connector_module.Error = ConnectorError
|
|
connector_module.errorcode = types.SimpleNamespace(
|
|
ER_ACCESS_DENIED_ERROR=1045,
|
|
ER_BAD_DB_ERROR=1049,
|
|
)
|
|
mysql_module.connector = connector_module
|
|
sys.modules["mysql"] = mysql_module
|
|
sys.modules["mysql.connector"] = connector_module
|
|
|
|
def _install_proxy_stubs(self):
|
|
stubbed = ["Pyro5", "Pyro5.api"]
|
|
saved_modules = {name: sys.modules.get(name) for name in stubbed + ["hotspot_proxy_v2"]}
|
|
|
|
pyro5_module = types.ModuleType("Pyro5")
|
|
pyro5_api_module = types.ModuleType("Pyro5.api")
|
|
pyro5_api_module.Proxy = object
|
|
pyro5_module.api = pyro5_api_module
|
|
sys.modules["Pyro5"] = pyro5_module
|
|
sys.modules["Pyro5.api"] = pyro5_api_module
|
|
sys.modules.pop("hotspot_proxy_v2", None)
|
|
return saved_modules
|
|
|
|
def _restore_modules(self, saved_modules):
|
|
for name, module in saved_modules.items():
|
|
if module is None:
|
|
sys.modules.pop(name, None)
|
|
else:
|
|
sys.modules[name] = module
|
|
|
|
|
|
class _FakeCursor:
|
|
def __init__(self):
|
|
self.executed = None
|
|
self.closed = False
|
|
|
|
def execute(self, statement, params):
|
|
self.executed = (statement, params)
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
|
|
class _FakeDB:
|
|
def __init__(self):
|
|
self.cursor_obj = _FakeCursor()
|
|
self.committed = False
|
|
|
|
def is_connected(self):
|
|
return True
|
|
|
|
def cursor(self):
|
|
return self.cursor_obj
|
|
|
|
def commit(self):
|
|
self.committed = True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|