본문으로 건너뛰기
버전: In Development

Client

aerospike-py는 동일한 기능을 가진 동기(Client)와 비동기(AsyncClient) API를 제공합니다.

Creating a Client

import aerospike_py as aerospike

client = aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect()

Context Manager

__enter__() / __exit__()

with aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect() as client:
client.put(key, bins)
# close()가 종료 시 자동으로 호출됩니다

Connection

connect(username=None, password=None)

Aerospike 클러스터에 연결합니다.

메서드 체이닝을 위해 self를 반환합니다.

client = aerospike.client(config).connect()
# 인증 포함
client = aerospike.client(config).connect("admin", "admin")

is_connected()

클라이언트가 연결되어 있으면 True를 반환합니다. 두 클라이언트 모두에서 동기 메서드입니다.

if client.is_connected():
print("Connected")

close()

클러스터와의 연결을 종료합니다.

client.close()

get_node_names()

클러스터 노드 이름 목록을 반환합니다.

nodes = client.get_node_names()

CRUD Operations

put(key, bins, meta=None, policy=None)

레코드를 작성합니다.

파라미터타입설명
keytuple[str, str, str|int|bytes](namespace, set, pk)
binsdict[str, Any]빈 이름-값 쌍
metaWriteMeta선택: {"ttl": int, "gen": int}
policyWritePolicy선택: {"key", "exists", "gen", "timeout", ...}
key = ("test", "demo", "user1")
client.put(key, {"name": "Alice", "age": 30})
client.put(key, {"x": 1}, meta={"ttl": 300})
client.put(key, {"x": 1}, policy={"exists": aerospike.POLICY_EXISTS_CREATE_ONLY})

get(key, policy=None)

레코드를 읽습니다. Record NamedTuple (key, meta, bins)를 반환합니다.

key, meta, bins = client.get(("test", "demo", "user1"))
# meta.gen == 1, meta.ttl == 2591998
# bins = {"name": "Alice", "age": 30}
노트

레코드가 존재하지 않으면 RecordNotFound가 발생합니다.

select(key, bins, policy=None)

레코드에서 특정 빈만 읽습니다.

_, meta, bins = client.select(key, ["name"])
# bins = {"name": "Alice"}

exists(key, policy=None)

레코드 존재 여부를 확인합니다. ExistsResult NamedTuple (key, meta)를 반환하며, 레코드가 없으면 metaNone입니다.

_, meta = client.exists(key)
if meta is not None:
print(f"Found, gen={meta.gen}")

remove(key, meta=None, policy=None)

레코드를 삭제합니다.

client.remove(key)
# 세대 검사 포함
client.remove(key, meta={"gen": 3}, policy={"gen": aerospike.POLICY_GEN_EQ})

touch(key, val=0, meta=None, policy=None)

레코드의 TTL을 리셋합니다.

client.touch(key, val=300)

String / Numeric Operations

append(key, bin, val, meta=None, policy=None)

빈에 문자열을 추가합니다.

client.append(key, "name", "_suffix")

prepend(key, bin, val, meta=None, policy=None)

빈 앞에 문자열을 삽입합니다.

client.prepend(key, "name", "prefix_")

increment(key, bin, offset, meta=None, policy=None)

정수 또는 실수 빈 값을 증가시킵니다.

client.increment(key, "age", 1)
client.increment(key, "score", 0.5)

remove_bin(key, bin_names, meta=None, policy=None)

레코드에서 특정 빈을 제거합니다.

client.remove_bin(key, ["temp_bin", "debug_bin"])

Multi-Operation

operate(key, ops, meta=None, policy=None)

단일 레코드에 여러 연산을 원자적으로 실행합니다.

ops = [
{"op": aerospike.OPERATOR_INCR, "bin": "counter", "val": 1},
{"op": aerospike.OPERATOR_READ, "bin": "counter", "val": None},
]
_, meta, bins = client.operate(key, ops)

operate_ordered(key, ops, meta=None, policy=None)

operate와 동일하지만 결과를 OperateOrderedResult NamedTuple의 ordered_bins 필드에 BinTuple(name, value) 리스트로 반환합니다.

_, meta, results = client.operate_ordered(key, ops)
# results = [("counter", 2)]

Batch Operations

batch_read(keys, bins=None, policy=None)

여러 레코드를 읽습니다. LazyBatchRecords 핸들을 반환하며, .to_dict() / .to_numpy(dtype) 또는 dict-style Mapping 접근으로 materialise 합니다.

  • bins=None - 모든 bin 읽기
  • bins=["a", "b"] - 특정 bin만 읽기
  • bins=[] - 존재 여부만 확인
keys = [("test", "demo", f"user_{i}") for i in range(10)]

# 모든 bin 읽기
batch = client.batch_read(keys)
for br in batch.batch_records:
if br.record:
key, meta, bins = br.record
print(bins)

# 특정 bin만 읽기
batch = client.batch_read(keys, bins=["name", "age"])

# 존재 여부만 확인
batch = client.batch_read(keys, bins=[])
for br in batch.batch_records:
print(f"{br.key}: exists={br.record is not None}")

batch_operate(keys, ops, policy=None)

여러 레코드에 연산을 실행합니다.

ops = [{"op": aerospike.OPERATOR_INCR, "bin": "views", "val": 1}]
results = client.batch_operate(keys, ops)

batch_remove(keys, policy=None)

여러 레코드를 삭제합니다.

results = client.batch_remove(keys)

batch_apply(keys, module, function, args=None, policy=None)

등록된 Lua UDF를 여러 레코드에 한 번의 배치 호출로 실행합니다. apply()와 동일한 와이어 형태이지만, 라운드 트립 한 번으로 키들에 분산 처리합니다.

파라미터설명
keys일반 Key 튜플 리스트, 또는 일반 키와 (key, meta) 페어가 섞인 리스트. metaBatchUDFMeta 평면 dict로, 특정 레코드에 대해 UDF 호출 형태(module/function/args)와 정책 필드(ttl/commit_level/key/durable_delete)를 오버라이드할 수 있습니다.
module레코드 단위 module 오버라이드가 없을 때 사용할 기본 UDF 모듈명.
function기본 UDF 함수명.
args선택적 기본 인자 리스트. BatchUDFMetaargs(빈 리스트 [] 포함)가 이 기본값을 오버라이드합니다.
policy선택적 정책 dict. 전송-레벨 BatchPolicy와 배치-레벨 BatchUDFPolicy 기본값(commit_level, ttl, key, durable_delete, filter_expression)을 합칠 수 있습니다.

반환값: BatchWriteResult. 레코드별 결과 코드는 batch_records: list[BatchRecord]에 담깁니다. UDF 반환값은 호출이 성공한 경우 레코드의 Record.bins 맵에 Lua 컨벤션인 "SUCCESS" 키로 저장됩니다.

# 모든 키에 동일한 UDF 적용
keys = [("test", "demo", f"u_{i}") for i in range(10)]
results = client.batch_apply(keys, "my_udf", "increment_counter", [1])

for br in results.batch_records:
if br.result == 0 and br.record is not None:
# UDF 반환값은 "SUCCESS" 빈에 저장됨
print(br.record.bins.get("SUCCESS"))

# 레코드별 오버라이드: 한 레코드만 다른 args + 긴 TTL
results = client.batch_apply(
[
("test", "demo", "u_1"), # 기본 args 사용
(("test", "demo", "u_2"), {"args": [5], "ttl": 3600}),
],
"my_udf", "increment_counter", args=[1],
)

Query

query(namespace, set_name)

Secondary Index 쿼리를 위한 Query 객체를 생성합니다. Query API를 참조하세요.

query = client.query("test", "demo")

Index Management

index_integer_create(namespace, set_name, bin_name, index_name, policy=None)

숫자 Secondary Index를 생성합니다.

client.index_integer_create("test", "demo", "age", "age_idx")

index_string_create(namespace, set_name, bin_name, index_name, policy=None)

문자열 Secondary Index를 생성합니다.

client.index_string_create("test", "demo", "name", "name_idx")

index_geo2dsphere_create(namespace, set_name, bin_name, index_name, policy=None)

지리공간 Secondary Index를 생성합니다.

client.index_geo2dsphere_create("test", "demo", "location", "geo_idx")

index_remove(namespace, index_name, policy=None)

Secondary Index를 제거합니다.

client.index_remove("test", "age_idx")

Truncate

truncate(namespace, set_name, nanos=0, policy=None)

네임스페이스/세트의 모든 레코드를 제거합니다.

client.truncate("test", "demo")

UDF

udf_put(filename, udf_type=0, policy=None)

Lua UDF 모듈을 등록합니다.

client.udf_put("my_udf.lua")

udf_remove(module, policy=None)

등록된 UDF 모듈을 제거합니다.

client.udf_remove("my_udf")

apply(key, module, function, args=None, policy=None)

레코드에 UDF를 실행합니다.

result = client.apply(key, "my_udf", "my_function", [1, "hello"])

Concurrency Patterns (Async)

asyncio.gather를 사용한 병렬 쓰기

keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.put(k, {"idx": i}) for i, k in enumerate(keys)]
await asyncio.gather(*tasks)

병렬 읽기

keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.get(k) for k in keys]
results = await asyncio.gather(*tasks, return_exceptions=True)

혼합 연산

async def process_user(client, user_id):
key = ("test", "users", user_id)
_, _, bins = await client.get(key)
bins["visits"] = bins.get("visits", 0) + 1
await client.put(key, bins)
return bins

results = await asyncio.gather(*[
process_user(client, f"user_{i}")
for i in range(10)
])

Admin Operations

User Management

메서드설명
admin_create_user(username, password, roles)사용자 생성
admin_drop_user(username)사용자 삭제
admin_change_password(username, password)비밀번호 변경
admin_grant_roles(username, roles)역할 부여
admin_revoke_roles(username, roles)역할 회수
admin_query_user(username)사용자 정보 조회
admin_query_users()전체 사용자 목록

Role Management

메서드설명
admin_create_role(role, privileges, ...)역할 생성
admin_drop_role(role)역할 삭제
admin_grant_privileges(role, privileges)권한 부여
admin_revoke_privileges(role, privileges)권한 회수
admin_query_role(role)역할 정보 조회
admin_query_roles()전체 역할 목록
admin_set_whitelist(role, whitelist)IP 화이트리스트 설정
admin_set_quotas(role, read_quota, write_quota)쿼터 설정
# 사용자 생성
client.admin_create_user("new_user", "password", ["read-write"])

# 권한이 포함된 역할 생성
client.admin_create_role("custom_role", [
{"code": aerospike.PRIV_READ, "ns": "test", "set": "demo"}
])

Expression Filters

policy 파라미터를 받는 모든 읽기/쓰기/배치 작업은 서버사이드 필터링을 위한 filter_expression 키를 지원합니다 (Server 5.2+ 필요):

from aerospike_py import exp

expr = exp.ge(exp.int_bin("age"), exp.int_val(21))

# 필터와 함께 Get
_, _, bins = client.get(key, policy={"filter_expression": expr})

# 필터와 함께 Put (필터가 매칭될 때만 업데이트)
expr = exp.eq(exp.string_bin("status"), exp.string_val("active"))
client.put(key, {"visits": 1}, policy={"filter_expression": expr})

# 필터와 함께 Query
query = client.query("test", "demo")
records = query.results(policy={"filter_expression": expr})

# 필터와 함께 Batch
ops = [{"op": aerospike.OPERATOR_READ, "bin": "status", "val": None}]
records = client.batch_operate(keys, ops, policy={"filter_expression": expr})

레코드가 필터 expression과 매칭되지 않으면 FilteredOut이 발생합니다. 자세한 문서는 Expression 필터 가이드를 참조하세요.