GroveStreams Help Center
Connectors (Beta)




Overview

Welcome to the GroveStreams Connector Help Page! Here, you'll find detailed information on GroveStreams Connectors. Many devices can be programmed to use the GroveStreams HTTP and MQTT APIs, but sometimes programming isn't an option. That's where Connectors come in.

A GroveStreams Connector:
  • Accepts Data: Accepts incoming data from devices, sensors, gateways, or other cloud platforms in their proprietary format.
  • Transforms Data: Converts the collected data into a required GroveStreams data format.
  • Inserts Data: Inserts the data into GroveStreams component streams.
Hard-Coded Webhooks
Many devices and other platforms have the ability to "push" data to another platform. Some sites refer to this as a webhook. GroveStreams has hard-coded the ability to accept data from several companies' webhooks.

Instead of us writing code and maintaining each company's webhooks, we've given you the ability to design your own connector to handle incoming calls.

The HTTP GS SQL Connector
We anticipate many connectors and processors in the future, but today, there is only one type of connector, the HTTP GS SQL connector. The HTTP GS SQL Connector:
  • Accepts incoming HTTP(S) calls. Each connector can process many incoming calls from many sources.
  • Transforms Data using GS SQL.
  • Inserts Data into streams using the GS SQL INSERT OR REPLACE INTO statement.

Common GS SQL Parsing Functions used by Connectors:
  • JSON_TABLE
  • CSV_TABLE
  • PARSECSV
The GS SQL Connector is available for all pricing plans. Customers with plans that do not include GS SQL will not be able to make SELECT calls to existing stream data. Only parsing functions and INSERT will be available.

Getting Started

GroveStreams - Connectors
1. Sign in to GroveStreams, enter the studio, and choose the Tools tab on the left side of the studio.
2. Right-click Connectors and select New - Connector.


1. Select the Connector tab.
2. Enter a name for the connector.
3. Enter a URL Endpoint. The Endpoint must be unique within the organization. The URL Endpoint will be appended to the connector sink URL that appears below it.
4. The connector sink URL is the URL that is used by external systems to push data to the connector. Enter this URL in the external system's webhook, or similar "push data" tool. Go to the studio toolbar Admin-Security-HTTP API Keys and select an existing key such as Connector URL Sink Key or create a new one. Click View Secret Key... to obtain the key. The api_key can be supplied to the connector in any one of three ways:
  • URL query string — append ?api_key=<secret> (or &api_key=<secret> if the URL already has a query string) to the sink URL.
  • Cookie — send a request cookie of the form api_key=<secret>.
  • HTTP Basic Auth header — send Authorization: Basic <base64> where the base64 value decodes to <secret>: (api_key in the username field, password blank). Useful when the sending platform doesn't allow custom headers, cookies, or query-string editing but does support Basic Auth.
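The three options above can be sketched from the sender's side as follows. This is a minimal illustration, not GroveStreams client code: the helper names are hypothetical, the sink URL is a placeholder, and the secret is a dummy value standing in for the key shown under View Secret Key.

```python
import base64
from urllib.parse import quote, urlsplit


def sink_url_with_key(sink_url: str, secret: str) -> str:
    """Append api_key to the sink URL, using & if a query string already exists."""
    separator = "&" if urlsplit(sink_url).query else "?"
    return f"{sink_url}{separator}api_key={quote(secret, safe='')}"


def cookie_header(secret: str) -> dict:
    """Request header carrying the key as a cookie named api_key."""
    return {"Cookie": f"api_key={secret}"}


def basic_auth_header(secret: str) -> dict:
    """Basic Auth header with the key as the username and a blank password."""
    token = base64.b64encode(f"{secret}:".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


if __name__ == "__main__":
    # Placeholder URL and key, for illustration only.
    print(sink_url_with_key("https://example.invalid/sink/my-endpoint", "abc-123"))
    print(cookie_header("abc-123"))
    print(basic_auth_header("abc-123"))
```

Only one of the three forms is needed per request. Which one to use depends on what the sending platform lets you configure.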

5. Select a user to run the connector as. The user's configured security is respected during the running of the connector. Enter a time zone to be used by the SQL.


1. Select the Processor: GS SQL tab and enter the GS SQL that will transform and insert the incoming data into component streams. There are three system variables available to the parsing SQL:
  • @HTTP_BODY: This is the HTTP Entity, or body, that was sent with the webhook. This could be blank, or NULL, if the caller's HTTP method is GET or they didn't include a body.
  • @HTTP_QUERY_STRING: This is the query string part of the calling URL (the part after the question mark). It is URL decoded before it is set.
  • @HTTP_METHOD: The caller's HTTP method: PUT, POST, GET, or DELETE.
2. Click Save & Close to save the Connector.
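To make the three system variables concrete, the sketch below shows how one incoming request could map onto them. It is an approximation under stated assumptions: the function name and URL are hypothetical, and the exact URL-decoding rules GroveStreams applies are not specified here, so Python's `unquote` stands in for them.

```python
from typing import Optional
from urllib.parse import unquote, urlsplit


def connector_variables(method: str, url: str, body: Optional[str] = None) -> dict:
    """Approximate the values a connector's SQL would see for one request."""
    query = urlsplit(url).query  # the part after the question mark
    return {
        "@HTTP_BODY": body,                    # None when no body was sent (e.g. GET)
        "@HTTP_QUERY_STRING": unquote(query),  # assumed decoding, see lead-in
        "@HTTP_METHOD": method.upper(),
    }


if __name__ == "__main__":
    variables = connector_variables(
        "post",
        "https://example.invalid/sink/my-endpoint?station=12845&note=a%20b",
        '{"temp": 21.5}',
    )
    for name, value in variables.items():
        print(name, "=", repr(value))
```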


The connector in this example processes an iMonnit webhook for the Quad Temperature sensor. The webhook sends the following JSON, which the SQL accesses via the @HTTP_BODY variable:
{
  "Method": "SensorDataMessages",
  "Result": [
    {
      "DataMessageGUID": "7a19c032-1a36-462b-9e48-xxxxxxxxxxxx",
      "SensorID": 751000,
      "MessageDate": "/Date(1649721230000)/",
      "State": 0,
      "SignalStrength": 75,
      "Voltage": 3.46,
      "Battery": 100,
      "Data": "28.5, 23.8, 17.5, 13",
      "DisplayData": " Condenser Coil 83.3° F, Return Air 74.8° F, Evaporator Coil 63.5° F, Conditioned Air 55.4° F",
      "PlotValue": "83.3",
      "MetNotificationRequirements": false,
      "GatewayID": 974384,
      "DataValues": "28.5|23.8|17.5|13",
      "DataTypes": "TemperatureData|TemperatureData|TemperatureData|TemperatureData",
      "PlotValues": "83.3|74.84|63.5|55.4",
      "PlotLabels": "Fahrenheit|Fahrenheit|Fahrenheit|Fahrenheit"
    }
  ]
}


The GS SQL Statement
Most Connector SQL statements will not be this complicated. We wanted to include an example of parsing a more complicated JSON object. What makes it complicated is that the four sensor readings arrive as a single delimited string (the Data field). The data does not have to be JSON. It can be any string, including CSV strings.


WITH
    BaseData AS(SELECT
            compId,
            rawData,
            Cast(Left(Right(rawDate, 15), 13) AS LONG) as time
        FROM
            JSON_TABLE(@HTTP_BODY)
        WITH(compId String '$.Result[0].SensorID',
                rawData string '$.Result[0].Data',
                rawDate string '$.Result[0].MessageDate'
            )
    )
INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT
    compId,
    'data1' as streamId,
    time,
    parseCsv(rawData, 1) as data
FROM
    BaseData
UNION ALL
SELECT
    compId,
    'data2' as streamId,
    time,
    parseCsv(rawData, 2) as data
FROM
    BaseData
UNION ALL
SELECT
    compId,
    'data3' as streamId,
    time,
    parseCsv(rawData, 3) as data
FROM
    BaseData
UNION ALL
SELECT
    compId,
    'data4' as streamId,
    time,
    parseCsv(rawData, 4) as data
FROM
    BaseData


Example Overview

1. The Monnit sensor sends a reading to the Monnit cloud. A Monnit webhook is configured to "push" the sensor's JSON data to the configured GroveStreams Connector's sink URL.
2. The connector uses GS SQL to process the incoming JSON:
  • The WITH statement declares a SELECT statement that is reused four times below and makes reading the SQL easier.
  • The JSON_TABLE function parses the JSON from the @HTTP_BODY variable.
  • The JSON_TABLE WITH statement extracts the component ID, sensor data (4 values delimited) and the time.
  • The SQL function parseCsv splits the delimited data string into 4 parts.
  • The four temperature values are combined into one INSERT statement by using four SELECT statements with UNION ALL statements.
  • The INSERT OR REPLACE INTO statements will insert the data into component streams matching the component ID and stream IDs. The WITH statement can also include a component template ID and a folder path for new components.
3. The sensor data is placed into four component streams. Events and derivation are processed during the save.
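For readers who find the transformation easier to follow outside SQL, the steps above can be mirrored in a few lines of Python. This is only an illustration of the logic, not how the connector runs: the function name is hypothetical, and stripping whitespace around each value is an assumption about how the delimited values are handled.

```python
import json


def monnit_to_samples(http_body: str) -> list:
    """Turn one Monnit message into four (component ID, stream ID, time, value) rows."""
    result = json.loads(http_body)["Result"][0]
    comp_id = str(result["SensorID"])
    # "/Date(1649721230000)/": take the last 15 characters, then the first 13 of those,
    # which mirrors Left(Right(rawDate, 15), 13) in the SQL.
    time_ms = int(result["MessageDate"][-15:][:13])
    values = [part.strip() for part in result["Data"].split(",")]
    return [
        (comp_id, f"data{index}", time_ms, float(value))
        for index, value in enumerate(values[:4], start=1)
    ]


if __name__ == "__main__":
    body = json.dumps({
        "Method": "SensorDataMessages",
        "Result": [{
            "SensorID": 751000,
            "MessageDate": "/Date(1649721230000)/",
            "Data": "28.5, 23.8, 17.5, 13",
        }],
    })
    for row in monnit_to_samples(body):
        print(row)
```

Each returned tuple corresponds to one row produced by one of the four SELECT statements joined with UNION ALL.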


Simple JSON SQL Example

An example of parsing a simple JSON structure. This can be run from the Studio's GS Query Editor and Tester via Toolbar-Admin. It sets the @HTTP_BODY variable directly, which is a great way to test and troubleshoot a connector's SQL.

SET @HTTP_BODY = '{
  "compId": "comp1",
  "streamId": "stream1",
  "data": 120.2,
  "time": 1738908000000
}';

INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT CID, ID, TIME, DATA
FROM
    JSON_TABLE(@HTTP_BODY)
WITH(CID String '$.compId',
        ID String '$.streamId',
        DATA String '$.data',
        TIME Long '$.time'
    );
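On the sending side, a body with the shape this SQL expects could be built as in the sketch below. The function name is hypothetical. The field names match the JSON paths in the WITH clause, and the time is expressed in epoch milliseconds as in the sample payload.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def build_body(comp_id: str, stream_id: str, value: float, when: datetime) -> str:
    """Serialize one reading using the field names the example SQL reads."""
    time_ms = (when - EPOCH) // timedelta(milliseconds=1)
    return json.dumps({
        "compId": comp_id,
        "streamId": stream_id,
        "data": value,
        "time": time_ms,
    })


if __name__ == "__main__":
    reading_time = datetime(2025, 2, 7, 6, 0, tzinfo=timezone.utc)
    print(build_body("comp1", "stream1", 120.2, reading_time))
```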



Simple CSV SQL Example

An example of parsing a simple CSV structure. This can be run from the Studio's GS Query Editor and Tester via Toolbar-Admin. It sets the @HTTP_BODY variable directly, which is a great way to test and troubleshoot a connector's SQL.

SET @HTTP_BODY = '"comp1","stream1","2021-04-01T05:00:00.000",24
"comp1","stream1","2021-04-01T06:00:00.000",23
"comp1","stream1","2021-04-01T07:00:00.000",22';

INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT CID, ID, TOEPOCHMILLIS(STRING_TIME, 'yyyy-MM-dd''T''HH:mm:ss.SSS') as TIME, DATA
FROM
    CSV_TABLE(@HTTP_BODY)
WITH (1 CID STRING,
        2 ID STRING,
        3 STRING_TIME STRING,
        4 DATA LONG
    );
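The same parsing can be sketched in Python to check what the statement should produce for a given body. This is an illustration only: the function name is hypothetical, and the timestamps are interpreted as UTC here, whereas the connector uses the time zone configured on the Connector tab.

```python
import csv
import io
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def csv_body_to_samples(http_body: str) -> list:
    """Parse rows of component ID, stream ID, timestamp, value into sample tuples."""
    samples = []
    for cid, stream_id, string_time, data in csv.reader(io.StringIO(http_body, newline="")):
        # Python equivalent of the pattern yyyy-MM-dd'T'HH:mm:ss.SSS, assuming UTC.
        parsed = datetime.strptime(string_time, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
        time_ms = (parsed - EPOCH) // timedelta(milliseconds=1)
        samples.append((cid, stream_id, time_ms, int(data)))
    return samples


if __name__ == "__main__":
    body = (
        '"comp1","stream1","2021-04-01T05:00:00.000",24\n'
        '"comp1","stream1","2021-04-01T06:00:00.000",23\n'
        '"comp1","stream1","2021-04-01T07:00:00.000",22'
    )
    for sample in csv_body_to_samples(body):
        print(sample)
```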


Simple Query String parsing SQL Example

An example of extracting the component and stream ID from the URL query string. This can be run from the Studio's GS Query Editor and Tester via Toolbar-Admin. It sets the @HTTP_QUERY_STRING variable directly, which is a great way to test and troubleshoot a connector's SQL.

The component and stream ID are part of the URL query string in this example. The function PARSECSV is used to split the query string and return the substring at the requested one-based index. The substrings are placed into the variables @STATION and @READINGS, which are then used within the INSERT SELECT statement.

SET @HTTP_QUERY_STRING = 'station=12845&readings=temp';
SET @HTTP_BODY = '"2021-04-01T05:00:00.000",24
"2021-04-01T06:00:00.000",23
"2021-04-01T07:00:00.000",22';

SET @STATION = PARSECSV(PARSECSV(@HTTP_QUERY_STRING, 1, '&'), 2, '=');
SET @READINGS = PARSECSV(PARSECSV(@HTTP_QUERY_STRING, 2, '&'), 2, '=');

INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT @STATION, @READINGS, TOEPOCHMILLIS(STRING_TIME, 'yyyy-MM-dd''T''HH:mm:ss.SSS') as TIME, DATA
FROM
    CSV_TABLE(@HTTP_BODY)
WITH (1 STRING_TIME STRING,
        2 DATA LONG
    );
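The nested PARSECSV calls split the query string by position: first on `&`, then on `=`. The Python sketch below mimics that with a stand-in `parse_csv` helper (hypothetical, and not a reimplementation of the GS SQL function, whose quoting rules may differ) and contrasts it with a name-based lookup to show that the positional approach depends on the order in which the sender lists the parameters.

```python
from urllib.parse import parse_qs


def parse_csv(text: str, index: int, delimiter: str = ",") -> str:
    """Return the substring at a one-based index after splitting on the delimiter."""
    return text.split(delimiter)[index - 1]


def station_and_readings_by_position(query: str) -> tuple:
    """Mirror the two SET statements: pair N of the query string, then its value."""
    station = parse_csv(parse_csv(query, 1, "&"), 2, "=")
    readings = parse_csv(parse_csv(query, 2, "&"), 2, "=")
    return station, readings


def station_and_readings_by_name(query: str) -> tuple:
    """Look the parameters up by name instead of by position."""
    params = parse_qs(query)
    return params["station"][0], params["readings"][0]


if __name__ == "__main__":
    print(station_and_readings_by_position("station=12845&readings=temp"))
    print(station_and_readings_by_name("readings=temp&station=12845"))
```

If a sender may reorder its query parameters, the positional split assigns the wrong values, so it is worth confirming the order the sending platform uses.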


JDBC Import (Beta)

A JDBC Import connector reaches out to an external database on a schedule, fetches rows using SQL, and ingests them into GroveStreams component streams using GS SQL. Unlike the HTTP Webhook connector, which waits for data to be pushed, JDBC Import actively pulls data from external databases.

How it Works
The JDBC Import uses a two-SQL model:
  • External SQL: A standard SQL query that runs against the remote database (PostgreSQL, MySQL, SQL Server, Oracle). This query fetches the data you want to import.
  • Ingest SQL: A GS SQL statement that processes the fetched data and inserts it into GroveStreams component streams.
The external SQL results are made available to the Ingest SQL via the @JDBC_RESULT variable as a CSV string. Use the CSV_TABLE function to parse it.

Available Variables
  • @JDBC_RESULT: The CSV string containing the external SQL results with column names as the header row.
  • @LAST_RUN_DATE: The epoch millisecond timestamp of the last successful import run. This is 0 on the first run. Use this for incremental imports.
  • @CONNECTOR_ID: The ID of the JDBC Import connector.
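To picture what the Ingest SQL receives, the sketch below renders a small result set as a CSV string with the column names as the header row. It only illustrates the general shape of @JDBC_RESULT: the function name is hypothetical, and the exact quoting, line endings, and timestamp formatting GroveStreams uses are assumptions.

```python
import csv
import io


def rows_to_csv(column_names: list, rows: list) -> str:
    """Render query results as CSV text with a header row of column names."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(column_names)
    writer.writerows(rows)
    return buffer.getvalue()


if __name__ == "__main__":
    columns = ["station_id", "reading_time", "temperature", "humidity"]
    rows = [
        ("S1", "2024-01-02 00:00:00", 21.5, 40.0),
        ("S2", "2024-01-02 00:00:00", 19.0, 55.5),
    ]
    print(rows_to_csv(columns, rows))
```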
Configuration
  • Connection: A reference to a JDBC Connection connector that defines the database connection details.
  • Running User: The user context for executing the GS SQL (required for INSERT security).
  • Max Rows: Maximum number of rows to fetch from the external database per import run. Set to 0 for unlimited (default: 0).
  • Skip Unchanged Rows: When enabled, rows where all stream values match the previously imported values are skipped, avoiding redundant data points. Reduces storage when re-importing unchanged source data (default: on).
  • Schedule: Optional schedule for automatic periodic imports.
Example: Incremental Import
This example imports new temperature and humidity readings from a PostgreSQL database since the last successful import:

External SQL:

SELECT station_id, reading_time, temperature, humidity
FROM sensor_readings
WHERE reading_time > '2024-01-01'
ORDER BY reading_time

Ingest SQL:

-- Only ingest readings recorded after the last successful import
INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT station_id, 'temperature', TOEPOCHMILLIS(reading_time, 'yyyy-MM-dd HH:mm:ss'), temperature
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 station_id STRING, 2 reading_time STRING, 3 temperature DOUBLE, 4 humidity DOUBLE)
WHERE TOEPOCHMILLIS(reading_time, 'yyyy-MM-dd HH:mm:ss') > @LAST_RUN_DATE

UNION ALL

SELECT station_id, 'humidity', TOEPOCHMILLIS(reading_time, 'yyyy-MM-dd HH:mm:ss'), humidity
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 station_id STRING, 2 reading_time STRING, 3 temperature DOUBLE, 4 humidity DOUBLE)
WHERE TOEPOCHMILLIS(reading_time, 'yyyy-MM-dd HH:mm:ss') > @LAST_RUN_DATE;


Example: Daily Sales Import
This example imports daily sales totals from an ERP database. Each store becomes a GroveStreams component with revenue and transactions streams.

External SQL:

SELECT store_code, sale_date, total_revenue, transaction_count
FROM daily_sales_summary
WHERE sale_date >= '2024-01-01'
ORDER BY sale_date

Ingest SQL:

INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT store_code, 'revenue',
    TOEPOCHMILLIS(sale_date, 'yyyy-MM-dd'),
    total_revenue
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 store_code STRING, 2 sale_date STRING,
     3 total_revenue DOUBLE, 4 transaction_count LONG)

UNION ALL

SELECT store_code, 'transactions',
    TOEPOCHMILLIS(sale_date, 'yyyy-MM-dd'),
    transaction_count
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 store_code STRING, 2 sale_date STRING,
     3 total_revenue DOUBLE, 4 transaction_count LONG);

Each store's revenue and transaction count are tracked as separate streams, enabling independent roll-ups, derivations, and cross-store correlation within GroveStreams.

Example: Manufacturing Line — Incremental Import
This example imports production batch results from a manufacturing MES database, using @LAST_RUN_DATE to ingest only batches completed since the last successful import.

External SQL:

SELECT line_id, completed_at, units_produced, defect_count, cycle_time_sec
FROM production_batches
WHERE status = 'COMPLETE'
ORDER BY completed_at

Ingest SQL:

-- Only ingest batches completed after the last successful import
INSERT OR REPLACE INTO
    SAMPLE(CID, ID, TIME, SAMPLE)
SELECT line_id, 'units_produced',
    TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss'),
    units_produced
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 line_id STRING, 2 completed_at STRING,
     3 units_produced LONG, 4 defect_count LONG, 5 cycle_time_sec DOUBLE)
WHERE TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss') > @LAST_RUN_DATE

UNION ALL

SELECT line_id, 'defects',
    TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss'),
    defect_count
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 line_id STRING, 2 completed_at STRING,
     3 units_produced LONG, 4 defect_count LONG, 5 cycle_time_sec DOUBLE)
WHERE TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss') > @LAST_RUN_DATE

UNION ALL

SELECT line_id, 'cycle_time',
    TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss'),
    cycle_time_sec
FROM CSV_TABLE(@JDBC_RESULT)
WITH (1 line_id STRING, 2 completed_at STRING,
     3 units_produced LONG, 4 defect_count LONG, 5 cycle_time_sec DOUBLE)
WHERE TOEPOCHMILLIS(completed_at, 'yyyy-MM-dd HH:mm:ss') > @LAST_RUN_DATE;

The WHERE clause in the Ingest SQL filters out batches already imported in a previous run. The External SQL runs as standard SQL against the remote database and does not have access to @LAST_RUN_DATE, so for better performance also limit the rows it returns with a date filter the remote database can evaluate on its own, if it supports one. Once imported, you can use GroveStreams derivations to calculate defect rates, OEE, and other KPIs automatically.
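The effect of the @LAST_RUN_DATE filter can be checked with a small Python sketch. The helper names and sample rows are hypothetical, and timestamps are treated as UTC for simplicity. Because @LAST_RUN_DATE is 0 on the first run, every row passes the first time. On later runs only rows strictly newer than the last successful import pass.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> int:
    """Convert a timestamp string to epoch milliseconds, assuming UTC."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def batches_since(rows: list, last_run_ms: int) -> list:
    """Keep rows completed strictly after the last successful import."""
    return [row for row in rows if to_epoch_millis(row["completed_at"]) > last_run_ms]


if __name__ == "__main__":
    rows = [
        {"line_id": "L1", "completed_at": "2024-03-01 08:00:00", "units_produced": 480},
        {"line_id": "L1", "completed_at": "2024-03-01 09:00:00", "units_produced": 495},
    ]
    print(len(batches_since(rows, 0)))  # first run: nothing is filtered out
    cutoff = to_epoch_millis("2024-03-01 08:00:00")
    print(batches_since(rows, cutoff))  # later run: only the newer batch remains
```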

Tip: Use the AI Assistant
Writing Ingest SQL with CSV_TABLE and column mappings can be tedious. The GroveStreams AI Assistant can help generate the Ingest SQL for you. Describe your external table structure and what streams you want to create, and the assistant will produce the INSERT OR REPLACE INTO statement with the correct CSV_TABLE and WITH clause.