Last active 1699971335

rootspring's Avatar rootspring revised this gist 1699971335. Go to revision

1 file changed, 180 insertions

data_tasks.go(file created)

@@ -0,0 +1,180 @@
1 + package assets
2 +
3 + import (
4 + "popplio/state"
5 +
6 + "github.com/infinitybotlist/eureka/dovewing"
7 + jsoniter "github.com/json-iterator/go"
8 + "go.uber.org/zap"
9 + )
10 +
11 + var json = jsoniter.ConfigFastest
12 +
13 + func DataTask(taskId, taskName, id, ip string) {
14 + del := taskName == "data_delete"
15 +
16 + var done bool
17 +
18 + l, _ := newTaskLogger(taskId)
19 +
20 + // Fail failed tasks
21 + defer func() {
22 + if !done {
23 + l.Error("Failed to complete task", zap.String("id", id), zap.Bool("del", del))
24 +
25 + _, err := state.Pool.Exec(state.Context, "UPDATE tasks SET state = $1 WHERE task_id = $2", "failed", taskId)
26 +
27 + if err != nil {
28 + l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
29 + }
30 + }
31 + }()
32 +
33 + tx, err := state.Pool.Begin(state.Context)
34 +
35 + if err != nil {
36 + l.Error("Failed to begin transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
37 + return
38 + }
39 +
40 + defer tx.Rollback(state.Context)
41 +
42 + _, err = tx.Exec(state.Context, "DELETE FROM tasks WHERE task_name = $1 AND task_id != $2 AND for_user = $3", taskName, taskId, id)
43 +
44 + if err != nil {
45 + l.Error("Failed to delete old data tasks", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
46 + return
47 + }
48 +
49 + l.Info("Started DR/DDR task", zap.String("id", id), zap.Bool("del", del))
50 +
51 + tableRefs, err := getAllTableRefs(tx)
52 +
53 + if err != nil {
54 + l.Error("Failed to get table refs", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
55 + return
56 + }
57 +
58 + // Begin fetching data recursively
59 + collectedData := map[string][]any{}
60 + cachedEntityIds := map[string][]string{}
61 + for _, tableRef := range tableRefs {
62 + fOp, ok := tableOps[tableRef.ForeignTableName]
63 +
64 + if !ok {
65 + l.Warn("Cannot fetch table due to no support for its foreign ref", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id))
66 + continue
67 + }
68 +
69 + var entityIds []string
70 +
71 + if ids, ok := cachedEntityIds[tableRef.ForeignTableName]; ok {
72 + entityIds = ids
73 + } else {
74 + entityIds, err := fOp.GetIdsForUser(tx, l, id)
75 +
76 + if err != nil {
77 + l.Error("Failed to get ids", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
78 + continue
79 + }
80 +
81 + cachedEntityIds[tableRef.ForeignTableName] = entityIds
82 + }
83 +
84 + var fkeysNotAdded bool
85 + if _, ok := collectedData[tableRef.ForeignTableName]; !ok {
86 + fkeysNotAdded = true
87 + }
88 +
89 + // Handle the entities now
90 + for _, entityId := range entityIds {
91 + l.Info("Fetching table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
92 + rows, err := fOp.Fetch(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)
93 +
94 + if err != nil {
95 + l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
96 + continue
97 + }
98 +
99 + for _, row := range rows {
100 + if _, ok := collectedData[tableRef.TableName]; !ok {
101 + collectedData[tableRef.TableName] = []any{}
102 + }
103 +
104 + collectedData[tableRef.TableName] = append(collectedData[tableRef.TableName], row)
105 + }
106 +
107 + if del {
108 + err = fOp.Delete(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)
109 +
110 + if err != nil {
111 + l.Error("Failed to delete table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
112 + }
113 + }
114 +
115 + // Fetch the foreign table's data
116 + if fkeysNotAdded {
117 + l.Info("Fetching foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
118 + rows, err := fOp.Fetch(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)
119 +
120 + if err != nil {
121 + l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
122 + continue
123 + }
124 +
125 + for _, row := range rows {
126 + if _, ok := collectedData[tableRef.ForeignTableName]; !ok {
127 + collectedData[tableRef.ForeignTableName] = []any{}
128 + }
129 +
130 + collectedData[tableRef.ForeignTableName] = append(collectedData[tableRef.ForeignTableName], row)
131 + }
132 +
133 + if del {
134 + err = fOp.Delete(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)
135 +
136 + if err != nil {
137 + l.Error("Failed to delete foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
138 + }
139 + }
140 + }
141 + }
142 + }
143 +
144 + // Delete from psql user_cache if `del` is true
145 + if del {
146 + for _, dovewingPlatform := range []*dovewing.DiscordState{state.DovewingPlatformDiscord} {
147 + l.Info("Deleting from user_cache [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
148 + res, err := dovewing.ClearUser(state.Context, id, dovewingPlatform, dovewing.ClearUserReq{})
149 +
150 + if err != nil {
151 + l.Error("Error clearing user [dovewing]", zap.Error(err), zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
152 + }
153 +
154 + l.Info("Cleared user [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()), zap.Any("res", res))
155 + }
156 + }
157 +
158 + finalOutput := map[string]any{
159 + "data": collectedData,
160 + "meta": map[string]any{
161 + "request_ip": ip,
162 + },
163 + }
164 +
165 + _, err = tx.Exec(state.Context, "UPDATE tasks SET output = $1, state = $2 WHERE task_id = $3", finalOutput, "completed", taskId)
166 +
167 + if err != nil {
168 + l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
169 + return
170 + }
171 +
172 + err = tx.Commit(state.Context)
173 +
174 + if err != nil {
175 + l.Error("Failed to commit transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
176 + return
177 + }
178 +
179 + done = true
180 + }
Newer Older