Day 57 — cutypr: an even littler Jupyter console
28 October 2020 · recurse-centerToday I read the first chapter of the zeromq guide and looked at the client-server and pub-sub examples! I love this paragraph from the preface (sounds so similar to Derek Sivers's CD Baby email!):
We took a normal TCP socket, injected it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombarded it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex. Yes, ZeroMQ sockets are the world-saving superheroes of the networking world.
Sometime next week, I want to look closely into how Linux sockets work!
Today I also stripped down yesterday's little Jupyter console into an even littler Jupyter console, by looking at the jupyter_client
code, and reusing some of that to make a stripped (a lot!) down version of the client.
At a high level, the code:
- Starts the Python kernel
- Instantiates a
Cutypr
object - Starts a REPL
- Sends code for execution with
client.execute(code)
- Creates an
execute_request
message - Serializes it
- Sends it to the
shell
socket
- Creates an
- Gets the execution result with
client.get_msg()
- Polls the
iopub
socket - Gets a message from it
- Deserializes it
- Prints result!
- Polls the
- Sends code for execution with
import os | |
import sys | |
import json | |
import hmac | |
import hashlib | |
import traceback | |
from binascii import b2a_hex | |
from datetime import datetime, timezone | |
import zmq | |
from zmq.utils import jsonapi | |
from jupyter_client import KernelManager | |
json_packer = lambda obj: jsonapi.dumps(obj, ensure_ascii=False, allow_nan=False) | |
json_unpacker = lambda s: jsonapi.loads(s) | |
def new_id(): | |
buf = os.urandom(16) | |
return "-".join(b2a_hex(x).decode("ascii") for x in (buf[:4], buf[4:])) | |
class Session(object): | |
def __init__(self, key): | |
self.key = key | |
self.session_id = new_id() | |
class Cutypr(object): | |
def __init__(self, session=None, ports=None): | |
self.context = zmq.Context() | |
self.session = session | |
self.ports = ports | |
self.message_count = 0 | |
self._shell_channel = None | |
self._iopub_channel = None | |
self.auth = hmac.HMAC(session.key, digestmod=hashlib.sha256) | |
def _make_url(self, channel): | |
port = self.ports[channel] | |
return f"tcp://127.0.0.1:{port}" | |
def _make_channel(self, channel): | |
socket_type = { | |
"shell": zmq.DEALER, | |
"iopub": zmq.SUB, | |
} | |
sock = self.context.socket(socket_type[channel]) | |
sock.linger = 1000 | |
sock.connect(self._make_url(channel)) | |
if channel == "iopub": | |
sock.setsockopt(zmq.SUBSCRIBE, b"") | |
return sock | |
@property | |
def shell_channel(self): | |
if self._shell_channel is None: | |
self._shell_channel = self._make_channel("shell") | |
return self._shell_channel | |
@property | |
def iopub_channel(self): | |
if self._iopub_channel is None: | |
self._iopub_channel = self._make_channel("iopub") | |
return self._iopub_channel | |
def _make_message(self, message_type, content): | |
msg = {} | |
msg_id = f"{self.session.session_id}_{self.message_count}" | |
self.message_count += 1 | |
header = { | |
"msg_id": msg_id, | |
"msg_type": message_type, | |
"username": "vinayak", | |
"session": self.session.session_id, | |
} | |
msg["header"] = header | |
msg["msg_id"] = header["msg_id"] | |
msg["msg_type"] = header["msg_type"] | |
msg["content"] = content | |
msg["metadata"] = {} | |
msg["parent_header"] = {} | |
return msg | |
def sign(self, msg_list): | |
h = self.auth.copy() | |
for m in msg_list: | |
h.update(m) | |
return h.hexdigest().encode("utf-8") | |
def serialize(self, msg): | |
msg_list = [ | |
json_packer(msg["header"]), | |
json_packer(msg["parent_header"]), | |
json_packer(msg["metadata"]), | |
json_packer(msg.get("content", {})), | |
] | |
DELIM = b"<IDS|MSG>" | |
signature = self.sign(msg_list) | |
to_send = [ | |
DELIM, | |
signature, | |
] | |
to_send.extend(msg_list) | |
return to_send | |
def execute(self, code): | |
content = dict( | |
code=code, | |
silent=False, | |
store_history=True, | |
user_expressions=None, | |
allow_stdin=True, | |
stop_on_error=True, | |
) | |
msg = self._make_message("execute_request", content) | |
msg_list = self.serialize(msg) | |
self.shell_channel.send_multipart(msg_list) | |
return msg["header"]["msg_id"] | |
def deserialize(self, msg_list): | |
message = {} | |
header = json_unpacker(msg_list[3]) | |
message["header"] = header | |
message["msg_id"] = header["msg_id"] | |
message["msg_type"] = header["msg_type"] | |
message["metadata"] = json_unpacker(msg_list[5]) | |
message["content"] = json_unpacker(msg_list[6]) | |
return message | |
def is_alive(self, channel): | |
return eval(f"self.{channel}_channel") is not None | |
def msg_ready(self): | |
return bool(self.iopub_channel.poll(timeout=0)) | |
def get_msg(self): | |
msg_list = self.iopub_channel.recv_multipart() | |
msg = self.deserialize(msg_list) | |
return msg | |
if __name__ == "__main__": | |
try: | |
manager = KernelManager() | |
manager.start_kernel() | |
port_names = ["shell", "stdin", "iopub", "hb", "control"] | |
ports = dict(list(zip(port_names, manager.ports))) | |
session = Session(key=manager.session.key) | |
client = Cutypr(session=session, ports=ports) | |
execution_state = "idle" | |
execution_count = 1 | |
while True: | |
code = input(f"In [{execution_count}]: ") | |
if not code.strip(): | |
continue | |
client.execute(code) | |
execution_state = "busy" | |
while execution_state != "idle" and client.is_alive("iopub"): | |
while client.msg_ready(): | |
msg = client.get_msg() | |
msg_type = msg["header"]["msg_type"] | |
if msg_type == "status": | |
execution_state = msg["content"]["execution_state"] | |
elif msg_type == "stream": | |
if msg["content"]["name"] == "stdout": | |
print(msg["content"]["text"]) | |
sys.stdout.flush() | |
elif msg["content"]["name"] == "stderr": | |
print(msg["content"]["text"], file=sys.stderr) | |
sys.stderr.flush() | |
elif msg_type == "execute_result": | |
pass | |
elif msg_type == "display_data": | |
pass | |
elif msg_type == "execute_input": | |
execution_count = int(msg["content"]["execution_count"]) + 1 | |
elif msg_type == "clear_output": | |
pass | |
elif msg_type == "error": | |
for frame in msg["content"]["traceback"]: | |
print(frame, file=sys.stderr) | |
except Exception as e: | |
traceback.print_exc() | |
finally: | |
manager.shutdown_kernel() |
This is the only example using which I'm testing if everything works for now:
In [1]: import os
In [2]: print(os.getcwd())
/home/vinayak/dev
In [3]:
Now I need to translate the Cutypr
class to Rust, and also figure out a way to start the Python kernel from Rust!
To get some Rust writing practice, I'm also thinking about creating a curlyboi version of Julia Evans's snake.rs game. As a first step, I converted the usage of ncurses
code to the ncursesw
code so that I can get those ▛s in later.
I also made some changes to the pdftopng
setup.py
and GitHub Actions workflow to start building 32-bit Windows wheels! I've pushed the Python 3.8 Windows wheels (both 32 and 64-bit) to PyPI so if you use Windows, please help me test if pdftopng
works there by installing pdftopng
and running the example in the README :)