echostream_node
1from __future__ import annotations 2 3import logging 4from abc import ABC 5from dataclasses import dataclass 6from os import cpu_count, environ 7from time import time 8from typing import TYPE_CHECKING, Any, Callable, Union 9from uuid import uuid4 10 11import awsserviceendpoints 12import simplejson as json 13from boto3.session import Session 14from botocore.config import Config 15from echostream_botocore import AppSession 16from gql import Client as GqlClient 17from gql import gql 18from gql.transport.requests import RequestsHTTPTransport 19from pycognito import Cognito 20from pycognito.utils import RequestsSrpAuth 21 22 23def getLogger() -> logging.Logger: 24 """ 25 Returns "echostream-node" logger 26 """ 27 return logging.getLogger("echostream-node") 28 29 30getLogger().addHandler(logging.NullHandler()) 31 32if TYPE_CHECKING: 33 from mypy_boto3_dynamodb.service_resource import Table 34 from mypy_boto3_sqs.client import SQSClient 35 from mypy_boto3_sqs.type_defs import MessageAttributeValueTypeDef 36else: 37 MessageAttributeValueTypeDef = dict 38 SQSClient = object 39 Table = object 40 41_GET_APP_GQL = gql( 42 """ 43 query getNode($name: String!, $tenant: String!) { 44 GetNode(name: $name, tenant: $tenant) { 45 __typename 46 ... on AppChangeReceiverNode { 47 app { 48 __typename 49 ... on CrossAccountApp { 50 auditRecordsEndpoint 51 name 52 tableAccess 53 } 54 ... on ExternalApp { 55 auditRecordsEndpoint 56 name 57 tableAccess 58 } 59 ... on ManagedApp { 60 auditRecordsEndpoint 61 name 62 tableAccess 63 } 64 } 65 } 66 ... on ExternalNode { 67 app { 68 __typename 69 ... on CrossAccountApp { 70 auditRecordsEndpoint 71 name 72 tableAccess 73 } 74 ... on ExternalApp { 75 auditRecordsEndpoint 76 name 77 tableAccess 78 } 79 } 80 } 81 ... on ManagedNode { 82 app { 83 __typename 84 auditRecordsEndpoint 85 name 86 tableAccess 87 } 88 } 89 tenant { 90 region 91 table 92 } 93 } 94 } 95 """ 96) 97 98_GET_BULK_DATA_STORAGE_GQL = gql( 99 """ 100 query getBulkDataStorage($tenant: String!, $useAccelerationEndpoint: Boolean!) { 101 GetBulkDataStorage(tenant: $tenant, count: 20, useAccelerationEndpoint: $useAccelerationEndpoint) { 102 expiration 103 presignedGet 104 presignedPost { 105 fields 106 url 107 } 108 presignedPut { 109 headers 110 url 111 } 112 } 113 } 114 """ 115) 116 117_GET_NODE_GQL = gql( 118 """ 119 query getNode($name: String!, $tenant: String!) { 120 GetNode(name: $name, tenant: $tenant) { 121 ... on AppChangeReceiverNode { 122 receiveEdge { 123 queue 124 source { 125 name 126 } 127 } 128 receiveMessageType { 129 auditor 130 name 131 } 132 } 133 ... on ExternalNode { 134 app { 135 ... on CrossAccountApp { 136 config 137 } 138 ... on ExternalApp { 139 config 140 } 141 } 142 config 143 receiveEdges { 144 queue 145 source { 146 name 147 } 148 } 149 receiveMessageType { 150 auditor 151 name 152 } 153 sendEdges { 154 queue 155 target { 156 name 157 } 158 } 159 sendMessageType { 160 auditor 161 name 162 } 163 stopped 164 } 165 ... on ManagedNode { 166 app { 167 config 168 } 169 config 170 receiveEdges { 171 queue 172 source { 173 name 174 } 175 } 176 receiveMessageType { 177 auditor 178 name 179 } 180 sendEdges { 181 queue 182 target { 183 name 184 } 185 } 186 sendMessageType { 187 auditor 188 name 189 } 190 stopped 191 } 192 tenant { 193 audit 194 config 195 } 196 } 197 } 198 """ 199) 200 201 202Auditor = Callable[..., dict[str, Any]] 203"""Typing for MessageType auditor functions""" 204 205BatchItemFailures = dict[str, list[dict[str, str]]] 206 207 208@dataclass(frozen=True, init=False) 209class BulkDataStorage: 210 """ 211 Class to manage bulk data storage. 212 """ 213 214 expiration: int 215 """Epoch, in seconds, when this expires""" 216 presigned_get: str 217 """URL that you can HTTP 'GET' to retrieve the bulk data""" 218 presigned_post: PresignedPost 219 """URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires""" 220 presigned_put: PresignedPut 221 """URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires""" 222 223 def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None: 224 super().__init__() 225 super().__setattr__("expiration", bulk_data_storage["expiration"]) 226 super().__setattr__("presigned_get", bulk_data_storage["presignedGet"]) 227 super().__setattr__( 228 "presigned_post", 229 PresignedPost( 230 fields=json.loads(bulk_data_storage["presignedPost"]["fields"]), 231 url=bulk_data_storage["presignedPost"]["url"], 232 ), 233 ) 234 super().__setattr__( 235 "presigned_put", 236 PresignedPut( 237 headers=json.loads(bulk_data_storage["presignedPut"]["headers"]), 238 url=bulk_data_storage["presignedPut"]["url"], 239 ), 240 ) 241 242 @property 243 def expired(self) -> bool: 244 """ 245 Returns False if presigned_post is expired. 246 """ 247 return self.expiration < time() 248 249 250@dataclass(frozen=True) 251class Edge: 252 """ 253 Edge dataclass to manage edge details. 254 """ 255 256 name: str 257 """The name of the Edge, normally the other end of the Edge""" 258 queue: str 259 """The SQS Queue URL of the Edge""" 260 261 262LambdaEvent = Union[bool, dict, float, int, list, str, tuple, None] 263"""Typing for the various types that a Lambda can be invoked with""" 264 265LambdaSqsRecords = list[ 266 dict[ 267 str, 268 Union[ 269 str, 270 dict[str, str], 271 dict[ 272 str, 273 dict[str, dict[str, Union[str, bytes, list[str], list[bytes]]]], 274 ], 275 ], 276 ] 277] 278 279 280@dataclass(frozen=True, init=False) 281class Message: 282 """ 283 Message dataclass to manage message attributes and properties 284 """ 285 286 body: str 287 """The body""" 288 group_id: str 289 """The SQS group id""" 290 length: int 291 """The length, as SQS calculates it""" 292 message_attributes: dict[str, MessageAttributeValueTypeDef] 293 """The user-defined attributes""" 294 message_type: MessageType 295 """The EchoStream message type""" 296 tracking_id: str 297 """The tracking id""" 298 previous_tracking_ids: list[str] 299 """A list of previous tracking ids. Populated if the original message was split""" 300 301 def __init__( 302 self, 303 body: str, 304 message_type: MessageType, 305 group_id: str = None, 306 previous_tracking_ids: Union[list[str], str] = None, 307 tracking_id: str = None, 308 ) -> None: 309 super().__init__() 310 super().__setattr__("body", body) 311 super().__setattr__("group_id", group_id) 312 super().__setattr__("message_type", message_type) 313 super().__setattr__("tracking_id", tracking_id or uuid4().hex) 314 if isinstance(previous_tracking_ids, str): 315 previous_tracking_ids = json.loads(previous_tracking_ids) 316 super().__setattr__( 317 "previous_tracking_ids", 318 previous_tracking_ids if previous_tracking_ids else None, 319 ) 320 message_attributes = dict( 321 trackingId=MessageAttributeValueTypeDef( 322 DataType="String", StringValue=self.tracking_id 323 ) 324 ) 325 if self.previous_tracking_ids: 326 message_attributes["prevTrackingIds"] = MessageAttributeValueTypeDef( 327 DataType="String", 328 StringValue=json.dumps( 329 self.previous_tracking_ids, separators=(",", ":") 330 ), 331 ) 332 super().__setattr__("message_attributes", message_attributes) 333 length = len(self.body) 334 for name, attribute in self.message_attributes.items(): 335 value = attribute[ 336 ( 337 "StringValue" 338 if (data_type := attribute["DataType"]) in ("String", "Number") 339 else "BinaryValue" 340 ) 341 ] 342 length += len(name) + len(data_type) + len(value) 343 if length > 262144: 344 raise ValueError(f"Message is > 262,144 in size") 345 super().__setattr__("length", length) 346 347 def __len__(self) -> int: 348 return self.length 349 350 def _sqs_message(self, node: Node) -> dict: 351 return dict( 352 MessageAttributes=self.message_attributes, 353 MessageBody=self.body, 354 MessageGroupId=self.group_id or node.name.replace(" ", "_"), 355 ) 356 357 358@dataclass(frozen=True) 359class MessageType: 360 """ 361 Dataclass for messagetype 362 """ 363 364 auditor: Auditor 365 """The auditor""" 366 name: str 367 """The name""" 368 369 370class Node(ABC): 371 """ 372 Base level node class. Used by threading and asyncio modules to interact with echostream nodes. 373 """ 374 375 def __init__( 376 self, 377 *, 378 appsync_endpoint: str = None, 379 bulk_data_acceleration: bool = False, 380 client_id: str = None, 381 name: str = None, 382 password: str = None, 383 tenant: str = None, 384 timeout: float = None, 385 user_pool_id: str = None, 386 username: str = None, 387 ) -> None: 388 super().__init__() 389 self.__cognito = Cognito( 390 client_id=client_id or environ["CLIENT_ID"], 391 user_pool_id=user_pool_id or environ["USER_POOL_ID"], 392 username=username or environ["USER_NAME"], 393 ) 394 self.__cognito.authenticate(password=password or environ["PASSWORD"]) 395 name = name or environ["NODE"] 396 tenant = tenant or environ["TENANT"] 397 with GqlClient( 398 fetch_schema_from_transport=True, 399 transport=RequestsHTTPTransport( 400 auth=RequestsSrpAuth(cognito=self.__cognito, http_header_prefix=""), 401 url=appsync_endpoint or environ["APPSYNC_ENDPOINT"], 402 ), 403 ) as session: 404 data: dict[str, Union[str, dict]] = session.execute( 405 _GET_APP_GQL, 406 variable_values=dict(name=name, tenant=tenant), 407 )["GetNode"] 408 self.__app = data["app"]["name"] 409 self.__app_type = data["app"]["__typename"] 410 self.__audit_records_endpoint = data["app"]["auditRecordsEndpoint"] 411 self.__bulk_data_acceleration = bulk_data_acceleration 412 self.__config: dict[str, Any] = None 413 self.__name = name 414 self.__node_type = data["__typename"] 415 self.__session = Session( 416 botocore_session=AppSession( 417 app=self.__app, cognito=self.__cognito, tenant=tenant 418 ), 419 region_name=data["tenant"]["region"], 420 ) 421 self.__sources: frozenset[Edge] = None 422 self.__sqs_client: SQSClient = ( 423 Session(region_name=data["tenant"]["region"]) 424 if self.__app_type == "CrossAccountApp" 425 else self.__session 426 ).client( 427 "sqs", 428 config=Config( 429 max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2), 430 retries={"mode": "standard"}, 431 ), 432 ) 433 self.__table: str = ( 434 data["tenant"]["table"] if data["app"].get("tableAccess") else None 435 ) 436 self.__targets: frozenset[Edge] = None 437 self.__tenant = tenant 438 self.__timeout = timeout or 0.1 439 self._audit = False 440 self._receive_message_type: MessageType = None 441 self._send_message_type: MessageType = None 442 self._stopped = False 443 444 @property 445 def _audit_records_endpoint(self) -> str: 446 return self.__audit_records_endpoint 447 448 @property 449 def _cognito(self) -> Cognito: 450 return self.__cognito 451 452 @property 453 def _session(self) -> Session: 454 return self.__session 455 456 @property 457 def _sources(self) -> frozenset[Edge]: 458 return self.__sources 459 460 @_sources.setter 461 def _sources(self, sources: set[Edge]) -> None: 462 self.__sources = frozenset(sources) 463 464 @property 465 def _sqs_client(self) -> SQSClient: 466 return self.__sqs_client 467 468 @property 469 def _targets(self) -> frozenset[Edge]: 470 return self.__targets 471 472 @_targets.setter 473 def _targets(self, targets: set[Edge]) -> None: 474 self.__targets = frozenset(targets) 475 476 @property 477 def app(self) -> str: 478 return self.__app 479 480 @property 481 def app_type(self) -> str: 482 return self.__app_type 483 484 @property 485 def audit(self) -> bool: 486 return self._audit 487 488 @property 489 def bulk_data_acceleration(self) -> bool: 490 return self.__bulk_data_acceleration 491 492 @property 493 def config(self) -> dict[str, Any]: 494 return self.__config 495 496 @config.setter 497 def config(self, config: dict[str, Any]) -> None: 498 self.__config = config 499 500 def create_message( 501 self, 502 /, 503 body: str, 504 *, 505 group_id: str = None, 506 previous_tracking_ids: Union[list[str], str] = None, 507 tracking_id: str = None, 508 ) -> Message: 509 """ 510 Creates message as per the message standard of echostream. 511 512 Arguments: 513 body - [POSITIONAL ARGUMENT] content of the message 514 group_id - [KEYWORD ARGUMENT] group id 515 previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available 516 tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available 517 """ 518 return Message( 519 body=body, 520 group_id=group_id, 521 message_type=self.send_message_type, 522 previous_tracking_ids=previous_tracking_ids, 523 tracking_id=tracking_id, 524 ) 525 526 @property 527 def name(self) -> str: 528 return self.__name 529 530 @property 531 def node_type(self) -> str: 532 return self.__node_type 533 534 @property 535 def receive_message_type(self) -> MessageType: 536 return self._receive_message_type 537 538 @property 539 def send_message_type(self) -> MessageType: 540 return self._send_message_type 541 542 @property 543 def sources(self) -> frozenset[Edge]: 544 return self._sources 545 546 @property 547 def stopped(self) -> bool: 548 return self._stopped or False 549 550 @property 551 def table(self) -> Table: 552 if self.__table: 553 return self._session.resource("dynamodb").Table(self.__table) 554 raise RuntimeError(f"App {self.app} does not have tableAccess") 555 556 @property 557 def targets(self) -> frozenset[Edge]: 558 return self._targets 559 560 @property 561 def tenant(self) -> str: 562 return self.__tenant 563 564 @property 565 def timeout(self) -> float: 566 return self.__timeout 567 568 @timeout.setter 569 def timeout(self, timeout: float) -> None: 570 self.__timeout = timeout or 0.1 571 572 573@dataclass(frozen=True) 574class PresignedPost: 575 """ 576 PresignedPost objects are part of the Bulk Data Storage system 577 and are used to POST bulk data. 578 """ 579 580 fields: dict[str, str] 581 """The fields required to be sent when POSTing bulk data""" 582 url: str 583 """The POST url used to POST bulk data""" 584 585 586@dataclass(frozen=True) 587class PresignedPut: 588 """ 589 PresignedPut objects are part of the Bulk Data Storage system 590 and are used to PUT bulk data. 591 """ 592 593 headers: dict[str, str] 594 """The headers required to be sent when PUTing bulk data""" 595 url: str 596 """The PUT url used to PUT bulk data"""
24def getLogger() -> logging.Logger: 25 """ 26 Returns "echostream-node" logger 27 """ 28 return logging.getLogger("echostream-node")
Returns "echostream-node" logger
Typing for MessageType auditor functions
209@dataclass(frozen=True, init=False) 210class BulkDataStorage: 211 """ 212 Class to manage bulk data storage. 213 """ 214 215 expiration: int 216 """Epoch, in seconds, when this expires""" 217 presigned_get: str 218 """URL that you can HTTP 'GET' to retrieve the bulk data""" 219 presigned_post: PresignedPost 220 """URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires""" 221 presigned_put: PresignedPut 222 """URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires""" 223 224 def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None: 225 super().__init__() 226 super().__setattr__("expiration", bulk_data_storage["expiration"]) 227 super().__setattr__("presigned_get", bulk_data_storage["presignedGet"]) 228 super().__setattr__( 229 "presigned_post", 230 PresignedPost( 231 fields=json.loads(bulk_data_storage["presignedPost"]["fields"]), 232 url=bulk_data_storage["presignedPost"]["url"], 233 ), 234 ) 235 super().__setattr__( 236 "presigned_put", 237 PresignedPut( 238 headers=json.loads(bulk_data_storage["presignedPut"]["headers"]), 239 url=bulk_data_storage["presignedPut"]["url"], 240 ), 241 ) 242 243 @property 244 def expired(self) -> bool: 245 """ 246 Returns False if presigned_post is expired. 247 """ 248 return self.expiration < time()
Class to manage bulk data storage.
224 def __init__(self, bulk_data_storage: dict[str, Union[str, PresignedPost]]) -> None: 225 super().__init__() 226 super().__setattr__("expiration", bulk_data_storage["expiration"]) 227 super().__setattr__("presigned_get", bulk_data_storage["presignedGet"]) 228 super().__setattr__( 229 "presigned_post", 230 PresignedPost( 231 fields=json.loads(bulk_data_storage["presignedPost"]["fields"]), 232 url=bulk_data_storage["presignedPost"]["url"], 233 ), 234 ) 235 super().__setattr__( 236 "presigned_put", 237 PresignedPut( 238 headers=json.loads(bulk_data_storage["presignedPut"]["headers"]), 239 url=bulk_data_storage["presignedPut"]["url"], 240 ), 241 )
URL that you can HTTP 'POST' bulk data to, along with the fields the 'POST' requires
URL that you can HTTP 'PUT' bulk data to, along with the headers the 'PUT' requires
251@dataclass(frozen=True) 252class Edge: 253 """ 254 Edge dataclass to manage edge details. 255 """ 256 257 name: str 258 """The name of the Edge, normally the other end of the Edge""" 259 queue: str 260 """The SQS Queue URL of the Edge"""
Edge dataclass to manage edge details.
Typing for the various types that a Lambda can be invoked with
281@dataclass(frozen=True, init=False) 282class Message: 283 """ 284 Message dataclass to manage message attributes and properties 285 """ 286 287 body: str 288 """The body""" 289 group_id: str 290 """The SQS group id""" 291 length: int 292 """The length, as SQS calculates it""" 293 message_attributes: dict[str, MessageAttributeValueTypeDef] 294 """The user-defined attributes""" 295 message_type: MessageType 296 """The EchoStream message type""" 297 tracking_id: str 298 """The tracking id""" 299 previous_tracking_ids: list[str] 300 """A list of previous tracking ids. Populated if the original message was split""" 301 302 def __init__( 303 self, 304 body: str, 305 message_type: MessageType, 306 group_id: str = None, 307 previous_tracking_ids: Union[list[str], str] = None, 308 tracking_id: str = None, 309 ) -> None: 310 super().__init__() 311 super().__setattr__("body", body) 312 super().__setattr__("group_id", group_id) 313 super().__setattr__("message_type", message_type) 314 super().__setattr__("tracking_id", tracking_id or uuid4().hex) 315 if isinstance(previous_tracking_ids, str): 316 previous_tracking_ids = json.loads(previous_tracking_ids) 317 super().__setattr__( 318 "previous_tracking_ids", 319 previous_tracking_ids if previous_tracking_ids else None, 320 ) 321 message_attributes = dict( 322 trackingId=MessageAttributeValueTypeDef( 323 DataType="String", StringValue=self.tracking_id 324 ) 325 ) 326 if self.previous_tracking_ids: 327 message_attributes["prevTrackingIds"] = MessageAttributeValueTypeDef( 328 DataType="String", 329 StringValue=json.dumps( 330 self.previous_tracking_ids, separators=(",", ":") 331 ), 332 ) 333 super().__setattr__("message_attributes", message_attributes) 334 length = len(self.body) 335 for name, attribute in self.message_attributes.items(): 336 value = attribute[ 337 ( 338 "StringValue" 339 if (data_type := attribute["DataType"]) in ("String", "Number") 340 else "BinaryValue" 341 ) 342 ] 343 length += len(name) + len(data_type) + len(value) 344 if length > 262144: 345 raise ValueError(f"Message is > 262,144 in size") 346 super().__setattr__("length", length) 347 348 def __len__(self) -> int: 349 return self.length 350 351 def _sqs_message(self, node: Node) -> dict: 352 return dict( 353 MessageAttributes=self.message_attributes, 354 MessageBody=self.body, 355 MessageGroupId=self.group_id or node.name.replace(" ", "_"), 356 )
Message dataclass to manage message attributes and properties
302 def __init__( 303 self, 304 body: str, 305 message_type: MessageType, 306 group_id: str = None, 307 previous_tracking_ids: Union[list[str], str] = None, 308 tracking_id: str = None, 309 ) -> None: 310 super().__init__() 311 super().__setattr__("body", body) 312 super().__setattr__("group_id", group_id) 313 super().__setattr__("message_type", message_type) 314 super().__setattr__("tracking_id", tracking_id or uuid4().hex) 315 if isinstance(previous_tracking_ids, str): 316 previous_tracking_ids = json.loads(previous_tracking_ids) 317 super().__setattr__( 318 "previous_tracking_ids", 319 previous_tracking_ids if previous_tracking_ids else None, 320 ) 321 message_attributes = dict( 322 trackingId=MessageAttributeValueTypeDef( 323 DataType="String", StringValue=self.tracking_id 324 ) 325 ) 326 if self.previous_tracking_ids: 327 message_attributes["prevTrackingIds"] = MessageAttributeValueTypeDef( 328 DataType="String", 329 StringValue=json.dumps( 330 self.previous_tracking_ids, separators=(",", ":") 331 ), 332 ) 333 super().__setattr__("message_attributes", message_attributes) 334 length = len(self.body) 335 for name, attribute in self.message_attributes.items(): 336 value = attribute[ 337 ( 338 "StringValue" 339 if (data_type := attribute["DataType"]) in ("String", "Number") 340 else "BinaryValue" 341 ) 342 ] 343 length += len(name) + len(data_type) + len(value) 344 if length > 262144: 345 raise ValueError(f"Message is > 262,144 in size") 346 super().__setattr__("length", length)
359@dataclass(frozen=True) 360class MessageType: 361 """ 362 Dataclass for messagetype 363 """ 364 365 auditor: Auditor 366 """The auditor""" 367 name: str 368 """The name"""
Dataclass for messagetype
371class Node(ABC): 372 """ 373 Base level node class. Used by threading and asyncio modules to interact with echostream nodes. 374 """ 375 376 def __init__( 377 self, 378 *, 379 appsync_endpoint: str = None, 380 bulk_data_acceleration: bool = False, 381 client_id: str = None, 382 name: str = None, 383 password: str = None, 384 tenant: str = None, 385 timeout: float = None, 386 user_pool_id: str = None, 387 username: str = None, 388 ) -> None: 389 super().__init__() 390 self.__cognito = Cognito( 391 client_id=client_id or environ["CLIENT_ID"], 392 user_pool_id=user_pool_id or environ["USER_POOL_ID"], 393 username=username or environ["USER_NAME"], 394 ) 395 self.__cognito.authenticate(password=password or environ["PASSWORD"]) 396 name = name or environ["NODE"] 397 tenant = tenant or environ["TENANT"] 398 with GqlClient( 399 fetch_schema_from_transport=True, 400 transport=RequestsHTTPTransport( 401 auth=RequestsSrpAuth(cognito=self.__cognito, http_header_prefix=""), 402 url=appsync_endpoint or environ["APPSYNC_ENDPOINT"], 403 ), 404 ) as session: 405 data: dict[str, Union[str, dict]] = session.execute( 406 _GET_APP_GQL, 407 variable_values=dict(name=name, tenant=tenant), 408 )["GetNode"] 409 self.__app = data["app"]["name"] 410 self.__app_type = data["app"]["__typename"] 411 self.__audit_records_endpoint = data["app"]["auditRecordsEndpoint"] 412 self.__bulk_data_acceleration = bulk_data_acceleration 413 self.__config: dict[str, Any] = None 414 self.__name = name 415 self.__node_type = data["__typename"] 416 self.__session = Session( 417 botocore_session=AppSession( 418 app=self.__app, cognito=self.__cognito, tenant=tenant 419 ), 420 region_name=data["tenant"]["region"], 421 ) 422 self.__sources: frozenset[Edge] = None 423 self.__sqs_client: SQSClient = ( 424 Session(region_name=data["tenant"]["region"]) 425 if self.__app_type == "CrossAccountApp" 426 else self.__session 427 ).client( 428 "sqs", 429 config=Config( 430 max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2), 431 retries={"mode": "standard"}, 432 ), 433 ) 434 self.__table: str = ( 435 data["tenant"]["table"] if data["app"].get("tableAccess") else None 436 ) 437 self.__targets: frozenset[Edge] = None 438 self.__tenant = tenant 439 self.__timeout = timeout or 0.1 440 self._audit = False 441 self._receive_message_type: MessageType = None 442 self._send_message_type: MessageType = None 443 self._stopped = False 444 445 @property 446 def _audit_records_endpoint(self) -> str: 447 return self.__audit_records_endpoint 448 449 @property 450 def _cognito(self) -> Cognito: 451 return self.__cognito 452 453 @property 454 def _session(self) -> Session: 455 return self.__session 456 457 @property 458 def _sources(self) -> frozenset[Edge]: 459 return self.__sources 460 461 @_sources.setter 462 def _sources(self, sources: set[Edge]) -> None: 463 self.__sources = frozenset(sources) 464 465 @property 466 def _sqs_client(self) -> SQSClient: 467 return self.__sqs_client 468 469 @property 470 def _targets(self) -> frozenset[Edge]: 471 return self.__targets 472 473 @_targets.setter 474 def _targets(self, targets: set[Edge]) -> None: 475 self.__targets = frozenset(targets) 476 477 @property 478 def app(self) -> str: 479 return self.__app 480 481 @property 482 def app_type(self) -> str: 483 return self.__app_type 484 485 @property 486 def audit(self) -> bool: 487 return self._audit 488 489 @property 490 def bulk_data_acceleration(self) -> bool: 491 return self.__bulk_data_acceleration 492 493 @property 494 def config(self) -> dict[str, Any]: 495 return self.__config 496 497 @config.setter 498 def config(self, config: dict[str, Any]) -> None: 499 self.__config = config 500 501 def create_message( 502 self, 503 /, 504 body: str, 505 *, 506 group_id: str = None, 507 previous_tracking_ids: Union[list[str], str] = None, 508 tracking_id: str = None, 509 ) -> Message: 510 """ 511 Creates message as per the message standard of echostream. 512 513 Arguments: 514 body - [POSITIONAL ARGUMENT] content of the message 515 group_id - [KEYWORD ARGUMENT] group id 516 previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available 517 tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available 518 """ 519 return Message( 520 body=body, 521 group_id=group_id, 522 message_type=self.send_message_type, 523 previous_tracking_ids=previous_tracking_ids, 524 tracking_id=tracking_id, 525 ) 526 527 @property 528 def name(self) -> str: 529 return self.__name 530 531 @property 532 def node_type(self) -> str: 533 return self.__node_type 534 535 @property 536 def receive_message_type(self) -> MessageType: 537 return self._receive_message_type 538 539 @property 540 def send_message_type(self) -> MessageType: 541 return self._send_message_type 542 543 @property 544 def sources(self) -> frozenset[Edge]: 545 return self._sources 546 547 @property 548 def stopped(self) -> bool: 549 return self._stopped or False 550 551 @property 552 def table(self) -> Table: 553 if self.__table: 554 return self._session.resource("dynamodb").Table(self.__table) 555 raise RuntimeError(f"App {self.app} does not have tableAccess") 556 557 @property 558 def targets(self) -> frozenset[Edge]: 559 return self._targets 560 561 @property 562 def tenant(self) -> str: 563 return self.__tenant 564 565 @property 566 def timeout(self) -> float: 567 return self.__timeout 568 569 @timeout.setter 570 def timeout(self, timeout: float) -> None: 571 self.__timeout = timeout or 0.1
Base level node class. Used by threading and asyncio modules to interact with echostream nodes.
376 def __init__( 377 self, 378 *, 379 appsync_endpoint: str = None, 380 bulk_data_acceleration: bool = False, 381 client_id: str = None, 382 name: str = None, 383 password: str = None, 384 tenant: str = None, 385 timeout: float = None, 386 user_pool_id: str = None, 387 username: str = None, 388 ) -> None: 389 super().__init__() 390 self.__cognito = Cognito( 391 client_id=client_id or environ["CLIENT_ID"], 392 user_pool_id=user_pool_id or environ["USER_POOL_ID"], 393 username=username or environ["USER_NAME"], 394 ) 395 self.__cognito.authenticate(password=password or environ["PASSWORD"]) 396 name = name or environ["NODE"] 397 tenant = tenant or environ["TENANT"] 398 with GqlClient( 399 fetch_schema_from_transport=True, 400 transport=RequestsHTTPTransport( 401 auth=RequestsSrpAuth(cognito=self.__cognito, http_header_prefix=""), 402 url=appsync_endpoint or environ["APPSYNC_ENDPOINT"], 403 ), 404 ) as session: 405 data: dict[str, Union[str, dict]] = session.execute( 406 _GET_APP_GQL, 407 variable_values=dict(name=name, tenant=tenant), 408 )["GetNode"] 409 self.__app = data["app"]["name"] 410 self.__app_type = data["app"]["__typename"] 411 self.__audit_records_endpoint = data["app"]["auditRecordsEndpoint"] 412 self.__bulk_data_acceleration = bulk_data_acceleration 413 self.__config: dict[str, Any] = None 414 self.__name = name 415 self.__node_type = data["__typename"] 416 self.__session = Session( 417 botocore_session=AppSession( 418 app=self.__app, cognito=self.__cognito, tenant=tenant 419 ), 420 region_name=data["tenant"]["region"], 421 ) 422 self.__sources: frozenset[Edge] = None 423 self.__sqs_client: SQSClient = ( 424 Session(region_name=data["tenant"]["region"]) 425 if self.__app_type == "CrossAccountApp" 426 else self.__session 427 ).client( 428 "sqs", 429 config=Config( 430 max_pool_connections=min(20, ((cpu_count() or 1) + 4) * 2), 431 retries={"mode": "standard"}, 432 ), 433 ) 434 self.__table: str = ( 435 data["tenant"]["table"] if data["app"].get("tableAccess") else None 436 ) 437 self.__targets: frozenset[Edge] = None 438 self.__tenant = tenant 439 self.__timeout = timeout or 0.1 440 self._audit = False 441 self._receive_message_type: MessageType = None 442 self._send_message_type: MessageType = None 443 self._stopped = False
501 def create_message( 502 self, 503 /, 504 body: str, 505 *, 506 group_id: str = None, 507 previous_tracking_ids: Union[list[str], str] = None, 508 tracking_id: str = None, 509 ) -> Message: 510 """ 511 Creates message as per the message standard of echostream. 512 513 Arguments: 514 body - [POSITIONAL ARGUMENT] content of the message 515 group_id - [KEYWORD ARGUMENT] group id 516 previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available 517 tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available 518 """ 519 return Message( 520 body=body, 521 group_id=group_id, 522 message_type=self.send_message_type, 523 previous_tracking_ids=previous_tracking_ids, 524 tracking_id=tracking_id, 525 )
Creates message as per the message standard of echostream.
Arguments: body - [POSITIONAL ARGUMENT] content of the message group_id - [KEYWORD ARGUMENT] group id previous_tracking_ids - [KEYWORD ARGUMENT] previous tracking id of the message if available tracking_id - [KEYWORD ARGUMENT] tracking id of the message if available
574@dataclass(frozen=True) 575class PresignedPost: 576 """ 577 PresignedPost objects are part of the Bulk Data Storage system 578 and are used to POST bulk data. 579 """ 580 581 fields: dict[str, str] 582 """The fields required to be sent when POSTing bulk data""" 583 url: str 584 """The POST url used to POST bulk data"""
PresignedPost objects are part of the Bulk Data Storage system and are used to POST bulk data.
587@dataclass(frozen=True) 588class PresignedPut: 589 """ 590 PresignedPut objects are part of the Bulk Data Storage system 591 and are used to PUT bulk data. 592 """ 593 594 headers: dict[str, str] 595 """The headers required to be sent when PUTing bulk data""" 596 url: str 597 """The PUT url used to PUT bulk data"""
PresignedPut objects are part of the Bulk Data Storage system and are used to PUT bulk data.