Source code for connectordb._connectordb

from __future__ import absolute_import
import json
import os

from ._connection import DatabaseConnection

from ._device import Device
from ._user import User
from ._stream import Stream, DATAPOINT_INSERT_LIMIT

# By default assume that a ConnectorDB server is running on localhost with
# default configuration
CONNECTORDB_URL = "http://localhost:3124"


[docs]class ConnectorDB(Device): """ConnectorDB is the main entry point for any application that uses the python API. The class accepts both a username and password in order to log in as a user, and accepts an apikey when logging in directly from a device:: import connectordb cdb = connectordb.ConnectorDB("myusername","mypassword") #prints "myusername/user" - logging in by username/password combo #logs in as the user device. print cdb.path """ def __init__(self, user_or_apikey=None, user_password=None, url=CONNECTORDB_URL): db = DatabaseConnection(user_or_apikey, user_password, url) # ConnectorDB uses bcrypt by default for password hashing. While great for security # of passwords, it is extremely expensive, so it slows down queries. So, if we logged in # as a user with password, attempt to get the user device apikey to use for future authentication # so that queries are fast if user_password is not None: # Logins happen as a user device Device.__init__(self, db, user_or_apikey + "/user") if self.apikey is not None: # Reset the auth to be apikey db.setauth(self.apikey) else: # We logged in as a device - we have to ping the server to get our # name Device.__init__(self, db, db.path) def __call__(self, path): """Enables getting arbitrary users/devices/streams in a simple way. Just call the object with the u/d/s uri cdb = ConnectorDB("myapikey") cdb("user1") -> user1 object cdb("user1/device1") -> user1/device1 object cdb("user1/device1/stream1") -> user1/device1/stream1 object """ n = path.count("/") if n == 0: return User(self.db, path) elif n == 1: return Device(self.db, path) else: return Stream(self.db, path)
[docs] def close(self): """shuts down all active connections to ConnectorDB""" self.db.close()
[docs] def reset_apikey(self): """invalidates the device's current api key, and generates a new one. Resets current auth to use the new apikey, since the change would have future queries fail if they use the old api key.""" apikey = Device.reset_apikey(self) self.db.setauth(apikey) return apikey
[docs] def count_users(self): """Gets the total number of users registered with the database. Only available to administrator.""" return int(self.db.get("", {"q": "countusers"}).text)
[docs] def count_devices(self): """Gets the total number of devices registered with the database. Only available to administrator.""" return int(self.db.get("", {"q": "countdevices"}).text)
[docs] def count_streams(self): """Gets the total number of streams registered with the database. Only available to administrator.""" return int(self.db.get("", {"q": "countstreams"}).text)
[docs] def info(self): """returns a dictionary of information about the database, including the database version, the transforms and the interpolators supported:: >>>cdb = connectordb.ConnectorDB(apikey) >>>cdb.info() { "version": "0.3.0", "transforms": { "sum": {"description": "Returns the sum of all the datapoints that go through the transform"} ... }, "interpolators": { "closest": {"description": "Uses the datapoint closest to the interpolation timestamp"} ... } } """ return { "version": self.db.get("meta/version").text, "transforms": self.db.get("meta/transforms").json(), "interpolators": self.db.get("meta/interpolators").json() }
def __repr__(self): return "[ConnectorDB:%s]" % (self.path, )
[docs] def users(self): """Returns the list of users in the database""" result = self.db.read("", {"q": "ls"}) if result is None or result.json() is None: return [] users = [] for u in result.json(): usr = self(u["name"]) usr.metadata = u users.append(usr) return users
[docs] def ping(self): """Pings the ConnectorDB server. Useful for checking if the connection is valid""" return self.db.ping()
[docs] def import_users(self, directory): """Imports version 1 of ConnectorDB export. These exports can be generated by running user.export(dir), possibly on multiple users. """ exportInfoFile = os.path.join(directory, "connectordb.json") with open(exportInfoFile) as f: exportInfo = json.load(f) if exportInfo["Version"] != 1: raise ValueError("Not able to read this import version") # Now we list all the user directories for name in os.listdir(directory): udir = os.path.join(directory, name) if os.path.isdir(udir): # Let's read in the user with open(os.path.join(udir, "user.json")) as f: usrdata = json.load(f) u = self(usrdata["name"]) if u.exists(): raise ValueError("The user " + name + " already exists") del usrdata["name"] u.create(password=name, **usrdata) # Now read all of the user's devices for dname in os.listdir(udir): ddir = os.path.join(udir, dname) if os.path.isdir(ddir): u.import_device(ddir)