List Datasets

The List Datasets view shows the following attributes for each dataset: datasetName / datasetId, datasetStatus, readConnector, writeConnector, errorConnector, computeEngine, manifestFile, datasetProgress, executionLogs

Create New Datasets

Create New Dataset Using:

New datasets can be created by:

  • JSON: Specifying the dataset configuration JSON directly. This option is useful when copying the configuration from an existing dataset or reusing a dataset configuration from the CLI.
  • Form: Filling in the Create New Dataset form, which walks the user through each step and produces the same dataset configuration JSON that it then uses to create the dataset. This option is recommended when getting started with #Let's Data. It has detailed help for each item in the JSON and should help the user gain a good understanding of the concepts.


Dataset Configuration JSON

Datasets and the Dataset Configuration JSON

Overview

Datasets are collections of data tasks grouped together as a logical entity - the data jobs or data tasks that the user needs to run. A dataset has a task for each work item in the dataset.
For example, a user may want to map reduce all the files in an S3 bucket to process data. They will define this work - 'map reduce all the files' - as a dataset in #Let'sData. Once this dataset is created, #Let'sData will create a data task for each file in the S3 bucket (map reduce each file to process data). These data tasks grouped together are collectively called a dataset.
The dataset name uniquely identifies these aggregated data tasks and needs to be unique and alphanumeric (characters such as _ . | etc. are not allowed). In the 'map reduce all the files' example above, the dataset could be named "ClickstreamsMapReduceJune2022".

User Configuration

Datasets require a few different pieces of configuration to be able to create and successfully run data tasks. A dataset has the following configuration components:

  • The Dataset Name: A unique name for the dataset
  • The Region: The AWS region for the dataset. LetsData supported AWS regions: [us-east-1, us-east-2, us-west-2, eu-west-1, ap-south-1, ap-northeast-1]
  • The Read Connector: A read connector configuration defines where the data that needs to be read by the tasks is located and how it would be read.
  • The Write Connector: A write connector configuration defines where the data is going to be written to.
  • The Error Connector: An error connector configuration defines where the error records in the data processing will be archived.
  • The Compute Engine: The compute engine configuration defines the infrastructure that would be used to run these data tasks.
  • The Manifest File: A manifest file configuration that defines the read connector data for each individual task - for example, if the read connector is an S3 Bucket, the manifest file specifies what files in the bucket are to be read by the read connector. Each line in the manifest file definition becomes a data task in #Let's Data.
  • The Access Grant Role: The ARN of the Access Grant IAM Role - this is the role that grants permissions to read from the read connector destination, write to the write connector and error connector destinations, and gives access to the manifest file and any additional artifacts required by the dataset.
  • The Customer Account For Let's Data Resource Access: The AWS account ID of an account that should be granted access to the resources that this dataset will create. For example, if #Let's Data is creating the write connector's kinesis stream and error connector's S3 bucket, then #Let's Data will give this AWS account access to these resources.

Schema & Example

Here is a representation of a dataset with its high-level component configuration, followed by an actual example dataset:

    {
        "datasetName": "String",
        "region": "String - us-east-1|us-east-2|us-west-2|eu-west-1|ap-south-1|ap-northeast-1",
        "accessGrantRoleArn": "String",
        "customerAccountForAccess": "String",
        "readConnector": {
            ...
        },
        "writeConnector": {
            ...
        },
        "errorConnector": {
            ...
        },
        "computeEngine": {
            ...
        },
        "manifestFile": {
            ...
        },

        // System / Internal attributes
        "tenantId": "String",
        "userId": "String",
        "datasetId": "String"
        // Status
        "datasetStatus": "CREATED|INITIALIZING|PROCESSING|COMPLETED|ERRORED|REDRIVING|DESCALED|FROZEN|DELETED|UPDATING|FREEZING|DESCALING|DELETING|STOPPING_ERROR|STOPPING_COMPLETE",
        // Progress
        "datasetProgress": {
            "totalTasks": "long",
            "completedTasks": "long",
            "errorTasks": "long",
        },
        // Execution Logs
        "taskExecutionLogs": [
            {
                "startDatetime": "long",
                "endDatetime": "long",
            }
        ],
        // Access
        "customerAccessRoleArn": "String",
        "createDatetime": "long",
        "updateDatetime": "long"
    }
    {
        "datasetName": "IndexWebCrawlDataNov2022",
        "region": "us-east-1",
        "accessGrantRoleArn": "arn:aws:iam::151166716410:role/LetsData_AccessRole_IndexWebCrawlDataNov2022",
        "customerAccountForAccess": "151166716410"
        "readConnector": {
            // connector destination
            "connectorDestination": "S3",
            "bucketName": "commoncrawl",
            // bucket access
            "bucketResourceLocation": "Customer";

            // reader type definition
            "readerType": "MULTIPLEFILESTATEMACHINEREADER"
            "singleFileStateMachineParserImplementationClassNameMap": {
                "WARC": "com.resonance.saas.data.commoncrawl.parser.WARCFileParser",
                "WAT": "com.resonance.saas.data.commoncrawl.parser.WATFileParser",
                "WET": "com.resonance.saas.data.commoncrawl.parser.WETFileParser",
            },
            "multipleFileStateMachineReaderClassName": "com.resonance.saas.data.commoncrawl.reader.CommonCrawlReader",

            // implementation jar file artifact
            "artifactImplementationLanguage": "JAVA",
            "artifactFileS3Link": "s3://resonancecommoncrawl-jar/resonance-saas-data-common-crawl-1.0-SNAPSHOT.jar",
            // implementation jar file artifact access
            "artifactFileS3LinkResourceLocation": "Customer"
        },
        "writeConnector": {
            "connectorDestination": "KINESIS",
            "kinesisStreamName": "tldwc4c5854fa92dba54ba706459d3b0440dc",
            "resourceLocation": "LetsData"
            "kinesisShardCount": 15
        },
        "errorConnector": {
            "connectorDestination": "S3",
            "bucketName": "tldec4c5854fa92dba54ba706459d3b0440dc",
            "errorConnectorResourceLocation": "LetsData"
        },
        "computeEngine": {
            "computeEngineType": "LAMBDA",
            "concurrency": 15,
            "memoryLimitInMegabytes": 10240,
            "timeoutInSeconds": 300
        },
        "manifestFile": {
            "manifestFileS3Uri": "s3://resonancemanifestfile/resonance_manifest.txt",
            "manifestType": "S3ReaderS3LinkManifestFile",
            "readerType": "MULTIPLEFILESTATEMACHINEREADER",
            "readerManifestResourceLocation": "Customer"
        },
        "datasetStatus": "FROZEN",
        "datasetProgress": {
            "totalTasks": 6,
            "completedTasks": 6,
            "errorTasks": 0
        },
        "executionLogs": [
            {
              "startDatetime": 1678546821208,
              "endDatetime": 1678547579649
            },
            {
              "startDatetime": 1678547819604,
              "endDatetime": 1678548407473
            }
        ],
        "createDatetime": 1678546569567,
        "updateDatetime": 1678548407473
    }
            

For details about the schema and examples for the different dataset components, see the following detailed docs:

  • Datasets: https://www.letsdata.io/docs#datasets
  • Read Connectors: https://www.letsdata.io/docs#readconnectors
  • Write Connectors: https://www.letsdata.io/docs#writeconnectors
  • Error Connectors: https://www.letsdata.io/docs#errorconnectors
  • Manifest File: https://www.letsdata.io/docs#readconnectormanifestfile
  • Compute Engine: https://www.letsdata.io/docs#computeengine
  • Access Grants: https://www.letsdata.io/docs#accessgrants
  • Dataset Configuration Form:

    Dataset name

    Dataset Region


    Datasets

    Datasets are collections of data tasks grouped together as a logical entity - the data jobs or data tasks that the user needs to run. A dataset has a task for each work item in the dataset.
    For example, a user may want to map reduce all the files in an S3 bucket to process data. They will define this work - 'map reduce all the files' - as a dataset in #Let'sData. Once this dataset is created, #Let'sData will create a data task for each file in the S3 bucket (map reduce each file to process data). These data tasks grouped together are collectively called a dataset.
    The dataset name uniquely identifies these aggregated data tasks and needs to be unique and alphanumeric (characters such as _ . | etc. are not allowed). In the 'map reduce all the files' example above, the dataset could be named "ClickstreamsMapReduceJune2022".

    JSON Element
    This value is saved as the dataset configuration's datasetName attribute

    Dataset Region

    The AWS region for the dataset. For details about how regions affect dataset processing, see the Regions documentation.

    JSON Element
    This value is saved as the dataset configuration's region attribute

    Dataset Read Connector

    Read Connector Region

    Read Connector Destination

    Implementation Language


    Read Connectors

    Read connectors are logical connections that are created to read data from read connector destinations. For example, an S3 Read Connector is used to read data from S3.

    Read Connectors implement the #Let's Data's data interface (the user data handlers) and are primarily responsible for making sense of the read records and creating output documents. These output documents are then written to the write connector destination by the write connector.

    The #Let'sData infrastructure simplifies the data processing for the customers. Our promise to the customers is that "Focus on the data - we'll manage the infrastructure". #Let'sData implements a control plane for the data tasks, reads and writes to different destinations, scales the processing infrastructure as needed, and builds in monitoring and instrumentation. However, it does not know what the data is, how it is stored in the data files, or how to read the data and create transformed documents. The customer needs to implement the user data handlers (letsdata-data-interface) that tell us what makes a data record in the files - we'll then send the records to these data handlers where the user can transform the records into documents that can be sent to the write destination.

    The #Let's Data's data interfaces are defined in the github repo letsdata-data-interface.

    We currently support the following read connector destinations:

    • AWS S3 Read Connector
    • AWS SQS Read Connector
    • AWS Kinesis Read Connector
    • AWS DynamoDB Streams Read Connector
    • AWS DynamoDB Table Read Connector

    JSON Element
    This value is saved as the dataset configuration's readConnector.connectorDestination attribute
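
    For example, a minimal sketch of an S3 read connector destination in the dataset configuration (other read connector attributes are omitted here; see the full example dataset above):
    "readConnector": {
        "connectorDestination": "S3",
        "bucketName": "commoncrawl",
        ...
    }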

    Read Connector Region

    (Optional) The AWS region for the dataset's readConnector. If not specified, the read connector will default to the dataset region. For details about how regions affect dataset processing, see the Regions documentation.

    JSON Element
    This value is saved as the dataset configuration's readConnector.region attribute

    Artifact Implementation Language

    The read connector implementation of the #LetsData interface is called the artifact and is implemented in either Java, Python or Javascript. When the implementation is in Java, the artifact is essentially a JAR file with the implementation of the #LetsData interface. For Python and Javascript, the artifact is an ECR Image with the #LetsData interface implementation.

    JSON Element
    This value is saved as the dataset configuration's readConnector.artifactImplementationLanguage attribute

    Read Connector Artifact

    Read Connector Artifact


    Read Connector Artifact

    • artifactFileS3Link: The S3 link of the artifact - the JAR file that implements the required interfaces mentioned above.
    • artifactFileS3LinkResourceLocation: #Let's Data needs to know how to access the artifactFileS3Link. If it is in the customer account, we'll use the IAM Role specified in the dataset's accessGrantRoleArn to access this JAR file - you'll need to add access to this JAR file in the IAM role's policy document. Otherwise, we'll use the #Let's Data account for access.

    JSON Element
    These values are saved as the dataset configuration's readConnector.artifactFileS3Link, readConnector.artifactFileS3LinkResourceLocation attributes
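
    For example, a sketch of the artifact attributes from the example dataset above (together with the implementation language from the previous step; the JAR is located in the customer's S3 bucket):
    "readConnector": {
        ...
        "artifactImplementationLanguage": "JAVA",
        "artifactFileS3Link": "s3://resonancecommoncrawl-jar/resonance-saas-data-common-crawl-1.0-SNAPSHOT.jar",
        "artifactFileS3LinkResourceLocation": "Customer"
    }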

    Dataset Write Connector

    Write Connector Region

    Connector Destination


    Dataset Write Connectors

    Write connectors are logical connections that are created to write output docs to write connector destinations. For example, a Kinesis Write Connector is used to write data to Kinesis.

    Write Connectors are implemented by #Let's Data - no coding is required by the customer to write records to each destination. All the customer has to do is:

    1. Configure a write connector for the dataset by specifying the writeConnector in the dataset configuration JSON
    2. Output the documents from the user data handlers according to the requirements of the write connector destination (For example, SQS documents should be less than 256KiB in size, Dynamo DB Documents must have keys in correct format / size, Kinesis records should be less than 1 MB etc.)

    connectorDestination: The data destination for the connector. This can be S3, SQS or any of the supported write connector destinations.

    We currently support the following write connector destinations:

    • Momento Vector Index Write Connector
    • AWS Kinesis Write Connector
    • AWS Dynamo DB Write Connector
    • AWS S3 Write Connector
    • AWS SQS Write Connector
    • AWS Kafka Write Connector

    JSON Element
    This value is saved as the dataset configuration's writeConnector.connectorDestination attribute
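
    For example, a sketch of a Kinesis write connector taken from the example dataset above (where #Let's Data creates and manages the stream):
    "writeConnector": {
        "connectorDestination": "KINESIS",
        "kinesisStreamName": "tldwc4c5854fa92dba54ba706459d3b0440dc",
        "resourceLocation": "LetsData",
        "kinesisShardCount": 15
    }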

    Write Connector Region

    (Optional) The AWS region for the dataset's writeConnector. If not specified, the write connector will default to the dataset region. For details about how regions affect dataset processing, see the Regions documentation.

    JSON Element
    This value is saved as the dataset configuration's writeConnector.region attribute

    Dataset Error Connector

    Error Connector Region

    Connector Destination

    Error Connector S3 Bucket Location

    AWS S3 Error Connector Bucket Name

    S3 Error Bucket Name Will Be Auto Generated

    Error Connector Region

    (Optional) The AWS region for the dataset's errorConnector. If not specified, the error connector will default to the dataset region. For details about how regions affect dataset processing, see the Regions documentation.

    JSON Element
    This value is saved as the dataset configuration's errorConnector.region attribute

    Dataset Error Connectors

    Error connectors are logical connections that are created to write error docs to error connector destinations. For example, an S3 Error Connector is used to write error records to S3.

    Error Connectors are implemented by #Let's Data - no coding is required by the customer to write errors to the error destination. All the customer has to do is:

    • Configure an error connector for the dataset by specifying the errorConnector in the dataset configuration JSON
    • Output the error documents from the user data handlers in case of errors

    In addition to the user created error documents, #Let's Data can also create error docs for errors encountered during the task processing (for example, SQS documents greater than 256KiB in size, DynamoDB document keys in an incorrect format / size, Kinesis records greater than 1 MB etc.)

    These error documents are written to the error destination by the error connector. For details around error handling, the error document schema and how to retrieve error docs, see the Error Handling docs.

    We currently only support AWS S3 Error Connector as error connector destination.

    JSON Element
    This value is saved as the dataset configuration's errorConnector.connectorDestination attribute

    S3 Error Connectors: Config

    S3 Error Connector Configuration

    The S3 Error connector requires the following configuration:

    • region: (Optional) The error connector region; set to the dataset's region if unspecified. This is the region where the error destination (S3 Bucket) is located. When the dataset starts processing, it will create clients for this region to connect to the error destination. Supported Regions: [us-east-1, us-east-2, us-west-2, eu-west-1, ap-south-1, ap-northeast-1]
    • connectorDestination: The data destination for the error connector. Currently only S3 is the supported error destination.
    • resourceLocation: #Let's Data needs to know how to access the error destination. If it is publicly accessible or in the customer account, we'll use the IAM Role specified in the dataset's accessGrantRoleArn to access the error destination - you'll need to add access to the error destination in the IAM role's policy document. Otherwise, we'll use the #Let's Data account for access. See 'Access Grants' for details.

        # S3 Error Connector Schema
        "errorConnector": {
            # Configuration common to error connectors
            "region": "String - (Optional) The error connector region - us-east-1|us-east-2|us-west-2|eu-west-1|ap-south-1|ap-northeast-1",
            "connectorDestination": "S3",
            "resourceLocation": "Customer|LetsData",
    
            # S3 error connector configuration
            "bucketName": "String - NULL when resourceLocation == letsdata | Valid S3 bucket name when resourceLocation == customer",
        }
    
        # Example S3 Error Connector Configuration - resourceLocation == letsdata
        "errorConnector": {
    
            # Configuration common to error connectors
            "region": "us-east-1",
            "connectorDestination": "S3",
            "resourceLocation": "LetsData",
        }
    
        # Example S3 Error Connector Configuration - resourceLocation == customer
        "errorConnector": {
    
            # Configuration common to error connectors
            "region": "us-east-1",
            "connectorDestination": "S3",
            "resourceLocation": "Customer",
    
            # S3 error connector configuration
            "bucketName": "letsdata-common-crawl-errors"
        }
    
    • bucketName: String - a valid S3 bucket name. This is required when the resourceLocation is 'customer' as this means data is being written to an existing S3 bucket. When the resourceLocation is 'letsdata', this needs to be null - #Let's Data will generate a unique S3 bucket name so that it does not collide with other bucket names in the same account.

    JSON Element
    These values form the S3 error connector JSON element in the dataset configuration:
    "errorConnector": {
        "connectorDestination": "S3",
        "resourceLocation": "Customer",
        "bucketName": "letsdata-common-crawl-errors"
    }

    Dataset Compute Engine

    Compute Engine Region

    Dataset Compute Engine Type


    Compute Engine Region

    (Optional) The AWS region for the dataset's computeEngine. If not specified, the compute engine will default to the dataset region. For details about how regions affect dataset processing, see the Regions documentation.

    JSON Element
    This value is saved as the dataset configuration's computeEngine.region attribute

    Dataset Compute Engine

    The Dataset Compute Engine is the compute infrastructure that is used to process the dataset. Lambda, Lambda And Sagemaker, and Spark are the currently supported Compute Engines in #LetsData. We intend to support EC2, ECS and Kubernetes compute engines as well. Here are the currently supported compute engines:

    • AWS Lambda Compute Engine: The user's code is packaged as a Data Task Lambda Function and #LetsData manages the task creation, execution and monitoring.
    • AWS Lambda And Sagemaker Compute Engine: The user's code is packaged as a Data Task Lambda Function and #LetsData manages the task creation, execution and monitoring. Additionally, AI/ ML inferences are run through a Sagemaker endpoint. #LetsData can create and manage these Sagemaker endpoints or re-use existing Sagemaker endpoints.
    • Spark Compute Engine: The Spark Compute Engine runs the user's Spark code on AWS Lambda and offers infrastructure simplifications such as removing the need for clusters (no cluster provisioning, no cluster management, no cluster scaling issues). It adds infinite elastic scaling (no need for application scheduling - applications run as soon as they are created) and adds a layer of file level progress / task management that is consistent with LetsData datasets. Your Spark code will just work out of the box - no jar issues, classpath problems or elaborate session and cluster configurations.

    #LetsData creates the selected compute engines for data processing and manages the infrastructure as needed until the data processing is complete. The user does not need to specify any additional infrastructure details or write any code to manage the compute infrastructure. This is what we promise - "Focus on the data and we'll manage the infrastructure."

    JSON Element
    These values form the computeEngine JSON element in the dataset configuration:
    "computeEngine": {
        "computeEngineType": "Lambda"
    }
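
    A fuller sketch, using the Lambda compute engine values from the example dataset at the top of this page:
    "computeEngine": {
        "computeEngineType": "LAMBDA",
        "concurrency": 15,
        "memoryLimitInMegabytes": 10240,
        "timeoutInSeconds": 300
    }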

    Access Grants

    Let's Data would need access to the different dataset resources

    Let's Data would need access to the different resources that are needed to process the dataset.

    Resource Locations

    In terms of access, the different resources (S3 Buckets, DynamoDB Tables, SQS Queues etc.) that are read from, written to, and managed by #Let's Data can be divided into two groups - 1. Customer: resources that are located in external AWS accounts, and 2. LetsData: resources that are located in the #Let's Data AWS account.

    • Customer: Resources that are not located in #Let's Data AWS account but are used in dataset processing can either be public or access limited by the owner. In these cases, #Let's Data requires that the owner adds #Let's Data to the access lists.
    • Let's Data: Resources that are located in #Let's Data AWS account are managed completely by #Let's Data - we'll grant the customer account access to these resources to read, write and manage them.

    Regardless of the resource location, #Let's Data adheres to the strictest software security principles. The code follows the principle of least privilege, runs in the context of the dataset's user and is granted access only to the resources that it needs.

    Managing Access

    • Some resources (such as read connector s3 buckets, artifact file in s3 and manifest file in s3) are read-only to Let's Data, so they need to be public or the customer needs to give their Let's Data IAM User access (More on how to give access later).
    • For the resources that are written to by Let's Data, the customer can decide whether these should be in the customer account or managed by Let's Data. These resources are the error connector S3 bucket, the write connector kinesis stream etc.
    • If these are managed by the customer, then the customer would need to grant access to these resources to their Let's Data IAM User. If these are managed by Let's Data, then we'll grant the customer's AWS account access to these resources.

    How To?

    See the "What Access Policy do I need?" button to generate a policy document from the dataset configuration in this form. The "How do I Grant Access?" button has details on how to grant access using this policy document.

    Validating Access

    We'd be validating this access as part of the dataset creation and would let you know if there are any access issues.

    JSON Element
    This value is saved as the dataset configuration's accessGrantRoleArn attribute

    Dataset Resources Role ARN

      
    What Access Policy do I need?

    Access Checklist

    Here's a quick checklist that would help us determine what access needs to be granted by the customer and what access Let's Data might need to provision:


    Access Grant Role Policy

    Based on the checklist above, Let's Data would need access to the following resources that are needed to process the dataset:

        {
            "Version": "2012-10-17",
            "Statement": [
                ...
            ]
        }
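
    As an illustration only (the actual policy statements are generated from the dataset configuration in this form), an access grant policy for the common crawl example dataset above might include S3 read statements such as:

        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": "s3:ListBucket",
                    "Resource": "arn:aws:s3:::commoncrawl"
                },
                {
                    "Effect": "Allow",
                    "Action": "s3:GetObject",
                    "Resource": [
                        "arn:aws:s3:::commoncrawl/*",
                        "arn:aws:s3:::resonancecommoncrawl-jar/*",
                        "arn:aws:s3:::resonancemanifestfile/*"
                    ]
                }
            ]
        }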
                            
    How do I Grant Access?

    Access Grant Instructions

    Here are the instructions to grant these permissions using the AWS Console or the AWS CLI:

    • Find the User details: We need the following identifiers from the logged in user's data to enable access.

      1. #Let's Data IAM Account ARN: the logged in user's #Let's Data IAM Account ARN. This is the IAM user that was created automatically by #Let's Data when you signed up. All the dataset execution would be scoped to this user's security perimeter.
      2. UserId: the logged in user's User Id. We use the userId as the STS ExternalId to follow Amazon's security best practices. This is an additional identifier (similar to MFA) that prevents someone from inadvertently gaining access.

      The console's 'User Management' tab lists your IAM user ARN. You can also find it via CLI.

          $ > ./letsdata users list --prettyPrint
      
          Output
          ------
              {
                "loggedInUser": {                                                                                                      <----- logged in user details
                  "fullName": "#Let's Data Support Account",
                  "emailAddress": "support@letsdata.io",
                  "iamUserARN": "arn:aws:iam::956943252347:user/cc02afb4-fbe1-4a6a-8ab8-eedff083faa6-support@letsdata.io",             <----- The IAM User ARN
                  "userRole": "TenantAdmin",
                  "userStatus": "ACTIVE",
                  "userId": "4de219ce-4f24-424b-b169-afcf6088c696",                                                                    <----- The User Id
                  ...
                },
                "users": [
                  ...
                ]
              }
                                                
      IAM user account ARN on the #Let's Data console
    • Create an IAM role and policy

              # define a trust policy to trust the user's #Let's Data IAM User for access (who you trust for access)
              # scope it to the userId as the externalId for an additional security layer
              $ > cat policies/iam_trust_policy.json
      
              {
                "Version": "2012-10-17",
                "Statement": [
                  {
                    "Effect": "Allow",
                    "Principal": {
                      "AWS": "arn:aws:iam::223413462631:user/c16bc0aa-994f-4d8a-983e-fe94cd31cee7-support@letsdata.io"        <----- Replace the IAM User ARN
                    },
                    "Action": "sts:AssumeRole",
                    "Condition": {
                        "StringEquals": {
                            "sts:ExternalId": "4de219ce-4f24-424b-b169-afcf6088c696"                                          <----- Replace the UserId
                        }
                    }
                  }
                ]
              }
      
              # define the IAM policy using the policy document we created to allow access to the resources in an iam_policy.json file (this should be listed before the trust iam policy document)
              $ > ls policies/iam_policy.json
      
              iam_policy.json
      
              # create the role, policy and attach the policy to the role (rename the names in command according to the usecase)
              # also replace the <aws_acct_id> in the policy arn with your account id - this is essentially the arn of the created policy (or copy the ARN from the command's output)
              $ > aws iam create-role --role-name LetsData_AccessRole_LogFilesNov2022 --assume-role-policy-document file://policies/iam_trust_policy.json
              $ > aws iam create-policy --policy-name LetsData_AccessRole_LogFilesNov2022Policy --policy-document file://policies/iam_policy.json
              $ > aws iam attach-role-policy --role-name LetsData_AccessRole_LogFilesNov2022 --policy-arn arn:aws:iam::<aws_acct_id>:policy/LetsData_AccessRole_LogFilesNov2022Policy
              

    Access For Customer Account

    Customer Access

    #Let's Data initialization will grant the specified customer AWS account access to any error and write destinations created during data processing. Read on for a detailed explanation.

    JSON Element
    This value is saved as the dataset configuration's customerAccountForAccess attribute

    Details

    The #Let's Data datasets write output records to a write destination and error records to an error destination. These write and error destinations can be:

    1. either located in the #Let's Data AWS account and be managed by #Let's Data
    2. or located in the customer's account.

    When these error and write destinations are in the customer's account, accessing the output and error records is simple - the customer can use their credentials with the AWS API and access the records.

    However, when these error and write destinations are located in the #Let's Data AWS account, the #Let's Data initialization workflow will grant the customer account access to these error and write destinations via an IAM role. This IAM role is listed in the dataset json as the 'customerAccessRoleArn' attribute. We'll also need the dataset's createDatetime, which is set as the externalId (contextId) for the sts:assumeRole call:

        {
            "datasetName": "ExtractTargetUri1222202216",
            "accessGrantRoleArn": "arn:aws:iam::308240606591:role/LetsData_AccessRole_TargetUriExtractor",
            "customerAccountForAccess": "308240606591",
    
            "customerAccessRoleArn": "arn:aws:iam::223413462631:role/TestCustomerAccess24d29d89b4a2eedc6988cfa17a2c3d81IAMRole",
            "createDatetime": 1685331931671,
    
            "readConnector": {
              "readerType": "SINGLEFILEREADER",
              "bucketName": "commoncrawl",
              ...
            }
        }
    

    Here is how the customer can access the data using the customerAccessRoleArn:

    • Use AWS SecurityTokenService (AWS STS)'s assumeRole API to get access credentials to the resources. In this case, the customer code is running as the customer's AWS account, which would then assume the 'customerAccessRoleArn' IAM role. There is one caveat: the assumeRole API can assume roles only when running as an IAM user (not as a root account). If the customer code is running as the root account, the assumeRole API will return an error. The simple fix is to create an IAM User and grant it assumeRole access. (We've granted these IAM users AdministratorAccess and that seems to work fine). To follow the AWS security best practices, we've also added an additional externalId (contextId) in the sts:assumeRole call to disallow access from unknown contexts. Currently, the dataset's createDatetime is set as the externalId.
    • Call the write / error destination APIs to get the data using these access credentials. The stream details such as streamName and the error bucketName are in the dataset json.
    • A sample implementation of the STS assume role is in STSUtil.java - this can be used for the Kinesis and S3 destinations. For the Kafka destination, we use AWS's aws-msk-iam-auth library, which uses the same methodology to connect securely to the Kafka cluster. We did make a private fix to this library - you'll need to download our custom version of the jar to access the Kafka cluster. For those interested, here are the issue and the fix that we made
          #download the aws-msk-iam-auth custom jar
          $ > curl -o aws-msk-iam-auth-1.1.7-letsdata-custom.jar https://d108vtfcfy7u5c.cloudfront.net/downloads/aws-msk-iam-auth-1.1.7-letsdata-custom.jar
      
          # install the aws-msk-iam-auth-1.1.7-letsdata-custom.jar JAR in the maven repo - update the downloaded path as needed
          mvn -e install:install-file -Dfile=aws-msk-iam-auth-1.1.7-letsdata-custom.jar -DgroupId=software.amazon.msk -DartifactId=aws-msk-iam-auth -Dpackaging=jar -Dversion=1.1.7-letsdata-custom
                                          

    Here are details for each of these steps - the STS assumeRole API, Kinesis Reader, S3 Reader, Kafka Reader, IAM User with AdministratorAccess and the CLI driver Main class. You can view these code examples in their entirety at the letsdata-writeconnector-reader github repo.

    • Simple implementation creates an STS client using the IAM User's credentials
    • Calls the assumeRole API with the roleArn and policy texts

    package com.letsdata.reader;
    
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
    import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
    import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
    import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
    import com.amazonaws.services.securitytoken.model.PolicyDescriptorType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class STSUtil {
        private static final Logger logger = LoggerFactory.getLogger(STSUtil.class);
        private final AWSSecurityTokenService stsClient;
    
        public STSUtil(String region, String awsAccessKeyId, String awsSecretKey) {
            AWSCredentials awsCredentials = new AWSCredentials() {
                @Override
                public String getAWSAccessKeyId() {
                    return awsAccessKeyId;
                }
    
                @Override
                public String getAWSSecretKey() {
                    return awsSecretKey;
                }
            };
    
            this.stsClient = AWSSecurityTokenServiceClientBuilder.standard().
                    withRegion(region).
                    withCredentials(new AWSStaticCredentialsProvider(awsCredentials)).
                    build();
        }
    
        public AssumeRoleResult assumeRole(String roleArn, String externalId, String policy, String roleSessionName, List<String> managedPolicyArnList) {
            AssumeRoleRequest request = new AssumeRoleRequest().withRoleArn(roleArn).withPolicy(policy).withRoleSessionName(roleSessionName).withExternalId(externalId);
    
            if (managedPolicyArnList != null && !managedPolicyArnList.isEmpty()) {
                List<PolicyDescriptorType> policyDescriptorTypeList = new ArrayList<>();
                for (String managedPolicyArn : managedPolicyArnList) {
                    policyDescriptorTypeList.add(new PolicyDescriptorType().withArn(managedPolicyArn));
                }
                request.withPolicyArns(policyDescriptorTypeList);
            }
    
            logger.debug("calling assume role for role arn "+roleArn);
    
            AssumeRoleResult result = stsClient.assumeRole(request);
            return result;
        }
    }
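
    A minimal usage sketch of the STSUtil class above (illustrative only - the access key, secret key and session name are placeholders): create the client with your #Let's Data reader IAM user's credentials and assume the dataset's customerAccessRoleArn, passing the dataset's createDatetime as the externalId.

    // assumes the STSUtil class above; Credentials is com.amazonaws.services.securitytoken.model.Credentials
    STSUtil stsUtil = new STSUtil("us-east-1", "<awsAccessKeyId>", "<awsSecretKey>");
    AssumeRoleResult assumeRoleResult = stsUtil.assumeRole(
            "arn:aws:iam::223413462631:role/TestCustomerAccess24d29d89b4a2eedc6988cfa17a2c3d81IAMRole",  // the dataset's customerAccessRoleArn
            "1685331931671",                                                                             // the dataset's createDatetime as the externalId
            null,                                                                                        // no inline session policy in this sketch
            "letsDataReaderSession",                                                                     // role session name (placeholder)
            null);                                                                                       // no managed policy arns
    Credentials credentials = assumeRoleResult.getCredentials();                                         // temporary credentials for the write / error destinations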
    
    
    

    • Create a Kinesis Client using the STS Assume Role utility from the previous step
    • Use the Kinesis client to describeStreams, listShards, getShardIterator and getRecords

    package com.letsdata.reader;
    
    import com.amazonaws.auth.AWSSessionCredentials;
    import com.amazonaws.auth.AWSSessionCredentialsProvider;
    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.*;
    import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
    import com.amazonaws.services.securitytoken.model.Credentials;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class KinesisReader {
        private static final Logger logger = LoggerFactory.getLogger(KinesisReader.class);
    
        private final STSUtil stsUtil;
        private final AmazonKinesis amazonKinesis;
        private final String roleArn;
        private final String externalId;
        private final String roleAccessPolicyText;
        private final String roleSessionName;
        private final List<String> managedPolicyArnList;
    
        public KinesisReader(String region, STSUtil stsUtil, String roleArn, String externalId, String roleAccessPolicyText, String roleSessionName, List<String> managedPolicyArnList) {
            this.stsUtil = stsUtil;
            this.roleArn = roleArn;
            this.externalId = externalId;
            this.roleAccessPolicyText = roleAccessPolicyText;
            this.roleSessionName = roleSessionName;
            this.managedPolicyArnList = managedPolicyArnList;
    
            this.amazonKinesis = AmazonKinesisClientBuilder.
                    standard().
                    withRegion(region).
                    withCredentials(new AWSSessionCredentialsProvider() {
                        private volatile Credentials credentials;
    
                        @Override
                        public AWSSessionCredentials getCredentials() {
                            if (credentials == null || credentials.getExpiration().before(new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1)))) {
                                refresh();
                            }
                            if (credentials != null && !credentials.getExpiration().before(new Date())) {
                                return new AWSSessionCredentials() {
                                    @Override
                                    public String getSessionToken() {
                                        return credentials.getSessionToken();
                                    }
    
                                    @Override
                                    public String getAWSAccessKeyId() {
                                        return credentials.getAccessKeyId();
                                    }
    
                                    @Override
                                    public String getAWSSecretKey() {
                                        return credentials.getSecretAccessKey();
                                    }
                                };
                            } else {
                                throw new RuntimeException("Credentials could not be obtained");
                            }
                        }
    
                        @Override
                        public void refresh() {
                            AssumeRoleResult stsAssumeRoleResult = stsUtil.assumeRole(roleArn, externalId, roleAccessPolicyText, roleSessionName,  managedPolicyArnList);
                            this.credentials = stsAssumeRoleResult.getCredentials();
                        }
                    }).
                    build();
        }
    
        public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
            DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
            describeStreamRequest.setStreamName(streamName);
            if (exclusiveStartShardId != null) {
                describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            }
            return amazonKinesis.describeStream(describeStreamRequest);
        }
    
        public List<Shard> listShards(String streamName, Date streamCreationTimestamp, String startToken) {
            List<Shard> shardList = new ArrayList<>();
            String nextToken = startToken;
            do {
                ListShardsRequest listShardsRequest = new ListShardsRequest();
                listShardsRequest.setStreamName(streamName);
    
                if (streamCreationTimestamp != null) {
                    listShardsRequest.setStreamCreationTimestamp(streamCreationTimestamp);
                }
    
                if (nextToken != null) {
                    listShardsRequest.setNextToken(nextToken);
                }
                listShardsRequest.setMaxResults(1000);
    
    
                ListShardsResult listShardsResult = null;
                try {
                    logger.debug("Executing listShards iteration");
                    listShardsResult = amazonKinesis.listShards(listShardsRequest);
                    logger.debug("Completed listShards iteration");
                } catch (Exception ex) {
                    logger.error(streamName + " listShards threw an exception ", ex.getCause());
                    throw new RuntimeException(ex);
                }
    
                shardList.addAll(listShardsResult.getShards());
                nextToken = listShardsResult.getNextToken();
            } while (nextToken != null);
    
            return shardList;
        }
    
        public String getShardIterator(String streamName, String shardId, ShardIteratorType shardIteratorType, String startingSequenceNumber, Date startingTimestamp) {
            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
            getShardIteratorRequest.setStreamName(streamName);
            getShardIteratorRequest.setShardId(shardId);
            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
            if (startingSequenceNumber != null) {
                getShardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber);
            }
    
            if (startingTimestamp != null) {
                getShardIteratorRequest.setTimestamp(startingTimestamp);
            }
    
            GetShardIteratorResult getShardIteratorResult = null;
            try {
                logger.debug("Executing getShardIterator");
                getShardIteratorResult = amazonKinesis.getShardIterator(getShardIteratorRequest);
                logger.debug("Completed getShardIterator");
            } catch (Exception ex) {
                logger.error("shardId " + shardId+ " getShardIterator threw an exception ", ex.getCause());
                throw new RuntimeException(ex);
            }
    
            return getShardIteratorResult.getShardIterator();
        }
    
        public GetRecordsResult getRecords(Integer limit, String shardIterator) {
            int recordLimit = limit == null ? 1000 : Math.min(1000, limit);
    
            GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
            getRecordsRequest.setLimit(recordLimit);
            getRecordsRequest.setShardIterator(shardIterator);
    
            GetRecordsResult getRecordsResult = null;
            try {
                logger.debug("Executing getRecords");
                getRecordsResult = amazonKinesis.getRecords(getRecordsRequest);
                logger.debug("Completed getRecords");
            } catch (Exception ex) {
                logger.error("shardIterator " + shardIterator + " getRecords threw an exception ", ex.getCause());
                throw new RuntimeException(ex);
            }
            return getRecordsResult;
        }
    }
    
    

    • Create a Kafka Consumer using the `aws-msk-iam-auth` library for auth
    • Use the Kafka consumer to list topics, subscribe, assign topic partitions, list assignments and subscriptions, poll the topic, commit polled records, and get topic partition positions

    package com.letsdata.reader;
    
    import com.amazonaws.auth.AWSSessionCredentials;
    import com.amazonaws.auth.AWSSessionCredentialsProvider;
    import com.amazonaws.services.kafka.AWSKafka;
    import com.amazonaws.services.kafka.AWSKafkaClientBuilder;
    import com.amazonaws.services.kafka.model.GetBootstrapBrokersRequest;
    import com.amazonaws.services.kafka.model.GetBootstrapBrokersResult;
    import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
    import com.amazonaws.services.securitytoken.model.Credentials;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    public class KafkaReader {
        private static final Logger logger = LoggerFactory.getLogger(KafkaReader.class);
    
        private final STSUtil stsUtil;
        private final AWSKafka awsKafka;
        private final KafkaConsumer kafkaConsumer;
        private final String roleArn;
        private final String externalId;
        private final String roleAccessPolicyText;
        private final String roleSessionName;
        private final List<String> managedPolicyArnList;
    
        public KafkaReader(String region, String clusterArn, String awsAccessKeyId, String awsSecretAccessKey, STSUtil stsUtil, String roleArn, String externalId, String roleAccessPolicyText, String roleSessionName, List<String> managedPolicyArnList) {
            this.stsUtil = stsUtil;
            this.roleArn = roleArn;
            this.externalId = externalId;
            this.roleAccessPolicyText = roleAccessPolicyText;
            this.roleSessionName = roleSessionName;
            this.managedPolicyArnList = managedPolicyArnList;
    
            this.awsKafka = AWSKafkaClientBuilder.
                    standard().
                    withRegion(region).
                    withCredentials(new AWSSessionCredentialsProvider() {
                        private volatile Credentials credentials;
    
                        @Override
                        public AWSSessionCredentials getCredentials() {
                            if (credentials == null || credentials.getExpiration().before(new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1)))) {
                                refresh();
                            }
                            if (credentials != null && !credentials.getExpiration().before(new Date())) {
                                return new AWSSessionCredentials() {
                                    @Override
                                    public String getSessionToken() {
                                        return credentials.getSessionToken();
                                    }
    
                                    @Override
                                    public String getAWSAccessKeyId() {
                                        return credentials.getAccessKeyId();
                                    }
    
                                    @Override
                                    public String getAWSSecretKey() {
                                        return credentials.getSecretAccessKey();
                                    }
                                };
                            } else {
                                throw new RuntimeException("Credentials could not be obtained");
                            }
                        }
    
                        @Override
                        public void refresh() {
                            AssumeRoleResult stsAssumeRoleResult = stsUtil.assumeRole(roleArn, externalId, roleAccessPolicyText, roleSessionName,  managedPolicyArnList);
                            this.credentials = stsAssumeRoleResult.getCredentials();
                        }
                    }).
                    build();
    
            Properties consumerConfig = new Properties();
            try {
                consumerConfig.put("client.id", InetAddress.getLocalHost().getHostName());
                consumerConfig.put("group.id", "foo");
                consumerConfig.put("bootstrap.servers", getBootstrapBrokers(clusterArn));
                consumerConfig.put("security.protocol", "SASL_SSL");
                consumerConfig.put("sasl.mechanism", "AWS_MSK_IAM");
                consumerConfig.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
                consumerConfig.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn=\""+roleArn+"\" awsRoleAccessKeyId=\""+awsAccessKeyId+"\" awsRoleSecretAccessKey=\""+awsSecretAccessKey+"\" awsRoleExternalId=\""+externalId+"\" awsRoleSessionName=\"KafkaConsumer"+UUID.randomUUID().toString()+"\"  awsStsRegion=\""+region+"\";");
                consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                logger.debug("creating new consumerClient");
                kafkaConsumer = new KafkaConsumer(consumerConfig);
            } catch (UnknownHostException e) {
                throw new RuntimeException("Unexpected exception in creating kafka consumer", e);
            }
        }
    
        public String getBootstrapBrokers(String clusterArn) {
            GetBootstrapBrokersRequest getBootstrapBrokersRequest = new GetBootstrapBrokersRequest().withClusterArn(clusterArn);
            GetBootstrapBrokersResult getBootstrapBrokersResult = awsKafka.getBootstrapBrokers(getBootstrapBrokersRequest);
            return getBootstrapBrokersResult.getBootstrapBrokerStringSaslIam();
        }
    
        public Map<String, List<PartitionInfo>> listTopics() {
            return kafkaConsumer.listTopics();
        }
    
        public Set<String> listSubscriptions() {
            return kafkaConsumer.subscription();
        }
    
        public void subscribe(String topicName) {
            kafkaConsumer.subscribe(Arrays.asList(topicName));
        }
    
        public ConsumerRecords pollTopic(long timeout) {
            return kafkaConsumer.poll(timeout);
        }
    
        public void commitSync() {
            kafkaConsumer.commitSync();
        }
    
        public Set<TopicPartition> assignments() {
            return kafkaConsumer.assignment();
        }
    
        public void assign(String topicName) {
            Set<TopicPartition> topicPartitionSet = new HashSet<>();
            List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topicName);
            for(PartitionInfo partitionInfo : partitionInfoList) {
                topicPartitionSet.add(new TopicPartition(topicName, partitionInfo.partition()));
            }
            kafkaConsumer.assign(topicPartitionSet);
            kafkaConsumer.seekToBeginning(topicPartitionSet);
        }
    
        public Map<String, Map<Integer, Long>> positions() {
            Map<String, Map<Integer, Long>> topicNamePartitionPositionMap = new HashMap<>();
            Map<String, List<PartitionInfo>> topicPartitionInfoMap = listTopics();
            for (String topic : topicPartitionInfoMap.keySet()) {
                for (PartitionInfo topicPartitionInfo : topicPartitionInfoMap.get(topic)){
                    String topicName = topicPartitionInfo.topic();
                    int partition = topicPartitionInfo.partition();
                    TopicPartition topicPartition = new TopicPartition(topicName, partition);
                    long position = kafkaConsumer.position(topicPartition);
                    if (!topicNamePartitionPositionMap.containsKey(topicName)) {
                        topicNamePartitionPositionMap.put(topicName, new HashMap<>());
                    }
                    Long existing = topicNamePartitionPositionMap.get(topicName).put(partition, position);
                    if (existing != null) {
                        throw new RuntimeException("Unexpected duplicate entry for topic partition");
                    }
                }
            }
            return topicNamePartitionPositionMap;
        }
    }
                                        

    • Create an AWS S3 Client using the STS Assume Role utility from the STS Assume Role step
    • Use the S3 client to listObjectsForPrefix and readObjectFromS3Bucket

    package com.letsdata.reader;
    
    import com.amazonaws.auth.AWSSessionCredentials;
    import com.amazonaws.auth.AWSSessionCredentialsProvider;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.*;
    import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
    import com.amazonaws.services.securitytoken.model.Credentials;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class S3Reader {
        private static final Logger logger = LoggerFactory.getLogger(S3Reader.class);
        private final STSUtil stsUtil;
        private AmazonS3 s3Client;
        private final String roleArn;
        private final String externalId;
        private final String roleAccessPolicyText;
        private final String roleSessionName;
        private final List<String> managedPolicyArnList;
    
        public S3Reader(String region, STSUtil stsUtil, String roleArn, String externalId, String roleAccessPolicyText, String roleSessionName, List<String> managedPolicyArnList) {
            this.stsUtil = stsUtil;
            this.roleArn = roleArn;
            this.externalId = externalId;
            this.roleAccessPolicyText = roleAccessPolicyText;
            this.roleSessionName = roleSessionName;
            this.managedPolicyArnList = managedPolicyArnList;
    
            this.s3Client = AmazonS3ClientBuilder.standard().
                    withRegion(region).
                    withCredentials(new AWSSessionCredentialsProvider() {
                        private volatile Credentials credentials;
    
                        @Override
                        public AWSSessionCredentials getCredentials() {
                            if (credentials == null || credentials.getExpiration().before(new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1)))) {
                                refresh();
                            }
                            if (credentials != null && !credentials.getExpiration().before(new Date())) {
                                return new AWSSessionCredentials() {
                                    @Override
                                    public String getSessionToken() {
                                        return credentials.getSessionToken();
                                    }
    
                                    @Override
                                    public String getAWSAccessKeyId() {
                                        return credentials.getAccessKeyId();
                                    }
    
                                    @Override
                                    public String getAWSSecretKey() {
                                        return credentials.getSecretAccessKey();
                                    }
                                };
                            } else {
                                throw new RuntimeException("Credentials could not be obtained");
                            }
                        }
    
                        @Override
                        public void refresh() {
                            AssumeRoleResult stsAssumeRoleResult = stsUtil.assumeRole(roleArn, externalId, roleAccessPolicyText, roleSessionName,  managedPolicyArnList);
                            this.credentials = stsAssumeRoleResult.getCredentials();
                        }
                    }).
                    build();
        }
    
        public List<S3ObjectSummary> listObjectsForPrefix(String bucketName, String prefix) {
            logger.debug("calling listObjectsForPrefix - bucketName: "+bucketName+", prefix: "+prefix);
            String nextMarker = null;
            ObjectListing objectListing = null;
            List<S3ObjectSummary> objectSummaryList = new ArrayList<>();
            do {
                try {
                    logger.debug("iteration listObjectsForPrefix - bucketName: "+bucketName+", prefix: "+prefix+", nextMarker: "+nextMarker);
                    ListObjectsRequest request = new ListObjectsRequest(bucketName, prefix, nextMarker, null, null);
                    objectListing = s3Client.listObjects(request);
                    logger.debug("completing iteration listObjectsForPrefix - bucketName: "+bucketName+", prefix: "+prefix+", nextMarker: "+nextMarker+", objectListing.size(): "+objectListing.getObjectSummaries().size()+", objectListing.nextMarker(): "+objectListing.getNextMarker());
                } catch (AmazonS3Exception ex) {
                    logger.error("listObjectsForPrefix threw exception - bucketName: "+bucketName+", prefix: "+prefix+", nextMarker: "+nextMarker+", ex: "+ex);
                    throw ex;
                }
    
                objectSummaryList.addAll(objectListing.getObjectSummaries());
                nextMarker = objectListing.getNextMarker();
            } while (objectListing != null && objectListing.isTruncated() && nextMarker != null);
            return objectSummaryList;
        }
    
        public String readObjectFromS3Bucket(String bucketName, String object) {
            String contents = null;
            try {
                contents = s3Client.getObjectAsString(bucketName, object);
            } catch (Exception ex) {
                logger.error("readObjectFromS3Bucket threw exception - bucketName: "+bucketName+", object: "+ object+", ex: "+ex);
                throw ex;
            }
            return contents;
        }
    }
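
    For reference, here is a minimal usage sketch of the S3Reader above. The bucket name, prefix and the inline S3 session policy are illustrative assumptions - substitute your dataset's read / error connector bucket and the dataset's customerAccessRoleArn and externalId (the dataset createDatetime). STSUtil is the same helper used throughout these samples, constructed with the IAM user's access key and secret key:

    import com.amazonaws.services.s3.model.S3ObjectSummary;

    public class S3ReaderExample {
        public static void main(String[] args) {
            String region = "us-east-1";
            String bucketName = "example-letsdata-bucket";                                // assumption - use your dataset's bucket
            String customerAccessRoleArn = "arn:aws:iam::<ACCOUNT-ID>:role/<ROLE-NAME>";  // from the dataset
            String externalId = "<DATASET-CREATE-DATETIME>";                              // the dataset createDatetime

            // inline session policy scoping the assumed role to list / read the bucket (illustrative)
            String roleAccessPolicyText = "{\n" +
                    "    \"Version\": \"2012-10-17\",\n" +
                    "    \"Statement\": [\n" +
                    "        {\n" +
                    "            \"Effect\": \"Allow\",\n" +
                    "            \"Action\": [\"s3:ListBucket\", \"s3:GetObject\"],\n" +
                    "            \"Resource\": [\"arn:aws:s3:::" + bucketName + "\", \"arn:aws:s3:::" + bucketName + "/*\"]\n" +
                    "        }\n" +
                    "    ]\n" +
                    "}";

            STSUtil stsUtil = new STSUtil(region, "<AWS-ACCESS-KEY-ID>", "<AWS-SECRET-KEY>");
            S3Reader s3Reader = new S3Reader(region, stsUtil, customerAccessRoleArn, externalId, roleAccessPolicyText, "S3ReaderExample" + System.currentTimeMillis(), null);

            // list the objects under a prefix and print the contents of each object
            for (S3ObjectSummary objectSummary : s3Reader.listObjectsForPrefix(bucketName, "errors/")) {
                System.out.println(s3Reader.readObjectFromS3Bucket(bucketName, objectSummary.getKey()));
            }
        }
    }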
    

    The AssumeRole API is disallowed for AWS account root user credentials. The simple fix is to create an IAM user and grant it AssumeRole access. (For simplicity, we'll grant these IAM users AdministratorAccess.)

    # create an IAM user
    $ > aws iam create-user --user-name letsDataReader
    
    # attach the AWS managed AdministratorAccess policy to the user
    $ > aws iam attach-user-policy --policy-arn arn:aws:iam::aws:policy/AdministratorAccess --user-name letsDataReader

    # create access keys for the user - the AccessKeyId / SecretAccessKey in the output are the awsAccessKeyId / awsSecretKey used in the CLI commands below
    $ > aws iam create-access-key --user-name letsDataReader
    

    The CLI driver code uses the Kinesis Reader and the STS Util from earlier to implement the following CLI commands:

        # Given a streamName, list shards for the stream
        $ > kinesis_reader listShards --streamName 'streamName' --customerAccessRoleArn 'customerAccessRoleArn' --externalId 'externalId' --awsRegion 'awsRegion' --awsAccessKeyId 'awsAccessKeyId' --awsSecretKey 'awsSecretKey'
    
        # Given a shardId, get the Shard Iterator
        $ > kinesis_reader getShardIterator --streamName 'streamName' --customerAccessRoleArn 'customerAccessRoleArn' --externalId 'externalId' --awsRegion 'awsRegion' --awsAccessKeyId 'awsAccessKeyId' --awsSecretKey 'awsSecretKey' --shardId 'shardId'
    
        # Given a shardIterator, get the records from the stream
        $ > kinesis_reader getRecords --streamName 'streamName' --customerAccessRoleArn 'customerAccessRoleArn' --externalId 'externalId' --awsRegion 'awsRegion' --awsAccessKeyId 'awsAccessKeyId' --awsSecretKey 'awsSecretKey' --shardIterator 'shardIterator'
    

    Here is the code. It creates an inline policy that allows the listShards, getShardIterator and getRecords APIs. You can use any Kinesis API - just add it to the inline policy:

    package com.letsdata.reader;
    
    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.Record;
    import com.amazonaws.services.kinesis.model.Shard;
    import com.amazonaws.services.kinesis.model.ShardIteratorType;
    import net.sourceforge.argparse4j.ArgumentParsers;
    import net.sourceforge.argparse4j.inf.ArgumentParser;
    import net.sourceforge.argparse4j.inf.ArgumentParserException;
    import net.sourceforge.argparse4j.inf.Namespace;
    import software.amazon.awssdk.utils.StringUtils;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.*;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    
    public class Main {
    
        public static void main(String[] args) {
            ArgumentParser parser = ArgumentParsers.newFor("letsdatawriteconnector").build();
            parser.addArgument("action").choices("listShards", "getShardIterator", "getRecords").required(true).help("The kinesis client api method that needs to be called. [\"listShards\", \"getShardIterator\", \"getRecords\"]");
            parser.addArgument("--awsRegion").required(false).type(String.class).help("The awsRegion - default to us-east-1").setDefault("us-east-1");
            parser.addArgument("--awsAccessKeyId").required(true).type(String.class).help("The awsAccessKeyId for the customerAccountForAccess for the dataset");
            parser.addArgument("--customerAccessRoleArn").required(true).type(String.class).help("The customerAccessRoleArn from the dataset that has the been granted the access to the write connector");
            parser.addArgument("--externalId").required(true).type(String.class).help("The externalId for the sts assumeRole. This is the dataset createDatetime.");
            parser.addArgument("--awsSecretKey").required(true).type(String.class).help("The awsSecretKey for the customerAccountForAccess for the dataset");
            parser.addArgument("--streamName").required(true).type(String.class).help("The kinesis stream name");
            parser.addArgument("--shardId").required(false).type(String.class).help("The shardId for the getShardIterator call");
            parser.addArgument("--shardIterator").required(false).type(String.class).help("The shardIterator for the getRecords call");
    
            try {
                Namespace namespace = parser.parseArgs(args);
    
                String action = namespace.get("action");
                if (StringUtils.isBlank(action)) {
                    throw new ArgumentParserException("action should not be blank", parser);
                }
    
                String region = namespace.getString("awsRegion");
                String streamName = namespace.getString("streamName");
                String customerAccessRoleArn = namespace.getString("customerAccessRoleArn");
                String externalId = namespace.getString("externalId");
                STSUtil stsUtil = new STSUtil(region, namespace.getString("awsAccessKeyId"), namespace.getString("awsSecretKey"));
                String roleAccessPolicyText = "{\n" +
                        "    \"Version\": \"2012-10-17\",\n" +
                        "    \"Statement\": [\n" +
                        "        {\n" +
                        "            \"Effect\": \"Allow\",\n" +
                        "            \"Action\": [\n" +
                        "                \"kinesis:GetShardIterator\",\n" +
                        "                \"kinesis:GetRecords\"\n" +
                        "            ],\n" +
                        "            \"Resource\": \"arn:aws:kinesis:" + region + ":223413462631:stream/" + streamName + "\"\n" +
                        "        },\n" +
                        "        {\n" +
                        "            \"Effect\": \"Allow\",\n" +
                        "            \"Action\": \"kinesis:ListShards\",\n" +
                        "            \"Resource\": \"*\"\n" +
                        "        }\n" +
                        "    ]\n" +
                        "}";
    
                String roleSessionName = streamName + System.currentTimeMillis();
                KinesisReader kinesisReader = new KinesisReader(region, stsUtil, customerAccessRoleArn, externalId, roleAccessPolicyText, roleSessionName, null);
                switch (action) {
                    case "listShards": {
                        List<Shard> shardList = kinesisReader.listShards(streamName, null, null);
                        System.out.println(shardList);
                        break;
                    }
                    case "getShardIterator": {
                        String shardIterator = kinesisReader.getShardIterator(streamName, namespace.getString("shardId"), ShardIteratorType.TRIM_HORIZON, null, null);
                        System.out.println(shardIterator);
                        break;
                    }
                    case "getRecords": {
                        GetRecordsResult getRecordsResult = kinesisReader.getRecords(null, namespace.getString("shardIterator"));
                        List<Record> recordList = getRecordsResult.getRecords();
                        for (Record record : recordList) {
                            byte[] dataBytes = record.getData().array();
                            String recordStr = new String(GZipUtil.decompressByteArr(dataBytes));
                            System.out.println("record: " + recordStr);
                        }
                        System.out.println("getRecordsResult - recordList.size:  " + getRecordsResult.getRecords().size() + ", nextShardIterator: " + getRecordsResult.getNextShardIterator());
                        break;
                    }
                    default: {
                        throw new ArgumentParserException("Unknown action " + action, parser);
                    }
                }
            } catch (ArgumentParserException e) {
                parser.handleError(e);
            }
        }
    }
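
    The getRecords output above GZIP-decompresses each record's data bytes using a GZipUtil helper. If that helper isn't already defined in your project, a minimal sketch of a decompressByteArr implementation could look like the following (an assumption for completeness, not the #Let's Data implementation):

    package com.letsdata.reader;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    public class GZipUtil {
        // decompress a GZIP compressed byte array - used above to decode the Kinesis record data bytes
        public static byte[] decompressByteArr(byte[] compressedBytes) {
            try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(compressedBytes));
                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
                byte[] buffer = new byte[4096];
                int bytesRead;
                while ((bytesRead = gzipInputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, bytesRead);
                }
                return outputStream.toByteArray();
            } catch (IOException ex) {
                throw new RuntimeException("error decompressing record bytes", ex);
            }
        }
    }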
    
                                        

    • Run the kafka_reader.sh file in the bin folder. Update the jar path in the script if needed.
    • The CLI driver code (KafkaMain.java) uses the Kafka Reader to implement the following CLI commands:

        # cd into the bin directory
        $ > cd src/bin
    
        # awsAccessKeyId and awsSecretKey are the security credentials of an IAM User in the customer AWS account. This is the customer AWS account that was granted access. In case this is a root account, you can create an IAM user. See the "IAM User With AdministratorAccess" section above.
    
        # Connect a Kafka Consumer to the Kafka Cluster using aws-msk-iam-auth library
        $ > kafka_reader --clusterArn 'clusterArn' --customerAccessRoleArn 'customerAccessRoleArn' --externalId 'externalId' --awsRegion 'awsRegion' --awsAccessKeyId 'awsAccessKeyId' --awsSecretKey 'awsSecretKey' --topicName 'topicName'
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        listTopics
        {commoncrawl1}
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        assignTopicPartitions
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        topicPartitionPositions
        {commoncrawl1={0=0, 1=0, 2=0, 3=0, 4=0}}
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        pollTopic
        ...
        ...
        ...
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        commitPolledRecords
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        topicPartitionPositions
        {commoncrawl1={0=179, 1=424, 2=249, 3=185, 4=233}}
    
        > Enter the kafka consumer method to invoke. ["listTopics", "listSubscriptions", "subscribeTopic", "pollTopic", "commitPolledRecords", "topicPartitionPositions","assignTopicPartitions", "listAssignments","quit"]
        quit
    

    Here is the code. It creates an inline policy that allows the kafka and kafka-cluster APIs. You can use any Kafka client API (the client APIs that read / write data to the cluster), or any Kafka API that manages the cluster (implement these in KafkaReader):

    package com.letsdata.reader;
    
    import net.sourceforge.argparse4j.ArgumentParsers;
    import net.sourceforge.argparse4j.inf.ArgumentParser;
    import net.sourceforge.argparse4j.inf.ArgumentParserException;
    import net.sourceforge.argparse4j.inf.Namespace;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    
    public class KafkaMain {
        // $ > kafka_reader --clusterArn 'clusterArn' --customerAccessRoleArn 'customerAccessRoleArn' --externalId 'externalId' --awsRegion 'awsRegion' --awsAccessKeyId 'awsAccessKeyId' --awsSecretKey 'awsSecretKey' --topicName 'topicName'
        public static void main(String[] args) {
            ArgumentParser parser = ArgumentParsers.newFor("letsdatawriteconnector").build();
            parser.addArgument("--awsRegion").required(false).type(String.class).help("The awsRegion - default to us-east-1").setDefault("us-east-1");
            parser.addArgument("--awsAccessKeyId").required(true).type(String.class).help("The awsAccessKeyId for the customerAccountForAccess for the dataset");
            parser.addArgument("--customerAccessRoleArn").required(true).type(String.class).help("The customerAccessRoleArn from the dataset that has the been granted the access to the write connector");
            parser.addArgument("--externalId").required(true).type(String.class).help("The externalId for the sts assumeRole. This is the dataset createDatetime.");
            parser.addArgument("--awsSecretKey").required(true).type(String.class).help("The awsSecretKey for the customerAccountForAccess for the dataset");
            parser.addArgument("--clusterArn").required(true).type(String.class).help("The kafka clusterArn");
            parser.addArgument("--topicName").required(true).type(String.class).help("The kafka topic name");
    
            try {
                Namespace namespace = parser.parseArgs(args);
    
                String region = namespace.getString("awsRegion");
                String clusterArn = namespace.getString("clusterArn");
                String customerAccessRoleArn = namespace.getString("customerAccessRoleArn");
                String externalId = namespace.getString("externalId");
                STSUtil stsUtil = new STSUtil(region, namespace.getString("awsAccessKeyId"), namespace.getString("awsSecretKey"));
                String roleAccessPolicyText = "{\n" +
                        "    \"Version\": \"2012-10-17\",\n" +
                        "    \"Statement\": [\n" +
                        "        {\n" +
                        "            \"Effect\": \"Allow\",\n" +
                        "            \"Action\": [\n" +
                        "                \"kafka:*\",\n" +
                        "                \"kafka-cluster:*\"\n" +
                        "            ],\n" +
                        "            \"Resource\": \"*\"\n" +
                        "        }\n" +
                        "    ]\n" +
                        "}";
                String action;
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                String roleSessionName = "KafkaReader" + System.currentTimeMillis();
                KafkaReader kafkaReader = new KafkaReader(region, clusterArn, namespace.getString("awsAccessKeyId"), namespace.getString("awsSecretKey"), stsUtil, customerAccessRoleArn, externalId, roleAccessPolicyText, roleSessionName, null);
                do {
                    System.out.println("> Enter the kafka consumer method to invoke. [\"listTopics\", \"listSubscriptions\", \"subscribeTopic\", \"pollTopic\", \"commitPolledRecords\", \"topicPartitionPositions\",\"assignTopicPartitions\", \"listAssignments\",\"quit\"]");
    
                    action = reader.readLine();
                    switch (action) {
                        case "listTopics": {
                            System.out.println(kafkaReader.listTopics());
                            break;
                        }
                        case "listSubscriptions": {
                            System.out.println(kafkaReader.listSubscriptions());
                            break;
                        }
                        case "subscribeTopic": {
                            kafkaReader.subscribe(namespace.getString("topicName"));
                            break;
                        }
                        case "pollTopic": {
                            ConsumerRecords consumerRecords = kafkaReader.pollTopic(60000);
                            System.out.println("Polled "+consumerRecords.count()+ " records");
                            Iterator<ConsumerRecord> iter = consumerRecords.records(namespace.getString("topicName")).iterator();
                            while (iter.hasNext()) {
                                System.out.println(iter.next().value());
                            }
                            break;
                        }
                        case "commitPolledRecords": {
                            kafkaReader.commitSync();
                            break;
                        }
                        case "topicPartitionPositions": {
                            System.out.println(kafkaReader.positions());
                            break;
                        }
                        case "listAssignments": {
                            System.out.println(kafkaReader.assignments());
                            break;
                        }
                        case "assignTopicPartitions": {
                            kafkaReader.assign(namespace.getString("topicName"));
                            break;
                        }
                        case "quit":{
                            break;
                        }
                        default: {
                            System.out.println("ERROR: Unknown action " + action +", try again");
                        }
                    }
                } while (!"quit".equalsIgnoreCase(action));
            } catch (ArgumentParserException e) {
                parser.handleError(e);
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }
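
    For reference, the KafkaReader referenced above connects a Kafka consumer to the MSK cluster using the aws-msk-iam-auth library. The sketch below shows the typical consumer properties for MSK IAM authentication - it is an illustration of the standard library configuration, not the exact #Let's Data KafkaReader internals (the 1.1.7-letsdata-custom build in the POM may wire the assumed-role credentials differently). The group id and deserializers are assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MskIamConsumerConfigExample {
        // bootstrapServers are obtained from the cluster ARN via the MSK GetBootstrapBrokers API (aws-java-sdk-kafka)
        public static KafkaConsumer<String, String> createConsumer(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "letsdata-writeconnector-reader");    // assumption
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // IAM authentication to the MSK cluster via the aws-msk-iam-auth library
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "AWS_MSK_IAM");
            props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
            props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
            return new KafkaConsumer<>(props);
        }
    }

    The standard aws-msk-iam-auth IAMLoginModule also supports assuming a role directly via the awsRoleArn and awsRoleSessionName options in sasl.jaas.config, which is one way to plug in the customerAccessRoleArn credentials.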
                                        

    Here is the POM file with the dependencies and the Maven config used to build the project:

        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
    
            <groupId>com.resonance.letsdata.example</groupId>
            <artifactId>letsdata-writeconnector-reader</artifactId>
            <version>1.0-SNAPSHOT</version>
            <description>
                Sample code to access the write connector data by assuming the customerAccessRole which is granted by #Let's Data. This is needed when write connector resource location is #Let's Data. See https://letsdata.io/docs#accessgrants (Granting Customer Access to #Let's Data Resources) for details
            </description>
            <properties>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <build.time>${maven.build.timestamp}</build.time>
                <maven.build.timestamp.format>yyyyMMddhhmmss</maven.build.timestamp.format>
                <build.tag />
            </properties>
    
            <dependencies>
                <!-- aws -->
                <dependency>
                    <groupId>com.amazonaws</groupId>
                    <artifactId>aws-java-sdk-sts</artifactId>
                    <version>1.12.302</version>
                </dependency>
                <dependency>
                    <groupId>com.amazonaws</groupId>
                    <artifactId>aws-java-sdk-kinesis</artifactId>
                    <version>1.12.302</version>
                </dependency>
                <dependency>
                    <groupId>software.amazon.awssdk</groupId>
                    <artifactId>kinesis</artifactId>
                    <version>2.17.157</version>
                </dependency>
                <dependency>
                    <groupId>com.amazonaws</groupId>
                    <artifactId>aws-java-sdk-s3</artifactId>
                    <version>1.12.302</version>
                </dependency>
                <dependency>
                    <groupId>com.amazonaws</groupId>
                    <artifactId>aws-java-sdk-kafka</artifactId>
                    <version>1.12.496</version>
                </dependency>
                <dependency>
                    <groupId>software.amazon.msk</groupId>
                    <artifactId>aws-msk-iam-auth</artifactId>
                    <version>1.1.7-letsdata-custom</version>
                </dependency>
    
                <!-- Kafka -->
                <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                    <version>2.8.1</version>
                </dependency>
    
                <!-- Cli parser -->
                <dependency>
                    <groupId>net.sourceforge.argparse4j</groupId>
                    <artifactId>argparse4j</artifactId>
                    <version>0.9.0</version>
                </dependency>
    
                <!-- Test Dependencies -->
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>4.12</version>
                    <scope>test</scope>
                </dependency>
            </dependencies>
            <!-- "mvn clean compile assembly:single" for a single jar with dependencies -->
            <!-- "mvn clean compile test assembly:single" will run tests as well -->
            <build>
                <plugins>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>com.letsdata.reader.Main</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </project>
    

    View Dataset

    Dataset Overview


    Status

    Configuration

    Read Connector

    Write Connector

    Manifest File

    Error Connector

    Compute Engine

    Actions

    Update Dataset Code

    S3 Artifact File Access Grant Role ARN




    Update Dataset Compute Engine

    Select Compute Engine Type



    Delete Dataset

    Select DeleteType



    Redrive Error Tasks

    Select Tasks to Redrive

    Select Task Redrive Policy



    Stop Dataset

    Move stopped tasks to:




    
                                

    Dataset Tasks

    taskId taskStatus taskProgress taskCheckpoint taskExecutionLogs readerDefinition computeDefinition JSON

    Dataset Metrics


    •    Each datapoint is the percentage of tasks that completed successfully at that minute, plotted on the left axis. 1.0 means 100% tasks succeeded, 0.0 means all tasks that completed at that minute failed.
    •    Plotted on the right axis is the number of tasks that completed at that minute.
    •    For example, % of 0.75 and number of tasks = 4 means 75% of the 4 completing tasks succeeded at that minute and 25% of the 4 tasks failed.


    •    Each datapoint is the average latency of completion for the tasks that completed at that minute.
    •    For example, a value of 196,926.0 milliseconds means that the tasks completing at that minute took 196 seconds to complete on average.
    •    This metric can be correlated to the number of tasks metric for that minute in the previous graph.
    •    For example, if 4 tasks completed at that minute and the average latency is 196,926.0 at that minute, then the sample size for the average is the 4 tasks that completed that minute.


    •    Each datapoint is the percentage of task checkpoints that completed successfully at that minute, plotted on the left axis.
    •    1.0 means 100% checkpoints succeeded, 0.0 means all checkpoints at that minute failed.
    •    Plotted on the right axis is the average latency of each checkpoint in milliseconds.
    •    A task checkpoints after reading, for example, every 500 records from the file. In a minute, the task can create multiple checkpoints. And there are multiple tasks running concurrently. This is the average across all these checkpoints.
    •    While this isn't controllable by the user, knowing how much time each checkpoint is taking can help with diagnosing task issues. The #Let's Data team closely monitors this metric to find any DB throttling / DB latency issues. We expect this metric to be within the ~25 msec range.


    •    Each datapoint is the sum of the number of records processed (red), skipped (green) and errored (blue) by each task at that minute, plotted on the left axis.
    •    These are composite records being sent to the write destination - not necessarily the raw records read from the file.
    •    For example, if each task is reading data from 2 files, and it produces a composite document (or decision to skip / error) from multiple records from each file, the metric will count these as 1.


    •    Each datapoint is the sum of the Write Connector's Put API call by each task at that minute, plotted on the left axis.
    •    We batch the Write Connector Put API calls, so each call may have multiple records and may partially succeed / fail (failed records are retried until completion or task error).
    •    A task calls the Write Connector Put API when the number of buffered records reaches the batch size or the buffer size in bytes reaches the maximum allowed threshold.
    •    In a minute, the task can call the Write Connector Put API multiple times and there are multiple tasks running concurrently. This is a sum across all these Write Connector Put API calls.
    •    Users can use the volume and latency graphs (and the Write Connector Bytes Written and Put Retries graphs) to determine whether the Write Connector scaling needs fine-tuning.


    •    Each datapoint is the average percentage of the Write Connector Put API calls that were retried by the tasks.
    •    A 0.0 value means that there were no retries to the Write Connector (the Write Connector is adequately scaled; you could even evaluate whether scaling it down slightly would cause any issues).
    •    In a minute, the task can call the Write Connector Put API multiple times and there are multiple tasks running concurrently. This is the average retry percentage across all these task Put API calls.
    •    A value of 0.25 means that 25% of the Write Connector calls were retried across the tasks on average. If there are 4 tasks running concurrently, then this is the average of the retry percentage for each task. For example, each task could be retrying 25% of the time before the call succeeds (possible Write Connector scaling issues), or 1 task could be retrying 100% of the time while 3 tasks retry 0% (possibly an issue with that task - this is a contrived example and is unlikely).
    •    Users can use the Put Retries graph (along with the Write Connector Bytes Written and the volume and latency graphs) to determine whether the Write Connector stream scaling needs fine-tuning.


    •    Each datapoint plotted on the left axis is the (avg, min and max) latency of extraction of the record by the user handlers (readers and parser).
    •    Plotted on the right axis is the sample count for the latency metric.
    •    This is pure CPU work that the parsers and readers do on the bytes from the file to extract the records and create composite records (plus some work that the system does to put them into buffers etc).
    •    This may be a good metric to look at to find performance issues with the parser; we expect these latencies to be < 10 ms (min, avg) and < 30 ms (max, for example for large documents).


    •    Each datapoint plotted on the left axis is the bytes read by the readers from S3 (in KBs).
    •    In a minute, the readers can read from S3 many times and there could be multiple readers in a task (one per file type). There could be many tasks running concurrently. This is the average KBs read by all readers across all concurrently running tasks.
    •    Each datapoint plotted on the right axis is the bytes written by the task to Write Connector (in KBs).
    •    In a minute, the tasks can write to Write Connector multiple times. There could be many tasks running concurrently. This is the average KBs written per min across all concurrently running tasks.
    •    Users can look at these metrics to reason about the system's throughput and debug any issues that may arise from network read / write.
    •    We expect a max / avg network throughput of XYZ / ABC for S3 file reads, and a max / avg network throughput of XYZ / ABC to the Write Connector for each shard (multiply by the number of shards).

    Task Latency Metrics

    #Let's Data Task Component Diagram

    Readers


    •    Each datapoint on the left axis (red) is the average latency of reading each individual record from the read destination at that minute.
    •    For example, a value of 196.0 milliseconds means that the tasks took 196 ms to read the next record from read destination on average at that minute.
    •    Each datapoint on the right axis (green) is the average of the number of records that were read by the tasks at that minute from the read destination.
    •    For example, if the number of messages is 5000, then the tasks read 5000 records on average at that minute from the read destination.


    •    Each datapoint is the average enqueue time in milliseconds that the readers (tasks) took to enqueue to the compute / writer queue.
    •    This can be used to diagnose slower downstream components (compute, writers), which can cause the reader queue to fill up and the reader to wait for space to become available in the queue.

    Writers


    •    Each datapoint is the average wait time in milliseconds for a record in the writer queue. This can be used to diagnose the throughput issues.
    •    For example, a large wait time might require increasing write resources to increase the write throughput.


    •    Each datapoint is the average dequeue time in milliseconds to dequeue a message from the writer queue. A high dequeue value means that the writer is waiting on an empty queue.
    •    This is to detect issues where compute may not be enqueuing messages fast enough in cases where compute endpoints are not adequately scaled.


    •    Each datapoint is the average time the writers took to pre-process the message. This could be deserialization, error doc creation or no preprocessing at all.
    •    A high value means that the writers are doing additional processing prior to writing to the destination, which could be an issue.
    •    There isn't much preprocessing done by the writers, so we expect this to be less than 5 milliseconds.


    •    Each datapoint on the left axis (red) is the average latency of the write for each individual record at that minute. For destinations that support batched writes, this is the latency of the batched call, whereas for multi-threaded batching by #Let's Data, this is the latency of each message's write call.
    •    For example, a value of 196.0 milliseconds means that the writers took 196 ms to write each record on average at that minute.
    •    Each datapoint on the right axis (green) is the average of the number of records that were written by the writers at that minute.
    •    For example, if the number of messages is 5000, then the writers processed 5000 records on average at that minute for the write destination.

    Overall


    •    Each datapoint on the left axis (red) is the average total latency for each individual record at that minute and includes read, queueing, compute and write times.
    •    For example, a value of 196.0 milliseconds means that the record took 196 ms total time on average at that minute to be processed end to end by the task.
    •    Each datapoint on the right axis (green) is the average of the number of records that were processed by the task at that minute.
    •    For example, if the number of messages is 5000, then the task processed 5000 records on average at that minute.

    Execution Logs

    taskId taskStatus logs

    Error Records

    taskId taskStatus numberOfErrors errors

    Usage Records

    resourceType resource eventStartTime eventEndTime meteringDimension meteringUnit meteringValue billedStatus

    User Management

    Logged In User



    User List

    fullName emailAddress phone userRole userStatus JSON

    User Actions

    Add User

    Full Name


    Email Address


    Phone


    User Role



    Update User

    Email Address


    Attribute To Update

    Attribute Existing Value


    Attribute New Value



    Delete User

    Email Address



    Costs and Billing

    Customer Details

    Payment Details

    ccBrand ccLastFour ccExpiry paymentMethodType

    Pricing

    Price Name Product Description Unit Amount

    Invoices

    Start Time End Time Status Due Date Currency Tax Total Amount Due Amount Paid Charge Pay Now PDF Invoice Line Items

    Support

    Drop us a note:


    Send us an email:

    support@letsdata.io