Zuletzt aktiv 1699971335

data_tasks.go Orginalformat
1package assets
2
3import (
4 "popplio/state"
5
6 "github.com/infinitybotlist/eureka/dovewing"
7 jsoniter "github.com/json-iterator/go"
8 "go.uber.org/zap"
9)
10
11var json = jsoniter.ConfigFastest
12
13func DataTask(taskId, taskName, id, ip string) {
14 del := taskName == "data_delete"
15
16 var done bool
17
18 l, _ := newTaskLogger(taskId)
19
20 // Fail failed tasks
21 defer func() {
22 if !done {
23 l.Error("Failed to complete task", zap.String("id", id), zap.Bool("del", del))
24
25 _, err := state.Pool.Exec(state.Context, "UPDATE tasks SET state = $1 WHERE task_id = $2", "failed", taskId)
26
27 if err != nil {
28 l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
29 }
30 }
31 }()
32
33 tx, err := state.Pool.Begin(state.Context)
34
35 if err != nil {
36 l.Error("Failed to begin transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
37 return
38 }
39
40 defer tx.Rollback(state.Context)
41
42 _, err = tx.Exec(state.Context, "DELETE FROM tasks WHERE task_name = $1 AND task_id != $2 AND for_user = $3", taskName, taskId, id)
43
44 if err != nil {
45 l.Error("Failed to delete old data tasks", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
46 return
47 }
48
49 l.Info("Started DR/DDR task", zap.String("id", id), zap.Bool("del", del))
50
51 tableRefs, err := getAllTableRefs(tx)
52
53 if err != nil {
54 l.Error("Failed to get table refs", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
55 return
56 }
57
58 // Begin fetching data recursively
59 collectedData := map[string][]any{}
60 cachedEntityIds := map[string][]string{}
61 for _, tableRef := range tableRefs {
62 fOp, ok := tableOps[tableRef.ForeignTableName]
63
64 if !ok {
65 l.Warn("Cannot fetch table due to no support for its foreign ref", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id))
66 continue
67 }
68
69 var entityIds []string
70
71 if ids, ok := cachedEntityIds[tableRef.ForeignTableName]; ok {
72 entityIds = ids
73 } else {
74 entityIds, err := fOp.GetIdsForUser(tx, l, id)
75
76 if err != nil {
77 l.Error("Failed to get ids", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
78 continue
79 }
80
81 cachedEntityIds[tableRef.ForeignTableName] = entityIds
82 }
83
84 var fkeysNotAdded bool
85 if _, ok := collectedData[tableRef.ForeignTableName]; !ok {
86 fkeysNotAdded = true
87 }
88
89 // Handle the entities now
90 for _, entityId := range entityIds {
91 l.Info("Fetching table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
92 rows, err := fOp.Fetch(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)
93
94 if err != nil {
95 l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
96 continue
97 }
98
99 for _, row := range rows {
100 if _, ok := collectedData[tableRef.TableName]; !ok {
101 collectedData[tableRef.TableName] = []any{}
102 }
103
104 collectedData[tableRef.TableName] = append(collectedData[tableRef.TableName], row)
105 }
106
107 if del {
108 err = fOp.Delete(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)
109
110 if err != nil {
111 l.Error("Failed to delete table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
112 }
113 }
114
115 // Fetch the foreign table's data
116 if fkeysNotAdded {
117 l.Info("Fetching foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
118 rows, err := fOp.Fetch(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)
119
120 if err != nil {
121 l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
122 continue
123 }
124
125 for _, row := range rows {
126 if _, ok := collectedData[tableRef.ForeignTableName]; !ok {
127 collectedData[tableRef.ForeignTableName] = []any{}
128 }
129
130 collectedData[tableRef.ForeignTableName] = append(collectedData[tableRef.ForeignTableName], row)
131 }
132
133 if del {
134 err = fOp.Delete(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)
135
136 if err != nil {
137 l.Error("Failed to delete foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
138 }
139 }
140 }
141 }
142 }
143
144 // Delete from psql user_cache if `del` is true
145 if del {
146 for _, dovewingPlatform := range []*dovewing.DiscordState{state.DovewingPlatformDiscord} {
147 l.Info("Deleting from user_cache [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
148 res, err := dovewing.ClearUser(state.Context, id, dovewingPlatform, dovewing.ClearUserReq{})
149
150 if err != nil {
151 l.Error("Error clearing user [dovewing]", zap.Error(err), zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
152 }
153
154 l.Info("Cleared user [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()), zap.Any("res", res))
155 }
156 }
157
158 finalOutput := map[string]any{
159 "data": collectedData,
160 "meta": map[string]any{
161 "request_ip": ip,
162 },
163 }
164
165 _, err = tx.Exec(state.Context, "UPDATE tasks SET output = $1, state = $2 WHERE task_id = $3", finalOutput, "completed", taskId)
166
167 if err != nil {
168 l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
169 return
170 }
171
172 err = tx.Commit(state.Context)
173
174 if err != nil {
175 l.Error("Failed to commit transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
176 return
177 }
178
179 done = true
180}
181