Why is this inner join returning nothing when it should return many rows?

This code performs an inner join on an InfluxDB database. I expect it to return a new table with the entries that are common to both input tables, but it returns nothing.

Can somebody tell me what I am doing wrong?

import "join"

// The first query on the InfluxDB bucket, returning the left stream
left =
from(bucket: "IoT_Prod")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_field"] == "aanvoer_temp")
  |> filter(fn: (r) => r["CV_status"] == "hwc")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
  |> yield(name: "hwc")

// The second query on the InfluxDB bucket, returning the right stream
right =
from(bucket: "IoT_Prod")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_field"] == "geleverd gas")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> yield(name: "gas")

// The inner join that should return a table with the rows whose time is present in both streams.
join.inner(
  left: left,
  right: right,
  // Match rows whose time is equal in both streams.
  on: (l, r) => l._time == r._time,
  // The structure of the rows that should be returned.
  as: (l, r) => ({join_time: r._time, join_value: r._value, join_field: r._field, join_CV_status: l.CV_status}),
)

The result is the following output:

[image: screenshot of the query output]

I was expecting InfluxDB to return a new table (measurement) with the colored lines, since they are common to both tables.

There is 1 answer

H Doucet

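I have found the solution. Each input table needs to be grouped by _time. As a result, Flux creates many tables, one per time value; in my case, one table per hour. The join then pairs the tables from both streams that have the same _time. The join functions only compare rows from tables whose group keys match, and in the original query the two streams had different group keys (for example, different _field values), so nothing was paired.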

import "join"
import "date"

lefti = from(bucket: "IoT_Prod")
  |> range(start: date.truncate(t: -3d, unit: 1h))
  |> filter(fn: (r) => r["_field"] == "aanvoer_temp" and r["CV_status"] == "hwc")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
  //|>limit(n:5)
  |> group(columns: ["_time"])
  |> yield(name: "left_hwc")

right = from(bucket: "IoT_Prod")
  |> range(start: date.truncate(t: -3d, unit: 1h))
  |> filter(fn: (r) => r["_field"] == "geleverd gas" and r["loc"] == "BO")
  |> aggregateWindow(every: 1h, fn: spread, createEmpty: false)
  //|>limit(n:5)
  |> group(columns: ["_time"])
  |> yield(name: "right_gas")


// Join the two streams; tables are paired by their shared _time group key.
join.left(
  left: lefti,
  right: right,
  on: (l, r) => l._time == r._time,
  as: (l, r) => ({r with cv_status: l.CV_status}),
  //method: "full"
)

[images: Table 1 - HWC; Table 2 - Gas; Table 3 - Join Result]
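To see the group-key behavior without touching the real bucket, here is a minimal sketch that uses array.from() with made-up rows. The sample timestamps and values are assumptions for illustration only, and the first group() call on each stream merely imitates the per-series grouping that a from() query would normally produce.

```flux
import "array"
import "join"

// Hypothetical sample rows standing in for the "aanvoer_temp" query.
left =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _field: "aanvoer_temp", CV_status: "hwc", _value: 61.5},
            {_time: 2024-01-01T01:00:00Z, _field: "aanvoer_temp", CV_status: "hwc", _value: 58.0},
        ],
    )
        // Imitate the series grouping of a from() query (assumption).
        |> group(columns: ["_field", "CV_status"])

// Hypothetical sample rows standing in for the "geleverd gas" query.
right =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _field: "geleverd gas", _value: 0.42},
            {_time: 2024-01-01T02:00:00Z, _field: "geleverd gas", _value: 0.10},
        ],
    )
        // Imitate the series grouping of a from() query (assumption).
        |> group(columns: ["_field"])

// Regroup both streams by _time so that tables with the same time share a group key.
join.inner(
    left: left |> group(columns: ["_time"]),
    right: right |> group(columns: ["_time"]),
    on: (l, r) => l._time == r._time,
    // Keep _time in the output so the group key column is preserved.
    as: (l, r) => ({_time: l._time, CV_status: l.CV_status, aanvoer_temp: l._value, gas: r._value}),
)
```

Passing left and right directly, without the extra group(columns: ["_time"]) calls, leaves the streams with different group keys, which is the situation in the question. Another option that may be worth trying is calling group() with no arguments on both streams, which puts each stream into a single table with an empty group key, so the on predicate alone decides which rows match.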