echostream_node

  1from __future__ import annotations
  2
  3import logging
  4from abc import ABC
  5from dataclasses import dataclass
  6from os import cpu_count, environ
  7from time import time
  8from typing import TYPE_CHECKING, Any, Callable, Union
  9from uuid import uuid4
 10
 11import awsserviceendpoints
 12import simplejson as json
 13from boto3.session import Session
 14from botocore.config import Config
 15from echostream_botocore import AppSession
 16from gql import Client as GqlClient
 17from gql import gql
 18from gql.transport.requests import RequestsHTTPTransport
 19from pycognito import Cognito
 20from pycognito.utils import RequestsSrpAuth
 21
 22
 23def getLogger() -> logging.Logger:
 24    """
 25    Returns "echostream-node" logger
 26    """
 27    return logging.getLogger("echostream-node")
 28
 29
# Attach a do-nothing handler to the package logger at import time.
getLogger().addHandler(logging.NullHandler())

if TYPE_CHECKING:
    # Stub-only imports: needed by type checkers, never imported at runtime.
    from mypy_boto3_dynamodb.service_resource import Table
    from mypy_boto3_sqs.client import SQSClient
    from mypy_boto3_sqs.type_defs import MessageAttributeValueTypeDef
else:
    # Runtime stand-ins for the stub types. MessageAttributeValueTypeDef is
    # called like a constructor elsewhere in this module, so it must be `dict`.
    MessageAttributeValueTypeDef = dict
    SQSClient = object
    Table = object
 40
# GraphQL document executed by Node.__init__ to bootstrap a node. For the node
# identified by (name, tenant) it returns the node's type name, the owning
# app's type name / name / auditRecordsEndpoint / tableAccess, and the
# tenant's region and table.
# NOTE: the operation is named `getNode`, the same operation name used by
# _GET_NODE_GQL.
_GET_APP_GQL = gql(
    """
    query getNode($name: String!, $tenant: String!) {
        GetNode(name: $name, tenant: $tenant) {
            __typename
            ... on AppChangeReceiverNode {
                app {
                    __typename
                    ... on CrossAccountApp {
                        auditRecordsEndpoint
                        name
                        tableAccess
                    }
                    ... on ExternalApp {
                        auditRecordsEndpoint
                        name
                        tableAccess
                    }
                    ... on ManagedApp {
                        auditRecordsEndpoint
                        name
                        tableAccess
                    }
                }
            }
            ... on ExternalNode {
                app {
                    __typename
                    ... on CrossAccountApp {
                        auditRecordsEndpoint
                        name
                        tableAccess
                    }
                    ... on ExternalApp {
                        auditRecordsEndpoint
                        name
                        tableAccess
                    }
                }
            }
            ... on ManagedNode {
                app {
                    __typename
                    auditRecordsEndpoint
                    name
                    tableAccess
                }
            }
            tenant {
                region
                table
            }
        }
    }
    """
)
 97
# GraphQL document that requests bulk data storage entries for a tenant. The
# batch size is fixed at 20 (`count: 20`); the caller only chooses the tenant
# and whether the acceleration endpoint is used. Each entry carries an
# expiration, a presigned GET URL, a presigned POST (fields + url) and a
# presigned PUT (headers + url).
# NOTE(review): not executed in the code visible here; presumably used by the
# concrete node implementations - confirm.
_GET_BULK_DATA_STORAGE_GQL = gql(
    """
    query getBulkDataStorage($tenant: String!, $useAccelerationEndpoint: Boolean!) {
        GetBulkDataStorage(tenant: $tenant, count: 20, useAccelerationEndpoint: $useAccelerationEndpoint) {
            expiration
            presignedGet
            presignedPost {
                fields
                url
            }
            presignedPut {
                headers
                url
            }
        }
    }
    """
)
116
# GraphQL document that fetches a node's runtime details: its receive/send
# edges (queue plus the name of the node at the other end), its receive/send
# message types (auditor + name), the node and app `config`, the `stopped`
# flag, and the tenant's `audit` and `config` values.
# AppChangeReceiverNode exposes a single `receiveEdge` and no send side;
# ExternalNode and ManagedNode expose `receiveEdges` and `sendEdges`.
# NOTE(review): not executed in the code visible here; presumably used by the
# concrete node implementations - confirm.
_GET_NODE_GQL = gql(
    """
    query getNode($name: String!, $tenant: String!) {
        GetNode(name: $name, tenant: $tenant) {
            ... on AppChangeReceiverNode {
                receiveEdge {
                    queue
                    source {
                        name
                    }
                }
                receiveMessageType {
                    auditor
                    name
                }
            }
            ... on ExternalNode {
                app {
                    ... on CrossAccountApp {
                        config
                    }
                    ... on ExternalApp {
                        config
                    }
                }
                config
                receiveEdges {
                    queue
                    source {
                        name
                    }
                }
                receiveMessageType {
                    auditor
                    name
                }
                sendEdges {
                    queue
                    target {
                        name
                    }
                }
                sendMessageType {
                    auditor
                    name
                }
                stopped
            }
            ... on ManagedNode {
                app {
                    config
                }
                config
                receiveEdges {
                    queue
                    source {
                        name
                    }
                }
                receiveMessageType {
                    auditor
                    name
                }
                sendEdges {
                    queue
                    target {
                        name
                    }
                }
                sendMessageType {
                    auditor
                    name
                }
                stopped
            }
            tenant {
                audit
                config
            }
        }
    }
    """
)
200
201
Auditor = Callable[..., dict[str, Any]]
"""Typing for MessageType auditor functions"""

# A dict mapping a string key to a list of string-to-string dicts.
# NOTE(review): presumably the partial-batch failure response a Lambda returns
# for an SQS event source - confirm against the callers.
BatchItemFailures = dict[str, list[dict[str, str]]]
206
207
@dataclass(frozen=True, init=False)
class BulkDataStorage:
    """
    Presigned URLs for one bulk data storage location, with their expiration.

    Instances are immutable; all attributes are populated by the constructor
    from a single raw ``GetBulkDataStorage`` result entry.
    """

    expiration: int
    """Epoch, in seconds, when this expires"""
    presigned_get: str
    """URL that you can HTTP 'GET' to retrieve the bulk data"""
    presigned_post: PresignedPost
    """URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires"""
    presigned_put: PresignedPut
    """URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires"""

    def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None:
        """
        Build the instance from a raw result entry.

        Arguments:
        bulk_data_storage - dict with the keys "expiration", "presignedGet",
            "presignedPost" (a dict with a JSON-encoded "fields" string and a
            "url") and "presignedPut" (a dict with a JSON-encoded "headers"
            string and a "url")

        Raises KeyError if any of those keys is missing.
        """
        super().__init__()
        # The dataclass is frozen, so attributes must be set through
        # object.__setattr__ (reached here via super()).
        super().__setattr__("expiration", bulk_data_storage["expiration"])
        super().__setattr__("presigned_get", bulk_data_storage["presignedGet"])
        super().__setattr__(
            "presigned_post",
            PresignedPost(
                # "fields" arrives as a JSON document, not as a dict.
                fields=json.loads(bulk_data_storage["presignedPost"]["fields"]),
                url=bulk_data_storage["presignedPost"]["url"],
            ),
        )
        super().__setattr__(
            "presigned_put",
            PresignedPut(
                # "headers" arrives as a JSON document, not as a dict.
                headers=json.loads(bulk_data_storage["presignedPut"]["headers"]),
                url=bulk_data_storage["presignedPut"]["url"],
            ),
        )

    @property
    def expired(self) -> bool:
        """
        Returns True if the expiration time has passed, i.e. the presigned
        URLs held by this object have expired; False otherwise.
        """
        return self.expiration < time()
248
249
@dataclass(frozen=True)
class Edge:
    """
    Immutable description of one edge attached to a node.
    """

    name: str
    """Name identifying the Edge; normally the name of the node at its other end"""
    queue: str
    """URL of the SQS queue that backs the Edge"""
260
261
LambdaEvent = Union[bool, dict, float, int, list, str, tuple, None]
"""Typing for the various types that a Lambda can be invoked with"""

# A list of record dicts. Each record value is a string, a string-to-string
# dict, or a nested dict whose leaf values are str/bytes or lists of them.
# NOTE(review): presumably mirrors the `Records` list of a Lambda SQS event
# (the nested dict matching `messageAttributes`) - confirm.
LambdaSqsRecords = list[
    dict[
        str,
        Union[
            str,
            dict[str, str],
            dict[
                str,
                dict[str, dict[str, Union[str, bytes, list[str], list[bytes]]]],
            ],
        ],
    ]
]
278
279
@dataclass(frozen=True, init=False)
class Message:
    """
    Immutable EchoStream message, ready to be sent to an SQS queue.

    The constructor derives the SQS message attributes (``trackingId`` and,
    when there are previous tracking ids, ``prevTrackingIds``) and the total
    message size, and rejects messages above the 262,144 byte limit.
    """

    body: str
    """The body"""
    group_id: str
    """The SQS group id"""
    length: int
    """The length in bytes, as SQS calculates it (body plus attribute names, data types and values)"""
    message_attributes: dict[str, MessageAttributeValueTypeDef]
    """The user-defined attributes"""
    message_type: MessageType
    """The EchoStream message type"""
    tracking_id: str
    """The tracking id"""
    previous_tracking_ids: list[str]
    """A list of previous tracking ids. Populated if the original message was split"""

    def __init__(
        self,
        body: str,
        message_type: MessageType,
        group_id: str = None,
        previous_tracking_ids: Union[list[str], str] = None,
        tracking_id: str = None,
    ) -> None:
        """
        Arguments:
        body - content of the message
        message_type - the EchoStream message type of the message
        group_id - SQS message group id; when omitted, one is derived from
            the sending node's name at send time
        previous_tracking_ids - previous tracking ids, either as a list or as
            a JSON-encoded list; an empty value is stored as None
        tracking_id - tracking id; a random uuid4 hex string when omitted

        Raises ValueError if the message is larger than 262,144 bytes.
        """
        super().__init__()
        # The dataclass is frozen, so attributes are set via object.__setattr__.
        assign = super().__setattr__
        if isinstance(previous_tracking_ids, str):
            previous_tracking_ids = json.loads(previous_tracking_ids)
        assign("body", body)
        assign("group_id", group_id)
        assign("message_type", message_type)
        assign("tracking_id", tracking_id or uuid4().hex)
        assign(
            "previous_tracking_ids",
            previous_tracking_ids if previous_tracking_ids else None,
        )
        # Plain dict literals: this is what MessageAttributeValueTypeDef is at
        # runtime, and it still type-checks against the TypedDict.
        message_attributes: dict[str, MessageAttributeValueTypeDef] = {
            "trackingId": {"DataType": "String", "StringValue": self.tracking_id}
        }
        if self.previous_tracking_ids:
            message_attributes["prevTrackingIds"] = {
                "DataType": "String",
                "StringValue": json.dumps(
                    self.previous_tracking_ids, separators=(",", ":")
                ),
            }
        assign("message_attributes", message_attributes)
        # The SQS size limit is expressed in bytes and covers the body plus
        # the name, data type and value of every attribute, so measure the
        # UTF-8 encoded size rather than the number of characters.
        length = self._byte_length(self.body)
        for name, attribute in self.message_attributes.items():
            data_type = attribute["DataType"]
            value = attribute[
                "StringValue" if data_type in ("String", "Number") else "BinaryValue"
            ]
            length += (
                self._byte_length(name)
                + self._byte_length(data_type)
                + self._byte_length(value)
            )
        if length > 262144:
            raise ValueError("Message is > 262,144 in size")
        assign("length", length)

    @staticmethod
    def _byte_length(value: Union[str, bytes]) -> int:
        """Size of value in bytes; strings are measured UTF-8 encoded."""
        return len(value.encode("utf-8")) if isinstance(value, str) else len(value)

    def __len__(self) -> int:
        """The message length in bytes (same as the length attribute)."""
        return self.length

    def _sqs_message(self, node: Node) -> dict:
        """
        Build the keyword arguments describing this message for SQS.

        When the message has no group id, the node's name with spaces
        replaced by underscores is used as the MessageGroupId.
        """
        return dict(
            MessageAttributes=self.message_attributes,
            MessageBody=self.body,
            MessageGroupId=self.group_id or node.name.replace(" ", "_"),
        )
356
357
@dataclass(frozen=True)
class MessageType:
    """
    Immutable pairing of a message type's name with its auditor function.
    """

    auditor: Auditor
    """Callable that produces the audit attributes (a dict) for a message of this type"""
    name: str
    """Name of the message type"""
368
369
class Node(ABC):
    """
    Base level node class. Used by threading and asyncio modules to interact with echostream nodes.

    Construction authenticates against Cognito, looks up the node's app and
    tenant through the GraphQL API, and prepares the boto3 session and SQS
    client. Edges, message types, config, audit and stopped state start out
    empty here and are filled in later through the underscore-prefixed
    attributes and setters (presumably by the concrete subclasses - confirm).
    """

    def __init__(
        self,
        *,
        appsync_endpoint: str = None,
        bulk_data_acceleration: bool = False,
        client_id: str = None,
        name: str = None,
        password: str = None,
        tenant: str = None,
        timeout: float = None,
        user_pool_id: str = None,
        username: str = None,
    ) -> None:
        """
        All arguments are keyword-only. A falsy argument falls back to an
        environment variable where one is listed; if that variable is not set
        either, a KeyError is raised.

        Arguments:
        appsync_endpoint - URL of the GraphQL API (env: APPSYNC_ENDPOINT)
        bulk_data_acceleration - stored and exposed as bulk_data_acceleration
        client_id - Cognito client id (env: CLIENT_ID)
        name - name of the node (env: NODE)
        password - Cognito password (env: PASSWORD)
        tenant - name of the tenant (env: TENANT)
        timeout - stored as the node timeout; a falsy value becomes 0.1
        user_pool_id - Cognito user pool id (env: USER_POOL_ID)
        username - Cognito user name (env: USER_NAME)
        """
        super().__init__()
        self.__cognito = Cognito(
            client_id=client_id or environ["CLIENT_ID"],
            user_pool_id=user_pool_id or environ["USER_POOL_ID"],
            username=username or environ["USER_NAME"],
        )
        # Authenticate first: the GraphQL request below and the AppSession
        # further down both use this Cognito object.
        self.__cognito.authenticate(password=password or environ["PASSWORD"])
        name = name or environ["NODE"]
        tenant = tenant or environ["TENANT"]
        # One-off lookup of the node's app and tenant details.
        with GqlClient(
            fetch_schema_from_transport=True,
            transport=RequestsHTTPTransport(
                auth=RequestsSrpAuth(cognito=self.__cognito, http_header_prefix=""),
                url=appsync_endpoint or environ["APPSYNC_ENDPOINT"],
            ),
        ) as session:
            data: dict[str, Union[str, dict]] = session.execute(
                _GET_APP_GQL,
                variable_values=dict(name=name, tenant=tenant),
            )["GetNode"]
        self.__app = data["app"]["name"]
        self.__app_type = data["app"]["__typename"]
        self.__audit_records_endpoint = data["app"]["auditRecordsEndpoint"]
        self.__bulk_data_acceleration = bulk_data_acceleration
        self.__config: dict[str, Any] = None
        self.__name = name
        self.__node_type = data["__typename"]
        # boto3 session backed by an AppSession built from the app, the
        # Cognito identity and the tenant, pinned to the tenant's region.
        self.__session = Session(
            botocore_session=AppSession(
                app=self.__app, cognito=self.__cognito, tenant=tenant
            ),
            region_name=data["tenant"]["region"],
        )
        self.__sources: frozenset[Edge] = None
        # Cross-account apps get their SQS client from a plain boto3 Session
        # in the tenant's region; every other app type uses the session above.
        self.__sqs_client: SQSClient = (
            Session(region_name=data["tenant"]["region"])
            if self.__app_type == "CrossAccountApp"
            else self.__session
        ).client(
            "sqs",
            config=Config(
                # Connection pool scales with the CPU count, capped at 20.
                max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2),
                retries={"mode": "standard"},
            ),
        )
        # The tenant table name is only kept when the app reports tableAccess.
        self.__table: str = (
            data["tenant"]["table"] if data["app"].get("tableAccess") else None
        )
        self.__targets: frozenset[Edge] = None
        self.__tenant = tenant
        self.__timeout = timeout or 0.1
        self._audit = False
        self._receive_message_type: MessageType = None
        self._send_message_type: MessageType = None
        self._stopped = False

    @property
    def _audit_records_endpoint(self) -> str:
        """The app's auditRecordsEndpoint, as returned by the GraphQL lookup."""
        return self.__audit_records_endpoint

    @property
    def _cognito(self) -> Cognito:
        """The authenticated Cognito object created by the constructor."""
        return self.__cognito

    @property
    def _session(self) -> Session:
        """The AppSession-backed boto3 session for the tenant's region."""
        return self.__session

    @property
    def _sources(self) -> frozenset[Edge]:
        """The source edges; None until assigned."""
        return self.__sources

    @_sources.setter
    def _sources(self, sources: set[Edge]) -> None:
        # Stored as a frozenset so the collection cannot be mutated afterwards.
        self.__sources = frozenset(sources)

    @property
    def _sqs_client(self) -> SQSClient:
        """The SQS client created by the constructor."""
        return self.__sqs_client

    @property
    def _targets(self) -> frozenset[Edge]:
        """The target edges; None until assigned."""
        return self.__targets

    @_targets.setter
    def _targets(self, targets: set[Edge]) -> None:
        # Stored as a frozenset so the collection cannot be mutated afterwards.
        self.__targets = frozenset(targets)

    @property
    def app(self) -> str:
        """The name of the app this node belongs to."""
        return self.__app

    @property
    def app_type(self) -> str:
        """The GraphQL type name of the app (e.g. "CrossAccountApp")."""
        return self.__app_type

    @property
    def audit(self) -> bool:
        """The audit flag; False until set through _audit."""
        return self._audit

    @property
    def bulk_data_acceleration(self) -> bool:
        """The bulk_data_acceleration value passed to the constructor."""
        return self.__bulk_data_acceleration

    @property
    def config(self) -> dict[str, Any]:
        """The node's config; None until assigned."""
        return self.__config

    @config.setter
    def config(self, config: dict[str, Any]) -> None:
        self.__config = config

    def create_message(
        self,
        /,
        body: str,
        *,
        group_id: str = None,
        previous_tracking_ids: Union[list[str], str] = None,
        tracking_id: str = None,
    ) -> Message:
        """
        Creates message as per the message standard of echostream.

        The message is created with this node's send_message_type.

        Arguments:
        body - [POSITIONAL ARGUMENT] content of the message
        group_id - [KEYWORD ARGUMENT] group id
        previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available
        tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available
        """
        return Message(
            body=body,
            group_id=group_id,
            message_type=self.send_message_type,
            previous_tracking_ids=previous_tracking_ids,
            tracking_id=tracking_id,
        )

    @property
    def name(self) -> str:
        """The name of this node."""
        return self.__name

    @property
    def node_type(self) -> str:
        """The GraphQL type name of this node."""
        return self.__node_type

    @property
    def receive_message_type(self) -> MessageType:
        """The message type this node receives; None until assigned."""
        return self._receive_message_type

    @property
    def send_message_type(self) -> MessageType:
        """The message type this node sends; None until assigned."""
        return self._send_message_type

    @property
    def sources(self) -> frozenset[Edge]:
        """The source edges; None until assigned."""
        return self._sources

    @property
    def stopped(self) -> bool:
        """The stopped flag; any falsy stored value is reported as False."""
        return self._stopped or False

    @property
    def table(self) -> Table:
        """
        The tenant's DynamoDB table, created from the app session on each access.

        Raises RuntimeError if the app does not have tableAccess.
        """
        if self.__table:
            return self._session.resource("dynamodb").Table(self.__table)
        raise RuntimeError(f"App {self.app} does not have tableAccess")

    @property
    def targets(self) -> frozenset[Edge]:
        """The target edges; None until assigned."""
        return self._targets

    @property
    def tenant(self) -> str:
        """The name of the tenant this node belongs to."""
        return self.__tenant

    @property
    def timeout(self) -> float:
        """The node timeout (presumably in seconds - confirm)."""
        return self.__timeout

    @timeout.setter
    def timeout(self, timeout: float) -> None:
        # A falsy timeout (None or 0) falls back to the 0.1 default.
        self.__timeout = timeout or 0.1
571
572
@dataclass(frozen=True)
class PresignedPost:
    """
    The POST half of the Bulk Data Storage system: where to POST bulk data
    and which form fields must accompany the upload.
    """

    fields: dict[str, str]
    """Form fields that must be sent along with the POSTed bulk data"""
    url: str
    """URL that the bulk data is POSTed to"""
584
585
@dataclass(frozen=True)
class PresignedPut:
    """
    The PUT half of the Bulk Data Storage system: where to PUT bulk data
    and which HTTP headers must accompany the upload.
    """

    headers: dict[str, str]
    """HTTP headers that must be sent along with the PUT request"""
    url: str
    """URL that the bulk data is PUT to"""
def getLogger() -> logging.Logger:
def getLogger() -> logging.Logger:
    """
    Return the logger shared by this package, registered as "echostream-node".
    """
    logger_name = "echostream-node"
    return logging.getLogger(logger_name)

Returns "echostream-node" logger

Auditor = typing.Callable[..., dict[str, typing.Any]]

Typing for MessageType auditor functions

BatchItemFailures = dict[str, list[dict[str, str]]]
@dataclass(frozen=True, init=False)
class BulkDataStorage:
@dataclass(frozen=True, init=False)
class BulkDataStorage:
    """
    Presigned URLs for one bulk data storage location, with their expiration.

    Instances are immutable; all attributes are populated by the constructor
    from a single raw ``GetBulkDataStorage`` result entry.
    """

    expiration: int
    """Epoch, in seconds, when this expires"""
    presigned_get: str
    """URL that you can HTTP 'GET' to retrieve the bulk data"""
    presigned_post: PresignedPost
    """URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires"""
    presigned_put: PresignedPut
    """URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires"""

    def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None:
        """
        Build the instance from a raw result entry.

        Arguments:
        bulk_data_storage - dict with the keys "expiration", "presignedGet",
            "presignedPost" (a dict with a JSON-encoded "fields" string and a
            "url") and "presignedPut" (a dict with a JSON-encoded "headers"
            string and a "url")

        Raises KeyError if any of those keys is missing.
        """
        super().__init__()
        # The dataclass is frozen, so attributes must be set through
        # object.__setattr__ (reached here via super()).
        super().__setattr__("expiration", bulk_data_storage["expiration"])
        super().__setattr__("presigned_get", bulk_data_storage["presignedGet"])
        super().__setattr__(
            "presigned_post",
            PresignedPost(
                # "fields" arrives as a JSON document, not as a dict.
                fields=json.loads(bulk_data_storage["presignedPost"]["fields"]),
                url=bulk_data_storage["presignedPost"]["url"],
            ),
        )
        super().__setattr__(
            "presigned_put",
            PresignedPut(
                # "headers" arrives as a JSON document, not as a dict.
                headers=json.loads(bulk_data_storage["presignedPut"]["headers"]),
                url=bulk_data_storage["presignedPut"]["url"],
            ),
        )

    @property
    def expired(self) -> bool:
        """
        Returns True if the expiration time has passed, i.e. the presigned
        URLs held by this object have expired; False otherwise.
        """
        return self.expiration < time()

Class to manage bulk data storage.

BulkDataStorage( bulk_data_storage: dict[str, typing.Union[str, PresignedPost]])
224    def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None:
225        super().__init__()
226        super().__setattr__("expiration", bulk_data_storage["expiration"])
227        super().__setattr__("presigned_get", bulk_data_storage["presignedGet"])
228        super().__setattr__(
229            "presigned_post",
230            PresignedPost(
231                fields=json.loads(bulk_data_storage["presignedPost"]["fields"]),
232                url=bulk_data_storage["presignedPost"]["url"],
233            ),
234        )
235        super().__setattr__(
236            "presigned_put",
237            PresignedPut(
238                headers=json.loads(bulk_data_storage["presignedPut"]["headers"]),
239                url=bulk_data_storage["presignedPut"]["url"],
240            ),
241        )
expiration: int

Epoch, in seconds, when this expires

presigned_get: str

URL that you can HTTP 'GET' to retrieve the bulk data

presigned_post: PresignedPost

URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires

presigned_put: PresignedPut

URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires

expired: bool
243    @property
244    def expired(self) -> bool:
245        """
246        Returns False if presigned_post is expired.
247        """
248        return self.expiration < time()

Returns True if the expiration time has passed, i.e. the presigned URLs have expired; False otherwise.

@dataclass(frozen=True)
class Edge:
@dataclass(frozen=True)
class Edge:
    """
    Immutable description of one edge attached to a node.
    """

    name: str
    """Name identifying the Edge; normally the name of the node at its other end"""
    queue: str
    """URL of the SQS queue that backs the Edge"""

Edge dataclass to manage edge details.

Edge(name: str, queue: str)
name: str

The name of the Edge, normally the other end of the Edge

queue: str

The SQS Queue URL of the Edge

LambdaEvent = typing.Union[bool, dict, float, int, list, str, tuple, NoneType]

Typing for the various types that a Lambda can be invoked with

LambdaSqsRecords = list[dict[str, typing.Union[str, dict[str, str], dict[str, dict[str, dict[str, typing.Union[str, bytes, list[str], list[bytes]]]]]]]]
@dataclass(frozen=True, init=False)
class Message:
@dataclass(frozen=True, init=False)
class Message:
    """
    Immutable EchoStream message, ready to be sent to an SQS queue.

    The constructor derives the SQS message attributes (``trackingId`` and,
    when there are previous tracking ids, ``prevTrackingIds``) and the total
    message size, and rejects messages above the 262,144 byte limit.
    """

    body: str
    """The body"""
    group_id: str
    """The SQS group id"""
    length: int
    """The length in bytes, as SQS calculates it (body plus attribute names, data types and values)"""
    message_attributes: dict[str, MessageAttributeValueTypeDef]
    """The user-defined attributes"""
    message_type: MessageType
    """The EchoStream message type"""
    tracking_id: str
    """The tracking id"""
    previous_tracking_ids: list[str]
    """A list of previous tracking ids. Populated if the original message was split"""

    def __init__(
        self,
        body: str,
        message_type: MessageType,
        group_id: str = None,
        previous_tracking_ids: Union[list[str], str] = None,
        tracking_id: str = None,
    ) -> None:
        """
        Arguments:
        body - content of the message
        message_type - the EchoStream message type of the message
        group_id - SQS message group id; when omitted, one is derived from
            the sending node's name at send time
        previous_tracking_ids - previous tracking ids, either as a list or as
            a JSON-encoded list; an empty value is stored as None
        tracking_id - tracking id; a random uuid4 hex string when omitted

        Raises ValueError if the message is larger than 262,144 bytes.
        """
        super().__init__()
        # The dataclass is frozen, so attributes are set via object.__setattr__.
        assign = super().__setattr__
        if isinstance(previous_tracking_ids, str):
            previous_tracking_ids = json.loads(previous_tracking_ids)
        assign("body", body)
        assign("group_id", group_id)
        assign("message_type", message_type)
        assign("tracking_id", tracking_id or uuid4().hex)
        assign(
            "previous_tracking_ids",
            previous_tracking_ids if previous_tracking_ids else None,
        )
        # Plain dict literals: this is what MessageAttributeValueTypeDef is at
        # runtime, and it still type-checks against the TypedDict.
        message_attributes: dict[str, MessageAttributeValueTypeDef] = {
            "trackingId": {"DataType": "String", "StringValue": self.tracking_id}
        }
        if self.previous_tracking_ids:
            message_attributes["prevTrackingIds"] = {
                "DataType": "String",
                "StringValue": json.dumps(
                    self.previous_tracking_ids, separators=(",", ":")
                ),
            }
        assign("message_attributes", message_attributes)
        # The SQS size limit is expressed in bytes and covers the body plus
        # the name, data type and value of every attribute, so measure the
        # UTF-8 encoded size rather than the number of characters.
        length = self._byte_length(self.body)
        for name, attribute in self.message_attributes.items():
            data_type = attribute["DataType"]
            value = attribute[
                "StringValue" if data_type in ("String", "Number") else "BinaryValue"
            ]
            length += (
                self._byte_length(name)
                + self._byte_length(data_type)
                + self._byte_length(value)
            )
        if length > 262144:
            raise ValueError("Message is > 262,144 in size")
        assign("length", length)

    @staticmethod
    def _byte_length(value: Union[str, bytes]) -> int:
        """Size of value in bytes; strings are measured UTF-8 encoded."""
        return len(value.encode("utf-8")) if isinstance(value, str) else len(value)

    def __len__(self) -> int:
        """The message length in bytes (same as the length attribute)."""
        return self.length

    def _sqs_message(self, node: Node) -> dict:
        """
        Build the keyword arguments describing this message for SQS.

        When the message has no group id, the node's name with spaces
        replaced by underscores is used as the MessageGroupId.
        """
        return dict(
            MessageAttributes=self.message_attributes,
            MessageBody=self.body,
            MessageGroupId=self.group_id or node.name.replace(" ", "_"),
        )

Message dataclass to manage message attributes and properties

Message( body: str, message_type: MessageType, group_id: str = None, previous_tracking_ids: Union[list[str], str] = None, tracking_id: str = None)
302    def __init__(
303        self,
304        body: str,
305        message_type: MessageType,
306        group_id: str = None,
307        previous_tracking_ids: Union[list[str], str] = None,
308        tracking_id: str = None,
309    ) -> None:
310        super().__init__()
311        super().__setattr__("body", body)
312        super().__setattr__("group_id", group_id)
313        super().__setattr__("message_type", message_type)
314        super().__setattr__("tracking_id", tracking_id or uuid4().hex)
315        if isinstance(previous_tracking_ids, str):
316            previous_tracking_ids = json.loads(previous_tracking_ids)
317        super().__setattr__(
318            "previous_tracking_ids",
319            previous_tracking_ids if previous_tracking_ids else None,
320        )
321        message_attributes = dict(
322            trackingId=MessageAttributeValueTypeDef(
323                DataType="String", StringValue=self.tracking_id
324            )
325        )
326        if self.previous_tracking_ids:
327            message_attributes["prevTrackingIds"] = MessageAttributeValueTypeDef(
328                DataType="String",
329                StringValue=json.dumps(
330                    self.previous_tracking_ids, separators=(",", ":")
331                ),
332            )
333        super().__setattr__("message_attributes", message_attributes)
334        length = len(self.body)
335        for name, attribute in self.message_attributes.items():
336            value = attribute[
337                (
338                    "StringValue"
339                    if (data_type := attribute["DataType"]) in ("String", "Number")
340                    else "BinaryValue"
341                )
342            ]
343            length += len(name) + len(data_type) + len(value)
344        if length > 262144:
345            raise ValueError(f"Message is > 262,144 in size")
346        super().__setattr__("length", length)
body: str

The body

group_id: str

The SQS group id

length: int

The length, as SQS calculates it

message_attributes: dict[str, dict]

The user-defined attributes

message_type: MessageType

The EchoStream message type

tracking_id: str

The tracking id

previous_tracking_ids: list[str]

A list of previous tracking ids. Populated if the original message was split

@dataclass(frozen=True)
class MessageType:
@dataclass(frozen=True)
class MessageType:
    """
    Immutable pairing of a message type's name with its auditor function.
    """

    auditor: Auditor
    """Callable that produces the audit attributes (a dict) for a message of this type"""
    name: str
    """Name of the message type"""

Dataclass describing an EchoStream message type: its name and its auditor function.

MessageType(auditor: Callable[..., dict[str, Any]], name: str)
auditor: Callable[..., dict[str, Any]]

The auditor

name: str

The name

class Node(abc.ABC):
class Node(ABC):
    """
    Foundation class for echostream nodes.

    The threading and asyncio modules use it to interact with echostream nodes.
    """

    def __init__(
        self,
        *,
        appsync_endpoint: str = None,
        bulk_data_acceleration: bool = False,
        client_id: str = None,
        name: str = None,
        password: str = None,
        tenant: str = None,
        timeout: float = None,
        user_pool_id: str = None,
        username: str = None,
    ) -> None:
        """
        Authenticates with Cognito, fetches the node through the AppSync
        GraphQL endpoint and prepares the boto3 session and SQS client.

        Any of the following arguments that is omitted (or falsy) is read
        from the environment instead:

        appsync_endpoint - APPSYNC_ENDPOINT
        client_id - CLIENT_ID
        name - NODE
        password - PASSWORD
        tenant - TENANT
        user_pool_id - USER_POOL_ID
        username - USER_NAME

        bulk_data_acceleration is stored as given; a falsy timeout is
        replaced with 0.1.
        """
        super().__init__()
        cognito = Cognito(
            client_id=client_id or environ["CLIENT_ID"],
            user_pool_id=user_pool_id or environ["USER_POOL_ID"],
            username=username or environ["USER_NAME"],
        )
        self.__cognito = cognito
        cognito.authenticate(password=password or environ["PASSWORD"])
        name = name or environ["NODE"]
        tenant = tenant or environ["TENANT"]
        transport = RequestsHTTPTransport(
            auth=RequestsSrpAuth(cognito=cognito, http_header_prefix=""),
            url=appsync_endpoint or environ["APPSYNC_ENDPOINT"],
        )
        with GqlClient(
            fetch_schema_from_transport=True, transport=transport
        ) as gql_session:
            response = gql_session.execute(
                _GET_APP_GQL,
                variable_values={"name": name, "tenant": tenant},
            )
            node_info: dict[str, Union[str, dict]] = response["GetNode"]
        app_info = node_info["app"]
        self.__app = app_info["name"]
        self.__app_type = app_info["__typename"]
        self.__audit_records_endpoint = app_info["auditRecordsEndpoint"]
        self.__bulk_data_acceleration = bulk_data_acceleration
        self.__config: dict[str, Any] = None
        self.__name = name
        self.__node_type = node_info["__typename"]
        app_session = AppSession(app=self.__app, cognito=cognito, tenant=tenant)
        tenant_info = node_info["tenant"]
        self.__session = Session(
            botocore_session=app_session,
            region_name=tenant_info["region"],
        )
        self.__sources: frozenset[Edge] = None
        # CrossAccountApp nodes get their SQS client from a plain Session
        # instead of the AppSession-backed one.
        if self.__app_type == "CrossAccountApp":
            sqs_session = Session(region_name=tenant_info["region"])
        else:
            sqs_session = self.__session
        self.__sqs_client: SQSClient = sqs_session.client(
            "sqs",
            config=Config(
                max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2),
                retries={"mode": "standard"},
            ),
        )
        # The table name is only retained when the app reports tableAccess.
        self.__table: str = (
            tenant_info["table"] if app_info.get("tableAccess") else None
        )
        self.__targets: frozenset[Edge] = None
        self.__tenant = tenant
        self.__timeout = timeout or 0.1
        self._audit = False
        self._receive_message_type: MessageType = None
        self._send_message_type: MessageType = None
        self._stopped = False

    @property
    def _audit_records_endpoint(self) -> str:
        """The app's auditRecordsEndpoint as returned by the GetNode query."""
        return self.__audit_records_endpoint

    @property
    def _cognito(self) -> Cognito:
        """The Cognito object authenticated by the constructor."""
        return self.__cognito

    @property
    def _session(self) -> Session:
        """The boto3 Session built on top of the AppSession."""
        return self.__session

    @property
    def _sources(self) -> frozenset[Edge]:
        """The stored source edges (None until assigned)."""
        return self.__sources

    @_sources.setter
    def _sources(self, sources: set[Edge]) -> None:
        # Stored as a frozenset so the exposed collection cannot be mutated.
        self.__sources = frozenset(sources)

    @property
    def _sqs_client(self) -> SQSClient:
        """The SQS client created by the constructor."""
        return self.__sqs_client

    @property
    def _targets(self) -> frozenset[Edge]:
        """The stored target edges (None until assigned)."""
        return self.__targets

    @_targets.setter
    def _targets(self, targets: set[Edge]) -> None:
        # Stored as a frozenset so the exposed collection cannot be mutated.
        self.__targets = frozenset(targets)

    @property
    def app(self) -> str:
        """The name of the app returned by the GetNode query."""
        return self.__app

    @property
    def app_type(self) -> str:
        """The GraphQL __typename of the app."""
        return self.__app_type

    @property
    def audit(self) -> bool:
        """The current value of the _audit flag (False after construction)."""
        return self._audit

    @property
    def bulk_data_acceleration(self) -> bool:
        """The bulk_data_acceleration value passed to the constructor."""
        return self.__bulk_data_acceleration

    @property
    def config(self) -> dict[str, Any]:
        """The config mapping (None until one is assigned)."""
        return self.__config

    @config.setter
    def config(self, config: dict[str, Any]) -> None:
        self.__config = config

    def create_message(
        self,
        /,
        body: str,
        *,
        group_id: str = None,
        previous_tracking_ids: Union[list[str], str] = None,
        tracking_id: str = None,
    ) -> Message:
        """
        Builds a Message whose message type is this node's send_message_type.

        Arguments:
        body - [POSITIONAL ARGUMENT] content of the message
        group_id - [KEYWORD ARGUMENT] group id, if any
        previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking ids of the message, if any
        tracking_id - [KEYWORD ARGUMENT] tracking id of the message, if any
        """
        outgoing_type = self.send_message_type
        return Message(
            body=body,
            group_id=group_id,
            message_type=outgoing_type,
            previous_tracking_ids=previous_tracking_ids,
            tracking_id=tracking_id,
        )

    @property
    def name(self) -> str:
        """The node name (constructor argument or NODE environment variable)."""
        return self.__name

    @property
    def node_type(self) -> str:
        """The GraphQL __typename of the node."""
        return self.__node_type

    @property
    def receive_message_type(self) -> MessageType:
        """The value of _receive_message_type (None after construction)."""
        return self._receive_message_type

    @property
    def send_message_type(self) -> MessageType:
        """The value of _send_message_type (None after construction)."""
        return self._send_message_type

    @property
    def sources(self) -> frozenset[Edge]:
        """The frozen set of source edges (None until assigned)."""
        return self._sources

    @property
    def stopped(self) -> bool:
        """The _stopped value when it is truthy, otherwise False."""
        flag = self._stopped
        return flag if flag else False

    @property
    def table(self) -> Table:
        """
        A DynamoDB Table resource for the stored table name.

        Raises RuntimeError when no table name was stored, i.e. the app
        does not have tableAccess.
        """
        table_name = self.__table
        if not table_name:
            raise RuntimeError(f"App {self.app} does not have tableAccess")
        return self._session.resource("dynamodb").Table(table_name)

    @property
    def targets(self) -> frozenset[Edge]:
        """The frozen set of target edges (None until assigned)."""
        return self._targets

    @property
    def tenant(self) -> str:
        """The tenant name (constructor argument or TENANT environment variable)."""
        return self.__tenant

    @property
    def timeout(self) -> float:
        """The timeout value; never falsy because 0.1 is substituted."""
        return self.__timeout

    @timeout.setter
    def timeout(self, timeout: float) -> None:
        # A falsy timeout (None, 0) falls back to 0.1, as in the constructor.
        self.__timeout = timeout or 0.1

Base level node class. Used by threading and asyncio modules to interact with echostream nodes.

Node( *, appsync_endpoint: str = None, bulk_data_acceleration: bool = False, client_id: str = None, name: str = None, password: str = None, tenant: str = None, timeout: float = None, user_pool_id: str = None, username: str = None)
376    def __init__(
377        self,
378        *,
379        appsync_endpoint: str = None,
380        bulk_data_acceleration: bool = False,
381        client_id: str = None,
382        name: str = None,
383        password: str = None,
384        tenant: str = None,
385        timeout: float = None,
386        user_pool_id: str = None,
387        username: str = None,
388    ) -> None:
389        super().__init__()
390        self.__cognito = Cognito(
391            client_id=client_id or environ["CLIENT_ID"],
392            user_pool_id=user_pool_id or environ["USER_POOL_ID"],
393            username=username or environ["USER_NAME"],
394        )
395        self.__cognito.authenticate(password=password or environ["PASSWORD"])
396        name = name or environ["NODE"]
397        tenant = tenant or environ["TENANT"]
398        with GqlClient(
399            fetch_schema_from_transport=True,
400            transport=RequestsHTTPTransport(
401                auth=RequestsSrpAuth(cognito=self.__cognito, http_header_prefix=""),
402                url=appsync_endpoint or environ["APPSYNC_ENDPOINT"],
403            ),
404        ) as session:
405            data: dict[str, Union[str, dict]] = session.execute(
406                _GET_APP_GQL,
407                variable_values=dict(name=name, tenant=tenant),
408            )["GetNode"]
409        self.__app = data["app"]["name"]
410        self.__app_type = data["app"]["__typename"]
411        self.__audit_records_endpoint = data["app"]["auditRecordsEndpoint"]
412        self.__bulk_data_acceleration = bulk_data_acceleration
413        self.__config: dict[str, Any] = None
414        self.__name = name
415        self.__node_type = data["__typename"]
416        self.__session = Session(
417            botocore_session=AppSession(
418                app=self.__app, cognito=self.__cognito, tenant=tenant
419            ),
420            region_name=data["tenant"]["region"],
421        )
422        self.__sources: frozenset[Edge] = None
423        self.__sqs_client: SQSClient = (
424            Session(region_name=data["tenant"]["region"])
425            if self.__app_type == "CrossAccountApp"
426            else self.__session
427        ).client(
428            "sqs",
429            config=Config(
430                max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2),
431                retries={"mode": "standard"},
432            ),
433        )
434        self.__table: str = (
435            data["tenant"]["table"] if data["app"].get("tableAccess") else None
436        )
437        self.__targets: frozenset[Edge] = None
438        self.__tenant = tenant
439        self.__timeout = timeout or 0.1
440        self._audit = False
441        self._receive_message_type: MessageType = None
442        self._send_message_type: MessageType = None
443        self._stopped = False
app: str
477    @property
478    def app(self) -> str:
479        return self.__app
app_type: str
481    @property
482    def app_type(self) -> str:
483        return self.__app_type
audit: bool
485    @property
486    def audit(self) -> bool:
487        return self._audit
bulk_data_acceleration: bool
489    @property
490    def bulk_data_acceleration(self) -> bool:
491        return self.__bulk_data_acceleration
config: dict[str, typing.Any]
493    @property
494    def config(self) -> dict[str, Any]:
495        return self.__config
def create_message( self, /, body: str, *, group_id: str = None, previous_tracking_ids: Union[list[str], str] = None, tracking_id: str = None) -> Message:
501    def create_message(
502        self,
503        /,
504        body: str,
505        *,
506        group_id: str = None,
507        previous_tracking_ids: Union[list[str], str] = None,
508        tracking_id: str = None,
509    ) -> Message:
510        """
511        Creates message as per the message standard of echostream.
512
513        Arguments:
514        body - [POSITIONAL ARGUMENT] content of the message
515        group_id - [KEYWORD ARGUMENT] group id
516        previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available
517        tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available
518        """
519        return Message(
520            body=body,
521            group_id=group_id,
522            message_type=self.send_message_type,
523            previous_tracking_ids=previous_tracking_ids,
524            tracking_id=tracking_id,
525        )

Creates message as per the message standard of echostream.

Arguments:

- body - [POSITIONAL ARGUMENT] content of the message
- group_id - [KEYWORD ARGUMENT] group id
- previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking ids of the message, if available
- tracking_id - [KEYWORD ARGUMENT] tracking id of the message, if available

name: str
527    @property
528    def name(self) -> str:
529        return self.__name
node_type: str
531    @property
532    def node_type(self) -> str:
533        return self.__node_type
receive_message_type: MessageType
535    @property
536    def receive_message_type(self) -> MessageType:
537        return self._receive_message_type
send_message_type: MessageType
539    @property
540    def send_message_type(self) -> MessageType:
541        return self._send_message_type
sources: frozenset[Edge]
543    @property
544    def sources(self) -> frozenset[Edge]:
545        return self._sources
stopped: bool
547    @property
548    def stopped(self) -> bool:
549        return self._stopped or False
table: object
551    @property
552    def table(self) -> Table:
553        if self.__table:
554            return self._session.resource("dynamodb").Table(self.__table)
555        raise RuntimeError(f"App {self.app} does not have tableAccess")
targets: frozenset[Edge]
557    @property
558    def targets(self) -> frozenset[Edge]:
559        return self._targets
tenant: str
561    @property
562    def tenant(self) -> str:
563        return self.__tenant
timeout: float
565    @property
566    def timeout(self) -> float:
567        return self.__timeout
@dataclass(frozen=True)
class PresignedPost:
@dataclass(frozen=True)
class PresignedPost:
    """
    Immutable holder for the data needed to POST bulk data.

    PresignedPost objects are part of the Bulk Data Storage system.
    """

    fields: dict[str, str]
    """The form fields that must accompany the bulk data POST"""
    url: str
    """The url that bulk data is POSTed to"""

PresignedPost objects are part of the Bulk Data Storage system and are used to POST bulk data.

PresignedPost(fields: dict[str, str], url: str)
fields: dict[str, str]

The fields required to be sent when POSTing bulk data

url: str

The POST url used to POST bulk data

@dataclass(frozen=True)
class PresignedPut:
@dataclass(frozen=True)
class PresignedPut:
    """
    Immutable holder for the data needed to PUT bulk data.

    PresignedPut objects are part of the Bulk Data Storage system.
    """

    headers: dict[str, str]
    """The headers that must accompany the bulk data PUT"""
    url: str
    """The url that bulk data is PUT to"""

PresignedPut objects are part of the Bulk Data Storage system and are used to PUT bulk data.

PresignedPut(headers: dict[str, str], url: str)
headers: dict[str, str]

The headers required to be sent when PUTing bulk data

url: str

The PUT url used to PUT bulk data