krb5 commit: Add tests for KCM ccache type

Greg Hudson ghudson at
Wed Nov 28 00:59:41 EST 2018
commit f0bcb86131e385b2603ccf0f3c7d65aa3891b220
Author: Greg Hudson <ghudson at>
Date:   Thu Nov 22 00:27:35 2018 -0500

    Add tests for KCM ccache type
    Using a trivial Python implementation of a KCM server, run the tests against the KCM ccache type.

 src/tests/kcmserver.py |  246 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/tests/t_ccache.py  |    9 ++-
 2 files changed, 254 insertions(+), 1 deletions(-)

diff --git a/src/tests/kcmserver.py b/src/tests/kcmserver.py
new file mode 100644
index 0000000..57432e5
--- /dev/null
+++ b/src/tests/kcmserver.py
@@ -0,0 +1,246 @@
+# This is a simple KCM test server, used to exercise the KCM ccache
+# client code.  It will generally throw an uncaught exception if the
+# client sends anything unexpected, so is unsuitable for production.
+# (It also imposes no namespace or access constraints, and blocks
+# while reading requests and writing responses.)
+# This code knows nothing about how to marshal and unmarshal principal
+# names and credentials as is required in the KCM protocol; instead,
+# it just remembers the marshalled forms and replays them to the
+# client when asked.  This works because marshalled creds and
+# principal names are always the last part of marshalled request
+# arguments, and because we don't need to implement remove_cred (which
+# would need to know how to match a cred tag against previously stored
+# credentials).
+# The following code is useful for debugging if anything appears to be
+# going wrong in the server, since daemon output is generally not
+# visible in Python test scripts.
+# import sys, traceback
+# def ehook(etype, value, tb):
+#     with open('/tmp/exception', 'w') as f:
+#         traceback.print_exception(etype, value, tb, file=f)
+# sys.excepthook = ehook
+import select
+import socket
+import struct
+import sys
+# Server-wide state, shared by all client connections.
+# caches maps a cache name (bytes) to its Cache object.
+caches = {}
+# cache_uuidmap maps a cache's uuid (bytes) to the same Cache object.
+cache_uuidmap = {}
+# Name reported by op_get_default_cache; changed by
+# op_set_default_cache.
+defname = b'default'
+# Counter used by op_gen_new to build new cache names.
+next_unique = 1
+# Counter used by make_uuid to build new uuids.
+next_uuid = 1
+# KCM request opcodes handled by this server.  Every name used as a
+# key in the ophandlers table must be defined here; the numeric values
+# follow the KCM protocol's opcode enumeration.
+class KCMOpcodes(object):
+    GEN_NEW = 3
+    INITIALIZE = 4
+    DESTROY = 5
+    STORE = 6
+    GET_PRINCIPAL = 8
+    GET_CRED_UUID_LIST = 9
+    GET_CRED_BY_UUID = 10
+    REMOVE_CRED = 11
+    GET_CACHE_UUID_LIST = 18
+    GET_CACHE_BY_UUID = 19
+    GET_DEFAULT_CACHE = 20
+    SET_DEFAULT_CACHE = 21
+    GET_KDC_OFFSET = 22
+    SET_KDC_OFFSET = 23
+# Error codes this server returns to the client as the KCM response
+# code: KRB5_CC_END for an unknown uuid, KRB5_CC_NOSUPP for the
+# unimplemented remove_cred operation, and KRB5_FCC_NOFILE for a cache
+# with no principal set.
+class KRB5Errors(object):
+    KRB5_CC_END = -1765328242
+    KRB5_CC_NOSUPP = -1765328137
+    KRB5_FCC_NOFILE = -1765328189
+# Return a new 16-byte uuid: 12 zero bytes followed by the current
+# value of the next_uuid counter as a 4-byte big-endian integer.  The
+# counter is incremented on each call, and is shared by cache uuids
+# and credential uuids.
+def make_uuid():
+    global next_uuid
+    uuid = bytes(12) + struct.pack('>L', next_uuid)
+    next_uuid = next_uuid + 1
+    return uuid
+# In-memory state for a single credential cache.
+class Cache(object):
+    def __init__(self, name):
+        # Cache name (bytes); op_get_cache_by_uuid reads it back.
+        self.name = name
+        # Marshalled default principal, or None until op_initialize.
+        self.princ = None
+        self.uuid = make_uuid()
+        # Credential uuids in the order they were stored.
+        self.cred_uuids = []
+        # Maps a credential uuid to the marshalled credential bytes.
+        self.creds = {}
+        self.time_offset = 0
+# Return the Cache object for name.  If no cache by that name exists,
+# create one and register it in both caches and cache_uuidmap, so a
+# lookup never fails.
+def get_cache(name):
+    if name in caches:
+        return caches[name]
+    cache = Cache(name)
+    caches[name] = cache
+    cache_uuidmap[cache.uuid] = cache
+    return cache
+# Split argbytes at its first zero byte.  Returns a tuple of the bytes
+# before the zero byte (the cache name) and the bytes after it (the
+# remaining arguments).
+# NOTE(review): if argbytes contains no zero byte, find() returns -1
+# and the result is (argbytes[:-1], argbytes); no error is raised.
+def unmarshal_name(argbytes):
+    offset = argbytes.find(b'\0')
+    return argbytes[0:offset], argbytes[offset+1:]
+# Handle GEN_NEW: return code 0 and a zero-terminated cache name of
+# the form b'unique<N>', where N is the next_unique counter (then
+# incremented).  argbytes is ignored, and no Cache object is created
+# here.
+def op_gen_new(argbytes):
+    # Does not actually check for uniqueness.
+    global next_unique
+    name = b'unique' + str(next_unique).encode('ascii')
+    next_unique += 1
+    return 0, name + b'\0'
+# Handle INITIALIZE: argbytes is a zero-terminated cache name followed
+# by a marshalled principal.  Remember the principal bytes as-is and
+# reset the cache's credentials and time offset.  Returns code 0 with
+# an empty payload.
+def op_initialize(argbytes):
+    name, princ = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    cache.princ = princ
+    cache.cred_uuids = []
+    cache.creds = {}
+    cache.time_offset = 0
+    return 0, b''
+# Handle DESTROY: remove the named cache from both lookup tables.
+# Because get_cache creates a missing cache, destroying a cache which
+# does not exist also succeeds.  Returns code 0 with an empty payload.
+def op_destroy(argbytes):
+    name, rest = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    del cache_uuidmap[cache.uuid]
+    del caches[name]
+    return 0, b''
+# Handle STORE: argbytes is a zero-terminated cache name followed by a
+# marshalled credential.  Save the credential bytes as-is under a
+# newly generated uuid and record the uuid in storage order.  Returns
+# code 0 with an empty payload.
+def op_store(argbytes):
+    name, cred = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    uuid = make_uuid()
+    cache.creds[uuid] = cred
+    cache.cred_uuids.append(uuid)
+    return 0, b''
+# Handle GET_PRINCIPAL: return the principal bytes remembered by
+# op_initialize, followed by a zero byte.  If the cache has no
+# principal (it was never initialized), return KRB5_FCC_NOFILE with an
+# empty payload.
+def op_get_principal(argbytes):
+    name, rest = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    if cache.princ is None:
+        return KRB5Errors.KRB5_FCC_NOFILE, b''
+    return 0, cache.princ + b'\0'
+# Handle GET_CRED_UUID_LIST: return code 0 and the named cache's
+# credential uuids, concatenated in the order they were stored.
+def op_get_cred_uuid_list(argbytes):
+    name, rest = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    return 0, b''.join(cache.cred_uuids)
+# Handle GET_CRED_BY_UUID: argbytes is a zero-terminated cache name
+# followed by a credential uuid.  Return the marshalled credential
+# stored under that uuid, or KRB5_CC_END with an empty payload if the
+# cache has no such uuid.
+def op_get_cred_by_uuid(argbytes):
+    name, uuid = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    if uuid not in cache.creds:
+        return KRB5Errors.KRB5_CC_END, b''
+    return 0, cache.creds[uuid]
+# Handle REMOVE_CRED: not implemented, since this server cannot match
+# a cred tag against stored credentials.  Always returns
+# KRB5_CC_NOSUPP with an empty payload.
+def op_remove_cred(argbytes):
+    return KRB5Errors.KRB5_CC_NOSUPP, b''
+# Handle GET_CACHE_UUID_LIST: return code 0 and the uuids of all
+# currently registered caches, concatenated.  argbytes is ignored.
+def op_get_cache_uuid_list(argbytes):
+    return 0, b''.join(cache_uuidmap.keys())
+# Handle GET_CACHE_BY_UUID: argbytes is a cache uuid with no preceding
+# name.  Return the zero-terminated name of the cache registered under
+# that uuid, or KRB5_CC_END with an empty payload if there is none.
+def op_get_cache_by_uuid(argbytes):
+    uuid = argbytes
+    if uuid not in cache_uuidmap:
+        return KRB5Errors.KRB5_CC_END, b''
+    return 0, cache_uuidmap[uuid].name + b'\0'
+# Handle GET_DEFAULT_CACHE: return code 0 and the zero-terminated
+# default cache name.  argbytes is ignored.
+def op_get_default_cache(argbytes):
+    return 0, defname + b'\0'
+# Handle SET_DEFAULT_CACHE: make the name at the start of argbytes the
+# default cache name.  Returns code 0 with an empty payload.
+def op_set_default_cache(argbytes):
+    global defname
+    defname, rest = unmarshal_name(argbytes)
+    return 0, b''
+# Handle GET_KDC_OFFSET: return code 0 and the named cache's time
+# offset as a 4-byte big-endian signed integer.
+def op_get_kdc_offset(argbytes):
+    name, rest = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    return 0, struct.pack('>l', cache.time_offset)
+# Handle SET_KDC_OFFSET: argbytes is a zero-terminated cache name
+# followed by a 4-byte big-endian signed integer, which becomes the
+# cache's time offset.  struct.unpack raises struct.error if the bytes
+# after the name are not exactly four bytes long.  Returns code 0 with
+# an empty payload.
+def op_set_kdc_offset(argbytes):
+    name, obytes = unmarshal_name(argbytes)
+    cache = get_cache(name)
+    cache.time_offset, = struct.unpack('>l', obytes)
+    return 0, b''
+# Dispatch table mapping each supported opcode to its handler.  Every
+# handler takes the request's argument bytes and returns a tuple of
+# the KCM response code and the response payload bytes.
+ophandlers = {
+    KCMOpcodes.GEN_NEW : op_gen_new,
+    KCMOpcodes.INITIALIZE : op_initialize,
+    KCMOpcodes.DESTROY : op_destroy,
+    KCMOpcodes.STORE : op_store,
+    KCMOpcodes.GET_PRINCIPAL : op_get_principal,
+    KCMOpcodes.GET_CRED_UUID_LIST : op_get_cred_uuid_list,
+    KCMOpcodes.GET_CRED_BY_UUID : op_get_cred_by_uuid,
+    KCMOpcodes.REMOVE_CRED : op_remove_cred,
+    KCMOpcodes.GET_CACHE_UUID_LIST : op_get_cache_uuid_list,
+    KCMOpcodes.GET_CACHE_BY_UUID : op_get_cache_by_uuid,
+    KCMOpcodes.GET_DEFAULT_CACHE : op_get_default_cache,
+    KCMOpcodes.SET_DEFAULT_CACHE : op_set_default_cache,
+    KCMOpcodes.GET_KDC_OFFSET : op_get_kdc_offset,
+    KCMOpcodes.SET_KDC_OFFSET : op_set_kdc_offset
+}
+# Read and respond to a request from the socket s.
+# Returns False if the peer closed the connection before sending any
+# byte of a new request, and True once a request has been answered.
+# NOTE(review): if the peer closes after sending only part of the
+# length or part of the request body, recv() keeps returning b'' and
+# the read loops below never terminate.  Presumably acceptable for a
+# test server; confirm before reusing this code elsewhere.
+def service_request(s):
+    lenbytes = b''
+    # The request is framed by a 4-byte big-endian length.
+    while len(lenbytes) < 4:
+        lenbytes += s.recv(4 - len(lenbytes))
+        if lenbytes == b'':
+                return False
+    reqlen, = struct.unpack('>L', lenbytes)
+    req = b''
+    while len(req) < reqlen:
+        req += s.recv(reqlen - len(req))
+    # The request begins with a major and minor version (one byte
+    # each) and a 2-byte big-endian opcode; the rest is the opcode's
+    # arguments.  The version numbers are not checked.
+    majver, minver, op = struct.unpack('>BBH', req[:4])
+    argbytes = req[4:]
+    # An opcode with no entry in ophandlers raises KeyError here.
+    code, payload = ophandlers[op](argbytes)
+    # The KCM response is the code (4 bytes) and the response payload.
+    # The Heimdal IPC response is the length of the KCM response (4
+    # bytes), a status code which is essentially always 0 (4 bytes),
+    # and the KCM response.
+    kcm_response = struct.pack('>l', code) + payload
+    hipc_response = struct.pack('>LL', len(kcm_response), 0) + kcm_response
+    s.sendall(hipc_response)
+    return True
+# Listen on the UNIX-domain socket path given as the first
+# command-line argument.  Once listening, write 'starting...' to
+# stderr; the test script passes that string to start_server,
+# presumably as the readiness marker to wait for.
+server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+server.bind(sys.argv[1])
+server.listen(5)
+select_input = [server,]
+sys.stderr.write('starting...\n')
+sys.stderr.flush()
+# Serve forever: accept new clients on the listening socket, and
+# answer one request at a time on any client socket which becomes
+# readable.  A client socket is dropped and closed when
+# service_request reports that the peer has closed it.
+while True:
+    iready, oready, xready = select.select(select_input, [], [])
+    for s in iready:
+        if s == server:
+            client, addr = server.accept()
+            select_input.append(client)
+        else:
+            if not service_request(s):
+                select_input.remove(s)
+                s.close()
diff --git a/src/tests/t_ccache.py b/src/tests/t_ccache.py
index fcf1a61..66804af 100755
--- a/src/tests/t_ccache.py
+++ b/src/tests/t_ccache.py
@@ -22,7 +22,10 @@
 from k5test import *
-realm = K5Realm(create_host=False)
+kcm_socket_path = os.path.join(os.getcwd(), 'testdir', 'kcm')
+conf = {'libdefaults': {'kcm_socket': kcm_socket_path,
+                        'kcm_mach_service': '-'}}
+realm = K5Realm(create_host=False, krb5_conf=conf)
 keyctl = which('keyctl')
 out = realm.run([klist, '-c', 'KEYRING:process:abcd'], expected_code=1)
@@ -122,6 +125,10 @@ def collection_test(realm, ccname):
 collection_test(realm, 'DIR:' + os.path.join(realm.testdir, 'cc'))
+kcmserver_path = os.path.join(srctop, 'tests', 'kcmserver.py')
+realm.start_server([sys.executable, kcmserver_path, kcm_socket_path],
+                   'starting...')
+collection_test(realm, 'KCM:')
 if test_keyring:
     def cleanup_keyring(anchor, name):
         out = realm.run(['keyctl', 'list', anchor])

More information about the cvs-krb5 mailing list