seguro.commands.acl_syncer.broker

Functions

reconcile(acl, b[, ignored_clients])

Classes

ACL(*, acltype, topic[, priority, allow])

ACLType(*values)

BaseModel()

Client(*, username[, password, clientid, ...])

Command(cmd, **attrs)

Config(clients, groups, roles)

Group(*, groupname[, roles])

GroupEntry(*, groupname[, priority])

Plugin(b)

Role(*, rolename[, textname, ...])

RoleEntry(*, rolename[, priority])

class seguro.commands.acl_syncer.broker.ACL(*, acltype, topic, priority=-1, allow=True)[source]

Bases: BaseModel

Parameters:
  • acltype (ACLType)

  • topic (str)

  • priority (int)

  • allow (bool)

acltype: ACLType
allow: bool
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

priority: int
topic: str
class seguro.commands.acl_syncer.broker.ACLType(*values)[source]

Bases: Enum

PUBLISH_CLIENT_RECEIVE = 'publishClientReceive'
PUBLISH_CLIENT_SEND = 'publishClientSend'
SUBSCRIBE_LITERAL = 'subscribeLiteral'
SUBSCRIBE_PATTERN = 'subscribePattern'
UNSUBSCRIBE_LITERAL = 'unsubscribeLiteral'
UNSUBSCRIBE_PATTERN = 'unsubscribePattern'
classmethod from_broker_action(act)[source]
Parameters:

act (BrokerAction)

Return type:

list[ACLType]

class seguro.commands.acl_syncer.broker.BaseModel[source]

Bases: BaseModel

dump()[source]
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class seguro.commands.acl_syncer.broker.Client(*, username, password=None, clientid=None, textname=None, textdescription=None, groups=frozenset({}), roles=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • username (str)

  • password (str | None)

  • clientid (str | None)

  • textname (str | None)

  • textdescription (str | None)

  • groups (frozenset[GroupEntry])

  • roles (frozenset[RoleEntry])

clientid: str | None
groups: frozenset[GroupEntry]
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property name: str
password: str | None
roles: frozenset[RoleEntry]
textdescription: str | None
textname: str | None
username: str
class seguro.commands.acl_syncer.broker.Command(cmd, **attrs)[source]

Bases: object

Parameters:

cmd (str)

classmethod create_client(client)[source]
Parameters:

client (Client)

classmethod create_group(group)[source]
Parameters:

group (Group)

classmethod create_role(role)[source]
Parameters:

role (Role)

classmethod delete_client(name)[source]
Parameters:

name (str)

classmethod delete_group(name)[source]
Parameters:

name (str)

classmethod delete_role(name)[source]
Parameters:

name (str)

classmethod list_clients(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod list_groups(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod list_roles(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod modify_client(client)[source]
Parameters:

client (Client)

classmethod modify_group(group)[source]
Parameters:

group (Group)

classmethod modify_role(role)[source]
Parameters:

role (Role)

classmethod set_default_access(acls)[source]
Parameters:

acls (dict[ACLType, bool])

to_dict()[source]
class seguro.commands.acl_syncer.broker.Config(clients: dict[str, seguro.commands.acl_syncer.broker.Client], groups: dict[str, seguro.commands.acl_syncer.broker.Group], roles: dict[str, seguro.commands.acl_syncer.broker.Role])[source]

Bases: object

Parameters:
  • clients (dict[str, Client])

  • groups (dict[str, Group])

  • roles (dict[str, Role])

also_in(other)[source]
Parameters:

other (Config)

Return type:

Config

belonging_to(client_names={})[source]
Parameters:

client_names (set[str])

Return type:

Config

clients: dict[str, Client]
equal_to(other)[source]
Parameters:

other (Config)

Return type:

Config

classmethod from_acl(acl)[source]
Parameters:

acl (AccessControlList)

Return type:

Config

groups: dict[str, Group]
not_in(other)[source]
Parameters:

other (Config)

Return type:

Config

roles: dict[str, Role]
class seguro.commands.acl_syncer.broker.Group(*, groupname, roles=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • groupname (str)

  • roles (frozenset[RoleEntry])

groupname: str
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property name: str
roles: frozenset[RoleEntry]
class seguro.commands.acl_syncer.broker.GroupEntry(*, groupname, priority=-1)[source]

Bases: BaseModel

Parameters:
  • groupname (str)

  • priority (int)

groupname: str
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

priority: int
class seguro.commands.acl_syncer.broker.Plugin(b)[source]

Bases: object

Parameters:

b (Client)

create(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

delete(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

execute(cmds, timeout=2.0)[source]
Parameters:
  • cmds (list[Command])

  • timeout (float)

get_current_config()[source]
Return type:

Config

modify(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

queue: Queue[MQTTMessage]
class seguro.commands.acl_syncer.broker.Role(*, rolename, textname=None, textdescription=None, acls=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • rolename (str)

  • textname (str | None)

  • textdescription (str | None)

  • acls (frozenset[ACL])

acls: frozenset[ACL]
model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property name: str
rolename: str
textdescription: str | None
textname: str | None
class seguro.commands.acl_syncer.broker.RoleEntry(*, rolename, priority=-1)[source]

Bases: BaseModel

Parameters:
  • rolename (str)

  • priority (int)

model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

priority: int
rolename: str
seguro.commands.acl_syncer.broker.reconcile(acl, b, ignored_clients={})[source]
Parameters: