Quick Processor

The Quick Processor allows you to create filters and modify records using JavaScript. In this article, we show how to create a Quick Processor and how to access, filter, and modify records.

 

KaDecks Quick Processor feature

Transforming and calculating data sets with the Quick Processor.

 

A Quick Processor is always linked to a view. This means that a Quick Processor is saved when a view is saved.

Hello World

This code randomly picks (filters) a record out of your stream and sets the record's value to "Hello World!".


if ( Math.random() < 0.5) // Filter a record randomly.
    return false; // If false, the record is not displayed.
    
rec.value = "Hello World!"; // Sets a new value for each record

return rec; // Returns the new record with "Hello World!".

 

Usage Scenarios

Transform records

Use the Quick Processor to modify existing attributes of your data objects or add new ones that can be used for filtering using the attribute filters or sorting.

 

Data Correction

In combination with the Multi-Select functionality, you can use the Quick Processor to correct data, select it via Multi-Select and send it to a stream again.

 

Calculation of results based on multiple data sets (e.g., moving average or grouping)

Use the state store of the Quick Processor to store calculation results across multiple executions (e.g., to calculate a moving average) or to group data sets.

 

Complex filter logic

With JavaScript you are capable of modeling even the most complicated business logic. In combination with the attribute filters, you can quickly create useful reports for data analytics or business users.

 

Architecture

The following diagram illustrates the data flow of a data set. The Quick Processor is executed after the decoding process for each individual data set. 

A Quick Processor can be used for data transformation, but can also act as a filter with complex logic or as a state store to create aggregates. Therefore, the result of a Quick Processor execution is not always a data set.

The attribute filters are executed in the last step following the Quick Processor.

 

The data flow with a Quick Processor enabled

The position of the Quick Processor within the data flow when retrieving data.

 

 

Accessible attributes

You can access the attributes of a record by accessing the rec object.

The following attributes are accessible:

 

Attribute Type Description
Value any The record's decoded value
Key any The record's decoded key
Timestamp number The record's timestamp
Topic / Stream string The stream in which the record was published.
Partition / Shard number The partition or shard in which the record was published.
Offset (Apache Kafka only) number The offset of the record within the stream. (Apache Kafka only)
Headers (Apache Kafka only) [{  key: string,
  value: byte[] }]
An array containing the headers as objects. The header objects have a key (the name of the header) and a value attribute (the value as a byte array). 

 

You can access the attributes in code as follows:

rec.value       // record's decoded value

rec.key // record's decoded key
rec.timestamp // record's timestamp
rec.topic // the topic
rec.partition // record's partition / shard
rec.offset // the offset (Apache Kafka only)
rec.headers // record's headers (Apache Kafka only)

// If the value or key is a JSON object
rec.value.myValueAttribute
rec.key.myKeyAttribute

 

Tip: Quickly copy the path of an attribute to the clipboard

Writing out the path of an attribute can be annoying. Therefore, select a record in the record list and click on the corresponding attribute in the detail view. This opens the filter bubble. Click on the name of the attribute on the left. You now have the complete path in the clipboard.

 

State and stateful calculations

The Quick Processor is executed for each individual data record. To group records or perform calculations based on a previous data set or result, a state can be saved in the state store.

A state is saved with the store(any) command. The restore():any command can be used to load the state from the state memory into a variable, for example. To initialize a state memory (e.g. to set an initial value), the command hasStore():boolean can be used to query whether a state already exists.

Moving average example

var fahrenheit = rec.value;

if(!hasStore())
store(fahrenheit); // Init state with current temperature

// Divide the previous average by 2;
store((restore() + fahrenheit) / 2);

// Setting the calculated and the original value as the new record's value
rec.value = { "Average":restore(), "Fahrenheit": fahrenheit };

// Returning the new record with the new value
return rec;

Tip: Store JSON Objects or arrays if you need to store multiple results (or records).

You can also store entire JSON objects or arrays instead of a single value. Please make sure to delete the objects as soon as they are no longer needed to avoid filling up the memory during a request.

 

Filtering data

To filter records, return a boolean value as the return value of the Quick Processor.

A simple, though not very useful, illustrative example that randomly selects and filters (displays) a record in your stream:

// If true, the record is shown; otherwise the record is filtered.

return Math.random() < 0.5;

 

Example: Finding critical values

Suppose we have data with information about the temperature of a sensor. Each record's value is encoded as a JSON object and contains the temperature information as an attribute called "temperature". To detect errors early, we want to know when a certain threshold value (>212 and <32)  is exceeded.

return rec.value.temperature > 212 || rec.value.temperature < 32;

 

Transforming data

In addition to boolean values, you can also use complete data objects as return values. This allows you to modify a data record completely. Note that filtering and transforming a record can be combined: return false  if the record does not meet the criteria; otherwise, return the modified record.

Example:
We have sensor data that shows us temperature information from our machines in Fahrenheit. But we are interested in the values in Celsius.

rec.value.celsius = (rec.value.temperature - 32) * 5/9;

return rec;

Note how we add a new attribute  celsius to our data object's value.

 

Right management

This section only applies to KaDeck Web.

To modify or create a Quick Processor, the user must have both the QuickProcessorModify right and the TopicAccessRead right.

QuickProcessorModify can be assigned for individual connections/environments. In combination with the TopicAccessRead right, which can be assigned at topic level (including wildcard and namespaces), the use of the Quick Processor can be defined for individual topics, namespaces, and environments in a fine-grained manner.

{
    "action": "QuickProcessorModify",
    "effect": "Allow",
    "resource": "*"
  },
  {
    "action": "TopicAccessRead",
    "effect": "Allow",
    "resource": "*:mybusiness.myteam.*"
  }

In the example above, the use of the Quick Processor is granted for all connections and environments in KaDeck, but only for streams beginning with the name mybusiness.myteam.

 

 

 

 

Base 64 Encoding / Decoding:

ATOB and BTOA are now supported.

Legacy, prior KaDeck version 3.X: Currently, ATOB or BTOA are not supported in the Quick Processor. You can use the following functions instead.


function b2a(a) {
var c, d, e, f, g, h, i, j, o, b = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=", k = 0, l = 0, m = "", n = [];
if (!a) return a;
do c = a.charCodeAt(k++), d = a.charCodeAt(k++), e = a.charCodeAt(k++), j = c << 16 | d << 8 | e,
f = 63 & j >> 18, g = 63 & j >> 12, h = 63 & j >> 6, i = 63 & j, n[l++] = b.charAt(f) + b.charAt(g) + b.charAt(h) + b.charAt(i); while (k < a.length);
return m = n.join(""), o = a.length % 3, (o ? m.slice(0, o - 3) :m) + "===".slice(o || 3);
}

function a2b(a) {
var b, c, d, e = {}, f = 0, g = 0, h = "", i = String.fromCharCode, j = a.length;
for (b = 0; 64 > b; b++) e["ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/".charAt(b)] = b;
for (c = 0; j > c; c++) for (b = e[a.charAt(c)], f = (f << 6) + b, g += 6; g >= 8; ) ((d = 255 & f >>> (g -= 8)) || j - 2 > c) && (h += i(d));
return h;
} rec.value = b2a(rec.value); return rec;
Was this article helpful?
2 out of 5 found this helpful

Comments

0 comments

Please sign in to leave a comment.