Azure Data Factory throwing error "Call to provided Azure function failed with status-'BadRequest' while invoking 'GET'"

165 views Asked by At

I'm testing a function app, called from ADF, that retrieves its results from Cosmos DB. When I run the pipeline, it fails on the function app activity, and this is the error message shown in the ADF pipeline:

error code: 3608 - Call to provided Azure function 'CanStart?id=66&forceRun=false' failed with status-'BadRequest' while invoking 'GET' on 'https://functionapptest.azurewebsites.net' and message - '{"status": "ERROR", "message": "'https://cosmosdb_dev.documents.azure.com:443/'", "version": "", "id": "66"}'.

In ADF I have set up two parameters: (1) forceRun and (2) TriggerDateTime.

Below is a screenshot of the pipeline:

adf pipeline

function code:

__init__.py

import os
import sys
import json
import logging


from datetime import datetime, timedelta

from dateutil.parser import parse
import azure.functions as func
import pytz

from shared.keyvault import get_config
from shared.utils import get_datetime_with_offset, get_run_time

from shared.bottler import get_bottler_config
from shared.stage_status import get_bottler_stage_status
from shared.version import VERSION

sys.path.insert(0, os.path.abspath(".."))



def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP entry point: report whether processing for a bottler may start.

    ``bottler`` and ``forceRun`` are read from the query string. For a POST
    request where either one is missing, values present in the JSON body are
    used instead; a key absent from the body leaves the query value in place.

    Responses (all JSON):

    * ``forceRun`` is true -> 200 ``{"status": "OK"}`` with no further checks.
    * ``bottler`` not given -> 400 ``{"status": "ERROR", ...}``.
    * otherwise -> 200 with status ``"OK"`` or ``"NOK"`` as decided by
      ``is_time_to_run``.
    * any exception while loading configs/statuses -> 400
      ``{"status": "ERROR", "message": str(exc), ...}``.

    :param req: incoming HTTP request
    :return: JSON HTTP response
    """
    logging.info("Python HTTP trigger function processed a request.")

    # Keep a missing parameter as None. Wrapping it in str() would produce the
    # truthy string "None", which defeats both the POST fallback and the
    # "bottler not set" validation below.
    bottler = req.params.get("bottler")
    force_run = req.params.get("forceRun")

    if req.method == "POST" and (bottler is None or force_run is None):
        try:
            body = req.get_json()
        except ValueError:
            # Missing or malformed JSON body: fall through to validation.
            body = None
        if isinstance(body, dict):
            bottler = body.get("bottler", bottler)
            force_run = body.get("forceRun", force_run)

    if bottler is not None:
        # JSON bodies may carry a numeric id; downstream code gets a string.
        bottler = str(bottler)

    # Accept the query-string form ("true") as well as a JSON boolean (True).
    if str(force_run).lower() == "true":
        return func.HttpResponse(json.dumps({"status": "OK", "version": VERSION}),
                                 mimetype="application/json",
                                 headers={"Content-type": "application/json"})

    if not bottler:
        return func.HttpResponse(
            json.dumps(
                {"status": "ERROR", "message": "Please set bottler to check", "version": VERSION,
                 "bottler": bottler}),
            status_code=400,
            mimetype="application/json",
            headers={"Content-type": "application/json"}
        )

    try:
        config = get_config()
        bconfig = get_bottler_config(bottler)
        logging.info("Got bottler [%s] configs.", bottler)

        logging.info("Fetching bottler[%s] statuses.", bottler)
        statuses = get_bottler_stage_status(bottler)
        logging.info("Finish fetching bottler[%s] statuses.", bottler)
        run_time = get_run_time(bconfig, config)

        return func.HttpResponse(
            json.dumps({"status": "OK" if is_time_to_run(statuses, bconfig, run_time) else "NOK", "version": VERSION}),
            mimetype="application/json"
        )
    except Exception as my_e:
        # str(my_e) alone is often just a bare key or URL; record the full
        # traceback so the failing call can be identified from the logs.
        logging.exception("Failed to evaluate run condition for bottler[%s]", bottler)
        return func.HttpResponse(
            json.dumps(
                {"status": "ERROR", "message": str(my_e), "version": VERSION, "bottler": bottler}),
            status_code=400,
            mimetype="application/json"
        )


def is_time_to_run(statuses: list, bottler_config: dict, run_time: datetime) -> bool:
    """
    Decide from the recorded stage statuses whether a run may start now.

    Returns True straight away when there are no statuses at all. When none of
    the statuses pass ``filter_today_statuses``, the answer is whatever
    ``check_time_to_run`` says. Otherwise a run is allowed only if there is no
    INGEST status that is DONE or IN_PROGRESS, the last Dispatcher status is
    DONE (or there is none), the last remaining status is not IN_PROGRESS, and
    ``check_time_to_run`` agrees.
    :param statuses: status dicts with "stage", "status" and "timestamp" keys
    :param bottler_config: bottler settings; "entityid" is used for logging
    :param run_time: threshold passed on to ``filter_today_statuses``
    :return: boolean
    """
    bottler = bottler_config["entityid"]

    if not statuses:
        logging.info("No statuses for bottler[%s]", bottler)
        return True

    logging.info("Fetched %s statuses for bottler[%s]", len(statuses), bottler)

    todays = filter_today_statuses(statuses, bottler_config, run_time)
    if not todays:
        logging.info("No today statuses for bottler[%s]", bottler)
        return check_time_to_run(bottler_config)

    ingest_done = filter_statuses(todays, "INGEST", "DONE")
    ingest_running = filter_statuses(todays, "INGEST", "IN_PROGRESS")
    dispatcher_done = check_last_stage_status(todays, "Dispatcher", "DONE")

    # Any finished or running ingest, or an unfinished dispatcher, blocks the run.
    if ingest_done or ingest_running or not dispatcher_done:
        return False

    latest = todays[-1]
    logging.info("Last bottler[%s] status is: %s", bottler, latest)
    if latest["status"] == "IN_PROGRESS":
        return False
    return check_time_to_run(bottler_config)


def check_last_stage_status(statuses: list, stage: str, req_status: str):
    """
    Check whether the last status recorded for ``stage`` equals ``req_status``.

    :param statuses: status dicts with "stage" and "status" keys; None is
        treated as an empty list
    :param stage: stage name to look for
    :param req_status: status value the last matching entry must have
    :return: True when no entry belongs to ``stage`` or when the last such
        entry has ``req_status``; False otherwise
    """
    entries = [] if statuses is None else statuses

    last_for_stage = None
    for entry in entries:
        if entry["stage"] == stage:
            last_for_stage = entry

    if last_for_stage is None:
        # Nothing recorded for this stage counts as satisfied.
        return True
    return last_for_stage["status"] == req_status


def filter_statuses(statuses: list, stage: str, status: str) -> list:
    """
    Keep the entries whose "status" and "stage" both match, in original order.

    :param statuses: status dicts with "status" and "stage" keys
    :param stage: required stage name
    :param status: required status value
    :return: list of matching entries (also written to the log)
    """
    matching = []
    for entry in statuses:
        if entry["status"] == status and entry["stage"] == stage:
            matching.append(entry)
    logging.info("Found statuses[%s, %s] : %s", stage, status, matching)
    return matching


def filter_today_statuses(statuses: list, bottler_config: dict, run_time: datetime) -> list:
    """
    Keep the entries whose timestamp is at or after ``run_time``.

    Each entry's "timestamp" is parsed and passed, together with
    ``bottler_config["timezone"]``, through ``get_datetime_with_offset``
    before the comparison.

    :param statuses: status dicts with a "timestamp" key
    :param bottler_config: bottler settings providing "timezone"
    :param run_time: lower bound (inclusive) for the converted timestamp
    :return: list of entries that pass, in original order
    """
    logging.info("Current time: %s", run_time)
    recent = []
    for entry in statuses:
        stamped_at = get_datetime_with_offset(bottler_config["timezone"], parse(entry["timestamp"]))
        if stamped_at >= run_time:
            recent.append(entry)
    logging.info("Found today statuses: %s", recent)
    return recent


def check_time_to_run(bottler_config: dict):
    """
    Return True when the bottler's local time is inside its processing window.

    The window runs from ``processingStartTimeDayTotalMinutes`` to
    ``processingEndTimeDayTotalMinutes`` (both bounds inclusive), counted in
    minutes from local midnight in ``bottler_config["timezone"]``. A missing
    or ``None`` bound counts as 0. ``bottler_config`` is not modified.

    :param bottler_config: bottler settings with a "timezone" name accepted by
        ``pytz.timezone`` and the optional window bounds
    :return: boolean
    """
    zone = pytz.timezone(bottler_config["timezone"])
    # Read the clock once and let the zone convert it. Passing a UTC wall-clock
    # value to zone.utcoffset() makes pytz treat it as local time, which gives
    # the wrong offset (or raises) around DST transitions.
    now = datetime.now(zone).replace(tzinfo=None)
    today = datetime(year=now.year, month=now.month, day=now.day)

    # Resolve defaults locally instead of writing them into the caller's dict.
    start_minutes = bottler_config.get("processingStartTimeDayTotalMinutes")
    if start_minutes is None:
        start_minutes = 0
    end_minutes = bottler_config.get("processingEndTimeDayTotalMinutes")
    if end_minutes is None:
        end_minutes = 0

    start_time = today + timedelta(minutes=start_minutes)
    end_time = today + timedelta(minutes=end_minutes)

    # All three values are naive local wall-clock times, so compare them
    # directly; .timestamp() would reinterpret them in the host's timezone.
    return start_time <= now <= end_time

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

What else could be causing this error? I have been stuck on this issue. Thank you.

I have tried stopping and restarting the Azure function. There is no firewall configured on the Cosmos DB account either, as it is set to public access.

Note: I have changed the URLs in the example above just for the purpose of this question.

0

There are 0 answers