###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import asyncio
import os
import sys
import nova.store as store
from nova.store import (set, clear, set_stage,
Compound, is_variable, set_partition, check_nonvar)
import nova.machine as machine
from nova.machine import (register_signal,
exec_deref, is_pending, launch_async,
invoke_interrupt, launch, VOID_ARGS, is_special,
exec_unify, exec_build, make_error,
Context, CTX_MAIN, copy_term, task_async,
callback)
from nova.special import (check_atom, check_integer, objects_list,
make_check, check_number, norm_float, narrow_float, check_goal)
import nova.runtime as runtime
from nova.runtime import (
MASK_SRC_AREAD, file_close_promise, Source,
get_encoding, map_stream_error,
map_file_error)
import calendar
import urllib.request
import urllib.error
import http.client
import time
import locale
import platform
################################################################
# Main API #
################################################################
###
# Post a message to the Prolog interpreter.
#
# @param message The message.
##
def post(message):
register_signal("main", message)
invoke_interrupt("main")
###
# Run a Prolog term once. The goal is run with auto-yield
# disabled and promises are not accepted.
#
# @param goal The Prolog term.
# @return True, false or yield.
##
def perform(goal):
goal = Compound(".", [goal, "[]"])
return launch(goal, "main", VOID_ARGS)
###
# Run a Prolog term once. The goal is run with auto-yield
# enabled and promises are accepted.
#
# @param goal The Prolog term.
# @return True or false.
##
async def perform_async(goal):
goal = Compound(".", [goal, "[]"])
return await launch_async(goal, "main", VOID_ARGS)
################################################################
# Register API #
################################################################
###
# The flag indicates a non-void dispatch foreign function.
##
FFI_FUNC = 0x00000100
###
# Add a Python function to the knowledge base. The given
# function is invoked with rather slow arguments copying and
# spreading. Also the function is wrapped and registered as a
# special, therefore not eligible to neck optimization.
#
# @param functor The functor.
# @param arity The arity.
# @param func The Python function.
# @param flags The flags.
##
def register(functor, arity, func, flags):
if (flags & FFI_FUNC) != 0:
res = make_check(lambda args: invoke_func(func, args))
else:
res = make_check(lambda args: invoke(func, args))
set(functor, arity, res)
def invoke(func, args):
temp = [NotImplemented] * len(args)
i = 0
while i < len(temp):
temp[i] = exec_build(args[i])
i += 1
func(*temp)
return True
def invoke_func(func, args):
temp = [NotImplemented] * (len(args) - 1)
i = 0
while i < len(temp):
temp[i] = exec_build(args[i])
i += 1
res = func(*temp)
return exec_unify(args[len(args) - 1], res)
################################################################
# ir_object_new/1 and ir_object_current/3 #
################################################################
###
# ir_object_new(O):
# The predicate succeeds in O with a new Python object.
##
def test_ir_object_new(args):
res = {}
return exec_unify(args[0], res)
###
# ir_object_current(O, K, V):
# The predicate succeeds in V with the value of the key K
# in the Python object O.
##
def test_ir_object_current(args):
obj = exec_build(args[0])
key = exec_build(args[1])
check_atom(key)
if isinstance(obj, dict):
res = obj.get(key, NotImplemented)
else:
try:
res = getattr(obj, key, NotImplemented)
except AttributeError:
res = NotImplemented
if res is NotImplemented:
return False
return exec_unify(args[2], res)
################################################################
# ir_object_set/3, ir_object_reset/2 and ir_object_keys/2 #
################################################################
###
# ir_object_set(O, K, V):
# The predicate sets the value of the key K in the
# Python object O to V.
##
def test_ir_object_set(args):
obj = exec_build(args[0])
key = exec_build(args[1])
check_atom(key)
value = exec_build(args[2])
if isinstance(obj, dict):
obj[key] = value
else:
try:
setattr(obj, key, value)
except Exception:
raise make_error(Compound("permission_error",
["modify", "field", key]))
return True
###
# ir_object_reset(O, K):
# The predicate removes the key K from the Python object O.
##
def test_ir_object_reset(args):
obj = exec_build(args[0])
key = exec_build(args[1])
check_atom(key)
if isinstance(obj, dict):
if key in obj:
del obj[key]
else:
try:
if hasattr(obj, key):
delattr(obj, key)
except Exception:
raise make_error(Compound("permission_error",
["modify", "field", key]))
return True
###
# ir_object_keys(O, L):
# The predicate succeeds in L with the keys of
# the Python object O
##
def test_ir_object_keys(args):
obj = exec_build(args[0])
if isinstance(obj, dict):
obj = obj.keys()
else:
obj = dir(obj)
obj = set_to_list(obj)
return exec_unify(args[1], obj)
####
# Convert the keys of a native dict into a Prolog list.
#
# @param temp The native set.
##
def set_to_list(temp):
back = None
res = None
if temp is not None:
for peek in iter(temp):
peek = Compound(".", [peek, NotImplemented])
if back is None:
res = peek
else:
back.args[1] = peek
back = peek
if back is None:
res = "[]"
else:
back.args[1] = "[]"
return res
################################################################
# ir_float_value/2 #
################################################################
###
# ir_float_value(P, H):
# If P is instantiated, then the predicate succeeds in H with the
# Python float of P, otherwise in P with the Prolog float of H.
##
def test_ir_float_value(args):
alpha = exec_deref(args[0])
if is_variable(alpha) or is_pending(alpha):
beta = exec_build(args[1])
check_number(beta)
res = beta if is_special(beta) else (
norm_float(narrow_float(beta)))
return exec_unify(alpha, res)
else:
alpha = exec_build(alpha)
check_number(alpha)
res = alpha if is_special(alpha) else (
narrow_float(alpha))
return exec_unify(args[1], res)
################################################################
# os_call_later/3 and timer_cancel/1 #
################################################################
###
# os_call_later(N, D, T): internal only
# The predicate succeeds in T with a new timer. As a side
# effect it schedules the compiled goal N to be executed
# after D milliseconds.
##
def test_os_call_later(args):
goal = exec_build(args[0])
check_goal(goal)
delay = exec_build(args[1])
check_integer(delay)
if delay < 0:
raise make_error(Compound("domain_error",
["not_less_than_zero", delay]))
buf = machine.ctx
loop = asyncio.get_running_loop()
tid = loop.call_later(delay/1000.0,
lambda: callback(goal, buf, VOID_ARGS))
return exec_unify(args[2], tid)
###
# timer_cancel(T):
# The predicate succeeds. As a side effect it cancels the timer T.
##
def test_timer_cancel(args):
res = exec_build(args[0])
check_timer(res)
res.cancel()
return True
def check_timer(beta):
if not isinstance(beta, asyncio.TimerHandle):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["timer", beta]))
################################################################
# current_task/1, task_abort/2 and os_task_create/2 #
################################################################
###
# current_task(E):
# The predicate succeeds in E with the current task.
##
def test_current_task(args):
return exec_unify(args[0], machine.ctx)
###
# task_abort(E, M):
# The predicate succeeds. As a side effect the task E gets
# the message M signalled.
##
def test_task_abort(args):
buf = exec_build(args[0])
check_task(buf)
msg = exec_build(args[1])
msg = copy_term(msg)
register_signal(buf, msg)
invoke_interrupt(buf)
return True
def check_task(beta):
if beta != CTX_MAIN and not isinstance(beta, Context):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["task", beta]))
###
# os_task_create(N, E): internal only
# The predicate succeeds in E with a new task for the compiled
# goal N. The task is scheduled to executed immediately.
##
def test_os_task_create(args):
goal = exec_build(args[0])
check_goal(goal)
buf = Context()
loop = asyncio.get_running_loop()
loop.create_task(task_async(goal, buf, VOID_ARGS))
return exec_unify(args[1], buf)
################################################################
# net_open_promise/4 #
################################################################
###
# net_open_promise(P, L, S, Q):
# The predicate succeeds in Q with a promise for open input S
# on path P and option list L.
##
def test_net_open_promise(args):
url = exec_build(args[0])
check_atom(url)
opts = exec_build(args[1])
stream = Source()
if not exec_unify(args[2], stream):
return False
buf = machine.ctx
prom = open_http_promise_opts(buf, stream, url, opts)
return exec_unify(args[3], lambda: prom)
async def open_http_promise_opts(buf, stream, url, opts):
if opts is not None and "method" in opts:
method = opts["method"]
elif opts is not None and "body" in opts:
method = "POST"
else:
method = "GET"
if opts is not None and "body" in opts:
opts2 = opts["body"]
enc = get_encoding(opts2)
body = opts2["text"].encode(enc)
else:
body = None
if opts is not None and "headers" in opts:
headers = opts["headers"]
else:
headers = {}
request = urllib.request.Request(url,
data=body, headers=headers, method=method)
try:
response = await asyncio.to_thread(urllib.request.urlopen, request)
except http.client.RemoteDisconnected as err:
register_signal(buf, map_stream_error(err))
return
except urllib.error.HTTPError as err:
register_signal(buf, map_http_result(err, url))
return
except urllib.error.URLError as err:
register_signal(buf, map_file_error(err, url))
return
if opts is not None:
opts["uri"] = response.url
opts["status"] = response.status
res = {}
vmap = response.headers
for key in vmap:
res[key.lower()] = vmap[key]
opts["fields"] = res
enc = get_encoding(opts)
stream.data = response
stream.receive = lambda b, s: http_read_promise(enc, b, s)
stream.release = file_close_promise
stream.flags |= MASK_SRC_AREAD
async def http_read_promise(enc, buf, stream):
try:
res = await asyncio.to_thread(blocking_chunk, enc, stream.data)
stream.buf = res
stream.pos = 0
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_chunk(enc, data):
return data.read().decode(enc)
def map_http_result(err, url):
code = err.code
if code == 403: # Forbidden
return Compound("permission_error",
["open", "source_sink", url])
elif code == 404: # Not Found
return Compound("existence_error",
["source_sink", url])
elif code == 405: # Method Not Allowed
return Compound("resource_error",
["illegal_method"])
elif code == 500: # Internal Server Error
return Compound("resource_error",
["internal_error"])
elif code == 503: # Service Unavailable
return Compound("resource_error",
["service_unavailable"])
else:
return Compound("resource_error",
["io_exception"])
################################################################
# net_open_sync/4 #
################################################################
HTTP_TIME = "%a, %d %b %Y %H:%M:%S GMT"
##
# net_prop_promise(F, M, Q):
# The predicate succeeds in Q with with a promise for the
# properties M of the file F. Barks if path F doesn't exist
# or io exception while resolving.
##
def test_net_prop_promise(args):
url = exec_build(args[0])
check_atom(url)
res = {}
if not exec_unify(args[1], res):
return False
buf = machine.ctx
prom = prop_http_promise(buf, url, res)
return exec_unify(args[2], lambda: prom)
async def prop_http_promise(buf, url, res):
request = urllib.request.Request(url, method="HEAD")
try:
response = await asyncio.to_thread(urllib.request.urlopen, request)
await asyncio.to_thread(response.close)
except http.client.RemoteDisconnected as err:
register_signal(buf, map_stream_error(err))
return
except urllib.error.HTTPError as err:
register_signal(buf, map_http_result(err, url))
return
except urllib.error.URLError as err:
register_signal(buf, Compound("resource_error", ["url_exception"]))
return
val = response.headers.get("last-modified")
mtime = sys_time_parse(val, HTTP_TIME, 1) \
if val is not None else -1
res["last_modified"] = mtime
res["absolute_path"] = response.url
res["type"] = "regular"
res["can_write"] = "false"
################################################################
# net_open_sync/4 and #
################################################################
###
# net_open_sync(P, M, L, S):
# The predicate succeeds. As a side effect the stream S is
# opened on the path P with the mode M and the option list L.
##
def test_net_open_sync(args):
url = exec_build(args[0])
check_atom(url)
raise make_error(Compound("permission_error",
["access", "source_sink", url]))
###
# net_cntrl_sync(F, P):
# The predicate assigns the property P to the file F.
##
def test_net_cntrl_sync(args):
url = exec_build(args[0])
check_atom(url)
raise make_error(Compound("permission_error",
["access", "source_sink", url]))
################################################################
# sys_atom_time/4 #
################################################################
###
# sys_atom_time(A, F, O, T): internal only
# If A is a variable, the built-in succeeds in A with the millisecond
# time T formatted by the pattern F and the flag O. Otherwise the
# built-in succeeds in T with the millisecond time parsed from A
# by the pattern F and the flag O.
##
def test_sys_atom_time(args):
fmt = exec_build(args[1])
check_atom(fmt)
flag = exec_build(args[2])
check_integer(flag)
text = exec_deref(args[0])
if is_variable(text) or is_pending(text):
tms = exec_build(args[3])
check_integer(tms)
try:
res = sys_time_format(fmt, tms, flag)
except ValueError:
raise make_error(Compound("domain_error", ["time_format", fmt]))
return exec_unify(text, res)
else:
text = exec_build(text)
check_atom(text)
try:
res = sys_time_parse(text, fmt, flag)
except ValueError as err:
if "in format" in err.args[0]:
raise make_error(Compound("domain_error", ["time_format", fmt]))
else:
raise make_error(Compound("syntax_error", ["illegal_date"]))
return exec_unify(args[3], res)
###
# <p>Parse a date time string.</p>
#
# @param res The date time as a string.
# @param pat The pattern.
# @param utc The UTC calendar flag.
# @return The date time in UTC milliseconds.
##
def sys_time_parse(text, fmt, utc):
if utc != 0:
locale.setlocale(locale.LC_TIME, (None, None))
date = time.strptime(text, fmt)
return calendar.timegm(date)*1000
else:
locale.setlocale(locale.LC_TIME, locale.getlocale())
date = time.strptime(text, fmt)
return int(time.mktime(date)*1000)
###
# <p>Format a date time string.</p>
#
# @param pat The format pattern.
# @param tms The date time in UTC milliseconds.
# @param utc The UTC calendar flag.
# @return The date time as a string.
##
def sys_time_format(fmt, tms, utc):
if utc != 0:
date = time.gmtime(tms/1000)
locale.setlocale(locale.LC_TIME, (None, None))
return time.strftime(fmt, date)
else:
date = time.localtime(tms/1000)
locale.setlocale(locale.LC_TIME, locale.getlocale())
return time.strftime(fmt, date)
################################################################
# os_get_gestalt/1 and os_get_host/1 #
################################################################
###
# os_get_gestalt(M): Internal only
# The built-in succeeds in M with a gestalt properties map.
##
def test_os_get_gestalt(args):
gestalt = {}
scm = os.environ.get("TERM_SCHEME", None)
gestalt["sys_color"] = scm if scm is not None else "normal"
loc = locale.getlocale()[0]
gestalt["sys_locale"] = loc if loc is not None else ""
res = objects_list(sys.argv, 1, len(sys.argv)-1)
gestalt["argv"] = res
return exec_unify(args[0], gestalt)
###
# os_get_host(M): internal only
# The built-in succeeds in W with a host properties map.
##
def test_os_get_host(args):
res = {}
res["system_url"] = runtime.bootbase
res["foreign_ext"] = ".py"
res["host_info"] = platform.python_implementation()+", Python "+platform.python_version()
res["mach_info"] = platform.processor()+", "+platform.platform()
return exec_unify(args[0], res)
################################################################
# os_get_workdir/1 and os_set_workdir/1 #
################################################################
###
# os_get_workdir(D): internal only
# The built-in succeeds in D with the working directory.
##
def test_os_get_workdir(args):
url = os.getcwd() + os.path.sep
return exec_unify(args[0], url)
###
# os_set_workdir(D): internal only
# The built-in succeeds. As a side effect it changes the working directory to D.
##
def test_os_set_workdir(args):
url = exec_build(args[0])
check_atom(url)
try:
os.chdir(url)
except OSError as err:
raise make_error(map_file_error(err, url))
return True
################################################################
# dg_get_flags/1, dg_get_partition/1 and dg_set_partition/1 #
################################################################
###
# dg_get_flags(W): internal only
# The built-in succeeds in W with the engine flags.
##
def test_dg_get_flags(args):
return exec_unify(args[0], store.engine.flags)
###
# dg_get_partition(D): internal only
# The built-in succeeds in D with the current partition.
##
def test_dg_get_partition(args):
return exec_unify(args[0], store.engine.partition)
###
# dg_set_partition(D): internal only
# The built-in succeeds. As a side effect it changes the current partition.
##
def test_dg_set_partition(args):
value = exec_build(args[0])
check_atom(value)
set_partition(value)
return True
################################################################
# dg_clear_stage/0, dg_get_stage/1 and dg_set_stage/1 #
################################################################
###
# dg_clear_stage: internal only
# The built-in succeeds. As a side effect it clears the current stage.
##
def test_dg_clear_stage(args):
clear()
return True
###
# dg_get_stage(D): internal only
# The built-in succeeds in D with the current stage.
##
def test_dg_get_stage(args):
return exec_unify(args[0], store.stage)
###
# dg_set_stage(D): internal only
# The built-in succeeds. As a side effect it changes the current stage.
##
def test_dg_set_stage(args):
value = exec_build(args[0])
check_integer(value)
set_stage(value)
return True
################################################################
# Theatre Init #
################################################################
# object specials
set("ir_object_new", 1, make_check(test_ir_object_new))
set("ir_object_current", 3, make_check(test_ir_object_current))
set("ir_object_set", 3, make_check(test_ir_object_set))
set("ir_object_reset", 2, make_check(test_ir_object_reset))
set("ir_object_keys", 2, make_check(test_ir_object_keys))
# object specials II
set("ir_float_value", 2, make_check(test_ir_float_value))
# system specials, coroutines
set("os_call_later", 3, make_check(test_os_call_later))
set("timer_cancel", 1, make_check(test_timer_cancel))
set("current_task", 1, make_check(test_current_task))
set("task_abort", 2, make_check(test_task_abort))
set("os_task_create", 2, make_check(test_os_task_create))
# net specials, async
set("net_open_promise", 4, make_check(test_net_open_promise))
set("net_prop_promise", 3, make_check(test_net_prop_promise))
# net specials, sync
set("net_open_sync", 4, make_check(test_net_open_sync))
set("net_cntrl_sync", 2, make_check(test_net_cntrl_sync))
# atom specials, time
set("sys_atom_time", 4, make_check(test_sys_atom_time))
# object specials, gestalt
set("os_get_gestalt", 1, make_check(test_os_get_gestalt))
set("os_get_host", 1, make_check(test_os_get_host))
set("os_get_workdir", 1, make_check(test_os_get_workdir))
set("os_set_workdir", 1, make_check(test_os_set_workdir))
# system specials, staging, internal only
set("dg_get_flags", 1, make_check(test_dg_get_flags))
set("dg_get_partition", 1, make_check(test_dg_get_partition))
set("dg_set_partition", 1, make_check(test_dg_set_partition))
set("dg_clear_stage", 0, make_check(test_dg_clear_stage))
set("dg_get_stage", 1, make_check(test_dg_get_stage))
set("dg_set_stage", 1, make_check(test_dg_set_stage))