Operators
Operators are APIs that enable developers to customize their data applications. The operators have opinionated function signatures and flexible types. Some operators can be used independently, whereas others must be chained to accomplish the task.
Summary
The engine implements transforms operators, which are chained in an array inside a transforms section, and named operators, which follow a hierarchical convention.
The transforms operators are: map, filter, filter-map, and flat-map.
The named operators are: assign-timestamp, assign-key, update-state, and flush.
The engine also uses keyword operators to identify a multi-step operation: partition and window.
Let's take a look at each operator in detail.
Transforms Operators
The transforms section is an array that allows multiple operators to be chained to perform composite operations.
map
The map operator takes an input record, applies a transformation, and forwards the modified record to the next operator.
For example, given a user record, you can mask the user's social security number with the following map operation:
transforms:
  - operator: map
    run: |
      fn mask_ssn(user: User) -> Result<User> {
        let mut u = user.clone();
        u.ssn = u.ssn.replace(|c: char| c.is_digit(10), "*");
        Ok(u)
      }
Check out the map example on GitHub for a working demo.
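To try the masking logic outside the engine, the function can be run as plain Rust. This is only a minimal sketch: the User struct and the Result alias below are assumptions standing in for the types that the application would define, and they are not taken from the engine.

```rust
// Hypothetical stand-ins: in a real dataflow these types come from the
// application's own type definitions, not from this snippet.
type Result<T> = std::result::Result<T, String>;

#[derive(Debug, Clone, PartialEq)]
struct User {
    name: String,
    ssn: String,
}

// Same logic as the map operator above: every digit becomes '*',
// while separators such as '-' are left untouched.
fn mask_ssn(user: User) -> Result<User> {
    let mut u = user;
    u.ssn = u.ssn.replace(|c: char| c.is_ascii_digit(), "*");
    Ok(u)
}
```

With this sketch, a user whose ssn is "123-45-6789" comes back with ssn "***-**-****" and all other fields unchanged.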
filter
The filter operator takes a record and returns a boolean that tells the engine whether to drop it or forward it to the next operator:
true - forward the record to the next operator.
false - drop the record.
In most cases, one operator's output becomes the next operator's input, but the filter operation works differently. The boolean it returns only decides whether the record continues; the engine then sends the original record to the next operator.
The following example defines a filter that drops all sentences without a question mark:
transforms:
  - operator: filter
    run: |
      fn filter_questions(input: String) -> Result<bool> {
        Ok(input.contains("?"))
      }
Check out the filter example on GitHub for a working demo.
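The difference from map is that the filter never changes the record itself. A rough sketch of that control flow in plain Rust follows; apply_filter is a hypothetical helper that plays the role of the engine and is not part of any API, and the Result alias is an assumption.

```rust
// Assumed alias so the snippet compiles on its own.
type Result<T> = std::result::Result<T, String>;

fn filter_questions(input: String) -> Result<bool> {
    Ok(input.contains('?'))
}

// Hypothetical engine loop: the predicate sees a copy of each record, and the
// original record is forwarded only when the predicate returns true.
fn apply_filter(records: Vec<String>) -> Result<Vec<String>> {
    let mut forwarded = Vec::new();
    for record in records {
        if filter_questions(record.clone())? {
            forwarded.push(record);
        }
    }
    Ok(forwarded)
}
```

Given the records "How are you?" and "Fine.", only "How are you?" is forwarded.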
filter-map
The filter-map operator combines a filter and a map operation: it takes a record and returns either a modified record or None. The return value tells the engine whether to forward the record to the next operator or to drop it:
Some(Record) - forward the record to the next operator.
None - drop the record.
The following example defines a filter-map that transforms sentences longer than ten characters to uppercase and drops all others:
transforms:
  - operator: filter-map
    run: |
      fn long_sentence_to_uppercase(input: String) -> Result<Option<String>> {
        if input.len() > 10 {
          Ok(Some(input.to_uppercase()))
        } else {
          Ok(None)
        }
      }
Check out the filter-map example on GitHub for a working demo.
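One detail worth noting is that String::len counts bytes, not characters, so a sentence with non-ASCII letters can pass the threshold sooner than its visible length suggests. The sketch below is a variant that counts characters instead and mimics the engine with Iterator::filter_map. The Result alias and the run_filter_map helper are assumptions made for illustration only.

```rust
// Assumed alias so the snippet compiles on its own.
type Result<T> = std::result::Result<T, String>;

fn long_sentence_to_uppercase(input: String) -> Result<Option<String>> {
    // chars().count() counts Unicode scalar values rather than bytes.
    if input.chars().count() > 10 {
        Ok(Some(input.to_uppercase()))
    } else {
        Ok(None)
    }
}

// Hypothetical engine loop: Some(record) is forwarded, None is dropped.
// Errors are silently dropped here; a real engine may treat them differently.
fn run_filter_map(records: Vec<String>) -> Vec<String> {
    records
        .into_iter()
        .filter_map(|r| long_sentence_to_uppercase(r).ok().flatten())
        .collect()
}
```

Given "short" and "a longer sentence", only "A LONGER SENTENCE" reaches the next operator.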
flat-map
The flat-map operator takes an aggregate record and returns an array of records. The engine reads the array and sends individual records, one at a time, to the next operator.
The following example defines a flat-map that splits sentences into words:
transforms:
  - operator: flat-map
    run: |
      fn split_sentence(sentence: String) -> Result<Vec<String>> {
        Ok(sentence.split_whitespace().map(String::from).collect())
      }
Check out the flat-map example on GitHub for a working demo.
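The fan-out behavior can be sketched in plain Rust with Iterator::flat_map, which flattens each returned array into one downstream sequence, much as the engine is described as doing. run_flat_map is a hypothetical helper rather than an engine API, and the Result alias is an assumption.

```rust
// Assumed alias so the snippet compiles on its own.
type Result<T> = std::result::Result<T, String>;

fn split_sentence(sentence: String) -> Result<Vec<String>> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

// Hypothetical engine loop: each element of the returned array becomes
// its own record for the next operator.
fn run_flat_map(sentences: Vec<String>) -> Vec<String> {
    sentences
        .into_iter()
        // An Err is treated as "no records" here; a real engine may differ.
        .flat_map(|s| split_sentence(s).unwrap_or_default())
        .collect()
}
```

Two input records, "hello big world" and "  bye ", produce four output records: hello, big, world, and bye.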
Named Operators
Named operators are prefixed with a keyword that identifies the operator's function. The keywords are defined in the next section.
assign-timestamp
The assign-timestamp operator lets you choose a timestamp for window processing. Timestamps help the engine distribute records to windows. The timestamp may be taken from the record metadata or from the record value.
Assign timestamp from record metadata
The following example shows how to take the timestamp from the record metadata, which is passed to the function through the event_time argument:
assign-timestamp:
  run: |
    fn assign_timestamp(_sentence: String, event_time: i64) -> Result<i64> {
      Ok(event_time)
    }
Check out the word-counter example on GitHub for a working demo.
Assign timestamp from record value
Assuming a user_event record has a timestamp field, the following example shows how to take the timestamp from the record value:
assign-timestamp:
  run: |
    fn assign_timestamp(user_event: UserEvent, _event_time: i64) -> Result<i64> {
      Ok(user_event.timestamp)
    }
The assign-timestamp operation is mandatory in window processing.
assign-key
The assign-key operation assigns a key to the record metadata. The engine uses the key to determine which records to group in the same partition.
The following example shows how the assign-key operator groups cars by their color:
assign-key:
  run: |
    fn key_by_color(car: Car) -> Result<String> {
      Ok(car.color)
    }
Check out the car-count example on GitHub for a working demo.
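To illustrate what grouping by key means, the sketch below collects records with the same key into the same bucket using a HashMap. The Car fields, the Result alias, and the group_by_key helper are assumptions made for illustration; the engine's actual partitioning mechanism is not shown in this document.

```rust
use std::collections::HashMap;

// Assumed alias and record type so the snippet compiles on its own.
type Result<T> = std::result::Result<T, String>;

#[derive(Debug, Clone, PartialEq)]
struct Car {
    maker: String,
    color: String,
}

fn key_by_color(car: Car) -> Result<String> {
    Ok(car.color)
}

// Hypothetical grouping step: records that share a key land in the same group.
fn group_by_key(cars: Vec<Car>) -> Result<HashMap<String, Vec<Car>>> {
    let mut groups: HashMap<String, Vec<Car>> = HashMap::new();
    for car in cars {
        let key = key_by_color(car.clone())?;
        groups.entry(key).or_default().push(car);
    }
    Ok(groups)
}
```

Two red cars and one blue car end up as a "red" group with two records and a "blue" group with one.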
update-state
The update-state operator informs the engine that a state object is about to be updated. It is the last step of the service.
The following example shows how the update-state operator updates the state object temperature() with the latest temperature value:
update-state:
  run: |
    fn update_temperature(event: Temp) -> Result<()> {
      let mut temp = temperature();
      temp.sensor = event.sensor;
      temp.temperature = event.temperature;
      temp.update()?;
      Ok(())
    }
Check out the update-state example on GitHub for a working demo.
flush
The flush operator is the final stage of a window processing service. The engine invokes flush after the watermark closes the window. The flush function is expected to return a value, which is passed to the sink.
Assuming a window operator that generates a state object that counts car colors, the following example shows how the flush operator returns a list of colors and their counts:
flush:
  run: |
    fn get_car_color_count() -> Result<CarColors> {
      let cc = count_by_color().clone();
      Ok(cc.into_iter().map(|(color, count)|
        Color {
          color,
          count
        }
      ).collect())
    }
Check out the car-count example on GitHub for a working demo.
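The conversion from state to output can be tried in isolation. In the sketch below, a HashMap stands in for the state object returned by count_by_color(), and the Color and CarColors definitions are assumptions. The result is sorted because HashMap iteration order is unspecified; whether the engine's state object guarantees any order is not stated here.

```rust
use std::collections::HashMap;

// Hypothetical output types; in the example above the application defines them.
#[derive(Debug, PartialEq)]
struct Color {
    color: String,
    count: u32,
}
type CarColors = Vec<Color>;

// `counts` stands in for the window state that count_by_color() returns.
fn get_car_color_count(counts: &HashMap<String, u32>) -> CarColors {
    let mut colors: CarColors = counts
        .iter()
        .map(|(color, count)| Color { color: color.clone(), count: *count })
        .collect();
    // HashMap iteration order is unspecified, so sort for a stable result.
    colors.sort_by(|a, b| a.color.cmp(&b.color));
    colors
}
```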
Keyword Operators
Keyword operators wrap a hierarchy of operations under their name. The keyword operators are partition and window.
partition
The partition operation divides the stream into data sets that are processed in parallel. Each partition has its own state. The partition operation has the following sub-sections: assign-key, transforms, and update-state.
assign-key is mandatory. transforms and update-state are each optional, but at least one of them is required.
The following example shows how the partition operation groups a stream of words by key and updates the state object count_per_word() with the latest word count:
partition:
  assign-key:
    run: |
      fn assign_word_key(word: String) -> Result<String> {
        Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
      }
  update-state:
    run: |
      fn increment_word_count(word: String) -> Result<()> {
        count_per_word().increment(1);
        Ok(())
      }
Check out the word-counter example on GitHub for a working demo.
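The key function in this example does more than it may seem: it lowercases the word and strips punctuation, so "Hello," and "hello" share one counter. The plain-Rust sketch below shows the effect, using a HashMap as a hypothetical stand-in for the per-partition state behind count_per_word(). The count_words helper is an assumption made for illustration.

```rust
use std::collections::HashMap;

// Same normalization as assign_word_key above, written against &str.
fn assign_word_key(word: &str) -> String {
    word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect()
}

// Hypothetical stand-in for per-partition state: one counter per key.
fn count_words(words: &[&str]) -> HashMap<String, u32> {
    let mut count_per_word: HashMap<String, u32> = HashMap::new();
    for word in words {
        *count_per_word.entry(assign_word_key(word)).or_insert(0) += 1;
    }
    count_per_word
}
```

For the words "Hello,", "hello", and "World!", the result holds two keys: hello with a count of 2 and world with a count of 1.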
window
The window operation turns streams of records into finite groups of records captured based on the window size. The window operation is a multi-step operation with the following sub-sections: the window properties (such as tumbling), assign-timestamp, partition, and flush.
While there are several types of windows, and the engine will eventually implement all of them, the first version supports only tumbling windows. A tumbling window is a fixed-size window that is contiguous and non-overlapping.
Building on the example above, a tumbling window of 20 seconds that computes the top 3 words in a stream of sentences has the following hierarchy:
window:
  tumbling:
    duration: 20s
  assign-timestamp:
    run: |
      fn assign_event_timestamp(_value: String, event_time: i64) -> Result<i64> {
        Ok(event_time)
      }
  partition:
    assign-key:
      run: |
        fn assign_word_key(word: String) -> Result<String> {
          Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
        }
    update-state:
      run: |
        fn increment_word_count(word: String) -> Result<()> {
          count_per_word().increment(1);
          Ok(())
        }
  flush:
    run: |
      fn compute_most_used_words() -> Result<TopWords> {
        let word_counts = count_per_word();
        let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
        let rows = top3.rows()?;
        let columns = top3.schema(["_key","count"])?;
        let mut top_words = vec![];
        match &columns[..] {
          [key,value] => {
            while rows.next() {
              let word = rows.str(&key)?;
              let count = rows.u32(&value)?;
              let word_count = WordCount { word, count };
              top_words.push(word_count);
            }
          },
          _ => return Err(anyhow::anyhow!("unexpected schema")),
        }
        Ok(top_words)
      }
The first section defines the window properties, followed by the partition section, and finally the flush section.
Check out the word-counter example on GitHub for a working demo.
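To make the window mechanics concrete, the following plain-Rust sketch reproduces the same pipeline without the engine: it assigns each record to a 20-second tumbling window, counts words per window, and returns the top entries for each window. It assumes timestamps are in milliseconds, replaces the SQL query with an in-memory sort, and uses hypothetical names (WINDOW_MS, window_start, top_words) that are not part of any engine API.

```rust
use std::collections::{BTreeMap, HashMap};

// Assumption: timestamps are milliseconds, so 20s is 20_000.
const WINDOW_MS: i64 = 20_000;

// Start of the tumbling window that contains `ts`. Windows are contiguous and
// non-overlapping, so each timestamp maps to exactly one window.
fn window_start(ts: i64) -> i64 {
    ts - ts.rem_euclid(WINDOW_MS)
}

// Counts words per window, then keeps the `n` most frequent words per window.
fn top_words(events: &[(i64, &str)], n: usize) -> BTreeMap<i64, Vec<(String, u32)>> {
    let mut windows: BTreeMap<i64, HashMap<String, u32>> = BTreeMap::new();
    for (ts, word) in events {
        let counts = windows.entry(window_start(*ts)).or_default();
        *counts.entry(word.to_lowercase()).or_insert(0) += 1;
    }
    windows
        .into_iter()
        .map(|(start, counts)| {
            let mut ranked: Vec<(String, u32)> = counts.into_iter().collect();
            // Highest count first; ties are broken alphabetically so the
            // output is deterministic.
            ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
            ranked.truncate(n);
            (start, ranked)
        })
        .collect()
}
```

A record with timestamp 19_999 falls into the window starting at 0, while one with timestamp 20_000 opens the next window. The per-window map plays the role of the partition state, and the final sort and truncate step corresponds to what flush does once a window closes.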