Snowplow Enrich Setup Issue


collector.conf

collector {
 
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 8181
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = zanui_collector_cookie
    name = ${?COLLECTOR_COOKIE_NAME}
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    fallbackDomain = ""
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    # name = {{doNotTrackCookieName}}
    name = zanui-collector-do-not-track-cookie
    # value = {{doNotTrackCookieValue}}
    value = zanui-collector-do-not-track-cookie-value
  }

 
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }
  cors {
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    enabled = false
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-collected-good-events-stream
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow-collected-bad-events-stream
    bad = ${?COLLECTOR_STREAMS_BAD}

    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    sink {
      enabled = kinesis
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = ap-southeast-2
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
      aws {
        accessKey = env
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = env
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        #minBackoff = {{minBackoffMillis}}
        minBackoff = 10
        #maxBackoff = {{maxBackoffMillis}}
        maxBackoff = 10
      }
    }

    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 500 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  http.server {
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

}

Run Command:

java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-collector-kinesis-1.0.0.jar --config collector.conf
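
To sanity-check the collector before pointing the JavaScript Tracker at it, you can hit its health and pixel endpoints directly. The sketch below is only a minimal verification script; it assumes the collector is reachable on localhost:8181 as configured above, so adjust the host and port for your deployment.

# Minimal verification sketch (Python); assumes the collector listens on localhost:8181 as configured above
import urllib.request

base = "http://localhost:8181"  # placeholder host/port; adjust for your deployment

# Health-check endpoint exposed by the Scala Stream Collector
health = urllib.request.urlopen(base + "/health")
print(health.status)  # expect 200

# Bare-bones page-view event sent to the GET pixel endpoint
pixel = urllib.request.urlopen(base + "/i?e=pv&url=http%3A%2F%2Fexample.com&page=Test")
print(pixel.status, pixel.headers.get("Content-Type"))  # expect 200 and an image/gif pixel

If both requests return 200, events should start appearing on the snowplow-collected-good-events-stream Kinesis stream.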

enricher.conf

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = snowplow-collected-good-events-stream
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = snowplow-collected-good-events-stream
     
      # Stream/topic where the event that failed enrichment will be stored
      bad = snowplow-collected-bad-events-stream
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      # pii = {{outPii}}
      # pii = ${?ENRICH_STREAMS_OUT_PII}

      partitionKey = event_id
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {

      enabled = kinesis
      enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      region = ap-southeast-2

      aws {
        accessKey = env
        accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
        secretKey = env
        secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
      }

      maxRecords = 10000

      initialPosition = TRIM_HORIZON
      initialTimestamp = "2020-09-10T10:00:00Z"

      backoffPolicy {
        minBackoff = 1000
        minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
        maxBackoff = 5000
        maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
      }


    }


    buffer {
      byteLimit = 1000000000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 10 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    appName = "zanui-enricher-app"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }

}

Run Command:

java -jar snowplow-stream-enrich-kinesis-1.0.0.jar --config enricher.conf --resolver file:resolver.json
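
For reference, since the run command points Stream Enrich at resolver.json: for a setup with no custom schemas the resolver typically only needs to point at Iglu Central, which hosts the schemas used by standard events such as page views. A minimal example, assuming the standard Iglu resolver-config format, looks like this:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": { "uri": "http://iglucentral.com" }
        }
      }
    ]
  }
}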

S3 Loader Config

source = "kinesis"

sink = "kinesis"

aws {
  accessKey = "env"
  secretKey = "env"
}

# Config for NSQ
nsq {
  channelName = "nsqSourceChannelName"

  # Host name for NSQ tools
  host = "127.0.0.1"

  # TCP port for nsqd
  port = 4150

  # HTTP port for nsqlookupd
  lookupPort = 4161
}

kinesis {

  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "2017-05-17T10:00:00Z"
  maxRecords = 10000
  region = "ap-southeast-2"
  appName = "zanui-enricher-app"
}

streams {
  inStreamName = "snowplow-collected-good-events-stream"
  outStreamName = "snowplow-collected-bad-events-stream"

  buffer {
    byteLimit = 1000000000 # Not supported by NSQ; will be ignored
    recordLimit = 10
    timeLimit = 5000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "ap-southeast-2"
  bucket = "snowplow-enriched-good-events"
  partitionedBucket = "snowplow-enriched-good-events/partitioned"
  dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  outputDirectory = "zanui-enriched/good"
  filenamePrefix = "zanui-output"
  format = "gzip"
  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 300000 # 5 minutes
}

Run Command:

java -jar snowplow-s3-loader-0.6.0.jar --config my.conf

But the Snowplow S3 Loader was not doing anything, so I used Kinesis Data Firehose to transfer the stream to an S3 bucket instead.
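
Before digging into the loader itself, it can help to confirm that records are actually arriving on the stream it reads from. The following is a minimal diagnostic sketch (Python with boto3, assuming AWS credentials are available in the environment); the stream name and region are taken from the configs above.

# Minimal diagnostic sketch: read a few records straight from the Kinesis stream
import boto3

kinesis = boto3.client("kinesis", region_name="ap-southeast-2")
stream = "snowplow-collected-good-events-stream"

shard_id = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

result = kinesis.get_records(ShardIterator=iterator, Limit=10)
print("fetched", len(result["Records"]), "records")
for record in result["Records"]:
    # Raw collector payloads are binary Thrift; enriched events are tab-separated text
    print(record["Data"][:80])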

When I try to use an AWS Lambda transformation in Data Firehose, it gives this error:

{"attemptsMade":4,"arrivalTimestamp":1600154159619,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"****","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}
    {"attemptsMade":4,"arrivalTimestamp":1600154161523,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"*****=","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}

If I don't use the Lambda, a log is created in the good enriched S3 bucket for the page view event, but at the same time a log is created in the bad enriched S3 bucket for the same page view event, saying:

{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/collector_payload_format_violation/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-stream-enrich","version":"1.0.0"},"failure":{"timestamp":"2020-09-15T07:16:02.488Z","loader":"thrift","message":{"error":"error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}},"payload":"****="}}

I have followed the documentation repeatedly, but I am confused about the setup for Stream Enrich. What I did not understand is: do we need to set up a database for Stream Enrich if we are not using a custom schema? Since I am only trying to test with a page view event from the JavaScript Tracker, I have not set up any database, but I have given the IAM role permission to create and edit DynamoDB tables.

Please help me to set up Snowplow if anyone has done this before. Please :(


There is 1 answer

Aswin Kumar Rajendran

I have written a blog on how to set up Snowplow Analytics on AWS.

Here is the link; I hope it helps you.