com.rpl.rama

+compound

(+compound target-pstate template)(+compound template :> out)

Aggregation specified as a data structure template. Template can be any combination of maps and vectors, with aggregators at the leaves.

Can be used either to update the PState partition on the current task or, in a <<batch or <<query-topology block, to aggregate input into a single value.

Examples:

(+compound $$p {*k {*k2 (aggs/+sum *v)}})

(+compound $$p {*k [(aggs/+count) (aggs/+sum *v)]})

(+compound {*k (aggs/+count)} :> *count-map)
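As a rough mental model only, the first example's template {*k {*k2 (aggs/+sum *v)}} yields the same nested shape as folding each [k k2 v] input into a map with plain Clojure's update-in. This sketch does not use Rama and says nothing about how Rama actually executes the aggregation; nested-sum is a hypothetical name.

```clojure
;; Plain-Clojure analogue (no Rama) of the template {*k {*k2 (aggs/+sum *v)}}:
;; each input tuple [k k2 v] adds v into the leaf at path [k k2].
(defn nested-sum [tuples]
  (reduce (fn [acc [k k2 v]]
            ;; fnil treats a missing leaf as 0, like a sum starting from zero
            (update-in acc [k k2] (fnil + 0) v))
          {}
          tuples))

(nested-sum [[:a :x 1] [:a :x 2] [:a :y 5] [:b :x 3]])
;; => {:a {:x 3, :y 5}, :b {:x 3}}
```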

+group-by

(+group-by key-expr & block)

Groups input by one or more keys and performs the aggregations in the specified block on each group of data independently. Only usable in a <<batch or <<query-topology block. Emits the vars for the keys and the results of the aggregations into the postagg phase.

Key can be specified as either a single key or vector of keys.

Examples:

(+group-by *k
  (aggs/+sum *v :> *sum)
  (aggs/+count :> *count))
(+group-by [*k1 *k2]
  (aggs/+max *v :> *max-value))
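Similarly, as a plain-Clojure approximation (not Rama), the first +group-by example computes a sum and a count per key, with each group aggregated independently. group-sum-count is a hypothetical name, and the map it returns is just a convenient way to show the per-group results.

```clojure
;; Plain-Clojure analogue (no Rama) of:
;;   (+group-by *k (aggs/+sum *v :> *sum) (aggs/+count :> *count))
;; Each group of [k v] tuples is aggregated independently.
(defn group-sum-count [tuples]
  (into {}
        (for [[k group] (group-by first tuples)]
          [k {:sum   (reduce + (map second group))
              :count (count group)}])))

(group-sum-count [[:a 1] [:b 10] [:a 4]])
;; => {:a {:sum 5, :count 2}, :b {:sum 10, :count 1}}
```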

<<atomic

(<<atomic & block)

Runs the subblock until it completes synchronously, then runs the code following the <<atomic block exactly once.

Example:

user=> (?<-
        (<<atomic
          (range> 0 3 :> !v)
          (println !v))
        (println "DONE"))
0
1
2
DONE
nil

<<batch

(<<batch & block)

Specifies a batch block. A batch block is partially declarative and supports joins, multiple step aggregation, and more. See the extended documentation for details.

<<branch

Attaches a block of code to the given anchors. If multiple anchors are provided, a unify> of those anchors is inserted. Code after the <<branch block attaches to the code that preceded the <<branch declaration, not to the end of the branch.

Example:

(some-op-with-events
  :event1> <a> *v
  :event2> <b> *v
  :> *o)
(<<branch <a> <b>
  (println "Event value" *v))
(println *o)

<<cond

(<<cond & block)

Analogous to Clojure’s cond. It declares cases using the subblock headers case> and default>, like so:

(<<cond
  (case> (< *a 2))
  (identity 3 :> *b)

  (case> (< *a 4))
  (identity 6 :> *b)

  (case> (< *a 100))
  (identity -1 :> *b)
  )
(println "Res:" *b)

<<cond unifies only the declared branches. If there’s no default> case and no condition matches, the continuation of <<cond does not execute. Also, by the rules of unification, the only vars in scope after the <<cond block are those bound in every branch of the <<cond.
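For comparison, here is the first <<cond example rewritten with Clojure's cond (plain Clojure, no Rama; cond-analogue is a hypothetical name). The difference worth noticing is the no-match case.

```clojure
;; Plain-Clojure analogue (no Rama) of the first <<cond example.
(defn cond-analogue [a]
  (cond (< a 2)   3
        (< a 4)   6
        (< a 100) -1))

(cond-analogue 1)   ;; => 3
(cond-analogue 50)  ;; => -1
(cond-analogue 500) ;; => nil. Clojure's cond still returns here, whereas
                    ;;    <<cond would not run the (println "Res:" *b)
                    ;;    continuation at all.
```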

Here’s an example using default>:

(<<cond
  (case> (< *a 10))
  (identity 1 :> *b)
  (println "Case 1")

  (case> (< *a 100))
  (identity 2 :> *b)
  (println "Case 2")

  (default>)
  (raise-error! :no-match))

Because *b is not defined in the default> case, it is not in scope following the <<cond block. For cases like this where the default is to raise an error, you can tell <<cond not to unify the default branch like this:

(<<cond
  (case> (< *a 10))
  (identity 1 :> *b)
  (println "Case 1")

  (case> (< *a 100))
  (identity 2 :> *b)
  (println "Case 2")

  (default> :unify false)
  (raise-error! :no-match))
(println "Res:" *b)

Now *b will be in scope following the <<cond block.

<<do

(<<do & block)

Just like do in Clojure, <<do simply executes its block. Use it where only a single segment is allowed, for instance to execute multiple segments following an inline hook:

(some-op :foo> *a :>> (<<do (println "1") (println "2"))
         :> *b)

<<if

(<<if pred & block)

<<if is the most commonly used conditional statement and works just like if in Clojure. It takes a then block and an else block separated by else>, and automatically unifies the two branches. If no else> is specified, the empty else branch is still unified, so code following <<if runs whether or not the predicate is true. For example:

(<<if (= 1 2)
  (identity 1 :> !a)
 (else>)
  (identity 2 :> !a))
(println !a)

This prints 2.

<<query-topology

macro

(<<query-topology topologies name & args)

Defines a query topology in a module. Query topologies are implicitly batch blocks.

<<ramafn

(<<ramafn fragvar args & decls)

Defines an anonymous Rama function within dataflow code. Output is done with the :> segment. The body must be fully synchronous, so operations like partitioners cannot be used. If the body needs to be asynchronous, use <<ramaop.

Example:

(+ 1 2 :> *a)
(<<ramafn %f [*b *c]
  (+ *a *b *c :> *d)
  (:> (* 10 *d)))
(println (%f 4 5))

This prints 120.
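The closure behavior in this example matches an ordinary Clojure fn closing over a local, as in this plain-Clojure sketch (not Rama; ramafn-analogue is a hypothetical name).

```clojure
(defn ramafn-analogue []
  (let [a (+ 1 2)                 ; like (+ 1 2 :> *a)
        f (fn [b c]               ; like (<<ramafn %f [*b *c] ...)
            (let [d (+ a b c)]    ; closes over a, like *a in the body
              (* 10 d)))]         ; like (:> (* 10 *d))
    (f 4 5)))

(ramafn-analogue) ;; => 120
```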

<<ramaop

(<<ramaop fragvar args & decls)

Defines an anonymous Rama operation within dataflow code. Output is done with :> segment.

Example:

(<<ramaop %op [*a *b]
  (|hash *a)
  (local-select> [(keypath *a *b) (nil->val 0)] $$p :> *c)
  (:> (inc *c)))

<<shadowif

Syntactic sugar for single-branch var overriding. Shadows the given var with the given replacement value if the var matches the predicate. For example:

(<<shadowif *image nil? "images/logo.png")

This code expands to:

(<<if (nil? *image)
  (identity "images/logo.png" :> *image)
 (else>)
  (identity *image :> *image))

<<sources

macro

(<<sources topology & args)

Specifies a block of dataflow code that subscribes the given topology to one or more depots. Each subscription starts with a source> declaration.

Example:

(<<sources my-stream-topology
  (source> *depot :> *key)
  (+compound $$p {*key (aggs/+count)})

  (source> *depot2 {:start-from (offset-ago 10000 :records)} :> {:keys [*key *value]})
  (|hash *key)
  (local-transform> [(keypath *key) (termval *value)] $$p2))

<<subsource

(<<subsource obj & block)

Like <<switch, except that it always chooses the branch based on the exact class of the object. It optionally allows case-specific destructuring within each case> statement. Useful when processing data from a depot that has heterogeneous record types. Example:

(<<subsource *data
 (case> TodoNew :> {:keys [*list-id *id *todo]})
  (anchor> <TodoNewRoot>)
  (<<branch <TodoNewRoot>
    (|hash$$ $$todos *id)
    (+compound $$todos {*id (aggs/+last *todo)}))
  (<<branch <TodoNewRoot>
    (|hash$$ $$todo-lists *list-id)
    (+compound $$todo-lists {*list-id (aggs/+vec-agg *id)}))

 (case> TodoEdit :> {:keys [*id *change-map]})
  (|hash$$ $$todos *id)
  (+compound $$todos {*id (aggs/+merge *change-map)})
 )

<<switch

(<<switch switch-expr & block)

Like Clojure’s case, this chooses the branch to execute based on equality. For example:

(<<switch (+ 2 *v)
  (case> 1)
  (identity 2 :> *b)

  (case> 2)
  (identity 3 :> *b)

  (default>)
  (identity 4 :> *b))
(println *b)

Like <<cond, <<switch unifies all branches and the usual rules of var scope after unification apply. Unlike <<cond, if there is no default> declared and nothing matches, <<switch will raise an error.
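The same branching written with Clojure's case is sketched below (plain Clojure, no Rama; the function names are hypothetical). It also shows a parallel in the no-match behavior described above.

```clojure
;; Plain-Clojure analogue (no Rama) of the <<switch example above.
(defn switch-analogue [v]
  (case (+ 2 v)
    1 2    ; like (case> 1)
    2 3    ; like (case> 2)
    4))    ; trailing expression acts like default>

;; Without a default, Clojure's case also throws when nothing matches,
;; similar to <<switch raising an error.
(defn switch-no-default [v]
  (case (+ 2 v)
    1 2
    2 3))

(switch-analogue -1)   ;; => 2
(switch-analogue 10)   ;; => 4
(switch-no-default 10) ;; throws IllegalArgumentException
```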

?<-

macro

(?<- & body)

Executes the given dataflow code on the current thread. Should not be used in modules. Can only execute code that doesn’t use parallel capabilities (e.g. partitioners).

Note that since ?<- compiles the code from scratch on each invocation, its performance is not representative of dataflow code in a deployed module.

accumulator

(accumulator accum-op & {:keys [init-fn], :as options})

Defines an accumulator aggregator. accum-op takes in any number of arguments and returns a path to apply to the aggregation target. init-fn is an optional zero-arity function that specifies the initial value for aggregation if aggregating to a non-existent location.

Example:

(def +my-sum
  (accumulator
    (fn [v] (term (fn [v2] (+ v v2))))
    :init-fn (fn [] 0)
    ))
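The accum-op in this example returns Specter's term navigator. As an illustration only, here is how repeatedly applying that path to one location behaves, with init-fn supplying the value for an empty location. This sketch uses Specter directly and assumes com.rpl/specter is on the classpath, as it is in Rama projects. It is not how Rama applies accumulators internally, and the helper accumulate-into is hypothetical.

```clojure
(ns example.accumulator-sketch
  (:require [com.rpl.specter :as s]))

;; Same accum-op and init-fn as +my-sum above, written as plain functions.
(defn accum-op [v] (s/term (fn [v2] (+ v v2))))
(defn init-fn [] 0)

;; Hypothetical helper: apply accum-op's path at key k of a map, using
;; init-fn's value when nothing is stored there yet.
(defn accumulate-into [m k v]
  (s/multi-transform [(s/keypath k) (s/nil->val (init-fn)) (accum-op v)] m))

(reduce #(accumulate-into %1 :total %2) {} [5 7 1])
;; => {:total 13}
```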

ack-return>

(ack-return> *val)

Accumulates a value, according to the depot source’s options, to return to clients for depot appends done with the :ack ack level.

anchor>

Makes a node with the given anchor.

Example:

(+ 1 2 :> *a)
(anchor> <root>)
(println "hello")
(<<branch <root>
  (println "branch" *a))

anchorvar?

(anchorvar? v)

Returns true if the input is a symbol beginning with < and ending with >.

and>

Same behavior as Clojure’s and for Rama dataflow code.

anon-ramavar?

(anon-ramavar? obj)

Returns true if the input is an anonymous Rama var (e.g. _, _*, etc.).

anyvar?

Returns true if the input is an anyvar, a Rama var beginning with *.

assert!

macro

(assert! expression)(assert! expression message)(assert! expression message data)

Throws an rpl.rama.base.exceptions/AssertionError if the given expression does not evaluate to a truthy value. Usable in Rama dataflow code.

batch<-

macro

(batch<- output-vars & code)

Defines a subbatch for use within a <<batch or <<query-topology block. See the extended documentation on batch blocks for more information.

block>

Use within defbasicblocksegmacro to splice a vector of dataflow code into another vector of dataflow code. Useful when generating dataflow code dynamically.

Example:

(defbasicblocksegmacro foo [arg1 arg2 arg3 arg4]
  [[inc arg1 :> arg2]
   [block> [[inc arg2 :> arg3] [inc arg3 :> arg4]]]])

case>

Case marker for <<cond, <<subsource, and <<switch.

close!

(close! closeable)

Calls the Java method close on the object. In Rama, it is used to release resources held by proxy states or an InProcessCluster.

combiner

(combiner combiner-op & {:keys [init-fn], :as options})

Defines a combiner aggregator. combiner-op takes in two objects of the same form and merges them into a single, aggregated object.

In <<batch or <<query-topology blocks, combiner aggregators will parallelize their aggregation. This makes them able to do global aggregations scalably.

init-fn is an optional zero-arity function that specifies the initial value for aggregation if aggregating to a non-existent location.

Example:

(def +my-combiner-sum
  (combiner + :init-fn (fn [] 0)))
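One way to see why combiners parallelize is the following simplified model in plain Clojure (not Rama's actual execution plan; combine-partitions is hypothetical). Each partition's values are combined locally, then the partial results are merged. This only gives the same answer as one sequential reduction because + is associative and 0 is its identity.

```clojure
;; Plain-Clojure illustration (no Rama) of partial aggregation followed
;; by merging the partials with the same combiner function.
(def init-fn (fn [] 0))

(defn combine-partitions [combiner-op partitions]
  (->> partitions
       (map #(reduce combiner-op (init-fn) %)) ; partial aggregate per partition
       (reduce combiner-op (init-fn))))         ; merge the partial aggregates

(combine-partitions + [[1 2] [3 4 5] []])
;; => 15
```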

compiled-foreign-proxy

(compiled-foreign-proxy compiled-path pstate)(compiled-foreign-proxy compiled-path pstate options)

Reactive query of a PState with a path that navigates to exactly one value. Returns a ProxyState whose value can be accessed at any time with deref. This function blocks until the initial value of the ProxyState is known. The server pushes incremental diffs to the returned ProxyState in the background to maintain its value.

Accepts options:

  • :pkey: partitioning key
  • :callback-fn: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.

See the documentation for more details.

The path in this variant must be compiled.

compiled-foreign-proxy-async

(compiled-foreign-proxy-async compiled-path pstate)(compiled-foreign-proxy-async compiled-path pstate {:keys [pkey callback-fn], :as options})

Reactive query of a PState with a path that navigates to exactly one value. Returns a CompletableFuture containing a ProxyState whose value can be accessed at any time with deref. The CompletableFuture is delivered when the initial value of the ProxyState is known. The server pushes incremental diffs to the returned ProxyState in the background to maintain its value.

Accepts options:

  • :pkey: partitioning key
  • :callback-fn: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.

See the documentation for more details.

The path in this variant must be compiled.

compiled-foreign-select

(compiled-foreign-select compiled-path pstate)(compiled-foreign-select compiled-path pstate options)

Queries a PState with a path, which must already be compiled. Returns a sequence of navigated values.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

compiled-foreign-select-async

(compiled-foreign-select-async compiled-path pstate)(compiled-foreign-select-async compiled-path pstate {:keys [pkey], :as options})

Queries a PState with a path, which must already be compiled. Returns a sequence of navigated values asynchronously in a CompletableFuture.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

compiled-foreign-select-one

(compiled-foreign-select-one compiled-path pstate)(compiled-foreign-select-one compiled-path pstate options)

Queries a PState with a path, which must already be compiled. The path must navigate to exactly one value.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

compiled-foreign-select-one-async

(compiled-foreign-select-one-async compiled-path pstate)(compiled-foreign-select-one-async compiled-path pstate {:keys [pkey], :as options})

Queries a PState with a path, which must already be compiled. The path must navigate to exactly one value. Returns asynchronously in a CompletableFuture.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

completable-future>

(completable-future> *completable-future)

Asynchronously waits for the CompletableFuture to complete and emits its result if it completes successfully.

continue>

Returns to the beginning of a loop<- body. Arguments are the new bindings for the loop’s vars.

declare-depot

macro

(declare-depot setup var-sym partitioner-spec)(declare-depot setup var-sym partitioner-spec options)

Declares a depot with the given partitioning scheme.

Partitioner scheme can be:

  • :random
  • hash-by
  • :disallow, which disallows appends from foreign clients. Appends with this scheme are only allowed within modules.
  • Partitioner defined with defdepotpartitioner

Available options are:

  • :global?: if this depot should have only a single partition. When this option is set, the partitioning scheme is irrelevant.

Examples:

(declare-depot setup *my-depot :random)
(declare-depot setup *my-depot2 (hash-by :url))
(declare-depot setup *my-depot3 (hash-by my-key-extraction-fn))
(declare-depot setup *my-depot4 :disallow)
(declare-depot setup *my-depot5 :random {:global? true})

declare-depot*

(declare-depot* setup var-sym partitioner-spec)(declare-depot* setup var-sym partitioner-spec options)

Function version of declare-depot. Var in this version is specified as a symbol.

Example:

(declare-depot* setup '*my-depot :random)

declare-object

macro

(declare-object setup var-sym obj)

Defines an object to be copied to all tasks. If provided object implements TaskGlobalObject, the object will have a specialized instance per task. This can be used to integrate external queues, databases, or other tools with Rama.

See the documentation for more details.

declare-object*

(declare-object* setup var-sym obj)

Function version of declare-object. Var in this version is specified as a symbol.

Example:

(declare-object* setup '*my-object (->MyTaskGlobalValue))

declare-pstate

macro

(declare-pstate topology var-sym schema)(declare-pstate topology var-sym schema options)

Declares a PState for the given topology with the given schema. The schema is specified as an arbitrary combination of maps, vectors, and sets. map-schema, set-schema, and vector-schema are used to define subindexing options.

Available options are:

  • :global?: if this PState should have only a single partition
  • :private?: if this PState should only be accessible within this module
  • :initial-value: initial value for each partition of this PState
  • :key-partitioner: custom function for routing foreign PState client queries to partitions. Should be a reference to a function of two arguments, num-partitions and key, that returns a partition index from 0 (inclusive) to num-partitions (exclusive).

See the documentation for more details.

Examples:

(declare-pstate mb $$p1 {clojure.lang.Keyword Object})
(declare-pstate mb $$p2 Long {:global? true})
(declare-pstate mb $$p3 {String (set-schema Long {:subindex? true})})
(declare-pstate mb $$p4 {Object Object} {:key-partitioner my-key-partitioner})
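A function suitable for the :key-partitioner option could look like the sketch below; my-key-partitioner is the hypothetical name already used in the example. In practice it has to route each key to the same partition the topology writes that key to, which this standalone sketch does not attempt to guarantee.

```clojure
;; Hypothetical key partitioner: takes the number of partitions and the
;; query key, and returns a partition index in [0, num-partitions).
(defn my-key-partitioner [num-partitions k]
  ;; Clojure's mod takes the sign of the divisor, so negative hashes
  ;; still map to a non-negative index.
  (mod (hash k) num-partitions))

(my-key-partitioner 8 "alice") ;; some index from 0 to 7
```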

declare-pstate*

(declare-pstate* topology var-sym schema)(declare-pstate* topology var-sym schema options)

Function version of declare-pstate. Var in this version is specified as a symbol.

Example:

(declare-pstate* mb '$$my-pstate {Long (set-schema String {:subindex? true})})

declare-tick-depot

macro

(declare-tick-depot setup var-sym frequency-millis)

Defines a depot that emits once every frequency-millis milliseconds. It can be used in any ETL topology and cannot be appended to by clients.

Example:

(declare-tick-depot setup *my-tick-depot 30000)

declare-tick-depot*

(declare-tick-depot* setup var-sym frequency-millis)

Function version of declare-tick-depot. Var in this version is specified as a symbol.

Example:

(declare-tick-depot* setup '*my-tick-depot 30000)

default>

Default case marker for <<cond and <<switch.

defbasicblocksegmacro

macro

(defbasicblocksegmacro name doc-string? attr-map? arglist & body)

Creates a Rama segment macro. This defines a Clojure function that returns Rama segments as vectors of data.

The arguments declaration specifies how to parse stream arguments. A * suffix captures variable arity inputs into a single list.

Examples:

(defbasicblocksegmacro my-sm [arg1 arg2 :> output-arg :a> another-stream-arg]
  [[+ arg1 arg2 :> '*v#]
   [* '*v# output-arg :> another-stream-arg]])
(defbasicblocksegmacro my-sm2 [:<* all-inputs]
  (vec
    (for [a all-inputs]
      [println "Runtime value:" a])))

defbasicsegmacro

macro

(defbasicsegmacro name doc-string? attr-map? inputs & body)

Creates a Rama segment macro. This is the same as defbasicblocksegmacro, except that it returns just a single segment.

Example:

(defbasicsegmacro my-sm3 [arg1 arg2 :> output-arg :a> another-stream-arg]
  [+ arg1 arg2 another-stream-arg :> output-arg])

defblock

macro

(defblock name args & body)

defblock provides a convenient way of making a Rama macro that accepts a variable-length block of dataflow code as input.

defblock is for Rama macros that have some input parameters, a block of code, and no concept of separate inputs and outputs. The naming convention is to prefix these Rama macros with << to indicate they ignore anything given to :> and so should not be used as expressions.

Example:

(defblock <<my-block-macro [a b block]
  [[println "Before" a]
   [<<atomic
    [block> block]]
   [println "After" b]])

defdepotpartitioner

macro

(defdepotpartitioner name args-vec & body)

Defines a custom depot partitioner as a function from the appended data and the number of partitions to a partition index from 0 (inclusive) to the number of partitions (exclusive).

Example:

(defdepotpartitioner partition-by-value-in-data
  [data num-partitions]
  (mod (:some-value data) num-partitions))

defgenerator

macro

(defgenerator name & body)

Defines a Clojure function that returns a subbatch defined with batch<- for use in a <<batch or <<query-topology block.

Example:

(defgenerator mygen
  [source]
  (batch<- [*count *sum]
           (source :> *v)
           (|global)
           (aggs/+count :> *count)
           (aggs/+sum *v :> *sum)))

defmodule

macro

(defmodule sym params & body)(defmodule sym options params & body)

Defines a Rama module with a Clojure function of setup and topologies.

The first argument after the symbol can be an options map. The :module-name key can be added to this map to override the name of the module, which defaults to the symbol. The module name is always prefixed with the declaring namespace, such as com.mycompany.modules/FooModule.

Examples:

(defmodule FooModule [setup topologies]
  (declare-depot setup *depot :random))
(defmodule BarModule {:module-name "CarModule"} [setup topologies]
  (declare-depot setup *depot :random))

defoperation

macro

(defoperation name & args)

Defines a Rama operation in Clojure that emits to one or more output streams. Can emit to each output stream zero or more times, and each emit can emit zero or more fields.

Example:

(defoperation foo-op
  [out> :>
   a> :a>]
  [arg1 arg2]
  (doseq [i (range arg2)]
   (a> i (str i "!")))
  (out> (str arg1 "?")))

This example emits a dynamic number of times to the :a> stream and one time to the :> stream.

deframafn

macro

(deframafn name args & decls)

Defines a Rama function implemented with dataflow code. Must emit exactly one time to :>, and this must be the last operation of the body.

Example:

(deframafn foofn
  [*a]
  (* 10 *a :> *b)
  (:> (inc *b)))

This example will throw a runtime error because it emits to :> many times:

(deframafn badfn
  []
  (ops/range> 0 10 :> *v)
  (:> *v))

deframaop

macro

(deframaop name args & decls)

Defines a Rama operation implemented with dataflow code. Output is done with :> segment.

Unlike <<ramaop, partitioners cannot be used in deframaop bodies.

Example:

(deframaop foo-op
  [*a *b]
  (:> (+ *a *b))
  (- *a *b :> *v)
  (:> *v))

depot-partition-append!

(depot-partition-append! *depot *data *ack-type)

Appends data to a depot partition from within an ETL with the given ack level. The ack level can be :ack, :append-ack, or nil.

else>

Else marker for <<if block.

filter>

(filter> !v)

Executes its continuation only if its argument is truthy. For example:

(?<-
 (filter> (= 1 1))
 (println "A")
 (filter> (= 1 0))
 (println "B")
 )
A
nil

fixed-keys-schema

(fixed-keys-schema key->schema)

Declares a schema for a map with a different schema for each key. The specified keys are the only ones allowed when writing, but all of them are optional.

Example:

(fixed-keys-schema
 {:name String
  :location String
  :creation-time-millis Long
  :followers (set-schema Long {:subindex? true})
  })

foreign-append!

(foreign-append! depot data)(foreign-append! depot data ack-level)

Appends data to a depot through a depot client, with an optional ack level.

Ack levels:

  • nil: fire-and-forget
  • :append-ack: waits for data to be appended and replicated to depot partition
  • :ack: waits for data to be appended and replicated to depot partition and for all colocated stream topologies to finish processing it

If not specified, the ack level defaults to :ack.
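The sketch below ties these client functions together in a test using an in-process cluster. It assumes com.rpl.rama.test's create-ipc and launch-module!, stream-topology, and com.rpl.rama.aggs/+count, none of which are described in this section; the module, depot, and PState names are hypothetical. Because the default ack level is :ack, the stream topology has processed each record by the time foreign-append! returns, so the query right afterwards should see both appends.

```clojure
(ns example.append-sketch
  (:use [com.rpl.rama]
        [com.rpl.rama.path])
  (:require [com.rpl.rama.aggs :as aggs]
            [com.rpl.rama.test :as rtest]))

;; Hypothetical module: counts how many times each string is appended.
(defmodule CountModule [setup topologies]
  (declare-depot setup *depot (hash-by identity))
  (let [s (stream-topology topologies "counter")]
    (declare-pstate s $$counts {String Long})
    (<<sources s
      (source> *depot :> *k)
      (+compound $$counts {*k (aggs/+count)}))))

(defn run-example []
  ;; with-open calls close on the InProcessCluster when done
  (with-open [ipc (rtest/create-ipc)]
    (rtest/launch-module! ipc CountModule {:tasks 4 :threads 2})
    (let [module-name (get-module-name CountModule)
          depot  (foreign-depot ipc module-name "*depot")
          counts (foreign-pstate ipc module-name "$$counts")]
      (foreign-append! depot "apple")              ; default ack level :ack
      (foreign-append! depot "apple")
      (foreign-append! depot "pear" :append-ack)   ; explicit ack level
      (foreign-select-one (keypath "apple") counts))))

(run-example) ;; => 2
```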

foreign-append-async!

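(foreign-append-async! depot data)(foreign-append-async! depot data ack-level)

Asynchronous version of foreign-append!. Returns a CompletableFuture that completes once the append reaches the given ack level.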

foreign-depot

(foreign-depot cluster module-name depot-name)

Retrieves a client for a depot.

foreign-invoke-query

(foreign-invoke-query query & args)

Invokes the query topology with the given arguments.

foreign-invoke-query-async

(foreign-invoke-query-async query & args)

Invokes the query topology with the given arguments. Returns the result asynchronously in a CompletableFuture.

foreign-proxy

macro

(foreign-proxy path pstate)(foreign-proxy path pstate options)

Reactive query of a PState with a path that navigates to exactly one value. Returns a ProxyState whose value can be accessed at any time with deref. This function blocks until the initial value of the ProxyState is known. The server pushes incremental diffs to the returned ProxyState in the background to maintain its value.

Accepts options:

  • :pkey: partitioning key
  • :callback-fn: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.

See the documentation for more details.

foreign-proxy-async

macro

(foreign-proxy-async path pstate)(foreign-proxy-async path pstate options)

Reactive query of a PState with a path that navigates to exactly one value. Returns a CompletableFuture containing a ProxyState whose value can be accessed at any time with deref. The CompletableFuture is delivered when the initial value of the ProxyState is known. The server pushes incremental diffs to the returned ProxyState in the background to maintain its value.

Accepts options:

  • :pkey: partitioning key
  • :callback-fn: function of three arguments, “new val”, “diff”, and “old val”, that is called whenever a diff is received.

See the documentation for more details.

foreign-pstate

(foreign-pstate cluster module-name pstate-name)

Retrieves a client for a PState.

foreign-query

(foreign-query cluster module-name query-name)

Retrieves a client for a query topology.

foreign-select

macro

(foreign-select path pstate)(foreign-select path pstate options)

Queries a PState with a path. Returns a sequence of navigated values.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

foreign-select-async

macro

(foreign-select-async path pstate)(foreign-select-async path pstate options)

Queries a PState with a path. Returns a sequence of navigated values asynchronously in a CompletableFuture.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

foreign-select-one

macro

(foreign-select-one path pstate)(foreign-select-one path pstate options)

Queries a PState with a path. The path must navigate to exactly one value.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

foreign-select-one-async

macro

(foreign-select-one-async path pstate)(foreign-select-one-async path pstate options)

Queries a PState with a path. The path must navigate to exactly one value. Returns asynchronously in a CompletableFuture.

Accepts :pkey option for the partitioning key to use. If not provided and the PState has more than one partition, the path must start with a key navigator and the query will extract that key as the partitioning key.

See the documentation for more details.

fragvar?

Returns true if the input is a fragvar, a Rama var beginning with %.

gen-anchorvar

(gen-anchorvar prefix)(gen-anchorvar)

Generates a unique anchor. Optionally takes as input a prefix string.

gen-anyvar

(gen-anyvar)(gen-anyvar prefix)

Generates a unique var beginning with *. Optionally takes as input a prefix string.

gen-anyvars

(gen-anyvars amt)

Generates a sequence of unique anyvars. Usage: (gen-anyvars 10)

gen-fragvar

(gen-fragvar)(gen-fragvar prefix)

Generates a unique var beginning with %. Optionally takes as input a prefix string.

gen-pstatevar

(gen-pstatevar)(gen-pstatevar prefix)

Generates a unique var beginning with $$. Optionally takes as input a prefix string.

gen>

Used in <<batch or <<query-topology blocks to separate batch sources. The next segment will start a fresh source of data. Equivalent to freshBatchSource in the Java API.

See the documentation for more details.

generator

macro

(generator & body)

Creates an anonymous Clojure function that returns a subbatch defined with batch<- for use in a <<batch or <<query-topology block.

Example:

(generator [source]
  (batch<- [*count *sum]
           (source :> *v)
           (|global)
           (aggs/+count :> *count)
           (aggs/+sum *v :> *sum)))

get-module-name

(get-module-name module)

Retrieves the name of the module.

get-module-status

(get-module-status manager module-name)

Retrieves the status of the module.

hash-by

(hash-by ifn)

Defines a depot partitioner that hashes by a key within appended values. ifn can be either a keyword or a reference to a top-level Clojure function.

hook>

(hook> anchor)

Changes the attachment point for subsequent code to the node with the given anchor.

if>

(if> v :then> :else>)

Emits to :then> or :else> output stream depending on whether value is truthy. This is a lower-level operation than <<if, which is generally preferred. Unlike Clojure, if> is not a special form and can be passed around like any other Rama operation.

ifexpr

(ifexpr v then-expr)(ifexpr v then-expr else-expr)

Clojure-style conditional as a single expression.

Example:

(?<-
  (println (ifexpr (= 1 1) "hi" "bye")))
hi
nil

invoke-query

(invoke-query query-topology-reference & args :> *result)

Invokes a colocated or mirror query topology.

Query topology reference is either the name of a colocated query topology or the var for a mirror query topology.

java-block<-

macro

(java-block<- & segments)

Specify a Block for the Java API with dataflow code defined in Clojure. This enables the Clojure API to use libraries for the Java API that require Block as input to functions.

Example:

(java-block<-
  (+ *a *b :> *c)
  (* *c 10 :> *d))

java-macro!

macro

(java-macro! expr)

Inserts code generated by a Java API macro into Clojure API dataflow code.

Example:

(* 10 *a :> *b)
(java-macro! (.genId module-unique-id-gen "*id"))
(println "New ID:" *id)

local-clear>

(local-clear> $$pstate)

Removes all data from the PState partition on current task. Can only be used with PStates owned by stream topologies. Extremely expensive to run for large PState partitions.

local-select>

(local-select> path $$pstate :> *out-var)

Queries the PState partition on current task using the given path. Emits once for every navigated value.

local-transform>

(local-transform> path $$pstate)

Transforms the PState partition on current task with the given path. Transform path must use term, termval, or NONE> at leaves.

loop<-

(loop<- bindings & body)

Creates a dataflow loop with the given vars and body. :> is used to emit from the loop, and continue> is used to execute the loop body again. The vars are given initial values and updated with each call to continue>. The output vars of the loop are specified as part of the bindings.

The body of the loop can perform asynchronous calls, such as partitioners or mirror PState queries.

Example:

(loop<- [*counter 0 :> *task-id *res]
  (:> (ops/current-task-id) *counter)
  (|shuffle)
  (<<if (< *counter 10)
    (continue> (inc *counter))))
(println "Loop continuation:" *task-id *res)

map-schema

(map-schema key-schema value-schema)(map-schema key-schema value-schema options)

Declare a map schema. Key schema must be a class reference, and value schema can be a class reference or a nested schema definition.

Accepts an options map to specify subindexing options. Use {:subindex? true} to declare subindexing with size tracking on. Use {:subindex-options {:track-size? false}} to declare subindexing with size tracking off.

Examples:

(map-schema Object Long)
(map-schema Object Object {:subindex? true})
(map-schema Object Object {:subindex-options {:track-size? false}})
(map-schema Object (set-schema String {:subindex? true}) {:subindex? true})

materialize>

(materialize> & args :> $$temp-pstate)

Useful in more complicated microbatch topologies, materialize> captures the results of a <<batch block in an in-memory PState that you can reuse in further <<batch blocks in the same microbatch topology. Called at the end of a <<batch block, materialize> captures the given fields into an in-memory PState, partitioned exactly as they currently are. For instance:

(source> *some-source :> %microbatch)
(<<batch
  (%microbatch :> [*k *v])
  (+group-by *k
    (aggs/+sum *v :> *sum))
  (materialize> *k *sum :> $$t))
(anchor> <s>)
(<<branch <s>
  (<<batch
    ($$t :> *k *sum)
    ...
    ))
(<<branch <s>
  (<<batch
    ($$t :> *k *sum)
    ...
    ))

This code efficiently reuses the first computation as many times as needed. This is an alternative to declaring everything in defgenerator calls and explicitly reusing a particular subbatch.

microbatch-topology

(microbatch-topology topologies name)

Creates a microbatch topology for the module.

mirror-depot

macro

(mirror-depot setup var-sym module-name depot-name)

Defines a mirror depot to access a depot on another module using the specified var.

Example:

(mirror-depot setup *my-mirror "com.mycompany.module/Foo" "*depot")

mirror-depot*

(mirror-depot* setup var-sym module-name depot-name)

Function version of mirror-depot. Var in this version is specified as a symbol.

Example:

(mirror-depot* setup '*depot "com.mycompany.module/Foo" "*depot")

mirror-pstate

macro

(mirror-pstate setup var-sym module-name pstate-name)

Defines a mirror PState to access a PState on another module using the specified var.

Example:

(mirror-pstate setup $$mirror "com.mycompany.module/Foo" "$$p")

mirror-pstate*

(mirror-pstate* setup var-sym module-name pstate-name)

Function version of mirror-pstate. In this version the var is specified as a symbol.

Example:

(mirror-pstate* setup '$$p "com.mycompany.module/Foo" "$$p")

mirror-query

macro

(mirror-query setup var-sym module-name query-topology-name)

Defines a mirror query topology to access a query topology on another module using the specified var. The mirrored query topology can then be invoked from within a topology with invoke-query.

Example:

(mirror-query setup *my-foo-query "com.mycompany.module/Foo" "foo-query")

mirror-query*

(mirror-query* setup var-sym module-name query-topology-name)

Function version of mirror-query. In this version the var is specified as a symbol. The mirrored query topology can then be invoked from within a topology with invoke-query.

Example:

(mirror-query* setup '*foo-query "com.mycompany.module/Foo" "foo-query")

module

macro

(module params & body)(module options-map params & body)

Defines an anonymous Rama module whose body is a Clojure function of setup and topologies.

The first argument can be an options map. The :module-name key can be added to this map to override the auto-generated name of the module. The module name is always prefixed with the declaring namespace, such as com.mycompany.modules/FooModule.

Examples:

(module [setup topologies]
  (declare-depot setup *depot :random))
(module {:module-name "FooModule"} [setup topologies]
  (declare-depot setup *depot :random))

NONE>

Specter's NONE cannot be used in Rama code: since Specter is used to process Rama code itself, NONE would never be captured as a value. NONE> is equivalent to (termval NONE) and is used to remove values through paths in Rama code.

offset-after-timestamp-millis

(offset-after-timestamp-millis timestamp-millis)

Value for the :start-from option of depot subscriptions. Only applies to new topologies that have not yet processed any data. Begins processing on each depot partition at the first depot record appended after the given timestamp.

Example:

(source> *depot {:start-from (offset-after-timestamp-millis 12345678)} :> *data)
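The selection rule can be modeled in plain Clojure for a single depot partition. This is a sketch only, not Rama's implementation: the record maps, the :appended-at key, and the function name are hypothetical, and treating "after" as strictly greater than the timestamp is an assumption.

```clojure
;; Hypothetical model of one depot partition: records in append order,
;; each tagged with its offset and the time (in millis) it was appended.
(def partition-records
  [{:offset 0 :appended-at 1000}
   {:offset 1 :appended-at 2000}
   {:offset 2 :appended-at 3000}])

(defn start-offset-after-timestamp
  "Returns the offset of the first record appended strictly after
  timestamp-millis (an assumption), or nil if there is none yet."
  [records timestamp-millis]
  (some (fn [{:keys [offset appended-at]}]
          (when (> appended-at timestamp-millis)
            offset))
        records))

(start-offset-after-timestamp partition-records 1500) ;; => 1
(start-offset-after-timestamp partition-records 5000) ;; => nil
```

What Rama does when no record has been appended after the timestamp is not modeled here.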

offset-ago

(offset-ago offset-amount offset-unit)

Value for the :start-from option of depot subscriptions. Only applies to new topologies that have not yet processed any data. Begins processing on each depot partition at the record corresponding to the given number of units prior. Allowed units are :records, :millis, :seconds, :minutes, :hours, :days, :weeks, :months, and :years.

Examples:

(source> *depot {:start-from (offset-ago 1000 :records)} :> *data)
(source> *depot {:start-from (offset-ago 4 :weeks)} :> *data)
(source> *depot {:start-from (offset-ago 200 :minutes)} :> *data)
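For the :records unit, the starting point can be modeled as simple offset arithmetic on each partition. The sketch below is plain Clojure, not Rama code; the function name, the end-offset convention, and clamping at offset 0 are assumptions. Time-based units such as :weeks presumably depend on record append times instead and are not modeled.

```clojure
(defn start-offset-records-ago
  "Hypothetical model of (offset-ago n :records) on one depot partition.
  end-offset is the offset the next appended record would receive.
  Clamping at 0 for short partitions is an assumption."
  [end-offset n]
  (max 0 (- end-offset n)))

;; A partition holding offsets 0..4999 has end-offset 5000, so starting
;; 1000 records ago means processing offsets 4000..4999.
(start-offset-records-ago 5000 1000) ;; => 4000
(start-offset-records-ago 300 1000)  ;; => 0
```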

open-cluster-manager

(open-cluster-manager)(open-cluster-manager config)

Create a cluster manager. The cluster manager is primarily used to fetch clients to depots, PStates, and query topologies with foreign-depot, foreign-pstate, and foreign-query.

With no arguments, it uses the configuration in the rama.yaml file on the classpath. Otherwise, it uses the provided config map. Configuration must contain Conductor connection information, such as “conductor.host”. If separate external and internal hostnames are configured, the external hostnames are used.

open-cluster-manager-internal

(open-cluster-manager-internal)(open-cluster-manager-internal config)

Create a cluster manager. The cluster manager is primarily used to fetch clients to depots, PStates, and query topologies with foreign-depot, foreign-pstate, and foreign-query.

With no arguments, it uses the configuration in the rama.yaml file on the classpath. Otherwise, it uses the provided config map. Configuration must contain Conductor connection information, such as “conductor.host”. If separate external and internal hostnames are configured, the internal hostnames are used.

or>

Same behavior as Clojure’s or for Rama dataflow code.

path>

Captures a compiled Specter path. Used by local-select>, local-transform>, and other similar operations, but also useful on its own, such as when defining accumulator aggregators.

Creating the path at runtime is extremely fast because the path is precompiled at compile time.

Example:

(path> (keypath *k) AFTER-ELEM (termval 3) :> *path)

pstatevar?

(pstatevar? o)

Returns true if the input is a PState var, a symbol beginning with $$.
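The documented rule can be restated as a few lines of plain Clojure. pstatevar-sketch? is a hypothetical name used for illustration; it is not the library's implementation and may differ on edge cases such as namespaced symbols.

```clojure
(require '[clojure.string :as str])

(defn pstatevar-sketch?
  "Sketch of the documented rule: true only for a symbol whose
  name begins with $$."
  [o]
  (and (symbol? o)
       (str/starts-with? (name o) "$$")))

(pstatevar-sketch? '$$p)  ;; => true
(pstatevar-sketch? '*v)   ;; => false
(pstatevar-sketch? "$$p") ;; => false, since a string is not a symbol
```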

ramafn>

(ramafn> & fragvars)

Compiler directive declaring that the given anonymous fragvars are ramafns. This makes invocations of those anonymous fragvars more efficient.

Example:

(ramafn> %f1 %f2 %f3)

ramavar?

(ramavar? obj)

Returns true if the input is a Rama var of any kind.

recur>

(recur> & params)

Like Clojure's recur, but for Rama dataflow code within ramafns. It has the limitation that the body of the ramafn between its start and the recur> call site must not invoke any ramaops.

seg#

(seg# & body)

Used to mark nested segments returned from defbasicblocksegmacro, defbasicsegmacro, or defblock.

Example:

(defbasicsegmacro my-sm [arg1 arg2 :> output-arg]
  [+ arg1 (seg# inc arg2) :> output-arg])

This is equivalent to:

(defbasicblocksegmacro my-sm [arg1 arg2 :> output-arg]
  [[inc arg2 :> '*v#]
   [+ arg1 '*v# :> output-arg]])

select>

(select> path $$pstate :> *out-var)

Queries the PState using the given path. Changes partitions based on the path. Emits once for every navigated value.

set-schema

(set-schema value-schema)(set-schema value-schema options)

Declare a set schema. Value schema must be a class reference.

Accepts an options map to specify subindexing options. Use {:subindex? true} to declare subindexing with size tracking on. Use {:subindex-options {:track-size? false}} to declare subindexing with size tracking off.

Examples:

(set-schema Long)
(set-schema Object {:subindex? true})
(set-schema String {:subindex-options {:track-size? false}})

source>

(source> *depot :> out-var)(source> *depot *options :> out-var)

Subscribes the topology to the specified depot. For a stream topology, the options map can contain :start-from, :retry-mode, or :ack-return-agg. A microbatch topology accepts only :start-from.

Allowed values for options:

  • :start-from: :end (default), :beginning, offset-ago, or offset-after-timestamp-millis
  • :retry-mode: :individual (default), :all-after, or :none
  • :ack-return-agg: aggregator to use for streaming ack returns, defaults to using the last aggregated value

stream-topology

(stream-topology topologies name)

Create a stream topology for the module.

throw!

(throw! e)

Throw an exception. Used in Rama dataflow code since Clojure throw is a special form.

unify>

(unify> & anchors)

Attaches subsequent code to each of the specified anchors, merging the computation flowing out of those nodes. After the unify>, the only vars in scope are those that exist in all parent branches.
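The scoping rule amounts to a set intersection over the vars bound in each parent branch. The following plain-Clojure model illustrates that rule only; it does not run Rama dataflow code, and vars-after-unify is a hypothetical helper.

```clojure
(require '[clojure.set :as set])

(defn vars-after-unify
  "Models the scoping rule of unify>: given the set of vars bound in
  each parent branch, returns the vars still in scope afterwards."
  [& branch-vars]
  (apply set/intersection branch-vars))

;; Branch <a> bound *k, *v, and *a; branch <b> bound *k, *v, and *b.
(vars-after-unify #{'*k '*v '*a} #{'*k '*v '*b}) ;; => #{*k *v}
```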

value-schema

(value-schema klass)

Declare a schema for values matching a particular class.

vector-schema

(vector-schema value-schema)(vector-schema value-schema options)

Declare a vector schema. Value schema can be a class reference or a nested schema definition.

Accepts an options map to specify subindexing options. Use {:subindex? true} to declare subindexing with size tracking on. Use {:subindex-options {:track-size? false}} to declare subindexing with size tracking off.

Examples:

(vector-schema Long)
(vector-schema Object {:subindex? true})
(vector-schema Object {:subindex-options {:track-size? false}})
(vector-schema (set-schema String {:subindex? true}) {:subindex? true})

yield-if-overtime

(yield-if-overtime)

Yield the task thread if the current event has used too much time. Commonly used in loop bodies.

|all

(|all)

Sends continuation to all tasks.

|all$$

|custom

(|custom afn & args)

Partitions according to a custom function. The function receives the number of tasks followed by the provided arguments and emits the index of the target task.

Example:

(<<ramafn %custom
  [*num-tasks *arg1 *arg2]
  (:> (mod (+ *arg1 *arg2) *num-tasks)))
(|custom %custom 10 -11)
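The arithmetic in the example is worth checking because the arguments sum to a negative number. The sketch below re-expresses the body of %custom as an ordinary Clojure function (custom-task is a hypothetical name), assuming a module with 8 tasks, a number chosen only for illustration. Clojure's mod takes the sign of the divisor, so the result stays between 0 and num-tasks minus 1 even though 10 + -11 is negative.

```clojure
;; Plain-Clojure stand-in for the %custom ramafn in the example above.
(defn custom-task [num-tasks arg1 arg2]
  (mod (+ arg1 arg2) num-tasks))

;; Assuming 8 tasks: (+ 10 -11) is -1, which mod maps to task 7.
(custom-task 8 10 -11) ;; => 7
;; rem keeps the sign of the dividend and would yield an invalid task id:
(rem (+ 10 -11) 8)     ;; => -1
```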

|custom$$

(|custom$$ $$pstate afn & args)

Partitions according to a custom function, relative to the partitions of the given PState. The function receives the number of partitions followed by the provided arguments and emits the index of the target partition.

Example:

(<<ramafn %custom
  [*num-partitions *arg1 *arg2]
  (:> (mod (+ *arg1 *arg2) *num-partitions)))
(|custom$$ $$p %custom 10 -11)

|direct

(|direct *task-id)

Sends continuation to given task id.

|direct$$

|global

(|global)

Sends all continuations to task 0.

|global$$

|hash

(|hash *field)

Chooses the task to partition to based on hash(field) % num-tasks.
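The key property of this formula is that equal field values always land on the same task. The plain-Clojure sketch below models it with a hypothetical hash-task function; using clojure.core/hash and mod is an assumption about the hash and remainder Rama actually uses. mod is chosen so that negative hash codes still yield a task id between 0 and num-tasks minus 1.

```clojure
(defn hash-task
  "Illustrative stand-in for |hash: maps a field value to a task id.
  Uses Clojure's hash, which is an assumption; Rama's hash may differ."
  [field num-tasks]
  (mod (hash field) num-tasks))

;; The same value always maps to the same task (for a fixed task count),
;; so all events for a given key are processed on one task.
(= (hash-task "alice" 16) (hash-task "alice" 16)) ;; => true
```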

|hash$$

|origin

(|origin)

Special partitioner for use in query topologies. Indicates to partition back to the caller of the query. Must be the final partitioner of the query topology definition.

|path$$

A special partitioner for PStates. It takes as arguments the PState and the path to partition by. A PState can be configured with knowledge of how to read a path and determine which partition it navigates to, and |path$$ uses that functionality to partition.

By default, a PState uses hash partitioning and expects the first element of the path to be keypath. Unless the PState is configured otherwise, |path$$ errors if the given path does not start with keypath.

|shuffle

(|shuffle)

Partitions to a single task chosen in randomized round-robin order. Once one event has gone to each task, the order is re-randomized.
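The selection order can be modeled as an endless concatenation of random permutations of the task ids. This plain-Clojure sketch models only the order in which tasks are chosen; shuffle-task-seq is a hypothetical name, and Rama's own random source and bookkeeping are not represented.

```clojure
(defn shuffle-task-seq
  "Infinite lazy sequence of target task ids for num-tasks tasks.
  Each consecutive run of num-tasks ids is a fresh random permutation,
  so every task receives one event before the order is re-randomized."
  [num-tasks]
  (lazy-seq
    (concat (shuffle (range num-tasks))
            (shuffle-task-seq num-tasks))))

;; Each consecutive group of 4 ids contains 0, 1, 2, and 3 exactly once;
;; the order within each group is random.
(take 8 (shuffle-task-seq 4))
```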