#StackBounty: #apache-beam #beam-sql Apache Beam: SQL aggregation outputs no results for Unbounded/Bounded join

Bounty: 50

I am working on an Apache Beam pipeline that runs a SQL aggregation function. Reference: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159
The example there works fine. However, when I replace the source with an actual unbounded source and do an aggregation, I see no results.
Steps in my pipeline:

  1. Read bounded data from a source and convert it to a collection of rows.
  2. Read unbounded JSON data from a websocket source.
  3. Assign timestamps to every source stream via a DoFn.
  4. Convert the unbounded JSON to an unbounded row collection.
  5. Apply a window on the row collection.
  6. Apply a SQL statement.
  7. Output the result of the SQL.
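
As a rough mental model of steps 3 to 6, here is a plain-Java sketch (not Beam code) that assigns timestamped events to fixed windows and counts them per window and sensor. The 10-second window length, the class name `WindowedCountSketch`, and the sample timestamps are assumptions made for illustration; the actual window used in step 5 is not specified here.

```java
import java.util.Map;
import java.util.TreeMap;

class WindowedCountSketch {
    // Assumed window length; the real pipeline's window in step 5 may differ.
    static final long WINDOW_MILLIS = 10_000L;

    // Start of the fixed window that contains the given event timestamp (step 5).
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Counts events per (window start, sensor), modelling a GROUP BY
    // evaluated separately for each window (step 6).
    static Map<String, Long> countPerWindow(long[] timestamps, String[] sensors) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            String key = windowStart(timestamps[i]) + "/" + sensors[i];
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps (step 3) for a single sensor "s1".
        long[] timestamps = {1_000L, 9_999L, 10_000L};
        String[] sensors = {"s1", "s1", "s1"};
        System.out.println(countPerWindow(timestamps, sensors)); // prints {0/s1=2, 10000/s1=1}
    }
}
```

In this model a grouped count belongs to one window, so the first two events fall into the window starting at 0 and the third into the window starting at 10000.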

A SQL statement without aggregation executes and outputs results. However, when I use a GROUP BY in the SQL, there is no output. The following query works:

SELECT 
  o1.detectedCount,
  o1.sensor se,
  o2.sensor sa
FROM SENSOR o1 
  LEFT JOIN AREA o2 
  on o1.sensor = o2.sensor

The results are emitted continuously, as shown below.

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":1,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,            
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

No results show up at all when I change the SQL to:

SELECT
  COUNT(o1.detectedCount), o2.sensor sa
FROM SENSOR o1
  LEFT JOIN AREA o2
  on o1.sensor = o2.sensor
GROUP BY o2.sensor
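
To make the expected result concrete, here is a plain-Java sketch (no Beam involved) of what the grouped query should compute over the three sample rows shown earlier. The class `GroupedJoinSketch`, the row types, and the assumption that AREA holds exactly one row for that sensor are all hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedJoinSketch {
    // Hypothetical in-memory stand-ins for rows of the SENSOR and AREA tables.
    static final class SensorRow {
        final String sensor;
        final Integer detectedCount;
        SensorRow(String sensor, Integer detectedCount) {
            this.sensor = sensor;
            this.detectedCount = detectedCount;
        }
    }

    static final class AreaRow {
        final String sensor;
        AreaRow(String sensor) { this.sensor = sensor; }
    }

    // Emulates: SELECT COUNT(o1.detectedCount), o2.sensor sa
    //           FROM SENSOR o1 LEFT JOIN AREA o2 ON o1.sensor = o2.sensor
    //           GROUP BY o2.sensor
    static Map<String, Long> countBySa(List<SensorRow> sensors, List<AreaRow> areas) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (SensorRow s : sensors) {
            // COUNT(column) ignores rows where that column is NULL.
            long increment = (s.detectedCount != null) ? 1L : 0L;
            boolean matched = false;
            for (AreaRow a : areas) {
                if (a.sensor != null && a.sensor.equals(s.sensor)) {
                    matched = true;
                    counts.merge(a.sensor, increment, Long::sum);
                }
            }
            if (!matched) {
                // LEFT JOIN keeps unmatched SENSOR rows; their o2.sensor is NULL.
                counts.merge(null, increment, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String id = "3a002f000647363432323230";
        List<SensorRow> sensors = Arrays.asList(
            new SensorRow(id, 0), new SensorRow(id, 1), new SensorRow(id, 0));
        List<AreaRow> areas = Arrays.asList(new AreaRow(id));
        System.out.println(countBySa(sensors, areas)); // prints {3a002f000647363432323230=3}
    }
}
```

So for the sample data I would expect a single output row with a count of 3 for that sensor, rather than no output at all.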

Is there anything I am doing wrong in this implementation? Any pointers would be really helpful.

