com.rpl.rama
+compound
(+compound target-pstate template)
(+compound template :> out)
Aggregation specified as a data structure template. Template can be any combination of maps and vectors, with aggregators at the leaves.
Can either be used to update PState partition on current task, or in a <<batch or <<query-topology block to aggregate input into a single value.
Examples:
(+compound $$p {*k {*k2 (aggs/+sum *v)}})
(+compound $$p {*k [(aggs/+count) (aggs/+sum *v)]})
(+compound {*k (aggs/+count)} :> *count-map)
+group-by
(+group-by key-expr & block)
Groups input by one or more keys and performs aggregations in the specified block on each group of data independently. Only usable in <<batch or <<query-topology block. Emits into postagg the vars for keys and results of aggregations.
Key can be specified as either a single key or vector of keys.
Examples:
(+group-by *k
(aggs/+sum *v :> *sum)
(aggs/+count :> *count))
(+group-by [*k1 *k2]
(aggs/+max *v :> *max-value))
<<atomic
(<<atomic & block)
Runs subblock until it completes synchronously. Then runs code following atomicBlock exactly one time.
Example:
user=> (?<-
(<<atomic
(range> 0 3 :> !v)
(println !v))
(println "DONE"))
0
1
2
DONE
nil
<<batch
(<<batch & block)
Specifies a batch block. A batch block is partially declarative and supports joins, multiple step aggregation, and more. See the extended documentation for details.
<<branch
Attaches block of code to the given anchors. If multiple anchors are provided, it inserts a unify>. Code after the <<branch
block attaches to the branch of code prior to its declaration.
Example:
(some-op-with-events
:event1> <a> *v
:event2> <b> *v
:> *o)
(<<branch <a> <b>
(println "Event value" *v))
(println *o)
<<cond
(<<cond & block)
Analogous to Clojure’s cond
. It declares cases using the subblock headers case>
and default>
, like so:
(<<cond
(case> (< *a 2))
(identity 3 :> *b)
(case> (< *a 4))
(identity 6 :> *b)
(case> (< *a 100))
(identity -1 :> *b)
)
(println "Res:" *b)
<<cond
unifies all declared branches only. If there’s no default>
case and no condition is matched, the continuation to <<cond
will not execute. Also, by the rules of unification the only vars in scope after the <<cond
block are those that are bound in all branches of the <<cond
.
Here’s an example using default>
:
(<<cond
(case> (< *a 10))
(identity 1 :> *b)
(println "Case 1")
(case> (< *a 100))
(identity 2 :> *b)
(println "Case 2")
(default>)
(raise-error! :no-match))
Because *b is not defined in the default>
case, it is not in scope following the <<cond
block. For cases like this where the default is to raise an error, you can tell <<cond
not to unify the default branch like this:
(<<cond
(case> (< *a 10))
(identity 1 :> *b)
(println "Case 1")
(case> (< *a 100))
(identity 2 :> *b)
(println "Case 2")
(default> :unify false)
(raise-error! :no-match))
(println "Res:" *b)
Now *b will be in scope following the <<cond
block.
<<do
(<<do & block)
Just like do in Clojure, <<do just executes its block. Use in cases where you are only allowed to have a single segment, for instance when you want to execute multiple segments following an inline hook:
(some-op :foo> *a :>> (<<do (println "1") (println "2"))
:> *b)
<<if
(<<if pred & block)
<<if
is the most commonly used conditional statement and works just like if
in Clojure. It takes in a then block and an else block separated by else>
, automatically unifying the two branches. If no else>
is specified, then that branch is still unified. For example:
(<<if (= 1 2)
(identity 1 :> !a)
(else>)
(identity 2 :> !a))
(println !a)
This prints 2
.
<<query-topology
macro
(<<query-topology topologies name & args)
Defines a query topology in a module. Query topologies are implicitly batch blocks.
<<ramafn
(<<ramafn fragvar args & decls)
Defines an anonymous Rama function within dataflow code. Output is done with :>
segment. Body must be fully synchronous, so operations like partitioners cannot be used in code leading to emit. If need body to be asynchronous, use <<ramaop.
Example:
(+ 1 2 :> *a)
(<<ramafn %f [*b *c]
(+ *a *b *c :> *d)
(:> (* 10 *d)))
(println (%f 4 5))
This prints 120
.
<<ramaop
(<<ramaop fragvar args & decls)
Defines an anonymous Rama operation within dataflow code. Output is done with :>
segment.
Example:
(<<ramaop %op [*a *b]
(|hash *a)
(local-select> [(keypath *a *b) (nil->val 0)] $$p :> *c)
(:> (inc *c)))
<<shadowif
Syntactic sugar for single-branch var overriding. Shadows the given var with the given replacement value if the var matches the predicate. For example:
(<<shadowif *image nil? "images/logo.png")
This code expands to:
(<<if (nil? *image)
(identity "images/logo.png" :> *image)
(else>)
(identity *image :> *image))
<<sources
macro
(<<sources topology & args)
Specifies a block of dataflow code that subscribes the given topology to one or more depots. Each subscription starts with a source> declaration.
Example:
(<<sources my-stream-topology
(source> *depot :> *key)
(+compound $$p {*key (aggs/+count)})
(source> *depot2 {:start-from (offset-ago 10000 :records)} :> {:keys [*key *value]})
(|hash *key)
(local-transform> [(keypath *key) (termval *value)] $$p2))
<<subsource
(<<subsource obj & block)
Like <<switch except always chooses the branch based on the exact class of the object. Optionally allows case-specific destructuring within each case>
statement. Useful when processing data on a depot that has heterogenous record types. Example:
(<<subsource *data
(case> TodoNew :> {:keys [*list-id *id *todo]})
(anchor> <TodoNewRoot>)
(<<branch <TodoNewRoot>
(|hash$$ $$todos *id)
(+compound $$todos {*id (aggs/+last *todo)}))
(<<branch <TodoNewRoot>
(|hash$$ $$todo-lists *list-id)
(+compound $$todo-lists {*list-id (aggs/+vec-agg *id)}))
(case> TodoEdit :> {:keys [*id *change-map]})
(|hash$$ $$todos *id)
(+compound $$todos {*id (aggs/+merge *change-map)})
)
<<switch
(<<switch switch-expr & block)
Like Clojure’s switch
, this chooses the branch to execute based on equality. For example:
(<<switch (+ 2 *v)
(case> 1)
(identity 2 :> *b)
(case> 2)
(identity 3 :> *b)
(default>)
(identity 4 :> *b))
(println *b)
Like <<cond, <<switch
unifies all branches and the usual rules of var scope after unification apply. Unlike <<cond
, if there is no default>
declared and nothing matches, <<switch
will raise an error.
<<with-substitutions
(<<with-substitutions bindings & code)
Substitutes Rama vars in its body with the associated expressions. Useful when accessing task globals dynamically in top-level deframaop or deframafn.
Example:
(deframaop dynamic-topology-code [*k]
(<<with-substitutions
[$$p (this-module-pobject-task-global "$$p")
$$mirror (pobject-task-global "com.mycompany.OtherModule" "$$p")]
(|hash$$ $$mirror *k)
(local-select> (keypath *k) $$mirror :> *otherv)
(|hash *k)
(local-select> (keypath *k) $$p :> *v)
(:> (str *v "-" *otherv))))
?<-
macro
(?<- & body)
Execute the given dataflow code on the current thread. Should not be used in modules. Can only execute code that doesn’t use parallel capabilities (e.g. partitioners).
Note that since this method compiles the code from scratch each invocation, it is not representative of the performance of dataflow code in a deployed module.
accumulator
(accumulator accum-op & {:keys [init-fn], :as options})
Defines an accumulator aggregator. accum-op
takes in any number of arguments and returns a path to apply to the aggregation target. init-fn
is an optional zero-arity function that specifies the initial value for aggregation if aggregating to a non-existent location.
Example:
(def +my-sum
(accumulator
(fn [v] (term (fn [v2] (+ v v2))))
:init-fn (fn [] 0)
))
ack-return>
(ack-return> *val)
Accumulate value according to depot source options to return with depot appends done with :ack
anchor>
Makes a node with the given anchor.
Example:
(+ 1 2 :> *a)
(anchor> <root>)
(println "hello")
(<<branch <root>
(println "branch" *a))
anchorvar?
(anchorvar? v)
Returns true
if the input is a symbol beginning with <
and ending with >
.
and>
Same behavior as Clojure’s and
for Rama dataflow code.
anon-ramavar?
(anon-ramavar? obj)
Returns true
if the input is an anonymous Rama var (e.g. _
, _*
, etc.).
anyvar?
Returns true
if the input is an anyvar, a Rama var beginning with *
.
assert!
macro
(assert! expression)
(assert! expression message)
(assert! expression message data)
Throw an rpl.rama.base.exceptions/AssertionError
if the given expression does not evaluate to truthy. Usable in Rama dataflow code.
batch<-
macro
(batch<- output-vars & code)
Defines a subbatch for use within a <<batch or <<query-topology block. See the extended documentation on batch blocks for more information.
block>
Use within defbasicblocksegmacro to splice a vector of dataflow code into another vector of dataflow code. Useful when generating dataflow code dynamically.
Example:
(defbasicblocksegmacro foo [arg1 arg2 arg3 arg4]
[[inc arg1 :> arg2]
[block> [[inc arg2 :> arg3] [inc arg3 :> arg4]]]])
case>
Case marker for <<cond, <<subsource, and <<switch.
close!
(close! closeable)
Calls the Java method close
on the object. In Rama used to release resources for proxy states or InProcessCluster.
combiner
(combiner combiner-op & {:keys [init-fn], :as options})
Defines a combiner aggregator. combiner-op
takes in two objects of the same form and merges them into a single, aggregated object.
In <<batch or <<query-topology blocks, combiner aggregators will parallelize their aggregation. This makes them able to do global aggregations scalably.
init-fn
is an optional zero-arity function that specifies the initial value for aggregation if aggregating to a non-existent location.
Example:
(def +my-combiner-sum
(combiner + :init-fn (fn [] 0)))
compiled-foreign-proxy
(compiled-foreign-proxy compiled-path pstate)
(compiled-foreign-proxy compiled-path pstate options)
Reactive query of PState with path that navigates to exactly one value. Return is a ProxyState whole value can be accessed any time with deref
. This function blocks until the initial value of the ProxyState
is known. Server pushes incremental diffs to returned ProxyState
in the background to maintain its value.
Accepts options:
:pkey
: partitioning key:callback-fn
: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.
See the documentation for more details.
The path in this variant must be compiled.
compiled-foreign-proxy-async
(compiled-foreign-proxy-async compiled-path pstate)
(compiled-foreign-proxy-async compiled-path pstate {:keys [pkey callback-fn], :as options})
Reactive query of PState with path that navigates to exactly one value. Return is CompletableFuture containing a ProxyState whole value can be accessed any time with deref
. The CompletableFuture
is delivered when the initial value of the ProxyState
is known. Server pushes incremental diffs to returned ProxyState
in the background to maintain its value.
Accepts options:
:pkey
: partitioning key:callback-fn
: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.
See the documentation for more details.
The path in this variant must be compiled.
compiled-foreign-select
(compiled-foreign-select compiled-path pstate)
(compiled-foreign-select compiled-path pstate options)
Queries PState with path, which must already be compiled. Returns sequence of navigated values.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
compiled-foreign-select-async
(compiled-foreign-select-async compiled-path pstate)
(compiled-foreign-select-async compiled-path pstate {:keys [pkey], :as options})
Queries PState with path, which must already be compiled. Returns sequence of navigated values asynchronously in CompletableFuture.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
compiled-foreign-select-one
(compiled-foreign-select-one compiled-path pstate)
(compiled-foreign-select-one compiled-path pstate options)
Queries PState with path, which must already be compiled. Path must navigate to exactly one value.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
compiled-foreign-select-one-async
(compiled-foreign-select-one-async compiled-path pstate)
(compiled-foreign-select-one-async compiled-path pstate {:keys [pkey], :as options})
Queries PState with path, which must already be compiled. Path must navigate to exactly one value. Returns asynchronously in a CompletableFuture.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
completable-future>
(completable-future> *completable-future)
Asynchronously wait for CompletableFuture to deliver and emit result if successful.
continue>
Returns to the beginning of a loop<-
body. Arguments are the new bindings for the loop’s vars.
declare-depot
macro
(declare-depot setup var-sym partitioner-spec)
(declare-depot setup var-sym partitioner-spec options)
Declared a depot with the given partitioning scheme.
Partitioner scheme can be:
:random
- hash-by
:disallow
, which disallows appends from foreign clients. Appends with this scheme are only allowed within modules.- Partitioner defined with defdepotpartitioner
Available options are:
:global?
: if this depot should have only a single partition. When this option is set, the partitioning scheme is irrelevant.
Examples:
(declare-depot setup *my-depot :random)
(declare-depot setup *my-depot2 (hash-by :url))
(declare-depot setup *my-depot3 (hash-by my-key-extraction-fn))
(declare-depot setup *my-depot4 :disallow)
(declare-depot setup *my-depot5 :random {:global? true})
declare-depot*
(declare-depot* setup var-sym partitioner-spec)
(declare-depot* setup var-sym partitioner-spec options)
Function version of declare-depot. Var in this version is specified as a symbol.
Example:
(declare-depot* setup '*my-depot :random)
declare-object
macro
(declare-object setup var-sym obj)
Defines an object to be copied to all tasks. If provided object implements TaskGlobalObject, the object will have a specialized instance per task. This can be used to integrate external queues, databases, or other tools with Rama.
See the documentation for more details.
declare-object*
(declare-object* setup var-sym obj)
Function version of declare-object. Var in this version is specified as a symbol.
Example:
(declare-object* setup '*my-object (->MyTaskGlobalValue))
declare-pstate
macro
(declare-pstate topology var-sym schema)
(declare-pstate topology var-sym schema options)
Declares a PState for the given topology with the given schema. The schema is specified as an arbitrary combination of maps, vectors, and sets. map-schema, set-schema, and vector-schema are used to define subindexing options.
Available options are:
:global?
: if this PState should have only a single partition:private?
: if this PState should only be accessible within this module:initial-value
: initial value for each partition of this PState:key-partitioner
: custom function for routing foreign PState client queries to partitions. Should be a reference to a function of two arguments,num-partitions
andkey
, that picks a partition between 0 andnum-partitions
.
See the documentation for more details.
Examples:
(declare-pstate mb $$p1 {clojure.lang.Keyword Object})
(declare-pstate mb $$p2 Long {:global? true})
(declare-pstate mb $$p3 {String (set-schema Long {:subindex? true})})
(declare-pstate mb $$p3 {Object Object} {:key-partitioner my-key-partitioner})
declare-pstate*
(declare-pstate* topology var-sym schema)
(declare-pstate* topology var-sym schema options)
Function version of declare-pstate. Var in this version is specified as a symbol.
Example:
(declare-pstate mb '$$my-pstate {Long (set-schema String {:subindex? true})})
declare-tick-depot
macro
(declare-tick-depot setup var-sym frequency-millis)
Defines a depot that emits at the given frequency of time. Can be used in any ETL topology and cannot be appended to by clients.
Example:
(declare-tick-depot setup *my-tick-depot 30000)
declare-tick-depot*
(declare-tick-depot* setup var-sym frequency-millis)
Function version of declare-tick-depot*. Var in this version is specified as a symbol.
Example:
(declare-tick-depot* setup '*my-tick-depot 30000)
declared-object-task-global
(declared-object-task-global name)
Dynamically retrieves the task global declared with declare-object. This can only fetch task globals that implement TaskGlobalObject.
Example:
(declared-object-task-global "*obj" :> *fetched-obj)
default>
Default case marker for <<cond.
defbasicblocksegmacro
macro
(defbasicblocksegmacro name doc-string? attr-map? arglist & body)
Create a Rama segment macro. This defines a Clojure function that returns Rama segments as vectors of data.
The arguments declaration specifies how to parse stream arguments. A *
suffix captures variable arity inputs into a single list.
Examples:
(defbasicblocksegmacro my-sm [arg1 arg2 :> output-arg :a> another-stream-arg]
[[+ arg1 arg2 :> '*v#]
[* '*v# output-arg :> another-stream-arg]])
(defbasicblocksegmacro my-sm2 [:<* all-inputs]
(vec
(for [a all-inputs]
[println "Runtime value:" a])))
defbasicsegmacro
macro
(defbasicsegmacro name doc-string? attr-map? inputs & body)
Create a Rama segment macro. This is the same as defbasicblocksegmacro except returns just a single segment.
Example:
(defbasicsegmacro [arg1 arg2 :> output-arg :a> another-stream-arg]
[+ arg1 arg2 another-stream-arg :> output-arg])
defblock
macro
(defblock name args & body)
defblock
provides a convenient way of making a Rama macro that accepts a variable-length block of dataflow code as input.
defblock
is for Rama macros that have some input parameters, a block of code, and no concept of separate inputs and outputs. The naming convention is to prefix these Rama macros with <<
to indicate they ignore anything given to :>
and so should not be used as expressions.
Example:
(defblock <<my-block-macro [a b block]
[[println "Before" a]
[<<atomic
[block> block]]
[println "After" b]])
defdepotpartitioner
macro
(defdepotpartitioner name args-vec & body)
Defines a custom depot partitioner as a function from the appended data and number of partitions to a partition number between 0 and the number of partitions.
Example:
(defdepotpartitioner partition-by-value-in-data
[data num-partitions]
(mod (:some-value data) num-partitions))
defgenerator
macro
(defgenerator name & body)
Clojure function that returns a subbatch defined with batch<- for use in a <<batch or <<query-topology block.
Example:
(defgenerator mygen
[source]
(batch<- [*count *sum]
(source :> *v)
(|global)
(aggs/+count)
(agg/+sum :> *sum)))
defmodule
macro
(defmodule sym params & body)
(defmodule sym options params & body)
Defines a Rama module with a Clojure function of setup
and topologies
.
The first argument after the symbol can be an options map. The :module-name
key can be added to this map to override the name of the module, which defaults to the symbol. The module name is always prefixed with the declaring namespace, such as com.mycompany.modules/FooModule
.
Examples:
(defmodule FooModule [setup topologies]
(declare-depot setup *depot :random))
(defmodule BarModule {:module-name "CarModule"} [setup topologies]
(declare-depot setup *depot :random))
defoperation
macro
(defoperation name & args)
Defines a Rama operation in Clojure that emits to one or more output streams. Can emit to each output stream zero or more times, and each emit can emit zero or more fields.
Example:
(defoperation foo-op
[out> :>
a> :a>]
[arg1 arg2]
(doseq [i (range arg2)]
(a> i (str i "!")))
(out> (str arg1 "?")))
This example emits a dynamic number of times to the :a>
stream and one time to the :>
stream.
deframafn
macro
(deframafn name args & decls)
Defines a Rama function implemented with dataflow code. Must emit exactly one time to :>
, and this must be the last operation of the body.
Example:
(deframafn foofn
[*a]
(* 10 *a :> *b)
(:> (inc *b)))
This example will throw a runtime error because it emits to :>
many times:
(deframafn badfn
[]
(ops/range> 0 10 :> *v)
(:> *v))
deframaop
macro
(deframaop name args & decls)
Defines a Rama operation implemented with dataflow code. Output is done with :>
segment.
Example:
(deframaop foo-op
[*a *b]
(:> (+ *a *b))
(- *a *b :> *v)
(:> *v))
depot-partition-append!
(depot-partition-append! *depot *data *ack-type)
Append data to a depot partition from within an ETL with the given ack level. Ack level can be :ack
, :append-ack
, or nil
.
else>
Else marker for <<if block.
filter>
(filter> !v)
Executes its continuation only if given truthy as an argument. For example:
(?<-
(filter> (= 1 1))
(println "A")
(filter> (= 1 0))
(println "B")
)
A
nil
fixed-key-additions
(fixed-key-additions key-set)
Migration option used with migrated to specify keys being added to a fixed-keys-schema
fixed-key-removals
(fixed-key-removals key-set)
Migration option used with migrated to specify keys being removed from a fixed-keys-schema
fixed-keys-schema
(fixed-keys-schema key->schema)
Declare schema for map with different schema for each key. Specified keys are the only ones allowed when writing, but they’re all optional.
Example:
(fixed-keys-schema
{:name String
:location String
:creation-time-millis Long
:followers (set-schema Long {:subindex? true})
})
foreign-append!
(foreign-append! depot data)
(foreign-append! depot data ack-level)
Appends data to a depot client with optional ack level.
Ack levels:
nil
: fire-and-forget:append-ack
: waits for data to be appended and replicated to depot partition:ack
: waits for data to be appended and replicated to depot partition and for all colocated stream topologies to finish processing it
If not specified, the ack level defaults to :ack
.
foreign-append-async!
(foreign-append-async! depot data)
(foreign-append-async! depot data ack-level)
foreign-depot
(foreign-depot cluster module-name depot-name)
Retrieve a client for a depot
foreign-depot-partition-info
(foreign-depot-partition-info depot partition-index)
Given the depot and the partition index returns a map with :start-offset
and :end-offset
.
foreign-depot-partition-info-async
(foreign-depot-partition-info-async depot partition-index)
Given the depot and the partition index returns a CompletableFuture that delivers a map with :start-offset
and :end-offset
.
foreign-depot-read
(foreign-depot-read depot partition-index start-offset end-offset)
Given the depot and partition index returns a list of records between the start offset (inclusive) and end offset (exclusive).
foreign-depot-read-async
(foreign-depot-read-async depot partition-index start-offset end-offset)
Given the depot and partition index returns a CompletableFuture that delivers a list of records between the start offset (inclusive) and end offset (exclusive).
foreign-invoke-query
(foreign-invoke-query query & args)
Invoke query topology with given arguments.
foreign-invoke-query-async
(foreign-invoke-query-async query & args)
Invoke query topology with given arguments. Returns result asynchronously in CompletableFuture
foreign-object-info
(foreign-object-info pobject)
Get information about the partitioned object. Returns a map with keys: - :name
- name of the partitioned object - :module-name
- module name in which the partitioned object belongs - :num-partitions
- number of partitions of the partitioned object
foreign-proxy
macro
(foreign-proxy path pstate)
(foreign-proxy path pstate options)
Reactive query of PState with path that navigates to exactly one value. Return is a ProxyState whole value can be accessed any time with deref
. This function blocks until the initial value of the ProxyState
is known. Server pushes incremental diffs to returned ProxyState
in the background to maintain its value.
Accepts options:
:pkey
: partitioning key:callback-fn
: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.
See the documentation for more details.
foreign-proxy-async
macro
(foreign-proxy-async path pstate)
(foreign-proxy-async path pstate options)
Reactive query of PState with path that navigates to exactly one value. Return is CompletableFuture containing a ProxyState whole value can be accessed any time with deref
. The CompletableFuture
is delivered when the initial value of the ProxyState
is known. Server pushes incremental diffs to returned ProxyState
in the background to maintain its value.
Accepts options:
:pkey
: partitioning key:callback-fn
: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.
See the documentation for more details.
foreign-pstate
(foreign-pstate cluster module-name pstate-name)
Retrieve client for a PState
foreign-query
(foreign-query cluster module-name query-name)
Retrieve a client for a query topology
foreign-select
macro
(foreign-select path pstate)
(foreign-select path pstate options)
Queries PState with path. Returns sequence of navigated values.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
foreign-select-async
macro
(foreign-select-async path pstate)
(foreign-select-async path pstate options)
Queries PState with path. Returns sequence of navigated values asynchronously in CompletableFuture.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
foreign-select-one
macro
(foreign-select-one path pstate)
(foreign-select-one path pstate options)
Queries PState with path. Path must navigate to exactly one value.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
foreign-select-one-async
macro
(foreign-select-one-async path pstate)
(foreign-select-one-async path pstate options)
Queries PState with path. Path must navigate to exactly one value. Returns asynchronously in a CompletableFuture.
Accepts :pkey
option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.
See the documentation for more details.
fragvar?
Returns true
if the input is a fragvar, a Rama var beginning with %
.
gen-anchorvar
(gen-anchorvar prefix)
(gen-anchorvar)
Generates a unique anchor. Optionally takes as input a prefix string.
gen-anyvar
(gen-anyvar)
(gen-anyvar prefix)
Generates a unique var beginning with *
. Optionally takes as input a prefix string.
gen-anyvars
(gen-anyvars amt)
Generates a sequence of unique anyvars. Usage: (gen-anyvars 10)
gen-fragvar
(gen-fragvar)
(gen-fragvar prefix)
Generates a unique var beginning with %
. Optionally takes as input a prefix string.
gen-pstatevar
(gen-pstatevar)
(gen-pstatevar prefix)
Generates a unique var beginning with $$
. Optionally takes as input a prefix string.
gen>
Used in <<batch or <<query-topology blocks to separate batch sources. The next segment will start a fresh source of data. Equivalent to freshBatchSource in the Java API.
See the documentation for more details.
generator
macro
(generator & body)
Clojure function that returns a subbatch defined with batch<- for use in a <<batch or <<query-topology block.
Example:
(generator [source]
(batch<- [*count *sum]
(source :> *v)
(|global)
(aggs/+count)
(agg/+sum :> *sum)))
get-module-name
(get-module-name module)
Retrieves the name of the module.
get-module-status
(get-module-status manager module-name)
Retrieves the status of the module.
hash-by
(hash-by ifn-or-string)
Defines depot partitioner that hashes by a key within appended values. ifn-or-string
can be either a keyword, string, or reference to a top-level Clojure function.
hook>
(hook> anchor)
Changes attachment point for subsequent code to node with given anchor
if>
(if> v :then> :else>)
Emits to :then>
or :else>
output stream depending on whether value is truthy. This is a lower-level operation than <<if, which is generally preferred. Unlike Clojure, if>
is not a special form and can be passed around like any other Rama operation.
ifexpr
(ifexpr v then-expr)
(ifexpr v then-expr else-expr)
Clojure-style conditional as a single expression.
Example:
(?<-
(println (ifexpr (= 1 1) "hi" "bye")))
hi
nil
invoke-query
(invoke-query query-topology-reference & args :> *result)
Invokes a colocated or mirror query topology.
Query topology reference is either the name of a colocated query topology or the var for a mirror query topology.
java-block<-
macro
(java-block<- & segments)
Specify a Block for the Java API with dataflow code defined in Clojure. This enables the Clojure API to use libraries for the Java API that require Block
as input to functions.
Example:
(java-block<-
(+ *a *b :> *c)
(* *c 10 :> *d))
java-macro!
macro
(java-macro! expr)
Insert code generated by a Java API macro into Clojure API dataflow code.
Example:
(* 10 *a :> *b)
(java-macro! (.genId module-unique-id-gen "*id"))
(println "New ID:" *id)
local-clear>
(local-clear> $s)
Removes all data from the PState partition on current task. Can only be used with PStates owned by stream topologies. Extremely expensive to run for large PState partitions.
local-select>
(local-select> path $$pstate :> *out-var)
(local-select> path $$pstate options-map :> *out-var)
Queries the PState partition on current task using the given path. Emits once for every navigated value.
Accepts option :allow-yield?, which when set to true
allows the select call to yield during execution in order to not run for too long at a time on the task thread. Navigators will automatically paginate while traversing structures to maximize efficiency. A stable view of the PState is used for the full select, so even if the path yields many times during execution, it’s always traversing the value of the PState when the select began.
Selects on mirror PStates are always allowed to yield.
local-transform>
(local-transform> path $$pstate)
loop<-
(loop<- bindings & body)
Creates a dataflow loop with the given vars and body. :>
is used to emit from the loop, and continue> is used to execute the loop body again. The vars are given initial values and updated with each call to continue>. The output vars of the loop are specified as part of the bindings.
The body of the loop can perform asynchronous calls, such as partitioners or mirror PState queries.
Example:
(loop<- [*counter 0 :> *task-id *res]
(:> (ops/current-task-id) *counter)
(|shuffle)
(<<if (< 10 *counter)
(continue> (inc *counter))
))
(println "Loop continuation:" *task-id *res)
map-schema
(map-schema key-schema value-schema)
(map-schema key-schema value-schema options)
Declare a map schema. Key schema must be a class reference, and value schema can be a class reference or a nested schema definition.
Accepts options map to specify subindexing options. Use {:subindex? true}
to declare subindexing with size tracking on. use {:subindex-options {:track-size? false}}
to declare subindexing with size tracking off.
Examples:
(map-schema Object Long)
(map-schema Object Object {:subindex? true})
(map-schema Object Object {:subindex-options {:track-size? false}})
(map-schema Object (set-schema String {:subindex? true}) {:subindex? true})
materialize>
(materialize> & args :> $$temp-pstate)
Useful in more complicated microbatch topologies, materialize>
lets you capture the results of a <<batch
call in an in-memory PState that you can re-use for further <<batch
in the same microbatch topology. Called at the end of a <<batch
block, materialize>
captures those fields into an in-memory PState exactly how they’re currently partitioned. For instance:
(source> *some-source :> %microbatch)
(<<batch
(%microbatch :> [*k *v])
(+group-by *k
(+sum *v :> *sum))
(materialize> *k *sum :> $$t)
)
(anchor> <s>)
(<<branch <s>
(<<batch
($$t :> *k *sum)
...
))
(<<branch <s>
(<<batch
($$t :> *k *sum)
...
))
This code is able to efficiently re-use that first computation as many times as it wishes. This is an alternative to declaring everything in defgenerator calls and explicitly re-using a particular subbatch.
microbatch-topology
(microbatch-topology topologies name)
Create a microbatch topology for the module.
migrate-to-subindexed
(migrate-to-subindexed)
Migration option used with migrated to specify a value is being changed to a subindexed structure.
migrated
(migrated new-schema migration-id migration-fn)
(migrated new-schema migration-id migration-fn migration-options)
Declares a migration at a location in a PState schema. This can only be used at positions within a PState that are serialized as a whole, e.g. the value of a subindexed map or the value of a top level map. Migrations are instantaneous in that after a module update all reads will return fully migrated results. The supplied migration-fn
is applied on reads while Rama migrates the values on disk in the background.
migration-id
is used to determine if a migration should be restarted or continue where it left off when the module is updated mid-migration. If migration-id
remains the same, it continues where it left off. Otherwise, it restarts from the beginning of the PState.
migration-fn
is a regular Clojure function that converts values at that position from the old schema to the new schema. It must be idempotent, as it may be called on either a migrated or unmigrated value.
migration-options
is a vector that may contain migrate-to-subindexed, fixed-key-additions, or fixed-key-removals.
See the documentation on migrations for the full details.
mirror-depot
macro
(mirror-depot setup var-sym module-name depot-name)
Defines a mirror depot to access a depot on another module using the specified var.
Example:
(mirror-depot setup *my-mirror "com.mycompany.module/Foo" "*depot")
mirror-depot*
(mirror-depot* setup var-sym module-name depot-name)
Function version of mirror-depot. Var in this version is specified as a symbol.
Example:
(mirror-depot* setup '*depot "com.mycompany.module/Foo" "*depot")
mirror-pstate
macro
(mirror-pstate setup var-sym module-name pstate-name)
Defines a mirror PState to access a PState on another module using the specified var.
Example:
(mirror-pstate setup $$mirror "com.mycompany.module/Foo" "$$p")
mirror-pstate*
(mirror-pstate* setup var-sym module-name pstate-name)
Function version of mirror-pstate. Var in this version is specified as a symbol.
Example:
(mirror-pstate* setup '$$p "com.mycompany.module/Foo" "$$p")
mirror-query
macro
(mirror-query setup var-sym module-name query-topology-name)
Defines a mirror query topology to access a query topology on another module using the specified var. The query topology can be invoked inside the topology with invoke-query.
Example:
(mirror-query setup *my-foo-query "com.mycompany.module/Foo" "foo-query")
mirror-query*
(mirror-query* setup var-sym module-name query-topology-name)
Function version of mirror-query. Var in this version is specified as a symbol. The query topology can be invoked inside the topology with invoke-query.
Example:
(mirror-query* setup '*foo-query "com.mycompany.module/Foo" "foo-query")
module
macro
(module params & body)
(module options-map params & body)
Defines an anonymous Rama module with a Clojure function of setup
and topologies
.
The first argument can be an options map. The :module-name
key can be added to this map to override the auto-generated name of the module. The module name is always prefixed with the declaring namespace, such as com.mycompany.modules/FooModule
.
Examples:
(module [setup topologies]
(declare-depot setup *depot :random))
(module {:module-name "FooModule"} [setup topologies]
(declare-depot setup *depot :random))
NONE>
NONE
cannot be used in Rama code since Specter is used in the processing of Rama code, so NONE
will never be captured as a value. NONE>
is equal to (termval NONE)
and is used to remove values in paths used in Rama code.
offset-after-timestamp-millis
(offset-after-timestamp-millis timestamp-millis)
Start from option for depot subscriptions. Only applies for new topologies that have not processed any data yet. Begins processing on each depot partition at the first depot record appended after the given timestamp.
Example:
(source> *depot {:start-from (offset-after-timestamp-millis 12345678)} :> *data)
offset-ago
(offset-ago offset-amount offset-unit)
Start from option for depot subscriptions. Only applies for new topologies that have not processed any data yet. Begins processing on each depot partition at the record corresponding to the number of units prior. Allowed units are :records
, :millis
, :seconds
, :minutes
, :hours
, :days
, :weeks
, :months
, and :years
.
Examples:
(source> *depot {:start-from (offset-ago 1000 :records)} :> *data)
(source> *depot {:start-from (offset-ago 4 :weeks)} :> *data)
(source> *depot {:start-from (offset-ago 200 :minutes)} :> *data)
open-cluster-manager
(open-cluster-manager)
(open-cluster-manager config)
Create a cluster manager. The cluster manager is primarily used to fetch clients to depots, PStates, and query topologies with foreign-depot, foreign-pstate, and foreign-query.
With no arguments, it uses the configuration in the rama.yaml
file on classpath. Otherwise, it uses the provided config map. Configuration must contain Conductor connection information, such as “conductor.host”. If separate external and internal hostnames are configured, uses external hostnames.
open-cluster-manager-internal
(open-cluster-manager-internal)
(open-cluster-manager-internal config)
Create a cluster manager. The cluster manager is primarily used to fetch clients to depots, PStates, and query topologies with foreign-depot, foreign-pstate, and foreign-query.
With no arguments, it uses the configuration in the rama.yaml
file on classpath. Otherwise, it uses the provided config map. Configuration must contain Conductor connection information, such as “conductor.host”. If separate external and internal hostnames are configured, uses internal hostnames.
or>
Same behavior as Clojure’s or
for Rama dataflow code.
path>
Captures a compiled Specter path. Used by local-select>, local-transform>, and other similar operations. However, this has uses on its own as well, such as when defining accumulator aggregators.
Creating the path at runtime is extremely fast because it precompiles the path at compile-time.
Example:
(path> (keypath *k) AFTER-ELEM (termval 3) :> *path)
pobject-task-global
(pobject-task-global module-name name)
Dynamically retrieves the task global depot or PState. Objects are identified by their owning module name and declared object name.
Example:
(pobject-task-global "com.mycompany/OtherModule" "$$p" :> $$mirror)
(pobject-task-global "com.mycompany/OtherModule" "*depot" :> *mirror-depot)
(pobject-task-global "com.mycompany/MyModule" "$$p" :> $$p)
proxy-status
(proxy-status proxy)
Returns current status of this proxy. Returns :active, :terminated, or :terminated-ungracefully
pstatevar?
(pstatevar? o)
Returns true
if the input is a PState var, a symbol beginning with $$
.
query-topology-task-global
(query-topology-task-global module-name name)
Dynamically retrieves an operation to invoke a query topology. The query topology is identified by its owning module name and topology name.
Example:
(query-topology-task-global "com.mycompany/OtherModule" "q" :> %query)
ramafn>
(ramafn> % fragvars)
Compiler directive that the given anonymous fragvars are ramafns. This makes the invokes of those anonymous fragvars more efficient.
Example:
(ramafn> %f1 %f2 %f3)
ramavar?
(ramavar? obj)
Returns true
if the input is a Rama var of any kind.
recur>
(recur> & params)
Like Clojure recur
but for Rama dataflow code within ramafns. Has limitation that the body of the ramafn between the start and the recur>
callsite has no ramaop invokes.
seg#
(seg# & body)
Used to mark nested segments returned from defbasicblocksegmacro, defbasicsegmacro, or defblock.
Example:
(defbasicsegmacro my-sm [arg1 arg2 :> output-arg]
[+ arg1 (seg# inc arg2) :> output-arg])
This is equivalent to:
(defbasicblocksegmacro my-sm [arg1 arg2 :> output-arg]
[[inc arg2 :> '*v#]
[+ arg1 '*v# :> output-arg]])
select>
(select> path $$pstate :> *out-var)
(select> path $$pstate options-map :> *out-var)
(select> path *obj :> *out-var)
Queries the object using the given path. When used on a PState, changes partitions based on the path. Emits once for every navigated value.
Accepts option :allow-yield?, which when set to true
allows the select call to yield during execution in order to not run for too long at a time on the task thread. Navigators will automatically paginate while traversing structures to maximize efficiency. A stable view of the PState is used for the full select, so even if the path yields many times during execution, it’s always traversing the value of the PState when the select began.
Selects on mirror PStates are always allowed to yield.
set-schema
(set-schema value-schema)
(set-schema value-schema options)
Declare a set schema. Value schema must be a class reference.
Accepts options map to specify subindexing options. Use {:subindex? true}
to declare subindexing with size tracking on. use {:subindex-options {:track-size? false}}
to declare subindexing with size tracking off.
Examples:
(set-schema Long)
(set-schema Object {:subindex? true})
(set-schema String {:subindex-options {:track-size? false}})
source-segments-as-data
(source-segments-as-data source-segments)
Convert Rama source forms into evaluable Clojure data forms embeddable into a segmacro. Useful for bridging a Clojure macro to a Rama segmacro.
source>
(source> *depot :> out-var)
(source> *depot *options :> out-var)
Subscribes topology to specified depot. For stream topology, options map can contain :start-from
, :retry-mode
, or :ack-return-agg
. Microbatch topology accepts :start-from
only.
Allowed values for options:
:start-from
::end
(default),:beginning
, offset-ago, or offset-after-timestamp-millis:retry-mode
::individual
(default),:all-after
, or:none
:ack-return-agg
: aggregator to use for streaming ack returns, defaults to using the last aggregated value
stream-topology
(stream-topology topologies name)
Create a stream topology for the module.
this-module-pobject-task-global
(this-module-pobject-task-global name)
Dynamically retrieves the task global depot or PState declared in this module. Objects are identified by their declared object name.
Example:
(this-module-pobject-task-global "$$p" :> $$p)
(this-module-pobject-task-global "*depot" :> *fetched-depot)
this-module-query-topology-task-global
(this-module-query-topology-task-global name)
Dynamically retrieves an operation to invoke a query topology defined in the same module. The query topology is identified by its owning module name and topology name.
Example:
(this-module-query-topology-task-global "q" :> %query)
throw!
(throw! e)
Throw an exception. Used in Rama dataflow code since Clojure throw
is a special form. Also does tap> on the error, so used in Clojure contexts as well.
unify>
(unify> & anchors)
Attaches subsequent code to each of the anchors specified, merging computation flowing out of those nodes. After the unify the only vars in scope are vars that exist in all parent branches.
value-schema
(value-schema klass)
Declare a schema for values matching a particular class.
vector-schema
(vector-schema value-schema)
(vector-schema value-schema options)
Declare a vector schema. Value schema can be a class reference or a nested schema definition.
Accepts options map to specify subindexing options. Use {:subindex? true}
to declare subindexing. Unlike other schema types, size tracking cannot be disabled for vector-schema since Rama needs the size to efficiently compute the index for a newly appended element.
Examples:
(vector-schema Long)
(vector-schema Object {:subindex? true})
(vector-schema (set-schema String {:subindex? true}) {:subindex? true})
yield-if-overtime
(yield-if-overtime)
Yield the task thread if current event is using too much time. Commonly used in loop bodies.
|all
(|all)
Sends continuation to all tasks.
|all$$
|custom
(|custom afn & args)
Partitions according to a custom function. Function accepts as arguments the number of tasks followed by the provided arguments.
Example:
(<<ramafn %custom
[*num-tasks *arg1 *arg2]
(:> (mod (+ *arg1 *arg2) *num-tasks)))
(|custom %custom 10 -11)
|custom$$
(|custom$$ afn & args)
Partitions according to a custom function. Function accepts as arguments the number of partitions followed by the provided arguments.
Example:
(<<ramafn %custom
[*num-partitions *arg1 *arg2]
(:> (mod (+ *arg1 *arg2) *num-partitions)))
(|custom$$ $$p %custom 10 -11)
|direct
(|direct *task-id)
Sends continuation to given task id.
|direct$$
|global
(|global)
Sends all continuations to task 0.
|global$$
|hash
(|hash *field)
Chooses task to partition to based on hash(field) % num-tasks.
|hash$$
|origin
(|origin)
Special partitioner for use in query topologies. Indicate to partition back to the caller of the query. Must be the final partitioner of the query topology definition.
|path$$
A special partitioner on pstates. Takes in as args the pstate
and the path
to partition based on. Pstates can be configured with knowledge on how to read a path and determine to which partition to navigate to, and |path$$
uses that functionality to partition.
By default pstates navigate based on hash partitioning expecting the first element of a path to be keypath. If not, they will error if the path given to |path$$
does not start with keypath
.
|shuffle
(|shuffle)
Partitions to a single task chosen in random round robin order. Once one event has gone to each task, re-randomizes order again.