No output when one side of UNION is empty

130 views Asked by At

I have an Azure Stream Analytics job that combines the results of multiple queries and outputs them to the same sink. To do this, I define my queries within a WITH statement, then combine them using UNION and then write them to my sink. However, unfortunately I only get an output to my sink whenever all of my queries actually have an output, and this is where it goes wrong.

I have some queries that continuously (every 5 minutes) give an output, but I also have some queries that rarely give an output (maybe a few times per day). As a result, nothing is written to the output until all of the queries have something to return. Does anyone know why this is and how I can fix it? Shouldn't UNION also give results when set A has results, but set B doesn't? I'm running this locally in VS Code, with a live connection to Event Hub by the way.

Here is a simplified example of 2 queries (one with frequent output, one with infrequent output) that goes wrong:

WITH
-- Typed view over the raw Event Hub input: TimeStamp is cast to datetime and
-- ValueNumber to float; all other fields are passed through unchanged.
-- Events are timestamped by TimeStamp and partitioned by TranslationTableId.
HarmonizedMeasurements AS (
    SELECT
        CAST(src.TimeStamp AS datetime) AS "TimeStamp",
        CAST(src.ValueNumber AS float)  AS "ValueNumber",
        src.ValueBit                    AS "ValueBit",
        src.MeasurementName,
        src.PartName,
        src.ElementId,
        src.ElementName,
        src.ObjectName,
        src.TranslationTableId
    FROM EventHubHarmonizedMeasurements AS src
        TIMESTAMP BY "TimeStamp"
        PARTITION BY TranslationTableId
),

-- Non-zero 'Motortoerental terugkoppeling' readings of 'Schutsluis%' objects,
-- each labelled with a Category derived from the ValueNumber range it falls in.
ToerenAandrijvingCategoriesMeasurements AS (
    SELECT
        rpm.TimeStamp,
        rpm.ValueNumber,
        rpm.ValueBit,
        rpm.MeasurementName,
        rpm.PartName,
        rpm.ElementId,
        rpm.ElementName,
        rpm.ObjectName,
        rpm.TranslationTableId,
        -- Ranges are half-open: lower bound exclusive, upper bound inclusive.
        -- Anything outside (-5000, 5000] falls through to 'NoCategory'.
        CASE
            WHEN rpm.ValueNumber > -5000 AND rpm.ValueNumber <= -1000 THEN 'Dalen'
            WHEN rpm.ValueNumber > -1000 AND rpm.ValueNumber <= -200  THEN 'Dalen Retarderen'
            WHEN rpm.ValueNumber > -200  AND rpm.ValueNumber <= 0     THEN 'Stilstand'
            WHEN rpm.ValueNumber > 0     AND rpm.ValueNumber <= 250   THEN 'Nivelleren'
            WHEN rpm.ValueNumber > 250   AND rpm.ValueNumber <= 400   THEN 'Heffen Retarderen'
            WHEN rpm.ValueNumber > 400   AND rpm.ValueNumber <= 5000  THEN 'Heffen'
            ELSE 'NoCategory'
        END AS "Category"
    FROM HarmonizedMeasurements AS rpm
    WHERE rpm.ObjectName LIKE 'Schutsluis%'
      AND rpm.MeasurementName = 'Motortoerental terugkoppeling'
      AND rpm.ValueNumber <> 0
),
-- Category transitions: keeps only the rows whose Category differs from the
-- previous row of the same ElementId (looking back at most one day), and
-- exposes that row's timestamp as the StartTime of the new category.
AandrijvingCatStartMeasurements AS (
    SELECT
        cat.TimeStamp          AS "StartTime",
        cat.Category           AS "Category",
        cat.ElementId          AS "ElementId",
        cat.TranslationTableId AS "TranslationTableId"
    FROM ToerenAandrijvingCategoriesMeasurements AS cat
    -- NOTE(review): when there is no earlier event inside the one-day window,
    -- LAG presumably yields NULL and the comparison does not pass, so the very
    -- first category of an element would not be emitted - confirm intended.
    WHERE cat.Category <> LAG(cat.Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1))
),
-- Turns consecutive category starts into intervals: for every transition row,
-- the previous transition of the same ElementId (within one day) supplies the
-- StartTime and Category, and the current row's StartTime becomes the EndTime.
AandrijvingCatEndMeasurements AS (
    SELECT
        startRow.StartTime AS "EndTime",
        LAG(startRow.StartTime, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "StartTime",
        LAG(startRow.Category, 1)  OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "Category",
        startRow.ElementId          AS "ElementId",
        startRow.TranslationTableId AS "TranslationTableId"
    FROM AandrijvingCatStartMeasurements AS startRow
),
-- 'Vermogen' readings of 'Schutsluis%' objects, renamed to
-- '<MeasurementName> <Category>' using the category interval
-- [StartTime, EndTime) of the same ElementId / TranslationTableId that contains
-- the reading. Only names present in SQLMeasurementType are kept.
VermogenAandrijvingMeasurements AS (
    SELECT
        AANDRVER.TimeStamp AS "TimeStamp",
        AANDRVER.ValueNumber AS "ValueNumber",
        AANDRVER.ValueBit AS "ValueBit",
        CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category) AS "MeasurementName",
        AANDRVER.PartName AS "PartName",
        AANDRVER.ElementId AS "ElementId",
        AANDRVER.ElementName AS "ElementName",
        AANDRVER.ObjectName AS "ObjectName",
        AANDRVER.TranslationTableId AS "TranslationTableId"
    FROM HarmonizedMeasurements AS AANDRVER
    -- Temporal join: only interval rows arriving 0 to 30 minutes relative to
    -- the reading (per DATEDIFF on the two streams) are considered.
    LEFT JOIN AandrijvingCatEndMeasurements AS AANDREN
        ON DATEDIFF(minute, AANDRVER, AANDREN) BETWEEN 0 AND 30
        AND AANDRVER.TimeStamp >= AANDREN.StartTime
        -- Column casing normalised to TimeStamp to match every other reference.
        AND AANDRVER.TimeStamp < AANDREN.EndTime
        AND AANDRVER.ElementId = AANDREN.ElementId
        AND AANDRVER.TranslationTableId = AANDREN.TranslationTableId
    -- NOTE(review): this join key uses AANDREN.Category, which is NULL for
    -- readings the LEFT JOIN could not match; whether such rows survive depends
    -- on how CONCAT treats NULL and on the contents of SQLMeasurementType -
    -- confirm the LEFT JOIN is not effectively an INNER JOIN here.
    INNER JOIN SQLMeasurementType AS MEAS
        ON MEAS.Name = CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category)
    WHERE
        AANDRVER.ObjectName LIKE 'Schutsluis%' AND
        AANDRVER.MeasurementName = 'Vermogen'
),
-- 'Sluisdeur open' events with ValueBit = '1' for element 'Deur sluiskolk 1'
-- on part 'Bovenhoofd'.
LockDoorTop AS (
    SELECT
        door.TimeStamp   AS "TimeStamp",
        door.ValueNumber AS "ValueNumber",
        door.ValueBit    AS "ValueBit",
        door.MeasurementName,
        door.PartName,
        door.ElementId,
        door.ElementName,
        door.ObjectName,
        door.TranslationTableId
    FROM HarmonizedMeasurements AS door
    WHERE door.MeasurementName = 'Sluisdeur open'
      AND door.ElementName = 'Deur sluiskolk 1'
      AND door.PartName = 'Bovenhoofd'
      AND door.ValueBit = '1'
),
-- 'Waterniveaumeting' readings for element 'Sluiskolk 1' on part 'Opvaartzijde'.
WaterLTop AS (
    SELECT
        topLevel.TimeStamp   AS "TimeStamp",
        topLevel.ValueNumber AS "ValueNumber",
        topLevel.ValueBit    AS "ValueBit",
        topLevel.MeasurementName,
        topLevel.PartName,
        topLevel.ElementId,
        topLevel.ElementName,
        topLevel.ObjectName,
        topLevel.TranslationTableId
    FROM HarmonizedMeasurements AS topLevel
    WHERE topLevel.MeasurementName = 'Waterniveaumeting'
      AND topLevel.ElementName = 'Sluiskolk 1'
      AND topLevel.PartName = 'Opvaartzijde'
),
-- 'Waterniveaumeting' readings for element 'Sluiskolk 1' on part 'Sluiskamer'.
WaterLLock AS (
    SELECT
        chamberLevel.TimeStamp   AS "TimeStamp",
        chamberLevel.ValueNumber AS "ValueNumber",
        chamberLevel.ValueBit    AS "ValueBit",
        chamberLevel.MeasurementName,
        chamberLevel.PartName,
        chamberLevel.ElementId,
        chamberLevel.ElementName,
        chamberLevel.ObjectName,
        chamberLevel.TranslationTableId
    FROM HarmonizedMeasurements AS chamberLevel
    WHERE chamberLevel.MeasurementName = 'Waterniveaumeting'
      AND chamberLevel.ElementName = 'Sluiskolk 1'
      AND chamberLevel.PartName = 'Sluiskamer'
),
-- For each door-open event, emits the water level difference
-- (WaterLLock value minus WaterLTop value, rounded to 2 decimals) using level
-- readings of the same ObjectName joined within a 0 to 1 minute DATEDIFF window.
-- The measurement name is taken from the 'Waterniveauverschil' row of
-- SQLMeasurementType; ValueBit is always NULL.
WaterLevelTopMeasurements AS (
    SELECT
        door.TimeStamp AS "TimeStamp",
        CAST(ROUND((chamberLevel.ValueNumber - topLevel.ValueNumber), 2) AS float) AS "ValueNumber",
        NULL AS "ValueBit",
        MEAS.Name AS "MeasurementName",
        door.PartName AS "PartName",
        door.ElementId AS "ElementId",
        door.ElementName AS "ElementName",
        door.ObjectName AS "ObjectName",
        door.TranslationTableId AS "TranslationTableId"
    FROM LockDoorTop AS door
    INNER JOIN WaterLTop AS topLevel
        ON DATEDIFF(minute, door, topLevel) BETWEEN 0 AND 1
        AND door.ObjectName = topLevel.ObjectName
    INNER JOIN WaterLLock AS chamberLevel
        ON DATEDIFF(minute, door, chamberLevel) BETWEEN 0 AND 1
        AND door.ObjectName = chamberLevel.ObjectName
    INNER JOIN SQLMeasurementType AS MEAS
        ON MEAS.Name = 'Waterniveauverschil'
),

-- Combine queries: both branches expose the same nine columns in the same
-- order, spelled out here so a schema drift in either branch is visible.
DatalakeCombinedMeasurements AS (
    SELECT
        power.TimeStamp,
        power.ValueNumber,
        power.ValueBit,
        power.MeasurementName,
        power.PartName,
        power.ElementId,
        power.ElementName,
        power.ObjectName,
        power.TranslationTableId
    FROM VermogenAandrijvingMeasurements AS power
    UNION
    SELECT
        levelDiff.TimeStamp,
        levelDiff.ValueNumber,
        levelDiff.ValueBit,
        levelDiff.MeasurementName,
        levelDiff.PartName,
        levelDiff.ElementId,
        levelDiff.ElementName,
        levelDiff.ObjectName,
        levelDiff.TranslationTableId
    FROM WaterLevelTopMeasurements AS levelDiff
)

-- Output data: write every combined measurement to the
-- DatalakeHarmonizedMeasurements sink, partitioned by TranslationTableId.
SELECT
    combined.TimeStamp,
    combined.ValueNumber,
    combined.ValueBit,
    combined.MeasurementName,
    combined.PartName,
    combined.ElementId,
    combined.ElementName,
    combined.ObjectName,
    combined.TranslationTableId
INTO DatalakeHarmonizedMeasurements
FROM DatalakeCombinedMeasurements AS combined
PARTITION BY TranslationTableId
0

There are 0 answers