accession

0.1.1


A Clojure library for Redis

dependencies

org.clojure/clojure
1.4.0-alpha1

dev dependencies

marginalia
0.6.1
lein-marginalia
0.6.1
lein-clojars
0.6.0



(this space intentionally left almost blank)
 
(ns accession.core
  (:refer-clojure :exclude [get set keys type sync sort])
  (:require [clojure.java.io :as io]
            [clojure.string :as str])
  (:import (java.net Socket)
           (java.io BufferedInputStream DataInputStream)))

The new unified protocol was introduced in Redis 1.2, but it became the standard way for talking with the Redis server in Redis 2.0. In the unified protocol all the arguments sent to the Redis server are binary safe. This is the general form:

  *<number of arguments> CR LF
  $<number of bytes of argument 1> CR LF
  <argument data> CR LF
  ...
  $<number of bytes of argument N> CR LF
  <argument data> CR LF

See the following example:

  *3
  $3
  SET
  $5
  mykey
  $7
  myvalue

This is how the above command looks as a quoted string, so that it is possible to see the exact value of every byte in the query:

(defn query
  "Serialises one Redis command into the unified request protocol.

  name is the command name; it is upper-cased before being sent. args are the
  command's arguments, each converted to text with str. The result is a
  string made of a `*<number of parts> CR LF` header followed by one
  `$<byte length> CR LF <data> CR LF` group per part (the command name is the
  first part)."
  [name & args]
  (let [bulk (fn [part]
               (let [^String text (str part)]
                 ;; The length prefix is a byte count, not a character count.
                 (str "$" (count (.getBytes text)) "\r\n" text "\r\n")))]
    ;; Every part carries its own CR LF terminator, so a command without
    ;; arguments does not end in a stray, empty extra line.
    (apply str
           "*" (inc (count args)) "\r\n"
           (map bulk (cons (str/upper-case name) args)))))
"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

These protocols were created to support pub/sub, in particular, subscribe. Subscribe is called in terms of both a connection-map and an open channel. This is not implemented via the macro because the semantics differ so much from those of the standard Redis calls. Since channels and subscriptions live longer than a single query, we needed a way to keep a connection open and close it later when we are finished.

;; Protocol for anything that can subscribe to Redis channels. The
;; implementations in this file treat `channels` as a map whose keys are
;; channel names and whose values are callbacks invoked with each message.
(defprotocol ISubscribable
  (subscribe [this channels])
  ;; Pattern subscription is sketched here but disabled by the #_ reader
  ;; discard, so it is not part of the protocol.
  #_(psubscribe [this channels]))
;; Protocol for anything that can cancel channel subscriptions.
;; NOTE(review): the parameter is named `channels` here, while the
;; RedisChannel implementation in this file names it `channel` and passes it
;; on as a single command argument - confirm which shape callers should use.
(defprotocol IUnsubscribable
  (unsubscribe [this channels])
  ;; Pattern unsubscription is disabled by the #_ reader discard.
  #_(punsubscribe [this channels]))
;; Protocol for a long-lived channel connection that must be shut down
;; explicitly once the caller is finished with it.
(defprotocol IRedisChannel
  (close [this]))

Redis will reply to commands with different kinds of replies. It is possible to check the kind of reply from the first byte sent by the server:

  • With a single line reply the first byte of the reply will be +
  • With an error message the first byte of the reply will be -
  • With an integer number the first byte of the reply will be :
  • With bulk reply the first byte of the reply will be $
  • With multi-bulk reply the first byte of the reply will be *
;; Reads one reply from a Redis connection. The dispatch function consumes
;; the first byte of the reply, which identifies its kind; each method then
;; parses the rest. `in` must support .readByte, .readLine and .readFully
;; (the callers in this file pass a java.io.DataInputStream).
(defmulti response
  (fn [in] (char (.readByte in))))

;; Error reply: returns the rest of the line as a string.
(defmethod response \- [in]
  (.readLine in))

;; Single-line reply: returns the rest of the line as a string.
(defmethod response \+ [in]
  (.readLine in))

;; Bulk reply: a byte count on the first line, then that many bytes of
;; payload followed by CR LF. A count of -1 yields nil.
(defmethod response \$ [in]
  (let [length (Integer/parseInt (.readLine in))]
    (when (not= length -1)
      (let [content (byte-array (+ 2 length))]
        ;; readFully blocks until the payload and its trailing CR LF have
        ;; all arrived. A plain .read may return after a partial read, which
        ;; truncates the value and leaves the rest of it in the stream to be
        ;; misread as the next reply.
        (.readFully in content)
        (String. content 0 length)))))

;; Integer reply: returns the number as a long.
(defmethod response \: [in]
  (Long/parseLong (.readLine in)))

;; Multi-bulk reply: an element count, then that many nested replies, read
;; eagerly so the stream is fully consumed before returning.
(defmethod response \* [in]
  (let [length (Integer/parseInt (.readLine in))]
    (doall (repeatedly length #(response in)))))

Creates an initial socket for consumption attached to the proper host and port. This socket is subject to modification later depending on the request use case or if a timeout is set.

(defn- socket
  "Opens a TCP socket to the :host and :port of spec, with TCP_NODELAY and
  keep-alive switched on, and returns it."
  [spec]
  (let [host (:host spec)
        port (:port spec)
        sock (Socket. host port)]
    (.setTcpNoDelay sock true)
    (.setKeepAlive sock true)
    sock))

Responsible for actually making the request to the Redis server. Sets the timeout on the socket if one was specified.

(defn request
  "Sends one or more serialised queries to the server described by conn and
  returns the reply.

  conn is a connection spec with :host and :port and, optionally, :timeout
  (the socket read timeout in milliseconds). query holds the already
  serialised command strings; they are concatenated and written in a single
  call. With one query the single reply is returned; with several, a
  sequence containing one reply per query is returned. A fresh socket is
  opened for the call and closed before returning."
  [conn & query]
  (with-open [sock (socket conn)
              in (DataInputStream. (BufferedInputStream. (.getInputStream sock)))
              out (.getOutputStream sock)]
    ;; Set the timeout only once with-open owns the socket, so a failure
    ;; here cannot leak it. A spec without :timeout falls back to 0, which
    ;; java.net.Socket treats as no timeout, instead of failing on nil.
    (.setSoTimeout sock (or (:timeout conn) 0))
    (.write out (.getBytes (apply str query)))
    (if (next query)
      (doall (repeatedly (count query) #(response in)))
      (response in))))

Used in conjunction with an open channel to handle messages that arrive. Takes a channel spec (an atom holding a map from channel name to callback) and the input stream to read the next message from.

(defn receive-message
  "Reads the next reply from in and, when the map held in channel-spec has a
  callback registered under the reply's second element, calls that callback
  with the whole reply. Returns the callback's result, or nil when no
  callback is registered."
  [channel-spec in]
  (let [message (response in)
        channel (second message)
        handler (clojure.core/get @channel-spec channel)]
    (when handler
      (handler message))))

Sends commands out to the specified channels

(defn write-commands
  "Serialises command with the keys of the channels map as its arguments and
  writes the resulting bytes to out."
  [out command channels]
  (let [channel-names (clojure.core/keys channels)
        payload (apply query command channel-names)]
    (.write out (.getBytes payload))))

This record implements the protocols defined above to provide the pub/sub infrastructure

;; An open pub/sub connection. channel-fns is an atom holding the map of
;; channel name to callback, socket is the underlying connection and out is
;; the stream that commands are written to.
(defrecord RedisChannel [channel-fns socket out]
  ISubscribable
  ;; Registers the extra callbacks, then asks the server for their channels.
  (subscribe [this channels]
    (swap! channel-fns merge channels)
    (write-commands out "subscribe" channels))
  IUnsubscribable
  ;; Drops the callback for channel, then tells the server to stop sending.
  (unsubscribe [this channel]
    (swap! channel-fns dissoc channel)
    (let [payload (query "unsubscribe" channel)]
      (.write out (.getBytes payload))))
  IRedisChannel
  ;; Closes the underlying socket.
  (close [this]
    (.close socket)))

Takes a connection and sets up a connection to a Redis channel with the provided functions as callbacks to invoke when messages are received

(defn open-channel
  "Opens a dedicated connection for pub/sub and returns it as a RedisChannel.

  conn is a connection spec, command is the subscription command to send
  (for example \"subscribe\") and channels is a map from channel name to the
  callback to invoke with each message for that channel. The command is sent
  for all keys of channels, and a background future then keeps reading
  replies and dispatching them until reading fails, for instance because the
  channel was closed."
  [conn command channels]
  (let [sock (socket conn)
        in (DataInputStream. (BufferedInputStream. (.getInputStream sock)))
        out (.getOutputStream sock)
        channel-fns (atom channels)]
    (write-commands out command channels)
    ;; A plain loop rather than (doall (repeatedly ...)): doall keeps the
    ;; head of the infinite sequence, so every callback result would be
    ;; retained for as long as the subscription lives.
    (future (while true
              (receive-message channel-fns in)))
    (RedisChannel. channel-fns sock out)))
;; Lets a plain connection map be subscribed to directly: subscribing opens
;; a new channel for it. The extension targets IPersistentMap rather than
;; PersistentArrayMap alone because a connection map holding more than a
;; handful of keys is a PersistentHashMap, which would otherwise have no
;; implementation. RedisChannel implements ISubscribable itself, so its own
;; subscribe still takes precedence.
(extend-type clojure.lang.IPersistentMap
  ISubscribable
  (subscribe [this channels]
    (open-channel this "subscribe" channels)))

Creates the initial connection spec. Options can be overridden by passing in a map. The following keys are valid:

:host :port :password :socket :timeout

Passing in your own socket is not recommended, however, and will probably cause more problems than it solves.

(defn connection-map
  "Builds a connection spec. Called without arguments it returns the
  defaults; called with a map, the entries of that map override or extend
  the defaults."
  ([]
     (connection-map {}))
  ([spec]
     (merge {:host     "127.0.0.1"
             :port     6379
             :password nil
             :socket   nil
             :timeout  0}
            spec)))
(defn with-connection
  "Runs the serialised queries in body against the server described by spec
  by handing them all to request, and returns whatever request returns."
  [spec & body]
  (apply request (list* spec body)))

We would like to create one function for each command which Redis supports. The set function would look something like this:

(defn set [key value] (query "set" key value))

Similarly, the get function would be:

(defn get [key] (query "get" key))

Because each of these functions has the same pattern, we can use a macro to create them and save a lot of typing.

This function enables vararg style definitions in the queries. For example you can write:

   (mget [key & keys])

and the defquery macro will properly expand out into a variable argument function

(defn parameters
  "Turns a parameter vector into the argument list for an apply call: the
  symbols before & in their original order, followed by the last symbol
  after & (or nil when the vector has no &)."
  [params]
  (let [not-amp? (fn [sym] (not= '& sym))
        fixed    (take-while not-amp? params)
        variadic (drop-while not-amp? params)]
    (conj (vec fixed) (last variadic))))

Given a redis command and a parameter list, create a function of the form:

   (defn <name> <parameter-list>
     (query <command> <p1> <p2> ... <pN>))

The name which is passed is a symbol and is first used as a symbol for the function name. We convert this symbol to a string and use that string as the command name.

params is a list of N symbols which represent the parameters to the function. We use this list as the parameter-list when we create the function. Each symbol in this list will be an argument to query after the command. We use unquote splicing (~@) to insert these arguments after the command string.

(defmacro defquery
  "Defines a function called name, taking params, that builds the Redis
  query for the command of the same name. The parameters are forwarded to
  query through apply, so a trailing rest parameter is spread into
  individual arguments."
  [name params]
  (let [call-args     (parameters params)
        redis-command (str name)]
    `(defn ~name ~params
       (apply query ~redis-command ~@call-args))))

A call to:

(defqueries (set [key value]))

will expand to:

(do (defn set [key value] (query "set" key value)))

Given any number of redis commands and argument lists, convert them to function definitions.

This is an interesting use of unquote splicing. Unquote splicing works on a sequence and that sequence can be the result of a function call as it is here. The function which produces this sequence maps each argument passed to this macro over a function which takes each list like (set [key value]), binds it to q, and uses unquote splicing again to create a call to defquery which looks like (defquery set [key value]).

Finally, a macro can only return a single form, so we wrap all of the produced expressions in a do special form.

(defmacro defqueries
  "Takes any number of (name [params]) lists and expands into a single do
  form holding one defquery call per list."
  [& queries]
  (let [definitions (for [q queries]
                      `(defquery ~@q))]
    `(do ~@definitions)))
;; One query-building function per supported Redis command. Arguments are
;; passed to the server in the order given, so each parameter vector follows
;; the argument order of the Redis command it wraps.
(defqueries
  ;; Connection and server
  (auth             [password])
  (quit             [])
  (select           [index])
  (bgwriteaof       [])
  (bgsave           [])
  (flushdb          [])
  (flushall         [])
  (dbsize           [])
  (info             [])
  (save             [])
  (sync             [])
  (lastsave         [])
  (shutdown         [])
  (slaveof          [host port])
  ;; SLOWLOG LEN and SLOWLOG RESET take no further argument.
  (slowlog          [subcommand & arguments])
  ;; (config-get [parameter])
  ;; (config-set [parameter value])
  ;; (config-resetstat [])
  ;; (debug-object [key])
  ;; (debug-segfault)

  (echo             [message])
  (ping             [])
  (discard          [])
  (exec             [])
  (monitor          [])
  (multi            [])
  (object           [subcommand & arguments])

  ;; Strings and keys
  (set              [key value])
  (setex            [key seconds value])
  (setnx            [key value])
  (setbit           [key offset value])
  (mset             [key value & pairs])
  (msetnx           [key value & pairs])
  (setrange         [key offset value])
  (get              [key])
  (mget             [key & keys])
  (getbit           [key offset])
  (getset           [key value])
  (getrange         [key start end])
  (append           [key value])
  (keys             [pattern])
  (exists           [key])
  (randomkey        [])
  (type             [key])
  (move             [key db])
  (rename           [key newkey])
  (renamenx         [key newkey])
  (strlen           [key])
  (watch            [key & keys])
  (unwatch          [])
  (del              [key & keys])
  (sort             [key & options])

  (incr             [key])
  (incrby           [key increment])
  (decr             [key])
  (decrby           [key increment])

  ;; Expiry
  (expire           [key seconds])
  ;; EXPIREAT takes an absolute Unix timestamp, not a number of seconds.
  (expireat         [key timestamp])
  (persist          [key])
  (ttl              [key])

  ;; Lists
  (llen             [key])
  (lpop             [key])
  ;; Accepts several values, like rpush.
  (lpush            [key value & values])
  (lpushx           [key value & values])
  ;; Redis expects LSET key index value.
  (lset             [key index value])
  (linsert          [key before-after pivot value])
  (lrem             [key count value])
  (ltrim            [key start stop])
  (lrange           [key start end])
  (lindex           [key index])
  (rpop             [key])
  (rpush            [key value & values])
  (rpushx           [key value])
  (rpoplpush        [source destination])
  (blpop            [key timeout])
  (brpop            [key timeout])
  (brpoplpush       [source destination timeout])

  ;; Hashes
  (hset             [key field value])
  (hmset            [key field value & pairs])
  (hsetnx           [key field value])
  (hget             [key field])
  (hmget            [key field & fields])
  (hexists          [key field])
  (hdel             [key field & fields])
  (hgetall          [key])
  (hincrby          [key field increment])
  (hkeys            [key])
  (hvals            [key])
  (hlen             [key])

  ;; Sets
  (sadd             [key member & members])
  (srem             [key member & members])
  (sismember        [key member])
  (smembers         [key])
  (sunion           [key & keys])
  (sunionstore      [destination key & keys])
  (sdiff            [key & keys])
  ;; Any number of source sets, like sunionstore and sinterstore.
  (sdiffstore       [destination key & keys])
  (sinter           [key & keys])
  (sinterstore      [destination key & keys])
  (scard            [key])
  (smove            [source destination member])
  (spop             [key])
  (srandmember      [key])

  ;; Sorted sets
  (zadd             [key score member & more])
  ;; Defined once; the rest arguments carry options such as WITHSCORES.
  (zrange           [key start stop & more])
  (zrangebyscore    [key min max & more])
  (zrevrange        [key start stop & more])
  (zrevrangebyscore [key max min & more])
  (zcard            [key])
  (zcount           [key min max])
  (zincrby          [key increment member])
  ;; numkeys source keys, then optional WEIGHTS / AGGREGATE clauses.
  (zinterstore      [destination numkeys key & more])
  ;; ZRANK needs the sorted set's key as well as the member.
  (zrank            [key member])
  (zrem             [key member & members])
  (zremrangebyrank  [key start stop])
  (zremrangebyscore [key min max])
  (zrevrank         [key member])
  (zscore           [key member])
  (zunionstore      [destination numkeys key & more])

  ;; Pub/sub
  (publish          [channel message]))