Python "runtime"

         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import nova.store as store
from nova.store import (MASK_PRED_DYNAMIC, pred_link,
is_provable, remove_clause, is_logical, is_cache,
pred_destroy, Cache, check_nonvar,
make_defined, set, pred_touch, snapshot_rope,
Variable, Compound, is_structure, is_variable,
deref, copy_term, is_stick, defined_pred)
import nova.machine as machine
from nova.machine import (cont, Choice, more, exec_build,
make_indicator, lookup_pred, exec_body, exec_head,
exec_frame, unify, exec_unify, bind,
make_indicator_term, is_atom, unbind,
VOID_ARGS, make_error, register_signal)
from nova.special import (check_atom, make_check, check_callable,
check_integer, make_special, list_objects, check_clause)
import os
import asyncio
import stat
import errno
MAX_BUF = 4096
bootbase = ""
###
# Set the boot base.
#
# @param url The boot base.
##
def set_bootbase(url):
global bootbase
bootbase = url
###################################################################
# current_output/1 and current_error/1 #
###################################################################
MASK_DST_COLR = 0x00000004
MASK_DST_CYCL = 0x00000008
MASK_DST_FEAT = MASK_DST_COLR | MASK_DST_CYCL
MASK_DST_LINE = 0x00000010
###
# Create a text output.
##
class Sink:
def __init__(self):
self.buf = ""
self.send = lambda fd, buf: fd
self.last = -1
self.flags = 0
self.notify = lambda fd: None
self.release = lambda fd: None
self.data = NotImplemented
self.indent = 0
self.offset = 0
###
# current_output(S): [ISO 8.11.2]
# The built-in succeeds in S with the current output.
##
def test_current_output(args):
alpha = store.engine.text_output
return exec_unify(args[0], alpha)
###
# current_error(S):
# The built-in succeeds in S with the current error.
##
def test_current_error(args):
alpha = store.engine.text_error
return exec_unify(args[0], alpha)
###################################################################
# set_output/1 and set_error/1 #
###################################################################
###
# set_output(S): [ISO 8.11.4]
# The built-in succeeds. As a side effect the current output is set to S.
##
def test_set_output(args):
obj = exec_build(args[0])
check_sink(obj)
store.engine.text_output = obj
return True
###
# set_error(S):
# The built-in succeeds. As a side effect the current error is set to S.
##
def test_set_error(args):
obj = exec_build(args[0])
check_sink(obj)
store.engine.text_error = obj
return True
###
# Assure that the object is a sink.
#
# @param beta The object.
##
def check_sink(beta):
if not isinstance(beta, Sink):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["writer", beta]))
################################################################
# put_atom/2 #
################################################################
###
# put_atom(S, A):
# The built-in succeeds. As a side effect, it adds
# the atom to the stream S.
##
def test_put_atom(args):
stream = exec_build(args[0])
check_sink(stream)
text = exec_build(args[1])
check_atom(text)
put_atom(stream, text)
return True
def put_atom(stream, text):
if len(text) > 0:
pos = text.rfind("\n")
if pos != -1:
stream.offset = len(text) - pos - 1
else:
stream.offset += len(text)
stream.last = ord(text[len(text) - 1])
if stream.buf is not None:
stream.buf += text
if len(stream.buf) >= MAX_BUF:
flush_buffer(stream)
else:
stream.data = stream.send(stream.data, text)
def flush_buffer(stream):
if stream.buf is not None and len(stream.buf) > 0:
text = stream.buf
stream.buf = ""
stream.data = stream.send(stream.data, text)
################################################################
# current_input/1 and set_input/1 #
################################################################
MASK_SRC_SKIP = 0x00000001
MASK_SRC_AREAD = 0x00000002
###
# Create a text input.
##
class Source:
def __init__(self):
self.buf = ""
self.pos = 0
self.receive = lambda fd: ""
self.flags = 0
self.lineno = 0
self.release = lambda fd: None
self.data = NotImplemented
def illegal_state():
raise make_error(Compound("resource_error", ["illegal_state"]))
###
# current_input(S): [ISO 8.11.1]
# The built-in succeeds in S with the current input.
##
def test_current_input(args):
alpha = store.engine.text_input
return exec_unify(args[0], alpha)
###
# set_input(S): [ISO 8.11.3]
# The built-in succeeds. As a side effect it sets the current input to S.
##
def test_set_input(args):
obj = exec_build(args[0])
check_source(obj)
store.engine.text_input = obj
return True
###
# Assure that the object is a source.
#
# @param beta The object.
##
def check_source(beta):
if not isinstance(beta, Source):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["reader", beta]))
################################################################
# os_get_code/2 and os_peek_code/2 #
################################################################
###
# os_get_code(S, C):
# The predicate succeeds in C with the Unicode point from the stream buffer S.
# As a side effect the stream position is advanced.
##
def test_os_get_code(args):
stream = exec_build(args[0])
pos = stream.pos
buf = stream.buf
if pos < len(buf):
ch = ord(buf[pos])
pos += 1
if ch == 13 or (ch == 10 and (stream.flags & MASK_SRC_SKIP) == 0):
stream.lineno += 1
if ch == 13:
stream.flags |= MASK_SRC_SKIP
else:
stream.flags &= ~MASK_SRC_SKIP
stream.pos = pos
return exec_unify(args[1], ch)
else:
return False
###
# os_peek_code(S, C):
# The built-in succeeds in C with the Unicode point from the stream buffer S.
##
def test_os_peek_code(args):
stream = exec_build(args[0])
pos = stream.pos
buf = stream.buf
if pos < len(buf):
ch = ord(buf[pos])
return exec_unify(args[1], ch)
else:
return False
################################################################
# os_open_promise_opts/4 #
################################################################
###
# os_open_promise_opts(P, L, S, Q):
# The predicate succeeds in Q with a promise for open input S
# on path P and option list L.
##
def test_os_open_promise(args):
url = exec_build(args[0])
check_atom(url)
opts = exec_build(args[1])
stream = Source()
if not exec_unify(args[2], stream):
return False
buf = machine.ctx
prom = open_file_promise_opts(buf, stream, url, opts)
return exec_unify(args[3], lambda: prom)
def get_encoding(opts):
if opts is not None and "encoding" in opts:
return opts["encoding"]
else:
return "utf8"
async def open_file_promise_opts(buf, stream, url, opts):
enc = get_encoding(opts)
try:
file = await asyncio.to_thread(open, url, encoding=enc, newline='')
except OSError as err:
register_signal(buf, map_file_error(err, url))
return
stream.data = file
stream.receive = file_read_promise
stream.release = file_close_promise
stream.flags |= MASK_SRC_AREAD
def map_file_error(err, url):
if hasattr(err, 'reason'):
code = err.reason.errno
if code == 11001:
return Compound("resource_error", ["unknown_host"])
else:
return Compound("resource_error", ["connect_failed"])
else:
code = err.errno
if code == errno.ENOENT or errno.EACCES:
return Compound("existence_error", ["source_sink", url])
else:
return Compound("resource_error", ["remote_error"])
###################################################################
# os_read_promise/2 and os_close_promise/2 #
###################################################################
###
# os_read_promise(S, P):
# The predicate suceeds in P with a read promise for a input S.
##
def test_os_read_promise(args):
stream = exec_build(args[0])
buf = machine.ctx
return exec_unify(args[1], lambda: stream.receive(buf, stream))
async def file_read_promise(buf, stream):
try:
res = await asyncio.to_thread(blocking_read, stream.data)
stream.buf = res
stream.pos = 0
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_read(data):
if data.fileno() == 0:
return data.readline(8192)
else:
return data.read(8192)
###
# Map a stream error when in transit.
#
# @param err The offending error.
# @return The Prolog error term.
##
def map_stream_error(err):
code = err.errno
if code == errno.ETIMEDOUT:
return Compound("resource_error", ["socket_timeout"])
elif code == errno.EADDRINUSE:
return Compound("resource_error", ["port_error"])
elif code == errno.ECONNRESET:
return Compound("resource_error", ["remote_error"])
else:
return Compound("resource_error", ["io_exception"])
###
# os_close_promise(S, P):
# The predicate suceeds in P with a read promise for a input S.
##
def test_os_close_promise(args):
stream = exec_build(args[0])
buf = machine.ctx
return exec_unify(args[1], lambda: stream.release(buf, stream))
async def file_close_promise(buf, stream):
try:
await asyncio.to_thread(blocking_close, stream.data)
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_close(data):
return data.close()
################################################################
# os_prop_promise/3 #
################################################################
###
# os_prop_promise(F, M, Q):
# The predicate succeeds in Q with with a promise for the
# properties M of the file F. Barks if path F doesn't exist
# or io exception while resolving.
##
def test_os_prop_promise(args):
url = exec_build(args[0])
check_atom(url)
res = {}
if not exec_unify(args[1], res):
return False
buf = machine.ctx
prom = prop_file_promise(buf, url, res)
return exec_unify(args[2], lambda: prom)
async def prop_file_promise(buf, url, res):
try:
stats = await asyncio.to_thread(os.stat, url)
ftype = "regular" if stat.S_ISREG(stats.st_mode) else \
("directory" if stat.S_ISDIR(stats.st_mode) else "other")
res["last_modified"] = stats.st_mtime_ns // 1000000
res["absolute_path"] = os.path.abspath(url) + os.path.sep \
if ftype == "directory" else os.path.abspath(url)
res["type"] = ftype
res["can_write"] = "true" \
if (stats.st_mode & stat.S_IWUSR) != 0 else "false"
except OSError as err:
register_signal(buf, map_file_error(err, url))
return
################################################################
# os_open_sync_opts/4 #
################################################################
###
# os_open_sync_opts(P, M, L, S):
# The predicate succeeds. As a side effect the stream S is
# opened on the path P with the mode M and the option list L.
##
def test_os_open_sync(args):
url = exec_build(args[0])
check_atom(url)
mode = exec_build(args[1])
check_atom(mode)
opts = exec_build(args[2])
if "read" == mode:
raise make_error(Compound("resource_error",
["not_supported"]))
elif "write" == mode:
stream = open_write(url, "w", opts)
elif "append" == mode:
stream = open_write(url, "a", opts)
else:
raise make_error(Compound("domain_error",
["io_mode", mode]))
return exec_unify(args[3], stream)
def open_write(url, mode, opts):
enc = get_encoding(opts)
try:
file = open(url, mode, encoding=enc, newline='')
except OSError as err:
raise make_error(map_file_error(err, url))
dst = Sink()
dst.data = file
dst.send = file_write
dst.release = file_close
return dst
def file_write(data, buf):
try:
data.write(buf)
return data
except IOError as err:
raise make_error(map_stream_error(err))
def file_close(data):
try:
data.close()
except IOError as err:
raise make_error(map_stream_error(err))
################################################################
# flush_output/1 and os_close_sync/1 #
################################################################
###
# flush_output(S): [ISO 8.11.7]
# The built-in succeeds. As a side effect, it flushes the stream buffer.
##
def test_flush_output(args):
stream = exec_build(args[0])
check_sink(stream)
stream_flush(stream)
return True
def stream_flush(stream):
flush_buffer(stream)
stream.notify(stream.data)
###
# close(S): [ISO 8.11.6]
# The built-in succeeds. As a side effect, the stream S is closed.
##
def test_os_close_sync(args):
stream = exec_build(args[0])
if isinstance(stream, Sink):
pass
else:
check_source(stream)
stream_close(stream)
return True
def stream_close(stream):
if isinstance(stream, Sink):
flush_buffer(stream)
stream.release(stream.data)
else:
stream.release(stream.data)
################################################################
# os_cntrl_sync/2 and os_read_sync/1 #
################################################################
###
# os_cntrl_sync(F, P):
# The predicate assigns the property P to the file F.
##
def test_os_cntrl_sync(args):
url = exec_build(args[0])
check_atom(url)
prop = exec_build(args[1])
if (is_structure(prop) and
prop.functor == "last_modified"
and len(prop.args) == 1):
val = deref(prop.args[0])
check_integer(val)
if val < 0:
raise make_error(Compound("domain_error",
["not_less_than_zero", val]))
try:
os.utime(url, (val/1000, val/1000))
except OSError as err:
raise make_error(map_file_error(err, url))
elif (is_structure(prop) and
prop.functor == "can_write"
and len(prop.args) == 1):
val = deref(prop.args[0])
check_atom(val)
if "true" != val and "false" != val:
raise make_error(Compound("domain_error",
["boolean", val]))
try:
stats = os.stat(url)
if "true" == val:
os.chmod(url, (stats.st_mode | 0x80))
else:
os.chmod(url, (stats.st_mode & ~0x80))
except OSError as err:
raise make_error(map_file_error(err, url))
else:
check_nonvar(prop)
prop = copy_term(prop)
raise make_error(Compound("domain_error",
["file_property", prop]))
return True
###
# os_read_sync(S):
# The predicate succeeds. As a side effect the stream buffer is read.
##
def test_os_read_sync(args):
stream = exec_build(args[0])
stream.buf = stream.receive(stream.data)
stream.pos = 0
return True
################################################################
# ir_site_new/2, ir_is_site/1 and ir_site_name/2 #
################################################################
###
# ir_site_new(X, Y): internal only
# The built-in succeeds in Y with a cacheable X.
##
def test_ir_site_new(args):
alpha = exec_build(args[0])
if is_structure(alpha):
if is_atom(alpha.functor):
alpha = Compound(Cache(alpha.functor), alpha.args)
elif is_atom(alpha):
alpha = Cache(alpha)
return exec_unify(args[1], alpha)
###
# ir_is_site(X): internal only
# The built-in succeeds if X is cacheable.
##
def test_ir_is_site(args):
alpha = exec_build(args[0])
if is_structure(alpha):
return is_cache(alpha.functor)
return is_cache(alpha)
###
# ir_site_name(X, Y): internal only
# The built-in succeeds in Y with a cachefree X.
##
def test_ir_site_name(args):
alpha = exec_build(args[0])
if is_structure(alpha):
if is_cache(alpha.functor):
alpha = Compound(alpha.functor.name, alpha.args)
elif is_cache(alpha):
alpha = alpha.name
return exec_unify(args[1], alpha)
################################################################
# ir_link_new/2 and ir_is_link/1 #
################################################################
###
# ir_link_new(L, P): internal only
# The built-in succeeds in P with an annonymous predicate for the clauses L.
##
def test_ir_link_new(args):
alpha = exec_build(args[0])
alpha = list_objects(alpha)
return exec_unify(args[1], make_defined(alpha))
###
# ir_is_link(Q): internal only
# The built-in succeeds if Q is a provable.
##
def test_ir_is_link(args):
alpha = exec_build(args[0])
if is_structure(alpha):
return is_provable(alpha.functor)
return is_provable(alpha)
################################################################
# kb_clause_ref/3 and kb_pred_touch/3 #
################################################################
MASK_FIND_MODIFY = 0x00000001
MASK_FIND_DYNAMIC = 0x00000002
MASK_FIND_REVERSE = 0x00000004
MASK_FIND_SIGNAL = 0x00000008
###
# kb_clause_ref(H, F, C): internal only
# The built-in succeeds in C with the clause references
# for the head H and the flags F.
##
def special_kb_clause_ref(args):
head = deref(args[0])
check_callable(head)
flags = deref(args[1])
check_integer(flags)
peek = lookup_pred(head)
if peek is NotImplemented:
if (flags & MASK_FIND_SIGNAL) != 0:
raise make_error(Compound("existence_error",
["procedure", make_indicator_term(head)]))
else:
return False
if (flags & MASK_FIND_DYNAMIC) != 0:
if (peek.flags & MASK_PRED_DYNAMIC) == 0:
make_error_find(head, flags)
if not is_logical(peek.func) and not is_stick(peek.func):
if (flags & MASK_FIND_SIGNAL) != 0:
make_error_find(head, flags)
else:
return False
if is_structure(head):
head = head.args
else:
head = VOID_ARGS
peek = defined_pred(peek, head)
peek = snapshot_rope(peek)
if (flags & MASK_FIND_REVERSE) == 0:
return solve2_ref(args, peek, 0, None)
else:
return solve2_ref_reverse(args, peek, len(peek), None)
def make_error_find(head, flags):
if (flags & MASK_FIND_MODIFY) != 0:
raise make_error(Compound("permission_error",
["modify", "static_procedure", make_indicator_term(head)]))
else:
raise make_error(Compound("permission_error",
["access", "private_procedure", make_indicator_term(head)]))
def solve_ref(rope, at, choice):
goal = deref(machine.call.args[0])
return solve2_ref(goal.args, rope, at, choice)
def solve_ref_reverse(rope, at, choice):
goal = deref(machine.call.args[0])
return solve2_ref_reverse(goal.args, rope, at, choice)
###
# Search a Prolog clause and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or null.
# @return True if search succeeds, otherwise false.
##
def solve2_ref(args, rope, at, choice):
mark = machine.trail
while at < len(rope):
clause = rope[at]
at += 1
if unify(args[2], clause):
if at < len(rope):
if choice is None:
choice = Choice(solve_ref, rope, at, mark)
else:
choice.at = at
more(choice)
cont(machine.call.args[1])
return True
unbind(mark)
return False
###
# Search a Prolog clause backwards and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or null.
# @return True if search succeeds, otherwise false.
##
def solve2_ref_reverse(args, rope, at, choice):
mark = machine.trail
while at > 0:
at -= 1
clause = rope[at]
if unify(args[2], clause):
if at > 0:
if choice is None:
choice = Choice(solve_ref_reverse, rope, at, mark)
else:
choice.at = at
more(choice)
cont(machine.call.args[1])
return True
unbind(mark)
return False
###
# kb_pred_touch(F, N, O): internal only
# The built-in succeeds. As a side effect the predicate
# indicator F/N with options O is touched.
##
def test_kb_pred_touch(args):
functor = exec_build(args[0])
check_atom(functor)
arity = exec_build(args[1])
check_integer(arity)
flags = exec_build(args[2])
check_integer(flags)
pred_touch(functor, arity, flags)
return True
################################################################
# kb_clause_remove/2 and kb_pred_destroy/2 #
################################################################
###
# kb_clause_remove(C, O): internal only
# The built-in succeeds if the clause C could
# be removed according to the options O.
##
def test_kb_clause_remove(args):
alpha = exec_build(args[0])
check_clause(alpha)
beta = exec_build(args[1])
check_integer(beta)
return remove_clause(alpha, beta)
###
# kb_pred_destroy(F, N): internal only
# The built-in succeeds. As a side effect the
# predicate indicator F/N is destroyed.
##
def test_kb_pred_destroy(args):
functor = exec_build(args[0])
check_atom(functor)
arity = exec_build(args[1])
check_integer(arity)
pred_destroy(functor, arity)
return True
################################################################
# kb_pred_link/3, kb_link_flags/2 and kb_pred_list/1 #
################################################################
###
# kb_pred_link(F, N, Q): internal only
# The built-in succeeds in Q with the provable of the
# predicate indicator F/A. Otherwise if no such provable
# exists the built-in fails.
##
def test_kb_pred_link(args):
functor = exec_build(args[0])
check_atom(functor)
arity = exec_build(args[1])
check_integer(arity)
peek = pred_link(functor, arity)
if peek is NotImplemented:
return False
return exec_unify(args[2], peek)
###
# kb_link_flags(Q, F): internal only
# The built-in succeeds in F with the flags of the provable Q.
##
def test_kb_link_flags(args):
peek = exec_build(args[0])
check_provable(peek)
res = peek.flags
return exec_unify(args[1], res)
###
# Assure that the object is a provable.
#
# @param beta The object.
##
def check_provable(beta):
if not is_provable(beta):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error",
["provable", beta]))
###
# kb_pred_list(L): internal only
# The built-in succeeds in L with the current predicate indicators.
##
def test_kb_pred_list(args):
res = kb_pred_list()
return exec_unify(args[0], res)
def kb_pred_list():
back = None
res = None
for functor in store.kb:
temp = store.kb[functor]
i = 0
while i < len(temp):
peek = temp[i]
if peek is NotImplemented or peek.remover is not NotImplemented:
i += 1
continue
peek = Compound(".", [make_indicator(functor, i), NotImplemented])
if back is None:
res = peek
else:
back.args[1] = peek
back = peek
i += 1
if back is None:
res = "[]"
else:
back.args[1] = "[]"
return res
################################################################
# kb_clause_shard/2, kb_clause_head/2 and kb_clause_data/4 #
################################################################
###
# kb_clause_shard(C, S): internal only
# The built-in succeeds in S with the shard of the clause C.
##
def test_kb_clause_shard(args):
clause = exec_build(args[0])
check_clause(clause)
return exec_unify(args[1], clause.shard)
###
# kb_clause_head(C, H): internal only
# The built-in succeeds in H with the head of the clause C.
##
def special_kb_clause_head(args):
clause = deref(args[0])
check_clause(clause)
head = deref(args[1])
if clause.size != 0:
display = [NotImplemented] * clause.size
else:
display = None
if not is_variable(head):
if is_structure(head):
paras = head.args
else:
paras = VOID_ARGS
if not exec_head(clause.head, display, paras):
return False
else:
alpha = exec_frame(clause.functor, clause.head, display)
bind(alpha, head)
cont(machine.call.args[1])
return True
###
# kb_clause_data(C, H, O, L): internal only
# The built-in succeeds in H, O and L with the head,
# the cut var and the translated body of the clause C.
##
def special_kb_clause_data(args):
clause = deref(args[0])
check_clause(clause)
head = deref(args[1])
if clause.size != 0:
display = [NotImplemented] * clause.size
else:
display = None
if not is_variable(head):
if is_structure(head):
paras = head.args
else:
paras = VOID_ARGS
if not exec_head(clause.head, display, paras):
return False
else:
alpha = exec_frame(clause.functor, clause.head, display)
bind(alpha, head)
peek = clause.cutvar
if peek != -1:
temp = Variable()
display[peek] = temp
temp = Compound("just", [temp])
else:
temp = "nothing"
if not unify(args[2], temp):
return False
temp = exec_body(clause.body, display)
if not unify(args[3], temp):
return False
cont(machine.call.args[1])
return True
################################################################
# Runtime Init #
################################################################
# stream specials, output
set("current_output", 1, make_check(test_current_output))
set("current_error", 1, make_check(test_current_error))
set("set_output", 1, make_check(test_set_output))
set("set_error", 1, make_check(test_set_error))
set("put_atom", 2, make_check(test_put_atom))
# stream specials, async
set("current_input", 1, make_check(test_current_input))
set("set_input", 1, make_check(test_set_input))
set("os_get_code", 2, make_check(test_os_get_code))
set("os_peek_code", 2, make_check(test_os_peek_code))
set("os_open_promise", 4, make_check(test_os_open_promise))
set("os_read_promise", 2, make_check(test_os_read_promise))
set("os_close_promise", 2, make_check(test_os_close_promise))
set("os_prop_promise", 3, make_check(test_os_prop_promise))
# stream specials, sync
set("os_open_sync", 4, make_check(test_os_open_sync))
set("flush_output", 1, make_check(test_flush_output))
set("os_close_sync", 1, make_check(test_os_close_sync))
set("os_cntrl_sync", 2, make_check(test_os_cntrl_sync))
set("os_read_sync", 1, make_check(test_os_read_sync))
# intermediate representation, Albufeira term
set("ir_site_new", 2, make_check(test_ir_site_new))
set("ir_is_site", 1, make_check(test_ir_is_site))
set("ir_site_name", 2, make_check(test_ir_site_name))
set("ir_link_new", 2, make_check(test_ir_link_new))
set("ir_is_link", 1, make_check(test_ir_is_link))
# knowledge base specials, dynamic database, internal only
set("kb_clause_ref", 3, make_special(special_kb_clause_ref))
set("kb_pred_touch", 3, make_check(test_kb_pred_touch))
set("kb_clause_remove", 2, make_check(test_kb_clause_remove))
set("kb_pred_destroy", 2, make_check(test_kb_pred_destroy))
# knowledge base specials, linked provables, internal only
set("kb_pred_link", 3, make_check(test_kb_pred_link))
set("kb_link_flags", 2, make_check(test_kb_link_flags))
set("kb_pred_list", 1, make_check(test_kb_pred_list))
# knowledge base specials, meta data, internal only
set("kb_clause_shard", 2, make_check(test_kb_clause_shard))
set("kb_clause_head", 2, make_special(special_kb_clause_head))
set("kb_clause_data", 4, make_special(special_kb_clause_data))

Use Privacy (c) 2005-2026 XLOG Technologies AG