rootspring revised this gist . Go to revision
1 file changed, 118 insertions
worker.rs(file created)
@@ -0,0 +1,118 @@ | |||
1 | + | // NOTE: Because the mlua crate is not Sync, we can use tokio spawn_local to run the Lua VM in an async context | |
2 | + | // but pinned to a single thread | |
3 | + | // | |
4 | + | // This is highly experimental | |
5 | + | pub struct LuaWorker { | |
6 | + | /// A handle that allows stopping the VM inside its tokio localset | |
7 | + | /// | |
8 | + | /// This is wrapped in an option to allow destroying the handle when the LuaWorker is dropped | |
9 | + | pub tx_stop: Option<tokio::sync::oneshot::Sender<()>>, | |
10 | + | /// A channel used for sending requests to the VM | |
11 | + | pub tx_msg_recv: tokio::sync::mpsc::Sender<LuaWorkerRequest>, | |
12 | + | /// A channel that can be used to listen for a response from the VM | |
13 | + | pub rx_msg_resp: tokio::sync::mpsc::Receiver<LuaWorkerResponse>, | |
14 | + | } | |
15 | + | ||
16 | + | impl LuaWorker { | |
17 | + | // Executes a Lua script in the Lua VM | |
18 | + | pub async fn exec( | |
19 | + | vm: &Lua, | |
20 | + | template: &str, | |
21 | + | args: serde_json::Value, | |
22 | + | ) -> Result<serde_json::Value, base_data::Error> { | |
23 | + | let f: LuaFunction = vm | |
24 | + | .load(template) | |
25 | + | .eval_async() | |
26 | + | .await | |
27 | + | .map_err(|e| LuaError::external(e.to_string()))?; | |
28 | + | ||
29 | + | let _args = vm | |
30 | + | .create_table() | |
31 | + | .map_err(|e| LuaError::external(e.to_string()))?; | |
32 | + | ||
33 | + | let args = vm | |
34 | + | .to_value(&args) | |
35 | + | .map_err(|e| LuaError::external(e.to_string()))?; | |
36 | + | ||
37 | + | _args | |
38 | + | .set("args", args) | |
39 | + | .map_err(|e| LuaError::external(e.to_string()))?; | |
40 | + | ||
41 | + | let v: LuaValue = f | |
42 | + | .call_async(_args) | |
43 | + | .await | |
44 | + | .map_err(|e| LuaError::external(e.to_string()))?; | |
45 | + | ||
46 | + | let v = serde_json::to_value(v).map_err(|e| LuaError::external(e.to_string()))?; | |
47 | + | ||
48 | + | Ok(v) | |
49 | + | } | |
50 | + | ||
51 | + | // Spawns a new LuaWorker with the given Lua VM | |
52 | + | pub fn new(lua: Lua) -> Self { | |
53 | + | let (tx_stop, rx_stop) = tokio::sync::oneshot::channel(); | |
54 | + | let (tx_msg_recv, rx_msg_recv) = tokio::sync::mpsc::channel(32); | |
55 | + | let (tx_msg_resp, rx_msg_resp) = tokio::sync::mpsc::channel(32); | |
56 | + | ||
57 | + | let worker = LuaWorker { | |
58 | + | tx_stop: Some(tx_stop), | |
59 | + | tx_msg_recv, | |
60 | + | rx_msg_resp, | |
61 | + | }; | |
62 | + | ||
63 | + | let rt = tokio::runtime::Builder::new_current_thread() | |
64 | + | .enable_all() | |
65 | + | .build() | |
66 | + | .unwrap(); | |
67 | + | ||
68 | + | std::thread::spawn(move || { | |
69 | + | rt.block_on(async move { | |
70 | + | let lua = Arc::new(lua); | |
71 | + | let mut rx_stop = rx_stop; | |
72 | + | let mut rx_msg_recv = rx_msg_recv; | |
73 | + | let mut tx_msg_resp = tx_msg_resp; | |
74 | + | ||
75 | + | loop { | |
76 | + | let lua = lua.clone(); | |
77 | + | let tx_msg_resp = tx_msg_resp.clone(); | |
78 | + | ||
79 | + | tokio::select! { | |
80 | + | /*_ = rx_stop => { | |
81 | + | break; | |
82 | + | },*/ | |
83 | + | Some(msg) = rx_msg_recv.recv() => { | |
84 | + | tokio::task::spawn_local(async move { | |
85 | + | let lua = lua.clone(); | |
86 | + | let res = LuaWorker::exec(&lua, &msg.template, msg.args).await; | |
87 | + | let _ = tx_msg_resp.send(match res { | |
88 | + | Ok(v) => LuaWorkerResponse::Ok(v), | |
89 | + | Err(e) => LuaWorkerResponse::Err(e.to_string()), | |
90 | + | }); | |
91 | + | }); | |
92 | + | } | |
93 | + | } | |
94 | + | } | |
95 | + | }); | |
96 | + | }); | |
97 | + | ||
98 | + | worker | |
99 | + | } | |
100 | + | } | |
101 | + | ||
102 | + | impl Drop for LuaWorker { | |
103 | + | fn drop(&mut self) { | |
104 | + | if let Some(sender) = self.tx_stop.take() { | |
105 | + | let _ = sender.send(()); | |
106 | + | } | |
107 | + | } | |
108 | + | } | |
109 | + | ||
110 | + | pub struct LuaWorkerRequest { | |
111 | + | pub template: String, | |
112 | + | pub args: serde_json::Value, | |
113 | + | } | |
114 | + | ||
115 | + | pub enum LuaWorkerResponse { | |
116 | + | Ok(serde_json::Value), | |
117 | + | Err(String), | |
118 | + | } |
Newer
Older