seguro.commands.acl_syncer.broker

Functions

reconcile(acl, b[, ignored_clients])

Classes

ACL(*, acltype, topic[, priority])

ACLType(value[, names, module, qualname, ...])

BaseModel()

Client(*, username[, password, clientid, ...])

Command(cmd, **attrs)

Config(clients, groups, roles)

Group(*, groupname[, roles])

GroupEntry(*, groupname[, priority])

Plugin(b)

Role(*, rolename[, textname, ...])

RoleEntry(*, rolename[, priority])

class seguro.commands.acl_syncer.broker.ACL(*, acltype, topic, priority=-1, allow)[source]

Bases: BaseModel

Parameters:
  • acltype (ACLType)

  • topic (str)

  • priority (int)

  • allow (bool)

acltype: ACLType
allow: bool
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'acltype': FieldInfo(annotation=ACLType, required=True), 'allow': FieldInfo(annotation=bool, required=True), 'priority': FieldInfo(annotation=int, required=False, default=-1), 'topic': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

priority: int
topic: str
class seguro.commands.acl_syncer.broker.ACLType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

PUBLISH_CLIENT_RECEIVE = 'publishClientReceive'
PUBLISH_CLIENT_SEND = 'publishClientSend'
SUBSCRIBE_LITERAL = 'subscribeLiteral'
SUBSCRIBE_PATTERN = 'subscribePattern'
UNSUBSCRIBE_LITERAL = 'unsubscribeLiteral'
UNSUBSCRIBE_PATTERN = 'unsubscribePattern'
classmethod from_broker_action(act)[source]
Parameters:

act (BrokerAction)

Return type:

ACLType

class seguro.commands.acl_syncer.broker.BaseModel[source]

Bases: BaseModel

dump()[source]
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class seguro.commands.acl_syncer.broker.Client(*, username, password=None, clientid=None, textname=None, textdescription=None, groups=frozenset({}), roles=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • username (str)

  • password (str | None)

  • clientid (str | None)

  • textname (str | None)

  • textdescription (str | None)

  • groups (frozenset[GroupEntry])

  • roles (frozenset[RoleEntry])

clientid: str | None
groups: frozenset[GroupEntry]
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'clientid': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'groups': FieldInfo(annotation=frozenset[GroupEntry], required=False, default=frozenset()), 'password': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'roles': FieldInfo(annotation=frozenset[RoleEntry], required=False, default=frozenset()), 'textdescription': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'textname': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'username': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

property name: str
password: str | None
roles: frozenset[RoleEntry]
textdescription: str | None
textname: str | None
username: str
class seguro.commands.acl_syncer.broker.Command(cmd, **attrs)[source]

Bases: object

Parameters:

cmd (str)

classmethod create_client(client)[source]
Parameters:

client (Client)

classmethod create_group(group)[source]
Parameters:

group (Group)

classmethod create_role(role)[source]
Parameters:

role (Role)

classmethod delete_client(name)[source]
Parameters:

name (str)

classmethod delete_group(name)[source]
Parameters:

name (str)

classmethod delete_role(name)[source]
Parameters:

name (str)

classmethod list_clients(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod list_groups(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod list_roles(verbose=True, count=-1, offset=0)[source]
Parameters:
  • verbose (bool)

  • count (int)

  • offset (int)

classmethod modify_client(client)[source]
Parameters:

client (Client)

classmethod modify_group(group)[source]
Parameters:

group (Group)

classmethod modify_role(role)[source]
Parameters:

role (Role)

classmethod set_default_access(acls)[source]
Parameters:

acls (dict[ACLType, bool])

to_dict()[source]
class seguro.commands.acl_syncer.broker.Config(clients: dict[str, seguro.commands.acl_syncer.broker.Client], groups: dict[str, seguro.commands.acl_syncer.broker.Group], roles: dict[str, seguro.commands.acl_syncer.broker.Role])[source]

Bases: object

Parameters:
  • clients (dict[str, Client])

  • groups (dict[str, Group])

  • roles (dict[str, Role])

also_in(other)[source]
Parameters:

other (Config)

Return type:

Config

belonging_to(client_names={})[source]
Parameters:

client_names (set[str])

Return type:

Config

clients: dict[str, Client]
equal_to(other)[source]
Parameters:

other (Config)

Return type:

Config

classmethod from_acl(acl)[source]
Parameters:

acl (AccessControlList)

Return type:

Config

groups: dict[str, Group]
not_in(other)[source]
Parameters:

other (Config)

Return type:

Config

roles: dict[str, Role]
class seguro.commands.acl_syncer.broker.Group(*, groupname, roles=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • groupname (str)

  • roles (frozenset[RoleEntry])

groupname: str
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'groupname': FieldInfo(annotation=str, required=True), 'roles': FieldInfo(annotation=frozenset[RoleEntry], required=False, default=frozenset())}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

property name: str
roles: frozenset[RoleEntry]
class seguro.commands.acl_syncer.broker.GroupEntry(*, groupname, priority=-1)[source]

Bases: BaseModel

Parameters:
  • groupname (str)

  • priority (int)

groupname: str
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'groupname': FieldInfo(annotation=str, required=True), 'priority': FieldInfo(annotation=int, required=False, default=-1)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

priority: int
class seguro.commands.acl_syncer.broker.Plugin(b)[source]

Bases: object

Parameters:

b (Client)

create(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

delete(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

execute(cmds, timeout=2.0)[source]
Parameters:
  • cmds (list[Command])

  • timeout (float)

get_current_config()[source]
Return type:

Config

modify(cfg)[source]
Parameters:

cfg (Config)

Return type:

list[Command]

class seguro.commands.acl_syncer.broker.Role(*, rolename, textname=None, textdescription=None, acls=frozenset({}))[source]

Bases: BaseModel

Parameters:
  • rolename (str)

  • textname (str | None)

  • textdescription (str | None)

  • acls (frozenset[ACL])

acls: frozenset[ACL]
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'acls': FieldInfo(annotation=frozenset[ACL], required=False, default=frozenset()), 'rolename': FieldInfo(annotation=str, required=True), 'textdescription': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'textname': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

property name: str
rolename: str
textdescription: str | None
textname: str | None
class seguro.commands.acl_syncer.broker.RoleEntry(*, rolename, priority=-1)[source]

Bases: BaseModel

Parameters:
  • rolename (str)

  • priority (int)

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'priority': FieldInfo(annotation=int, required=False, default=-1), 'rolename': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

priority: int
rolename: str
seguro.commands.acl_syncer.broker.reconcile(acl, b, ignored_clients={})[source]
Parameters: